This repository has been archived by the owner on Jun 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
notify.go
81 lines (74 loc) · 1.76 KB
/
notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// anyTime returns one of PublishedParsed or UpdatedParsed if one is not nil
func anyTime(lfi LinkedFeedItem) time.Time {
if lfi.UpdatedParsed == nil {
return *lfi.PublishedParsed
}
return *lfi.UpdatedParsed
}
func findNewItems(oldFeed, newFeed LinkedFeed) []LinkedFeedItem {
var buf []LinkedFeedItem
tmp := make(map[string]struct{}, len(newFeed.Items))
var newestInOld time.Time
for _, i := range oldFeed.Items {
tmp[i.GUID] = struct{}{}
if anyTime(i).After(newestInOld) {
newestInOld = anyTime(i)
}
}
for _, i := range newFeed.Items { // For each new...
if _, found := tmp[i.GUID]; !found {
if anyTime(i).After(newestInOld) {
buf = append(buf, i)
}
}
}
return buf
}
func Notify(ctx context.Context, n NotifyParam, out chan<- LinkedFeedItem, errChan chan error) {
var wg sync.WaitGroup
for _, u := range n.urls {
// When only showing new items, fetch the initial feed
// Othwerwise start with nothing
initial := true
var last LinkedFeed
wg.Add(1)
go func(u string) {
defer wg.Done()
// gofeed.Parser is not thread-safe
for {
select {
case <-ctx.Done():
return
default:
lf, err := n.Fetcher.Fetch(ctx, u)
if err != nil {
errChan <- err
return
}
if lf.Feed.FeedLink != u {
log.Debugf("feed request url and self-reference url mismatch: requested %s, got %s", u, lf.Feed.FeedLink)
}
if initial && n.mode == newItems {
// immediately move on in "newItems" mode
initial = false
} else {
newItems := findNewItems(last, lf)
for _, item := range newItems {
out <- item
}
time.Sleep(n.poll)
}
last = lf
}
}
}(u)
}
wg.Wait()
}