diff --git a/backend/events/channel.go b/backend/events/channel.go index f3d3830..59563b4 100644 --- a/backend/events/channel.go +++ b/backend/events/channel.go @@ -22,11 +22,12 @@ type Channel struct { // Underlying ws connection conn *websocket.Conn // A channel is directly linked to a user - User *models.User + User *models.User `json:"user"` // List of events the user is listening to - events map[string]struct{} + Events map[string]struct{} `json:"events"` + ID string `json:"id"` db *sqlx.DB - id string + closed bool } // go routine writing events to the websocket connection @@ -110,7 +111,7 @@ func (c *Channel) reader() { switch msg.Type { case "subscribe": c.log.Debugf("subscribe to %s", msg.Message) - if _, ok := c.events[e.Name]; ok { + if _, ok := c.Events[e.Name]; ok { c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name) continue } @@ -118,10 +119,10 @@ func (c *Channel) reader() { c.Error(e.Name, err) continue } - c.events[e.Name] = struct{}{} + c.Events[e.Name] = struct{}{} case "unsubscribe": c.log.Debugf("unsubscribe from %s", msg.Message) - if _, ok := c.events[e.Name]; !ok { + if _, ok := c.Events[e.Name]; !ok { c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name) continue } @@ -133,16 +134,23 @@ func (c *Channel) reader() { } func (c *Channel) close() { - // Unsubscribe from all events - for eventName := range c.events { + // The channel is now closed + c.closed = true + + // Unsubscribe from all Events + for eventName := range c.Events { Eventers[eventName].Unsubscribe(c) } + // Tell the writer to stop c.done <- struct{}{} } // Error sends an error into the errorStream channel func (c *Channel) Error(name string, err error) { + if c.closed { + return + } c.log.WithField("name", name).Warn(err) c.serverErrorStream <- ServerError{ Event: Event{ @@ -158,6 +166,9 @@ func (c *Channel) Error(name string, err error) { // FatalError sends an error into the errorStream channel func (c *Channel) FatalError(name string, err error) { + if c.closed { + return + } c.log.WithField("name", name).Warn(err) c.serverErrorStream <- ServerError{ Event: Event{ @@ -171,6 +182,14 @@ func (c *Channel) FatalError(name string, err error) { } } +// SendEvent sends an event to the serverEvent channel +func (c *Channel) sendEvent(event ServerEvent) { + if c.closed { + return + } + c.serverEventStream <- event +} + // Unsubscribe unsubscribes a user from all eventers func Unsubscribe(u *models.User) { chanl := &Channel{User: u} diff --git a/backend/events/events.go b/backend/events/events.go index 648443e..a59af0b 100644 --- a/backend/events/events.go +++ b/backend/events/events.go @@ -55,4 +55,5 @@ type Eventer interface { NotifyAll(data interface{}) Launch() error Finish() + FatalError(error) } diff --git a/backend/events/handlers.go b/backend/events/handlers.go index a495787..669f86f 100644 --- a/backend/events/handlers.go +++ b/backend/events/handlers.go @@ -70,8 +70,9 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { log: env.Log, User: user, db: env.Database, - events: map[string]struct{}{}, - id: requestID, + Events: map[string]struct{}{}, + ID: requestID, + closed: false, } // Launch the go routine responsible for writing events in the websocket @@ -87,7 +88,7 @@ func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) e vars := mux.Vars(r) id := vars["id"] token := r.URL.Query().Get("token") - env.Log.Infof("Got call on hook with %s and %s", id, token) + env.Log.Infof("got call on hook with %s and %s", id, token) // Get the polochon associated p, err := models.GetPolochonByID(env.Database, id) @@ -118,3 +119,18 @@ func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) e return env.RenderOK(w, "All good") } + +// HookDebugHandler handles the websockets messages +func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { + debug := map[string]map[string][]*Channel{} + + for e, event := range Eventers { + debug[e] = map[string][]*Channel{} + for poloName, polo := range event.polochons { + debug[e][poloName] = polo.Subscribers() + } + + } + + return env.RenderJSON(w, debug) +} diff --git a/backend/events/polochons.go b/backend/events/polochons.go index b5c3525..e91410e 100644 --- a/backend/events/polochons.go +++ b/backend/events/polochons.go @@ -55,7 +55,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { // listening tn, ok := p.polochons[chanl.User.PolochonID.String] if !ok { - p.log.Debugf("Eventer not already created, create it for polochon %s", chanl.User.PolochonID.String) + p.log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String) // Get the user's polochon polo, err := chanl.User.GetPolochon(chanl.db) @@ -71,6 +71,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { go func() { err := tn.Launch() if err != nil { + tn.FatalError(err) delete(p.polochons, chanl.User.PolochonID.String) } }() @@ -81,7 +82,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { // Add the Channel to the Eventer tn.Append(chanl) - p.log.Debugf("Eventer created for polochon %s ...", chanl.User.PolochonID.String) + p.log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String) return nil } @@ -96,14 +97,14 @@ func (p *PolochonEventers) Unsubscribe(chanl *Channel) { tn, ok := p.polochons[chanl.User.PolochonID.String] if !ok { - p.log.Warnf("no Eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String) + p.log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String) return } tn.Unsubscribe(chanl) if len(tn.Subscribers()) == 0 { - p.log.Debugf("just deleted the last user of this polochon instance, delete it") + p.log.Debugf("empty subscribers for this polochon, delete it") tn.Finish() // Delete the polochon from the Eventer when it's finished delete(p.polochons, chanl.User.PolochonID.String) @@ -124,15 +125,15 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error { // Ugly ... // If we don't have a channel ID, check the user ID // Else check for the channel ID - if (chanl.id == "" && e.users[i].User.ID != chanl.User.ID) || - (chanl.id != "" && e.users[i].id != chanl.id) { + if (chanl.ID == "" && e.users[i].User.ID != chanl.User.ID) || + (chanl.ID != "" && e.users[i].ID != chanl.ID) { i++ continue } - e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.id, chanl.User.Name) + e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name) // Delete this event from the list of events the channel is subscribed - delete(e.users[i].events, e.name) + delete(e.users[i].Events, e.name) // Send the disconnected event event := ServerEvent{ @@ -142,7 +143,7 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error { }, } - e.users[i].serverEventStream <- event + e.users[i].sendEvent(event) // Replace the current element with the last one e.users[i] = e.users[len(e.users)-1] @@ -160,7 +161,7 @@ func (e *BaseEventer) FatalError(err error) { // Send the error chanl.FatalError(e.name, err) // Delete the event from the channel events - delete(chanl.events, e.name) + delete(chanl.Events, e.name) } } @@ -188,6 +189,6 @@ func (e *BaseEventer) NotifyAll(data interface{}) { // Send the events to all the subscribed users for _, chanl := range e.users { e.log.Debugf("sending event to %s", chanl.User.Name) - chanl.serverEventStream <- event + chanl.sendEvent(event) } } diff --git a/backend/events/torrent_eventer.go b/backend/events/torrent_eventer.go index d6317b9..39ecec0 100644 --- a/backend/events/torrent_eventer.go +++ b/backend/events/torrent_eventer.go @@ -33,8 +33,7 @@ func NewTorrentEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error // Create a new papi client client, err := polo.NewPapiClient() if err != nil { - log.Warnf("failed to create the polochon client %s", err) - return nil, fmt.Errorf("failed to instanciate polochon client") + return nil, fmt.Errorf("failed to instanciate polochon client %s", err) } // This is the first time this polochon is requested, create the TorrentEventer @@ -64,7 +63,7 @@ func (t *TorrentEventer) Append(chanl *Channel) { Data: t.torrents, } - chanl.serverEventStream <- event + chanl.sendEvent(event) t.BaseEventer.Append(chanl) } @@ -78,8 +77,6 @@ func (t *TorrentEventer) Launch() error { err := t.torrentsUpdate() if err != nil { - t.log.Warnf("got error getting torrent update: %s", err) - t.FatalError(err) return err } @@ -88,8 +85,6 @@ func (t *TorrentEventer) Launch() error { case <-timeTicker.C: err := t.torrentsUpdate() if err != nil { - t.log.Warnf("got error getting torrent update: %s", err) - t.FatalError(err) return err } case <-t.done: diff --git a/backend/routes.go b/backend/routes.go index 10fe5e9..4f802d6 100644 --- a/backend/routes.go +++ b/backend/routes.go @@ -82,6 +82,7 @@ func setupRoutes(env *web.Env) { // Route to handle websocket events env.Handle("/events/polochons/{id}/hook", events.PolochonHookHandler).Methods("POST") + env.Handle("/events/debug", events.HookDebugHandler).WithRole(models.AdminRole).Methods("GET") // Admin routes env.Handle("/admins/users", admin.GetUsersHandler).WithRole(models.AdminRole).Methods("GET")