Add WebSockets endpoint /events for torrents
This commit is contained in:
parent
42a0a8eb73
commit
18127fa3a0
184
backend/events/handlers.go
Normal file
184
backend/events/handlers.go
Normal file
@ -0,0 +1,184 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.quimbo.fr/odwrtw/canape/backend/auth"
|
||||
"git.quimbo.fr/odwrtw/canape/backend/users"
|
||||
"git.quimbo.fr/odwrtw/canape/backend/web"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
// Ping every 30 seconds, must be less than pongWait
|
||||
pingWait = 30 * time.Second
|
||||
// Time allowed to read the next pong message from the client
|
||||
pongWait = 35 * time.Second
|
||||
// Time allowed to write to the client
|
||||
writeWait = 30 * time.Second
|
||||
)
|
||||
|
||||
// WsHandler handles the websockets messages
|
||||
func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
||||
// Get the user
|
||||
v := auth.GetCurrentUser(r, env.Log)
|
||||
user, ok := v.(*users.User)
|
||||
if !ok {
|
||||
return env.RenderError(w, errors.New("invalid user type"))
|
||||
}
|
||||
|
||||
upgrader := websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
// Upgrade the request for websockets
|
||||
ws, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
env.Log.Warnf("got error upgrading request: %+v", err)
|
||||
if _, ok := err.(websocket.HandshakeError); !ok {
|
||||
env.Log.Warn("handshake error")
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer ws.Close()
|
||||
|
||||
// The pong handler only postpone the read deadline
|
||||
ws.SetPongHandler(func(string) error {
|
||||
ws.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
// Channel where the eventers will write to
|
||||
serverEventStream := make(chan ServerEvent)
|
||||
|
||||
events := map[string]Eventer{
|
||||
"torrents": &TorrentNotifier{
|
||||
Notifier: NewNotifier(serverEventStream),
|
||||
user: user,
|
||||
log: env.Log,
|
||||
},
|
||||
}
|
||||
|
||||
c := channel{
|
||||
serverEventStream: serverEventStream,
|
||||
done: make(chan struct{}, 1),
|
||||
conn: ws,
|
||||
log: env.Log,
|
||||
events: events,
|
||||
}
|
||||
|
||||
// Launch the go routine responsible for writing events in the websocket
|
||||
go c.writer()
|
||||
// Launch the reader responsible for reading events from the websocket
|
||||
c.reader()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type channel struct {
|
||||
log *logrus.Entry
|
||||
events map[string]Eventer
|
||||
// Channel where the eventer will write to
|
||||
serverEventStream chan ServerEvent
|
||||
// Done channel when we need to stop the connection
|
||||
done chan struct{}
|
||||
// Underlying ws connection
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
func (c *channel) writer() {
|
||||
// Create the ping timer that will ping the client every pingWait seconds
|
||||
// to check that he's still listening
|
||||
pingTicker := time.NewTicker(pingWait)
|
||||
defer pingTicker.Stop()
|
||||
|
||||
// If we exit, close the connection to tell the reader to stop
|
||||
defer c.conn.Close()
|
||||
|
||||
// Write loop
|
||||
for {
|
||||
select {
|
||||
case <-pingTicker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
c.log.Warnf("error writing message: %s", err)
|
||||
return
|
||||
}
|
||||
case e := <-c.serverEventStream:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if err := c.conn.WriteJSON(e); err != nil {
|
||||
c.log.Warnf("error writing JSON message: %s", err)
|
||||
return
|
||||
}
|
||||
case <-c.done:
|
||||
c.log.Debug("all done finished")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *channel) reader() {
|
||||
// Read loop
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
for {
|
||||
msg := ClientMessage{}
|
||||
|
||||
// Read the client's message
|
||||
if err := c.conn.ReadJSON(&msg); err != nil {
|
||||
switch e := err.(type) {
|
||||
case *websocket.CloseError:
|
||||
c.log.Info("close error")
|
||||
case net.Error:
|
||||
if e.Timeout() {
|
||||
c.log.WithField(
|
||||
"error", err,
|
||||
).Warn("timeout")
|
||||
} else {
|
||||
c.log.WithField(
|
||||
"error", err,
|
||||
).Warn("unknown net error")
|
||||
}
|
||||
default:
|
||||
c.log.WithField(
|
||||
"error", err,
|
||||
).Warn("unknown error reading message")
|
||||
}
|
||||
|
||||
// We got an error, we close all the subscribed events and stop the
|
||||
// writer goroutine
|
||||
c.close()
|
||||
return
|
||||
}
|
||||
|
||||
e, ok := c.events[msg.Message]
|
||||
if !ok {
|
||||
c.log.Warnf("no such event to subscribe %q", msg.Message)
|
||||
continue
|
||||
}
|
||||
|
||||
switch msg.Type {
|
||||
case "subscribe":
|
||||
c.log.Debugf("subscribe to %s", msg.Message)
|
||||
e.Subscribe(e.Launch)
|
||||
case "unsubscribe":
|
||||
c.log.Debugf("unsubscribe from %s", msg.Message)
|
||||
e.Unsubscribe()
|
||||
default:
|
||||
c.log.Warnf("invalid type: %s", msg.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *channel) close() {
|
||||
// Unsubscribe from all events
|
||||
for _, e := range c.events {
|
||||
e.Unsubscribe()
|
||||
}
|
||||
// Tell the writer to stop
|
||||
c.done <- struct{}{}
|
||||
}
|
70
backend/events/notifier.go
Normal file
70
backend/events/notifier.go
Normal file
@ -0,0 +1,70 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// ClientMessage represents a message sent by the client
|
||||
type ClientMessage struct {
|
||||
Type string `json:"type"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// ServerEvent represents an event sent to the client
|
||||
type ServerEvent struct {
|
||||
Type string `json:"type"`
|
||||
Data interface{} `json:"message"`
|
||||
}
|
||||
|
||||
// Eventer define an interface that any eventer must follow
|
||||
type Eventer interface {
|
||||
Subscribe(func())
|
||||
Unsubscribe()
|
||||
Launch()
|
||||
}
|
||||
|
||||
// Notifier represents the base of any Eventer
|
||||
// Notify implements Subscribe and Unsubscribe methods
|
||||
type Notifier struct {
|
||||
sync.RWMutex
|
||||
subscribed bool
|
||||
done chan struct{}
|
||||
eventStream chan ServerEvent
|
||||
}
|
||||
|
||||
// NewNotifier returns a new notifier
|
||||
func NewNotifier(eventStream chan ServerEvent) *Notifier {
|
||||
return &Notifier{
|
||||
eventStream: eventStream,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe subscribes a client on the Notifier
|
||||
func (t *Notifier) Subscribe(launch func()) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// Check if already subscribed
|
||||
if t.subscribed {
|
||||
return
|
||||
}
|
||||
t.subscribed = true
|
||||
|
||||
go launch()
|
||||
}
|
||||
|
||||
// Unsubscribe unsubscribes a client from receiving any more events from
|
||||
// the Notifier
|
||||
func (t *Notifier) Unsubscribe() {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
// Check if already unsubscribed
|
||||
if !t.subscribed {
|
||||
return
|
||||
}
|
||||
t.subscribed = false
|
||||
|
||||
t.done <- struct{}{}
|
||||
}
|
76
backend/events/torrents.go
Normal file
76
backend/events/torrents.go
Normal file
@ -0,0 +1,76 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"git.quimbo.fr/odwrtw/canape/backend/users"
|
||||
"github.com/odwrtw/papi"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// TorrentNotifier is a struct implementing the Notifier interface
|
||||
type TorrentNotifier struct {
|
||||
*Notifier
|
||||
user *users.User
|
||||
client *papi.Client
|
||||
torrents []papi.Torrent
|
||||
log *logrus.Entry
|
||||
}
|
||||
|
||||
// Launch implements the Notifier interface
|
||||
func (t *TorrentNotifier) Launch() {
|
||||
// Create the timer that will check for the torrents every X seconds
|
||||
timeTicker := time.NewTicker(10 * time.Second)
|
||||
defer timeTicker.Stop()
|
||||
|
||||
// Create a new papi client
|
||||
client, err := t.user.NewPapiClient()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
t.client = client
|
||||
|
||||
err = t.torrentsUpdate()
|
||||
if err != nil {
|
||||
t.log.Warnf("got error getting torrent update: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timeTicker.C:
|
||||
err := t.torrentsUpdate()
|
||||
if err != nil {
|
||||
t.log.Warnf("got error getting torrent update: %s", err)
|
||||
return
|
||||
}
|
||||
case <-t.done:
|
||||
t.log.Info("quit torrent notifier")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TorrentNotifier) torrentsUpdate() error {
|
||||
// Get torrents
|
||||
torrents, err := t.client.GetTorrents()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(t.torrents, torrents) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If they're different, send the event
|
||||
event := ServerEvent{
|
||||
Type: "torrents",
|
||||
Data: torrents,
|
||||
}
|
||||
t.eventStream <- event
|
||||
|
||||
t.torrents = torrents
|
||||
|
||||
return nil
|
||||
}
|
@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
admin "git.quimbo.fr/odwrtw/canape/backend/admins"
|
||||
"git.quimbo.fr/odwrtw/canape/backend/events"
|
||||
extmedias "git.quimbo.fr/odwrtw/canape/backend/external_medias"
|
||||
"git.quimbo.fr/odwrtw/canape/backend/movies"
|
||||
"git.quimbo.fr/odwrtw/canape/backend/ratings"
|
||||
@ -66,6 +67,9 @@ func setupRoutes(env *web.Env) {
|
||||
// Route to refresh all movies and shows
|
||||
env.Handle("/refresh", extmedias.RefreshHandler).WithRole(users.AdminRole).Methods("POST")
|
||||
|
||||
// Route to handle websocket events
|
||||
env.Handle("/events", events.WsHandler).WithRole(users.UserRole).Methods("GET")
|
||||
|
||||
// Admin routes
|
||||
env.Handle("/admins/users", admin.GetUsersHandler).WithRole(users.AdminRole).Methods("GET")
|
||||
env.Handle("/admins/users", admin.UpdateUserHandler).WithRole(users.AdminRole).Methods("POST")
|
||||
|
@ -47,7 +47,7 @@ func DownloadHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error
|
||||
return env.RenderOK(w, "Torrent added")
|
||||
}
|
||||
|
||||
// ListHandler downloads a torrent via polochon
|
||||
// ListHandler lists torrents of a polochon
|
||||
func ListHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
||||
v := auth.GetCurrentUser(r, env.Log)
|
||||
user, ok := v.(*users.User)
|
||||
@ -133,14 +133,9 @@ func SearchHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
||||
}
|
||||
|
||||
// Sort by seeds
|
||||
sort.Sort(BySeed(results))
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
return results[i].Seeders > results[j].Seeders
|
||||
})
|
||||
|
||||
return env.RenderJSON(w, results)
|
||||
}
|
||||
|
||||
// BySeed is an helper to sort torrents by seeders
|
||||
type BySeed []*polochon.Torrent
|
||||
|
||||
func (t BySeed) Len() int { return len(t) }
|
||||
func (t BySeed) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
||||
func (t BySeed) Less(i, j int) bool { return t[i].Seeders > t[j].Seeders }
|
||||
|
1
go.mod
1
go.mod
@ -6,6 +6,7 @@ require (
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/go-sql-driver/mysql v1.4.1 // indirect
|
||||
github.com/gorilla/mux v1.7.1
|
||||
github.com/gorilla/websocket v1.4.0
|
||||
github.com/gregdel/srt2vtt v0.0.0-20170314031115-46562d19ab2d
|
||||
github.com/jmoiron/sqlx v1.2.0
|
||||
github.com/lib/pq v1.1.1
|
||||
|
1
go.sum
1
go.sum
@ -22,6 +22,7 @@ github.com/gorilla/mux v1.7.1 h1:Dw4jY2nghMMRsh1ol8dv1axHkDwMQK2DHerMNJsIpJU=
|
||||
github.com/gorilla/mux v1.7.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/rpc v1.1.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
|
||||
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
|
||||
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
|
||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gregdel/argo v0.0.0-20190104143955-4ac365771987/go.mod h1:dS8cBtIK+0jAZy09xmjPn0W1EMmNKiYQMuZ0BXseLio=
|
||||
github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636 h1:6agUllU8gUNAallyB+afeLXMRLL6Q1z+S6YC7Pi1EIY=
|
||||
|
Loading…
x
Reference in New Issue
Block a user