//! atproto relay implementation in zig — zlay.waow.tech
1//! disk persistence matching indigo's diskpersist format
2//!
3//! append-only log files with relay-assigned sequence numbers.
4//! Postgres metadata index for fast cursor→file lookup.
5//!
6//! on-disk entry format (28-byte LE header + CBOR payload):
7//! [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload]
8//!
9//! file naming: evts-{startSeq} (rotated every events_per_file events)
10//!
11//! see: indigo cmd/relay/stream/persist/diskpersist/diskpersist.go
12
13const std = @import("std");
14const pg = @import("pg");
15const lru = @import("lru.zig");
16
17const Allocator = std.mem.Allocator;
18const log = std.log.scoped(.relay);
19
20// --- constants matching indigo ---
21
/// size in bytes of the fixed per-event header written before each payload
pub const header_size: usize = 28; // 4 + 4 + 4 + 8 + 8
23
/// event kinds stored in the header's `kind` field (values match indigo).
pub const EvtKind = enum(u32) {
    commit = 1,
    handle = 2, // deprecated
    tombstone = 3, // deprecated
    identity = 4,
    account = 5,
    sync = 6,
};
32
/// bit flags stored in the header's `flags` field.
/// events with either flag set are skipped during playback (see readEventsFrom).
pub const EvtFlags = struct {
    pub const takedown: u32 = 1;
    pub const rebased: u32 = 2;
};
37
// flush/rotation tuning defaults (events_per_file is overridable per instance)
const default_events_per_file: u32 = 10_000; // rotate log file after this many events
const default_flush_interval_ms: u64 = 100; // background flush period
const default_flush_threshold: usize = 400; // synchronous flush when evtbuf reaches this
41
42// --- header ---
43
/// fixed-size per-event header, serialized little-endian as:
/// [flags u32][kind u32][len u32][uid u64][seq u64]
pub const EvtHeader = struct {
    flags: u32,
    kind: u32,
    len: u32, // payload length (not including header)
    uid: u64,
    seq: u64,

    /// serialize this header into the 28-byte wire layout.
    pub fn encode(self: EvtHeader, buf: *[header_size]u8) void {
        std.mem.writeInt(u32, buf[0..4], self.flags, .little);
        std.mem.writeInt(u32, buf[4..8], self.kind, .little);
        std.mem.writeInt(u32, buf[8..12], self.len, .little);
        std.mem.writeInt(u64, buf[12..20], self.uid, .little);
        std.mem.writeInt(u64, buf[20..28], self.seq, .little);
    }

    /// parse a 28-byte wire header back into an EvtHeader.
    pub fn decode(buf: *const [header_size]u8) EvtHeader {
        const flags = std.mem.readInt(u32, buf[0..4], .little);
        const kind = std.mem.readInt(u32, buf[4..8], .little);
        const payload_len = std.mem.readInt(u32, buf[8..12], .little);
        const uid = std.mem.readInt(u64, buf[12..20], .little);
        const seq = std.mem.readInt(u64, buf[20..28], .little);
        return .{
            .flags = flags,
            .kind = kind,
            .len = payload_len,
            .uid = uid,
            .seq = seq,
        };
    }
};
69
70// --- persist job (write buffer entry) ---
71
/// one buffered event awaiting flush: the fully-encoded record plus its seq.
const PersistJob = struct {
    data: []u8, // header + payload, owned
    seq: u64, // assigned seq (for broadcast ordering)
};
76
77// --- disk persistence ---
78
pub const DiskPersist = struct {
    allocator: Allocator,
    // directory containing the evts-* log files (owned copy of the init arg)
    dir_path: []const u8,
    dir: std.fs.Dir,
    // Postgres pool: cursor→file index (log_file_refs) plus account/host tables
    db: *pg.Pool,
    // currently-open append target; null only until the first log file is created
    current_file: ?std.fs.File = null,
    current_file_path: ?[]const u8 = null,

    // sequence state
    cur_seq: u64 = 1, // next seq to assign (1-based)
    event_counter: u64 = 0, // events written to the current file (drives rotation)

    // config
    events_per_file: u32 = default_events_per_file,
    retention_hours: u64 = 72, // 3 days

    // DID → UID cache (matches indigo's bidirectional ARC cache)
    did_cache: lru.LruCache(u64),

    // write buffer (flushed periodically or when threshold hit)
    outbuf: std.ArrayListUnmanaged(u8) = .{}, // raw record bytes, written to disk verbatim
    evtbuf: std.ArrayListUnmanaged(PersistJob) = .{}, // per-event jobs; each owns its data slice
    mutex: std.Thread.Mutex = .{}, // guards seq state, buffers, and the current file

    // flush thread
    flush_thread: ?std.Thread = null,
    alive: std.atomic.Value(bool) = .{ .raw = true }, // cleared by deinit to stop the flush loop
    flush_cond: std.Thread.Condition = .{},
107
108 /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended)
109 pub fn evtbufLen(self: *DiskPersist) usize {
110 if (!self.mutex.tryLock()) return 0;
111 defer self.mutex.unlock();
112 return self.evtbuf.items.len;
113 }
114
    /// current DID cache entry count (for metrics).
    /// delegates to the LRU; no lock needed here beyond what LruCache does itself.
    pub fn didCacheLen(self: *DiskPersist) u32 {
        return self.did_cache.count();
    }
119
    /// DID cache hashmap backing capacity (for memory attribution).
    /// reports allocated map slots, not live entries — see didCacheLen for those.
    pub fn didCacheMapCap(self: *DiskPersist) u32 {
        return self.did_cache.mapCapacity();
    }
