package events

import (
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"git.quimbo.fr/odwrtw/canape/backend/auth"
	"git.quimbo.fr/odwrtw/canape/backend/models"
	"git.quimbo.fr/odwrtw/canape/backend/web"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
)

const (
	// Ping every 30 seconds, must be less than pongWait
	pingWait = 30 * time.Second
	// Time allowed to read the next pong message from the client
	pongWait = 35 * time.Second
	// Time allowed to write to the client
	writeWait = 30 * time.Second
)

// Eventers is a global map of all the available Eventers, keyed by event
// name. It is nil until initEventers has run once.
var Eventers map[string]*PolochonEventers

// once guards the one-time initialisation of Eventers.
var once sync.Once

// initEventers lazily builds the global Eventers map. It is safe to call from
// every handler: only the first call does any work.
func initEventers(env *web.Env) {
	once.Do(func() {
		env.Log.Infof("Initialising eventers")
		Eventers = map[string]*PolochonEventers{
			torrentEventName: NewTorrentEventers(env),
			videoEventName:   NewVideoEventers(env),
		}
	})
}

// WsHandler upgrades the HTTP request to a websocket connection, wires a
// Channel around it and then runs the channel's writer (in a goroutine) and
// reader (in the calling goroutine). The connection is closed when the reader
// returns.
//
// It returns the upgrade error if the upgrade fails, nil otherwise.
func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	initEventers(env)

	// Get the user
	user := auth.GetCurrentUser(r, env.Log)

	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}

	// Upgrade the request for websockets
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		env.Log.Warnf("got error upgrading request: %+v", err)
		// Only label the failure as a handshake error when it actually is one.
		if _, ok := err.(websocket.HandshakeError); ok {
			env.Log.Warn("handshake error")
		}
		return err
	}
	defer ws.Close()

	// The pong handler only postpones the read deadline
	ws.SetPongHandler(func(string) error {
		// Best effort: a failure here surfaces on the next read anyway.
		_ = ws.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	c := Channel{
		// Channel where the eventers will write events to
		serverEventStream: make(chan ServerEvent),
		// Channel where the eventers will write errors to
		serverErrorStream: make(chan ServerError),
		done:              make(chan struct{}, 1),
		conn:              ws,
		User:              user,
		db:                env.Database,
		Events:            map[string]struct{}{},
		ID:                auth.GetRequestIDFromRequest(r),
	}

	// Launch the go routine responsible for writing events in the websocket
	go c.writer(env)

	// Launch the reader responsible for reading events from the websocket.
	// This call is synchronous: the handler returns once the reader does.
	c.reader(env)

	return nil
}

// PolochonHookHandler receives a hook call for the polochon identified by the
// "id" route variable. After checking the "token" query parameter against the
// polochon's AuthToken, it decodes the JSON request body and forwards it to
// every subscriber of that polochon's video events.
func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	id := mux.Vars(r)["id"]
	token := r.URL.Query().Get("token")

	// Do not log the token: it is a credential.
	env.Log.Infof("got call on hook with %s", id)

	// Get the polochon associated
	p, err := models.GetPolochonByID(env.Database, id)
	if err != nil {
		return env.RenderError(w, err)
	}

	// Check the auth
	// NOTE(review): plain string comparison; consider a constant-time compare.
	if token != p.AuthToken {
		return env.RenderError(w, fmt.Errorf("Forbidden"))
	}

	// The hook may be the first handler ever called: make sure the global
	// Eventers map exists before reading it, otherwise the lookup below
	// yields a nil eventer and dereferencing it panics.
	initEventers(env)

	e := Eventers[videoEventName]
	notifier, ok := e.polochons[id]
	if !ok {
		env.Log.Infof("nobody is listening")
		return env.RenderOK(w, "nobody is listening")
	}

	var data struct {
		Type string      `json:"type"`
		Data interface{} `json:"data"`
	}
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		return err
	}

	notifier.NotifyAll(data)

	return env.RenderOK(w, "All good")
}

// HookDebugHandler renders, as JSON, the subscribers of every polochon for
// every registered eventer (event name -> polochon name -> subscribers).
func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
	initEventers(env)

	debug := make(map[string]map[string][]*Channel, len(Eventers))
	for name, eventer := range Eventers {
		subscribers := make(map[string][]*Channel, len(eventer.polochons))
		for poloName, polo := range eventer.polochons {
			subscribers[poloName] = polo.Subscribers()
		}
		debug[name] = subscribers
	}

	return env.RenderJSON(w, debug)
}