canape/backend/events/handlers.go
Grégoire Delattre e41c9bfdfa Return the video details embedded in the torrents
This requires the eventers to have the app env
2020-04-11 17:45:33 +02:00

149 lines
3.6 KiB
Go

package events
import (
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"git.quimbo.fr/odwrtw/canape/backend/auth"
	"git.quimbo.fr/odwrtw/canape/backend/models"
	"git.quimbo.fr/odwrtw/canape/backend/web"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
)
// Timing parameters of the websocket connections.
const (
// pingWait is the interval between two pings (30 seconds). It must stay
// below pongWait.
pingWait = 30 * time.Second
// pongWait is the time allowed to read the next pong message from the
// client; the pong handler pushes the read deadline forward by this amount.
pongWait = 35 * time.Second
// writeWait is the time allowed to write to the client.
writeWait = 30 * time.Second
)
// Eventers is a global map of all the available Eventers, keyed by event
// name. It is populated lazily by initEventers.
var Eventers map[string]*PolochonEventers

// eventersSetup reports whether Eventers has already been populated.
var eventersSetup bool

// eventersMu serialises the lazy initialisation done by initEventers.
var eventersMu sync.Mutex

// initEventers populates the global Eventers map the first time it is called
// and is a no-op afterwards.
//
// It is called from HTTP handlers, which run concurrently, so the
// check-then-set on eventersSetup is done under eventersMu: without the lock
// two simultaneous first requests could each build a map and the second one
// would overwrite the first, dropping its eventers.
func initEventers(env *web.Env) {
	eventersMu.Lock()
	defer eventersMu.Unlock()

	if eventersSetup {
		return
	}

	Eventers = map[string]*PolochonEventers{
		torrentEventName: NewTorrentEventers(env),
		videoEventName:   NewVideoEventers(env),
	}
	eventersSetup = true
}
// WsHandler upgrades the HTTP request to a websocket connection and serves it
// until the reader returns.
//
// It makes sure the global Eventers are initialised, fetches the current
// user, builds the Channel bound to this connection, starts the writer in its
// own goroutine and then runs the reader in the calling goroutine. The
// websocket connection is closed when the handler returns.
//
// It returns the upgrade error when the request can not be upgraded, nil
// otherwise.
func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	initEventers(env)

	// Get the user
	user := auth.GetCurrentUser(r, env.Log)

	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}

	// Upgrade the request for websockets
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		env.Log.Warnf("got error upgrading request: %+v", err)
		// Only report a handshake error when the error really is one
		if _, ok := err.(websocket.HandshakeError); ok {
			env.Log.Warn("handshake error")
		}
		return err
	}
	defer ws.Close()

	// The pong handler only postpones the read deadline; the error of
	// SetReadDeadline is deliberately ignored, as in the original code.
	ws.SetPongHandler(func(string) error {
		_ = ws.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	// Channel where the eventers will write events to
	serverEventStream := make(chan ServerEvent)

	// Channel where the eventers will write errors to
	serverErrorStream := make(chan ServerError)

	requestID := auth.GetRequestIDFromRequest(r)

	c := Channel{
		serverEventStream: serverEventStream,
		serverErrorStream: serverErrorStream,
		done:              make(chan struct{}, 1),
		conn:              ws,
		User:              user,
		db:                env.Database,
		Events:            map[string]struct{}{},
		ID:                requestID,
		closed:            false,
	}

	// Launch the go routine responsible for writing events in the websocket
	go c.writer(env)

	// Run the reader responsible for reading events from the websocket; the
	// handler returns once the reader is done.
	c.reader(env)

	return nil
}
// PolochonHookHandler receives a hook call for the polochon identified by the
// "id" route variable and forwards the JSON payload of the request to the
// video event subscribers of that polochon.
//
// The "token" query parameter must match the AuthToken of the polochon,
// otherwise a "Forbidden" error is rendered. When nothing is registered for
// the polochon the request is acknowledged without reading the body.
//
// It returns the result of the render helpers, or the JSON decoding error
// when the body can not be decoded.
func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	// The hook may be the first request served by the process: make sure
	// the global Eventers map exists before reading from it, otherwise the
	// lookup below yields a nil eventer and dereferencing it panics.
	initEventers(env)

	vars := mux.Vars(r)
	id := vars["id"]
	token := r.URL.Query().Get("token")

	// The token is a credential, keep it out of the logs
	env.Log.Infof("got call on hook with %s", id)

	// Get the polochon associated
	p, err := models.GetPolochonByID(env.Database, id)
	if err != nil {
		return env.RenderError(w, err)
	}

	// Check the auth
	if token != p.AuthToken {
		return env.RenderError(w, fmt.Errorf("Forbidden"))
	}

	e := Eventers[videoEventName]
	notifier, ok := e.polochons[id]
	if !ok {
		env.Log.Infof("nobody is listening")
		return env.RenderOK(w, "nobody is listening")
	}

	// Payload sent by the polochon, forwarded as is to the subscribers
	var data struct {
		Type string      `json:"type"`
		Data interface{} `json:"data"`
	}
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		return err
	}

	notifier.NotifyAll(data)

	return env.RenderOK(w, "All good")
}
// HookDebugHandler renders, as JSON, the subscribers of every polochon for
// each entry of the global Eventers map.
//
// The rendered document is a map of event name to a map of polochon name to
// the list of subscribed channels. It returns whatever env.RenderJSON
// returns.
func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	initEventers(env)

	snapshot := make(map[string]map[string][]*Channel, len(Eventers))
	for eventName, eventers := range Eventers {
		// Collect the subscribers of each polochon known to this eventer
		perPolochon := map[string][]*Channel{}
		for name, notifier := range eventers.polochons {
			perPolochon[name] = notifier.Subscribers()
		}
		snapshot[eventName] = perPolochon
	}

	return env.RenderJSON(w, snapshot)
}