forked from
jollywhoppers.com/witchsky.app
Bluesky app fork with some witchin' additions 💫
1import {
2 type AppBskyFeedDefs,
3 type AppBskyFeedGetTimeline,
4 type BskyAgent,
5} from '@atproto/api'
6import shuffle from 'lodash.shuffle'
7
8import {bundleAsync} from '#/lib/async/bundle'
9import {timeout} from '#/lib/async/timeout'
10import {feedUriToHref} from '#/lib/strings/url-helpers'
11import {getContentLanguages} from '#/state/preferences/languages'
12import {type FeedParams} from '#/state/queries/post-feed'
13import {FeedTuner} from '../feed-manip'
14import {type FeedTunerFn} from '../feed-manip'
15import {
16 type FeedAPI,
17 type FeedAPIResponse,
18 type ReasonFeedSource,
19} from './types'
20import {createBskyTopicsHeader, isBlueskyOwnedFeed} from './utils'
21
/** Upper bound, in milliseconds, that a timeout-capped source fetch is awaited. */
const REQUEST_WAIT_MS = 500
/** Maximum age, in milliseconds, of a custom-feed post that is kept (24 hours). */
const POST_AGE_CUTOFF = 24 * 60 * 60 * 1000
24
/**
 * Feed API that weaves posts sampled from the user's custom feeds into
 * their following timeline.
 *
 * The following timeline is the primary source. Custom feeds (taken from
 * `params.mergeFeedSources`) are consulted in rotating windows of three and
 * are sampled either on a fixed cadence (when `params.mergeFeedEnabled` is
 * set) or as a fallback once the following timeline has nothing left.
 */
export class MergeFeedAPI implements FeedAPI {
  userInterests?: string
  agent: BskyAgent
  params: FeedParams
  feedTuners: FeedTunerFn[]
  following: MergeFeedSource_Following
  customFeeds: MergeFeedSource_Custom[] = []
  /** Index into `customFeeds` where the next top-up window starts. */
  feedCursor = 0
  /** Number of `sampleItem` calls made since the last reset. */
  itemCursor = 0
  /** Round-robin counter used to pick which custom feed to sample next. */
  sampleCursor = 0

  constructor({
    agent,
    feedParams,
    feedTuners,
    userInterests,
  }: {
    agent: BskyAgent
    feedParams: FeedParams
    feedTuners: FeedTunerFn[]
    userInterests?: string
  }) {
    this.agent = agent
    this.params = feedParams
    this.feedTuners = feedTuners
    this.userInterests = userInterests
    this.following = new MergeFeedSource_Following({
      agent: this.agent,
      feedTuners: this.feedTuners,
    })
  }

  /**
   * Discards all queued content and cursors, and rebuilds the sources.
   * Custom feeds are re-created from `params.mergeFeedSources` in a freshly
   * shuffled order.
   */
  reset() {
    this.following = new MergeFeedSource_Following({
      agent: this.agent,
      feedTuners: this.feedTuners,
    })
    this.feedCursor = 0
    this.itemCursor = 0
    this.sampleCursor = 0
    this.customFeeds = this.params.mergeFeedSources
      ? shuffle(
          this.params.mergeFeedSources.map(
            feedUri =>
              new MergeFeedSource_Custom({
                agent: this.agent,
                feedUri,
                feedTuners: this.feedTuners,
                userInterests: this.userInterests,
              }),
          ),
        )
      : []
  }

  /**
   * Fetches the single newest post of the following timeline.
   * The result is `undefined` at runtime when the timeline is empty.
   */
  async peekLatest(): Promise<AppBskyFeedDefs.FeedViewPost> {
    const res = await this.agent.getTimeline({
      limit: 1,
    })
    return res.data.feed[0]
  }

