canape/backend/events/handlers.go

185 lines
4.2 KiB
Go

package events
import (
"errors"
"net"
"net/http"
"time"
"git.quimbo.fr/odwrtw/canape/backend/auth"
"git.quimbo.fr/odwrtw/canape/backend/users"
"git.quimbo.fr/odwrtw/canape/backend/web"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
// Timing parameters shared by the reader and the writer of a websocket
// connection.
const (
	// pongWait is the read deadline window: the reader gives up when nothing
	// has pushed the deadline back within this duration.
	pongWait = 35 * time.Second
	// pingWait is the period between two pings sent by the writer. It must
	// stay below pongWait so that a pong can arrive before the read deadline
	// expires.
	pingWait = 30 * time.Second
	// writeWait is the deadline given to every single write to the client.
	writeWait = 30 * time.Second
)
// WsHandler upgrades the request to a websocket connection and serves events
// to the current user until the connection ends.
//
// The user is read with auth.GetCurrentUser; a value that is not a
// *users.User is reported through env.RenderError. An upgrade failure is
// logged and returned. Once upgraded, the writer runs in its own goroutine
// while the read loop runs on the calling goroutine, so the handler only
// returns (with a nil error) when the read loop stops.
func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	// Get the user
	v := auth.GetCurrentUser(r, env.Log)
	user, ok := v.(*users.User)
	if !ok {
		return env.RenderError(w, errors.New("invalid user type"))
	}

	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}

	// Upgrade the request for websockets
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		env.Log.Warnf("got error upgrading request: %+v", err)
		// Only flag the failure as a handshake error when it really is one
		var handshakeErr websocket.HandshakeError
		if errors.As(err, &handshakeErr) {
			env.Log.Warn("handshake error")
		}
		return err
	}
	defer ws.Close()

	// The pong handler only postpones the read deadline
	ws.SetPongHandler(func(string) error {
		ws.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	// Channel where the eventers will write to
	serverEventStream := make(chan ServerEvent)

	// Events the client can subscribe to, keyed by the name used in the
	// client messages
	events := map[string]Eventer{
		"torrents": &TorrentNotifier{
			Notifier: NewNotifier(serverEventStream),
			user:     user,
			log:      env.Log,
		},
	}

	c := channel{
		serverEventStream: serverEventStream,
		// Buffered so that the reader can signal the end without blocking,
		// even if the writer has already returned
		done:   make(chan struct{}, 1),
		conn:   ws,
		log:    env.Log,
		events: events,
	}

	// Launch the goroutine responsible for writing events in the websocket
	go c.writer()

	// Run the reader responsible for reading events from the websocket, it
	// blocks until the connection is over
	c.reader()

	return nil
}
type channel struct {
log *logrus.Entry
events map[string]Eventer
// Channel where the eventer will write to
serverEventStream chan ServerEvent
// Done channel when we need to stop the connection
done chan struct{}
// Underlying ws connection
conn *websocket.Conn
}
// writer forwards the server events to the websocket as JSON and pings the
// client every pingWait. It returns as soon as a write fails or the done
// channel fires, closing the connection on its way out.
func (c *channel) writer() {
	// Periodic ping used to check that the client is still listening
	keepAlive := time.NewTicker(pingWait)
	defer keepAlive.Stop()

	// Closing the connection when leaving tells the reader to stop
	defer c.conn.Close()

	// armDeadline gives the next write writeWait to complete
	armDeadline := func() {
		c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	}

	for {
		select {
		case <-c.done:
			c.log.Debug("all done finished")
			return

		case event := <-c.serverEventStream:
			armDeadline()
			err := c.conn.WriteJSON(event)
			if err != nil {
				c.log.Warnf("error writing JSON message: %s", err)
				return
			}

		case <-keepAlive.C:
			armDeadline()
			err := c.conn.WriteMessage(websocket.PingMessage, nil)
			if err != nil {
				c.log.Warnf("error writing message: %s", err)
				return
			}
		}
	}
}
// reader runs the read loop of the connection on the calling goroutine.
//
// Every client message names an event (msg.Message) and an action
// (msg.Type): "subscribe" calls Subscribe on the matching eventer with its
// Launch function, "unsubscribe" calls Unsubscribe. Unknown events and
// unknown actions are logged and skipped. The loop stops on the first read
// error: the error is logged, then close unsubscribes every eventer and
// tells the writer goroutine to stop.
func (c *channel) reader() {
	// The loop itself never extends this deadline
	c.conn.SetReadDeadline(time.Now().Add(pongWait))

	for {
		msg := ClientMessage{}

		// Read the client's message
		if err := c.conn.ReadJSON(&msg); err != nil {
			c.logReadError(err)
			// We got an error, we close all the subscribed events and stop
			// the writer goroutine
			c.close()
			return
		}

		e, ok := c.events[msg.Message]
		if !ok {
			c.log.Warnf("no such event to subscribe %q", msg.Message)
			continue
		}

		switch msg.Type {
		case "subscribe":
			c.log.Debugf("subscribe to %s", msg.Message)
			e.Subscribe(e.Launch)
		case "unsubscribe":
			c.log.Debugf("unsubscribe from %s", msg.Message)
			e.Unsubscribe()
		default:
			c.log.Warnf("invalid type: %s", msg.Type)
		}
	}
}

// logReadError logs a read error according to its kind: websocket close
// error, network timeout, other network error, or anything else. The kind is
// looked up with errors.As so that wrapped errors are classified too.
func (c *channel) logReadError(err error) {
	var closeErr *websocket.CloseError
	if errors.As(err, &closeErr) {
		c.log.Info("close error")
		return
	}

	entry := c.log.WithField("error", err)

	var netErr net.Error
	if errors.As(err, &netErr) {
		if netErr.Timeout() {
			entry.Warn("timeout")
		} else {
			entry.Warn("unknown net error")
		}
		return
	}

	entry.Warn("unknown error reading message")
}
// close ends the channel: every registered eventer is unsubscribed, then a
// value is sent on the done channel so that the writer stops.
func (c *channel) close() {
	// Drop every subscription, whether or not the client asked for it
	for name := range c.events {
		c.events[name].Unsubscribe()
	}

	// Wake the writer up so it can return
	var stop struct{}
	c.done <- stop
}