atproto relay implementation in zig zlay.waow.tech
at main 1354 lines 48 kB view raw
//! disk persistence matching indigo's diskpersist format
//!
//! append-only log files with relay-assigned sequence numbers.
//! Postgres metadata index for fast cursor→file lookup.
//!
//! on-disk entry format (28-byte LE header + CBOR payload):
//! [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload]
//!
//! file naming: evts-{startSeq} (rotated every events_per_file events)
//!
//! see: indigo cmd/relay/stream/persist/diskpersist/diskpersist.go

const std = @import("std");
const pg = @import("pg");
const lru = @import("lru.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

// --- constants matching indigo ---

pub const header_size: usize = 28; // 4 + 4 + 4 + 8 + 8

/// event kinds as stored in the 4-byte `kind` header field.
/// values must stay in sync with indigo's wire format.
pub const EvtKind = enum(u32) {
    commit = 1,
    handle = 2, // deprecated
    tombstone = 3, // deprecated
    identity = 4,
    account = 5,
    sync = 6,
};

/// bit flags for the 4-byte `flags` header field.
pub const EvtFlags = struct {
    pub const takedown: u32 = 1;
    pub const rebased: u32 = 2;
};

const default_events_per_file: u32 = 10_000;
const default_flush_interval_ms: u64 = 100;
const default_flush_threshold: usize = 400;

// --- header ---

/// fixed-size 28-byte little-endian record header preceding each payload
/// (layout documented at the top of this file).
pub const EvtHeader = struct {
    flags: u32,
    kind: u32,
    len: u32, // payload length (not including header)
    uid: u64,
    seq: u64,

    /// serialize the header into `buf` in little-endian field order.
    pub fn encode(self: EvtHeader, buf: *[header_size]u8) void {
        std.mem.writeInt(u32, buf[0..4], self.flags, .little);
        std.mem.writeInt(u32, buf[4..8], self.kind, .little);
        std.mem.writeInt(u32, buf[8..12], self.len, .little);
        std.mem.writeInt(u64, buf[12..20], self.uid, .little);
        std.mem.writeInt(u64, buf[20..28], self.seq, .little);
    }

    /// parse a header from `buf`; never fails (any 28 bytes decode to *some* header).
    pub fn decode(buf: *const [header_size]u8) EvtHeader {
        return .{
            .flags = std.mem.readInt(u32, buf[0..4], .little),
            .kind = std.mem.readInt(u32, buf[4..8], .little),
            .len = std.mem.readInt(u32, buf[8..12], .little),
            .uid = std.mem.readInt(u64, buf[12..20], .little),
            .seq = std.mem.readInt(u64, buf[20..28], .little),
        };
    }
};

// --- persist job (write buffer entry) ---

const PersistJob = struct {
    data: []u8, // header + payload, owned
    seq: u64, // assigned seq (for broadcast ordering)
};

// --- disk persistence ---

/// append-only event log on disk plus a Postgres metadata index.
/// all mutable buffer/sequence state is guarded by `mutex`; a background
/// thread (started via `start()`) flushes the write buffer periodically.
pub const DiskPersist = struct {
    allocator: Allocator,
    dir_path: []const u8,
    dir: std.fs.Dir,
    db: *pg.Pool,
    current_file: ?std.fs.File = null,
    current_file_path: ?[]const u8 = null,

    // sequence state
    cur_seq: u64 = 1, // next seq to assign (monotonic, relay-wide)
    event_counter: u64 = 0, // events written to the current file (drives rotation)

    // config
    events_per_file: u32 = default_events_per_file,
    retention_hours: u64 = 72, // 3 days

    // DID → UID cache (matches indigo's bidirectional ARC cache)
    did_cache: lru.LruCache(u64),

    // write buffer (flushed periodically or when threshold hit)
    outbuf: std.ArrayListUnmanaged(u8) = .{},
    evtbuf: std.ArrayListUnmanaged(PersistJob) = .{},
    mutex: std.Thread.Mutex = .{},

    // flush thread
    flush_thread: ?std.Thread = null,
    alive: std.atomic.Value(bool) = .{ .raw = true },
    flush_cond: std.Thread.Condition = .{},

    /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended)
    pub fn evtbufLen(self: *DiskPersist) usize {
        if (!self.mutex.tryLock()) return 0;
        defer self.mutex.unlock();
        return self.evtbuf.items.len;
    }

    /// current DID cache entry count (for metrics)
    pub fn didCacheLen(self: *DiskPersist) u32 {
        return self.did_cache.count();
    }

    /// DID cache hashmap backing capacity (for memory attribution)
    pub fn didCacheMapCap(self: *DiskPersist) u32 {
        return self.did_cache.mapCapacity();
    }

    /// evtbuf allocated capacity in jobs (for memory attribution — non-blocking)
    pub fn evtbufCap(self: *DiskPersist) usize {
        if (!self.mutex.tryLock()) return 0;
        defer self.mutex.unlock();
        return self.evtbuf.capacity;
    }

    /// outbuf allocated capacity in bytes (for memory attribution — non-blocking)
    pub fn outbufCap(self: *DiskPersist) usize {
        if (!self.mutex.tryLock()) return 0;
        defer self.mutex.unlock();
        return self.outbuf.capacity;
    }

    /// open (or create) the log directory, connect to Postgres, create and
    /// migrate the schema, then recover sequence state from existing log files.
    /// the background flush thread is NOT started here — call `start()` after.
    pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8) !DiskPersist {
        // ensure directory exists
        std.fs.cwd().makePath(dir_path) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };

        var dir = try std.fs.cwd().openDir(dir_path, .{ .iterate = true });
        errdefer dir.close();

        // connect to Postgres
        const uri = std.Uri.parse(database_url) catch return error.InvalidDatabaseUrl;
        const pool = try pg.Pool.initUri(allocator, uri, .{ .size = 5 });
        errdefer pool.deinit();

        // create tables (matching indigo's Go relay schema); all statements are
        // idempotent (IF NOT EXISTS) so re-running init on an existing DB is safe
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS log_file_refs (
            \\ id BIGSERIAL PRIMARY KEY,
            \\ path TEXT NOT NULL,
            \\ archived BOOLEAN NOT NULL DEFAULT false,
            \\ seq_start BIGINT NOT NULL,
            \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS account (
            \\ uid BIGSERIAL PRIMARY KEY,
            \\ did TEXT NOT NULL UNIQUE,
            \\ host_id BIGINT NOT NULL DEFAULT 0,
            \\ status TEXT NOT NULL DEFAULT 'active',
            \\ upstream_status TEXT NOT NULL DEFAULT 'active',
            \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // migration: add columns if they don't exist (for existing deployments)
        // NOTE(review): errors are deliberately swallowed here — presumably so a
        // restricted DB role can still boot; confirm this is intended.
        _ = pool.exec("ALTER TABLE account ADD COLUMN IF NOT EXISTS host_id BIGINT NOT NULL DEFAULT 0", .{}) catch {};
        _ = pool.exec("ALTER TABLE account ADD COLUMN IF NOT EXISTS upstream_status TEXT NOT NULL DEFAULT 'active'", .{}) catch {};

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS account_repo (
            \\ uid BIGINT PRIMARY KEY REFERENCES account(uid),
            \\ rev TEXT NOT NULL,
            \\ commit_data_cid TEXT NOT NULL
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS domain_ban (
            \\ id BIGSERIAL PRIMARY KEY,
            \\ domain TEXT NOT NULL UNIQUE,
            \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS host (
            \\ id BIGSERIAL PRIMARY KEY,
            \\ hostname TEXT NOT NULL UNIQUE,
            \\ status TEXT NOT NULL DEFAULT 'active',
            \\ last_seq BIGINT NOT NULL DEFAULT 0,
            \\ failed_attempts INTEGER NOT NULL DEFAULT 0,
            \\ account_limit BIGINT,
            \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            \\ updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // migration: add account_limit column to existing host tables
        _ = try pool.exec("ALTER TABLE host ADD COLUMN IF NOT EXISTS account_limit BIGINT", .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS backfill_progress (
            \\ collection TEXT NOT NULL,
            \\ source TEXT NOT NULL,
            \\ cursor TEXT NOT NULL DEFAULT '',
            \\ imported_count BIGINT NOT NULL DEFAULT 0,
            \\ completed_at TIMESTAMPTZ,
            \\ created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            \\ PRIMARY KEY (collection, source)
            \\)
        , .{});

        // migrate: old schema had collection as sole PK — add source to composite PK
        _ = pool.exec(
            \\DO $$ BEGIN
            \\ IF EXISTS (
            \\ SELECT 1 FROM pg_constraint
            \\ WHERE conname = 'backfill_progress_pkey'
            \\ AND conrelid = 'backfill_progress'::regclass
            \\ AND array_length(conkey, 1) = 1
            \\ ) THEN
            \\ ALTER TABLE backfill_progress DROP CONSTRAINT backfill_progress_pkey;
            \\ ALTER TABLE backfill_progress ADD PRIMARY KEY (collection, source);
            \\ END IF;
            \\END $$
        , .{}) catch {};

        var self = DiskPersist{
            .allocator = allocator,
            .dir_path = try allocator.dupe(u8, dir_path),
            .dir = dir,
            .db = pool,
            .did_cache = lru.LruCache(u64).init(allocator, 500_000),
        };

        // recover from existing log files
        try self.resumeLog();

        return self;
    }

    /// shut down: stop the flush thread, flush remaining buffered events,
    /// then release all owned resources. Safe to call whether or not
    /// `start()` was ever called.
    pub fn deinit(self: *DiskPersist) void {
        // stop flush thread
        self.alive.store(false, .release);
        self.flush_cond.signal();
        if (self.flush_thread) |t| t.join();

        // flush remaining
        self.mutex.lock();
        self.flushLocked() catch {};
        self.mutex.unlock();

        // free write buffer
        for (self.evtbuf.items) |job| self.allocator.free(job.data);
        self.evtbuf.deinit(self.allocator);
        self.outbuf.deinit(self.allocator);

        self.did_cache.deinit();

        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);
        self.dir.close();
        self.db.deinit();
        self.allocator.free(self.dir_path);
    }

    /// start the background flush thread
    /// (stack size taken from main.zig's default_stack_size)
    pub fn start(self: *DiskPersist) !void {
        self.flush_thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, flushLoop, .{self});
    }

    /// result of `uidForDidFromHost`: the resolved uid plus host-association flags.
    pub const UidResult = struct {
        uid: u64,
        host_changed: bool = false,
        is_new: bool = false,
    };

    /// resolve a DID to a numeric UID, checking host association.
    /// does NOT set host_id — caller must verify authority first via DID doc
    /// resolution, then call setAccountHostId on success.
    /// returns is_new=true when no host_id is set yet (first encounter),
    /// host_changed=true when host_id differs from stored value.
294 pub fn uidForDidFromHost(self: *DiskPersist, did: []const u8, host_id: u64) !UidResult { 295 const uid = try self.uidForDid(did); 296 if (host_id > 0) { 297 const current_host = self.getAccountHostId(uid) catch 0; 298 if (current_host == 0) { 299 return .{ .uid = uid, .is_new = true }; 300 } else if (current_host != host_id) { 301 log.info("account {s} (uid={d}) host mismatch: current={d} new={d}", .{ did, uid, current_host, host_id }); 302 return .{ .uid = uid, .host_changed = true }; 303 } 304 } 305 return .{ .uid = uid }; 306 } 307 308 /// resolve a DID to a numeric UID. creates a new account row on first encounter. 309 /// matches indigo's Relay.DidToUid → Account.UID mapping. 310 pub fn uidForDid(self: *DiskPersist, did: []const u8) !u64 { 311 // fast path: check in-memory cache 312 if (self.did_cache.get(did)) |uid| return uid; 313 314 // check database 315 if (try self.db.rowUnsafe( 316 "SELECT uid FROM account WHERE did = $1", 317 .{did}, 318 )) |row| { 319 var r = row; 320 defer r.deinit() catch {}; 321 const uid: u64 = @intCast(r.get(i64, 0)); 322 self.didCachePut(did, uid); 323 return uid; 324 } 325 326 // create new account row (ignore if already exists from concurrent insert) 327 _ = self.db.exec( 328 "INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING", 329 .{did}, 330 ) catch |err| { 331 log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) }); 332 return err; 333 }; 334 335 // read back the UID (whether we just created it or it already existed) 336 var row = try self.db.rowUnsafe( 337 "SELECT uid FROM account WHERE did = $1", 338 .{did}, 339 ) orelse return error.AccountCreationFailed; 340 defer row.deinit() catch {}; 341 const uid: u64 = @intCast(row.get(i64, 0)); 342 343 self.didCachePut(did, uid); 344 return uid; 345 } 346 347 /// insert into did_cache (LRU handles eviction at capacity). 
348 fn didCachePut(self: *DiskPersist, did: []const u8, uid: u64) void { 349 self.did_cache.put(did, uid) catch {}; 350 } 351 352 /// per-DID sync state for chain tracking 353 pub const AccountState = struct { 354 rev: []const u8, 355 data_cid: []const u8, 356 }; 357 358 /// get stored sync state for a user (from account_repo table) 359 pub fn getAccountState(self: *DiskPersist, uid: u64, allocator: Allocator) !?AccountState { 360 var row = (try self.db.rowUnsafe( 361 "SELECT rev, commit_data_cid FROM account_repo WHERE uid = $1", 362 .{@as(i64, @intCast(uid))}, 363 )) orelse return null; 364 defer row.deinit() catch {}; 365 const rev = row.get([]const u8, 0); 366 const data_cid = row.get([]const u8, 1); 367 if (rev.len == 0 or data_cid.len == 0) return null; 368 return .{ 369 .rev = try allocator.dupe(u8, rev), 370 .data_cid = try allocator.dupe(u8, data_cid), 371 }; 372 } 373 374 /// update stored sync state after a verified commit (conditional upsert into account_repo). 375 /// uses WHERE clause to ensure rev only moves forward, preventing race conditions 376 /// when concurrent workers process commits for the same DID. 377 /// returns true if the state was updated, false if a newer rev was already stored. 378 pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !bool { 379 const rows_affected = try self.db.exec( 380 "INSERT INTO account_repo (uid, rev, commit_data_cid) VALUES ($1, $2, $3) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_data_cid = EXCLUDED.commit_data_cid WHERE account_repo.rev < EXCLUDED.rev", 381 .{ @as(i64, @intCast(uid)), rev, data_cid }, 382 ); 383 return (rows_affected orelse 0) > 0; 384 } 385 386 // --- account status --- 387 388 /// get the host_id for an account. returns 0 if not set. 
389 pub fn getAccountHostId(self: *DiskPersist, uid: u64) !u64 { 390 var row = (try self.db.rowUnsafe( 391 "SELECT host_id FROM account WHERE uid = $1", 392 .{@as(i64, @intCast(uid))}, 393 )) orelse return 0; 394 defer row.deinit() catch {}; 395 const hid = row.get(i64, 0); 396 return if (hid > 0) @intCast(hid) else 0; 397 } 398 399 /// count accounts on a host (for rate limit scaling, matches Go relay's host.AccountCount) 400 pub fn getHostAccountCount(self: *DiskPersist, host_id: u64) u64 { 401 var row = (self.db.rowUnsafe( 402 "SELECT COUNT(*) FROM account WHERE host_id = $1", 403 .{@as(i64, @intCast(host_id))}, 404 ) catch return 0) orelse return 0; 405 defer row.deinit() catch {}; 406 const count = row.get(i64, 0); 407 return if (count > 0) @intCast(count) else 0; 408 } 409 410 /// effective account count for rate limit scaling. 411 /// uses admin-configured limit if set, otherwise actual COUNT(*). 412 pub fn getEffectiveAccountCount(self: *DiskPersist, host_id: u64) u64 { 413 var row = (self.db.rowUnsafe( 414 "SELECT COALESCE(h.account_limit, COUNT(a.uid)) FROM host h LEFT JOIN account a ON a.host_id = h.id WHERE h.id = $1 GROUP BY h.id", 415 .{@as(i64, @intCast(host_id))}, 416 ) catch return 0) orelse return 0; 417 defer row.deinit() catch {}; 418 const count = row.get(i64, 0); 419 return if (count > 0) @intCast(count) else 0; 420 } 421 422 /// set admin-configured account limit for a host (overrides COUNT(*) for rate limiting). 423 /// pass null to clear the override and revert to actual COUNT(*). 
424 pub fn setHostAccountLimit(self: *DiskPersist, host_id: u64, limit: ?u64) !void { 425 if (limit) |l| { 426 const clamped: i64 = if (l > @as(u64, @intCast(std.math.maxInt(i64)))) std.math.maxInt(i64) else @intCast(l); 427 _ = try self.db.exec( 428 "UPDATE host SET account_limit = $2, updated_at = now() WHERE id = $1", 429 .{ @as(i64, @intCast(host_id)), clamped }, 430 ); 431 } else { 432 _ = try self.db.exec( 433 "UPDATE host SET account_limit = NULL, updated_at = now() WHERE id = $1", 434 .{@as(i64, @intCast(host_id))}, 435 ); 436 } 437 } 438 439 /// set the host_id for an account (first encounter or migration) 440 pub fn setAccountHostId(self: *DiskPersist, uid: u64, host_id: u64) !void { 441 _ = try self.db.exec( 442 "UPDATE account SET host_id = $2 WHERE uid = $1", 443 .{ @as(i64, @intCast(uid)), @as(i64, @intCast(host_id)) }, 444 ); 445 } 446 447 /// update the upstream status for an account (from #account events). 448 /// Go relay: account.go UpdateAccountUpstreamStatus 449 pub fn updateAccountUpstreamStatus(self: *DiskPersist, uid: u64, upstream_status: []const u8) !void { 450 _ = try self.db.exec( 451 "UPDATE account SET upstream_status = $2 WHERE uid = $1", 452 .{ @as(i64, @intCast(uid)), upstream_status }, 453 ); 454 } 455 456 /// check if an account is active (both local status and upstream status). 
457 /// Go relay: models.Account.IsActive() 458 pub fn isAccountActive(self: *DiskPersist, uid: u64) !bool { 459 var row = (try self.db.rowUnsafe( 460 "SELECT status, upstream_status FROM account WHERE uid = $1", 461 .{@as(i64, @intCast(uid))}, 462 )) orelse return false; 463 defer row.deinit() catch {}; 464 const status = row.get([]const u8, 0); 465 const upstream = row.get([]const u8, 1); 466 // active if local is active AND upstream is active 467 const local_ok = std.mem.eql(u8, status, "active"); 468 const upstream_ok = std.mem.eql(u8, upstream, "active"); 469 return local_ok and upstream_ok; 470 } 471 472 // --- host management --- 473 474 pub const Host = struct { 475 id: u64, 476 hostname: []const u8, 477 status: []const u8, 478 last_seq: u64, 479 failed_attempts: u32, 480 account_limit: ?u64 = null, 481 }; 482 483 /// get or create a host row. returns {id, last_seq}. 484 pub fn getOrCreateHost(self: *DiskPersist, hostname: []const u8) !struct { id: u64, last_seq: u64 } { 485 _ = self.db.exec( 486 "INSERT INTO host (hostname) VALUES ($1) ON CONFLICT (hostname) DO NOTHING", 487 .{hostname}, 488 ) catch |err| { 489 log.warn("failed to create host {s}: {s}", .{ hostname, @errorName(err) }); 490 return err; 491 }; 492 493 var row = try self.db.rowUnsafe( 494 "SELECT id, last_seq FROM host WHERE hostname = $1", 495 .{hostname}, 496 ) orelse return error.HostCreationFailed; 497 defer row.deinit() catch {}; 498 return .{ 499 .id = @intCast(row.get(i64, 0)), 500 .last_seq = @intCast(row.get(i64, 1)), 501 }; 502 } 503 504 /// check if a host is banned or blocked by status 505 pub fn isHostBanned(self: *DiskPersist, hostname: []const u8) bool { 506 var row = self.db.rowUnsafe( 507 "SELECT status FROM host WHERE hostname = $1", 508 .{hostname}, 509 ) catch return false; 510 if (row) |*r| { 511 defer r.deinit() catch {}; 512 const status = r.get([]const u8, 0); 513 return std.mem.eql(u8, status, "banned") or std.mem.eql(u8, status, "blocked"); 514 } 515 return false; 
516 } 517 518 /// update cursor position for a host 519 pub fn updateHostSeq(self: *DiskPersist, host_id: u64, seq: u64) !void { 520 _ = try self.db.exec( 521 "UPDATE host SET last_seq = $2, updated_at = now() WHERE id = $1", 522 .{ @as(i64, @intCast(host_id)), @as(i64, @intCast(seq)) }, 523 ); 524 } 525 526 /// look up host ID by hostname. returns null if not found. 527 pub fn getHostIdForHostname(self: *DiskPersist, hostname: []const u8) !?u64 { 528 var row = (try self.db.rowUnsafe( 529 "SELECT id FROM host WHERE hostname = $1", 530 .{hostname}, 531 )) orelse return null; 532 defer row.deinit() catch {}; 533 return @intCast(row.get(i64, 0)); 534 } 535 536 /// update host status (active, blocked, exhausted) 537 pub fn updateHostStatus(self: *DiskPersist, host_id: u64, status: []const u8) !void { 538 _ = try self.db.exec( 539 "UPDATE host SET status = $2, updated_at = now() WHERE id = $1", 540 .{ @as(i64, @intCast(host_id)), status }, 541 ); 542 } 543 544 /// list all active hosts 545 pub fn listActiveHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 546 var hosts: std.ArrayListUnmanaged(Host) = .{}; 547 errdefer { 548 for (hosts.items) |h| { 549 allocator.free(h.hostname); 550 allocator.free(h.status); 551 } 552 hosts.deinit(allocator); 553 } 554 555 var result = try self.db.query( 556 "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC", 557 .{}, 558 ); 559 defer result.deinit(); 560 561 while (result.nextUnsafe() catch null) |row| { 562 try hosts.append(allocator, .{ 563 .id = @intCast(row.get(i64, 0)), 564 .hostname = try allocator.dupe(u8, row.get([]const u8, 1)), 565 .status = try allocator.dupe(u8, row.get([]const u8, 2)), 566 .last_seq = @intCast(row.get(i64, 3)), 567 .failed_attempts = @intCast(row.get(i32, 4)), 568 .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null, 569 }); 570 } 571 572 return try hosts.toOwnedSlice(allocator); 573 } 574 575 /// list all 
hosts (any status) for admin view 576 pub fn listAllHosts(self: *DiskPersist, allocator: Allocator) ![]Host { 577 var hosts: std.ArrayListUnmanaged(Host) = .{}; 578 errdefer { 579 for (hosts.items) |h| { 580 allocator.free(h.hostname); 581 allocator.free(h.status); 582 } 583 hosts.deinit(allocator); 584 } 585 586 var result = try self.db.query( 587 "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host ORDER BY id ASC", 588 .{}, 589 ); 590 defer result.deinit(); 591 592 while (result.nextUnsafe() catch null) |row| { 593 try hosts.append(allocator, .{ 594 .id = @intCast(row.get(i64, 0)), 595 .hostname = try allocator.dupe(u8, row.get([]const u8, 1)), 596 .status = try allocator.dupe(u8, row.get([]const u8, 2)), 597 .last_seq = @intCast(row.get(i64, 3)), 598 .failed_attempts = @intCast(row.get(i32, 4)), 599 .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null, 600 }); 601 } 602 603 return try hosts.toOwnedSlice(allocator); 604 } 605 606 /// increment failure count and return new value 607 pub fn incrementHostFailures(self: *DiskPersist, host_id: u64) !u32 { 608 _ = try self.db.exec( 609 "UPDATE host SET failed_attempts = failed_attempts + 1, updated_at = now() WHERE id = $1", 610 .{@as(i64, @intCast(host_id))}, 611 ); 612 var row = try self.db.rowUnsafe( 613 "SELECT failed_attempts FROM host WHERE id = $1", 614 .{@as(i64, @intCast(host_id))}, 615 ) orelse return 0; 616 defer row.deinit() catch {}; 617 return @intCast(row.get(i32, 0)); 618 } 619 620 /// check if a hostname (or any parent domain) is banned. 621 /// Go relay: domain_ban.go DomainIsBanned — suffix-based check. 
622 pub fn isDomainBanned(self: *DiskPersist, hostname: []const u8) bool { 623 // check each suffix: "pds.host.example.com", "host.example.com", "example.com" 624 var offset: usize = 0; 625 while (offset < hostname.len) { 626 const suffix = hostname[offset..]; 627 var row = self.db.rowUnsafe( 628 "SELECT 1 FROM domain_ban WHERE domain = $1", 629 .{suffix}, 630 ) catch return false; 631 if (row) |*r| { 632 r.deinit() catch {}; 633 return true; 634 } 635 // advance past next dot 636 if (std.mem.indexOfScalarPos(u8, hostname, offset, '.')) |dot| { 637 offset = dot + 1; 638 } else break; 639 } 640 return false; 641 } 642 643 /// reset failure count (on successful connection) 644 pub fn resetHostFailures(self: *DiskPersist, host_id: u64) !void { 645 _ = try self.db.exec( 646 "UPDATE host SET failed_attempts = 0, updated_at = now() WHERE id = $1", 647 .{@as(i64, @intCast(host_id))}, 648 ); 649 } 650 651 /// persist an event. assigns a sequence number. returns the assigned seq. 652 /// the event is buffered and will be flushed to disk asynchronously. 
653 pub fn persist(self: *DiskPersist, kind: EvtKind, uid: u64, payload: []const u8) !u64 { 654 // build the on-disk record: header + payload 655 const total_len = header_size + payload.len; 656 const data = try self.allocator.alloc(u8, total_len); 657 errdefer self.allocator.free(data); 658 659 // write header (seq filled in later under lock) 660 const header = EvtHeader{ 661 .flags = 0, 662 .kind = @intFromEnum(kind), 663 .len = @intCast(payload.len), 664 .uid = uid, 665 .seq = 0, // placeholder 666 }; 667 header.encode(data[0..header_size]); 668 @memcpy(data[header_size..], payload); 669 670 self.mutex.lock(); 671 defer self.mutex.unlock(); 672 673 // assign seq 674 const seq = self.cur_seq; 675 self.cur_seq += 1; 676 std.mem.writeInt(u64, data[20..28], seq, .little); 677 678 try self.evtbuf.append(self.allocator, .{ .data = data, .seq = seq }); 679 try self.outbuf.appendSlice(self.allocator, data); 680 self.event_counter += 1; 681 682 // flush if threshold hit 683 if (self.evtbuf.items.len >= default_flush_threshold) { 684 try self.flushLocked(); 685 } 686 687 return seq; 688 } 689 690 /// playback events with seq > since. calls cb for each event. 
691 pub fn playback(self: *DiskPersist, since: u64, allocator: Allocator, entries: *std.ArrayListUnmanaged(PlaybackEntry)) !void { 692 self.mutex.lock(); 693 defer self.mutex.unlock(); 694 695 const since_i: i64 = @intCast(since); 696 697 // find the log file containing `since` 698 var start_files: std.ArrayListUnmanaged(LogFileRef) = .{}; 699 defer start_files.deinit(allocator); 700 701 if (since > 0) { 702 // find file whose seq_start is just before `since` 703 if (try self.db.rowUnsafe( 704 "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start <= $1 ORDER BY seq_start DESC LIMIT 1", 705 .{since_i}, 706 )) |row| { 707 var r = row; 708 defer r.deinit() catch {}; 709 try start_files.append(allocator, .{ 710 .path = try allocator.dupe(u8, r.get([]const u8, 1)), 711 .seq_start = @intCast(r.get(i64, 2)), 712 }); 713 } 714 } 715 716 // find all subsequent files 717 { 718 var result = try self.db.query( 719 "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start > $1 ORDER BY seq_start ASC", 720 .{since_i}, 721 ); 722 defer result.deinit(); 723 while (result.nextUnsafe() catch null) |r| { 724 try start_files.append(allocator, .{ 725 .path = try allocator.dupe(u8, r.get([]const u8, 1)), 726 .seq_start = @intCast(r.get(i64, 2)), 727 }); 728 } 729 } 730 731 defer for (start_files.items) |f| allocator.free(f.path); 732 733 // read events from each file 734 for (start_files.items) |ref| { 735 var file = self.dir.openFile(ref.path, .{}) catch continue; 736 defer file.close(); 737 try readEventsFrom(allocator, file, since, entries); 738 } 739 } 740 741 /// last assigned sequence number, or null if empty 742 pub fn lastSeq(self: *DiskPersist) ?u64 { 743 if (self.cur_seq <= 1) return null; 744 return self.cur_seq - 1; 745 } 746 747 /// garbage-collect log files older than the retention period 748 pub fn gc(self: *DiskPersist) !void { 749 self.mutex.lock(); 750 defer self.mutex.unlock(); 751 752 const cutoff_interval = try std.fmt.allocPrint(self.allocator, "{d} 
hours", .{self.retention_hours}); 753 defer self.allocator.free(cutoff_interval); 754 755 // find expired refs 756 var expired: std.ArrayListUnmanaged(GcRef) = .{}; 757 defer { 758 for (expired.items) |e| self.allocator.free(e.path); 759 expired.deinit(self.allocator); 760 } 761 762 { 763 var result = try self.db.query( 764 "SELECT id, path FROM log_file_refs WHERE created_at < now() - $1::interval", 765 .{cutoff_interval}, 766 ); 767 defer result.deinit(); 768 while (result.nextUnsafe() catch null) |r| { 769 try expired.append(self.allocator, .{ 770 .id = r.get(i64, 0), 771 .path = try self.allocator.dupe(u8, r.get([]const u8, 1)), 772 }); 773 } 774 } 775 776 for (expired.items) |ref| { 777 // skip current file 778 if (self.current_file_path) |cur| { 779 if (std.mem.eql(u8, ref.path, cur)) continue; 780 } 781 782 // delete db record first (prevents playback from finding it) 783 _ = self.db.exec("DELETE FROM log_file_refs WHERE id = $1", .{ref.id}) catch |err| { 784 log.warn("gc: failed to delete db record {d}: {s}", .{ ref.id, @errorName(err) }); 785 continue; 786 }; 787 788 // delete file 789 self.dir.deleteFile(ref.path) catch |err| { 790 log.warn("gc: failed to delete {s}: {s}", .{ ref.path, @errorName(err) }); 791 }; 792 } 793 794 if (expired.items.len > 0) { 795 log.info("gc: removed {d} expired log files", .{expired.items.len}); 796 } 797 } 798 799 /// take down all events for a user (set flag + zero payload) 800 pub fn takeDownUser(self: *DiskPersist, uid: u64) !void { 801 self.mutex.lock(); 802 defer self.mutex.unlock(); 803 804 // iterate all log files 805 var refs: std.ArrayListUnmanaged([]const u8) = .{}; 806 defer { 807 for (refs.items) |p| self.allocator.free(p); 808 refs.deinit(self.allocator); 809 } 810 811 { 812 var result = try self.db.query("SELECT path FROM log_file_refs ORDER BY seq_start DESC", .{}); 813 defer result.deinit(); 814 while (result.nextUnsafe() catch null) |r| { 815 try refs.append(self.allocator, try self.allocator.dupe(u8, 
r.get([]const u8, 0))); 816 } 817 } 818 819 for (refs.items) |path| { 820 var file = self.dir.openFile(path, .{ .mode = .read_write }) catch continue; 821 defer file.close(); 822 mutateUserEventsInFile(file, uid) catch |err| { 823 log.warn("takedown: failed to process {s}: {s}", .{ path, @errorName(err) }); 824 }; 825 } 826 } 827 828 // --- internals --- 829 830 fn resumeLog(self: *DiskPersist) !void { 831 // find most recent log file 832 if (try self.db.rowUnsafe( 833 "SELECT id, path, seq_start FROM log_file_refs ORDER BY seq_start DESC LIMIT 1", 834 .{}, 835 )) |row| { 836 var r = row; 837 defer r.deinit() catch {}; 838 const path = r.get([]const u8, 1); 839 const seq_start: u64 = @intCast(r.get(i64, 2)); 840 841 var file = self.dir.openFile(path, .{ .mode = .read_write }) catch { 842 // file missing, start fresh 843 try self.initLogFile(); 844 return; 845 }; 846 847 // scan for last seq 848 const last_seq = scanForLastSeq(file) catch { 849 file.close(); 850 try self.initLogFile(); 851 return; 852 }; 853 854 if (last_seq) |ls| { 855 self.cur_seq = ls + 1; 856 log.info("recovered seq from disk: last_seq={d}", .{ls}); 857 } else { 858 self.cur_seq = if (seq_start > 0) seq_start else 1; 859 } 860 861 // seek to end for appending 862 const stat = try file.stat(); 863 try file.seekTo(stat.size); 864 865 self.current_file = file; 866 self.current_file_path = try self.allocator.dupe(u8, path); 867 } else { 868 try self.initLogFile(); 869 } 870 } 871 872 fn initLogFile(self: *DiskPersist) !void { 873 try self.createLogFile(self.cur_seq); 874 } 875 876 fn createLogFile(self: *DiskPersist, start_seq: u64) !void { 877 var name_buf: [64]u8 = undefined; 878 const name = std.fmt.bufPrint(&name_buf, "evts-{d}", .{start_seq}) catch unreachable; 879 880 if (self.current_file) |f| f.close(); 881 if (self.current_file_path) |p| self.allocator.free(p); 882 883 self.current_file = try self.dir.createFile(name, .{ .truncate = false }); 884 self.current_file_path = try 
self.allocator.dupe(u8, name); 885 886 // register in Postgres 887 _ = try self.db.exec( 888 "INSERT INTO log_file_refs (path, seq_start) VALUES ($1, $2)", 889 .{ name, @as(i64, @intCast(start_seq)) }, 890 ); 891 892 self.event_counter = 0; 893 } 894 895 fn flushLocked(self: *DiskPersist) !void { 896 if (self.evtbuf.items.len == 0) return; 897 898 // write buffered bytes to current file 899 const file = self.current_file orelse return; 900 file.writeAll(self.outbuf.items) catch |err| { 901 log.err("flush: write failed: {s}", .{@errorName(err)}); 902 return err; 903 }; 904 905 // clear buffers 906 self.outbuf.clearRetainingCapacity(); 907 908 // free job data 909 for (self.evtbuf.items) |job| { 910 self.allocator.free(job.data); 911 } 912 self.evtbuf.clearRetainingCapacity(); 913 914 // check if we need to rotate 915 if (self.event_counter >= self.events_per_file) { 916 self.createLogFile(self.cur_seq) catch |err| { 917 log.err("flush: log rotation failed: {s}", .{@errorName(err)}); 918 }; 919 } 920 } 921 922 fn flushLoop(self: *DiskPersist) void { 923 while (self.alive.load(.acquire)) { 924 // wait for flush interval or signal 925 { 926 self.mutex.lock(); 927 defer self.mutex.unlock(); 928 self.flush_cond.timedWait(&self.mutex, default_flush_interval_ms * std.time.ns_per_ms) catch {}; 929 self.flushLocked() catch {}; 930 } 931 } 932 } 933}; 934 935// --- playback types --- 936 937pub const PlaybackEntry = struct { 938 seq: u64, 939 kind: u32, 940 uid: u64, 941 data: []const u8, // payload only (owned by caller) 942}; 943 944const LogFileRef = struct { 945 path: []const u8, 946 seq_start: u64, 947}; 948 949const GcRef = struct { 950 id: i64, 951 path: []const u8, 952}; 953 954// --- file-level operations --- 955 956fn readEventsFrom(allocator: Allocator, file: std.fs.File, since: u64, result: *std.ArrayListUnmanaged(PlaybackEntry)) !void { 957 const file_size = (try file.stat()).size; 958 959 // if since > 0, scan to the right position 960 if (since > 0) { 961 
seekToSeq(file, since, file_size) catch return; 962 } 963 964 // read events 965 while (true) { 966 var hdr_buf: [header_size]u8 = undefined; 967 const n = file.readAll(&hdr_buf) catch break; 968 if (n < header_size) break; 969 970 const hdr = EvtHeader.decode(&hdr_buf); 971 972 // skip taken down / rebased events 973 if (hdr.flags & (EvtFlags.takedown | EvtFlags.rebased) != 0) { 974 file.seekBy(@intCast(hdr.len)) catch break; 975 continue; 976 } 977 978 if (hdr.seq <= since) { 979 file.seekBy(@intCast(hdr.len)) catch break; 980 continue; 981 } 982 983 // read payload 984 const data = allocator.alloc(u8, hdr.len) catch break; 985 const read_n = file.readAll(data) catch { 986 allocator.free(data); 987 break; 988 }; 989 if (read_n < hdr.len) { 990 allocator.free(data); 991 break; 992 } 993 994 result.append(allocator, .{ 995 .seq = hdr.seq, 996 .kind = hdr.kind, 997 .uid = hdr.uid, 998 .data = data, 999 }) catch { 1000 allocator.free(data); 1001 break; 1002 }; 1003 } 1004} 1005 1006/// scan file headers to seek to the first event with seq > target 1007fn seekToSeq(file: std.fs.File, target: u64, file_size: u64) !void { 1008 try file.seekTo(0); 1009 var pos: u64 = 0; 1010 while (pos + header_size <= file_size) { 1011 var hdr_buf: [header_size]u8 = undefined; 1012 const n = try file.readAll(&hdr_buf); 1013 if (n < header_size) break; 1014 1015 const hdr = EvtHeader.decode(&hdr_buf); 1016 if (hdr.seq > target) { 1017 // seek back to start of this header 1018 try file.seekTo(pos); 1019 return; 1020 } 1021 pos += header_size + hdr.len; 1022 try file.seekTo(pos); 1023 } 1024} 1025 1026/// scan a file for the last sequence number 1027fn scanForLastSeq(file: std.fs.File) !?u64 { 1028 try file.seekTo(0); 1029 const file_size = (try file.stat()).size; 1030 1031 var last_seq: ?u64 = null; 1032 var pos: u64 = 0; 1033 while (pos + header_size <= file_size) { 1034 var hdr_buf: [header_size]u8 = undefined; 1035 try file.seekTo(pos); 1036 const n = try file.readAll(&hdr_buf); 1037 
if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);
        last_seq = hdr.seq;
        pos += header_size + hdr.len;
    }
    return last_seq;
}