  /**
   * Produces the next page of the merged feed.
   *
   * @param cursor - Falsy to start over (triggers `reset`); any other value
   *   continues from the internal state.
   * @param limit - Maximum number of posts to return.
   * @returns The assembled posts and the item counter serialized as cursor.
   */
  async fetch({
    cursor,
    limit,
  }: {
    cursor: string | undefined
    limit: number
  }): Promise<FeedAPIResponse> {
    if (!cursor) {
      this.reset()
    }

    // always keep following topped up
    if (this.following.numReady < limit) {
      await this.following.fetchNext(60)
    }

    // pick the next feeds to sample from
    const feeds = this.nextFeedsToSample()

    // top up the feeds, either because merging is on or because the
    // following timeline cannot fill this page on its own
    const outOfFollows =
      !this.following.hasMore && this.following.numReady < limit
    const promises: Promise<void>[] = []
    if (this.params.mergeFeedEnabled || outOfFollows) {
      for (const feed of feeds) {
        if (feed.numReady < 5) {
          promises.push(feed.fetchNext(10))
        }
      }
    }

    // wait for requests (all capped at a fixed timeout)
    await Promise.all(promises)

    // assemble a response by sampling from feeds with content
    const posts: AppBskyFeedDefs.FeedViewPost[] = []
    while (posts.length < limit) {
      const [post] = this.sampleItem()
      if (!post) {
        break
      }
      posts.push(post)
    }

    return {
      cursor: String(this.itemCursor),
      feed: posts,
    }
  }

  /**
   * Picks the next post for the merged feed.
   *
   * @returns A one-element array holding the post, or an empty array when
   *   nothing can be provided right now (no data anywhere, or the following
   *   queue is empty and needs another fetch).
   */
  sampleItem(): AppBskyFeedDefs.FeedViewPost[] {
    const i = this.itemCursor++
    const candidateFeeds = this.customFeeds.filter(f => f.numReady > 0)
    const canSample = candidateFeeds.length > 0
    const hasFollowsReady = this.following.numReady > 0
    // Follows are still "available" while more can be fetched OR while posts
    // remain queued; looking at hasMore alone would strand the tail of the
    // following queue once the timeline reports its end.
    const hasFollows = this.following.hasMore || hasFollowsReady

    // this condition establishes the frequency that custom feeds are woven into follows
    const shouldSample =
      this.params.mergeFeedEnabled &&
      i >= 15 &&
      candidateFeeds.length >= 2 &&
      (i % 4 === 0 || i % 5 === 0)

    if (!canSample && !hasFollows) {
      // no data available
      return []
    }
    if (shouldSample || !hasFollows) {
      // time to sample, or there is no follow content at all
      return candidateFeeds[this.sampleCursor++ % candidateFeeds.length].take(1)
    }
    if (!hasFollowsReady) {
      // stop here so more follows can be fetched
      return []
    }
    // provide follow
    return this.following.take(1)
  }

  /**
   * Returns the next window of up to three custom feeds and advances the
   * rotation, wrapping to the start once the end of the list is reached.
   */
  private nextFeedsToSample(): MergeFeedSource_Custom[] {
    const windowSize = 3
    const feeds = this.customFeeds.slice(
      this.feedCursor,
      this.feedCursor + windowSize,
    )
    this.feedCursor += windowSize
    // `>=` rather than `>`: a cursor equal to the length already points past
    // the last feed and would otherwise yield an empty window next time.
    if (this.feedCursor >= this.customFeeds.length) {
      this.feedCursor = 0
    }
    return feeds
  }
}
176
/**
 * Base class for a paginated source of posts with a local queue.
 *
 * Subclasses supply `_getFeed`; this class tracks the pagination cursor,
 * buffers fetched posts in `queue`, and records in `hasMore` whether
 * further pages should be requested.
 */
class MergeFeedSource {
  agent: BskyAgent
  feedTuners: FeedTunerFn[]
  sourceInfo: ReasonFeedSource | undefined
  cursor: string | undefined = undefined
  queue: AppBskyFeedDefs.FeedViewPost[] = []
  hasMore = true

  constructor(opts: {agent: BskyAgent; feedTuners: FeedTunerFn[]}) {
    this.agent = opts.agent
    this.feedTuners = opts.feedTuners
  }

  /** Number of posts currently buffered and ready to be taken. */
  get numReady() {
    return this.queue.length
  }

  /** True when the buffer is empty but the source may still have pages. */
  get needsFetch() {
    return this.queue.length === 0 && this.hasMore
  }

  /**
   * Removes and returns up to `n` posts from the front of the buffer.
   */
  take(n: number): AppBskyFeedDefs.FeedViewPost[] {
    const taken = this.queue.splice(0, n)
    return taken
  }

  /**
   * Requests the next page, but stops waiting after REQUEST_WAIT_MS.
   * The underlying request is not cancelled; it is only no longer awaited.
   */
  async fetchNext(n: number) {
    const pending = this._fetchNextInner(n)
    const deadline = timeout(REQUEST_WAIT_MS)
    await Promise.race([pending, deadline])
  }

