First version

This commit is contained in:
Nicolas Duhamel 2022-10-28 20:33:16 +02:00
commit e2ef561b61
3 changed files with 201 additions and 0 deletions

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module git.quimbo.fr/nicolas/mqtt
go 1.19
require (
github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
)

12
go.sum Normal file
View File

@ -0,0 +1,12 @@
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

179
mqtt.go Normal file
View File

@ -0,0 +1,179 @@
package mqtt
import (
"context"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
)
// copy the paho lib Message interface
// so no need for deps to import paho lib
type Message interface {
Duplicate() bool
Qos() byte
Retained() bool
Topic() string
MessageID() uint16
Payload() []byte
Ack()
}
type Config struct {
Host string
Port string
Username string
Password string
ClientID string
CleanSession bool
AutoReconnect bool
Retained bool
KeepAlive time.Duration
MsgChanDept uint
}
// return &Config{
// Host: host,
// Port: port,
// Username: username,
// Password: password,
// ClientID: clientId,
// CleanSession: true,
// AutoReconnect: true,
// Retained: false,
// KeepAlive: 15 * time.Second,
// MsgChanDept: 100,
// }
// }
type Client struct {
Config Config
pahoClient paho.Client
}
func New(conf Config) *Client {
opts := paho.NewClientOptions()
broker := conf.Host + ":" + conf.Port
opts.AddBroker(broker)
opts.SetClientID(conf.ClientID)
opts.SetUsername(conf.Username)
opts.SetPassword(conf.Password)
opts.SetCleanSession(conf.CleanSession)
opts.SetAutoReconnect(conf.AutoReconnect)
opts.SetKeepAlive(conf.KeepAlive)
opts.SetMessageChannelDepth(conf.MsgChanDept)
if conf.AutoReconnect {
opts.SetResumeSubs(true)
}
client := paho.NewClient(opts)
return &Client{
Config: conf,
pahoClient: client,
}
}
func (c *Client) Connect(ctx context.Context) error {
if token := c.pahoClient.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
go func(ctx context.Context) {
//TODO remove the select, only wait on signal chanel
for {
select {
case <-ctx.Done():
c.pahoClient.Disconnect(1000)
return
}
}
}(ctx)
return nil
}
type Subscriber struct {
OnMessage chan Message
OnError chan error
}
func (c *Client) Subscribe(ctx context.Context, topic string, qos int) *Subscriber {
sub := &Subscriber{
OnMessage: make(chan Message),
OnError: make(chan error),
}
go func(ctx context.Context, sub *Subscriber) {
defer func() {
close(sub.OnMessage)
close(sub.OnError)
}()
if token := c.pahoClient.Subscribe(topic, byte(qos), func(mqttClient paho.Client, message paho.Message) {
sub.OnMessage <- message
}); token.Wait() && token.Error() != nil {
sub.OnError <- token.Error()
}
<-ctx.Done()
}(ctx, sub)
return sub
}
type frame struct {
topic string
qos int
payload []byte
retain bool
}
type Publisher struct {
OnError chan error
publish chan *frame
}
// Publisher will create MQTT publisher and private listener for messages to be published.
// All messages to be published are sent through private publish channel.
// Errors will be sent to OnError channel.
func (c *Client) Publisher(ctx context.Context) *Publisher {
pub := &Publisher{
OnError: make(chan error),
publish: make(chan *frame),
}
go func(ctx context.Context, pub *Publisher) {
defer close(pub.OnError)
for {
select {
case fr := <-pub.publish:
if token := c.pahoClient.
Publish(fr.topic,
byte(fr.qos),
fr.retain,
fr.payload); token.Wait() && token.Error() != nil {
pub.OnError <- token.Error()
}
case <-ctx.Done():
return
}
}
}(ctx, pub)
return pub
}
// Publish message to topic through private pub.publish channel.
// Thread-safe.
func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) {
pub.publish <- &frame{
topic: topic,
qos: qos,
payload: message,
retain: retain,
}
}