/// Mark every event owned by `uid` as taken down and overwrite its
/// payload bytes with zeros, in place. Events that already carry the
/// takedown flag are left untouched; headers (apart from the 4-byte
/// flags word) are preserved so the file remains walkable.
fn mutateUserEventsInFile(file: std.fs.File, uid: u64) !void {
    const size = (try file.stat()).size;

    var offset: u64 = 0;
    while (offset + header_size <= size) {
        var header_bytes: [header_size]u8 = undefined;
        try file.seekTo(offset);
        if ((try file.readAll(&header_bytes)) < header_size) break; // truncated tail

        const header = EvtHeader.decode(&header_bytes);
        const already_taken_down = (header.flags & EvtFlags.takedown) != 0;

        if (header.uid == uid and !already_taken_down) {
            // rewrite the flags word (first 4 bytes of the header) with
            // the takedown bit set
            var flag_bytes: [4]u8 = undefined;
            std.mem.writeInt(u32, &flag_bytes, header.flags | EvtFlags.takedown, .little);
            try file.seekTo(offset);
            try file.writeAll(&flag_bytes);

            // overwrite the payload with zeros, one 4 KiB block at a time
            const zero_block = [_]u8{0} ** 4096;
            try file.seekTo(offset + header_size);
            var left: u64 = header.len;
            while (left > 0) {
                const step = @min(left, zero_block.len);
                try file.writeAll(zero_block[0..step]);
                left -= step;
            }
        }

        offset += header_size + header.len;
    }
}

