Bluesky app fork with some witchin' additions 💫
at main 331 lines 8.3 kB view raw
import {
  type AppBskyFeedDefs,
  type AppBskyFeedGetTimeline,
  type BskyAgent,
} from '@atproto/api'
import shuffle from 'lodash.shuffle'

import {bundleAsync} from '#/lib/async/bundle'
import {timeout} from '#/lib/async/timeout'
import {feedUriToHref} from '#/lib/strings/url-helpers'
import {getContentLanguages} from '#/state/preferences/languages'
import {type FeedParams} from '#/state/queries/post-feed'
import {FeedTuner} from '../feed-manip'
import {type FeedTunerFn} from '../feed-manip'
import {
  type FeedAPI,
  type FeedAPIResponse,
  type ReasonFeedSource,
} from './types'
import {createBskyTopicsHeader, isBlueskyOwnedFeed} from './utils'

const REQUEST_WAIT_MS = 500 // 500ms
const POST_AGE_CUTOFF = 60e3 * 60 * 24 // 24hours

/** Page size requested from the Following timeline on each top-up. */
const FOLLOWING_PAGE_SIZE = 60
/** Number of custom feeds considered for a top-up on each `fetch()` call. */
const CUSTOM_FEEDS_PER_FETCH = 3
/** A custom feed is topped up when it has fewer than this many queued posts. */
const CUSTOM_FEED_MIN_READY = 5
/** Page size requested from a custom feed on each top-up. */
const CUSTOM_FEED_PAGE_SIZE = 10

/**
 * Feed API that weaves posts sampled from a set of custom feeds into the
 * user's Following timeline.
 *
 * The instance is stateful: it keeps one queue for the Following timeline and
 * one per custom feed, plus cursors recording how far sampling has progressed.
 * Calling `fetch()` without a cursor resets all of that state.
 */
export class MergeFeedAPI implements FeedAPI {
  userInterests?: string
  agent: BskyAgent
  params: FeedParams
  feedTuners: FeedTunerFn[]
  following: MergeFeedSource_Following
  customFeeds: MergeFeedSource_Custom[] = []
  /** Start index of the next window of `customFeeds` to top up. */
  feedCursor = 0
  /** Count of `sampleItem()` calls since the last reset; also the returned cursor. */
  itemCursor = 0
  /** Round-robin position used when picking a custom feed to sample from. */
  sampleCursor = 0

  constructor({
    agent,
    feedParams,
    feedTuners,
    userInterests,
  }: {
    agent: BskyAgent
    feedParams: FeedParams
    feedTuners: FeedTunerFn[]
    userInterests?: string
  }) {
    this.agent = agent
    this.params = feedParams
    this.feedTuners = feedTuners
    this.userInterests = userInterests
    this.following = new MergeFeedSource_Following({
      agent: this.agent,
      feedTuners: this.feedTuners,
    })
  }

  /**
   * Discards all queued posts and cursors and rebuilds the sources. The custom
   * feeds listed in `params.mergeFeedSources` (if any) are re-created in a
   * shuffled order.
   */
  reset() {
    this.following = new MergeFeedSource_Following({
      agent: this.agent,
      feedTuners: this.feedTuners,
    })
    this.feedCursor = 0
    this.itemCursor = 0
    this.sampleCursor = 0

    const sources = this.params.mergeFeedSources
    this.customFeeds = sources
      ? shuffle(
          sources.map(
            feedUri =>
              new MergeFeedSource_Custom({
                agent: this.agent,
                feedUri,
                feedTuners: this.feedTuners,
                userInterests: this.userInterests,
              }),
          ),
        )
      : []
  }

  /**
   * Requests a single post from the timeline and returns the first entry of
   * the response (which is `undefined` at runtime if the response is empty).
   */
  async peekLatest(): Promise<AppBskyFeedDefs.FeedViewPost> {
    const res = await this.agent.getTimeline({
      limit: 1,
    })
    return res.data.feed[0]
  }

