// My fork of the Bluesky client.
1import {AppBskyFeedDefs, AppBskyFeedGetTimeline, BskyAgent} from '@atproto/api'
2import shuffle from 'lodash.shuffle'
3
4import {bundleAsync} from '#/lib/async/bundle'
5import {timeout} from '#/lib/async/timeout'
6import {feedUriToHref} from '#/lib/strings/url-helpers'
7import {getContentLanguages} from '#/state/preferences/languages'
8import {FeedParams} from '#/state/queries/post-feed'
9import {FeedTuner} from '../feed-manip'
10import {FeedTunerFn} from '../feed-manip'
11import {FeedAPI, FeedAPIResponse, ReasonFeedSource} from './types'
12import {createBskyTopicsHeader, isBlueskyOwnedFeed} from './utils'
13
/**
 * Upper bound, in milliseconds, that `MergeFeedSource.fetchNext` waits on a
 * request before it stops waiting and lets the caller continue.
 */
const REQUEST_WAIT_MS = 500

/**
 * 24 hours expressed in milliseconds. Subtracted from "now" to compute the
 * `minDate` cutoff that custom-feed posts are compared against.
 */
const POST_AGE_CUTOFF = 24 * 60 * 60 * 1000
16
/**
 * Feed API that weaves posts sampled from a set of custom feeds into the
 * user's following timeline.
 *
 * The instance is stateful: it owns one "following" source plus one source per
 * entry in `params.mergeFeedSources`, and keeps three cursors that track the
 * feed window, the sampling position and the round-robin pick. Calling
 * `fetch` without a cursor resets all of that state.
 */
export class MergeFeedAPI implements FeedAPI {
  userInterests?: string
  agent: BskyAgent
  params: FeedParams
  feedTuners: FeedTunerFn[]
  following: MergeFeedSource_Following
  customFeeds: MergeFeedSource_Custom[] = []
  /** Start index in `customFeeds` of the next window of feeds to top up. */
  feedCursor = 0
  /** Count of sampling attempts so far; also returned as the page cursor. */
  itemCursor = 0
  /** Round-robin counter selecting which ready custom feed is drawn from. */
  sampleCursor = 0

  constructor({
    agent,
    feedParams,
    feedTuners,
    userInterests,
  }: {
    agent: BskyAgent
    feedParams: FeedParams
    feedTuners: FeedTunerFn[]
    userInterests?: string
  }) {
    this.agent = agent
    this.params = feedParams
    this.feedTuners = feedTuners
    this.userInterests = userInterests
    this.following = new MergeFeedSource_Following({
      agent: this.agent,
      feedTuners: this.feedTuners,
    })
  }

  /**
   * Discards all queued posts and cursors, then rebuilds the following source
   * and a freshly shuffled list of custom-feed sources.
   */
  reset() {
    this.following = new MergeFeedSource_Following({
      agent: this.agent,
      feedTuners: this.feedTuners,
    })
    this.feedCursor = 0
    this.itemCursor = 0
    this.sampleCursor = 0
    this.customFeeds = this.params.mergeFeedSources
      ? shuffle(
          this.params.mergeFeedSources.map(
            feedUri =>
              new MergeFeedSource_Custom({
                agent: this.agent,
                feedUri,
                feedTuners: this.feedTuners,
                userInterests: this.userInterests,
              }),
          ),
        )
      : []
  }

  /**
   * Requests a single item from the timeline and returns the first entry of
   * the response (which is `undefined` at runtime if the response is empty).
   */
  async peekLatest(): Promise<AppBskyFeedDefs.FeedViewPost> {
    const res = await this.agent.getTimeline({
      limit: 1,
    })
    return res.data.feed[0]
  }

