193 lines
5.1 KiB
Go
193 lines
5.1 KiB
Go
package events
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
|
)
|
|
|
|
// BaseEventer represents the basis of a Eventer
|
|
type BaseEventer struct {
|
|
env *web.Env
|
|
users []*Channel
|
|
name string
|
|
}
|
|
|
|
// PolochonEventers represents the basis of Eventers
|
|
// It is mainly composed of a map of polochons, and a Method to create new
|
|
// Eventer per polochon
|
|
type PolochonEventers struct {
|
|
sync.RWMutex
|
|
env *web.Env
|
|
Name string
|
|
polochons map[string]Eventer
|
|
NewEventer func(*web.Env, *models.Polochon) (Eventer, error)
|
|
}
|
|
|
|
// NewEventers returns a new PolochonEventers
|
|
func NewEventers(env *web.Env) *PolochonEventers {
|
|
return &PolochonEventers{
|
|
env: env,
|
|
polochons: map[string]Eventer{},
|
|
}
|
|
}
|
|
|
|
// Subscribe subscribes a channel to a PolochonEventers
|
|
// It fetches or creates a new Eventer for the needed polochon, call Launch()
|
|
// in a go routine and then Append the user to the list of subscribed users
|
|
func (p *PolochonEventers) Subscribe(chanl *Channel) error {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
|
|
p.env.Log.Debugf("subscribing with the user %s", chanl.User.Name)
|
|
if !chanl.User.PolochonActivated {
|
|
return fmt.Errorf("polochon not activated")
|
|
}
|
|
|
|
// If we have the polochon, just append the user to the list of users
|
|
// listening
|
|
tn, ok := p.polochons[chanl.User.PolochonID.String]
|
|
if !ok {
|
|
p.env.Log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String)
|
|
|
|
// Get the user's polochon
|
|
polo, err := chanl.User.GetPolochon(chanl.db)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to retrieve polochon")
|
|
}
|
|
|
|
// Create a new Eventer for this polochon
|
|
tn, err = p.NewEventer(p.env, polo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
err := tn.Launch()
|
|
if err != nil {
|
|
tn.FatalError(err)
|
|
delete(p.polochons, chanl.User.PolochonID.String)
|
|
}
|
|
}()
|
|
|
|
p.polochons[chanl.User.PolochonID.String] = tn
|
|
}
|
|
|
|
// Add the Channel to the Eventer
|
|
tn.Append(chanl)
|
|
|
|
p.env.Log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String)
|
|
return nil
|
|
}
|
|
|
|
// Unsubscribe unsubscribes a client from receiving any more events from
|
|
// the Eventer
|
|
// If the list of users is empty, delete the Eventer
|
|
func (p *PolochonEventers) Unsubscribe(chanl *Channel) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
|
|
p.env.Log.Debugf("unsubscribing from %s with %s", p.Name, chanl.User.Name)
|
|
|
|
tn, ok := p.polochons[chanl.User.PolochonID.String]
|
|
if !ok {
|
|
p.env.Log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String)
|
|
return
|
|
}
|
|
|
|
if err := tn.Unsubscribe(chanl); err != nil {
|
|
p.env.Log.Errorf("failed to unsubscribe eventer: %s", err.Error())
|
|
// TODO: check if we need to return here
|
|
}
|
|
|
|
if len(tn.Subscribers()) == 0 {
|
|
p.env.Log.Debugf("empty subscribers for this polochon, delete it")
|
|
tn.Finish()
|
|
// Delete the polochon from the Eventer when it's finished
|
|
delete(p.polochons, chanl.User.PolochonID.String)
|
|
}
|
|
}
|
|
|
|
// Unsubscribe unsubscribes a client from receiving any more events from
|
|
// the Eventer
|
|
func (e *BaseEventer) Unsubscribe(chanl *Channel) error {
|
|
// Need to delete the given channel from the channel list
|
|
// TODO: Find a better way to handle this
|
|
// It is needed when the user changes his polochon, or get deactivated
|
|
// If the ID of the channel is empty, delete all the channels of
|
|
// this user
|
|
l := len(e.users)
|
|
i := 0
|
|
for i < l {
|
|
// Ugly ...
|
|
// If we don't have a channel ID, check the user ID
|
|
// Else check for the channel ID
|
|
if (chanl.ID == "" && e.users[i].User.ID != chanl.User.ID) ||
|
|
(chanl.ID != "" && e.users[i].ID != chanl.ID) {
|
|
i++
|
|
continue
|
|
}
|
|
e.env.Log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name)
|
|
|
|
// Delete this event from the list of events the channel is subscribed
|
|
delete(e.users[i].Events, e.name)
|
|
|
|
// Send the disconnected event
|
|
event := ServerEvent{
|
|
Event: Event{
|
|
Type: e.name,
|
|
Status: Disconnected,
|
|
},
|
|
}
|
|
|
|
e.users[i].sendEvent(event)
|
|
|
|
// Replace the current element with the last one
|
|
e.users[i] = e.users[len(e.users)-1]
|
|
e.users = e.users[:len(e.users)-1]
|
|
l--
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FatalError sends an error into the errorStream channel and tell all the
|
|
// clients that the error is fatal, the notifier won't communicate until a new
|
|
// subscribe
|
|
func (e *BaseEventer) FatalError(err error) {
|
|
for _, chanl := range e.users {
|
|
// Send the error
|
|
chanl.FatalError(e.name, err, e.env.Log)
|
|
// Delete the event from the channel events
|
|
delete(chanl.Events, e.name)
|
|
}
|
|
}
|
|
|
|
// Append just append a channel to the list of users
|
|
func (e *BaseEventer) Append(chanl *Channel) {
|
|
e.users = append(e.users, chanl)
|
|
}
|
|
|
|
// Subscribers just returns the list of subscribers
|
|
func (e *BaseEventer) Subscribers() []*Channel {
|
|
return e.users
|
|
}
|
|
|
|
// NotifyAll notifies all the listener
|
|
func (e *BaseEventer) NotifyAll(data interface{}) {
|
|
// If they're different, prepare the event
|
|
event := ServerEvent{
|
|
Event: Event{
|
|
Type: e.name,
|
|
Status: OK,
|
|
},
|
|
Data: data,
|
|
}
|
|
|
|
// Send the events to all the subscribed users
|
|
for _, chanl := range e.users {
|
|
e.env.Log.Debugf("sending event to %s", chanl.User.Name)
|
|
chanl.sendEvent(event)
|
|
}
|
|
}
|