124
125 /// evtbuf allocated capacity in jobs (for memory attribution — non-blocking)
126 pub fn evtbufCap(self: *DiskPersist) usize {
127 if (!self.mutex.tryLock()) return 0;
128 defer self.mutex.unlock();
129 return self.evtbuf.capacity;
130 }
131
132 /// outbuf allocated capacity in bytes (for memory attribution — non-blocking)
133 pub fn outbufCap(self: *DiskPersist) usize {
134 if (!self.mutex.tryLock()) return 0;
135 defer self.mutex.unlock();
136 return self.outbuf.capacity;
137 }
138
    /// open (or create) the log directory, connect to Postgres, create/migrate
    /// the schema, and recover sequence state from any existing log files.
    /// `dir_path` is duplicated (owned by the returned value); `database_url`
    /// is only read during init.
    pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8) !DiskPersist {
        // ensure directory exists
        std.fs.cwd().makePath(dir_path) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };

        // .iterate needed so gc/resume can list log files
        var dir = try std.fs.cwd().openDir(dir_path, .{ .iterate = true });
        errdefer dir.close();

        // connect to Postgres
        const uri = std.Uri.parse(database_url) catch return error.InvalidDatabaseUrl;
        const pool = try pg.Pool.initUri(allocator, uri, .{ .size = 5 });
        errdefer pool.deinit();

        // create tables (matching indigo's Go relay schema)

        // cursor→file index: one row per evts-* log file
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS log_file_refs (
            \\  id BIGSERIAL PRIMARY KEY,
            \\  path TEXT NOT NULL,
            \\  archived BOOLEAN NOT NULL DEFAULT false,
            \\  seq_start BIGINT NOT NULL,
            \\  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // DID → UID mapping plus account status tracking
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS account (
            \\  uid BIGSERIAL PRIMARY KEY,
            \\  did TEXT NOT NULL UNIQUE,
            \\  host_id BIGINT NOT NULL DEFAULT 0,
            \\  status TEXT NOT NULL DEFAULT 'active',
            \\  upstream_status TEXT NOT NULL DEFAULT 'active',
            \\  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // migration: add columns if they don't exist (for existing deployments);
        // best-effort since older Postgres versions may reject the syntax
        _ = pool.exec("ALTER TABLE account ADD COLUMN IF NOT EXISTS host_id BIGINT NOT NULL DEFAULT 0", .{}) catch {};
        _ = pool.exec("ALTER TABLE account ADD COLUMN IF NOT EXISTS upstream_status TEXT NOT NULL DEFAULT 'active'", .{}) catch {};

        // per-account verified sync state (rev + commit data CID)
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS account_repo (
            \\  uid BIGINT PRIMARY KEY REFERENCES account(uid),
            \\  rev TEXT NOT NULL,
            \\  commit_data_cid TEXT NOT NULL
            \\)
        , .{});

        // admin-banned domains (suffix-matched in isDomainBanned)
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS domain_ban (
            \\  id BIGSERIAL PRIMARY KEY,
            \\  domain TEXT NOT NULL UNIQUE,
            \\  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // upstream PDS hosts and their subscription cursors
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS host (
            \\  id BIGSERIAL PRIMARY KEY,
            \\  hostname TEXT NOT NULL UNIQUE,
            \\  status TEXT NOT NULL DEFAULT 'active',
            \\  last_seq BIGINT NOT NULL DEFAULT 0,
            \\  failed_attempts INTEGER NOT NULL DEFAULT 0,
            \\  account_limit BIGINT,
            \\  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            \\  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // migration: add account_limit column to existing host tables
        _ = try pool.exec("ALTER TABLE host ADD COLUMN IF NOT EXISTS account_limit BIGINT", .{});

        // backfill job bookkeeping, keyed by (collection, source)
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS backfill_progress (
            \\  collection TEXT NOT NULL,
            \\  source TEXT NOT NULL,
            \\  cursor TEXT NOT NULL DEFAULT '',
            \\  imported_count BIGINT NOT NULL DEFAULT 0,
            \\  completed_at TIMESTAMPTZ,
            \\  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            \\  PRIMARY KEY (collection, source)
            \\)
        , .{});

        // migrate: old schema had collection as sole PK — add source to composite PK.
        // the array_length(conkey,1)=1 guard makes this a no-op once migrated.
        _ = pool.exec(
            \\DO $$ BEGIN
            \\  IF EXISTS (
            \\    SELECT 1 FROM pg_constraint
            \\    WHERE conname = 'backfill_progress_pkey'
            \\      AND conrelid = 'backfill_progress'::regclass
            \\      AND array_length(conkey, 1) = 1
            \\  ) THEN
            \\    ALTER TABLE backfill_progress DROP CONSTRAINT backfill_progress_pkey;
            \\    ALTER TABLE backfill_progress ADD PRIMARY KEY (collection, source);
            \\  END IF;
            \\END $$
        , .{}) catch {};

        var self = DiskPersist{
            .allocator = allocator,
            .dir_path = try allocator.dupe(u8, dir_path),
            .dir = dir,
            .db = pool,
            .did_cache = lru.LruCache(u64).init(allocator, 500_000),
        };

        // recover from existing log files
        try self.resumeLog();

        return self;
    }
252
    /// shut down: stop the flush thread, flush remaining events, and release
    /// all owned resources. must be the last call on this instance.
    pub fn deinit(self: *DiskPersist) void {
        // stop flush thread. the signal is sent without holding the mutex, so
        // the worker may miss it and only observe `alive` on its next timed
        // wakeup — join can block up to ~default_flush_interval_ms.
        self.alive.store(false, .release);
        self.flush_cond.signal();
        if (self.flush_thread) |t| t.join();

        // flush remaining (best-effort: deallocation paths cannot propagate errors)
        self.mutex.lock();
        self.flushLocked() catch {};
        self.mutex.unlock();

        // free write buffer — any jobs still present failed to flush above
        for (self.evtbuf.items) |job| self.allocator.free(job.data);
        self.evtbuf.deinit(self.allocator);
        self.outbuf.deinit(self.allocator);

        self.did_cache.deinit();

        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);
        self.dir.close();
        self.db.deinit();
        self.allocator.free(self.dir_path);
    }
277
    /// start the background flush thread (runs flushLoop until deinit clears
    /// `alive`). call at most once, after init; stack size comes from main.zig.
    pub fn start(self: *DiskPersist) !void {
        self.flush_thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, flushLoop, .{self});
    }
282
    /// result of uidForDidFromHost: resolved uid plus host-association flags.
    pub const UidResult = struct {
        uid: u64,
        host_changed: bool = false, // stored host_id differs from the caller's host
        is_new: bool = false, // no host_id stored yet (first encounter)
    };
