We now close the Channels when its connection is closed so that we never try to send events into a dead channel Add a debug handler showing who is subscribed to what
200 lines
4.6 KiB
Go
200 lines
4.6 KiB
Go
package events
|
|
|
|
import (
|
|
"net"
|
|
"time"
|
|
|
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Channel represents the channel of the user and the server
|
|
type Channel struct {
|
|
log *logrus.Entry
|
|
// Channel where the eventer will write messages to
|
|
serverEventStream chan ServerEvent
|
|
// Channel where the eventer will write errors to
|
|
serverErrorStream chan ServerError
|
|
// Done channel when we need to stop the connection
|
|
done chan struct{}
|
|
// Underlying ws connection
|
|
conn *websocket.Conn
|
|
// A channel is directly linked to a user
|
|
User *models.User `json:"user"`
|
|
// List of events the user is listening to
|
|
Events map[string]struct{} `json:"events"`
|
|
ID string `json:"id"`
|
|
db *sqlx.DB
|
|
closed bool
|
|
}
|
|
|
|
// go routine writing events to the websocket connection
|
|
func (c *Channel) writer() {
|
|
// Create the ping timer that will ping the client every pingWait seconds
|
|
// to check that he's still listening
|
|
pingTicker := time.NewTicker(pingWait)
|
|
defer pingTicker.Stop()
|
|
|
|
// If we exit, close the connection to tell the reader to stop
|
|
defer c.conn.Close()
|
|
|
|
// Write loop
|
|
for {
|
|
select {
|
|
case <-pingTicker.C:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
c.log.Warnf("error writing message: %s", err)
|
|
return
|
|
}
|
|
case e := <-c.serverEventStream:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.conn.WriteJSON(e); err != nil {
|
|
c.log.Warnf("error writing JSON message: %s", err)
|
|
return
|
|
}
|
|
case err := <-c.serverErrorStream:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.conn.WriteJSON(err); err != nil {
|
|
c.log.Warnf("error writing JSON error: %s", err)
|
|
return
|
|
}
|
|
case <-c.done:
|
|
c.log.Debug("all done finished")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// go routine reading messages from the websocket connection
|
|
func (c *Channel) reader() {
|
|
// Read loop
|
|
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
for {
|
|
msg := ClientMessage{}
|
|
|
|
// Read the client's message
|
|
if err := c.conn.ReadJSON(&msg); err != nil {
|
|
switch e := err.(type) {
|
|
case *websocket.CloseError:
|
|
c.log.Info("close error")
|
|
case net.Error:
|
|
if e.Timeout() {
|
|
c.log.WithField(
|
|
"error", err,
|
|
).Warn("timeout")
|
|
} else {
|
|
c.log.WithField(
|
|
"error", err,
|
|
).Warn("unknown net error")
|
|
}
|
|
default:
|
|
c.log.WithField(
|
|
"error", err,
|
|
).Warn("unknown error reading message")
|
|
}
|
|
|
|
// We got an error, we close all the subscribed events and stop the
|
|
// writer goroutine
|
|
c.close()
|
|
return
|
|
}
|
|
|
|
e, ok := Eventers[msg.Message]
|
|
if !ok {
|
|
c.log.Warnf("no such event to subscribe %q", msg.Message)
|
|
continue
|
|
}
|
|
|
|
switch msg.Type {
|
|
case "subscribe":
|
|
c.log.Debugf("subscribe to %s", msg.Message)
|
|
if _, ok := c.Events[e.Name]; ok {
|
|
c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name)
|
|
continue
|
|
}
|
|
if err := e.Subscribe(c); err != nil {
|
|
c.Error(e.Name, err)
|
|
continue
|
|
}
|
|
c.Events[e.Name] = struct{}{}
|
|
case "unsubscribe":
|
|
c.log.Debugf("unsubscribe from %s", msg.Message)
|
|
if _, ok := c.Events[e.Name]; !ok {
|
|
c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name)
|
|
continue
|
|
}
|
|
e.Unsubscribe(c)
|
|
default:
|
|
c.log.Warnf("invalid type: %s", msg.Type)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Channel) close() {
|
|
// The channel is now closed
|
|
c.closed = true
|
|
|
|
// Unsubscribe from all Events
|
|
for eventName := range c.Events {
|
|
Eventers[eventName].Unsubscribe(c)
|
|
}
|
|
|
|
// Tell the writer to stop
|
|
c.done <- struct{}{}
|
|
}
|
|
|
|
// Error sends an error into the errorStream channel
|
|
func (c *Channel) Error(name string, err error) {
|
|
if c.closed {
|
|
return
|
|
}
|
|
c.log.WithField("name", name).Warn(err)
|
|
c.serverErrorStream <- ServerError{
|
|
Event: Event{
|
|
Type: name,
|
|
Status: Error,
|
|
},
|
|
ErrorEvent: ErrorEvent{
|
|
Level: WarningError,
|
|
Message: err.Error(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// FatalError sends an error into the errorStream channel
|
|
func (c *Channel) FatalError(name string, err error) {
|
|
if c.closed {
|
|
return
|
|
}
|
|
c.log.WithField("name", name).Warn(err)
|
|
c.serverErrorStream <- ServerError{
|
|
Event: Event{
|
|
Type: name,
|
|
Status: Error,
|
|
},
|
|
ErrorEvent: ErrorEvent{
|
|
Level: FatalError,
|
|
Message: err.Error(),
|
|
},
|
|
}
|
|
}
|
|
|
|
// SendEvent sends an event to the serverEvent channel
|
|
func (c *Channel) sendEvent(event ServerEvent) {
|
|
if c.closed {
|
|
return
|
|
}
|
|
c.serverEventStream <- event
|
|
}
|
|
|
|
// Unsubscribe unsubscribes a user from all eventers
|
|
func Unsubscribe(u *models.User) {
|
|
chanl := &Channel{User: u}
|
|
for _, event := range Eventers {
|
|
event.Unsubscribe(chanl)
|
|
}
|
|
}
|