  /**
   * Produces the next page of the merged feed.
   *
   * @param cursor - Falsy to start over (triggers `reset()`); otherwise the
   *   existing in-memory state is continued. The value itself is not parsed.
   * @param limit - Maximum number of posts to return.
   * @returns Up to `limit` posts and the current item counter as the cursor.
   *   Fewer posts are returned when sampling runs dry before `limit` is hit.
   */
  async fetch({
    cursor,
    limit,
  }: {
    cursor: string | undefined
    limit: number
  }): Promise<FeedAPIResponse> {
    if (!cursor) {
      this.reset()
    }

    // always keep following topped up
    if (this.following.numReady < limit) {
      await this.following.fetchNext(FOLLOWING_PAGE_SIZE)
    }

    // pick the next feeds to sample from
    const feeds = this.customFeeds.slice(
      this.feedCursor,
      this.feedCursor + CUSTOM_FEEDS_PER_FETCH,
    )
    this.feedCursor += CUSTOM_FEEDS_PER_FETCH
    // Wrap as soon as the window start reaches the end of the list. Using `>`
    // here would let the cursor sit exactly at `length`, which makes the next
    // call slice an empty window and skip topping up custom feeds entirely.
    if (this.feedCursor >= this.customFeeds.length) {
      this.feedCursor = 0
    }

    // top up the feeds
    const outOfFollows =
      !this.following.hasMore && this.following.numReady < limit
    const promises: Promise<void>[] = []
    if (this.params.mergeFeedEnabled || outOfFollows) {
      for (const feed of feeds) {
        if (feed.numReady < CUSTOM_FEED_MIN_READY) {
          promises.push(feed.fetchNext(CUSTOM_FEED_PAGE_SIZE))
        }
      }
    }

    // wait for requests (custom-feed requests are capped at a fixed timeout)
    await Promise.all(promises)

    // assemble a response by sampling from feeds with content
    const posts: AppBskyFeedDefs.FeedViewPost[] = []
    while (posts.length < limit) {
      const [post] = this.sampleItem()
      if (!post) {
        break
      }
      posts.push(post)
    }

    return {
      cursor: String(this.itemCursor),
      feed: posts,
    }
  }

  /**
   * Takes at most one post from either the Following queue or one of the
   * custom feeds, advancing `itemCursor` on every call (even when nothing is
   * returned).
   *
   * @returns A single-element array, or an empty array when nothing can be
   *   provided right now.
   */
  sampleItem(): AppBskyFeedDefs.FeedViewPost[] {
    const i = this.itemCursor++
    const candidateFeeds = this.customFeeds.filter(f => f.numReady > 0)
    const canSample = candidateFeeds.length > 0
    const hasFollowsReady = this.following.numReady > 0
    // Follows count as available while the server may have more OR posts are
    // still queued locally. Looking at `hasMore` alone would strand whatever is
    // left in the queue once the timeline reports its end.
    const hasFollows = this.following.hasMore || hasFollowsReady

    // this condition establishes the frequency that custom feeds are woven into follows
    const shouldSample =
      this.params.mergeFeedEnabled &&
      i >= 15 &&
      candidateFeeds.length >= 2 &&
      (i % 4 === 0 || i % 5 === 0)

    if (!canSample && !hasFollows) {
      // no data available
      return []
    }
    if (shouldSample || !hasFollows) {
      // time to sample, or there are no follows left to show
      // (candidateFeeds is non-empty on both paths into this branch)
      return candidateFeeds[this.sampleCursor++ % candidateFeeds.length].take(1)
    }
    if (!hasFollowsReady) {
      // stop here so more follows can be fetched
      return []
    }
    // provide follow
    return this.following.take(1)
  }
}

/**
 * Base class for one upstream feed: holds a pagination cursor and a queue of
 * fetched posts. Subclasses supply the actual request via `_getFeed`.
 */
class MergeFeedSource {
  agent: BskyAgent
  feedTuners: FeedTunerFn[]
  sourceInfo: ReasonFeedSource | undefined
  cursor: string | undefined = undefined
  queue: AppBskyFeedDefs.FeedViewPost[] = []
  /** Cleared once a request fails or returns an empty page. */
  hasMore = true

  constructor({
    agent,
    feedTuners,
  }: {
    agent: BskyAgent
    feedTuners: FeedTunerFn[]
  }) {
    this.agent = agent
    this.feedTuners = feedTuners
  }

  /** Number of posts currently queued. */
  get numReady() {
    return this.queue.length
  }

  /** True when the queue is empty but the source has not been exhausted. */
  get needsFetch() {
    return this.hasMore && this.queue.length === 0
  }

  /** Removes and returns up to `n` posts from the front of the queue. */
  take(n: number): AppBskyFeedDefs.FeedViewPost[] {
    return this.queue.splice(0, n)
  }