// === tests ===

test "header encode/decode roundtrip" {
    // encode → decode must reproduce every header field exactly
    const original = EvtHeader{
        .flags = 0,
        .kind = @intFromEnum(EvtKind.commit),
        .len = 1234,
        .uid = 42,
        .seq = 99999,
    };

    var wire: [header_size]u8 = undefined;
    original.encode(&wire);

    const roundtripped = EvtHeader.decode(&wire);
    try std.testing.expectEqual(original.flags, roundtripped.flags);
    try std.testing.expectEqual(original.kind, roundtripped.kind);
    try std.testing.expectEqual(original.len, roundtripped.len);
    try std.testing.expectEqual(original.uid, roundtripped.uid);
    try std.testing.expectEqual(original.seq, roundtripped.seq);
}

test
"header is little-endian" {
    const hdr = EvtHeader{ .flags = 0, .kind = 1, .len = 256, .uid = 0, .seq = 1 };
    var buf: [header_size]u8 = undefined;
    hdr.encode(&buf);
    // len=256 in LE is 0x00 0x01 0x00 0x00 (len occupies bytes 8..12)
    try std.testing.expectEqual(@as(u8, 0x00), buf[8]);
    try std.testing.expectEqual(@as(u8, 0x01), buf[9]);
}

/// Integration-test gate: returns DATABASE_URL from the environment, or
/// skips the test (error.SkipZigTest) when it is unset — the tests below
/// need a reachable Postgres for the metadata index.
fn requireDatabaseUrl() ![]const u8 {
    return std.posix.getenv("DATABASE_URL") orelse return error.SkipZigTest;
}