288
289 /// resolve a DID to a numeric UID, checking host association.
290 /// does NOT set host_id — caller must verify authority first via DID doc
291 /// resolution, then call setAccountHostId on success.
292 /// returns is_new=true when no host_id is set yet (first encounter),
293 /// host_changed=true when host_id differs from stored value.
294 pub fn uidForDidFromHost(self: *DiskPersist, did: []const u8, host_id: u64) !UidResult {
295 const uid = try self.uidForDid(did);
296 if (host_id > 0) {
297 const current_host = self.getAccountHostId(uid) catch 0;
298 if (current_host == 0) {
299 return .{ .uid = uid, .is_new = true };
300 } else if (current_host != host_id) {
301 log.info("account {s} (uid={d}) host mismatch: current={d} new={d}", .{ did, uid, current_host, host_id });
302 return .{ .uid = uid, .host_changed = true };
303 }
304 }
305 return .{ .uid = uid };
306 }
307
308 /// resolve a DID to a numeric UID. creates a new account row on first encounter.
309 /// matches indigo's Relay.DidToUid → Account.UID mapping.
310 pub fn uidForDid(self: *DiskPersist, did: []const u8) !u64 {
311 // fast path: check in-memory cache
312 if (self.did_cache.get(did)) |uid| return uid;
313
314 // check database
315 if (try self.db.rowUnsafe(
316 "SELECT uid FROM account WHERE did = $1",
317 .{did},
318 )) |row| {
319 var r = row;
320 defer r.deinit() catch {};
321 const uid: u64 = @intCast(r.get(i64, 0));
322 self.didCachePut(did, uid);
323 return uid;
324 }
325
326 // create new account row (ignore if already exists from concurrent insert)
327 _ = self.db.exec(
328 "INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING",
329 .{did},
330 ) catch |err| {
331 log.warn("failed to create account for {s}: {s}", .{ did, @errorName(err) });
332 return err;
333 };
334
335 // read back the UID (whether we just created it or it already existed)
336 var row = try self.db.rowUnsafe(
337 "SELECT uid FROM account WHERE did = $1",
338 .{did},
339 ) orelse return error.AccountCreationFailed;
340 defer row.deinit() catch {};
341 const uid: u64 = @intCast(row.get(i64, 0));
342
343 self.didCachePut(did, uid);
344 return uid;
345 }
346
    /// insert into did_cache (LRU handles eviction at capacity).
    /// best-effort: a put failure just means the next lookup hits the database.
    fn didCachePut(self: *DiskPersist, did: []const u8, uid: u64) void {
        self.did_cache.put(did, uid) catch {};
    }
351
    /// per-DID sync state for chain tracking.
    /// when returned from getAccountState, both fields are allocator-owned
    /// copies that the caller must free.
    pub const AccountState = struct {
        rev: []const u8,
        data_cid: []const u8,
    };
357
358 /// get stored sync state for a user (from account_repo table)
359 pub fn getAccountState(self: *DiskPersist, uid: u64, allocator: Allocator) !?AccountState {
360 var row = (try self.db.rowUnsafe(
361 "SELECT rev, commit_data_cid FROM account_repo WHERE uid = $1",
362 .{@as(i64, @intCast(uid))},
363 )) orelse return null;
364 defer row.deinit() catch {};
365 const rev = row.get([]const u8, 0);
366 const data_cid = row.get([]const u8, 1);
367 if (rev.len == 0 or data_cid.len == 0) return null;
368 return .{
369 .rev = try allocator.dupe(u8, rev),
370 .data_cid = try allocator.dupe(u8, data_cid),
371 };
372 }
373
    /// update stored sync state after a verified commit (conditional upsert into account_repo).
    /// uses WHERE clause to ensure rev only moves forward, preventing race conditions
    /// when concurrent workers process commits for the same DID.
    /// returns true if the state was updated, false if a newer rev was already stored.
    /// NOTE(review): forward-only ordering relies on SQL string comparison of
    /// revs — assumes revs are lexicographically sortable (TID-style); confirm.
    pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !bool {
        const rows_affected = try self.db.exec(
            "INSERT INTO account_repo (uid, rev, commit_data_cid) VALUES ($1, $2, $3) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_data_cid = EXCLUDED.commit_data_cid WHERE account_repo.rev < EXCLUDED.rev",
            .{ @as(i64, @intCast(uid)), rev, data_cid },
        );
        // 0 affected rows means the guard rejected the update (stored rev >= new rev)
        return (rows_affected orelse 0) > 0;
    }
385
386 // --- account status ---
387
388 /// get the host_id for an account. returns 0 if not set.
389 pub fn getAccountHostId(self: *DiskPersist, uid: u64) !u64 {
390 var row = (try self.db.rowUnsafe(
391 "SELECT host_id FROM account WHERE uid = $1",
392 .{@as(i64, @intCast(uid))},
393 )) orelse return 0;
394 defer row.deinit() catch {};
395 const hid = row.get(i64, 0);
396 return if (hid > 0) @intCast(hid) else 0;
397 }
398
399 /// count accounts on a host (for rate limit scaling, matches Go relay's host.AccountCount)
400 pub fn getHostAccountCount(self: *DiskPersist, host_id: u64) u64 {
401 var row = (self.db.rowUnsafe(
402 "SELECT COUNT(*) FROM account WHERE host_id = $1",
403 .{@as(i64, @intCast(host_id))},
404 ) catch return 0) orelse return 0;
405 defer row.deinit() catch {};
406 const count = row.get(i64, 0);
407 return if (count > 0) @intCast(count) else 0;
408 }
409
410 /// effective account count for rate limit scaling.
411 /// uses admin-configured limit if set, otherwise actual COUNT(*).
412 pub fn getEffectiveAccountCount(self: *DiskPersist, host_id: u64) u64 {
413 var row = (self.db.rowUnsafe(
414 "SELECT COALESCE(h.account_limit, COUNT(a.uid)) FROM host h LEFT JOIN account a ON a.host_id = h.id WHERE h.id = $1 GROUP BY h.id",
415 .{@as(i64, @intCast(host_id))},
416 ) catch return 0) orelse return 0;
417 defer row.deinit() catch {};
418 const count = row.get(i64, 0);
419 return if (count > 0) @intCast(count) else 0;
420 }
421
422 /// set admin-configured account limit for a host (overrides COUNT(*) for rate limiting).
423 /// pass null to clear the override and revert to actual COUNT(*).
424 pub fn setHostAccountLimit(self: *DiskPersist, host_id: u64, limit: ?u64) !void {
425 if (limit) |l| {
426 const clamped: i64 = if (l > @as(u64, @intCast(std.math.maxInt(i64)))) std.math.maxInt(i64) else @intCast(l);
427 _ = try self.db.exec(
428 "UPDATE host SET account_limit = $2, updated_at = now() WHERE id = $1",
429 .{ @as(i64, @intCast(host_id)), clamped },
430 );
431 } else {
432 _ = try self.db.exec(
433 "UPDATE host SET account_limit = NULL, updated_at = now() WHERE id = $1",
434 .{@as(i64, @intCast(host_id))},
435 );
436 }
437 }
438
439 /// set the host_id for an account (first encounter or migration)
440 pub fn setAccountHostId(self: *DiskPersist, uid: u64, host_id: u64) !void {
441 _ = try self.db.exec(
442 "UPDATE account SET host_id = $2 WHERE uid = $1",
443 .{ @as(i64, @intCast(uid)), @as(i64, @intCast(host_id)) },
444 );
445 }
446
447 /// update the upstream status for an account (from #account events).
448 /// Go relay: account.go UpdateAccountUpstreamStatus
449 pub fn updateAccountUpstreamStatus(self: *DiskPersist, uid: u64, upstream_status: []const u8) !void {
450 _ = try self.db.exec(
451 "UPDATE account SET upstream_status = $2 WHERE uid = $1",
452 .{ @as(i64, @intCast(uid)), upstream_status },
453 );
454 }
455
456 /// check if an account is active (both local status and upstream status).
457 /// Go relay: models.Account.IsActive()
458 pub fn isAccountActive(self: *DiskPersist, uid: u64) !bool {
459 var row = (try self.db.rowUnsafe(
460 "SELECT status, upstream_status FROM account WHERE uid = $1",
461 .{@as(i64, @intCast(uid))},
462 )) orelse return false;
463 defer row.deinit() catch {};
464 const status = row.get([]const u8, 0);
465 const upstream = row.get([]const u8, 1);
466 // active if local is active AND upstream is active
467 const local_ok = std.mem.eql(u8, status, "active");
468 const upstream_ok = std.mem.eql(u8, upstream, "active");
469 return local_ok and upstream_ok;
470 }
471
472 // --- host management ---
473
    /// a row from the host table.
    /// hostname/status are allocator-owned copies when produced by
    /// listActiveHosts/listAllHosts — the caller frees them.
    pub const Host = struct {
        id: u64,
        hostname: []const u8,
        status: []const u8,
        last_seq: u64, // subscription cursor position
        failed_attempts: u32,
        account_limit: ?u64 = null, // admin override for rate-limit scaling
    };
