A decentralized music tracking and discovery platform built on AT Protocol 🎵

Use WAL-mode client and batch event inserts

+96 -45
+2
tap/.gitignore
··· 1 1 .env 2 2 data/ 3 3 *.db 4 + *.db-shm 5 + *.db-wal
+10 -4
tap/src/drizzle.ts
··· 1 1 import { drizzle } from "drizzle-orm/libsql"; 2 + import { createClient } from "@libsql/client"; 2 3 3 - const db = drizzle({ 4 - connection: { 5 - url: Deno.env.get("TAP_CACHE_DATABASE_URL") || "file:tap-cache.db", 6 - }, 4 + const client = createClient({ 5 + url: Deno.env.get("TAP_CACHE_DATABASE_URL") || "file:tap-cache.db", 7 6 }); 7 + 8 + await client.execute("PRAGMA journal_mode = WAL;"); 9 + await client.execute("PRAGMA busy_timeout = 5000;"); 10 + await client.execute("PRAGMA synchronous = NORMAL;"); 11 + await client.execute("PRAGMA cache_size = -10000;"); 12 + 13 + const db = drizzle(client); 8 14 9 15 export default { db };
+84 -41
tap/src/tap.ts
··· 4 4 import schema from "./schema/mod.ts"; 5 5 import _ from "@es-toolkit/es-toolkit/compat"; 6 6 import { broadcastEvent } from "./main.ts"; 7 + import type { InsertEvent } from "./schema/event.ts"; 7 8 8 9 export const TAP_WS_URL = Deno.env.get("TAP_URL") || "http://localhost:2480"; 10 + 11 + const BATCH_SIZE = 50; 12 + const BATCH_TIMEOUT_MS = 100; 9 13 10 14 export default function connectToTap() { 11 15 const tap = new Tap(TAP_WS_URL); 16 + const indexer = new SimpleIndexer(); 12 17 13 - const indexer = new SimpleIndexer(); 18 + // Batch buffers 19 + let eventBatch: InsertEvent[] = []; 20 + let batchTimer: number | null = null; 21 + 22 + async function flushBatch() { 23 + if (eventBatch.length === 0) return; 24 + 25 + const toInsert = [...eventBatch]; 26 + eventBatch = []; 27 + 28 + try { 29 + const results = await ctx.db 30 + .insert(schema.events) 31 + .values(toInsert) 32 + .onConflictDoNothing() 33 + .returning() 34 + .execute(); 35 + 36 + for (const result of results) { 37 + broadcastEvent(result); 38 + } 39 + 40 + logger.info`📝 Batch inserted ${results.length} events`; 41 + } catch (error) { 42 + logger.error`Failed to insert batch: ${error}`; 43 + } 44 + } 14 45 15 - indexer.identity(async (evt) => { 16 - const result = await ctx.db 17 - .insert(schema.events) 18 - .values({ 19 - id: evt.id, 20 - type: evt.type, 21 - did: evt.did, 22 - handle: evt.handle, 23 - status: evt.status, 24 - isActive: evt.isActive, 25 - }) 26 - .onConflictDoNothing() 27 - .returning() 28 - .execute(); 46 + function addToBatch(event: InsertEvent) { 47 + eventBatch.push(event); 48 + 49 + if (batchTimer !== null) { 50 + clearTimeout(batchTimer); 51 + } 29 52 30 - if (result.length > 0) { 31 - broadcastEvent(result[0]); 53 + if (eventBatch.length >= BATCH_SIZE) { 54 + flushBatch(); 55 + } else { 56 + batchTimer = setTimeout(() => { 57 + flushBatch(); 58 + batchTimer = null; 59 + }, BATCH_TIMEOUT_MS); 32 60 } 61 + } 62 + 63 + indexer.identity(async (evt) => { 64 + addToBatch({ 65 + id: evt.id, 66 + type: evt.type, 67 + did: evt.did, 68 + handle: evt.handle, 69 + status: evt.status, 70 + isActive: evt.isActive, 71 + action: null, 72 + rev: null, 73 + collection: null, 74 + rkey: null, 75 + record: null, 76 + cid: null, 77 + live: null, 78 + }); 33 79 34 80 logger.info`${evt.did} updated identity: ${evt.handle} (${evt.status})`; 35 81 }); 36 82 37 83 indexer.record(async (evt) => { 38 - logger.info`${evt}`; 39 - const result = await ctx.db 40 - .insert(schema.events) 41 - .values({ 42 - id: evt.id, 43 - type: evt.type, 44 - action: evt.action, 45 - did: evt.did, 46 - rev: evt.rev, 47 - collection: evt.collection, 48 - rkey: evt.rkey, 49 - record: JSON.stringify(evt.record), 50 - cid: evt.cid, 51 - live: evt.live, 52 - }) 53 - .onConflictDoNothing() 54 - .returning() 55 - .execute(); 84 + addToBatch({ 85 + id: evt.id, 86 + type: evt.type, 87 + action: evt.action, 88 + did: evt.did, 89 + rev: evt.rev, 90 + collection: evt.collection, 91 + rkey: evt.rkey, 92 + record: JSON.stringify(evt.record), 93 + cid: evt.cid, 94 + live: evt.live, 95 + handle: null, 96 + status: null, 97 + isActive: null, 98 + }); 56 99 57 - if (result.length > 0) { 58 - broadcastEvent(result[0]); 59 - } 60 - 61 - const uri = `at://${_.get(result, "[0].did")}/${_.get(result, "[0].collection")}/${_.get(result, "[0].rkey")}`; 62 - logger.info`New record inserted: ${result.length} ${uri}`; 100 + const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`; 101 + logger.info`New record: ${uri}`; 63 102 }); 64 103 65 104 indexer.error((err) => logger.error`${err}`); 66 105 67 106 const channel = tap.channel(indexer); 68 107 channel.start(); 108 + 109 + globalThis.addEventListener("beforeunload", () => { 110 + flushBatch(); 111 + }); 69 112 }