test "persist and playback" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // persist some events (sync flush, no background thread)
    const seq1 = try dp.persist(.commit, 1, "payload-one");
    const seq2 = try dp.persist(.identity, 2, "payload-two");
    const seq3 = try dp.persist(.commit, 1, "payload-three");

    // relay-assigned sequence numbers start at 1 and are contiguous
    try std.testing.expectEqual(@as(u64, 1), seq1);
    try std.testing.expectEqual(@as(u64, 2), seq2);
    try std.testing.expectEqual(@as(u64, 3), seq3);

    // flush manually; flushLocked requires the mutex held
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=0 → all events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 3), entries.items.len);
    try std.testing.expectEqualStrings("payload-one", entries.items[0].data);
    try std.testing.expectEqual(@as(u64, 1), entries.items[0].seq);
    try std.testing.expectEqualStrings("payload-two",
entries.items[1].data);
    try std.testing.expectEqualStrings("payload-three", entries.items[2].data);
}

test "playback with cursor" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    _ = try dp.persist(.commit, 1, "a");
    _ = try dp.persist(.commit, 2, "b");
    _ = try dp.persist(.commit, 3, "c");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=2 → only seq 3 (cursor is exclusive: events
    // with seq <= cursor are not replayed, matching readEventsFrom)
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(2, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqual(@as(u64, 3), entries.items[0].seq);
    try std.testing.expectEqualStrings("c", entries.items[0].data);
}

// seq state must survive a restart: a fresh DiskPersist over the same
// directory + database continues numbering where the old one stopped
test "seq recovery after reinit" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    // write some events
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        _ = try dp.persist(.commit, 1, "x");
        _ = try dp.persist(.commit, 2, "y");
        _ = try dp.persist(.account, 3, "z");
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // reinit — should recover seq
    {
        var dp = try
DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        // recovered high-water mark must equal the last persisted seq
        try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?);
        const seq4 = try dp.persist(.commit, 1, "w");
        try std.testing.expectEqual(@as(u64, 4), seq4);
    }
}

