commit e2ef561b61242c262d3591fc40d7a6de779de211 Author: Nicolas Duhamel Date: Fri Oct 28 20:33:16 2022 +0200 First version diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5929b8e --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module git.quimbo.fr/nicolas/mqtt + +go 1.19 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect + github.com/gorilla/websocket v1.4.2 // indirect + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ddd8ce6 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/mqtt.go b/mqtt.go new file mode 100644 index 0000000..19c427a --- /dev/null +++ b/mqtt.go @@ -0,0 +1,179 @@ +package mqtt + +import ( + "context" + "time" + + paho "github.com/eclipse/paho.mqtt.golang" +) + +// copy the paho lib Message interface +// so no need for deps to import paho lib +type Message interface { + Duplicate() bool + Qos() byte + Retained() bool + Topic() string + MessageID() uint16 + Payload() []byte + Ack() +} + +type Config struct { + Host string + Port string + Username string + Password string + ClientID string + CleanSession bool + AutoReconnect bool + Retained bool + KeepAlive time.Duration + MsgChanDept uint +} + +// return &Config{ +// Host: host, +// Port: port, +// Username: username, +// Password: password, +// ClientID: clientId, +// CleanSession: true, +// AutoReconnect: true, +// Retained: false, +// KeepAlive: 15 * time.Second, +// MsgChanDept: 100, +// } +// } + +type Client struct { + Config Config + pahoClient paho.Client +} + +func New(conf Config) *Client { + opts := paho.NewClientOptions() + + broker := conf.Host + ":" + conf.Port + + opts.AddBroker(broker) + opts.SetClientID(conf.ClientID) + opts.SetUsername(conf.Username) + opts.SetPassword(conf.Password) + opts.SetCleanSession(conf.CleanSession) + opts.SetAutoReconnect(conf.AutoReconnect) + opts.SetKeepAlive(conf.KeepAlive) + opts.SetMessageChannelDepth(conf.MsgChanDept) + if conf.AutoReconnect { + opts.SetResumeSubs(true) + } + + client := paho.NewClient(opts) + + return &Client{ + Config: conf, + pahoClient: client, + } +} + +func (c *Client) Connect(ctx context.Context) error { + if token := c.pahoClient.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + go func(ctx context.Context) { + //TODO remove the select, only wait on signal chanel + for { + select { + case <-ctx.Done(): + c.pahoClient.Disconnect(1000) + return + } + } + }(ctx) + + return nil +} + +type Subscriber struct { + OnMessage chan Message + OnError chan error +} + +func (c *Client) Subscribe(ctx context.Context, topic string, qos int) *Subscriber { + sub := &Subscriber{ + OnMessage: make(chan Message), + OnError: make(chan error), + } + + go func(ctx context.Context, sub *Subscriber) { + defer func() { + close(sub.OnMessage) + close(sub.OnError) + }() + + if token := c.pahoClient.Subscribe(topic, byte(qos), func(mqttClient paho.Client, message paho.Message) { + sub.OnMessage <- message + }); token.Wait() && token.Error() != nil { + sub.OnError <- token.Error() + } + + <-ctx.Done() + }(ctx, sub) + + return sub +} + +type frame struct { + topic string + qos int + payload []byte + retain bool +} + +type Publisher struct { + OnError chan error + publish chan *frame +} + +// Publisher will create MQTT publisher and private listener for messages to be published. +// All messages to be published are sent through private publish channel. +// Errors will be sent to OnError channel. +func (c *Client) Publisher(ctx context.Context) *Publisher { + pub := &Publisher{ + OnError: make(chan error), + publish: make(chan *frame), + } + + go func(ctx context.Context, pub *Publisher) { + defer close(pub.OnError) + + for { + select { + case fr := <-pub.publish: + if token := c.pahoClient. + Publish(fr.topic, + byte(fr.qos), + fr.retain, + fr.payload); token.Wait() && token.Error() != nil { + pub.OnError <- token.Error() + } + case <-ctx.Done(): + return + } + } + }(ctx, pub) + + return pub +} + +// Publish message to topic through private pub.publish channel. +// Thread-safe. +func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) { + pub.publish <- &frame{ + topic: topic, + qos: qos, + payload: message, + retain: retain, + } +}