Fix deadlock in channels

We now close the Channels when its connection is closed so that we never
try to send events into a dead channel
Add a debug handler showing who is subscribed to what
This commit is contained in:
Lucas BEE 2019-07-17 12:16:06 +00:00
parent f48292e8e4
commit 07823efd74
6 changed files with 62 additions and 29 deletions

View File

@ -22,11 +22,12 @@ type Channel struct {
// Underlying ws connection // Underlying ws connection
conn *websocket.Conn conn *websocket.Conn
// A channel is directly linked to a user // A channel is directly linked to a user
User *models.User User *models.User `json:"user"`
// List of events the user is listening to // List of events the user is listening to
events map[string]struct{} Events map[string]struct{} `json:"events"`
ID string `json:"id"`
db *sqlx.DB db *sqlx.DB
id string closed bool
} }
// go routine writing events to the websocket connection // go routine writing events to the websocket connection
@ -110,7 +111,7 @@ func (c *Channel) reader() {
switch msg.Type { switch msg.Type {
case "subscribe": case "subscribe":
c.log.Debugf("subscribe to %s", msg.Message) c.log.Debugf("subscribe to %s", msg.Message)
if _, ok := c.events[e.Name]; ok { if _, ok := c.Events[e.Name]; ok {
c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name) c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name)
continue continue
} }
@ -118,10 +119,10 @@ func (c *Channel) reader() {
c.Error(e.Name, err) c.Error(e.Name, err)
continue continue
} }
c.events[e.Name] = struct{}{} c.Events[e.Name] = struct{}{}
case "unsubscribe": case "unsubscribe":
c.log.Debugf("unsubscribe from %s", msg.Message) c.log.Debugf("unsubscribe from %s", msg.Message)
if _, ok := c.events[e.Name]; !ok { if _, ok := c.Events[e.Name]; !ok {
c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name) c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name)
continue continue
} }
@ -133,16 +134,23 @@ func (c *Channel) reader() {
} }
func (c *Channel) close() { func (c *Channel) close() {
// Unsubscribe from all events // The channel is now closed
for eventName := range c.events { c.closed = true
// Unsubscribe from all Events
for eventName := range c.Events {
Eventers[eventName].Unsubscribe(c) Eventers[eventName].Unsubscribe(c)
} }
// Tell the writer to stop // Tell the writer to stop
c.done <- struct{}{} c.done <- struct{}{}
} }
// Error sends an error into the errorStream channel // Error sends an error into the errorStream channel
func (c *Channel) Error(name string, err error) { func (c *Channel) Error(name string, err error) {
if c.closed {
return
}
c.log.WithField("name", name).Warn(err) c.log.WithField("name", name).Warn(err)
c.serverErrorStream <- ServerError{ c.serverErrorStream <- ServerError{
Event: Event{ Event: Event{
@ -158,6 +166,9 @@ func (c *Channel) Error(name string, err error) {
// FatalError sends an error into the errorStream channel // FatalError sends an error into the errorStream channel
func (c *Channel) FatalError(name string, err error) { func (c *Channel) FatalError(name string, err error) {
if c.closed {
return
}
c.log.WithField("name", name).Warn(err) c.log.WithField("name", name).Warn(err)
c.serverErrorStream <- ServerError{ c.serverErrorStream <- ServerError{
Event: Event{ Event: Event{
@ -171,6 +182,14 @@ func (c *Channel) FatalError(name string, err error) {
} }
} }
// SendEvent sends an event to the serverEvent channel
func (c *Channel) sendEvent(event ServerEvent) {
if c.closed {
return
}
c.serverEventStream <- event
}
// Unsubscribe unsubscribes a user from all eventers // Unsubscribe unsubscribes a user from all eventers
func Unsubscribe(u *models.User) { func Unsubscribe(u *models.User) {
chanl := &Channel{User: u} chanl := &Channel{User: u}

View File

@ -55,4 +55,5 @@ type Eventer interface {
NotifyAll(data interface{}) NotifyAll(data interface{})
Launch() error Launch() error
Finish() Finish()
FatalError(error)
} }

View File

@ -70,8 +70,9 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
log: env.Log, log: env.Log,
User: user, User: user,
db: env.Database, db: env.Database,
events: map[string]struct{}{}, Events: map[string]struct{}{},
id: requestID, ID: requestID,
closed: false,
} }
// Launch the go routine responsible for writing events in the websocket // Launch the go routine responsible for writing events in the websocket
@ -87,7 +88,7 @@ func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) e
vars := mux.Vars(r) vars := mux.Vars(r)
id := vars["id"] id := vars["id"]
token := r.URL.Query().Get("token") token := r.URL.Query().Get("token")
env.Log.Infof("Got call on hook with %s and %s", id, token) env.Log.Infof("got call on hook with %s and %s", id, token)
// Get the polochon associated // Get the polochon associated
p, err := models.GetPolochonByID(env.Database, id) p, err := models.GetPolochonByID(env.Database, id)
@ -118,3 +119,18 @@ func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) e
return env.RenderOK(w, "All good") return env.RenderOK(w, "All good")
} }
// HookDebugHandler handles the websockets messages
func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
debug := map[string]map[string][]*Channel{}
for e, event := range Eventers {
debug[e] = map[string][]*Channel{}
for poloName, polo := range event.polochons {
debug[e][poloName] = polo.Subscribers()
}
}
return env.RenderJSON(w, debug)
}

View File

@ -55,7 +55,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
// listening // listening
tn, ok := p.polochons[chanl.User.PolochonID.String] tn, ok := p.polochons[chanl.User.PolochonID.String]
if !ok { if !ok {
p.log.Debugf("Eventer not already created, create it for polochon %s", chanl.User.PolochonID.String) p.log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String)
// Get the user's polochon // Get the user's polochon
polo, err := chanl.User.GetPolochon(chanl.db) polo, err := chanl.User.GetPolochon(chanl.db)
@ -71,6 +71,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
go func() { go func() {
err := tn.Launch() err := tn.Launch()
if err != nil { if err != nil {
tn.FatalError(err)
delete(p.polochons, chanl.User.PolochonID.String) delete(p.polochons, chanl.User.PolochonID.String)
} }
}() }()
@ -81,7 +82,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
// Add the Channel to the Eventer // Add the Channel to the Eventer
tn.Append(chanl) tn.Append(chanl)
p.log.Debugf("Eventer created for polochon %s ...", chanl.User.PolochonID.String) p.log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String)
return nil return nil
} }
@ -96,14 +97,14 @@ func (p *PolochonEventers) Unsubscribe(chanl *Channel) {
tn, ok := p.polochons[chanl.User.PolochonID.String] tn, ok := p.polochons[chanl.User.PolochonID.String]
if !ok { if !ok {
p.log.Warnf("no Eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String) p.log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String)
return return
} }
tn.Unsubscribe(chanl) tn.Unsubscribe(chanl)
if len(tn.Subscribers()) == 0 { if len(tn.Subscribers()) == 0 {
p.log.Debugf("just deleted the last user of this polochon instance, delete it") p.log.Debugf("empty subscribers for this polochon, delete it")
tn.Finish() tn.Finish()
// Delete the polochon from the Eventer when it's finished // Delete the polochon from the Eventer when it's finished
delete(p.polochons, chanl.User.PolochonID.String) delete(p.polochons, chanl.User.PolochonID.String)
@ -124,15 +125,15 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error {
// Ugly ... // Ugly ...
// If we don't have a channel ID, check the user ID // If we don't have a channel ID, check the user ID
// Else check for the channel ID // Else check for the channel ID
if (chanl.id == "" && e.users[i].User.ID != chanl.User.ID) || if (chanl.ID == "" && e.users[i].User.ID != chanl.User.ID) ||
(chanl.id != "" && e.users[i].id != chanl.id) { (chanl.ID != "" && e.users[i].ID != chanl.ID) {
i++ i++
continue continue
} }
e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.id, chanl.User.Name) e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name)
// Delete this event from the list of events the channel is subscribed // Delete this event from the list of events the channel is subscribed
delete(e.users[i].events, e.name) delete(e.users[i].Events, e.name)
// Send the disconnected event // Send the disconnected event
event := ServerEvent{ event := ServerEvent{
@ -142,7 +143,7 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error {
}, },
} }
e.users[i].serverEventStream <- event e.users[i].sendEvent(event)
// Replace the current element with the last one // Replace the current element with the last one
e.users[i] = e.users[len(e.users)-1] e.users[i] = e.users[len(e.users)-1]
@ -160,7 +161,7 @@ func (e *BaseEventer) FatalError(err error) {
// Send the error // Send the error
chanl.FatalError(e.name, err) chanl.FatalError(e.name, err)
// Delete the event from the channel events // Delete the event from the channel events
delete(chanl.events, e.name) delete(chanl.Events, e.name)
} }
} }
@ -188,6 +189,6 @@ func (e *BaseEventer) NotifyAll(data interface{}) {
// Send the events to all the subscribed users // Send the events to all the subscribed users
for _, chanl := range e.users { for _, chanl := range e.users {
e.log.Debugf("sending event to %s", chanl.User.Name) e.log.Debugf("sending event to %s", chanl.User.Name)
chanl.serverEventStream <- event chanl.sendEvent(event)
} }
} }

