129 lines
3.0 KiB
Go
129 lines
3.0 KiB
Go
package events
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
"git.quimbo.fr/odwrtw/canape/backend/models"
|
|
"git.quimbo.fr/odwrtw/canape/backend/web"
|
|
polochon "github.com/odwrtw/polochon/lib"
|
|
"github.com/odwrtw/polochon/lib/papi"
|
|
)
|
|
|
|
// TorrentEventer represents the Eventer for torrents
|
|
type TorrentEventer struct {
|
|
*BaseEventer
|
|
done chan struct{}
|
|
pClient *papi.Client
|
|
// previous keep the previous data
|
|
previous []*polochon.Torrent
|
|
// data holds the computed data
|
|
data []*models.TorrentVideo
|
|
}
|
|
|
|
var torrentEventName = "torrents"
|
|
|
|
// NewTorrentEventers returns a new PolochonEventers for torrents
|
|
func NewTorrentEventers(env *web.Env) *PolochonEventers {
|
|
eventer := NewEventers(env)
|
|
eventer.NewEventer = NewTorrentEventer
|
|
eventer.Name = torrentEventName
|
|
return eventer
|
|
}
|
|
|
|
// NewTorrentEventer returns a new Eventer for a specific Polochon
|
|
func NewTorrentEventer(env *web.Env, polo *models.Polochon) (Eventer, error) {
|
|
// Create a new papi client
|
|
client, err := polo.NewPapiClient()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to instanciate polochon client %s", err)
|
|
}
|
|
|
|
// This is the first time this polochon is requested, create the TorrentEventer
|
|
tn := &TorrentEventer{
|
|
BaseEventer: &BaseEventer{
|
|
env: env,
|
|
users: []*Channel{},
|
|
name: torrentEventName,
|
|
},
|
|
pClient: client,
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
return tn, nil
|
|
}
|
|
|
|
// Append implements the Eventer interface
|
|
// It just appends a channel to the list of users
|
|
// When the user is appended, send him the torrents infos we have
|
|
func (t *TorrentEventer) Append(chanl *Channel) {
|
|
event := ServerEvent{
|
|
Event: Event{
|
|
Type: torrentEventName,
|
|
Status: OK,
|
|
},
|
|
Data: t.data,
|
|
}
|
|
|
|
chanl.sendEvent(event)
|
|
|
|
t.BaseEventer.Append(chanl)
|
|
}
|
|
|
|
// Launch implements the Eventer interface
|
|
// It starts a ticker to fetch torrents and notify the users
|
|
func (t *TorrentEventer) Launch() error {
|
|
// Create the timer that will check for the torrents every X seconds
|
|
timeTicker := time.NewTicker(10 * time.Second)
|
|
defer timeTicker.Stop()
|
|
|
|
err := t.torrentsUpdate()
|
|
if err != nil {
|
|
t.env.Log.Warnf("got error getting torrents: %s", err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-timeTicker.C:
|
|
err := t.torrentsUpdate()
|
|
if err != nil {
|
|
t.env.Log.Warnf("got error getting torrents: %s", err)
|
|
}
|
|
case <-t.done:
|
|
t.env.Log.Debug("quit torrent notifier")
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// torrentsUpdate sends to the eventStream if torrents change
|
|
func (t *TorrentEventer) torrentsUpdate() error {
|
|
torrents, err := t.pClient.GetTorrents()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if reflect.DeepEqual(t.previous, torrents) {
|
|
return nil
|
|
}
|
|
|
|
t.env.Log.Debugf("torrents have changed!")
|
|
|
|
data := models.NewTorrentVideos(t.env.Backend.Detailer, t.env.Database, t.env.Log, torrents)
|
|
|
|
t.NotifyAll(data)
|
|
|
|
t.previous = torrents
|
|
t.data = data
|
|
|
|
return nil
|
|
}
|
|
|
|
// Finish implements the Eventer interface
|
|
// It is called when there is no more users subscribed
|
|
func (t *TorrentEventer) Finish() {
|
|
t.env.Log.Debugf("sending the done channel")
|
|
t.done <- struct{}{}
|
|
}
|