  /**
   * Produces the next page of the merged feed.
   *
   * @param cursor - Falsy to start over (triggers `reset`); otherwise the
   *   existing in-memory state is continued.
   * @param limit - Maximum number of posts to return.
   * @returns Up to `limit` posts and the current `itemCursor` as a string.
   */
  async fetch({
    cursor,
    limit,
  }: {
    cursor: string | undefined
    limit: number
  }): Promise<FeedAPIResponse> {
    if (!cursor) {
      this.reset()
    }

    // Always keep following topped up. Once the timeline has reported that it
    // is exhausted there is nothing more to request, and re-requesting would
    // page from a stale cursor.
    if (this.following.hasMore && this.following.numReady < limit) {
      await this.following.fetchNext(60)
    }

    // Pick the next window of feeds to sample from. The cursor wraps as soon
    // as it reaches the end of the list; wrapping only when it is strictly
    // past the end would yield an empty window on the following call whenever
    // the number of feeds is a multiple of the window size.
    const feeds = this.customFeeds.slice(this.feedCursor, this.feedCursor + 3)
    this.feedCursor += 3
    if (this.feedCursor >= this.customFeeds.length) {
      this.feedCursor = 0
    }

    // Top up the selected feeds, either because merging is enabled or because
    // follows alone can no longer fill the page.
    const outOfFollows =
      !this.following.hasMore && this.following.numReady < limit
    const promises: Promise<unknown>[] = []
    if (this.params.mergeFeedEnabled || outOfFollows) {
      for (const feed of feeds) {
        if (feed.numReady < 5) {
          promises.push(feed.fetchNext(10))
        }
      }
    }

    // Wait for requests (each one is capped by the source's own timeout).
    await Promise.all(promises)

    // Assemble a response by sampling until the page is full or a sample
    // comes back empty.
    const posts: AppBskyFeedDefs.FeedViewPost[] = []
    while (posts.length < limit) {
      const slice = this.sampleItem()
      if (slice[0]) {
        posts.push(slice[0])
      } else {
        break
      }
    }

    return {
      cursor: String(this.itemCursor),
      feed: posts,
    }
  }

  /**
   * Takes the next post from either the following queue or one of the custom
   * feeds that currently has posts ready.
   *
   * @returns An array holding one post, or an empty array when nothing can be
   *   served right now (no data at all, or follows need to be fetched first).
   */
  sampleItem(): AppBskyFeedDefs.FeedViewPost[] {
    const i = this.itemCursor++
    const candidateFeeds = this.customFeeds.filter(f => f.numReady > 0)
    const canSample = candidateFeeds.length > 0
    const hasFollowsReady = this.following.numReady > 0
    // Posts already queued still count as follows after the timeline is
    // exhausted; otherwise the tail of the timeline would never be served.
    const hasFollows = this.following.hasMore || hasFollowsReady

    // This condition establishes the frequency that custom feeds are woven
    // into follows.
    const shouldSample =
      this.params.mergeFeedEnabled &&
      i >= 15 &&
      candidateFeeds.length >= 2 &&
      (i % 4 === 0 || i % 5 === 0)

    if (!canSample && !hasFollows) {
      // No data available.
      return []
    }
    if (shouldSample || !hasFollows) {
      // Time to sample, or there are no follows left to show. Both paths
      // guarantee candidateFeeds is non-empty, so the modulo is safe.
      return candidateFeeds[this.sampleCursor++ % candidateFeeds.length].take(1)
    }
    if (!hasFollowsReady) {
      // Stop here so more follows can be fetched.
      return []
    }
    // Provide a follow.
    return this.following.take(1)
  }
}
168
/**
 * Base class for one paginated source of posts. It buffers fetched posts in
 * `queue`, remembers the pagination `cursor`, and flips `hasMore` to false
 * once a request fails or comes back empty. Subclasses supply `_getFeed`.
 */
class MergeFeedSource {
  agent: BskyAgent
  feedTuners: FeedTunerFn[]
  sourceInfo: ReasonFeedSource | undefined
  cursor: string | undefined = undefined
  queue: AppBskyFeedDefs.FeedViewPost[] = []
  hasMore = true

  constructor(opts: {agent: BskyAgent; feedTuners: FeedTunerFn[]}) {
    this.agent = opts.agent
    this.feedTuners = opts.feedTuners
  }

  /** Number of posts currently buffered and ready to be taken. */
  get numReady() {
    return this.queue.length
  }

  /** True when the buffer is empty and the source has not been marked done. */
  get needsFetch() {
    if (!this.hasMore) {
      return false
    }
    return this.queue.length === 0
  }

  /** Removes and returns up to `n` posts from the front of the buffer. */
  take(n: number): AppBskyFeedDefs.FeedViewPost[] {
    return this.queue.splice(0, n)
  }

