canape/backend/events/channel.go
Lucas BEE 2bd90e5cb5 events: Add an error channel
The server now sends errors to the client
2019-05-27 12:18:05 +00:00

123 lines
2.8 KiB
Go

package events
import (
"net"
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
// channel ties one websocket connection to the events a client can subscribe
// to. The reader goroutine handles incoming client messages, the writer
// goroutine pushes pings, events and errors back to the client.
type channel struct {
// Logger used by the reader and writer goroutines
log *logrus.Entry
// Available events; the reader looks them up by the Message field of the
// client's message
events map[string]Eventer
// Channel where the eventer will write messages to; the writer forwards
// each of them to the client as JSON
serverEventStream chan ServerEvent
// Channel where the eventer will write errors to; the writer forwards
// each of them to the client as JSON
serverErrorStream chan ServerError
// Done channel when we need to stop the connection: close() sends on it and
// the writer returns when it receives from it
done chan struct{}
// Underlying ws connection
conn *websocket.Conn
}
// writer is the goroutine writing to the websocket connection: a ping every
// pingWait, plus everything received on serverEventStream and
// serverErrorStream, encoded as JSON. It returns on the first failed write or
// as soon as a value is received on c.done.
func (c *channel) writer() {
	// Periodic ping used to check that the client is still listening
	ticker := time.NewTicker(pingWait)
	defer ticker.Stop()

	// Whatever the reason we leave, closing the connection tells the reader
	// to stop
	defer c.conn.Close()

	// Every write gets a fresh deadline, writeWait from now
	armDeadline := func() {
		c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	}

	for {
		select {
		case <-c.done:
			c.log.Debug("all done finished")
			return

		case srvErr := <-c.serverErrorStream:
			armDeadline()
			writeErr := c.conn.WriteJSON(srvErr)
			if writeErr != nil {
				c.log.Warnf("error writing JSON error: %s", writeErr)
				return
			}

		case event := <-c.serverEventStream:
			armDeadline()
			writeErr := c.conn.WriteJSON(event)
			if writeErr != nil {
				c.log.Warnf("error writing JSON message: %s", writeErr)
				return
			}

		case <-ticker.C:
			armDeadline()
			writeErr := c.conn.WriteMessage(websocket.PingMessage, nil)
			if writeErr != nil {
				c.log.Warnf("error writing message: %s", writeErr)
				return
			}
		}
	}
}
// reader is the goroutine reading client messages from the websocket
// connection. Each message names an event (Message) and an action (Type):
// "subscribe" or "unsubscribe". Messages naming an unknown event or an
// invalid type are logged and skipped. On the first read error the error is
// logged, c.close() is called and the goroutine returns.
func (c *channel) reader() {
	// The read deadline is armed once, before entering the loop
	c.conn.SetReadDeadline(time.Now().Add(pongWait))

	for {
		var msg ClientMessage

		// Read the client's message
		readErr := c.conn.ReadJSON(&msg)
		if readErr != nil {
			switch typed := readErr.(type) {
			case *websocket.CloseError:
				c.log.Info("close error")
			case net.Error:
				// Timeouts get their own message, other network errors
				// share a generic one
				reason := "unknown net error"
				if typed.Timeout() {
					reason = "timeout"
				}
				c.log.WithField("error", readErr).Warn(reason)
			default:
				c.log.WithField("error", readErr).Warn("unknown error reading message")
			}

			// Reading failed: drop every subscription and stop the writer
			// goroutine
			c.close()
			return
		}

		// The event is resolved before the type is checked
		target, known := c.events[msg.Message]
		if !known {
			c.log.Warnf("no such event to subscribe %q", msg.Message)
			continue
		}

		switch msg.Type {
		case "subscribe":
			c.log.Debugf("subscribe to %s", msg.Message)
			target.Subscribe(target.Launch)
		case "unsubscribe":
			c.log.Debugf("unsubscribe from %s", msg.Message)
			target.Unsubscribe()
		default:
			c.log.Warnf("invalid type: %s", msg.Type)
		}
	}
}
// close tears the channel down: it unsubscribes from every registered event
// and makes sure the writer goroutine stops.
//
// The stop signal is sent without blocking. reader calls close after any read
// error, including the one provoked by writer closing the connection when it
// exits on a failed write. In that situation nothing may be receiving from
// c.done anymore, and a blocking send would leave the calling goroutine stuck
// forever.
func (c *channel) close() {
	// Unsubscribe from all events
	for _, e := range c.events {
		e.Unsubscribe()
	}

	// Tell the writer to stop, if it is ready to take the signal right now
	select {
	case c.done <- struct{}{}:
	default:
		// Nobody is receiving at the moment: the writer has either already
		// exited or is busy writing. Closing the connection below handles
		// the latter.
	}

	// Closing the connection guarantees that a writer which missed the signal
	// fails on its next write and returns. The error is deliberately ignored:
	// the writer also closes the connection on exit, so a "already closed"
	// error is expected here.
	_ = c.conn.Close()
}