// taking a user down must hide their events from playback (presumably
// via mutateUserEventsInFile, which also zeroes the payload bytes on
// disk — the on-disk zeroing itself is not asserted here)
test "takedown zeros payload" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    _ = try dp.persist(.commit, 42, "secret-data");
    _ = try dp.persist(.commit, 99, "other-data");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down user 42
    try dp.takeDownUser(42);

    // playback should skip user 42's events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("other-data", entries.items[0].data);
}

test "uidForDid assigns and caches UIDs" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // first call creates the account
    const uid1 = try dp.uidForDid("did:plc:alice");
    try std.testing.expect(uid1 > 0);

    // same DID returns same UID (from cache)
const uid1_again = try dp.uidForDid("did:plc:alice");
    try std.testing.expectEqual(uid1, uid1_again);

    // different DID gets a different UID
    const uid2 = try dp.uidForDid("did:plc:bob");
    try std.testing.expect(uid2 > 0);
    try std.testing.expect(uid1 != uid2);
}

// the DID→UID mapping must survive a restart: a fresh instance resolves
// the same DID to the same UID from the database, not just its cache
test "uidForDid survives reinit" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var uid1: u64 = undefined;
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        uid1 = try dp.uidForDid("did:plc:carol");
    }

    // reinit — UID should be the same from database
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        const uid1_again = try dp.uidForDid("did:plc:carol");
        try std.testing.expectEqual(uid1, uid1_again);
    }
}

// end-to-end: UIDs assigned by uidForDid flow through persist and
// takeDownUser, and playback reports the surviving event's uid
test "takedown with real UIDs" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    const alice_uid = try dp.uidForDid("did:plc:alice");
    const bob_uid = try dp.uidForDid("did:plc:bob");

    _ = try dp.persist(.commit, alice_uid, "alice-post");
    _ = try dp.persist(.commit, bob_uid, "bob-post");
    _ = try dp.persist(.commit, alice_uid, "alice-post-2");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down alice
    try dp.takeDownUser(alice_uid);

    // playback should only have bob's event
var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("bob-post", entries.items[0].data);
    try std.testing.expectEqual(bob_uid, entries.items[0].uid);
}

/// Resolve a testing TmpDir to an absolute path. Caller owns the
/// returned slice and must free it with the same allocator (the tests
/// above pair it with `defer allocator.free(dir_path)`).
fn tmpDirRealPath(allocator: Allocator, tmp: std.testing.TmpDir) ![]const u8 {
    // realpath writes into a stack buffer; dupe gives it a stable lifetime
    var buf: [std.fs.max_path_bytes]u8 = undefined;
    const real = try tmp.dir.realpath(".", &buf);
    return try allocator.dupe(u8, real);
}