482
483 /// get or create a host row. returns {id, last_seq}.
484 pub fn getOrCreateHost(self: *DiskPersist, hostname: []const u8) !struct { id: u64, last_seq: u64 } {
485 _ = self.db.exec(
486 "INSERT INTO host (hostname) VALUES ($1) ON CONFLICT (hostname) DO NOTHING",
487 .{hostname},
488 ) catch |err| {
489 log.warn("failed to create host {s}: {s}", .{ hostname, @errorName(err) });
490 return err;
491 };
492
493 var row = try self.db.rowUnsafe(
494 "SELECT id, last_seq FROM host WHERE hostname = $1",
495 .{hostname},
496 ) orelse return error.HostCreationFailed;
497 defer row.deinit() catch {};
498 return .{
499 .id = @intCast(row.get(i64, 0)),
500 .last_seq = @intCast(row.get(i64, 1)),
501 };
502 }
503
504 /// check if a host is banned or blocked by status
505 pub fn isHostBanned(self: *DiskPersist, hostname: []const u8) bool {
506 var row = self.db.rowUnsafe(
507 "SELECT status FROM host WHERE hostname = $1",
508 .{hostname},
509 ) catch return false;
510 if (row) |*r| {
511 defer r.deinit() catch {};
512 const status = r.get([]const u8, 0);
513 return std.mem.eql(u8, status, "banned") or std.mem.eql(u8, status, "blocked");
514 }
515 return false;
516 }
517
518 /// update cursor position for a host
519 pub fn updateHostSeq(self: *DiskPersist, host_id: u64, seq: u64) !void {
520 _ = try self.db.exec(
521 "UPDATE host SET last_seq = $2, updated_at = now() WHERE id = $1",
522 .{ @as(i64, @intCast(host_id)), @as(i64, @intCast(seq)) },
523 );
524 }
525
526 /// look up host ID by hostname. returns null if not found.
527 pub fn getHostIdForHostname(self: *DiskPersist, hostname: []const u8) !?u64 {
528 var row = (try self.db.rowUnsafe(
529 "SELECT id FROM host WHERE hostname = $1",
530 .{hostname},
531 )) orelse return null;
532 defer row.deinit() catch {};
533 return @intCast(row.get(i64, 0));
534 }
535
536 /// update host status (active, blocked, exhausted)
537 pub fn updateHostStatus(self: *DiskPersist, host_id: u64, status: []const u8) !void {
538 _ = try self.db.exec(
539 "UPDATE host SET status = $2, updated_at = now() WHERE id = $1",
540 .{ @as(i64, @intCast(host_id)), status },
541 );
542 }
543
544 /// list all active hosts
545 pub fn listActiveHosts(self: *DiskPersist, allocator: Allocator) ![]Host {
546 var hosts: std.ArrayListUnmanaged(Host) = .{};
547 errdefer {
548 for (hosts.items) |h| {
549 allocator.free(h.hostname);
550 allocator.free(h.status);
551 }
552 hosts.deinit(allocator);
553 }
554
555 var result = try self.db.query(
556 "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host WHERE status = 'active' ORDER BY id ASC",
557 .{},
558 );
559 defer result.deinit();
560
561 while (result.nextUnsafe() catch null) |row| {
562 try hosts.append(allocator, .{
563 .id = @intCast(row.get(i64, 0)),
564 .hostname = try allocator.dupe(u8, row.get([]const u8, 1)),
565 .status = try allocator.dupe(u8, row.get([]const u8, 2)),
566 .last_seq = @intCast(row.get(i64, 3)),
567 .failed_attempts = @intCast(row.get(i32, 4)),
568 .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null,
569 });
570 }
571
572 return try hosts.toOwnedSlice(allocator);
573 }
574
575 /// list all hosts (any status) for admin view
576 pub fn listAllHosts(self: *DiskPersist, allocator: Allocator) ![]Host {
577 var hosts: std.ArrayListUnmanaged(Host) = .{};
578 errdefer {
579 for (hosts.items) |h| {
580 allocator.free(h.hostname);
581 allocator.free(h.status);
582 }
583 hosts.deinit(allocator);
584 }
585
586 var result = try self.db.query(
587 "SELECT id, hostname, status, last_seq, failed_attempts, account_limit FROM host ORDER BY id ASC",
588 .{},
589 );
590 defer result.deinit();
591
592 while (result.nextUnsafe() catch null) |row| {
593 try hosts.append(allocator, .{
594 .id = @intCast(row.get(i64, 0)),
595 .hostname = try allocator.dupe(u8, row.get([]const u8, 1)),
596 .status = try allocator.dupe(u8, row.get([]const u8, 2)),
597 .last_seq = @intCast(row.get(i64, 3)),
598 .failed_attempts = @intCast(row.get(i32, 4)),
599 .account_limit = if (row.get(?i64, 5)) |v| @as(?u64, @intCast(v)) else null,
600 });
601 }
602
603 return try hosts.toOwnedSlice(allocator);
604 }
605
606 /// increment failure count and return new value
607 pub fn incrementHostFailures(self: *DiskPersist, host_id: u64) !u32 {
608 _ = try self.db.exec(
609 "UPDATE host SET failed_attempts = failed_attempts + 1, updated_at = now() WHERE id = $1",
610 .{@as(i64, @intCast(host_id))},
611 );
612 var row = try self.db.rowUnsafe(
613 "SELECT failed_attempts FROM host WHERE id = $1",
614 .{@as(i64, @intCast(host_id))},
615 ) orelse return 0;
616 defer row.deinit() catch {};
617 return @intCast(row.get(i32, 0));
618 }
619
620 /// check if a hostname (or any parent domain) is banned.
621 /// Go relay: domain_ban.go DomainIsBanned — suffix-based check.
622 pub fn isDomainBanned(self: *DiskPersist, hostname: []const u8) bool {
623 // check each suffix: "pds.host.example.com", "host.example.com", "example.com"
624 var offset: usize = 0;
625 while (offset < hostname.len) {
626 const suffix = hostname[offset..];
627 var row = self.db.rowUnsafe(
628 "SELECT 1 FROM domain_ban WHERE domain = $1",
629 .{suffix},
630 ) catch return false;
631 if (row) |*r| {
632 r.deinit() catch {};
633 return true;
634 }
635 // advance past next dot
636 if (std.mem.indexOfScalarPos(u8, hostname, offset, '.')) |dot| {
637 offset = dot + 1;
638 } else break;
639 }
640 return false;
641 }
642
643 /// reset failure count (on successful connection)
644 pub fn resetHostFailures(self: *DiskPersist, host_id: u64) !void {
645 _ = try self.db.exec(
646 "UPDATE host SET failed_attempts = 0, updated_at = now() WHERE id = $1",
647 .{@as(i64, @intCast(host_id))},
648 );
649 }
650
651 /// persist an event. assigns a sequence number. returns the assigned seq.
652 /// the event is buffered and will be flushed to disk asynchronously.
653 pub fn persist(self: *DiskPersist, kind: EvtKind, uid: u64, payload: []const u8) !u64 {
654 // build the on-disk record: header + payload
655 const total_len = header_size + payload.len;
656 const data = try self.allocator.alloc(u8, total_len);
657 errdefer self.allocator.free(data);
658
659 // write header (seq filled in later under lock)
660 const header = EvtHeader{
661 .flags = 0,
662 .kind = @intFromEnum(kind),
663 .len = @intCast(payload.len),
664 .uid = uid,
665 .seq = 0, // placeholder
666 };
667 header.encode(data[0..header_size]);
668 @memcpy(data[header_size..], payload);
669
670 self.mutex.lock();
671 defer self.mutex.unlock();
672
673 // assign seq
674 const seq = self.cur_seq;
675 self.cur_seq += 1;
676 std.mem.writeInt(u64, data[20..28], seq, .little);
677
678 try self.evtbuf.append(self.allocator, .{ .data = data, .seq = seq });
679 try self.outbuf.appendSlice(self.allocator, data);
680 self.event_counter += 1;
681
682 // flush if threshold hit
683 if (self.evtbuf.items.len >= default_flush_threshold) {
684 try self.flushLocked();
685 }
686
687 return seq;
688 }
689
    /// playback events with seq > since, appending one PlaybackEntry per event
    /// to `entries` (payloads allocated with `allocator`; caller frees them).
    /// holds the write mutex for the whole scan, so playback blocks persist().
    pub fn playback(self: *DiskPersist, since: u64, allocator: Allocator, entries: *std.ArrayListUnmanaged(PlaybackEntry)) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        const since_i: i64 = @intCast(since);

        // find the log file containing `since`
        var start_files: std.ArrayListUnmanaged(LogFileRef) = .{};
        defer start_files.deinit(allocator);

        if (since > 0) {
            // find file whose seq_start is just before `since`
            if (try self.db.rowUnsafe(
                "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start <= $1 ORDER BY seq_start DESC LIMIT 1",
                .{since_i},
            )) |row| {
                var r = row;
                defer r.deinit() catch {};
                try start_files.append(allocator, .{
                    .path = try allocator.dupe(u8, r.get([]const u8, 1)),
                    .seq_start = @intCast(r.get(i64, 2)),
                });
            }
        }

        // find all subsequent files (ascending, so entries come out in seq order)
        {
            var result = try self.db.query(
                "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start > $1 ORDER BY seq_start ASC",
                .{since_i},
            );
            defer result.deinit();
            while (result.nextUnsafe() catch null) |r| {
                try start_files.append(allocator, .{
                    .path = try allocator.dupe(u8, r.get([]const u8, 1)),
                    .seq_start = @intCast(r.get(i64, 2)),
                });
            }
        }

        defer for (start_files.items) |f| allocator.free(f.path);

        // read events from each file; a file that fails to open is skipped
        // (gc may have deleted it between the query above and this open)
        for (start_files.items) |ref| {
            var file = self.dir.openFile(ref.path, .{}) catch continue;
            defer file.close();
            try readEventsFrom(allocator, file, since, entries);
        }
    }
