A decentralized music tracking and discovery platform built on AT Protocol 🎵

Improve batch flushing and DB pragmas

Serialize batch flushes with a flushPromise and requeue failed
events to enable retries. Clear timers and flush immediately when
batch is full. Reduce batch size to 100 and increase timeout to
100ms.

Update SQLite PRAGMAs: busy_timeout=30000, wal_autocheckpoint=1000,
temp_store=MEMORY

+46 -25
+8 -1
tap/src/drizzle.ts
··· 6 6 }); 7 7 8 8 await client.execute("PRAGMA journal_mode = WAL;"); 9 - await client.execute("PRAGMA busy_timeout = 5000;"); 9 + 10 + await client.execute("PRAGMA busy_timeout = 30000;"); 11 + 10 12 await client.execute("PRAGMA synchronous = NORMAL;"); 13 + 11 14 await client.execute("PRAGMA cache_size = -10000;"); 15 + 16 + await client.execute("PRAGMA wal_autocheckpoint = 1000;"); 17 + 18 + await client.execute("PRAGMA temp_store = MEMORY;"); 12 19 13 20 const db = drizzle(client); 14 21
+38 -24
tap/src/tap.ts
··· 8 8 9 9 export const TAP_WS_URL = Deno.env.get("TAP_URL") || "http://localhost:2480"; 10 10 11 - const BATCH_SIZE = 200; 12 - const BATCH_TIMEOUT_MS = 50; 11 + const BATCH_SIZE = 100; 12 + const BATCH_TIMEOUT_MS = 100; 13 13 14 14 export default function connectToTap() { 15 15 const tap = new Tap(TAP_WS_URL); ··· 17 17 18 18 let eventBatch: InsertEvent[] = []; 19 19 let batchTimer: number | null = null; 20 - let isFlushingBatch = false; 20 + let flushPromise: Promise<void> | null = null; 21 21 22 22 async function flushBatch() { 23 - if (eventBatch.length === 0 || isFlushingBatch) return; 23 + if (flushPromise) { 24 + await flushPromise; 25 + return; 26 + } 24 27 25 - isFlushingBatch = true; 26 - const toInsert = [...eventBatch]; 27 - eventBatch = []; 28 + if (eventBatch.length === 0) return; 28 29 29 - try { 30 - logger.info`🔄 Flushing batch of ${toInsert.length} events...`; 30 + flushPromise = (async () => { 31 + const toInsert = [...eventBatch]; 32 + eventBatch = []; 31 33 32 - const results = await ctx.db 33 - .insert(schema.events) 34 - .values(toInsert) 35 - .onConflictDoNothing() 36 - .returning() 37 - .execute(); 34 + try { 35 + logger.info`🔄 Flushing batch of ${toInsert.length} events...`; 38 36 39 - for (const result of results) { 40 - broadcastEvent(result); 37 + const results = await ctx.db 38 + .insert(schema.events) 39 + .values(toInsert) 40 + .onConflictDoNothing() 41 + .returning() 42 + .execute(); 43 + 44 + for (const result of results) { 45 + broadcastEvent(result); 46 + } 47 + 48 + logger.info`📝 Batch inserted ${results.length} events`; 49 + } catch (error) { 50 + logger.error`Failed to insert batch: ${error}`; 51 + // Re-add failed events to the front of the batch for retry 52 + eventBatch = [...toInsert, ...eventBatch]; 53 + } finally { 54 + flushPromise = null; 41 55 } 56 + })(); 42 57 43 - logger.info`📝 Batch inserted ${results.length} events`; 44 - } catch (error) { 45 - logger.error`Failed to insert batch: ${error}`; 46 - } finally { 47 - isFlushingBatch = false; 48 - } 58 + await flushPromise; 49 59 } 50 60 51 61 function addToBatch(event: InsertEvent) { 52 62 eventBatch.push(event); 53 63 64 + // Clear existing timer 54 65 if (batchTimer !== null) { 55 66 clearTimeout(batchTimer); 67 + batchTimer = null; 56 68 } 57 69 70 + // Flush immediately if batch is full 58 71 if (eventBatch.length >= BATCH_SIZE) { 59 72 flushBatch().catch((err) => logger.error`Flush error: ${err}`); 60 73 } else { 74 + // Set timer to flush after timeout 61 75 batchTimer = setTimeout(() => { 76 + batchTimer = null; 62 77 flushBatch().catch((err) => logger.error`Flush error: ${err}`); 63 - batchTimer = null; 64 78 }, BATCH_TIMEOUT_MS); 65 79 } 66 80 }