From e41c9bfdfa209932d37bf5405f8539d568d3292e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Delattre?= Date: Sat, 11 Apr 2020 12:25:29 +0200 Subject: [PATCH] Return the video details embedded in the torrents This requires the eventers to have the app env --- backend/events/channel.go | 44 +++++++++++++++---------------- backend/events/handlers.go | 26 +++++++++++++----- backend/events/polochons.go | 39 ++++++++++++--------------- backend/events/torrent_eventer.go | 28 ++++++++++++-------- backend/events/video_eventer.go | 10 +++---- backend/models/torrent_video.go | 42 +++++++++++++++++++++++++++++ backend/torrents/handlers.go | 10 ++++++- 7 files changed, 131 insertions(+), 68 deletions(-) create mode 100644 backend/models/torrent_video.go diff --git a/backend/events/channel.go b/backend/events/channel.go index 64f2133..e92a5e0 100644 --- a/backend/events/channel.go +++ b/backend/events/channel.go @@ -5,6 +5,7 @@ import ( "time" "git.quimbo.fr/odwrtw/canape/backend/models" + "git.quimbo.fr/odwrtw/canape/backend/web" "github.com/gorilla/websocket" "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" @@ -12,7 +13,6 @@ import ( // Channel represents the channel of the user and the server type Channel struct { - log *logrus.Entry // Channel where the eventer will write messages to serverEventStream chan ServerEvent // Channel where the eventer will write errors to @@ -31,7 +31,7 @@ type Channel struct { } // go routine writing events to the websocket connection -func (c *Channel) writer() { +func (c *Channel) writer(env *web.Env) { // Create the ping timer that will ping the client every pingWait seconds // to check that he's still listening pingTicker := time.NewTicker(pingWait) @@ -46,30 +46,30 @@ func (c *Channel) writer() { case <-pingTicker.C: _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - c.log.Warnf("error writing message: %s", err) + env.Log.Warnf("error writing message: %s", err) return } case e := <-c.serverEventStream: _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteJSON(e); err != nil { - c.log.Warnf("error writing JSON message: %s", err) + env.Log.Warnf("error writing JSON message: %s", err) return } case err := <-c.serverErrorStream: _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteJSON(err); err != nil { - c.log.Warnf("error writing JSON error: %s", err) + env.Log.Warnf("error writing JSON error: %s", err) return } case <-c.done: - c.log.Debug("all done finished") + env.Log.Debug("all done finished") return } } } // go routine reading messages from the websocket connection -func (c *Channel) reader() { +func (c *Channel) reader(env *web.Env) { // Read loop _ = c.conn.SetReadDeadline(time.Now().Add(pongWait)) for { @@ -79,19 +79,19 @@ func (c *Channel) reader() { if err := c.conn.ReadJSON(&msg); err != nil { switch e := err.(type) { case *websocket.CloseError: - c.log.Info("close error") + env.Log.Info("close error") case net.Error: if e.Timeout() { - c.log.WithField( + env.Log.WithField( "error", err, ).Warn("timeout") } else { - c.log.WithField( + env.Log.WithField( "error", err, ).Warn("unknown net error") } default: - c.log.WithField( + env.Log.WithField( "error", err, ).Warn("unknown error reading message") } @@ -104,31 +104,31 @@ func (c *Channel) reader() { e, ok := Eventers[msg.Message] if !ok { - c.log.Warnf("no such event to subscribe %q", msg.Message) + env.Log.Warnf("no such event to subscribe %q", msg.Message) continue } switch msg.Type { case "subscribe": - c.log.Debugf("subscribe to %s", msg.Message) + env.Log.Debugf("subscribe to %s", msg.Message) if _, ok := c.Events[e.Name]; ok { - c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name) + env.Log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name) continue } if err := e.Subscribe(c); err != nil { - c.Error(e.Name, err) + c.Error(e.Name, err, env.Log) continue } c.Events[e.Name] = struct{}{} case "unsubscribe": - c.log.Debugf("unsubscribe from %s", msg.Message) + env.Log.Debugf("unsubscribe from %s", msg.Message) if _, ok := c.Events[e.Name]; !ok { - c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name) + env.Log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name) continue } e.Unsubscribe(c) default: - c.log.Warnf("invalid type: %s", msg.Type) + env.Log.Warnf("invalid type: %s", msg.Type) } } } @@ -147,11 +147,11 @@ func (c *Channel) close() { } // Error sends an error into the errorStream channel -func (c *Channel) Error(name string, err error) { +func (c *Channel) Error(name string, err error, log *logrus.Entry) { if c.closed { return } - c.log.WithField("name", name).Warn(err) + log.WithField("name", name).Warn(err) c.serverErrorStream <- ServerError{ Event: Event{ Type: name, @@ -165,11 +165,11 @@ func (c *Channel) Error(name string, err error) { } // FatalError sends an error into the errorStream channel -func (c *Channel) FatalError(name string, err error) { +func (c *Channel) FatalError(name string, err error, log *logrus.Entry) { if c.closed { return } - c.log.WithField("name", name).Warn(err) + log.WithField("name", name).Warn(err) c.serverErrorStream <- ServerError{ Event: Event{ Type: name, diff --git a/backend/events/handlers.go b/backend/events/handlers.go index de430c2..782b7c0 100644 --- a/backend/events/handlers.go +++ b/backend/events/handlers.go @@ -22,14 +22,26 @@ const ( writeWait = 30 * time.Second ) -// Eventers is a map of all the available Eventers -var Eventers = map[string]*PolochonEventers{ - torrentEventName: NewTorrentEventers(), - videoEventName: NewVideoEventers(), +// Eventers is a global map of all the available Eventers +var Eventers map[string]*PolochonEventers +var eventersSetup bool + +func initEventers(env *web.Env) { + if eventersSetup { + return + } + + Eventers = map[string]*PolochonEventers{ + torrentEventName: NewTorrentEventers(env), + videoEventName: NewVideoEventers(env), + } + eventersSetup = true } // WsHandler handles the websockets messages func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { + initEventers(env) + // Get the user user := auth.GetCurrentUser(r, env.Log) @@ -67,7 +79,6 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { serverErrorStream: serverErrorStream, done: make(chan struct{}, 1), conn: ws, - log: env.Log, User: user, db: env.Database, Events: map[string]struct{}{}, @@ -76,9 +87,9 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { } // Launch the go routine responsible for writing events in the websocket - go c.writer() + go c.writer(env) // Launch the reader responsible for reading events from the websocket - c.reader() + c.reader(env) return nil } @@ -122,6 +133,7 @@ func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) e // HookDebugHandler handles the websockets messages func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { + initEventers(env) debug := map[string]map[string][]*Channel{} for e, event := range Eventers { diff --git a/backend/events/polochons.go b/backend/events/polochons.go index fb03f11..d65e7b7 100644 --- a/backend/events/polochons.go +++ b/backend/events/polochons.go @@ -5,13 +5,13 @@ import ( "sync" "git.quimbo.fr/odwrtw/canape/backend/models" - "github.com/sirupsen/logrus" + "git.quimbo.fr/odwrtw/canape/backend/web" ) // BaseEventer represents the basis of a Eventer type BaseEventer struct { + env *web.Env users []*Channel - log *logrus.Entry name string } @@ -20,22 +20,17 @@ type BaseEventer struct { // Eventer per polochon type PolochonEventers struct { sync.RWMutex + env *web.Env Name string - log *logrus.Entry polochons map[string]Eventer - NewEventer func(polo *models.Polochon, log *logrus.Entry) (Eventer, error) + NewEventer func(*web.Env, *models.Polochon) (Eventer, error) } // NewEventers returns a new PolochonEventers -func NewEventers() *PolochonEventers { - // Setup the logger - logger := logrus.New() - logger.Formatter = &logrus.TextFormatter{FullTimestamp: true} - logger.Level = logrus.DebugLevel - +func NewEventers(env *web.Env) *PolochonEventers { return &PolochonEventers{ + env: env, polochons: map[string]Eventer{}, - log: logrus.NewEntry(logger), } } @@ -46,7 +41,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { p.Lock() defer p.Unlock() - p.log.Debugf("subscribing with the user %s", chanl.User.Name) + p.env.Log.Debugf("subscribing with the user %s", chanl.User.Name) if !chanl.User.PolochonActivated { return fmt.Errorf("polochon not activated") } @@ -55,7 +50,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { // listening tn, ok := p.polochons[chanl.User.PolochonID.String] if !ok { - p.log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String) + p.env.Log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String) // Get the user's polochon polo, err := chanl.User.GetPolochon(chanl.db) @@ -64,7 +59,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { } // Create a new Eventer for this polochon - tn, err = p.NewEventer(polo, p.log) + tn, err = p.NewEventer(p.env, polo) if err != nil { return err } @@ -82,7 +77,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error { // Add the Channel to the Eventer tn.Append(chanl) - p.log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String) + p.env.Log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String) return nil } @@ -93,21 +88,21 @@ func (p *PolochonEventers) Unsubscribe(chanl *Channel) { p.Lock() defer p.Unlock() - p.log.Debugf("unsubscribing from %s with %s", p.Name, chanl.User.Name) + p.env.Log.Debugf("unsubscribing from %s with %s", p.Name, chanl.User.Name) tn, ok := p.polochons[chanl.User.PolochonID.String] if !ok { - p.log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String) + p.env.Log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String) return } if err := tn.Unsubscribe(chanl); err != nil { - p.log.Errorf("failed to unsubscribe eventer: %s", err.Error()) + p.env.Log.Errorf("failed to unsubscribe eventer: %s", err.Error()) // TODO: check if we need to return here } if len(tn.Subscribers()) == 0 { - p.log.Debugf("empty subscribers for this polochon, delete it") + p.env.Log.Debugf("empty subscribers for this polochon, delete it") tn.Finish() // Delete the polochon from the Eventer when it's finished delete(p.polochons, chanl.User.PolochonID.String) @@ -133,7 +128,7 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error { i++ continue } - e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name) + e.env.Log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name) // Delete this event from the list of events the channel is subscribed delete(e.users[i].Events, e.name) @@ -162,7 +157,7 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error { func (e *BaseEventer) FatalError(err error) { for _, chanl := range e.users { // Send the error - chanl.FatalError(e.name, err) + chanl.FatalError(e.name, err, e.env.Log) // Delete the event from the channel events delete(chanl.Events, e.name) } @@ -191,7 +186,7 @@ func (e *BaseEventer) NotifyAll(data interface{}) { // Send the events to all the subscribed users for _, chanl := range e.users { - e.log.Debugf("sending event to %s", chanl.User.Name) + e.env.Log.Debugf("sending event to %s", chanl.User.Name) chanl.sendEvent(event) } } diff --git a/backend/events/torrent_eventer.go b/backend/events/torrent_eventer.go index 269a7a3..45dece2 100644 --- a/backend/events/torrent_eventer.go +++ b/backend/events/torrent_eventer.go @@ -6,8 +6,8 @@ import ( "time" "git.quimbo.fr/odwrtw/canape/backend/models" + "git.quimbo.fr/odwrtw/canape/backend/web" "github.com/odwrtw/papi" - "github.com/sirupsen/logrus" ) // TorrentEventer represents the Eventer for torrents @@ -21,15 +21,15 @@ type TorrentEventer struct { var torrentEventName = "torrents" // NewTorrentEventers returns a new PolochonEventers for torrents -func NewTorrentEventers() *PolochonEventers { - eventer := NewEventers() +func NewTorrentEventers(env *web.Env) *PolochonEventers { + eventer := NewEventers(env) eventer.NewEventer = NewTorrentEventer eventer.Name = torrentEventName return eventer } // NewTorrentEventer returns a new Eventer for a specific Polochon -func NewTorrentEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error) { +func NewTorrentEventer(env *web.Env, polo *models.Polochon) (Eventer, error) { // Create a new papi client client, err := polo.NewPapiClient() if err != nil { @@ -39,8 +39,8 @@ func NewTorrentEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error // This is the first time this polochon is requested, create the TorrentEventer tn := &TorrentEventer{ BaseEventer: &BaseEventer{ + env: env, users: []*Channel{}, - log: log, name: torrentEventName, }, pClient: client, @@ -77,7 +77,7 @@ func (t *TorrentEventer) Launch() error { err := t.torrentsUpdate() if err != nil { - t.log.Warnf("got error getting torrents: %s", err) + t.env.Log.Warnf("got error getting torrents: %s", err) } for { @@ -85,10 +85,10 @@ func (t *TorrentEventer) Launch() error { case <-timeTicker.C: err := t.torrentsUpdate() if err != nil { - t.log.Warnf("got error getting torrents: %s", err) + t.env.Log.Warnf("got error getting torrents: %s", err) } case <-t.done: - t.log.Debug("quit torrent notifier") + t.env.Log.Debug("quit torrent notifier") return nil } } @@ -106,9 +106,15 @@ func (t *TorrentEventer) torrentsUpdate() error { return nil } - t.log.Debugf("torrents have changed!") + t.env.Log.Debugf("torrents have changed!") - t.NotifyAll(torrents) + notification := make([]*models.TorrentVideo, len(torrents)) + for i := range torrents { + notification[i] = models.NewTorrentVideo(&torrents[i]) + notification[i].Update(t.env.Backend.Detailer, t.env.Log) + } + + t.NotifyAll(notification) t.torrents = torrents @@ -118,6 +124,6 @@ func (t *TorrentEventer) torrentsUpdate() error { // Finish implements the Eventer interface // It is called when there is no more users subscribed func (t *TorrentEventer) Finish() { - t.log.Debugf("sending the done channel") + t.env.Log.Debugf("sending the done channel") t.done <- struct{}{} } diff --git a/backend/events/video_eventer.go b/backend/events/video_eventer.go index 5f00ca6..975136f 100644 --- a/backend/events/video_eventer.go +++ b/backend/events/video_eventer.go @@ -2,7 +2,7 @@ package events import ( "git.quimbo.fr/odwrtw/canape/backend/models" - "github.com/sirupsen/logrus" + "git.quimbo.fr/odwrtw/canape/backend/web" ) // VideoEventer represents the Eventer for tests @@ -13,19 +13,19 @@ type VideoEventer struct { var videoEventName = "newVideo" // NewVideoEventers implements the Eventer interface -func NewVideoEventers() *PolochonEventers { - eventer := NewEventers() +func NewVideoEventers(env *web.Env) *PolochonEventers { + eventer := NewEventers(env) eventer.NewEventer = NewVideoEventer eventer.Name = videoEventName return eventer } // NewVideoEventer returns a new Eventer -func NewVideoEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error) { +func NewVideoEventer(env *web.Env, polo *models.Polochon) (Eventer, error) { return &VideoEventer{ BaseEventer: &BaseEventer{ + env: env, users: []*Channel{}, - log: log, name: videoEventName, }, }, nil diff --git a/backend/models/torrent_video.go b/backend/models/torrent_video.go new file mode 100644 index 0000000..e6c745b --- /dev/null +++ b/backend/models/torrent_video.go @@ -0,0 +1,42 @@ +package models + +import ( + "github.com/odwrtw/papi" + polochon "github.com/odwrtw/polochon/lib" + "github.com/sirupsen/logrus" +) + +// TorrentVideo reprensents a torrent embeding the video inforamtions +type TorrentVideo struct { + *papi.Torrent + Video polochon.Video `json:"video,omitempty"` +} + +// NewTorrentVideo returns a new TorrentVideo +func NewTorrentVideo(t *papi.Torrent) *TorrentVideo { + torrent := &polochon.Torrent{ + ImdbID: t.ImdbID, + Type: polochon.VideoType(t.Type), + Quality: polochon.Quality(t.Quality), + Season: t.Season, + Episode: t.Episode, + } + + return &TorrentVideo{ + Torrent: t, + Video: torrent.Video(), + } +} + +// Update updates the Torrent video with the database details +func (t *TorrentVideo) Update(detailer polochon.Detailer, log *logrus.Entry) { + if t.Video == nil { + return + } + + // TODO: refresh the video in db if not found + err := detailer.GetDetails(t.Video, log) + if err != nil { + log.WithField("function", "TorrentVideo.Update").Errorf(err.Error()) + } +} diff --git a/backend/torrents/handlers.go b/backend/torrents/handlers.go index 58f975d..edcb76a 100644 --- a/backend/torrents/handlers.go +++ b/backend/torrents/handlers.go @@ -7,6 +7,7 @@ import ( "sort" "git.quimbo.fr/odwrtw/canape/backend/auth" + "git.quimbo.fr/odwrtw/canape/backend/models" "git.quimbo.fr/odwrtw/canape/backend/web" "github.com/gorilla/mux" "github.com/odwrtw/papi" @@ -46,11 +47,18 @@ func ListHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error { return env.RenderError(w, err) } - torrents, err := client.GetTorrents() + list, err := client.GetTorrents() if err != nil { return env.RenderError(w, err) } + torrents := make([]*models.TorrentVideo, len(list)) + for i, t := range list { + tv := models.NewTorrentVideo(&t) + tv.Update(env.Backend.Detailer, env.Log) + torrents[i] = tv + } + return env.RenderJSON(w, torrents) }