rss email digests over ssh because you're a cool kid herald.dunkirk.sh
go rss rss-reader ssh charm
at main 164 lines 3.4 kB view raw
1// Package scheduler provides functionality for Herald. 2package scheduler 3 4import ( 5 "context" 6 "net/http" 7 "sync" 8 "sync/atomic" 9 "time" 10 11 "github.com/kierank/herald/store" 12 "github.com/mmcdole/gofeed" 13) 14 15const ( 16 feedFetchTimeout = 15 * time.Second 17 maxConcurrentFetch = 30 18) 19 20type FetchResult struct { 21 FeedID int64 22 FeedName string 23 FeedURL string 24 Items []FetchedItem 25 ETag string 26 LastModified string 27 Error error 28} 29 30type FetchedItem struct { 31 GUID string 32 Title string 33 Link string 34 Content string 35 Published time.Time 36} 37 38func FetchFeed(ctx context.Context, feed *store.Feed) *FetchResult { 39 result := &FetchResult{ 40 FeedID: feed.ID, 41 FeedURL: feed.URL, 42 } 43 44 if feed.Name.Valid { 45 result.FeedName = feed.Name.String 46 } 47 48 ctx, cancel := context.WithTimeout(ctx, feedFetchTimeout) 49 defer cancel() 50 51 req, err := http.NewRequestWithContext(ctx, http.MethodGet, feed.URL, nil) 52 if err != nil { 53 result.Error = err 54 return result 55 } 56 57 req.Header.Set("User-Agent", "Herald/1.0 (RSS Aggregator)") 58 59 if feed.ETag.Valid && feed.ETag.String != "" { 60 req.Header.Set("If-None-Match", feed.ETag.String) 61 } 62 if feed.LastModified.Valid && feed.LastModified.String != "" { 63 req.Header.Set("If-Modified-Since", feed.LastModified.String) 64 } 65 66 client := &http.Client{ 67 Timeout: 15 * time.Second, 68 } 69 70 resp, err := client.Do(req) 71 if err != nil { 72 result.Error = err 73 return result 74 } 75 defer func() { _ = resp.Body.Close() }() 76 77 if resp.StatusCode == http.StatusNotModified { 78 return result 79 } 80 81 if resp.StatusCode != http.StatusOK { 82 result.Error = &httpError{StatusCode: resp.StatusCode} 83 return result 84 } 85 86 result.ETag = resp.Header.Get("ETag") 87 result.LastModified = resp.Header.Get("Last-Modified") 88 89 parser := gofeed.NewParser() 90 parsedFeed, err := parser.Parse(resp.Body) 91 if err != nil { 92 result.Error = err 93 return result 94 } 95 96 if result.FeedName == "" && parsedFeed.Title != "" { 97 result.FeedName = parsedFeed.Title 98 } 99 100 for _, item := range parsedFeed.Items { 101 fetchedItem := FetchedItem{ 102 GUID: item.GUID, 103 Title: item.Title, 104 Link: item.Link, 105 } 106 107 if fetchedItem.GUID == "" { 108 fetchedItem.GUID = item.Link 109 } 110 111 if item.Content != "" { 112 fetchedItem.Content = item.Content 113 } else if item.Description != "" { 114 fetchedItem.Content = item.Description 115 } 116 117 if item.PublishedParsed != nil { 118 fetchedItem.Published = *item.PublishedParsed 119 } else if item.UpdatedParsed != nil { 120 fetchedItem.Published = *item.UpdatedParsed 121 } 122 123 result.Items = append(result.Items, fetchedItem) 124 } 125 126 return result 127} 128 129func FetchFeeds(ctx context.Context, feeds []*store.Feed, progress *atomic.Int32) []*FetchResult { 130 results := make([]*FetchResult, len(feeds)) 131 var wg sync.WaitGroup 132 133 concurrent := maxConcurrentFetch 134 if len(feeds) < concurrent { 135 concurrent = len(feeds) 136 } 137 semaphore := make(chan struct{}, concurrent) 138 139 for i, feed := range feeds { 140 wg.Add(1) 141 go func(idx int, f *store.Feed) { 142 defer func() { 143 if progress != nil { 144 progress.Add(1) 145 } 146 wg.Done() 147 }() 148 semaphore <- struct{}{} // Acquire 149 defer func() { <-semaphore }() // Release 150 results[idx] = FetchFeed(ctx, f) 151 }(i, feed) 152 } 153 154 wg.Wait() 155 return results 156} 157 158type httpError struct { 159 StatusCode int 160} 161 162func (e *httpError) Error() string { 163 return http.StatusText(e.StatusCode) 164}