  /**
   * Requests the next page of up to `n` posts, racing the request against
   * `timeout(REQUEST_WAIT_MS)` so the caller is not held up by a slow feed.
   * NOTE(review): assumes `timeout` settles after the given delay and that the
   * request keeps running (and still fills the queue) if it loses — confirm.
   */
  async fetchNext(n: number): Promise<void> {
    await Promise.race([this._fetchNextInner(n), timeout(REQUEST_WAIT_MS)])
  }

  // Fetches one page and appends it to the queue; a failed or empty response
  // marks the source as exhausted.
  // NOTE(review): `bundleAsync` presumably coalesces overlapping calls into one
  // in-flight request — confirm against '#/lib/async/bundle'.
  _fetchNextInner = bundleAsync(async (n: number) => {
    const res = await this._getFeed(this.cursor, n)
    if (res.success) {
      this.cursor = res.data.cursor
      if (res.data.feed.length) {
        this.queue = this.queue.concat(res.data.feed)
      } else {
        this.hasMore = false
      }
    } else {
      this.hasMore = false
    }
  })

  /**
   * Performs the actual page request. The base implementation always throws;
   * subclasses must override it.
   */
  protected _getFeed(
    _cursor: string | undefined,
    _limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    throw new Error('Must be overridden')
  }
}

/** Source backed by the user's Following timeline (`agent.getTimeline`). */
class MergeFeedSource_Following extends MergeFeedSource {
  tuner = new FeedTuner(this.feedTuners)

  /** Unlike the base class, waits for the request without a timeout race. */
  async fetchNext(n: number) {
    return this._fetchNextInner(n)
  }

  protected async _getFeed(
    cursor: string | undefined,
    limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    const res = await this.agent.getTimeline({cursor, limit})
    // run the tuner pre-emptively to ensure better mixing
    const slices = this.tuner.tune(res.data.feed, {
      dryRun: false,
    })
    res.data.feed = slices.map(slice => slice._feedPost)
    return res
  }
}

/**
 * Source backed by a custom feed generator. Posts are truncated to the
 * requested limit, filtered to those indexed after `minDate`, and tagged with
 * the feed they came from.
 */
class MergeFeedSource_Custom extends MergeFeedSource {
  agent: BskyAgent
  /** Posts indexed at or before this instant are dropped. */
  minDate: Date
  feedUri: string
  userInterests?: string

  constructor({
    agent,
    feedUri,
    feedTuners,
    userInterests,
  }: {
    agent: BskyAgent
    feedUri: string
    feedTuners: FeedTunerFn[]
    userInterests?: string
  }) {
    super({
      agent,
      feedTuners,
    })
    this.agent = agent
    this.feedUri = feedUri
    this.userInterests = userInterests
    this.sourceInfo = {
      $type: 'reasonFeedSource',
      uri: feedUri,
      href: feedUriToHref(feedUri),
    }
    // cutoff is fixed at construction time (i.e. at the last reset)
    this.minDate = new Date(Date.now() - POST_AGE_CUTOFF)
  }

  protected async _getFeed(
    cursor: string | undefined,
    limit: number,
  ): Promise<AppBskyFeedGetTimeline.Response> {
    try {
      const contentLangs = getContentLanguages().join(',')
      const isBlueskyOwned = isBlueskyOwnedFeed(this.feedUri)
      const res = await this.agent.app.bsky.feed.getFeed(
        {
          cursor,
          limit,
          feed: this.feedUri,
        },
        {
          headers: {
            // topics header is only sent to Bluesky-owned feeds
            ...(isBlueskyOwned
              ? createBskyTopicsHeader(this.userInterests)
              : {}),
            'Accept-Language': contentLangs,
          },
        },
      )
      // NOTE
      // some custom feeds fail to enforce the pagination limit
      // so we manually truncate here
      // -prf
      if (limit && res.data.feed.length > limit) {
        res.data.feed = res.data.feed.slice(0, limit)
      }
      // filter out older posts
      res.data.feed = res.data.feed.filter(
        post => new Date(post.post.indexedAt) > this.minDate,
      )
      // attach source info
      for (const post of res.data.feed) {
        // @ts-ignore
        post.__source = this.sourceInfo
      }
      return res
    } catch {
      // dont bubble custom-feed errors; the unsuccessful response makes the
      // caller mark this source as exhausted
      return {success: false, headers: {}, data: {feed: []}}
    }
  }
}