atproto relay implementation in zig zlay.waow.tech

feat: multi-source backfill progress tracking

Change backfill_progress PK from (collection) to (collection, source)
so the same collection can be imported from multiple relays independently.
Enables backfilling from relay.waow.tech alongside bsky.network.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+26 -10
+8 -8
src/backfill.zig
··· 81 81 // insert progress rows (skip existing) 82 82 for (collections) |collection| { 83 83 _ = self.db.exec( 84 - "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection) DO NOTHING", 84 + "INSERT INTO backfill_progress (collection, source) VALUES ($1, $2) ON CONFLICT (collection, source) DO NOTHING", 85 85 .{ collection, self.source }, 86 86 ) catch |err| { 87 87 log.warn("failed to insert progress for {s}: {s}", .{ collection, @errorName(err) }); ··· 200 200 var imported: i64 = 0; 201 201 { 202 202 var row = (self.db.rowUnsafe( 203 - "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1", 204 - .{collection}, 203 + "SELECT completed_at IS NOT NULL, cursor, imported_count FROM backfill_progress WHERE collection = $1 AND source = $2", 204 + .{ collection, self.source }, 205 205 ) catch return error.DatabaseError) orelse return; 206 206 defer row.deinit() catch {}; 207 207 ··· 243 243 // update cursor in progress table 244 244 const new_cursor = fetch_result.next_cursor orelse ""; 245 245 _ = self.db.exec( 246 - "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3", 247 - .{ new_cursor, imported, collection }, 246 + "UPDATE backfill_progress SET cursor = $1, imported_count = $2 WHERE collection = $3 AND source = $4", 247 + .{ new_cursor, imported, collection, self.source }, 248 248 ) catch {}; 249 249 250 250 if (fetch_result.next_cursor) |nc| { ··· 257 257 } else { 258 258 // no more pages — mark complete 259 259 _ = self.db.exec( 260 - "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2", 261 - .{ imported, collection }, 260 + "UPDATE backfill_progress SET completed_at = now(), cursor = '', imported_count = $1 WHERE collection = $2 AND source = $3", 261 + .{ imported, collection, self.source }, 262 262 ) catch {}; 263 263 log.info("{s}: complete ({d} DIDs, {d} pages)", .{ collection, imported, page_count }); 264 264 break; ··· 360 360 361 361 // per-collection detail 362 362 var result = self.db.query( 363 - "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection", 363 + "SELECT collection, source, cursor, imported_count, completed_at IS NOT NULL FROM backfill_progress ORDER BY collection, source", 364 364 .{}, 365 365 ) catch return error.DatabaseError; 366 366 defer result.deinit();
+18 -2
src/event_log.zig
··· 177 177 178 178 _ = try pool.exec( 179 179 \\CREATE TABLE IF NOT EXISTS backfill_progress ( 180 - \\ collection TEXT PRIMARY KEY, 180 + \\ collection TEXT NOT NULL, 181 181 \\ source TEXT NOT NULL, 182 182 \\ cursor TEXT NOT NULL DEFAULT '', 183 183 \\ imported_count BIGINT NOT NULL DEFAULT 0, 184 184 \\ completed_at TIMESTAMPTZ, 185 - \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now() 185 + \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now(), 186 + \\ PRIMARY KEY (collection, source) 186 187 \\) 187 188 , .{}); 189 + 190 + // migrate: old schema had collection as sole PK — add source to composite PK 191 + _ = pool.exec( 192 + \\DO $$ BEGIN 193 + \\ IF EXISTS ( 194 + \\ SELECT 1 FROM pg_constraint 195 + \\ WHERE conname = 'backfill_progress_pkey' 196 + \\ AND conrelid = 'backfill_progress'::regclass 197 + \\ AND array_length(conkey, 1) = 1 198 + \\ ) THEN 199 + \\ ALTER TABLE backfill_progress DROP CONSTRAINT backfill_progress_pkey; 200 + \\ ALTER TABLE backfill_progress ADD PRIMARY KEY (collection, source); 201 + \\ END IF; 202 + \\END $$ 203 + , .{}) catch {}; 188 204 189 205 var self = DiskPersist{ 190 206 .allocator = allocator,