canape/backend/events/channel.go
Grégoire Delattre 7be65b6a9a Get the video images from the models
Return the video details embedded in the torrents

This requires the eventers to have the app env
2020-04-13 17:38:47 +02:00

200 lines
4.8 KiB
Go

package events
import (
"net"
"time"
"git.quimbo.fr/odwrtw/canape/backend/models"
"git.quimbo.fr/odwrtw/canape/backend/web"
"github.com/gorilla/websocket"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
// Channel represents the channel of the user and the server
type Channel struct {
// Channel where the eventer will write messages to
serverEventStream chan ServerEvent
// Channel where the eventer will write errors to
serverErrorStream chan ServerError
// Done channel when we need to stop the connection
done chan struct{}
// Underlying ws connection
conn *websocket.Conn
// A channel is directly linked to a user
User *models.User `json:"user"`
// List of events the user is listening to
Events map[string]struct{} `json:"events"`
ID string `json:"id"`
db *sqlx.DB
closed bool
}
// go routine writing events to the websocket connection
func (c *Channel) writer(env *web.Env) {
// Create the ping timer that will ping the client every pingWait seconds
// to check that he's still listening
pingTicker := time.NewTicker(pingWait)
defer pingTicker.Stop()
// If we exit, close the connection to tell the reader to stop
defer c.conn.Close()
// Write loop
for {
select {
case <-pingTicker.C:
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
env.Log.Warnf("error writing message: %s", err)
return
}
case e := <-c.serverEventStream:
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteJSON(e); err != nil {
env.Log.Warnf("error writing JSON message: %s", err)
return
}
case err := <-c.serverErrorStream:
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteJSON(err); err != nil {
env.Log.Warnf("error writing JSON error: %s", err)
return
}
case <-c.done:
env.Log.Debug("all done finished")
return
}
}
}
// go routine reading messages from the websocket connection
func (c *Channel) reader(env *web.Env) {
// Read loop
_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
for {
msg := ClientMessage{}
// Read the client's message
if err := c.conn.ReadJSON(&msg); err != nil {
switch e := err.(type) {
case *websocket.CloseError:
env.Log.Info("close error")
case net.Error:
if e.Timeout() {
env.Log.WithField(
"error", err,
).Warn("timeout")
} else {
env.Log.WithField(
"error", err,
).Warn("unknown net error")
}
default:
env.Log.WithField(
"error", err,
).Warn("unknown error reading message")
}
// We got an error, we close all the subscribed events and stop the
// writer goroutine
c.close()
return
}
e, ok := Eventers[msg.Message]
if !ok {
env.Log.Warnf("no such event to subscribe %q", msg.Message)
continue
}
switch msg.Type {
case "subscribe":
env.Log.Debugf("subscribe to %s", msg.Message)
if _, ok := c.Events[e.Name]; ok {
env.Log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name)
continue
}
if err := e.Subscribe(c); err != nil {
c.Error(e.Name, err, env.Log)
continue
}
c.Events[e.Name] = struct{}{}
case "unsubscribe":
env.Log.Debugf("unsubscribe from %s", msg.Message)
if _, ok := c.Events[e.Name]; !ok {
env.Log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name)
continue
}
e.Unsubscribe(c)
default:
env.Log.Warnf("invalid type: %s", msg.Type)
}
}
}
func (c *Channel) close() {
// The channel is now closed
c.closed = true
// Unsubscribe from all Events
for eventName := range c.Events {
Eventers[eventName].Unsubscribe(c)
}
// Tell the writer to stop
c.done <- struct{}{}
}
// Error sends an error into the errorStream channel
func (c *Channel) Error(name string, err error, log *logrus.Entry) {
if c.closed {
return
}
log.WithField("name", name).Warn(err)
c.serverErrorStream <- ServerError{
Event: Event{
Type: name,
Status: Error,
},
ErrorEvent: ErrorEvent{
Level: WarningError,
Message: err.Error(),
},
}
}
// FatalError sends an error into the errorStream channel
func (c *Channel) FatalError(name string, err error, log *logrus.Entry) {
if c.closed {
return
}
log.WithField("name", name).Warn(err)
c.serverErrorStream <- ServerError{
Event: Event{
Type: name,
Status: Error,
},
ErrorEvent: ErrorEvent{
Level: FatalError,
Message: err.Error(),
},
}
}
// SendEvent sends an event to the serverEvent channel
func (c *Channel) sendEvent(event ServerEvent) {
if c.closed {
return
}
c.serverEventStream <- event
}
// Unsubscribe unsubscribes a user from all eventers
func Unsubscribe(u *models.User) {
chanl := &Channel{User: u}
for _, event := range Eventers {
event.Unsubscribe(chanl)
}
}