package events

import (
	"net"
	"time"

	"git.quimbo.fr/odwrtw/canape/backend/models"
	"github.com/gorilla/websocket"
	"github.com/jmoiron/sqlx"
	"github.com/sirupsen/logrus"
)

// Channel represents the channel between a user and the server. It wraps a
// websocket connection together with the streams used to push events and
// errors to that connection.
type Channel struct {
	log *logrus.Entry
	// Channel where the eventer will write messages to
	serverEventStream chan ServerEvent
	// Channel where the eventer will write errors to
	serverErrorStream chan ServerError
	// Done channel when we need to stop the connection
	done chan struct{}
	// Underlying ws connection
	conn *websocket.Conn
	// A channel is directly linked to a user
	User *models.User `json:"user"`
	// List of events the user is listening to
	Events map[string]struct{} `json:"events"`
	ID     string              `json:"id"`
	db     *sqlx.DB
	// closed is set by close() and checked by Error, FatalError and sendEvent
	// so they stop pushing into the streams.
	// NOTE(review): the flag is read and written without synchronisation;
	// confirm whether it can be accessed from several goroutines.
	closed bool
}

// writer is the goroutine writing events, errors and pings to the websocket
// connection. It returns on the first write error or when a value is received
// on c.done, and closes the connection on exit.
func (c *Channel) writer() {
	// Create the ping timer that will ping the client every pingWait
	// to check that it is still listening
	pingTicker := time.NewTicker(pingWait)
	defer pingTicker.Stop()

	// If we exit, close the connection to tell the reader to stop
	defer c.conn.Close()

	// Write loop
	for {
		select {
		case <-pingTicker.C:
			// Best effort: a failure to set the deadline surfaces as a
			// write error just below.
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				c.log.Warnf("error writing message: %s", err)
				return
			}
		case e := <-c.serverEventStream:
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteJSON(e); err != nil {
				c.log.Warnf("error writing JSON message: %s", err)
				return
			}
		case serverErr := <-c.serverErrorStream:
			// Named serverErr so the payload is not shadowed by the write
			// error below.
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteJSON(serverErr); err != nil {
				c.log.Warnf("error writing JSON error: %s", err)
				return
			}
		case <-c.done:
			c.log.Debug("all done finished")
			return
		}
	}
}

// reader is the goroutine reading client messages from the websocket
// connection. Each message names an eventer (msg.Message) and an action
// (msg.Type: "subscribe" or "unsubscribe"). On any read error the channel is
// closed and the goroutine returns.
func (c *Channel) reader() {
	// The read deadline is only set once here.
	// NOTE(review): presumably a pong handler registered elsewhere extends
	// it — confirm.
	_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))

	// Read loop
	for {
		msg := ClientMessage{}

		// Read the client's message
		if err := c.conn.ReadJSON(&msg); err != nil {
			switch e := err.(type) {
			case *websocket.CloseError:
				c.log.Info("close error")
			case net.Error:
				if e.Timeout() {
					c.log.WithField("error", err).Warn("timeout")
				} else {
					c.log.WithField("error", err).Warn("unknown net error")
				}
			default:
				c.log.WithField("error", err).Warn("unknown error reading message")
			}

			// We got an error, we close all the subscribed events and stop
			// the writer goroutine
			c.close()
			return
		}

		e, ok := Eventers[msg.Message]
		if !ok {
			c.log.Warnf("no such event to subscribe %q", msg.Message)
			continue
		}

		switch msg.Type {
		case "subscribe":
			c.log.Debugf("subscribe to %s", msg.Message)
			if _, ok := c.Events[e.Name]; ok {
				c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name)
				continue
			}

			if err := e.Subscribe(c); err != nil {
				c.Error(e.Name, err)
				continue
			}
			c.Events[e.Name] = struct{}{}
		case "unsubscribe":
			c.log.Debugf("unsubscribe from %s", msg.Message)
			if _, ok := c.Events[e.Name]; !ok {
				c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name)
				continue
			}

			e.Unsubscribe(c)
			// Forget the subscription so a later "subscribe" is not rejected
			// as "already subscribed". This is a no-op if the eventer already
			// removed the entry.
			delete(c.Events, e.Name)
		default:
			c.log.Warnf("invalid type: %s", msg.Type)
		}
	}
}

// close marks the channel as closed, unsubscribes it from every event listed
// in c.Events and tells the writer goroutine to stop.
func (c *Channel) close() {
	// The channel is now closed
	c.closed = true

	// Unsubscribe from all Events
	for eventName := range c.Events {
		// Guard the lookup: c.Events is keyed by eventer name, which is not
		// guaranteed from here to match a key of Eventers.
		if e, ok := Eventers[eventName]; ok {
			e.Unsubscribe(c)
		}
	}

	// Tell the writer to stop
	// NOTE(review): this send blocks until the writer receives it; if the
	// writer has already returned and c.done is unbuffered, this goroutine
	// never returns — confirm how c.done is created.
	c.done <- struct{}{}
}

// Error logs err and sends it into the error stream as a warning-level
// ServerError for the event called name. It does nothing once the channel is
// closed.
func (c *Channel) Error(name string, err error) {
	if c.closed {
		return
	}

	c.log.WithField("name", name).Warn(err)
	// NOTE(review): blocks until the writer picks the error up unless the
	// stream is buffered — confirm against the channel construction.
	c.serverErrorStream <- ServerError{
		Event: Event{
			Type:   name,
			Status: Error,
		},
		ErrorEvent: ErrorEvent{
			Level:   WarningError,
			Message: err.Error(),
		},
	}
}

// FatalError logs err and sends it into the error stream as a fatal-level
// ServerError for the event called name. It does nothing once the channel is
// closed.
func (c *Channel) FatalError(name string, err error) {
	if c.closed {
		return
	}

	c.log.WithField("name", name).Warn(err)
	c.serverErrorStream <- ServerError{
		Event: Event{
			Type:   name,
			Status: Error,
		},
		ErrorEvent: ErrorEvent{
			Level:   FatalError,
			Message: err.Error(),
		},
	}
}

// sendEvent sends an event to the server event stream. It does nothing once
// the channel is closed.
func (c *Channel) sendEvent(event ServerEvent) {
	if c.closed {
		return
	}
	c.serverEventStream <- event
}

// Unsubscribe unsubscribes a user from all eventers. It builds a Channel
// carrying only the user and passes it to every eventer's Unsubscribe.
func Unsubscribe(u *models.User) {
	chanl := &Channel{User: u}
	for _, event := range Eventers {
		event.Unsubscribe(chanl)
	}
}