Return the video details embedded in the torrents
This requires the eventers to have the app env
This commit is contained in:
parent
101e68372d
commit
e41c9bfdfa
@ -5,6 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.quimbo.fr/odwrtw/canape/backend/models"
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
||||||
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@ -12,7 +13,6 @@ import (
|
|||||||
|
|
||||||
// Channel represents the channel of the user and the server
|
// Channel represents the channel of the user and the server
|
||||||
type Channel struct {
|
type Channel struct {
|
||||||
log *logrus.Entry
|
|
||||||
// Channel where the eventer will write messages to
|
// Channel where the eventer will write messages to
|
||||||
serverEventStream chan ServerEvent
|
serverEventStream chan ServerEvent
|
||||||
// Channel where the eventer will write errors to
|
// Channel where the eventer will write errors to
|
||||||
@ -31,7 +31,7 @@ type Channel struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// go routine writing events to the websocket connection
|
// go routine writing events to the websocket connection
|
||||||
func (c *Channel) writer() {
|
func (c *Channel) writer(env *web.Env) {
|
||||||
// Create the ping timer that will ping the client every pingWait seconds
|
// Create the ping timer that will ping the client every pingWait seconds
|
||||||
// to check that he's still listening
|
// to check that he's still listening
|
||||||
pingTicker := time.NewTicker(pingWait)
|
pingTicker := time.NewTicker(pingWait)
|
||||||
@ -46,30 +46,30 @@ func (c *Channel) writer() {
|
|||||||
case <-pingTicker.C:
|
case <-pingTicker.C:
|
||||||
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
c.log.Warnf("error writing message: %s", err)
|
env.Log.Warnf("error writing message: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case e := <-c.serverEventStream:
|
case e := <-c.serverEventStream:
|
||||||
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
if err := c.conn.WriteJSON(e); err != nil {
|
if err := c.conn.WriteJSON(e); err != nil {
|
||||||
c.log.Warnf("error writing JSON message: %s", err)
|
env.Log.Warnf("error writing JSON message: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case err := <-c.serverErrorStream:
|
case err := <-c.serverErrorStream:
|
||||||
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
if err := c.conn.WriteJSON(err); err != nil {
|
if err := c.conn.WriteJSON(err); err != nil {
|
||||||
c.log.Warnf("error writing JSON error: %s", err)
|
env.Log.Warnf("error writing JSON error: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-c.done:
|
case <-c.done:
|
||||||
c.log.Debug("all done finished")
|
env.Log.Debug("all done finished")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// go routine reading messages from the websocket connection
|
// go routine reading messages from the websocket connection
|
||||||
func (c *Channel) reader() {
|
func (c *Channel) reader(env *web.Env) {
|
||||||
// Read loop
|
// Read loop
|
||||||
_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
for {
|
for {
|
||||||
@ -79,19 +79,19 @@ func (c *Channel) reader() {
|
|||||||
if err := c.conn.ReadJSON(&msg); err != nil {
|
if err := c.conn.ReadJSON(&msg); err != nil {
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
case *websocket.CloseError:
|
case *websocket.CloseError:
|
||||||
c.log.Info("close error")
|
env.Log.Info("close error")
|
||||||
case net.Error:
|
case net.Error:
|
||||||
if e.Timeout() {
|
if e.Timeout() {
|
||||||
c.log.WithField(
|
env.Log.WithField(
|
||||||
"error", err,
|
"error", err,
|
||||||
).Warn("timeout")
|
).Warn("timeout")
|
||||||
} else {
|
} else {
|
||||||
c.log.WithField(
|
env.Log.WithField(
|
||||||
"error", err,
|
"error", err,
|
||||||
).Warn("unknown net error")
|
).Warn("unknown net error")
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
c.log.WithField(
|
env.Log.WithField(
|
||||||
"error", err,
|
"error", err,
|
||||||
).Warn("unknown error reading message")
|
).Warn("unknown error reading message")
|
||||||
}
|
}
|
||||||
@ -104,31 +104,31 @@ func (c *Channel) reader() {
|
|||||||
|
|
||||||
e, ok := Eventers[msg.Message]
|
e, ok := Eventers[msg.Message]
|
||||||
if !ok {
|
if !ok {
|
||||||
c.log.Warnf("no such event to subscribe %q", msg.Message)
|
env.Log.Warnf("no such event to subscribe %q", msg.Message)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
switch msg.Type {
|
switch msg.Type {
|
||||||
case "subscribe":
|
case "subscribe":
|
||||||
c.log.Debugf("subscribe to %s", msg.Message)
|
env.Log.Debugf("subscribe to %s", msg.Message)
|
||||||
if _, ok := c.Events[e.Name]; ok {
|
if _, ok := c.Events[e.Name]; ok {
|
||||||
c.log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name)
|
env.Log.Infof("user %s is already subscribed to %s", c.User.Name, e.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := e.Subscribe(c); err != nil {
|
if err := e.Subscribe(c); err != nil {
|
||||||
c.Error(e.Name, err)
|
c.Error(e.Name, err, env.Log)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.Events[e.Name] = struct{}{}
|
c.Events[e.Name] = struct{}{}
|
||||||
case "unsubscribe":
|
case "unsubscribe":
|
||||||
c.log.Debugf("unsubscribe from %s", msg.Message)
|
env.Log.Debugf("unsubscribe from %s", msg.Message)
|
||||||
if _, ok := c.Events[e.Name]; !ok {
|
if _, ok := c.Events[e.Name]; !ok {
|
||||||
c.log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name)
|
env.Log.Infof("user %s is not subscribed to %s", c.User.Name, e.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
e.Unsubscribe(c)
|
e.Unsubscribe(c)
|
||||||
default:
|
default:
|
||||||
c.log.Warnf("invalid type: %s", msg.Type)
|
env.Log.Warnf("invalid type: %s", msg.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -147,11 +147,11 @@ func (c *Channel) close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Error sends an error into the errorStream channel
|
// Error sends an error into the errorStream channel
|
||||||
func (c *Channel) Error(name string, err error) {
|
func (c *Channel) Error(name string, err error, log *logrus.Entry) {
|
||||||
if c.closed {
|
if c.closed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.log.WithField("name", name).Warn(err)
|
log.WithField("name", name).Warn(err)
|
||||||
c.serverErrorStream <- ServerError{
|
c.serverErrorStream <- ServerError{
|
||||||
Event: Event{
|
Event: Event{
|
||||||
Type: name,
|
Type: name,
|
||||||
@ -165,11 +165,11 @@ func (c *Channel) Error(name string, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FatalError sends an error into the errorStream channel
|
// FatalError sends an error into the errorStream channel
|
||||||
func (c *Channel) FatalError(name string, err error) {
|
func (c *Channel) FatalError(name string, err error, log *logrus.Entry) {
|
||||||
if c.closed {
|
if c.closed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.log.WithField("name", name).Warn(err)
|
log.WithField("name", name).Warn(err)
|
||||||
c.serverErrorStream <- ServerError{
|
c.serverErrorStream <- ServerError{
|
||||||
Event: Event{
|
Event: Event{
|
||||||
Type: name,
|
Type: name,
|
||||||
|
@ -22,14 +22,26 @@ const (
|
|||||||
writeWait = 30 * time.Second
|
writeWait = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Eventers is a map of all the available Eventers
|
// Eventers is a global map of all the available Eventers
|
||||||
var Eventers = map[string]*PolochonEventers{
|
var Eventers map[string]*PolochonEventers
|
||||||
torrentEventName: NewTorrentEventers(),
|
var eventersSetup bool
|
||||||
videoEventName: NewVideoEventers(),
|
|
||||||
|
func initEventers(env *web.Env) {
|
||||||
|
if eventersSetup {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
Eventers = map[string]*PolochonEventers{
|
||||||
|
torrentEventName: NewTorrentEventers(env),
|
||||||
|
videoEventName: NewVideoEventers(env),
|
||||||
|
}
|
||||||
|
eventersSetup = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// WsHandler handles the websockets messages
|
// WsHandler handles the websockets messages
|
||||||
func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
||||||
|
initEventers(env)
|
||||||
|
|
||||||
// Get the user
|
// Get the user
|
||||||
user := auth.GetCurrentUser(r, env.Log)
|
user := auth.GetCurrentUser(r, env.Log)
|
||||||
|
|
||||||
@ -67,7 +79,6 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
|||||||
serverErrorStream: serverErrorStream,
|
serverErrorStream: serverErrorStream,
|
||||||
done: make(chan struct{}, 1),
|
done: make(chan struct{}, 1),
|
||||||
conn: ws,
|
conn: ws,
|
||||||
log: env.Log,
|
|
||||||
User: user,
|
User: user,
|
||||||
db: env.Database,
|
db: env.Database,
|
||||||
Events: map[string]struct{}{},
|
Events: map[string]struct{}{},
|
||||||
@ -76,9 +87,9 @@ func WsHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Launch the go routine responsible for writing events in the websocket
|
// Launch the go routine responsible for writing events in the websocket
|
||||||
go c.writer()
|
go c.writer(env)
|
||||||
// Launch the reader responsible for reading events from the websocket
|
// Launch the reader responsible for reading events from the websocket
|
||||||
c.reader()
|
c.reader(env)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -122,6 +133,7 @@ func PolochonHookHandler(env *web.Env, w http.ResponseWriter, r *http.Request) e
|
|||||||
|
|
||||||
// HookDebugHandler handles the websockets messages
|
// HookDebugHandler handles the websockets messages
|
||||||
func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
func HookDebugHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
||||||
|
initEventers(env)
|
||||||
debug := map[string]map[string][]*Channel{}
|
debug := map[string]map[string][]*Channel{}
|
||||||
|
|
||||||
for e, event := range Eventers {
|
for e, event := range Eventers {
|
||||||
|
@ -5,13 +5,13 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.quimbo.fr/odwrtw/canape/backend/models"
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
||||||
"github.com/sirupsen/logrus"
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BaseEventer represents the basis of a Eventer
|
// BaseEventer represents the basis of a Eventer
|
||||||
type BaseEventer struct {
|
type BaseEventer struct {
|
||||||
|
env *web.Env
|
||||||
users []*Channel
|
users []*Channel
|
||||||
log *logrus.Entry
|
|
||||||
name string
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -20,22 +20,17 @@ type BaseEventer struct {
|
|||||||
// Eventer per polochon
|
// Eventer per polochon
|
||||||
type PolochonEventers struct {
|
type PolochonEventers struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
env *web.Env
|
||||||
Name string
|
Name string
|
||||||
log *logrus.Entry
|
|
||||||
polochons map[string]Eventer
|
polochons map[string]Eventer
|
||||||
NewEventer func(polo *models.Polochon, log *logrus.Entry) (Eventer, error)
|
NewEventer func(*web.Env, *models.Polochon) (Eventer, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventers returns a new PolochonEventers
|
// NewEventers returns a new PolochonEventers
|
||||||
func NewEventers() *PolochonEventers {
|
func NewEventers(env *web.Env) *PolochonEventers {
|
||||||
// Setup the logger
|
|
||||||
logger := logrus.New()
|
|
||||||
logger.Formatter = &logrus.TextFormatter{FullTimestamp: true}
|
|
||||||
logger.Level = logrus.DebugLevel
|
|
||||||
|
|
||||||
return &PolochonEventers{
|
return &PolochonEventers{
|
||||||
|
env: env,
|
||||||
polochons: map[string]Eventer{},
|
polochons: map[string]Eventer{},
|
||||||
log: logrus.NewEntry(logger),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,7 +41,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
|
|||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
|
||||||
p.log.Debugf("subscribing with the user %s", chanl.User.Name)
|
p.env.Log.Debugf("subscribing with the user %s", chanl.User.Name)
|
||||||
if !chanl.User.PolochonActivated {
|
if !chanl.User.PolochonActivated {
|
||||||
return fmt.Errorf("polochon not activated")
|
return fmt.Errorf("polochon not activated")
|
||||||
}
|
}
|
||||||
@ -55,7 +50,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
|
|||||||
// listening
|
// listening
|
||||||
tn, ok := p.polochons[chanl.User.PolochonID.String]
|
tn, ok := p.polochons[chanl.User.PolochonID.String]
|
||||||
if !ok {
|
if !ok {
|
||||||
p.log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String)
|
p.env.Log.Debugf("create new eventer for polochon %s", chanl.User.PolochonID.String)
|
||||||
|
|
||||||
// Get the user's polochon
|
// Get the user's polochon
|
||||||
polo, err := chanl.User.GetPolochon(chanl.db)
|
polo, err := chanl.User.GetPolochon(chanl.db)
|
||||||
@ -64,7 +59,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a new Eventer for this polochon
|
// Create a new Eventer for this polochon
|
||||||
tn, err = p.NewEventer(polo, p.log)
|
tn, err = p.NewEventer(p.env, polo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -82,7 +77,7 @@ func (p *PolochonEventers) Subscribe(chanl *Channel) error {
|
|||||||
// Add the Channel to the Eventer
|
// Add the Channel to the Eventer
|
||||||
tn.Append(chanl)
|
tn.Append(chanl)
|
||||||
|
|
||||||
p.log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String)
|
p.env.Log.Debugf("eventer created for polochon %s", chanl.User.PolochonID.String)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,21 +88,21 @@ func (p *PolochonEventers) Unsubscribe(chanl *Channel) {
|
|||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
|
||||||
p.log.Debugf("unsubscribing from %s with %s", p.Name, chanl.User.Name)
|
p.env.Log.Debugf("unsubscribing from %s with %s", p.Name, chanl.User.Name)
|
||||||
|
|
||||||
tn, ok := p.polochons[chanl.User.PolochonID.String]
|
tn, ok := p.polochons[chanl.User.PolochonID.String]
|
||||||
if !ok {
|
if !ok {
|
||||||
p.log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String)
|
p.env.Log.Warnf("no eventer for polochon %s, not unsubscribing", chanl.User.PolochonID.String)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tn.Unsubscribe(chanl); err != nil {
|
if err := tn.Unsubscribe(chanl); err != nil {
|
||||||
p.log.Errorf("failed to unsubscribe eventer: %s", err.Error())
|
p.env.Log.Errorf("failed to unsubscribe eventer: %s", err.Error())
|
||||||
// TODO: check if we need to return here
|
// TODO: check if we need to return here
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tn.Subscribers()) == 0 {
|
if len(tn.Subscribers()) == 0 {
|
||||||
p.log.Debugf("empty subscribers for this polochon, delete it")
|
p.env.Log.Debugf("empty subscribers for this polochon, delete it")
|
||||||
tn.Finish()
|
tn.Finish()
|
||||||
// Delete the polochon from the Eventer when it's finished
|
// Delete the polochon from the Eventer when it's finished
|
||||||
delete(p.polochons, chanl.User.PolochonID.String)
|
delete(p.polochons, chanl.User.PolochonID.String)
|
||||||
@ -133,7 +128,7 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error {
|
|||||||
i++
|
i++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
e.log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name)
|
e.env.Log.Debugf("found the user channel %s for user %s, deleting it...", chanl.ID, chanl.User.Name)
|
||||||
|
|
||||||
// Delete this event from the list of events the channel is subscribed
|
// Delete this event from the list of events the channel is subscribed
|
||||||
delete(e.users[i].Events, e.name)
|
delete(e.users[i].Events, e.name)
|
||||||
@ -162,7 +157,7 @@ func (e *BaseEventer) Unsubscribe(chanl *Channel) error {
|
|||||||
func (e *BaseEventer) FatalError(err error) {
|
func (e *BaseEventer) FatalError(err error) {
|
||||||
for _, chanl := range e.users {
|
for _, chanl := range e.users {
|
||||||
// Send the error
|
// Send the error
|
||||||
chanl.FatalError(e.name, err)
|
chanl.FatalError(e.name, err, e.env.Log)
|
||||||
// Delete the event from the channel events
|
// Delete the event from the channel events
|
||||||
delete(chanl.Events, e.name)
|
delete(chanl.Events, e.name)
|
||||||
}
|
}
|
||||||
@ -191,7 +186,7 @@ func (e *BaseEventer) NotifyAll(data interface{}) {
|
|||||||
|
|
||||||
// Send the events to all the subscribed users
|
// Send the events to all the subscribed users
|
||||||
for _, chanl := range e.users {
|
for _, chanl := range e.users {
|
||||||
e.log.Debugf("sending event to %s", chanl.User.Name)
|
e.env.Log.Debugf("sending event to %s", chanl.User.Name)
|
||||||
chanl.sendEvent(event)
|
chanl.sendEvent(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,8 +6,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.quimbo.fr/odwrtw/canape/backend/models"
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
||||||
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
||||||
"github.com/odwrtw/papi"
|
"github.com/odwrtw/papi"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TorrentEventer represents the Eventer for torrents
|
// TorrentEventer represents the Eventer for torrents
|
||||||
@ -21,15 +21,15 @@ type TorrentEventer struct {
|
|||||||
var torrentEventName = "torrents"
|
var torrentEventName = "torrents"
|
||||||
|
|
||||||
// NewTorrentEventers returns a new PolochonEventers for torrents
|
// NewTorrentEventers returns a new PolochonEventers for torrents
|
||||||
func NewTorrentEventers() *PolochonEventers {
|
func NewTorrentEventers(env *web.Env) *PolochonEventers {
|
||||||
eventer := NewEventers()
|
eventer := NewEventers(env)
|
||||||
eventer.NewEventer = NewTorrentEventer
|
eventer.NewEventer = NewTorrentEventer
|
||||||
eventer.Name = torrentEventName
|
eventer.Name = torrentEventName
|
||||||
return eventer
|
return eventer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTorrentEventer returns a new Eventer for a specific Polochon
|
// NewTorrentEventer returns a new Eventer for a specific Polochon
|
||||||
func NewTorrentEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error) {
|
func NewTorrentEventer(env *web.Env, polo *models.Polochon) (Eventer, error) {
|
||||||
// Create a new papi client
|
// Create a new papi client
|
||||||
client, err := polo.NewPapiClient()
|
client, err := polo.NewPapiClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -39,8 +39,8 @@ func NewTorrentEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error
|
|||||||
// This is the first time this polochon is requested, create the TorrentEventer
|
// This is the first time this polochon is requested, create the TorrentEventer
|
||||||
tn := &TorrentEventer{
|
tn := &TorrentEventer{
|
||||||
BaseEventer: &BaseEventer{
|
BaseEventer: &BaseEventer{
|
||||||
|
env: env,
|
||||||
users: []*Channel{},
|
users: []*Channel{},
|
||||||
log: log,
|
|
||||||
name: torrentEventName,
|
name: torrentEventName,
|
||||||
},
|
},
|
||||||
pClient: client,
|
pClient: client,
|
||||||
@ -77,7 +77,7 @@ func (t *TorrentEventer) Launch() error {
|
|||||||
|
|
||||||
err := t.torrentsUpdate()
|
err := t.torrentsUpdate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("got error getting torrents: %s", err)
|
t.env.Log.Warnf("got error getting torrents: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -85,10 +85,10 @@ func (t *TorrentEventer) Launch() error {
|
|||||||
case <-timeTicker.C:
|
case <-timeTicker.C:
|
||||||
err := t.torrentsUpdate()
|
err := t.torrentsUpdate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("got error getting torrents: %s", err)
|
t.env.Log.Warnf("got error getting torrents: %s", err)
|
||||||
}
|
}
|
||||||
case <-t.done:
|
case <-t.done:
|
||||||
t.log.Debug("quit torrent notifier")
|
t.env.Log.Debug("quit torrent notifier")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -106,9 +106,15 @@ func (t *TorrentEventer) torrentsUpdate() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
t.log.Debugf("torrents have changed!")
|
t.env.Log.Debugf("torrents have changed!")
|
||||||
|
|
||||||
t.NotifyAll(torrents)
|
notification := make([]*models.TorrentVideo, len(torrents))
|
||||||
|
for i := range torrents {
|
||||||
|
notification[i] = models.NewTorrentVideo(&torrents[i])
|
||||||
|
notification[i].Update(t.env.Backend.Detailer, t.env.Log)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.NotifyAll(notification)
|
||||||
|
|
||||||
t.torrents = torrents
|
t.torrents = torrents
|
||||||
|
|
||||||
@ -118,6 +124,6 @@ func (t *TorrentEventer) torrentsUpdate() error {
|
|||||||
// Finish implements the Eventer interface
|
// Finish implements the Eventer interface
|
||||||
// It is called when there is no more users subscribed
|
// It is called when there is no more users subscribed
|
||||||
func (t *TorrentEventer) Finish() {
|
func (t *TorrentEventer) Finish() {
|
||||||
t.log.Debugf("sending the done channel")
|
t.env.Log.Debugf("sending the done channel")
|
||||||
t.done <- struct{}{}
|
t.done <- struct{}{}
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ package events
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"git.quimbo.fr/odwrtw/canape/backend/models"
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
||||||
"github.com/sirupsen/logrus"
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
||||||
)
|
)
|
||||||
|
|
||||||
// VideoEventer represents the Eventer for tests
|
// VideoEventer represents the Eventer for tests
|
||||||
@ -13,19 +13,19 @@ type VideoEventer struct {
|
|||||||
var videoEventName = "newVideo"
|
var videoEventName = "newVideo"
|
||||||
|
|
||||||
// NewVideoEventers implements the Eventer interface
|
// NewVideoEventers implements the Eventer interface
|
||||||
func NewVideoEventers() *PolochonEventers {
|
func NewVideoEventers(env *web.Env) *PolochonEventers {
|
||||||
eventer := NewEventers()
|
eventer := NewEventers(env)
|
||||||
eventer.NewEventer = NewVideoEventer
|
eventer.NewEventer = NewVideoEventer
|
||||||
eventer.Name = videoEventName
|
eventer.Name = videoEventName
|
||||||
return eventer
|
return eventer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewVideoEventer returns a new Eventer
|
// NewVideoEventer returns a new Eventer
|
||||||
func NewVideoEventer(polo *models.Polochon, log *logrus.Entry) (Eventer, error) {
|
func NewVideoEventer(env *web.Env, polo *models.Polochon) (Eventer, error) {
|
||||||
return &VideoEventer{
|
return &VideoEventer{
|
||||||
BaseEventer: &BaseEventer{
|
BaseEventer: &BaseEventer{
|
||||||
|
env: env,
|
||||||
users: []*Channel{},
|
users: []*Channel{},
|
||||||
log: log,
|
|
||||||
name: videoEventName,
|
name: videoEventName,
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
|
42
backend/models/torrent_video.go
Normal file
42
backend/models/torrent_video.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/odwrtw/papi"
|
||||||
|
polochon "github.com/odwrtw/polochon/lib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TorrentVideo reprensents a torrent embeding the video inforamtions
|
||||||
|
type TorrentVideo struct {
|
||||||
|
*papi.Torrent
|
||||||
|
Video polochon.Video `json:"video,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTorrentVideo returns a new TorrentVideo
|
||||||
|
func NewTorrentVideo(t *papi.Torrent) *TorrentVideo {
|
||||||
|
torrent := &polochon.Torrent{
|
||||||
|
ImdbID: t.ImdbID,
|
||||||
|
Type: polochon.VideoType(t.Type),
|
||||||
|
Quality: polochon.Quality(t.Quality),
|
||||||
|
Season: t.Season,
|
||||||
|
Episode: t.Episode,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TorrentVideo{
|
||||||
|
Torrent: t,
|
||||||
|
Video: torrent.Video(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update updates the Torrent video with the database details
|
||||||
|
func (t *TorrentVideo) Update(detailer polochon.Detailer, log *logrus.Entry) {
|
||||||
|
if t.Video == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: refresh the video in db if not found
|
||||||
|
err := detailer.GetDetails(t.Video, log)
|
||||||
|
if err != nil {
|
||||||
|
log.WithField("function", "TorrentVideo.Update").Errorf(err.Error())
|
||||||
|
}
|
||||||
|
}
|
@ -7,6 +7,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"git.quimbo.fr/odwrtw/canape/backend/auth"
|
"git.quimbo.fr/odwrtw/canape/backend/auth"
|
||||||
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
||||||
"git.quimbo.fr/odwrtw/canape/backend/web"
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/odwrtw/papi"
|
"github.com/odwrtw/papi"
|
||||||
@ -46,11 +47,18 @@ func ListHandler(env *web.Env, w http.ResponseWriter, r *http.Request) error {
|
|||||||
return env.RenderError(w, err)
|
return env.RenderError(w, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
torrents, err := client.GetTorrents()
|
list, err := client.GetTorrents()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return env.RenderError(w, err)
|
return env.RenderError(w, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
torrents := make([]*models.TorrentVideo, len(list))
|
||||||
|
for i, t := range list {
|
||||||
|
tv := models.NewTorrentVideo(&t)
|
||||||
|
tv.Update(env.Backend.Detailer, env.Log)
|
||||||
|
torrents[i] = tv
|
||||||
|
}
|
||||||
|
|
||||||
return env.RenderJSON(w, torrents)
|
return env.RenderJSON(w, torrents)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user