  // Wrapped in bundleAsync (presumably to coalesce overlapping calls — confirm
  // against '#/lib/async/bundle').
  _fetchNextInner = bundleAsync(async (n: number) => {
    const page = await this._getFeed(this.cursor, n)
    if (!page.success) {
      // a failed request ends pagination for this source
      this.hasMore = false
      return
    }
    this.cursor = page.data.cursor
    if (page.data.feed.length === 0) {
      // an empty page is treated as the end of the source
      this.hasMore = false
      return
    }
    this.queue = [...this.queue, ...page.data.feed]
  })

  /**
   * Fetches one page from the backing feed. Subclasses must override this;
   * the base implementation always throws.
   */
  protected _getFeed(
    _cursor: string | undefined,
    _limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    throw new Error('Must be overridden')
  }
}
233
/**
 * Source backed by the user's following timeline.
 *
 * Unlike the base class, `fetchNext` here waits for the request to finish
 * instead of racing it against a timeout.
 */
class MergeFeedSource_Following extends MergeFeedSource {
  tuner = new FeedTuner(this.feedTuners)

  /** Fetches the next page and waits for it without a time cap. */
  async fetchNext(n: number) {
    await this._fetchNextInner(n)
  }

  /**
   * Loads one timeline page and replaces its feed with the posts that
   * remain after running the tuner over it.
   */
  protected async _getFeed(
    cursor: string | undefined,
    limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    const response = await this.agent.getTimeline({cursor, limit})
    // run the tuner pre-emptively to ensure better mixing
    const tuned = this.tuner.tune(response.data.feed, {dryRun: false})
    response.data.feed = tuned.map(entry => entry._feedPost)
    return response
  }
}
254
/**
 * Source backed by a single custom feed generator.
 *
 * Posts indexed at or before `minDate` (construction time minus
 * POST_AGE_CUTOFF) are discarded, and every surviving post is tagged with
 * `__source` so its origin feed can be identified later.
 */
class MergeFeedSource_Custom extends MergeFeedSource {
  agent: BskyAgent
  minDate: Date
  feedUri: string
  userInterests?: string

  constructor(opts: {
    agent: BskyAgent
    feedUri: string
    feedTuners: FeedTunerFn[]
    userInterests?: string
  }) {
    super({agent: opts.agent, feedTuners: opts.feedTuners})
    this.agent = opts.agent
    this.feedUri = opts.feedUri
    this.userInterests = opts.userInterests
    this.sourceInfo = {
      $type: 'reasonFeedSource',
      uri: opts.feedUri,
      href: feedUriToHref(opts.feedUri),
    }
    this.minDate = new Date(Date.now() - POST_AGE_CUTOFF)
  }

  /**
   * Loads one page of the custom feed.
   *
   * Any error raised while fetching or post-processing is swallowed and
   * reported as an unsuccessful, empty response.
   */
  protected async _getFeed(
    cursor: string | undefined,
    limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    try {
      const acceptLanguage = getContentLanguages().join(',')
      // the topics header is only attached for feeds that isBlueskyOwnedFeed accepts
      const topicsHeader = isBlueskyOwnedFeed(this.feedUri)
        ? createBskyTopicsHeader(this.userInterests)
        : {}
      const headers = {
        ...topicsHeader,
        'Accept-Language': acceptLanguage,
      }
      const res = await this.agent.app.bsky.feed.getFeed(
        {cursor, limit, feed: this.feedUri},
        {headers},
      )

      // NOTE
      // some custom feeds fail to enforce the pagination limit
      // so we manually truncate here
      // -prf
      const overLimit = limit && res.data.feed.length > limit
      const capped = overLimit ? res.data.feed.slice(0, limit) : res.data.feed

      // keep only posts newer than the age cutoff
      const recent = capped.filter(
        item => new Date(item.post.indexedAt) > this.minDate,
      )

      // tag each post with the feed it came from
      recent.forEach(item => {
        // @ts-ignore
        item.__source = this.sourceInfo
      })

      res.data.feed = recent
      return res
    } catch {
      // dont bubble custom-feed errors
      return {success: false, headers: {}, data: {feed: []}}
    }
  }
}