740
    /// last assigned sequence number, or null if empty.
    /// NOTE(review): reads cur_seq without taking self.mutex, so the value may
    /// be stale under concurrent persist() — confirm this is intentional
    /// (metrics-style use).
    pub fn lastSeq(self: *DiskPersist) ?u64 {
        if (self.cur_seq <= 1) return null;
        return self.cur_seq - 1;
    }
746
    /// garbage-collect log files older than the retention period.
    /// the Postgres ref is deleted before the file so playback can never find
    /// a ref for an already-deleted file; the current append target is skipped.
    pub fn gc(self: *DiskPersist) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        // build a Postgres interval literal, e.g. "72 hours"
        const cutoff_interval = try std.fmt.allocPrint(self.allocator, "{d} hours", .{self.retention_hours});
        defer self.allocator.free(cutoff_interval);

        // find expired refs
        var expired: std.ArrayListUnmanaged(GcRef) = .{};
        defer {
            for (expired.items) |e| self.allocator.free(e.path);
            expired.deinit(self.allocator);
        }

        {
            var result = try self.db.query(
                "SELECT id, path FROM log_file_refs WHERE created_at < now() - $1::interval",
                .{cutoff_interval},
            );
            defer result.deinit();
            while (result.nextUnsafe() catch null) |r| {
                try expired.append(self.allocator, .{
                    .id = r.get(i64, 0),
                    .path = try self.allocator.dupe(u8, r.get([]const u8, 1)),
                });
            }
        }

        for (expired.items) |ref| {
            // skip current file
            if (self.current_file_path) |cur| {
                if (std.mem.eql(u8, ref.path, cur)) continue;
            }

            // delete db record first (prevents playback from finding it)
            _ = self.db.exec("DELETE FROM log_file_refs WHERE id = $1", .{ref.id}) catch |err| {
                log.warn("gc: failed to delete db record {d}: {s}", .{ ref.id, @errorName(err) });
                continue;
            };

            // delete file; the ref is already gone, so a failure here only
            // leaks disk space (logged, not fatal)
            self.dir.deleteFile(ref.path) catch |err| {
                log.warn("gc: failed to delete {s}: {s}", .{ ref.path, @errorName(err) });
            };
        }

        if (expired.items.len > 0) {
            log.info("gc: removed {d} expired log files", .{expired.items.len});
        }
    }
