Bluesky app fork with some witchin' additions 💫

Retry clops (#3800)

* Add convo retries, sketch out tests

* Only append nextMessage to messages

* Remove debug code

authored by

Eric Bailey and committed by
GitHub
fc0eab2d 333ccdad

+222 -83
+9
src/screens/Messages/Conversation/MessagesList.tsx
··· 7 7 import {isWeb} from 'platform/detection' 8 8 import {MessageInput} from '#/screens/Messages/Conversation/MessageInput' 9 9 import {MessageItem} from '#/screens/Messages/Conversation/MessageItem' 10 + import {Button, ButtonText} from '#/components/Button' 10 11 import {Loader} from '#/components/Loader' 11 12 import {Text} from '#/components/Typography' 12 13 ··· 31 32 return <Text>Deleted message</Text> 32 33 } else if (item.type === 'pending-message') { 33 34 return <Text>{item.message.text}</Text> 35 + } else if (item.type === 'pending-retry') { 36 + return ( 37 + <View> 38 + <Button label="Retry" onPress={item.retry}> 39 + <ButtonText>Retry</ButtonText> 40 + </Button> 41 + </View> 42 + ) 34 43 } 35 44 36 45 return null
-38
src/state/messages/__tests__/client.test.ts
··· 1 - import {describe, it} from '@jest/globals' 2 - 3 - describe(`#/state/dms/client`, () => { 4 - describe(`ChatsService`, () => { 5 - describe(`unread count`, () => { 6 - it.todo(`marks a chat as read, decrements total unread count`) 7 - }) 8 - 9 - describe(`log processing`, () => { 10 - /* 11 - * We receive a new chat log AND messages for it in the same batch. We 12 - * need to first initialize the chat, then process the received logs. 13 - */ 14 - describe(`handles new chats and subsequent messages received in same log batch`, () => { 15 - it.todo(`receives new chat and messages`) 16 - it.todo( 17 - `receives new chat, new messages come in while still initializing new chat`, 18 - ) 19 - }) 20 - }) 21 - 22 - describe(`reset state`, () => { 23 - it.todo(`after period of inactivity, rehydrates entirely fresh state`) 24 - }) 25 - }) 26 - 27 - describe(`ChatService`, () => { 28 - describe(`history fetching`, () => { 29 - it.todo(`fetches initial chat history`) 30 - it.todo(`fetches additional chat history`) 31 - it.todo(`handles history fetch failure`) 32 - }) 33 - 34 - describe(`optimistic updates`, () => { 35 - it.todo(`adds sending messages`) 36 - }) 37 - }) 38 - })
+57
src/state/messages/__tests__/convo.test.ts
··· 1 + import {describe, it} from '@jest/globals' 2 + 3 + describe(`#/state/messages/convo`, () => { 4 + describe(`status states`, () => { 5 + it.todo(`cannot re-initialize from a non-unintialized state`) 6 + it.todo(`can re-initialize from a failed state`) 7 + 8 + describe(`destroy`, () => { 9 + it.todo(`cannot be interacted with when destroyed`) 10 + it.todo(`polling is stopped when destroyed`) 11 + it.todo(`events are cleaned up when destroyed`) 12 + }) 13 + }) 14 + 15 + describe(`history fetching`, () => { 16 + it.todo(`fetches initial chat history`) 17 + it.todo(`fetches additional chat history`) 18 + it.todo(`handles history fetch failure`) 19 + it.todo(`does not insert deleted messages`) 20 + }) 21 + 22 + describe(`sending messages`, () => { 23 + it.todo(`optimistically adds sending messages`) 24 + it.todo(`sends messages in order`) 25 + it.todo(`failed message send fails all sending messages`) 26 + it.todo(`can retry all failed messages via retry ConvoItem`) 27 + it.todo( 28 + `successfully sent messages are re-ordered, if needed, by events received from server`, 29 + ) 30 + }) 31 + 32 + describe(`deleting messages`, () => { 33 + it.todo(`messages are optimistically deleted from the chat`) 34 + it.todo(`messages are confirmed deleted via events from the server`) 35 + }) 36 + 37 + describe(`log handling`, () => { 38 + it.todo(`updates rev to latest message received`) 39 + it.todo(`only handles log events for this convoId`) 40 + it.todo(`does not insert deleted messages`) 41 + }) 42 + 43 + describe(`item ordering`, () => { 44 + it.todo(`pending items are first, and in order`) 45 + it.todo(`new message items are next, and in order`) 46 + it.todo(`past message items are next, and in order`) 47 + }) 48 + 49 + describe(`inactivity`, () => { 50 + it.todo( 51 + `below a certain threshold of inactivity, restore entirely from log`, 52 + ) 53 + it.todo( 54 + `above a certain threshold of inactivity, rehydrate entirely fresh state`, 55 + ) 56 + }) 57 + })
+156 -45
src/state/messages/convo.ts
··· 6 6 import {EventEmitter} from 'eventemitter3' 7 7 import {nanoid} from 'nanoid/non-secure' 8 8 9 + import {isNative} from '#/platform/detection' 10 + 9 11 export type ConvoParams = { 10 12 convoId: string 11 13 agent: BskyAgent ··· 44 46 key: string 45 47 message: ChatBskyConvoSendMessage.InputSchema['message'] 46 48 } 49 + | { 50 + type: 'pending-retry' 51 + key: string 52 + retry: () => void 53 + } 47 54 48 55 export type ConvoState = 49 56 | { ··· 66 73 status: ConvoStatus.Destroyed 67 74 } 68 75 76 + export function isConvoItemMessage( 77 + item: ConvoItem, 78 + ): item is ConvoItem & {type: 'message'} { 79 + if (!item) return false 80 + return ( 81 + item.type === 'message' || 82 + item.type === 'deleted-message' || 83 + item.type === 'pending-message' 84 + ) 85 + } 86 + 69 87 export class Convo { 70 88 private convoId: string 71 89 private agent: BskyAgent ··· 90 108 string, 91 109 {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} 92 110 > = new Map() 111 + private footerItems: Map<string, ConvoItem> = new Map() 93 112 94 113 private pendingEventIngestion: Promise<void> | undefined 114 + private isProcessingPendingMessages = false 95 115 96 116 constructor(params: ConvoParams) { 97 117 this.convoId = params.convoId ··· 165 185 { 166 186 cursor: this.historyCursor, 167 187 convoId: this.convoId, 168 - limit: 20, 188 + limit: isNative ? 25 : 50, 169 189 }, 170 190 { 171 191 headers: { ··· 230 250 /* 231 251 * This is VERY important. We don't want to insert any messages from 232 252 * your other chats. 233 - * 234 - * TODO there may be a better way to handle this 235 253 */ 236 254 if (log.convoId !== this.convoId) continue 237 255 ··· 241 259 ) { 242 260 if (this.newMessages.has(log.message.id)) { 243 261 // Trust the log as the source of truth on ordering 244 - // TODO test this 245 262 this.newMessages.delete(log.message.id) 246 263 } 247 264 this.newMessages.set(log.message.id, log.message) ··· 269 286 this.commit() 270 287 } 271 288 289 + async processPendingMessages() { 290 + const pendingMessage = Array.from(this.pendingMessages.values()).shift() 291 + 292 + /* 293 + * If there are no pending messages, we're done. 294 + */ 295 + if (!pendingMessage) { 296 + this.isProcessingPendingMessages = false 297 + return 298 + } 299 + 300 + try { 301 + this.isProcessingPendingMessages = true 302 + 303 + // throw new Error('UNCOMMENT TO TEST RETRY') 304 + const {id, message} = pendingMessage 305 + 306 + const response = await this.agent.api.chat.bsky.convo.sendMessage( 307 + { 308 + convoId: this.convoId, 309 + message, 310 + }, 311 + { 312 + encoding: 'application/json', 313 + headers: { 314 + Authorization: this.__tempFromUserDid, 315 + }, 316 + }, 317 + ) 318 + const res = response.data 319 + 320 + /* 321 + * Insert into `newMessages` as soon as we have a real ID. That way, when 322 + * we get an event log back, we can replace in situ. 323 + */ 324 + this.newMessages.set(res.id, { 325 + ...res, 326 + $type: 'chat.bsky.convo.defs#messageView', 327 + sender: this.convo?.members.find(m => m.did === this.__tempFromUserDid), 328 + }) 329 + this.pendingMessages.delete(id) 330 + 331 + await this.processPendingMessages() 332 + 333 + this.commit() 334 + } catch (e) { 335 + this.footerItems.set('pending-retry', { 336 + type: 'pending-retry', 337 + key: 'pending-retry', 338 + retry: this.batchRetryPendingMessages.bind(this), 339 + }) 340 + this.commit() 341 + } 342 + } 343 + 344 + async batchRetryPendingMessages() { 345 + this.footerItems.delete('pending-retry') 346 + this.commit() 347 + 348 + try { 349 + const messageArray = Array.from(this.pendingMessages.values()) 350 + const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch( 351 + { 352 + items: messageArray.map(({message}) => ({ 353 + convoId: this.convoId, 354 + message, 355 + })), 356 + }, 357 + { 358 + encoding: 'application/json', 359 + headers: { 360 + Authorization: this.__tempFromUserDid, 361 + }, 362 + }, 363 + ) 364 + const {items} = data 365 + 366 + /* 367 + * Insert into `newMessages` as soon as we have a real ID. That way, when 368 + * we get an event log back, we can replace in situ. 369 + */ 370 + for (const item of items) { 371 + this.newMessages.set(item.id, { 372 + ...item, 373 + $type: 'chat.bsky.convo.defs#messageView', 374 + sender: this.convo?.members.find( 375 + m => m.did === this.__tempFromUserDid, 376 + ), 377 + }) 378 + } 379 + 380 + for (const pendingMessage of messageArray) { 381 + this.pendingMessages.delete(pendingMessage.id) 382 + } 383 + 384 + this.commit() 385 + } catch (e) { 386 + this.footerItems.set('pending-retry', { 387 + type: 'pending-retry', 388 + key: 'pending-retry', 389 + retry: this.batchRetryPendingMessages.bind(this), 390 + }) 391 + this.commit() 392 + } 393 + } 394 + 272 395 async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { 273 396 if (this.status === ConvoStatus.Destroyed) return 274 397 // Ignore empty messages for now since they have no other purpose atm 275 - if (!message.text) return 398 + if (!message.text.trim()) return 276 399 277 400 const tempId = nanoid() 278 401 ··· 282 405 }) 283 406 this.commit() 284 407 285 - await new Promise(y => setTimeout(y, 500)) 286 - const response = await this.agent.api.chat.bsky.convo.sendMessage( 287 - { 288 - convoId: this.convoId, 289 - message, 290 - }, 291 - { 292 - encoding: 'application/json', 293 - headers: { 294 - Authorization: this.__tempFromUserDid, 295 - }, 296 - }, 297 - ) 298 - const res = response.data 299 - 300 - /* 301 - * Insert into `newMessages` as soon as we have a real ID. That way, when 302 - * we get an event log back, we can replace in situ. 303 - */ 304 - this.newMessages.set(res.id, { 305 - ...res, 306 - $type: 'chat.bsky.convo.defs#messageView', 307 - sender: this.convo?.members.find(m => m.did === this.__tempFromUserDid), 308 - }) 309 - this.pendingMessages.delete(tempId) 310 - 311 - this.commit() 408 + if (!this.isProcessingPendingMessages) { 409 + this.processPendingMessages() 410 + } 312 411 } 313 412 314 413 /* ··· 345 444 }) 346 445 }) 347 446 447 + this.footerItems.forEach(item => { 448 + items.unshift(item) 449 + }) 450 + 348 451 this.pastMessages.forEach(m => { 349 452 if (ChatBskyConvoDefs.isMessageView(m)) { 350 453 items.push({ ··· 365 468 366 469 return items.map((item, i) => { 367 470 let nextMessage = null 471 + const isMessage = isConvoItemMessage(item) 368 472 369 - if ( 370 - ChatBskyConvoDefs.isMessageView(item.message) || 371 - ChatBskyConvoDefs.isDeletedMessageView(item.message) 372 - ) { 373 - const next = items[i - 1] 473 + if (isMessage) { 374 474 if ( 375 - next && 376 - (ChatBskyConvoDefs.isMessageView(next.message) || 377 - ChatBskyConvoDefs.isDeletedMessageView(next.message)) 475 + isMessage && 476 + (ChatBskyConvoDefs.isMessageView(item.message) || 477 + ChatBskyConvoDefs.isDeletedMessageView(item.message)) 378 478 ) { 379 - nextMessage = next.message 479 + const next = items[i - 1] 480 + 481 + if ( 482 + isConvoItemMessage(next) && 483 + next && 484 + (ChatBskyConvoDefs.isMessageView(next.message) || 485 + ChatBskyConvoDefs.isDeletedMessageView(next.message)) 486 + ) { 487 + nextMessage = next.message 488 + } 380 489 } 381 - } 382 490 383 - return { 384 - ...item, 385 - nextMessage, 491 + return { 492 + ...item, 493 + nextMessage, 494 + } 386 495 } 496 + 497 + return item 387 498 }) 388 499 } 389 500