my fork of the bluesky client
at main 322 lines 8.2 kB view raw
1import {AppBskyFeedDefs, AppBskyFeedGetTimeline, BskyAgent} from '@atproto/api' 2import shuffle from 'lodash.shuffle' 3 4import {bundleAsync} from '#/lib/async/bundle' 5import {timeout} from '#/lib/async/timeout' 6import {feedUriToHref} from '#/lib/strings/url-helpers' 7import {getContentLanguages} from '#/state/preferences/languages' 8import {FeedParams} from '#/state/queries/post-feed' 9import {FeedTuner} from '../feed-manip' 10import {FeedTunerFn} from '../feed-manip' 11import {FeedAPI, FeedAPIResponse, ReasonFeedSource} from './types' 12import {createBskyTopicsHeader, isBlueskyOwnedFeed} from './utils' 13 14const REQUEST_WAIT_MS = 500 // 500ms 15const POST_AGE_CUTOFF = 60e3 * 60 * 24 // 24hours 16 17export class MergeFeedAPI implements FeedAPI { 18 userInterests?: string 19 agent: BskyAgent 20 params: FeedParams 21 feedTuners: FeedTunerFn[] 22 following: MergeFeedSource_Following 23 customFeeds: MergeFeedSource_Custom[] = [] 24 feedCursor = 0 25 itemCursor = 0 26 sampleCursor = 0 27 28 constructor({ 29 agent, 30 feedParams, 31 feedTuners, 32 userInterests, 33 }: { 34 agent: BskyAgent 35 feedParams: FeedParams 36 feedTuners: FeedTunerFn[] 37 userInterests?: string 38 }) { 39 this.agent = agent 40 this.params = feedParams 41 this.feedTuners = feedTuners 42 this.userInterests = userInterests 43 this.following = new MergeFeedSource_Following({ 44 agent: this.agent, 45 feedTuners: this.feedTuners, 46 }) 47 } 48 49 reset() { 50 this.following = new MergeFeedSource_Following({ 51 agent: this.agent, 52 feedTuners: this.feedTuners, 53 }) 54 this.customFeeds = [] 55 this.feedCursor = 0 56 this.itemCursor = 0 57 this.sampleCursor = 0 58 if (this.params.mergeFeedSources) { 59 this.customFeeds = shuffle( 60 this.params.mergeFeedSources.map( 61 feedUri => 62 new MergeFeedSource_Custom({ 63 agent: this.agent, 64 feedUri, 65 feedTuners: this.feedTuners, 66 userInterests: this.userInterests, 67 }), 68 ), 69 ) 70 } else { 71 this.customFeeds = [] 72 } 73 } 74 75 async peekLatest(): Promise<AppBskyFeedDefs.FeedViewPost> { 76 const res = await this.agent.getTimeline({ 77 limit: 1, 78 }) 79 return res.data.feed[0] 80 } 81 82 async fetch({ 83 cursor, 84 limit, 85 }: { 86 cursor: string | undefined 87 limit: number 88 }): Promise<FeedAPIResponse> { 89 if (!cursor) { 90 this.reset() 91 } 92 93 const promises = [] 94 95 // always keep following topped up 96 if (this.following.numReady < limit) { 97 await this.following.fetchNext(60) 98 } 99 100 // pick the next feeds to sample from 101 const feeds = this.customFeeds.slice(this.feedCursor, this.feedCursor + 3) 102 this.feedCursor += 3 103 if (this.feedCursor > this.customFeeds.length) { 104 this.feedCursor = 0 105 } 106 107 // top up the feeds 108 const outOfFollows = 109 !this.following.hasMore && this.following.numReady < limit 110 if (this.params.mergeFeedEnabled || outOfFollows) { 111 for (const feed of feeds) { 112 if (feed.numReady < 5) { 113 promises.push(feed.fetchNext(10)) 114 } 115 } 116 } 117 118 // wait for requests (all capped at a fixed timeout) 119 await Promise.all(promises) 120 121 // assemble a response by sampling from feeds with content 122 const posts: AppBskyFeedDefs.FeedViewPost[] = [] 123 while (posts.length < limit) { 124 let slice = this.sampleItem() 125 if (slice[0]) { 126 posts.push(slice[0]) 127 } else { 128 break 129 } 130 } 131 132 return { 133 cursor: String(this.itemCursor), 134 feed: posts, 135 } 136 } 137 138 sampleItem() { 139 const i = this.itemCursor++ 140 const candidateFeeds = this.customFeeds.filter(f => f.numReady > 0) 141 const canSample = candidateFeeds.length > 0 142 const hasFollows = this.following.hasMore 143 const hasFollowsReady = this.following.numReady > 0 144 145 // this condition establishes the frequency that custom feeds are woven into follows 146 const shouldSample = 147 this.params.mergeFeedEnabled && 148 i >= 15 && 149 candidateFeeds.length >= 2 && 150 (i % 4 === 0 || i % 5 === 0) 151 152 if (!canSample && !hasFollows) { 153 // no data available 154 return [] 155 } 156 if (shouldSample || !hasFollows) { 157 // time to sample, or the user isnt following anybody 158 return candidateFeeds[this.sampleCursor++ % candidateFeeds.length].take(1) 159 } 160 if (!hasFollowsReady) { 161 // stop here so more follows can be fetched 162 return [] 163 } 164 // provide follow 165 return this.following.take(1) 166 } 167} 168 169class MergeFeedSource { 170 agent: BskyAgent 171 feedTuners: FeedTunerFn[] 172 sourceInfo: ReasonFeedSource | undefined 173 cursor: string | undefined = undefined 174 queue: AppBskyFeedDefs.FeedViewPost[] = [] 175 hasMore = true 176 177 constructor({ 178 agent, 179 feedTuners, 180 }: { 181 agent: BskyAgent 182 feedTuners: FeedTunerFn[] 183 }) { 184 this.agent = agent 185 this.feedTuners = feedTuners 186 } 187 188 get numReady() { 189 return this.queue.length 190 } 191 192 get needsFetch() { 193 return this.hasMore && this.queue.length === 0 194 } 195 196 take(n: number): AppBskyFeedDefs.FeedViewPost[] { 197 return this.queue.splice(0, n) 198 } 199 200 async fetchNext(n: number) { 201 await Promise.race([this._fetchNextInner(n), timeout(REQUEST_WAIT_MS)]) 202 } 203 204 _fetchNextInner = bundleAsync(async (n: number) => { 205 const res = await this._getFeed(this.cursor, n) 206 if (res.success) { 207 this.cursor = res.data.cursor 208 if (res.data.feed.length) { 209 this.queue = this.queue.concat(res.data.feed) 210 } else { 211 this.hasMore = false 212 } 213 } else { 214 this.hasMore = false 215 } 216 }) 217 218 protected _getFeed( 219 _cursor: string | undefined, 220 _limit: number, 221 ): Promise<AppBskyFeedGetTimeline.Response> { 222 throw new Error('Must be overridden') 223 } 224} 225 226class MergeFeedSource_Following extends MergeFeedSource { 227 tuner = new FeedTuner(this.feedTuners) 228 229 async fetchNext(n: number) { 230 return this._fetchNextInner(n) 231 } 232 233 protected async _getFeed( 234 cursor: string | undefined, 235 limit: number, 236 ): Promise<AppBskyFeedGetTimeline.Response> { 237 const res = await this.agent.getTimeline({cursor, limit}) 238 // run the tuner pre-emptively to ensure better mixing 239 const slices = this.tuner.tune(res.data.feed, { 240 dryRun: false, 241 }) 242 res.data.feed = slices.map(slice => slice._feedPost) 243 return res 244 } 245} 246 247class MergeFeedSource_Custom extends MergeFeedSource { 248 agent: BskyAgent 249 minDate: Date 250 feedUri: string 251 userInterests?: string 252 253 constructor({ 254 agent, 255 feedUri, 256 feedTuners, 257 userInterests, 258 }: { 259 agent: BskyAgent 260 feedUri: string 261 feedTuners: FeedTunerFn[] 262 userInterests?: string 263 }) { 264 super({ 265 agent, 266 feedTuners, 267 }) 268 this.agent = agent 269 this.feedUri = feedUri 270 this.userInterests = userInterests 271 this.sourceInfo = { 272 $type: 'reasonFeedSource', 273 uri: feedUri, 274 href: feedUriToHref(feedUri), 275 } 276 this.minDate = new Date(Date.now() - POST_AGE_CUTOFF) 277 } 278 279 protected async _getFeed( 280 cursor: string | undefined, 281 limit: number, 282 ): Promise<AppBskyFeedGetTimeline.Response> { 283 try { 284 const contentLangs = getContentLanguages().join(',') 285 const isBlueskyOwned = isBlueskyOwnedFeed(this.feedUri) 286 const res = await this.agent.app.bsky.feed.getFeed( 287 { 288 cursor, 289 limit, 290 feed: this.feedUri, 291 }, 292 { 293 headers: { 294 ...(isBlueskyOwned 295 ? createBskyTopicsHeader(this.userInterests) 296 : {}), 297 'Accept-Language': contentLangs, 298 }, 299 }, 300 ) 301 // NOTE 302 // some custom feeds fail to enforce the pagination limit 303 // so we manually truncate here 304 // -prf 305 if (limit && res.data.feed.length > limit) { 306 res.data.feed = res.data.feed.slice(0, limit) 307 } 308 // filter out older posts 309 res.data.feed = res.data.feed.filter( 310 post => new Date(post.post.indexedAt) > this.minDate, 311 ) 312 // attach source info 313 for (const post of res.data.feed) { 314 post.__source = this.sourceInfo 315 } 316 return res 317 } catch { 318 // dont bubble custom-feed errors 319 return {success: false, headers: {}, data: {feed: []}} 320 } 321 } 322}