events: Add an error channel

The server now sends errors to the client
This commit is contained in:
Lucas BEE 2019-05-27 11:29:34 +00:00
parent a7741ab62b
commit 2bd90e5cb5
4 changed files with 218 additions and 111 deletions

122
backend/events/channel.go Normal file
View File

@ -0,0 +1,122 @@
package events
import (
"net"
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
type channel struct {
log *logrus.Entry
events map[string]Eventer
// Channel where the eventer will write messages to
serverEventStream chan ServerEvent
// Channel where the eventer will write errors to
serverErrorStream chan ServerError
// Done channel when we need to stop the connection
done chan struct{}
// Underlying ws connection
conn *websocket.Conn
}
// go routine writing events to the websocket connection
func (c *channel) writer() {
// Create the ping timer that will ping the client every pingWait seconds
// to check that he's still listening
pingTicker := time.NewTicker(pingWait)
defer pingTicker.Stop()
// If we exit, close the connection to tell the reader to stop
defer c.conn.Close()
// Write loop
for {
select {
case <-pingTicker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.log.Warnf("error writing message: %s", err)
return
}
case e := <-c.serverEventStream:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteJSON(e); err != nil {
c.log.Warnf("error writing JSON message: %s", err)
return
}
case err := <-c.serverErrorStream:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteJSON(err); err != nil {
c.log.Warnf("error writing JSON error: %s", err)
return
}
case <-c.done:
c.log.Debug("all done finished")
return
}
}
}
// go routine reading messages from the websocket connection
func (c *channel) reader() {
// Read loop
c.conn.SetReadDeadline(time.Now().Add(pongWait))
for {
msg := ClientMessage{}
// Read the client's message
if err := c.conn.ReadJSON(&msg); err != nil {
switch e := err.(type) {
case *websocket.CloseError:
c.log.Info("close error")
case net.Error:
if e.Timeout() {
c.log.WithField(
"error", err,
).Warn("timeout")
} else {
c.log.WithField(
"error", err,
).Warn("unknown net error")
}
default:
c.log.WithField(
"error", err,
).Warn("unknown error reading message")
}
// We got an error, we close all the subscribed events and stop the
// writer goroutine
c.close()
return
}
e, ok := c.events[msg.Message]
if !ok {
c.log.Warnf("no such event to subscribe %q", msg.Message)
continue
}
switch msg.Type {
case "subscribe":
c.log.Debugf("subscribe to %s", msg.Message)
e.Subscribe(e.Launch)
case "unsubscribe":
c.log.Debugf("unsubscribe from %s", msg.Message)
e.Unsubscribe()
default:
c.log.Warnf("invalid type: %s", msg.Type)
}
}
}
func (c *channel) close() {
// Unsubscribe from all events
for _, e := range c.events {
e.Unsubscribe()
}
// Tell the writer to stop
c.done <- struct{}{}
}

View File