798
    /// take down all events for a user (set flag + zero payload).
    /// rewrites every log file in place while holding the write mutex, so
    /// persist() is blocked for the duration. per-file failures are logged
    /// and skipped so one bad file does not abort the whole takedown.
    pub fn takeDownUser(self: *DiskPersist, uid: u64) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        // collect every log file path (newest first)
        var refs: std.ArrayListUnmanaged([]const u8) = .{};
        defer {
            for (refs.items) |p| self.allocator.free(p);
            refs.deinit(self.allocator);
        }

        {
            var result = try self.db.query("SELECT path FROM log_file_refs ORDER BY seq_start DESC", .{});
            defer result.deinit();
            while (result.nextUnsafe() catch null) |r| {
                try refs.append(self.allocator, try self.allocator.dupe(u8, r.get([]const u8, 0)));
            }
        }

        for (refs.items) |path| {
            var file = self.dir.openFile(path, .{ .mode = .read_write }) catch continue;
            defer file.close();
            // mutateUserEventsInFile is defined elsewhere in this file (not in
            // this chunk); per the doc above it marks matching records and
            // zeroes their payloads — TODO confirm against its implementation
            mutateUserEventsInFile(file, uid) catch |err| {
                log.warn("takedown: failed to process {s}: {s}", .{ path, @errorName(err) });
            };
        }
    }
827
828 // --- internals ---
829
830 fn resumeLog(self: *DiskPersist) !void {
831 // find most recent log file
832 if (try self.db.rowUnsafe(
833 "SELECT id, path, seq_start FROM log_file_refs ORDER BY seq_start DESC LIMIT 1",
834 .{},
835 )) |row| {
836 var r = row;
837 defer r.deinit() catch {};
838 const path = r.get([]const u8, 1);
839 const seq_start: u64 = @intCast(r.get(i64, 2));
840
841 var file = self.dir.openFile(path, .{ .mode = .read_write }) catch {
842 // file missing, start fresh
843 try self.initLogFile();
844 return;
845 };
846
847 // scan for last seq
848 const last_seq = scanForLastSeq(file) catch {
849 file.close();
850 try self.initLogFile();
851 return;
852 };
853
854 if (last_seq) |ls| {
855 self.cur_seq = ls + 1;
856 log.info("recovered seq from disk: last_seq={d}", .{ls});
857 } else {
858 self.cur_seq = if (seq_start > 0) seq_start else 1;
859 }
860
861 // seek to end for appending
862 const stat = try file.stat();
863 try file.seekTo(stat.size);
864
865 self.current_file = file;
866 self.current_file_path = try self.allocator.dupe(u8, path);
867 } else {
868 try self.initLogFile();
869 }
870 }
871
    /// create a fresh log file starting at the current sequence position.
    fn initLogFile(self: *DiskPersist) !void {
        try self.createLogFile(self.cur_seq);
    }
875
    /// rotate to a new log file named "evts-{start_seq}": closes the previous
    /// handle, creates/opens the new file, and registers it in Postgres so
    /// playback can map cursors to files. resets the per-file event counter.
    /// caller must hold self.mutex (or be in single-threaded init).
    fn createLogFile(self: *DiskPersist, start_seq: u64) !void {
        var name_buf: [64]u8 = undefined;
        // "evts-" plus a decimal u64 always fits in 64 bytes
        const name = std.fmt.bufPrint(&name_buf, "evts-{d}", .{start_seq}) catch unreachable;

        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);

        // .truncate = false: reopening an existing file preserves its contents
        self.current_file = try self.dir.createFile(name, .{ .truncate = false });
        self.current_file_path = try self.allocator.dupe(u8, name);

        // register in Postgres
        _ = try self.db.exec(
            "INSERT INTO log_file_refs (path, seq_start) VALUES ($1, $2)",
            .{ name, @as(i64, @intCast(start_seq)) },
        );

        self.event_counter = 0;
    }
894
    /// write all buffered event bytes to the current log file and release the
    /// buffered jobs. caller must hold self.mutex.
    /// NOTE(review): when current_file is null the buffers are left intact and
    /// keep growing — presumably unreachable after init(); confirm.
    fn flushLocked(self: *DiskPersist) !void {
        if (self.evtbuf.items.len == 0) return;

        // write buffered bytes to current file
        const file = self.current_file orelse return;
        file.writeAll(self.outbuf.items) catch |err| {
            log.err("flush: write failed: {s}", .{@errorName(err)});
            // buffers are retained so a later flush can retry
            return err;
        };

        // clear buffers (capacity kept for reuse)
        self.outbuf.clearRetainingCapacity();

        // free job data
        for (self.evtbuf.items) |job| {
            self.allocator.free(job.data);
        }
        self.evtbuf.clearRetainingCapacity();

        // check if we need to rotate. rotation failure is logged but not
        // propagated — events keep appending to the old file.
        if (self.event_counter >= self.events_per_file) {
            self.createLogFile(self.cur_seq) catch |err| {
                log.err("flush: log rotation failed: {s}", .{@errorName(err)});
            };
        }
    }
