rss email digests over ssh because you're a cool kid
herald.dunkirk.sh
go
rss
rss-reader
ssh
charm
1// Package scheduler provides functionality for Herald.
2package scheduler
3
import (
	"context"
	"net/http"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/kierank/herald/store"
	"github.com/mmcdole/gofeed"
)
14
// Tuning knobs for feed fetching.
const (
	// maxConcurrentFetch caps how many feeds FetchFeeds downloads at once.
	maxConcurrentFetch = 30

	// feedFetchTimeout bounds each individual FetchFeed call.
	feedFetchTimeout = 15 * time.Second
)
19
// FetchResult is the outcome of fetching one feed with FetchFeed.
//
// A result with a nil Error and no Items means either that the server
// answered 304 Not Modified or that the feed contained no entries.
type FetchResult struct {
	// FeedID is the ID of the store.Feed that was fetched.
	FeedID int64

	// FeedName is the feed's stored name when one is set; otherwise, after a
	// successful parse, the title reported by the feed itself (may be empty).
	FeedName string

	// FeedURL is the URL the request was sent to.
	FeedURL string

	// Items holds one entry per item parsed from a 200 response.
	Items []FetchedItem

	// ETag and LastModified are the cache validators returned with a 200
	// response. They are empty after a 304 or a failed fetch.
	ETag         string
	LastModified string

	// Error is non-nil when building the request, the HTTP exchange, the
	// status check, or parsing failed.
	Error error
}

// FetchedItem is a single entry extracted from a parsed feed by FetchFeed.
type FetchedItem struct {
	// GUID is the entry's GUID, falling back to Link when the feed left it
	// empty. It can still be empty if both are missing.
	GUID string

	// Title and Link are copied from the parsed entry unchanged.
	Title string
	Link  string

	// Content is the entry's full content, or its description when no
	// content was provided.
	Content string

	// Published is the entry's publish time, or its update time when no
	// publish time was parsed. It is the zero time if neither exists.
	Published time.Time
}
37
38func FetchFeed(ctx context.Context, feed *store.Feed) *FetchResult {
39 result := &FetchResult{
40 FeedID: feed.ID,
41 FeedURL: feed.URL,
42 }
43
44 if feed.Name.Valid {
45 result.FeedName = feed.Name.String
46 }
47
48 ctx, cancel := context.WithTimeout(ctx, feedFetchTimeout)
49 defer cancel()
50
51 req, err := http.NewRequestWithContext(ctx, http.MethodGet, feed.URL, nil)
52 if err != nil {
53 result.Error = err
54 return result
55 }
56
57 req.Header.Set("User-Agent", "Herald/1.0 (RSS Aggregator)")
58
59 if feed.ETag.Valid && feed.ETag.String != "" {
60 req.Header.Set("If-None-Match", feed.ETag.String)
61 }
62 if feed.LastModified.Valid && feed.LastModified.String != "" {
63 req.Header.Set("If-Modified-Since", feed.LastModified.String)
64 }
65
66 client := &http.Client{
67 Timeout: 15 * time.Second,
68 }
69
70 resp, err := client.Do(req)
71 if err != nil {
72 result.Error = err
73 return result
74 }
75 defer func() { _ = resp.Body.Close() }()
76
77 if resp.StatusCode == http.StatusNotModified {
78 return result
79 }
80
81 if resp.StatusCode != http.StatusOK {
82 result.Error = &httpError{StatusCode: resp.StatusCode}
83 return result
84 }
85
86 result.ETag = resp.Header.Get("ETag")
87 result.LastModified = resp.Header.Get("Last-Modified")
88
89 parser := gofeed.NewParser()
90 parsedFeed, err := parser.Parse(resp.Body)
91 if err != nil {
92 result.Error = err
93 return result
94 }
95
96 if result.FeedName == "" && parsedFeed.Title != "" {
97 result.FeedName = parsedFeed.Title
98 }
99
100 for _, item := range parsedFeed.Items {
101 fetchedItem := FetchedItem{
102 GUID: item.GUID,
103 Title: item.Title,
104 Link: item.Link,
105 }
106
107 if fetchedItem.GUID == "" {
108 fetchedItem.GUID = item.Link
109 }
110
111 if item.Content != "" {
112 fetchedItem.Content = item.Content
113 } else if item.Description != "" {
114 fetchedItem.Content = item.Description
115 }
116
117 if item.PublishedParsed != nil {
118 fetchedItem.Published = *item.PublishedParsed
119 } else if item.UpdatedParsed != nil {
120 fetchedItem.Published = *item.UpdatedParsed
121 }
122
123 result.Items = append(result.Items, fetchedItem)
124 }
125
126 return result
127}
128
129func FetchFeeds(ctx context.Context, feeds []*store.Feed, progress *atomic.Int32) []*FetchResult {
130 results := make([]*FetchResult, len(feeds))
131 var wg sync.WaitGroup
132
133 concurrent := maxConcurrentFetch
134 if len(feeds) < concurrent {
135 concurrent = len(feeds)
136 }
137 semaphore := make(chan struct{}, concurrent)
138
139 for i, feed := range feeds {
140 wg.Add(1)
141 go func(idx int, f *store.Feed) {
142 defer func() {
143 if progress != nil {
144 progress.Add(1)
145 }
146 wg.Done()
147 }()
148 semaphore <- struct{}{} // Acquire
149 defer func() { <-semaphore }() // Release
150 results[idx] = FetchFeed(ctx, f)
151 }(i, feed)
152 }
153
154 wg.Wait()
155 return results
156}
157
// httpError is the error FetchFeed reports when a feed responds with a status
// other than 200 OK or 304 Not Modified.
type httpError struct {
	StatusCode int
}

// Error returns the standard status text (e.g. "Not Found").
//
// http.StatusText returns "" for codes net/http does not know, such as the
// 52x codes some CDNs send. In that case the numeric code is reported, so the
// error message is never empty.
func (e *httpError) Error() string {
	if text := http.StatusText(e.StatusCode); text != "" {
		return text
	}
	return "HTTP status " + strconv.Itoa(e.StatusCode)
}