@ -2,7 +2,6 @@ package events
import (
"errors"
"net"
"net/http"
"time"
@ -10,7 +9,6 @@ import (
"git.quimbo.fr/odwrtw/canape/backend/users"
"git.quimbo.fr/odwrtw/canape/backend/web"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
const (
@ -53,12 +51,14 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
return nil
})
// Channel where the eventers will write to
// Channel where the eventers will write events to
serverEventStream := make(chan ServerEvent)
// Channel where the eventers will write errors to
serverErrorStream := make(chan ServerError)
events := map[string]Eventer{
"torrents": &TorrentNotifier{
Notifier: NewNotifier(serverEventStream),
Notifier: NewNotifier(serverEventStream, serverErrorStream),
user: user,
log: env.Log,
},
@ -66,6 +66,7 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
c := channel{
serverEventStream: serverEventStream,
serverErrorStream: serverErrorStream,
done: make(chan struct{}, 1),
conn: ws,
log: env.Log,
@ -79,106 +80,3 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
return nil
}
type channel struct {
log *logrus.Entry
events map[string]Eventer
// Channel where the eventer will write to
serverEventStream chan ServerEvent
// Done channel when we need to stop the connection
done chan struct{}
// Underlying ws connection
conn *websocket.Conn
}
func (c *channel) writer() {
// Create the ping timer that will ping the client every pingWait seconds
// to check that he's still listening
pingTicker := time.NewTicker(pingWait)
defer pingTicker.Stop()
// If we exit, close the connection to tell the reader to stop
defer c.conn.Close()
// Write loop
for {
select {
case <-pingTicker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.log.Warnf("error writing message: %s", err)
return
}
case e := <-c.serverEventStream:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteJSON(e); err != nil {
c.log.Warnf("error writing JSON message: %s", err)
return
}
case <-c.done:
c.log.Debug("all done finished")
return
}
}
}
func (c *channel) reader() {
// Read loop
c.conn.SetReadDeadline(time.Now().Add(pongWait))
for {
msg := ClientMessage{}
// Read the client's message
if err := c.conn.ReadJSON(&msg); err != nil {
switch e := err.(type) {
case *websocket.CloseError:
c.log.Info("close error")
case net.Error:
if e.Timeout() {
c.log.WithField(
"error", err,
).Warn("timeout")
} else {
c.log.WithField(
"error", err,
).Warn("unknown net error")
}
default:
c.log.WithField(
"error", err,
).Warn("unknown error reading message")
}
// We got an error, we close all the subscribed events and stop the
// writer goroutine
c.close()
return
}
e, ok := c.events[msg.Message]
if !ok {
c.log.Warnf("no such event to subscribe %q", msg.Message)
continue
}
switch msg.Type {
case "subscribe":
c.log.Debugf("subscribe to %s", msg.Message)
e.Subscribe(e.Launch)
case "unsubscribe":
c.log.Debugf("unsubscribe from %s", msg.Message)
e.Unsubscribe()
default:
c.log.Warnf("invalid type: %s", msg.Type)
}
}
}
func (c *channel) close() {
// Unsubscribe from all events
for _, e := range c.events {
e.Unsubscribe()
}
// Tell the writer to stop
c.done <- struct{}{}
}

View File

@ -4,23 +4,58 @@ import (
"sync"
)
// ErrorLevel is the level of the ServerError
type ErrorLevel string
// Status is the status of an event
type Status string
// Different ErrorLevels and statuses
const (
// ErrorLevel
WarningError ErrorLevel = "warning"
FatalError ErrorLevel = "fatal"
// Statuses
OK Status = "ok"
Error Status = "error"
)
// Event is the base of a message
type Event struct {
Type string `json:"type"`
Status Status `json:"status"`
}
// ClientMessage represents a message sent by the client
type ClientMessage struct {
Type string `json:"type"`
Event
Message string `json:"message"`
}
// ServerEvent represents an event sent to the client
type ServerEvent struct {
Type string `json:"type"`
Event
Data interface{} `json:"message"`
}
// ErrorEvent represents an event error
type ErrorEvent struct {
Level ErrorLevel `json:"level"`
Message string `json:"message"`
}
// ServerError represents an error sent to the client
type ServerError struct {
Event
ErrorEvent `json:"error"`
}
// Eventer define an interface that any eventer must follow
type Eventer interface {
Subscribe(func())
Unsubscribe()
Launch()
Name() string
}
// Notifier represents the base of any Eventer
@ -30,12 +65,14 @@ type Notifier struct {
subscribed bool
done chan struct{}
eventStream chan ServerEvent
errorStream chan ServerError
}
// NewNotifier returns a new notifier
func NewNotifier(eventStream chan ServerEvent) *Notifier {
func NewNotifier(eventStream chan ServerEvent, errorStream chan ServerError) *Notifier {
return &Notifier{
eventStream: eventStream,
errorStream: errorStream,
done: make(chan struct{}),
}
}
@ -68,3 +105,37 @@ func (t *Notifier) Unsubscribe() {
t.done <- struct{}{}
}
// Error sends an error into the errorStream channel
func (t *Notifier) Error(name string, err error) {
t.errorStream <- ServerError{
Event: Event{
Type: name,
Status: Error,
},
ErrorEvent: ErrorEvent{
Level: WarningError,
Message: err.Error(),
},
}
}
// FatalError sends an error into the errorStream channel and tell the client
// that the error is fatal, the notifier won't communicate until a new
// subscribe
func (t *Notifier) FatalError(name string, err error) {
t.Lock()
defer t.Unlock()
t.errorStream <- ServerError{
Event: Event{
Type: name,
Status: Error,
},
ErrorEvent: ErrorEvent{
Level: FatalError,
Message: err.Error(),
},
}
t.subscribed = false
}

View File

@ -18,6 +18,11 @@ type TorrentNotifier struct {
log *logrus.Entry
}
// Name implements the Notifier interface
func (t *TorrentNotifier) Name() string {
return "torrents"
}
// Launch implements the Notifier interface
func (t *TorrentNotifier) Launch() {
// Create the timer that will check for the torrents every X seconds
@ -34,6 +39,7 @@ func (t *TorrentNotifier) Launch() {
err = t.torrentsUpdate()
if err != nil {
t.log.Warnf("got error getting torrent update: %s", err)
t.FatalError(err)
return
}
@ -43,6 +49,7 @@ func (t *TorrentNotifier) Launch() {
err := t.torrentsUpdate()
if err != nil {
t.log.Warnf("got error getting torrent update: %s", err)
t.FatalError(err)
return
}
case <-t.done:
@ -52,6 +59,12 @@ func (t *TorrentNotifier) Launch() {
}
}
// FatalError is a wrapper around Notifier FatalError
func (t *TorrentNotifier) FatalError(err error) {
t.Notifier.FatalError(t.Name(), err)
}
// torrentsUpdate sends to the eventStream if torrents change
func (t *TorrentNotifier) torrentsUpdate() error {
// Get torrents
torrents, err := t.client.GetTorrents()
@ -65,7 +78,10 @@ func (t *TorrentNotifier) torrentsUpdate() error {
// If they're different, send the event
event := ServerEvent{
Type: "torrents",
Event: Event{
Type: t.Name(),
Status: OK,
},
Data: torrents,
}
t.eventStream <- event