diff --git a/backend/events/channel.go b/backend/events/channel.go new file mode 100644 index 0000000..be15a9e --- /dev/null +++ b/backend/events/channel.go @@ -0,0 +1,122 @@ +package events + +import ( + "net" + "time" + + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +type channel struct { + log *logrus.Entry + events map[string]Eventer + // Channel where the eventer will write messages to + serverEventStream chan ServerEvent + // Channel where the eventer will write errors to + serverErrorStream chan ServerError + // Done channel when we need to stop the connection + done chan struct{} + // Underlying ws connection + conn *websocket.Conn +} + +// go routine writing events to the websocket connection +func (c *channel) writer() { + // Create the ping timer that will ping the client every pingWait seconds + // to check that he's still listening + pingTicker := time.NewTicker(pingWait) + defer pingTicker.Stop() + + // If we exit, close the connection to tell the reader to stop + defer c.conn.Close() + + // Write loop + for { + select { + case <-pingTicker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + c.log.Warnf("error writing message: %s", err) + return + } + case e := <-c.serverEventStream: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteJSON(e); err != nil { + c.log.Warnf("error writing JSON message: %s", err) + return + } + case err := <-c.serverErrorStream: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteJSON(err); err != nil { + c.log.Warnf("error writing JSON error: %s", err) + return + } + case <-c.done: + c.log.Debug("all done finished") + return + } + } +} + +// go routine reading messages from the websocket connection +func (c *channel) reader() { + // Read loop + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + for { + msg := ClientMessage{} + + // Read the client's message + if err := c.conn.ReadJSON(&msg); err != nil { + switch e := err.(type) { + case *websocket.CloseError: + c.log.Info("close error") + case net.Error: + if e.Timeout() { + c.log.WithField( + "error", err, + ).Warn("timeout") + } else { + c.log.WithField( + "error", err, + ).Warn("unknown net error") + } + default: + c.log.WithField( + "error", err, + ).Warn("unknown error reading message") + } + + // We got an error, we close all the subscribed events and stop the + // writer goroutine + c.close() + return + } + + e, ok := c.events[msg.Message] + if !ok { + c.log.Warnf("no such event to subscribe %q", msg.Message) + continue + } + + switch msg.Type { + case "subscribe": + c.log.Debugf("subscribe to %s", msg.Message) + e.Subscribe(e.Launch) + case "unsubscribe": + c.log.Debugf("unsubscribe from %s", msg.Message) + e.Unsubscribe() + default: + c.log.Warnf("invalid type: %s", msg.Type) + } + } +} + +func (c *channel) close() { + // Unsubscribe from all events + for _, e := range c.events { + e.Unsubscribe() + } + // Tell the writer to stop + c.done <- struct{}{} +} diff --git a/backend/events/handlers.go b/backend/events/handlers.go index b92a052..7ef55fb 100644 --- a/backend/events/handlers.go +++ b/backend/events/handlers.go @@ -2,7 +2,6 @@ package events import ( "errors" - "net" "net/http" "time" @@ -10,7 +9,6 @@ import ( "git.quimbo.fr/odwrtw/canape/backend/users" "git.quimbo.fr/odwrtw/canape/backend/web" "github.com/gorilla/websocket" - "github.com/sirupsen/logrus" ) const ( @@ -53,12 +51,14 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { return nil }) - // Channel where the eventers will write to + // Channel where the eventers will write events to serverEventStream := make(chan ServerEvent) + // Channel where the eventers will write errors to + serverErrorStream := make(chan ServerError) events := map[string]Eventer{ "torrents": &TorrentNotifier{ - Notifier: NewNotifier(serverEventStream), + Notifier: NewNotifier(serverEventStream, serverErrorStream), user: user, log: env.Log, }, @@ -66,6 +66,7 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { c := channel{ serverEventStream: serverEventStream, + serverErrorStream: serverErrorStream, done: make(chan struct{}, 1), conn: ws, log: env.Log, @@ -79,106 +80,3 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { return nil } - -type channel struct { - log *logrus.Entry - events map[string]Eventer - // Channel where the eventer will write to - serverEventStream chan ServerEvent - // Done channel when we need to stop the connection - done chan struct{} - // Underlying ws connection - conn *websocket.Conn -} - -func (c *channel) writer() { - // Create the ping timer that will ping the client every pingWait seconds - // to check that he's still listening - pingTicker := time.NewTicker(pingWait) - defer pingTicker.Stop() - - // If we exit, close the connection to tell the reader to stop - defer c.conn.Close() - - // Write loop - for { - select { - case <-pingTicker.C: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - c.log.Warnf("error writing message: %s", err) - return - } - case e := <-c.serverEventStream: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.conn.WriteJSON(e); err != nil { - c.log.Warnf("error writing JSON message: %s", err) - return - } - case <-c.done: - c.log.Debug("all done finished") - return - } - } -} - -func (c *channel) reader() { - // Read loop - c.conn.SetReadDeadline(time.Now().Add(pongWait)) - for { - msg := ClientMessage{} - - // Read the client's message - if err := c.conn.ReadJSON(&msg); err != nil { - switch e := err.(type) { - case *websocket.CloseError: - c.log.Info("close error") - case net.Error: - if e.Timeout() { - c.log.WithField( - "error", err, - ).Warn("timeout") - } else { - c.log.WithField( - "error", err, - ).Warn("unknown net error") - } - default: - c.log.WithField( - "error", err, - ).Warn("unknown error reading message") - } - - // We got an error, we close all the subscribed events and stop the - // writer goroutine - c.close() - return - } - - e, ok := c.events[msg.Message] - if !ok { - c.log.Warnf("no such event to subscribe %q", msg.Message) - continue - } - - switch msg.Type { - case "subscribe": - c.log.Debugf("subscribe to %s", msg.Message) - e.Subscribe(e.Launch) - case "unsubscribe": - c.log.Debugf("unsubscribe from %s", msg.Message) - e.Unsubscribe() - default: - c.log.Warnf("invalid type: %s", msg.Type) - } - } -} - -func (c *channel) close() { - // Unsubscribe from all events - for _, e := range c.events { - e.Unsubscribe() - } - // Tell the writer to stop - c.done <- struct{}{} -} diff --git a/backend/events/notifier.go b/backend/events/notifier.go index c517489..a7d9f38 100644 --- a/backend/events/notifier.go +++ b/backend/events/notifier.go @@ -4,23 +4,58 @@ import ( "sync" ) +// ErrorLevel is the level of the ServerError +type ErrorLevel string + +// Status is the status of an event +type Status string + +// Different ErrorLevels and statuses +const ( + // ErrorLevel + WarningError ErrorLevel = "warning" + FatalError ErrorLevel = "fatal" + // Statuses + OK Status = "ok" + Error Status = "error" +) + +// Event is the base of a message +type Event struct { + Type string `json:"type"` + Status Status `json:"status"` +} + // ClientMessage represents a message sent by the client type ClientMessage struct { - Type string `json:"type"` + Event Message string `json:"message"` } // ServerEvent represents an event sent to the client type ServerEvent struct { - Type string `json:"type"` + Event Data interface{} `json:"message"` } +// ErrorEvent represents an event error +type ErrorEvent struct { + Level ErrorLevel `json:"level"` + Message string `json:"message"` +} + +// ServerError represents an error sent to the client +type ServerError struct { + Event + ErrorEvent `json:"error"` +} + // Eventer define an interface that any eventer must follow type Eventer interface { Subscribe(func()) Unsubscribe() Launch() + Name() string } // Notifier represents the base of any Eventer @@ -30,12 +65,14 @@ type Notifier struct { subscribed bool done chan struct{} eventStream chan ServerEvent + errorStream chan ServerError } // NewNotifier returns a new notifier -func NewNotifier(eventStream chan ServerEvent) *Notifier { +func NewNotifier(eventStream chan ServerEvent, errorStream chan ServerError) *Notifier { return &Notifier{ eventStream: eventStream, + errorStream: errorStream, done: make(chan struct{}), } } @@ -68,3 +105,37 @@ func (t *Notifier) Unsubscribe() { t.done <- struct{}{} } + +// Error sends an error into the errorStream channel +func (t *Notifier) Error(name string, err error) { + t.errorStream <- ServerError{ + Event: Event{ + Type: name, + Status: Error, + }, + ErrorEvent: ErrorEvent{ + Level: WarningError, + Message: err.Error(), + }, + } +} + +// FatalError sends an error into the errorStream channel and tell the client +// that the error is fatal, the notifier won't communicate until a new +// subscribe +func (t *Notifier) FatalError(name string, err error) { + t.Lock() + defer t.Unlock() + + t.errorStream <- ServerError{ + Event: Event{ + Type: name, + Status: Error, + }, + ErrorEvent: ErrorEvent{ + Level: FatalError, + Message: err.Error(), + }, + } + t.subscribed = false +} diff --git a/backend/events/torrents.go b/backend/events/torrents.go index 8872666..c037cb1 100644 --- a/backend/events/torrents.go +++ b/backend/events/torrents.go @@ -18,6 +18,11 @@ type TorrentNotifier struct { log *logrus.Entry } +// Name implements the Notifier interface +func (t *TorrentNotifier) Name() string { + return "torrents" +} + // Launch implements the Notifier interface func (t *TorrentNotifier) Launch() { // Create the timer that will check for the torrents every X seconds @@ -34,6 +39,7 @@ func (t *TorrentNotifier) Launch() { err = t.torrentsUpdate() if err != nil { t.log.Warnf("got error getting torrent update: %s", err) + t.FatalError(err) return } @@ -43,6 +49,7 @@ func (t *TorrentNotifier) Launch() { err := t.torrentsUpdate() if err != nil { t.log.Warnf("got error getting torrent update: %s", err) + t.FatalError(err) return } case <-t.done: @@ -52,6 +59,12 @@ func (t *TorrentNotifier) Launch() { } } +// FatalError is a wrapper around Notifier FatalError +func (t *TorrentNotifier) FatalError(err error) { + t.Notifier.FatalError(t.Name(), err) +} + +// torrentsUpdate sends to the eventStream if torrents change func (t *TorrentNotifier) torrentsUpdate() error { // Get torrents torrents, err := t.client.GetTorrents() @@ -65,7 +78,10 @@ func (t *TorrentNotifier) torrentsUpdate() error { // If they're different, send the event event := ServerEvent{ - Type: "torrents", + Event: Event{ + Type: t.Name(), + Status: OK, + }, Data: torrents, } t.eventStream <- event