package events import ( "net" "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" ) type channel struct { log *logrus.Entry events map[string]Eventer // Channel where the eventer will write messages to serverEventStream chan ServerEvent // Channel where the eventer will write errors to serverErrorStream chan ServerError // Done channel when we need to stop the connection done chan struct{} // Underlying ws connection conn *websocket.Conn } // go routine writing events to the websocket connection func (c *channel) writer() { // Create the ping timer that will ping the client every pingWait seconds // to check that he's still listening pingTicker := time.NewTicker(pingWait) defer pingTicker.Stop() // If we exit, close the connection to tell the reader to stop defer c.conn.Close() // Write loop for { select { case <-pingTicker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { c.log.Warnf("error writing message: %s", err) return } case e := <-c.serverEventStream: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteJSON(e); err != nil { c.log.Warnf("error writing JSON message: %s", err) return } case err := <-c.serverErrorStream: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteJSON(err); err != nil { c.log.Warnf("error writing JSON error: %s", err) return } case <-c.done: c.log.Debug("all done finished") return } } } // go routine reading messages from the websocket connection func (c *channel) reader() { // Read loop c.conn.SetReadDeadline(time.Now().Add(pongWait)) for { msg := ClientMessage{} // Read the client's message if err := c.conn.ReadJSON(&msg); err != nil { switch e := err.(type) { case *websocket.CloseError: c.log.Info("close error") case net.Error: if e.Timeout() { c.log.WithField( "error", err, ).Warn("timeout") } else { c.log.WithField( "error", err, ).Warn("unknown net error") } default: c.log.WithField( "error", err, ).Warn("unknown error reading message") } // We got an error, we close all the subscribed events and stop the // writer goroutine c.close() return } e, ok := c.events[msg.Message] if !ok { c.log.Warnf("no such event to subscribe %q", msg.Message) continue } switch msg.Type { case "subscribe": c.log.Debugf("subscribe to %s", msg.Message) e.Subscribe(e.Launch) case "unsubscribe": c.log.Debugf("unsubscribe from %s", msg.Message) e.Unsubscribe() default: c.log.Warnf("invalid type: %s", msg.Type) } } } func (c *channel) close() { // Unsubscribe from all events for _, e := range c.events { e.Unsubscribe() } // Tell the writer to stop c.done <- struct{}{} }