package events

import (
	"fmt"
	"sync"

	"git.quimbo.fr/odwrtw/canape/backend/models"
	"github.com/sirupsen/logrus"
)

// BaseEventer represents the basis of an Eventer.
// It keeps the list of subscribed channels, a logger and the event name that
// is used as the Type of every event it emits.
type BaseEventer struct {
	users []*Channel
	log   *logrus.Entry
	name  string
}

// PolochonEventers represents the basis of Eventers.
// It is mainly composed of a map of polochons (keyed by polochon ID), and a
// method to create a new Eventer per polochon.
// The embedded RWMutex guards the polochons map.
type PolochonEventers struct {
	sync.RWMutex
	Name       string
	log        *logrus.Entry
	polochons  map[string]Eventer
	NewEventer func(polo *models.Polochon, log *logrus.Entry) (Eventer, error)
}

// NewEventers returns a new PolochonEventers with an empty polochon map and
// a debug-level logger. Name and NewEventer are left for the caller to set.
func NewEventers() *PolochonEventers {
	// Setup the logger
	logger := logrus.New()
	logger.Formatter = &logrus.TextFormatter{FullTimestamp: true}
	logger.Level = logrus.DebugLevel

	return &PolochonEventers{
		polochons: map[string]Eventer{},
		log:       logrus.NewEntry(logger),
	}
}

// Subscribe subscribes a channel to a PolochonEventers.
// It fetches or creates the Eventer for the user's polochon, calls Launch()
// in a goroutine when the Eventer is new, and then appends the channel to the
// list of subscribed users.
//
// It returns an error if the user's polochon is not activated, if the
// polochon can not be retrieved, or if the Eventer can not be created.
func (p *PolochonEventers) Subscribe(chanl *Channel) error {
	p.Lock()
	defer p.Unlock()

	p.log.Debugf("subscribing with the user %s", chanl.User.Name)
	if !chanl.User.PolochonActivated {
		return fmt.Errorf("polochon not activated")
	}

	// Capture the ID once so the map key used on insert and on the
	// asynchronous cleanup below is guaranteed to be the same.
	id := chanl.User.PolochonID.String

	// If we have the polochon, just append the user to the list of users
	// listening
	tn, ok := p.polochons[id]
	if !ok {
		p.log.Debugf("Eventer not already created, create it for polochon %s", id)

		// Get the user's polochon
		polo, err := chanl.User.GetPolochon(chanl.db)
		if err != nil {
			return fmt.Errorf("failed to retrieve polochon: %w", err)
		}

		// Create a new Eventer for this polochon
		tn, err = p.NewEventer(polo, p.log)
		if err != nil {
			return err
		}

		go func() {
			if err := tn.Launch(); err != nil {
				p.log.Warnf("eventer for polochon %s stopped: %s", id, err)

				// The map is shared with Subscribe/Unsubscribe, so it
				// must only be touched while holding the lock.
				// NOTE(review): this removes whatever Eventer is
				// registered under id at that time — confirm a newer
				// Eventer can not have been registered meanwhile.
				p.Lock()
				delete(p.polochons, id)
				p.Unlock()
			}
		}()

		p.polochons[id] = tn
	}

	// Add the Channel to the Eventer
	tn.Append(chanl)
	p.log.Debugf("Eventer created for polochon %s ...", id)

	return nil
}

// Unsubscribe unsubscribes a client from receiving any more events from
// the Eventer of its polochon.
// If the list of users becomes empty, the Eventer is finished and deleted.
func (p *PolochonEventers) Unsubscribe(chanl *Channel) {
	p.Lock()
	defer p.Unlock()

	p.log.Debugf("unsubscribing from %s with %s", p.Name, chanl.User.Name)

	tn, ok := p.polochons[chanl.User.PolochonID.String]
	if !ok {
		p.log.Warnf("no Eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String)
		return
	}

	tn.Unsubscribe(chanl)

	if len(tn.Subscribers()) == 0 {
		p.log.Debugf("just deleted the last user of this polochon instance, delete it")
		tn.Finish()
		// Delete the polochon from the Eventer when it's finished
		delete(p.polochons, chanl.User.PolochonID.String)
	}
}

// Unsubscribe unsubscribes a client from receiving any more events from
// the Eventer.
//
// If chanl.id is empty, every channel belonging to chanl.User is removed;
// otherwise only the channel with the same id is removed. Each removed
// channel gets a Disconnected event on its serverEventStream.
// It always returns nil.
func (e *BaseEventer) Unsubscribe(chanl *Channel) error {
	// Need to delete the given channel from the channel list
	// TODO: Find a better way to handle this
	// It is needed when the user changes his polochon, or gets deactivated
	// If the ID of the channel is empty, delete all the channels of
	// this user
	l := len(e.users)
	i := 0
	for i < l {
		// If we don't have a channel ID, match on the user ID
		// Else match on the channel ID
		if (chanl.id == "" && e.users[i].User.ID != chanl.User.ID) ||
			(chanl.id != "" && e.users[i].id != chanl.id) {
			i++
			continue
		}

		e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.id, chanl.User.Name)

		// Delete this event from the list of events the channel is subscribed
		delete(e.users[i].events, e.name)

		// Send the disconnected event
		event := ServerEvent{
			Event: Event{
				Type:   e.name,
				Status: Disconnected,
			},
		}
		e.users[i].serverEventStream <- event

		// Replace the current element with the last one and shrink the
		// slice; i is not advanced so the swapped-in element is checked
		// on the next iteration.
		e.users[i] = e.users[len(e.users)-1]
		e.users = e.users[:len(e.users)-1]
		l--
	}

	return nil
}

// FatalError sends an error to all the subscribed channels and tells the
// clients that the error is fatal, the notifier won't communicate until a new
// subscribe.
func (e *BaseEventer) FatalError(err error) {
	for _, chanl := range e.users {
		// Send the error
		chanl.FatalError(e.name, err)
		// Delete the event from the channel events
		delete(chanl.events, e.name)
	}
}

// Append just appends a channel to the list of users.
func (e *BaseEventer) Append(chanl *Channel) {
	e.users = append(e.users, chanl)
}

// Subscribers just returns the list of subscribers.
// The returned slice is the internal one, not a copy.
func (e *BaseEventer) Subscribers() []*Channel {
	return e.users
}

// NotifyAll notifies all the listeners by sending them an OK event carrying
// the given data.
func (e *BaseEventer) NotifyAll(data interface{}) {
	// Prepare the event once, it is the same for every subscriber
	event := ServerEvent{
		Event: Event{
			Type:   e.name,
			Status: OK,
		},
		Data: data,
	}

	// Send the events to all the subscribed users
	for _, chanl := range e.users {
		e.log.Debugf("sending event to %s", chanl.User.Name)
		chanl.serverEventStream <- event
	}
}