From 18127fa3a0ac15c774d8f5a3d397c1792e5e60b4 Mon Sep 17 00:00:00 2001 From: Lucas BEE Date: Thu, 15 Mar 2018 13:13:50 +0100 Subject: [PATCH] Add WebSockets endpoint /events for torrents --- backend/events/handlers.go | 184 +++++++++++++++++++++++++++++++++++ backend/events/notifier.go | 70 +++++++++++++ backend/events/torrents.go | 76 +++++++++++++++ backend/routes.go | 4 + backend/torrents/handlers.go | 13 +-- go.mod | 1 + go.sum | 1 + 7 files changed, 340 insertions(+), 9 deletions(-) create mode 100644 backend/events/handlers.go create mode 100644 backend/events/notifier.go create mode 100644 backend/events/torrents.go diff --git a/backend/events/handlers.go b/backend/events/handlers.go new file mode 100644 index 0000000..b92a052 --- /dev/null +++ b/backend/events/handlers.go @@ -0,0 +1,184 @@ +package events + +import ( + "errors" + "net" + "net/http" + "time" + + "git.quimbo.fr/odwrtw/canape/backend/auth" + "git.quimbo.fr/odwrtw/canape/backend/users" + "git.quimbo.fr/odwrtw/canape/backend/web" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +const ( + // Ping every 30 seconds, must be less than pongWait + pingWait = 30 * time.Second + // Time allowed to read the next pong message from the client + pongWait = 35 * time.Second + // Time allowed to write to the client + writeWait = 30 * time.Second +) + +// WsHandler handles the websockets messages +func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { + // Get the user + v := auth.GetCurrentUser(r, env.Log) + user, ok := v.(*users.User) + if !ok { + return env.RenderError(w, errors.New("invalid user type")) + } + + upgrader := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + // Upgrade the request for websockets + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + env.Log.Warnf("got error upgrading request: %+v", err) + if _, ok := err.(websocket.HandshakeError); !ok { + env.Log.Warn("handshake error") + } + return err + } + defer ws.Close() + + // The pong handler only postpone the read deadline + ws.SetPongHandler(func(string) error { + ws.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + // Channel where the eventers will write to + serverEventStream := make(chan ServerEvent) + + events := map[string]Eventer{ + "torrents": &TorrentNotifier{ + Notifier: NewNotifier(serverEventStream), + user: user, + log: env.Log, + }, + } + + c := channel{ + serverEventStream: serverEventStream, + done: make(chan struct{}, 1), + conn: ws, + log: env.Log, + events: events, + } + + // Launch the go routine responsible for writing events in the websocket + go c.writer() + // Launch the reader responsible for reading events from the websocket + c.reader() + + return nil +} + +type channel struct { + log *logrus.Entry + events map[string]Eventer + // Channel where the eventer will write to + serverEventStream chan ServerEvent + // Done channel when we need to stop the connection + done chan struct{} + // Underlying ws connection + conn *websocket.Conn +} + +func (c *channel) writer() { + // Create the ping timer that will ping the client every pingWait seconds + // to check that he's still listening + pingTicker := time.NewTicker(pingWait) + defer pingTicker.Stop() + + // If we exit, close the connection to tell the reader to stop + defer c.conn.Close() + + // Write loop + for { + select { + case <-pingTicker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + c.log.Warnf("error writing message: %s", err) + return + } + case e := <-c.serverEventStream: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteJSON(e); err != nil { + c.log.Warnf("error writing JSON message: %s", err) + return + } + case <-c.done: + c.log.Debug("all done finished") + return + } + } +} + +func (c *channel) reader() { + // Read loop + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + for { + msg := ClientMessage{} + + // Read the client's message + if err := c.conn.ReadJSON(&msg); err != nil { + switch e := err.(type) { + case *websocket.CloseError: + c.log.Info("close error") + case net.Error: + if e.Timeout() { + c.log.WithField( + "error", err, + ).Warn("timeout") + } else { + c.log.WithField( + "error", err, + ).Warn("unknown net error") + } + default: + c.log.WithField( + "error", err, + ).Warn("unknown error reading message") + } + + // We got an error, we close all the subscribed events and stop the + // writer goroutine + c.close() + return + } + + e, ok := c.events[msg.Message] + if !ok { + c.log.Warnf("no such event to subscribe %q", msg.Message) + continue + } + + switch msg.Type { + case "subscribe": + c.log.Debugf("subscribe to %s", msg.Message) + e.Subscribe(e.Launch) + case "unsubscribe": + c.log.Debugf("unsubscribe from %s", msg.Message) + e.Unsubscribe() + default: + c.log.Warnf("invalid type: %s", msg.Type) + } + } +} + +func (c *channel) close() { + // Unsubscribe from all events + for _, e := range c.events { + e.Unsubscribe() + } + // Tell the writer to stop + c.done <- struct{}{} +} diff --git a/backend/events/notifier.go b/backend/events/notifier.go new file mode 100644 index 0000000..c517489 --- /dev/null +++ b/backend/events/notifier.go @@ -0,0 +1,70 @@ +package events + +import ( + "sync" +) + +// ClientMessage represents a message sent by the client +type ClientMessage struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// ServerEvent represents an event sent to the client +type ServerEvent struct { + Type string `json:"type"` + Data interface{} `json:"message"` +} + +// Eventer define an interface that any eventer must follow +type Eventer interface { + Subscribe(func()) + Unsubscribe() + Launch() +} + +// Notifier represents the base of any Eventer +// Notify implements Subscribe and Unsubscribe methods +type Notifier struct { + sync.RWMutex + subscribed bool + done chan struct{} + eventStream chan ServerEvent +} + +// NewNotifier returns a new notifier +func NewNotifier(eventStream chan ServerEvent) *Notifier { + return &Notifier{ + eventStream: eventStream, + done: make(chan struct{}), + } +} + +// Subscribe subscribes a client on the Notifier +func (t *Notifier) Subscribe(launch func()) { + t.Lock() + defer t.Unlock() + + // Check if already subscribed + if t.subscribed { + return + } + t.subscribed = true + + go launch() +} + +// Unsubscribe unsubscribes a client from receiving any more events from +// the Notifier +func (t *Notifier) Unsubscribe() { + t.Lock() + defer t.Unlock() + + // Check if already unsubscribed + if !t.subscribed { + return + } + t.subscribed = false + + t.done <- struct{}{} +} diff --git a/backend/events/torrents.go b/backend/events/torrents.go new file mode 100644 index 0000000..8872666 --- /dev/null +++ b/backend/events/torrents.go @@ -0,0 +1,76 @@ +package events + +import ( + "reflect" + "time" + + "git.quimbo.fr/odwrtw/canape/backend/users" + "github.com/odwrtw/papi" + "github.com/sirupsen/logrus" +) + +// TorrentNotifier is a struct implementing the Notifier interface +type TorrentNotifier struct { + *Notifier + user *users.User + client *papi.Client + torrents []papi.Torrent + log *logrus.Entry +} + +// Launch implements the Notifier interface +func (t *TorrentNotifier) Launch() { + // Create the timer that will check for the torrents every X seconds + timeTicker := time.NewTicker(10 * time.Second) + defer timeTicker.Stop() + + // Create a new papi client + client, err := t.user.NewPapiClient() + if err != nil { + return + } + t.client = client + + err = t.torrentsUpdate() + if err != nil { + t.log.Warnf("got error getting torrent update: %s", err) + return + } + + for { + select { + case <-timeTicker.C: + err := t.torrentsUpdate() + if err != nil { + t.log.Warnf("got error getting torrent update: %s", err) + return + } + case <-t.done: + t.log.Info("quit torrent notifier") + return + } + } +} + +func (t *TorrentNotifier) torrentsUpdate() error { + // Get torrents + torrents, err := t.client.GetTorrents() + if err != nil { + return err + } + + if reflect.DeepEqual(t.torrents, torrents) { + return nil + } + + // If they're different, send the event + event := ServerEvent{ + Type: "torrents", + Data: torrents, + } + t.eventStream <- event + + t.torrents = torrents + + return nil +} diff --git a/backend/routes.go b/backend/routes.go index 8ca124a..1560739 100644 --- a/backend/routes.go +++ b/backend/routes.go @@ -2,6 +2,7 @@ package main import ( admin "git.quimbo.fr/odwrtw/canape/backend/admins" + "git.quimbo.fr/odwrtw/canape/backend/events" extmedias "git.quimbo.fr/odwrtw/canape/backend/external_medias" "git.quimbo.fr/odwrtw/canape/backend/movies" "git.quimbo.fr/odwrtw/canape/backend/ratings" @@ -66,6 +67,9 @@ func setupRoutes(env *web.Env) { // Route to refresh all movies and shows env.Handle("/refresh", extmedias.RefreshHandler).WithRole(users.AdminRole).Methods("POST") + // Route to handle websocket events + env.Handle("/events", events.WsHandler).WithRole(users.UserRole).Methods("GET") + // Admin routes env.Handle("/admins/users", admin.GetUsersHandler).WithRole(users.AdminRole).Methods("GET") env.Handle("/admins/users", admin.UpdateUserHandler).WithRole(users.AdminRole).Methods("POST") diff --git a/backend/torrents/handlers.go b/backend/torrents/handlers.go index 66d413e..d2bc262 100644 --- a/backend/torrents/handlers.go +++ b/backend/torrents/handlers.go @@ -47,7 +47,7 @@ func DownloadHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error return env.RenderOK(w, "Torrent added") } -// ListHandler downloads a torrent via polochon +// ListHandler lists torrents of a polochon func ListHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { v := auth.GetCurrentUser(r, env.Log) user, ok := v.(*users.User) @@ -133,14 +133,9 @@ func SearchHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { } // Sort by seeds - sort.Sort(BySeed(results)) + sort.Slice(results, func(i, j int) bool { + return results[i].Seeders > results[j].Seeders + }) return env.RenderJSON(w, results) } - -// BySeed is an helper to sort torrents by seeders -type BySeed []*polochon.Torrent - -func (t BySeed) Len() int { return len(t) } -func (t BySeed) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t BySeed) Less(i, j int) bool { return t[i].Seeders > t[j].Seeders } diff --git a/go.mod b/go.mod index cde3699..6c6f943 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/gorilla/mux v1.7.1 + github.com/gorilla/websocket v1.4.0 github.com/gregdel/srt2vtt v0.0.0-20170314031115-46562d19ab2d github.com/jmoiron/sqlx v1.2.0 github.com/lib/pq v1.1.1 diff --git a/go.sum b/go.sum index f26730e..5e3849b 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,7 @@ github.com/gorilla/mux v1.7.1 h1:Dw4jY2nghMMRsh1ol8dv1axHkDwMQK2DHerMNJsIpJU= github.com/gorilla/mux v1.7.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/rpc v1.1.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gregdel/argo v0.0.0-20190104143955-4ac365771987/go.mod h1:dS8cBtIK+0jAZy09xmjPn0W1EMmNKiYQMuZ0BXseLio= github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636 h1:6agUllU8gUNAallyB+afeLXMRLL6Q1z+S6YC7Pi1EIY=