  /**
   * Starts loading the next page and waits for it, but for no longer than
   * `REQUEST_WAIT_MS`; the wait ends with whichever settles first.
   */
  async fetchNext(n: number) {
    const pending = this._fetchNextInner(n)
    const deadline = timeout(REQUEST_WAIT_MS)
    await Promise.race([pending, deadline])
  }

  /**
   * Loads one page via `_getFeed`, stores the returned cursor and appends the
   * posts to the buffer. Wrapped in `bundleAsync`.
   */
  _fetchNextInner = bundleAsync(async (n: number) => {
    const res = await this._getFeed(this.cursor, n)
    if (!res.success) {
      this.hasMore = false
      return
    }
    const {cursor: nextCursor, feed: page} = res.data
    this.cursor = nextCursor
    if (page.length === 0) {
      // An empty page marks the end of this source.
      this.hasMore = false
      return
    }
    this.queue = [...this.queue, ...page]
  })

  /** Page loader; the base implementation always throws. */
  protected _getFeed(
    _cursor: string | undefined,
    _limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    throw new Error('Must be overridden')
  }
}
225
/**
 * Source backed by the agent's timeline. Pages are passed through a
 * `FeedTuner` before they are buffered.
 */
class MergeFeedSource_Following extends MergeFeedSource {
  tuner = new FeedTuner(this.feedTuners)

  /** Loads the next page directly, without the base class's timeout race. */
  async fetchNext(n: number) {
    return this._fetchNextInner(n)
  }

  /**
   * Requests a timeline page and replaces its feed with the posts that remain
   * after tuning.
   */
  protected async _getFeed(
    cursor: string | undefined,
    limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    const response = await this.agent.getTimeline({cursor, limit})
    // Run the tuner pre-emptively to ensure better mixing.
    const tuned = this.tuner.tune(response.data.feed, {dryRun: false})
    response.data.feed = tuned.map(({_feedPost}) => _feedPost)
    return response
  }
}
246
/**
 * Source backed by a single custom feed, identified by `feedUri`. Posts are
 * truncated to the requested limit, filtered against `minDate`, and tagged
 * with this feed's `sourceInfo`.
 */
class MergeFeedSource_Custom extends MergeFeedSource {
  agent: BskyAgent
  minDate: Date
  feedUri: string
  userInterests?: string

  constructor(opts: {
    agent: BskyAgent
    feedUri: string
    feedTuners: FeedTunerFn[]
    userInterests?: string
  }) {
    super({agent: opts.agent, feedTuners: opts.feedTuners})
    this.agent = opts.agent
    this.feedUri = opts.feedUri
    this.userInterests = opts.userInterests
    this.sourceInfo = {
      $type: 'reasonFeedSource',
      uri: opts.feedUri,
      href: feedUriToHref(opts.feedUri),
    }
    // Cutoff is fixed at construction time: now minus POST_AGE_CUTOFF.
    this.minDate = new Date(Date.now() - POST_AGE_CUTOFF)
  }

  /**
   * Requests one page of the custom feed. Any error is caught and reported as
   * an unsuccessful, empty response rather than being thrown.
   */
  protected async _getFeed(
    cursor: string | undefined,
    limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    try {
      const acceptLanguage = getContentLanguages().join(',')
      // The topics header is only attached for feeds that
      // `isBlueskyOwnedFeed` recognises.
      const topicsHeader = isBlueskyOwnedFeed(this.feedUri)
        ? createBskyTopicsHeader(this.userInterests)
        : {}
      const res = await this.agent.app.bsky.feed.getFeed(
        {cursor, limit, feed: this.feedUri},
        {headers: {...topicsHeader, 'Accept-Language': acceptLanguage}},
      )

      let page = res.data.feed
      // NOTE
      // some custom feeds fail to enforce the pagination limit
      // so we manually truncate here
      // -prf
      if (limit && page.length > limit) {
        page = page.slice(0, limit)
      }
      // Keep only posts indexed after the cutoff.
      page = page.filter(item => new Date(item.post.indexedAt) > this.minDate)
      // Attach source info.
      for (const item of page) {
        item.__source = this.sourceInfo
      }
      res.data.feed = page
      return res
    } catch {
      // Don't bubble custom-feed errors.
      return {success: false, headers: {}, data: {feed: []}}
    }
  }
}
322}