View File

@ -33,8 +33,7 @@ func NewTorrentEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error
// Create a new papi client // Create a new papi client
client, err := polo.NewPapiClient() client, err := polo.NewPapiClient()
if err != nil { if err != nil {
log.Warnf("failed to create the polochon client %s", err) return nil, fmt.Errorf("failed to instanciate polochon client %s", err)
return nil, fmt.Errorf("failed to instanciate polochon client")
} }
// This is the first time this polochon is requested, create the TorrentEventer // This is the first time this polochon is requested, create the TorrentEventer
@ -64,7 +63,7 @@ func (t *TorrentEventer) Append(chanl *Channel) {
Data: t.torrents, Data: t.torrents,
} }
chanl.serverEventStream <- event chanl.sendEvent(event)
t.BaseEventer.Append(chanl) t.BaseEventer.Append(chanl)
} }
@ -78,8 +77,6 @@ func (t *TorrentEventer) Launch() error {
err := t.torrentsUpdate() err := t.torrentsUpdate()
if err != nil { if err != nil {
t.log.Warnf("got error getting torrent update: %s", err)
t.FatalError(err)
return err return err
} }
@ -88,8 +85,6 @@ func (t *TorrentEventer) Launch() error {
case <-timeTicker.C: case <-timeTicker.C:
err := t.torrentsUpdate() err := t.torrentsUpdate()
if err != nil { if err != nil {
t.log.Warnf("got error getting torrent update: %s", err)
t.FatalError(err)
return err return err
} }
case <-t.done: case <-t.done:

View File

@ -82,6 +82,7 @@ func setupRoutes(env *web.Env) {
// Route to handle websocket events // Route to handle websocket events
env.Handle("/events/polochons/{id}/hook", events.PolochonHookHandler).Methods("POST") env.Handle("/events/polochons/{id}/hook", events.PolochonHookHandler).Methods("POST")
env.Handle("/events/debug", events.HookDebugHandler).WithRole(models.AdminRole).Methods("GET")
// Admin routes // Admin routes
env.Handle("/admins/users", admin.GetUsersHandler).WithRole(models.AdminRole).Methods("GET") env.Handle("/admins/users", admin.GetUsersHandler).WithRole(models.AdminRole).Methods("GET")