atproto relay implementation in zig — zlay.waow.tech

feat: switch persistence from SQLite to Postgres

replace zqlite with pg.zig for metadata storage. schema now matches
the Go indigo relay (account, account_repo, log_file_refs, domain_ban).
removes the seqToSqlite/sqliteToSeq XOR hack — sequence numbers are stored
in Postgres BIGINT (signed 64-bit) via a plain @intCast, which is safe since
real seqs never approach 2^63. DATABASE_URL env var replaces RELAY_DB_PATH.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+153 -159
-1
Dockerfile
··· 3 COPY zig-out/bin/zlay /usr/local/bin/zlay 4 RUN mkdir -p /data/events 5 ENV RELAY_DATA_DIR=/data/events 6 - ENV RELAY_DB_PATH=/data/relay.sqlite 7 EXPOSE 3000 3001 8 ENTRYPOINT ["/usr/local/bin/zlay"]
··· 3 COPY zig-out/bin/zlay /usr/local/bin/zlay 4 RUN mkdir -p /data/events 5 ENV RELAY_DATA_DIR=/data/events 6 EXPOSE 3000 3001 7 ENTRYPOINT ["/usr/local/bin/zlay"]
+2 -2
build.zig
··· 12 .target = target, 13 .optimize = optimize, 14 }); 15 - const zqlite = b.dependency("zqlite", .{ 16 .target = target, 17 .optimize = optimize, 18 }); ··· 20 const imports: []const std.Build.Module.Import = &.{ 21 .{ .name = "zat", .module = zat.module("zat") }, 22 .{ .name = "websocket", .module = websocket.module("websocket") }, 23 - .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 24 }; 25 26 // relay executable
··· 12 .target = target, 13 .optimize = optimize, 14 }); 15 + const pg = b.dependency("pg", .{ 16 .target = target, 17 .optimize = optimize, 18 }); ··· 20 const imports: []const std.Build.Module.Import = &.{ 21 .{ .name = "zat", .module = zat.module("zat") }, 22 .{ .name = "websocket", .module = websocket.module("websocket") }, 23 + .{ .name = "pg", .module = pg.module("pg") }, 24 }; 25 26 // relay executable
+3 -3
build.zig.zon
··· 12 .url = "https://github.com/karlseguin/websocket.zig/archive/97fefafa59cc78ce177cff540b8685cd7f699276.tar.gz", 13 .hash = "websocket-0.1.0-ZPISdRlzAwBB_Bz2UMMqxYqF6YEVTIBoFsbzwPUJTHIc", 14 }, 15 - .zqlite = .{ 16 - .url = "https://github.com/karlseguin/zqlite.zig/archive/05a88d6758753e1c63fdd45b211dde2057094b0c.tar.gz", 17 - .hash = "zqlite-0.0.1-RWLaYz6bmAAT7E_jxopXf-j5Ea8VQldnxsd6TU8sa0Bb", 18 }, 19 }, 20 .paths = .{
··· 12 .url = "https://github.com/karlseguin/websocket.zig/archive/97fefafa59cc78ce177cff540b8685cd7f699276.tar.gz", 13 .hash = "websocket-0.1.0-ZPISdRlzAwBB_Bz2UMMqxYqF6YEVTIBoFsbzwPUJTHIc", 14 }, 15 + .pg = .{ 16 + .url = "git+https://github.com/karlseguin/pg.zig?ref=master#e58b318b7867ef065b3135983f829219c5eef891", 17 + .hash = "pg-0.0.0-Wp_7gXFoBgD0fQ72WICKa-bxLga03AXXQ3BbIsjjohQ3", 18 }, 19 }, 20 .paths = .{
+145 -150
src/event_log.zig
··· 1 //! disk persistence matching indigo's diskpersist format 2 //! 3 //! append-only log files with relay-assigned sequence numbers. 4 - //! SQLite metadata index for fast cursor→file lookup. 5 //! 6 //! on-disk entry format (28-byte LE header + CBOR payload): 7 //! [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload] ··· 11 //! see: indigo cmd/relay/stream/persist/diskpersist/diskpersist.go 12 13 const std = @import("std"); 14 - const zqlite = @import("zqlite"); 15 16 const Allocator = std.mem.Allocator; 17 const log = std.log.scoped(.relay); ··· 38 const default_flush_interval_ms: u64 = 100; 39 const default_flush_threshold: usize = 400; 40 41 - /// convert u64 seq to i64 for SQLite storage. 42 - /// XOR with sign bit to preserve ordering across the full u64 range. 43 - fn seqToSqlite(seq: u64) i64 { 44 - return @bitCast(seq ^ (1 << 63)); 45 - } 46 - 47 - /// convert i64 from SQLite back to u64 seq. 48 - fn sqliteToSeq(val: i64) u64 { 49 - return @as(u64, @bitCast(val)) ^ (1 << 63); 50 - } 51 - 52 // --- header --- 53 54 pub const EvtHeader = struct { ··· 90 allocator: Allocator, 91 dir_path: []const u8, 92 dir: std.fs.Dir, 93 - db: zqlite.Conn, 94 current_file: ?std.fs.File = null, 95 current_file_path: ?[]const u8 = null, 96 ··· 116 alive: std.atomic.Value(bool) = .{ .raw = true }, 117 flush_cond: std.Thread.Condition = .{}, 118 119 - pub fn init(allocator: Allocator, dir_path: []const u8, db_path: []const u8) !DiskPersist { 120 // ensure directory exists 121 std.fs.cwd().makePath(dir_path) catch |err| switch (err) { 122 error.PathAlreadyExists => {}, ··· 126 var dir = try std.fs.cwd().openDir(dir_path, .{ .iterate = true }); 127 errdefer dir.close(); 128 129 - // ensure db parent directory exists 130 - if (std.fs.path.dirname(db_path)) |parent| { 131 - std.fs.cwd().makePath(parent) catch |err| switch (err) { 132 - error.PathAlreadyExists => {}, 133 - else => return err, 134 - }; 135 - } 136 - 137 - // open SQLite 138 - const 
db_path_z = try allocator.dupeZ(u8, db_path); 139 - defer allocator.free(db_path_z); 140 - var db = try zqlite.open(db_path_z, zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite); 141 - errdefer db.close(); 142 - 143 - // pragmas 144 - try db.execNoArgs("PRAGMA journal_mode=WAL"); 145 - try db.execNoArgs("PRAGMA busy_timeout=5000"); 146 - try db.execNoArgs("PRAGMA synchronous=NORMAL"); 147 148 - // create tables 149 - try db.execNoArgs( 150 \\CREATE TABLE IF NOT EXISTS log_file_refs ( 151 - \\ id INTEGER PRIMARY KEY AUTOINCREMENT, 152 \\ path TEXT NOT NULL, 153 - \\ archived INTEGER NOT NULL DEFAULT 0, 154 - \\ seq_start INTEGER NOT NULL, 155 - \\ created_at TEXT NOT NULL DEFAULT (datetime('now')) 156 \\) 157 - ); 158 - try db.execNoArgs( 159 \\CREATE TABLE IF NOT EXISTS account ( 160 - \\ uid INTEGER PRIMARY KEY AUTOINCREMENT, 161 \\ did TEXT NOT NULL UNIQUE, 162 \\ status TEXT NOT NULL DEFAULT 'active', 163 - \\ rev TEXT, 164 - \\ commit_data_cid BLOB, 165 - \\ created_at TEXT NOT NULL DEFAULT (datetime('now')) 166 \\) 167 - ); 168 169 var self = DiskPersist{ 170 .allocator = allocator, 171 .dir_path = try allocator.dupe(u8, dir_path), 172 .dir = dir, 173 - .db = db, 174 }; 175 176 // recover from existing log files ··· 205 if (self.current_file) |f| f.close(); 206 if (self.current_file_path) |p| self.allocator.free(p); 207 self.dir.close(); 208 - self.db.close(); 209 self.allocator.free(self.dir_path); 210 } 211 ··· 225 } 226 227 // check database 228 - if (self.db.row( 229 - "SELECT uid FROM account WHERE did = ?", 230 .{did}, 231 - )) |maybe_row| { 232 - if (maybe_row) |r| { 233 - defer r.deinit(); 234 - const uid: u64 = @intCast(r.int(0)); 235 - // populate cache 236 - const did_duped = try self.allocator.dupe(u8, did); 237 - self.did_cache_mutex.lock(); 238 - defer self.did_cache_mutex.unlock(); 239 - self.did_cache.put(self.allocator, did_duped, uid) catch { 240 - self.allocator.free(did_duped); 241 - }; 242 - return uid; 243 - } 244 - } else |_| {} 245 
246 // create new account row (ignore if already exists from concurrent insert) 247 - self.db.exec( 248 - "INSERT OR IGNORE INTO account (did) VALUES (?)", 249 .{did}, 250 ) catch |err| { 251 log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) }); ··· 253 }; 254 255 // read back the UID (whether we just created it or it already existed) 256 - const row = try self.db.row( 257 - "SELECT uid FROM account WHERE did = ?", 258 .{did}, 259 ) orelse return error.AccountCreationFailed; 260 - defer row.deinit(); 261 - const uid: u64 = @intCast(row.int(0)); 262 263 // populate cache 264 const did_duped = try self.allocator.dupe(u8, did); ··· 277 data_cid: []const u8, 278 }; 279 280 - /// get stored sync state for a user 281 pub fn getAccountState(self: *DiskPersist, uid: u64, allocator: Allocator) !?AccountState { 282 - const row = (try self.db.row( 283 - "SELECT rev, commit_data_cid FROM account WHERE uid = ? AND rev IS NOT NULL", 284 .{@as(i64, @intCast(uid))}, 285 )) orelse return null; 286 - defer row.deinit(); 287 - const rev = row.text(0); 288 - const data_cid = row.blob(1); 289 if (rev.len == 0 or data_cid.len == 0) return null; 290 return .{ 291 .rev = try allocator.dupe(u8, rev), ··· 293 }; 294 } 295 296 - /// update stored sync state after a verified commit 297 pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !void { 298 - try self.db.exec( 299 - "UPDATE account SET rev = ?, commit_data_cid = ? WHERE uid = ?", 300 - .{ rev, data_cid, @as(i64, @intCast(uid)) }, 301 ); 302 } 303 ··· 341 } 342 343 /// playback events with seq > since. calls cb for each event. 
344 - pub fn playback(self: *DiskPersist, since: u64, allocator: Allocator, result: *std.ArrayListUnmanaged(PlaybackEntry)) !void { 345 self.mutex.lock(); 346 defer self.mutex.unlock(); 347 348 // find the log file containing `since` 349 var start_files: std.ArrayListUnmanaged(LogFileRef) = .{}; ··· 351 352 if (since > 0) { 353 // find file whose seq_start is just before `since` 354 - if (self.db.row("SELECT id, path, seq_start FROM log_file_refs WHERE seq_start <= ? ORDER BY seq_start DESC LIMIT 1", .{seqToSqlite(since)})) |row| { 355 - if (row) |r| { 356 - defer r.deinit(); 357 - try start_files.append(allocator, .{ 358 - .path = try allocator.dupe(u8, r.text(1)), 359 - .seq_start = sqliteToSeq(r.int(2)), 360 - }); 361 - } 362 - } else |_| {} 363 } 364 365 // find all subsequent files 366 { 367 - var rows = try self.db.rows("SELECT id, path, seq_start FROM log_file_refs WHERE seq_start > ? ORDER BY seq_start ASC", .{seqToSqlite(since)}); 368 - defer rows.deinit(); 369 - while (rows.next()) |r| { 370 try start_files.append(allocator, .{ 371 - .path = try allocator.dupe(u8, r.text(1)), 372 - .seq_start = sqliteToSeq(r.int(2)), 373 }); 374 } 375 } ··· 380 for (start_files.items) |ref| { 381 var file = self.dir.openFile(ref.path, .{}) catch continue; 382 defer file.close(); 383 - try readEventsFrom(allocator, file, since, result); 384 } 385 } 386 ··· 395 self.mutex.lock(); 396 defer self.mutex.unlock(); 397 398 - const cutoff_hours = self.retention_hours; 399 - const cutoff_sql = try std.fmt.allocPrint(self.allocator, "-{d} hours", .{cutoff_hours}); 400 - defer self.allocator.free(cutoff_sql); 401 402 // find expired refs 403 var expired: std.ArrayListUnmanaged(GcRef) = .{}; ··· 407 } 408 409 { 410 - var rows = try self.db.rows( 411 - "SELECT id, path FROM log_file_refs WHERE created_at < datetime('now', ?)", 412 - .{cutoff_sql}, 413 ); 414 - defer rows.deinit(); 415 - while (rows.next()) |r| { 416 try expired.append(self.allocator, .{ 417 - .id = r.int(0), 418 - 
.path = try self.allocator.dupe(u8, r.text(1)), 419 }); 420 } 421 } ··· 427 } 428 429 // delete db record first (prevents playback from finding it) 430 - self.db.exec("DELETE FROM log_file_refs WHERE id = ?", .{ref.id}) catch |err| { 431 log.warn("gc: failed to delete db record {d}: {s}", .{ ref.id, @errorName(err) }); 432 continue; 433 }; ··· 456 } 457 458 { 459 - var rows = try self.db.rows("SELECT path FROM log_file_refs ORDER BY seq_start DESC", .{}); 460 - defer rows.deinit(); 461 - while (rows.next()) |r| { 462 - try refs.append(self.allocator, try self.allocator.dupe(u8, r.text(0))); 463 } 464 } 465 ··· 476 477 fn resumeLog(self: *DiskPersist) !void { 478 // find most recent log file 479 - const r = try self.db.row("SELECT id, path, seq_start FROM log_file_refs ORDER BY seq_start DESC LIMIT 1", .{}); 480 - if (r) |row| { 481 - defer row.deinit(); 482 - const path = row.text(1); 483 - const seq_start: u64 = sqliteToSeq(row.int(2)); 484 485 var file = self.dir.openFile(path, .{ .mode = .read_write }) catch { 486 // file missing, start fresh ··· 527 self.current_file = try self.dir.createFile(name, .{ .truncate = false }); 528 self.current_file_path = try self.allocator.dupe(u8, name); 529 530 - // register in SQLite 531 - try self.db.exec( 532 - "INSERT INTO log_file_refs (path, seq_start) VALUES (?, ?)", 533 - .{ name, seqToSqlite(start_seq) }, 534 ); 535 536 self.event_counter = 0; ··· 753 try std.testing.expectEqual(@as(u8, 0x01), buf[9]); 754 } 755 756 test "persist and playback" { 757 var tmp = std.testing.tmpDir(.{}); 758 defer tmp.cleanup(); 759 760 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 761 defer std.testing.allocator.free(dir_path); 762 763 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 764 - defer std.testing.allocator.free(db_path); 765 - 766 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 767 defer dp.deinit(); 768 769 // persist some events 
(sync flush, no background thread) ··· 798 } 799 800 test "playback with cursor" { 801 var tmp = std.testing.tmpDir(.{}); 802 defer tmp.cleanup(); 803 804 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 805 defer std.testing.allocator.free(dir_path); 806 807 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 808 - defer std.testing.allocator.free(db_path); 809 - 810 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 811 defer dp.deinit(); 812 813 _ = try dp.persist(.commit, 1, "a"); ··· 833 } 834 835 test "seq recovery after reinit" { 836 var tmp = std.testing.tmpDir(.{}); 837 defer tmp.cleanup(); 838 839 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 840 defer std.testing.allocator.free(dir_path); 841 842 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 843 - defer std.testing.allocator.free(db_path); 844 - 845 // write some events 846 { 847 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 848 defer dp.deinit(); 849 _ = try dp.persist(.commit, 1, "x"); 850 _ = try dp.persist(.commit, 2, "y"); ··· 856 857 // reinit — should recover seq 858 { 859 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 860 defer dp.deinit(); 861 try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?); 862 const seq4 = try dp.persist(.commit, 1, "w"); ··· 865 } 866 867 test "takedown zeros payload" { 868 var tmp = std.testing.tmpDir(.{}); 869 defer tmp.cleanup(); 870 871 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 872 defer std.testing.allocator.free(dir_path); 873 874 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 875 - defer std.testing.allocator.free(db_path); 876 - 877 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 878 defer dp.deinit(); 879 880 _ = try dp.persist(.commit, 42, 
"secret-data"); ··· 901 } 902 903 test "uidForDid assigns and caches UIDs" { 904 var tmp = std.testing.tmpDir(.{}); 905 defer tmp.cleanup(); 906 907 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 908 defer std.testing.allocator.free(dir_path); 909 910 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 911 - defer std.testing.allocator.free(db_path); 912 - 913 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 914 defer dp.deinit(); 915 916 // first call creates the account ··· 928 } 929 930 test "uidForDid survives reinit" { 931 var tmp = std.testing.tmpDir(.{}); 932 defer tmp.cleanup(); 933 934 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 935 defer std.testing.allocator.free(dir_path); 936 937 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 938 - defer std.testing.allocator.free(db_path); 939 - 940 var uid1: u64 = undefined; 941 { 942 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 943 defer dp.deinit(); 944 uid1 = try dp.uidForDid("did:plc:carol"); 945 } 946 947 // reinit — UID should be the same from database 948 { 949 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 950 defer dp.deinit(); 951 const uid1_again = try dp.uidForDid("did:plc:carol"); 952 try std.testing.expectEqual(uid1, uid1_again); ··· 954 } 955 956 test "takedown with real UIDs" { 957 var tmp = std.testing.tmpDir(.{}); 958 defer tmp.cleanup(); 959 960 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 961 defer std.testing.allocator.free(dir_path); 962 963 - const db_path = try std.fmt.allocPrint(std.testing.allocator, "{s}/relay.sqlite", .{dir_path}); 964 - defer std.testing.allocator.free(db_path); 965 - 966 - var dp = try DiskPersist.init(std.testing.allocator, dir_path, db_path); 967 defer dp.deinit(); 968 969 const alice_uid = try dp.uidForDid("did:plc:alice");
··· 1 //! disk persistence matching indigo's diskpersist format 2 //! 3 //! append-only log files with relay-assigned sequence numbers. 4 + //! Postgres metadata index for fast cursor→file lookup. 5 //! 6 //! on-disk entry format (28-byte LE header + CBOR payload): 7 //! [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload] ··· 11 //! see: indigo cmd/relay/stream/persist/diskpersist/diskpersist.go 12 13 const std = @import("std"); 14 + const pg = @import("pg"); 15 16 const Allocator = std.mem.Allocator; 17 const log = std.log.scoped(.relay); ··· 38 const default_flush_interval_ms: u64 = 100; 39 const default_flush_threshold: usize = 400; 40 41 // --- header --- 42 43 pub const EvtHeader = struct { ··· 79 allocator: Allocator, 80 dir_path: []const u8, 81 dir: std.fs.Dir, 82 + db: *pg.Pool, 83 current_file: ?std.fs.File = null, 84 current_file_path: ?[]const u8 = null, 85 ··· 105 alive: std.atomic.Value(bool) = .{ .raw = true }, 106 flush_cond: std.Thread.Condition = .{}, 107 108 + pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8) !DiskPersist { 109 // ensure directory exists 110 std.fs.cwd().makePath(dir_path) catch |err| switch (err) { 111 error.PathAlreadyExists => {}, ··· 115 var dir = try std.fs.cwd().openDir(dir_path, .{ .iterate = true }); 116 errdefer dir.close(); 117 118 + // connect to Postgres 119 + const uri = std.Uri.parse(database_url) catch return error.InvalidDatabaseUrl; 120 + const pool = try pg.Pool.initUri(allocator, uri, .{ .size = 5 }); 121 + errdefer pool.deinit(); 122 123 + // create tables (matching indigo's Go relay schema) 124 + _ = try pool.exec( 125 \\CREATE TABLE IF NOT EXISTS log_file_refs ( 126 + \\ id BIGSERIAL PRIMARY KEY, 127 \\ path TEXT NOT NULL, 128 + \\ archived BOOLEAN NOT NULL DEFAULT false, 129 + \\ seq_start BIGINT NOT NULL, 130 + \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now() 131 \\) 132 + , .{}); 133 + 134 + _ = try pool.exec( 135 \\CREATE TABLE IF NOT EXISTS 
account ( 136 + \\ uid BIGSERIAL PRIMARY KEY, 137 \\ did TEXT NOT NULL UNIQUE, 138 \\ status TEXT NOT NULL DEFAULT 'active', 139 + \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now() 140 \\) 141 + , .{}); 142 + 143 + _ = try pool.exec( 144 + \\CREATE TABLE IF NOT EXISTS account_repo ( 145 + \\ uid BIGINT PRIMARY KEY REFERENCES account(uid), 146 + \\ rev TEXT NOT NULL, 147 + \\ commit_data_cid TEXT NOT NULL 148 + \\) 149 + , .{}); 150 + 151 + _ = try pool.exec( 152 + \\CREATE TABLE IF NOT EXISTS domain_ban ( 153 + \\ id BIGSERIAL PRIMARY KEY, 154 + \\ domain TEXT NOT NULL UNIQUE, 155 + \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now() 156 + \\) 157 + , .{}); 158 159 var self = DiskPersist{ 160 .allocator = allocator, 161 .dir_path = try allocator.dupe(u8, dir_path), 162 .dir = dir, 163 + .db = pool, 164 }; 165 166 // recover from existing log files ··· 195 if (self.current_file) |f| f.close(); 196 if (self.current_file_path) |p| self.allocator.free(p); 197 self.dir.close(); 198 + self.db.deinit(); 199 self.allocator.free(self.dir_path); 200 } 201 ··· 215 } 216 217 // check database 218 + if (try self.db.rowUnsafe( 219 + "SELECT uid FROM account WHERE did = $1", 220 .{did}, 221 + )) |row| { 222 + var r = row; 223 + defer r.deinit() catch {}; 224 + const uid: u64 = @intCast(r.get(i64, 0)); 225 + // populate cache 226 + const did_duped = try self.allocator.dupe(u8, did); 227 + self.did_cache_mutex.lock(); 228 + defer self.did_cache_mutex.unlock(); 229 + self.did_cache.put(self.allocator, did_duped, uid) catch { 230 + self.allocator.free(did_duped); 231 + }; 232 + return uid; 233 + } 234 235 // create new account row (ignore if already exists from concurrent insert) 236 + _ = self.db.exec( 237 + "INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING", 238 .{did}, 239 ) catch |err| { 240 log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) }); ··· 242 }; 243 244 // read back the UID (whether we just created it or it already existed) 245 + 
var row = try self.db.rowUnsafe( 246 + "SELECT uid FROM account WHERE did = $1", 247 .{did}, 248 ) orelse return error.AccountCreationFailed; 249 + defer row.deinit() catch {}; 250 + const uid: u64 = @intCast(row.get(i64, 0)); 251 252 // populate cache 253 const did_duped = try self.allocator.dupe(u8, did); ··· 266 data_cid: []const u8, 267 }; 268 269 + /// get stored sync state for a user (from account_repo table) 270 pub fn getAccountState(self: *DiskPersist, uid: u64, allocator: Allocator) !?AccountState { 271 + var row = (try self.db.rowUnsafe( 272 + "SELECT rev, commit_data_cid FROM account_repo WHERE uid = $1", 273 .{@as(i64, @intCast(uid))}, 274 )) orelse return null; 275 + defer row.deinit() catch {}; 276 + const rev = row.get([]const u8, 0); 277 + const data_cid = row.get([]const u8, 1); 278 if (rev.len == 0 or data_cid.len == 0) return null; 279 return .{ 280 .rev = try allocator.dupe(u8, rev), ··· 282 }; 283 } 284 285 + /// update stored sync state after a verified commit (upsert into account_repo) 286 pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !void { 287 + _ = try self.db.exec( 288 + "INSERT INTO account_repo (uid, rev, commit_data_cid) VALUES ($1, $2, $3) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_data_cid = EXCLUDED.commit_data_cid", 289 + .{ @as(i64, @intCast(uid)), rev, data_cid }, 290 ); 291 } 292 ··· 330 } 331 332 /// playback events with seq > since. calls cb for each event. 
333 + pub fn playback(self: *DiskPersist, since: u64, allocator: Allocator, entries: *std.ArrayListUnmanaged(PlaybackEntry)) !void { 334 self.mutex.lock(); 335 defer self.mutex.unlock(); 336 + 337 + const since_i: i64 = @intCast(since); 338 339 // find the log file containing `since` 340 var start_files: std.ArrayListUnmanaged(LogFileRef) = .{}; ··· 342 343 if (since > 0) { 344 // find file whose seq_start is just before `since` 345 + if (try self.db.rowUnsafe( 346 + "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start <= $1 ORDER BY seq_start DESC LIMIT 1", 347 + .{since_i}, 348 + )) |row| { 349 + var r = row; 350 + defer r.deinit() catch {}; 351 + try start_files.append(allocator, .{ 352 + .path = try allocator.dupe(u8, r.get([]const u8, 1)), 353 + .seq_start = @intCast(r.get(i64, 2)), 354 + }); 355 + } 356 } 357 358 // find all subsequent files 359 { 360 + var result = try self.db.query( 361 + "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start > $1 ORDER BY seq_start ASC", 362 + .{since_i}, 363 + ); 364 + defer result.deinit(); 365 + while (result.nextUnsafe() catch null) |r| { 366 try start_files.append(allocator, .{ 367 + .path = try allocator.dupe(u8, r.get([]const u8, 1)), 368 + .seq_start = @intCast(r.get(i64, 2)), 369 }); 370 } 371 } ··· 376 for (start_files.items) |ref| { 377 var file = self.dir.openFile(ref.path, .{}) catch continue; 378 defer file.close(); 379 + try readEventsFrom(allocator, file, since, entries); 380 } 381 } 382 ··· 391 self.mutex.lock(); 392 defer self.mutex.unlock(); 393 394 + const cutoff_interval = try std.fmt.allocPrint(self.allocator, "{d} hours", .{self.retention_hours}); 395 + defer self.allocator.free(cutoff_interval); 396 397 // find expired refs 398 var expired: std.ArrayListUnmanaged(GcRef) = .{}; ··· 402 } 403 404 { 405 + var result = try self.db.query( 406 + "SELECT id, path FROM log_file_refs WHERE created_at < now() - $1::interval", 407 + .{cutoff_interval}, 408 ); 409 + defer result.deinit(); 410 
+ while (result.nextUnsafe() catch null) |r| { 411 try expired.append(self.allocator, .{ 412 + .id = r.get(i64, 0), 413 + .path = try self.allocator.dupe(u8, r.get([]const u8, 1)), 414 }); 415 } 416 } ··· 422 } 423 424 // delete db record first (prevents playback from finding it) 425 + _ = self.db.exec("DELETE FROM log_file_refs WHERE id = $1", .{ref.id}) catch |err| { 426 log.warn("gc: failed to delete db record {d}: {s}", .{ ref.id, @errorName(err) }); 427 continue; 428 }; ··· 451 } 452 453 { 454 + var result = try self.db.query("SELECT path FROM log_file_refs ORDER BY seq_start DESC", .{}); 455 + defer result.deinit(); 456 + while (result.nextUnsafe() catch null) |r| { 457 + try refs.append(self.allocator, try self.allocator.dupe(u8, r.get([]const u8, 0))); 458 } 459 } 460 ··· 471 472 fn resumeLog(self: *DiskPersist) !void { 473 // find most recent log file 474 + if (try self.db.rowUnsafe( 475 + "SELECT id, path, seq_start FROM log_file_refs ORDER BY seq_start DESC LIMIT 1", 476 + .{}, 477 + )) |row| { 478 + var r = row; 479 + defer r.deinit() catch {}; 480 + const path = r.get([]const u8, 1); 481 + const seq_start: u64 = @intCast(r.get(i64, 2)); 482 483 var file = self.dir.openFile(path, .{ .mode = .read_write }) catch { 484 // file missing, start fresh ··· 525 self.current_file = try self.dir.createFile(name, .{ .truncate = false }); 526 self.current_file_path = try self.allocator.dupe(u8, name); 527 528 + // register in Postgres 529 + _ = try self.db.exec( 530 + "INSERT INTO log_file_refs (path, seq_start) VALUES ($1, $2)", 531 + .{ name, @as(i64, @intCast(start_seq)) }, 532 ); 533 534 self.event_counter = 0; ··· 751 try std.testing.expectEqual(@as(u8, 0x01), buf[9]); 752 } 753 754 + fn requireDatabaseUrl() ![]const u8 { 755 + return std.posix.getenv("DATABASE_URL") orelse return error.SkipZigTest; 756 + } 757 + 758 test "persist and playback" { 759 + const database_url = try requireDatabaseUrl(); 760 + 761 var tmp = std.testing.tmpDir(.{}); 762 defer 
tmp.cleanup(); 763 764 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 765 defer std.testing.allocator.free(dir_path); 766 767 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 768 defer dp.deinit(); 769 770 // persist some events (sync flush, no background thread) ··· 799 } 800 801 test "playback with cursor" { 802 + const database_url = try requireDatabaseUrl(); 803 + 804 var tmp = std.testing.tmpDir(.{}); 805 defer tmp.cleanup(); 806 807 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 808 defer std.testing.allocator.free(dir_path); 809 810 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 811 defer dp.deinit(); 812 813 _ = try dp.persist(.commit, 1, "a"); ··· 833 } 834 835 test "seq recovery after reinit" { 836 + const database_url = try requireDatabaseUrl(); 837 + 838 var tmp = std.testing.tmpDir(.{}); 839 defer tmp.cleanup(); 840 841 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 842 defer std.testing.allocator.free(dir_path); 843 844 // write some events 845 { 846 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 847 defer dp.deinit(); 848 _ = try dp.persist(.commit, 1, "x"); 849 _ = try dp.persist(.commit, 2, "y"); ··· 855 856 // reinit — should recover seq 857 { 858 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 859 defer dp.deinit(); 860 try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?); 861 const seq4 = try dp.persist(.commit, 1, "w"); ··· 864 } 865 866 test "takedown zeros payload" { 867 + const database_url = try requireDatabaseUrl(); 868 + 869 var tmp = std.testing.tmpDir(.{}); 870 defer tmp.cleanup(); 871 872 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 873 defer std.testing.allocator.free(dir_path); 874 875 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 876 defer dp.deinit(); 877 878 _ = try dp.persist(.commit, 42, 
"secret-data"); ··· 899 } 900 901 test "uidForDid assigns and caches UIDs" { 902 + const database_url = try requireDatabaseUrl(); 903 + 904 var tmp = std.testing.tmpDir(.{}); 905 defer tmp.cleanup(); 906 907 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 908 defer std.testing.allocator.free(dir_path); 909 910 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 911 defer dp.deinit(); 912 913 // first call creates the account ··· 925 } 926 927 test "uidForDid survives reinit" { 928 + const database_url = try requireDatabaseUrl(); 929 + 930 var tmp = std.testing.tmpDir(.{}); 931 defer tmp.cleanup(); 932 933 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 934 defer std.testing.allocator.free(dir_path); 935 936 var uid1: u64 = undefined; 937 { 938 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 939 defer dp.deinit(); 940 uid1 = try dp.uidForDid("did:plc:carol"); 941 } 942 943 // reinit — UID should be the same from database 944 { 945 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 946 defer dp.deinit(); 947 const uid1_again = try dp.uidForDid("did:plc:carol"); 948 try std.testing.expectEqual(uid1, uid1_again); ··· 950 } 951 952 test "takedown with real UIDs" { 953 + const database_url = try requireDatabaseUrl(); 954 + 955 var tmp = std.testing.tmpDir(.{}); 956 defer tmp.cleanup(); 957 958 const dir_path = try tmpDirRealPath(std.testing.allocator, tmp); 959 defer std.testing.allocator.free(dir_path); 960 961 + var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url); 962 defer dp.deinit(); 963 964 const alice_uid = try dp.uidForDid("did:plc:alice");
+3 -3
src/main.zig
··· 62 defer val.deinit(); 63 try val.start(); 64 65 - // init disk persistence (indigo-compatible diskpersist format + SQLite index) 66 - const db_path = std.posix.getenv("RELAY_DB_PATH") orelse "data/relay.sqlite"; 67 - var dp = event_log_mod.DiskPersist.init(allocator, data_dir, db_path) catch |err| { 68 log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) }); 69 return err; 70 };
··· 62 defer val.deinit(); 63 try val.start(); 64 65 + // init disk persistence (indigo-compatible diskpersist format + Postgres index) 66 + const database_url = std.posix.getenv("DATABASE_URL") orelse "postgres://relay:relay@localhost:5432/relay"; 67 + var dp = event_log_mod.DiskPersist.init(allocator, data_dir, database_url) catch |err| { 68 log.err("failed to init disk persist at {s}: {s}", .{ data_dir, @errorName(err) }); 69 return err; 70 };