921
    /// background thread body: wake every default_flush_interval_ms (or on a
    /// condition signal) and flush buffered events. exits once deinit clears
    /// `alive`; flush errors are swallowed (logged inside flushLocked).
    fn flushLoop(self: *DiskPersist) void {
        while (self.alive.load(.acquire)) {
            // wait for flush interval or signal
            {
                self.mutex.lock();
                defer self.mutex.unlock();
                // timeout is the normal wake path; a spurious wakeup just flushes early
                self.flush_cond.timedWait(&self.mutex, default_flush_interval_ms * std.time.ns_per_ms) catch {};
                self.flushLocked() catch {};
            }
        }
    }
933};
934
935// --- playback types ---
936
/// one decoded event produced by playback/readEventsFrom.
pub const PlaybackEntry = struct {
    seq: u64,
    kind: u32, // raw EvtKind value from the header
    uid: u64,
    data: []const u8, // payload only (owned by caller)
};
943
/// a log_file_refs row collected during playback file selection.
const LogFileRef = struct {
    path: []const u8, // file name relative to the log dir
    seq_start: u64, // first seq assigned to this file
};
948
/// a log_file_refs row scheduled for deletion by gc.
const GcRef = struct {
    id: i64, // primary key in log_file_refs
    path: []const u8,
};
953
954// --- file-level operations ---
955
/// read all events with seq > since from `file`, appending decoded entries to
/// `result` (payloads allocated with `allocator`; caller frees them).
/// best-effort by design: short reads, seek failures, and allocation failures
/// end the scan early instead of propagating — a truncated tail record is
/// expected after a crash mid-append.
fn readEventsFrom(allocator: Allocator, file: std.fs.File, since: u64, result: *std.ArrayListUnmanaged(PlaybackEntry)) !void {
    const file_size = (try file.stat()).size;

    // if since > 0, scan to the right position
    if (since > 0) {
        seekToSeq(file, since, file_size) catch return;
    }

    // read events
    while (true) {
        var hdr_buf: [header_size]u8 = undefined;
        const n = file.readAll(&hdr_buf) catch break;
        if (n < header_size) break; // truncated tail: stop

        const hdr = EvtHeader.decode(&hdr_buf);

        // skip taken down / rebased events
        if (hdr.flags & (EvtFlags.takedown | EvtFlags.rebased) != 0) {
            file.seekBy(@intCast(hdr.len)) catch break;
            continue;
        }

        // defensive re-check: seekToSeq should already have skipped these
        if (hdr.seq <= since) {
            file.seekBy(@intCast(hdr.len)) catch break;
            continue;
        }

        // read payload
        const data = allocator.alloc(u8, hdr.len) catch break;
        const read_n = file.readAll(data) catch {
            allocator.free(data);
            break;
        };
        if (read_n < hdr.len) {
            allocator.free(data);
            break;
        }

        result.append(allocator, .{
            .seq = hdr.seq,
            .kind = hdr.kind,
            .uid = hdr.uid,
            .data = data,
        }) catch {
            allocator.free(data);
            break;
        };
    }
}
1005
/// Scan event headers from the start of `file` and leave the cursor at the
/// first header whose seq is > `target` (or at/after EOF if none is found).
fn seekToSeq(file: std.fs.File, target: u64, file_size: u64) !void {
    try file.seekTo(0);
    var offset: u64 = 0;
    while (offset + header_size <= file_size) {
        var raw: [header_size]u8 = undefined;
        if (try file.readAll(&raw) < header_size) break;

        const header = EvtHeader.decode(&raw);
        if (header.seq > target) {
            // rewind to the start of this header and stop
            try file.seekTo(offset);
            return;
        }

        // step over header + payload to the next entry
        offset += header_size + header.len;
        try file.seekTo(offset);
    }
}
1025
/// Walk every complete event header in `file` and return the sequence number
/// of the final entry, or null if the file holds no complete header.
fn scanForLastSeq(file: std.fs.File) !?u64 {
    try file.seekTo(0);
    const size = (try file.stat()).size;

    var result: ?u64 = null;
    var offset: u64 = 0;
    while (offset + header_size <= size) {
        try file.seekTo(offset);
        var raw: [header_size]u8 = undefined;
        if (try file.readAll(&raw) < header_size) break;

        const header = EvtHeader.decode(&raw);
        result = header.seq;
        offset += header_size + header.len;
    }
    return result;
}
1045
/// set takedown flag + zero payload for all events belonging to uid
///
/// Rewrites the file in place: for every event whose header uid matches and
/// that is not already taken down, OR the takedown flag into the first 4
/// header bytes and overwrite the payload with zeros. Header length fields
/// are preserved so the file layout (and all later offsets) stay valid.
fn mutateUserEventsInFile(file: std.fs.File, uid: u64) !void {
    const file_size = (try file.stat()).size;
    // Shared zero block used to overwrite payloads chunk by chunk.
    // `const` (was `var`, which modern Zig rejects as never-mutated) and
    // hoisted out of the loop so it is not rebuilt per matching event.
    const zeros = [_]u8{0} ** 4096;
    var pos: u64 = 0;

    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        try file.seekTo(pos);
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break; // truncated tail

        const hdr = EvtHeader.decode(&hdr_buf);

        if (hdr.uid == uid and (hdr.flags & EvtFlags.takedown) == 0) {
            // set takedown flag (flags occupy the first 4 bytes, LE)
            const new_flags = hdr.flags | EvtFlags.takedown;
            var flags_buf: [4]u8 = undefined;
            std.mem.writeInt(u32, &flags_buf, new_flags, .little);
            try file.seekTo(pos);
            try file.writeAll(&flags_buf);

            // zero the payload in 4 KiB chunks
            const payload_start = pos + header_size;
            try file.seekTo(payload_start);
            var remaining: u64 = hdr.len;
            while (remaining > 0) {
                const chunk = @min(remaining, zeros.len);
                try file.writeAll(zeros[0..chunk]);
                remaining -= chunk;
            }
        }

        pos += header_size + hdr.len;
    }
}
1082
1083// === tests ===
1084
test "header encode/decode roundtrip" {
    // encode a representative header, decode it, and require every field back
    const original = EvtHeader{
        .flags = 0,
        .kind = @intFromEnum(EvtKind.commit),
        .len = 1234,
        .uid = 42,
        .seq = 99999,
    };

    var encoded: [header_size]u8 = undefined;
    original.encode(&encoded);
    const got = EvtHeader.decode(&encoded);

    try std.testing.expectEqual(original.flags, got.flags);
    try std.testing.expectEqual(original.kind, got.kind);
    try std.testing.expectEqual(original.len, got.len);
    try std.testing.expectEqual(original.uid, got.uid);
    try std.testing.expectEqual(original.seq, got.seq);
}
1102
test "header is little-endian" {
    const hdr = EvtHeader{ .flags = 0, .kind = 1, .len = 256, .uid = 0, .seq = 1 };
    var encoded: [header_size]u8 = undefined;
    hdr.encode(&encoded);
    // the len field starts at byte 8; little-endian puts the low byte first,
    // so 256 encodes as 0x00 then 0x01
    try std.testing.expectEqualSlices(u8, &[_]u8{ 0x00, 0x01 }, encoded[8..10]);
}
1111
/// Integration tests need a live Postgres; skip the test when DATABASE_URL
/// is not set in the environment.
fn requireDatabaseUrl() ![]const u8 {
    if (std.posix.getenv("DATABASE_URL")) |url| return url;
    return error.SkipZigTest;
}
1115
test "persist and playback" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // persist some events (sync flush, no background thread)
    // sequence numbers are relay-assigned: monotonic, starting at 1
    const seq1 = try dp.persist(.commit, 1, "payload-one");
    const seq2 = try dp.persist(.identity, 2, "payload-two");
    const seq3 = try dp.persist(.commit, 1, "payload-three");

    try std.testing.expectEqual(@as(u64, 1), seq1);
    try std.testing.expectEqual(@as(u64, 2), seq2);
    try std.testing.expectEqual(@as(u64, 3), seq3);

    // flush manually (flushLocked requires the caller to hold dp.mutex)
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=0 → all events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        // playback entries own their payload copies; free each one
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    // events come back in seq order with payloads intact
    try std.testing.expectEqual(@as(usize, 3), entries.items.len);
    try std.testing.expectEqualStrings("payload-one", entries.items[0].data);
    try std.testing.expectEqual(@as(u64, 1), entries.items[0].seq);
    try std.testing.expectEqualStrings("payload-two", entries.items[1].data);
    try std.testing.expectEqualStrings("payload-three", entries.items[2].data);
}
1158
test "playback with cursor" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var persister = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer persister.deinit();

    _ = try persister.persist(.commit, 1, "a");
    _ = try persister.persist(.commit, 2, "b");
    _ = try persister.persist(.commit, 3, "c");

    // flush buffered events to disk (mutex must be held for flushLocked)
    {
        persister.mutex.lock();
        defer persister.mutex.unlock();
        try persister.flushLocked();
    }

    // a cursor of 2 must return only events with seq > 2, i.e. just seq 3
    var collected: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (collected.items) |entry| std.testing.allocator.free(entry.data);
        collected.deinit(std.testing.allocator);
    }
    try persister.playback(2, std.testing.allocator, &collected);

    try std.testing.expectEqual(@as(usize, 1), collected.items.len);
    try std.testing.expectEqual(@as(u64, 3), collected.items[0].seq);
    try std.testing.expectEqualStrings("c", collected.items[0].data);
}
1192
test "seq recovery after reinit" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    // write some events
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        _ = try dp.persist(.commit, 1, "x");
        _ = try dp.persist(.commit, 2, "y");
        _ = try dp.persist(.account, 3, "z");
        // flushLocked requires holding the mutex
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // reinit — should recover seq
    // (init presumably rescans the log files / metadata index for the last
    // seq; the recovery code itself is outside this chunk's view)
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?);
        // the next persisted event continues the sequence: no gaps, no reuse
        const seq4 = try dp.persist(.commit, 1, "w");
        try std.testing.expectEqual(@as(u64, 4), seq4);
    }
}
1223
test "takedown zeros payload" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // one event for the user being taken down, one for an unrelated user
    _ = try dp.persist(.commit, 42, "secret-data");
    _ = try dp.persist(.commit, 99, "other-data");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down user 42
    try dp.takeDownUser(42);

    // playback should skip user 42's events
    // (readEventsFrom filters headers carrying the takedown flag)
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    // only the unrelated user's event survives, payload untouched
    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("other-data", entries.items[0].data);
}
1258
test "uidForDid assigns and caches UIDs" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var persister = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer persister.deinit();

    // first lookup creates the account and assigns a fresh, positive UID
    const alice_uid = try persister.uidForDid("did:plc:alice");
    try std.testing.expect(alice_uid > 0);

    // repeating the lookup (cache hit) yields the identical UID
    const alice_again = try persister.uidForDid("did:plc:alice");
    try std.testing.expectEqual(alice_uid, alice_again);

    // a distinct DID must map to a distinct UID
    const bob_uid = try persister.uidForDid("did:plc:bob");
    try std.testing.expect(bob_uid > 0);
    try std.testing.expect(bob_uid != alice_uid);
}
1284
test "uidForDid survives reinit" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    // assign carol a UID with one instance...
    var carol_uid: u64 = undefined;
    {
        var persister = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer persister.deinit();
        carol_uid = try persister.uidForDid("did:plc:carol");
    }

    // ...then reinit: the mapping must come back unchanged from the database
    {
        var persister = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer persister.deinit();
        const recovered = try persister.uidForDid("did:plc:carol");
        try std.testing.expectEqual(carol_uid, recovered);
    }
}
1309
test "takedown with real UIDs" {
    // integration test: needs a live Postgres (DATABASE_URL), else skipped
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // use database-assigned UIDs rather than hand-picked constants
    const alice_uid = try dp.uidForDid("did:plc:alice");
    const bob_uid = try dp.uidForDid("did:plc:bob");

    _ = try dp.persist(.commit, alice_uid, "alice-post");
    _ = try dp.persist(.commit, bob_uid, "bob-post");
    _ = try dp.persist(.commit, alice_uid, "alice-post-2");
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down alice
    try dp.takeDownUser(alice_uid);

    // playback should only have bob's event
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }
    try dp.playback(0, std.testing.allocator, &entries);

    // both of alice's events are filtered; bob's survives with its uid intact
    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("bob-post", entries.items[0].data);
    try std.testing.expectEqual(bob_uid, entries.items[0].uid);
}
1349
/// Resolve a testing TmpDir to its absolute path.
/// Caller owns the returned slice (allocated with `allocator`).
fn tmpDirRealPath(allocator: Allocator, tmp: std.testing.TmpDir) ![]const u8 {
    var path_buf: [std.fs.max_path_bytes]u8 = undefined;
    const resolved = try tmp.dir.realpath(".", &path_buf);
    return allocator.dupe(u8, resolved);
}