//! disk persistence matching indigo's diskpersist format
//!
//! append-only log files with relay-assigned sequence numbers.
//! Postgres metadata index for fast cursor→file lookup.
//!
//! on-disk entry format (28-byte LE header + CBOR payload):
//!   [4B flags LE] [4B kind LE] [4B payload_len LE] [8B uid LE] [8B seq LE] [payload]
//!
//! file naming: evts-{startSeq} (rotated every events_per_file events)
//!
//! see: indigo cmd/relay/stream/persist/diskpersist/diskpersist.go

const std = @import("std");
const pg = @import("pg");
const lru = @import("lru.zig");
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.relay);

// --- constants matching indigo ---

/// size in bytes of the fixed on-disk record header
pub const header_size: usize = 28; // 4 + 4 + 4 + 8 + 8

/// event kind as stored in the `kind` header field
pub const EvtKind = enum(u32) {
    commit = 1,
    handle = 2, // deprecated
    tombstone = 3, // deprecated
    identity = 4,
    account = 5,
    sync = 6,
};

/// bit flags stored in the `flags` header field
pub const EvtFlags = struct {
    pub const takedown: u32 = 1;
    pub const rebased: u32 = 2;
};

const default_events_per_file: u32 = 10_000;
const default_flush_interval_ms: u64 = 100;
const default_flush_threshold: usize = 400;

// --- header ---

/// fixed-size record header; every field is stored little-endian.
pub const EvtHeader = struct {
    flags: u32,
    kind: u32,
    len: u32, // payload length (not including header)
    uid: u64,
    seq: u64,

    /// serialize the header into `buf` using the on-disk field order.
    pub fn encode(self: EvtHeader, buf: *[header_size]u8) void {
        std.mem.writeInt(u32, buf[0..4], self.flags, .little);
        std.mem.writeInt(u32, buf[4..8], self.kind, .little);
        std.mem.writeInt(u32, buf[8..12], self.len, .little);
        std.mem.writeInt(u64, buf[12..20], self.uid, .little);
        std.mem.writeInt(u64, buf[20..28], self.seq, .little);
    }

    /// parse a header from `buf` (inverse of `encode`).
    pub fn decode(buf: *const [header_size]u8) EvtHeader {
        return .{
            .flags = std.mem.readInt(u32, buf[0..4], .little),
            .kind = std.mem.readInt(u32, buf[4..8], .little),
            .len = std.mem.readInt(u32, buf[8..12], .little),
            .uid = std.mem.readInt(u64, buf[12..20], .little),
            .seq = std.mem.readInt(u64, buf[20..28], .little),
        };
    }
};

// --- persist job (write buffer entry) ---

const PersistJob = struct {
    data: []u8, // header + payload, owned
    seq: u64, // assigned seq (for broadcast ordering)
};

// --- disk persistence ---

pub const DiskPersist = struct {
    allocator: Allocator,
    dir_path: []const u8,
    dir: std.fs.Dir,
    db: *pg.Pool,
    current_file: ?std.fs.File = null,
    current_file_path: ?[]const u8 = null,

    // sequence state
    cur_seq: u64 = 1,
    event_counter: u64 = 0,

    // config
    events_per_file: u32 = default_events_per_file,
    retention_hours: u64 = 72, // 3 days

    // DID → UID cache (matches indigo's bidirectional ARC cache)
    did_cache: lru.LruCache(u64),

    // write buffer (flushed periodically or when threshold hit)
    outbuf: std.ArrayListUnmanaged(u8) = .{},
    evtbuf: std.ArrayListUnmanaged(PersistJob) = .{},
    mutex: std.Thread.Mutex = .{},

    // flush thread
    flush_thread: ?std.Thread = null,
    alive: std.atomic.Value(bool) = .{ .raw = true },
    flush_cond: std.Thread.Condition = .{},

    /// current evtbuf entry count (for metrics — non-blocking, returns 0 if lock is contended)
    pub fn evtbufLen(self: *DiskPersist) usize {
        if (!self.mutex.tryLock()) return 0;
        defer self.mutex.unlock();
        return self.evtbuf.items.len;
    }

    /// current DID cache entry count (for metrics)
    pub fn didCacheLen(self: *DiskPersist) u32 {
        return self.did_cache.count();
    }

    /// DID cache hashmap backing capacity (for memory attribution)
    pub fn didCacheMapCap(self: *DiskPersist) u32 {
        return self.did_cache.mapCapacity();
    }

    /// evtbuf allocated capacity in jobs (for memory attribution — non-blocking)
    pub fn evtbufCap(self: *DiskPersist) usize {
        if (!self.mutex.tryLock()) return 0;
        defer self.mutex.unlock();
        return self.evtbuf.capacity;
    }

    /// outbuf allocated capacity in bytes (for memory attribution — non-blocking)
    pub fn outbufCap(self: *DiskPersist) usize {
        if (!self.mutex.tryLock()) return 0;
        defer self.mutex.unlock();
        return self.outbuf.capacity;
    }

    /// open (creating if needed) the log directory, connect to Postgres,
    /// create/migrate the metadata tables and resume the newest log file.
    /// on failure every resource acquired so far is released again.
    /// the returned value must be released with `deinit`.
    pub fn init(allocator: Allocator, dir_path: []const u8, database_url: []const u8) !DiskPersist {
        // ensure directory exists
        std.fs.cwd().makePath(dir_path) catch |err| switch (err) {
            error.PathAlreadyExists => {},
            else => return err,
        };

        var dir = try std.fs.cwd().openDir(dir_path, .{ .iterate = true });
        errdefer dir.close();

        // connect to Postgres
        const uri = std.Uri.parse(database_url) catch return error.InvalidDatabaseUrl;
        const pool = try pg.Pool.initUri(allocator, uri, .{ .size = 5 });
        errdefer pool.deinit();

        // create tables (matching indigo's Go relay schema)
        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS log_file_refs (
            \\    id BIGSERIAL PRIMARY KEY,
            \\    path TEXT NOT NULL,
            \\    archived BOOLEAN NOT NULL DEFAULT false,
            \\    seq_start BIGINT NOT NULL,
            \\    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS account (
            \\    uid BIGSERIAL PRIMARY KEY,
            \\    did TEXT NOT NULL UNIQUE,
            \\    host_id BIGINT NOT NULL DEFAULT 0,
            \\    status TEXT NOT NULL DEFAULT 'active',
            \\    upstream_status TEXT NOT NULL DEFAULT 'active',
            \\    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        // migration: add columns if they don't exist (for existing deployments).
        // best-effort: a failure is logged but does not abort startup.
        if (pool.exec("ALTER TABLE account ADD COLUMN IF NOT EXISTS host_id BIGINT NOT NULL DEFAULT 0", .{})) |_| {} else |err| {
            log.warn("migration: adding account.host_id failed: {s}", .{@errorName(err)});
        }
        if (pool.exec("ALTER TABLE account ADD COLUMN IF NOT EXISTS upstream_status TEXT NOT NULL DEFAULT 'active'", .{})) |_| {} else |err| {
            log.warn("migration: adding account.upstream_status failed: {s}", .{@errorName(err)});
        }

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS account_repo (
            \\    uid BIGINT PRIMARY KEY REFERENCES account(uid),
            \\    rev TEXT NOT NULL,
            \\    commit_data_cid TEXT NOT NULL
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS domain_ban (
            \\    id BIGSERIAL PRIMARY KEY,
            \\    domain TEXT NOT NULL UNIQUE,
            \\    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS host (
            \\    id BIGSERIAL PRIMARY KEY,
            \\    hostname TEXT NOT NULL UNIQUE,
            \\    status TEXT NOT NULL DEFAULT 'active',
            \\    last_seq BIGINT NOT NULL DEFAULT 0,
            \\    failed_attempts INTEGER NOT NULL DEFAULT 0,
            \\    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            \\    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
            \\)
        , .{});

        _ = try pool.exec(
            \\CREATE TABLE IF NOT EXISTS backfill_progress (
            \\    collection TEXT NOT NULL,
            \\    source TEXT NOT NULL,
            \\    cursor TEXT NOT NULL DEFAULT '',
            \\    imported_count BIGINT NOT NULL DEFAULT 0,
            \\    completed_at TIMESTAMPTZ,
            \\    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            \\    PRIMARY KEY (collection, source)
            \\)
        , .{});

        // migrate: old schema had collection as sole PK — add source to composite PK
        // (best-effort, logged on failure)
        if (pool.exec(
            \\DO $$ BEGIN
            \\    IF EXISTS (
            \\        SELECT 1 FROM pg_constraint
            \\        WHERE conname = 'backfill_progress_pkey'
            \\        AND conrelid = 'backfill_progress'::regclass
            \\        AND array_length(conkey, 1) = 1
            \\    ) THEN
            \\        ALTER TABLE backfill_progress DROP CONSTRAINT backfill_progress_pkey;
            \\        ALTER TABLE backfill_progress ADD PRIMARY KEY (collection, source);
            \\    END IF;
            \\END $$
        , .{})) |_| {} else |err| {
            log.warn("migration: backfill_progress primary key update failed: {s}", .{@errorName(err)});
        }

        const owned_dir_path = try allocator.dupe(u8, dir_path);
        errdefer allocator.free(owned_dir_path);

        var self = DiskPersist{
            .allocator = allocator,
            .dir_path = owned_dir_path,
            .dir = dir,
            .db = pool,
            .did_cache = lru.LruCache(u64).init(allocator, 500_000),
        };
        // resumeLog can fail after it has opened a file; release everything
        // this struct owns so a failed init does not leak.
        errdefer {
            self.did_cache.deinit();
            if (self.current_file) |f| f.close();
            if (self.current_file_path) |p| allocator.free(p);
        }

        // recover from existing log files
        try self.resumeLog();

        return self;
    }

    /// stop the flush thread, flush what is still buffered and release all resources.
    pub fn deinit(self: *DiskPersist) void {
        // stop flush thread
        self.alive.store(false, .release);
        self.flush_cond.signal();
        if (self.flush_thread) |t| t.join();

        // flush remaining
        self.mutex.lock();
        self.flushLocked() catch |err| {
            log.warn("deinit: final flush failed, dropping {d} buffered events: {s}", .{ self.evtbuf.items.len, @errorName(err) });
        };
        self.mutex.unlock();

        // free write buffer (only non-empty when the final flush did not happen)
        for (self.evtbuf.items) |job| self.allocator.free(job.data);
        self.evtbuf.deinit(self.allocator);
        self.outbuf.deinit(self.allocator);

        self.did_cache.deinit();

        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);
        self.dir.close();
        self.db.deinit();
        self.allocator.free(self.dir_path);
    }

    /// start the background flush thread
    pub fn start(self: *DiskPersist) !void {
        self.flush_thread = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, flushLoop, .{self});
    }

    /// result of `uidForDidFromHost`
    pub const UidResult = struct {
        uid: u64,
        host_changed: bool = false,
        is_new: bool = false,
    };

    /// resolve a DID to a numeric UID, checking host association.
/// never writes host_id itself: the caller is expected to verify authority
    /// (DID doc resolution) and then call setAccountHostId.
    /// is_new=true means the account has no host_id stored yet;
    /// host_changed=true means the stored host_id differs from `host_id`.
    /// a `host_id` of 0 skips the host comparison entirely.
    pub fn uidForDidFromHost(self: *DiskPersist, did: []const u8, host_id: u64) !UidResult {
        const uid = try self.uidForDid(did);

        // nothing to compare against
        if (host_id == 0) return .{ .uid = uid };

        // a failed lookup is treated like "no host stored"
        const stored_host = self.getAccountHostId(uid) catch 0;
        if (stored_host == 0) return .{ .uid = uid, .is_new = true };
        if (stored_host == host_id) return .{ .uid = uid };

        log.info("account {s} (uid={d}) host mismatch: current={d} new={d}", .{ did, uid, stored_host, host_id });
        return .{ .uid = uid, .host_changed = true };
    }

    /// map a DID to its numeric UID, inserting an account row the first time
    /// a DID is seen. results are remembered in the in-memory DID cache.
    /// matches indigo's Relay.DidToUid → Account.UID mapping.
    pub fn uidForDid(self: *DiskPersist, did: []const u8) !u64 {
        // cache hit: no database round trip
        if (self.did_cache.get(did)) |cached| return cached;

        // already known to the database?
        const existing = try self.db.rowUnsafe(
            "SELECT uid FROM account WHERE did = $1",
            .{did},
        );
        if (existing) |found| {
            var db_row = found;
            defer db_row.deinit() catch {};
            const known_uid: u64 = @intCast(db_row.get(i64, 0));
            self.didCachePut(did, known_uid);
            return known_uid;
        }

        // unknown DID: insert it; a concurrent insert of the same DID is not an error
        _ = self.db.exec(
            "INSERT INTO account (did) VALUES ($1) ON CONFLICT (did) DO NOTHING",
            .{did},
        ) catch |insert_err| {
            log.warn("failed to create account for {s}: {s}", .{ did, @errorName(insert_err) });
            return insert_err;
        };

        // fetch the UID regardless of which writer created the row
        var created = (try self.db.rowUnsafe(
            "SELECT uid FROM account WHERE did = $1",
            .{did},
        )) orelse return error.AccountCreationFailed;
        defer created.deinit() catch {};

        const new_uid: u64 = @intCast(created.get(i64, 0));
        self.didCachePut(did, new_uid);
        return new_uid;
    }

    /// insert into did_cache (LRU handles eviction at capacity).
fn didCachePut(self: *DiskPersist, did: []const u8, uid: u64) void {
        // best-effort: a failed cache insert only costs a later database lookup
        self.did_cache.put(did, uid) catch {};
    }

    /// per-DID sync state for chain tracking
    pub const AccountState = struct {
        rev: []const u8,
        data_cid: []const u8,
    };

    /// get stored sync state for a user (from account_repo table).
    /// returns null when no row exists or either column is empty.
    /// both strings of the result are allocated with `allocator`; the caller frees them.
    pub fn getAccountState(self: *DiskPersist, uid: u64, allocator: Allocator) !?AccountState {
        var row = (try self.db.rowUnsafe(
            "SELECT rev, commit_data_cid FROM account_repo WHERE uid = $1",
            .{@as(i64, @intCast(uid))},
        )) orelse return null;
        defer row.deinit() catch {};

        const rev = row.get([]const u8, 0);
        const data_cid = row.get([]const u8, 1);
        if (rev.len == 0 or data_cid.len == 0) return null;

        // copy out of the row before it is released; do not leak the first
        // copy when the second allocation fails
        const owned_rev = try allocator.dupe(u8, rev);
        errdefer allocator.free(owned_rev);
        const owned_data_cid = try allocator.dupe(u8, data_cid);

        return .{
            .rev = owned_rev,
            .data_cid = owned_data_cid,
        };
    }

    /// update stored sync state after a verified commit (conditional upsert into account_repo).
    /// uses WHERE clause to ensure rev only moves forward, preventing race conditions
    /// when concurrent workers process commits for the same DID.
    /// returns true if the state was updated, false if a newer rev was already stored.
    pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !bool {
        const rows_affected = try self.db.exec(
            "INSERT INTO account_repo (uid, rev, commit_data_cid) VALUES ($1, $2, $3) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_data_cid = EXCLUDED.commit_data_cid WHERE account_repo.rev < EXCLUDED.rev",
            .{ @as(i64, @intCast(uid)), rev, data_cid },
        );
        return (rows_affected orelse 0) > 0;
    }

    // --- account status ---

    /// get the host_id for an account. returns 0 if not set.
pub fn getAccountHostId(self: *DiskPersist, uid: u64) !u64 {
        const uid_arg: i64 = @intCast(uid);
        const maybe_row = try self.db.rowUnsafe(
            "SELECT host_id FROM account WHERE uid = $1",
            .{uid_arg},
        );
        var found = maybe_row orelse return 0;
        defer found.deinit() catch {};

        // non-positive stored values are reported as "not set"
        const stored = found.get(i64, 0);
        if (stored <= 0) return 0;
        return @intCast(stored);
    }

    /// number of accounts attached to a host (for rate limit scaling, matches
    /// Go relay's host.AccountCount). any lookup failure is reported as 0.
    pub fn getHostAccountCount(self: *DiskPersist, host_id: u64) u64 {
        const host_arg: i64 = @intCast(host_id);
        const maybe_row = self.db.rowUnsafe(
            "SELECT COUNT(*) FROM account WHERE host_id = $1",
            .{host_arg},
        ) catch return 0;
        var found = maybe_row orelse return 0;
        defer found.deinit() catch {};

        const total = found.get(i64, 0);
        if (total <= 0) return 0;
        return @intCast(total);
    }

    /// store the host_id for an account (first encounter or migration)
    pub fn setAccountHostId(self: *DiskPersist, uid: u64, host_id: u64) !void {
        const uid_arg: i64 = @intCast(uid);
        const host_arg: i64 = @intCast(host_id);
        _ = try self.db.exec(
            "UPDATE account SET host_id = $2 WHERE uid = $1",
            .{ uid_arg, host_arg },
        );
    }

    /// store the upstream status reported for an account (from #account events).
    /// Go relay: account.go UpdateAccountUpstreamStatus
    pub fn updateAccountUpstreamStatus(self: *DiskPersist, uid: u64, upstream_status: []const u8) !void {
        const uid_arg: i64 = @intCast(uid);
        _ = try self.db.exec(
            "UPDATE account SET upstream_status = $2 WHERE uid = $1",
            .{ uid_arg, upstream_status },
        );
    }

    /// check if an account is active (both local status and upstream status).
/// Go relay: models.Account.IsActive()
    /// an unknown uid is reported as not active.
    pub fn isAccountActive(self: *DiskPersist, uid: u64) !bool {
        const uid_arg: i64 = @intCast(uid);
        const maybe_row = try self.db.rowUnsafe(
            "SELECT status, upstream_status FROM account WHERE uid = $1",
            .{uid_arg},
        );
        var found = maybe_row orelse return false;
        defer found.deinit() catch {};

        const local_status = found.get([]const u8, 0);
        const upstream_status = found.get([]const u8, 1);

        // both sides have to say "active"
        if (!std.mem.eql(u8, local_status, "active")) return false;
        return std.mem.eql(u8, upstream_status, "active");
    }

    // --- host management ---

    /// one row of the host table
    pub const Host = struct {
        id: u64,
        hostname: []const u8,
        status: []const u8,
        last_seq: u64,
        failed_attempts: u32,
    };

    /// make sure a host row exists for `hostname` and return its {id, last_seq}.
    pub fn getOrCreateHost(self: *DiskPersist, hostname: []const u8) !struct { id: u64, last_seq: u64 } {
        // insert is a no-op when the hostname is already present
        _ = self.db.exec(
            "INSERT INTO host (hostname) VALUES ($1) ON CONFLICT (hostname) DO NOTHING",
            .{hostname},
        ) catch |insert_err| {
            log.warn("failed to create host {s}: {s}", .{ hostname, @errorName(insert_err) });
            return insert_err;
        };

        const maybe_row = try self.db.rowUnsafe(
            "SELECT id, last_seq FROM host WHERE hostname = $1",
            .{hostname},
        );
        var found = maybe_row orelse return error.HostCreationFailed;
        defer found.deinit() catch {};

        const host_id = found.get(i64, 0);
        const cursor = found.get(i64, 1);
        return .{
            .id = @intCast(host_id),
            .last_seq = @intCast(cursor),
        };
    }

    /// true when the host row's status is "banned" or "blocked".
    /// unknown hosts and lookup failures are reported as not banned.
    pub fn isHostBanned(self: *DiskPersist, hostname: []const u8) bool {
        const maybe_row = self.db.rowUnsafe(
            "SELECT status FROM host WHERE hostname = $1",
            .{hostname},
        ) catch return false;
        var found = maybe_row orelse return false;
        defer found.deinit() catch {};

        const status = found.get([]const u8, 0);
        if (std.mem.eql(u8, status, "banned")) return true;
        return std.mem.eql(u8, status, "blocked");
    }

    /// store the cursor position reached for a host
    pub fn updateHostSeq(self: *DiskPersist, host_id: u64, seq: u64) !void {
        const host_arg: i64 = @intCast(host_id);
        const seq_arg: i64 = @intCast(seq);
        _ = try self.db.exec(
            "UPDATE host SET last_seq = $2, updated_at = now() WHERE id = $1",
            .{ host_arg, seq_arg },
        );
    }
/// look up host ID by hostname. returns null if not found.
    pub fn getHostIdForHostname(self: *DiskPersist, hostname: []const u8) !?u64 {
        var row = (try self.db.rowUnsafe(
            "SELECT id FROM host WHERE hostname = $1",
            .{hostname},
        )) orelse return null;
        defer row.deinit() catch {};
        return @intCast(row.get(i64, 0));
    }

    /// update host status (active, blocked, exhausted)
    pub fn updateHostStatus(self: *DiskPersist, host_id: u64, status: []const u8) !void {
        _ = try self.db.exec(
            "UPDATE host SET status = $2, updated_at = now() WHERE id = $1",
            .{ @as(i64, @intCast(host_id)), status },
        );
    }

    /// list all active hosts, ordered by id.
    /// caller owns the returned slice and each host's `hostname` / `status`.
    pub fn listActiveHosts(self: *DiskPersist, allocator: Allocator) ![]Host {
        return self.collectHosts(
            allocator,
            "SELECT id, hostname, status, last_seq, failed_attempts FROM host WHERE status = 'active' ORDER BY id ASC",
        );
    }

    /// list all hosts (any status) for admin view, ordered by id.
    /// caller owns the returned slice and each host's `hostname` / `status`.
    pub fn listAllHosts(self: *DiskPersist, allocator: Allocator) ![]Host {
        return self.collectHosts(
            allocator,
            "SELECT id, hostname, status, last_seq, failed_attempts FROM host ORDER BY id ASC",
        );
    }

    /// run a host listing query (columns: id, hostname, status, last_seq,
    /// failed_attempts) and copy every row into memory owned by `allocator`.
    fn collectHosts(self: *DiskPersist, allocator: Allocator, comptime sql: []const u8) ![]Host {
        var hosts: std.ArrayListUnmanaged(Host) = .{};
        errdefer {
            for (hosts.items) |h| {
                allocator.free(h.hostname);
                allocator.free(h.status);
            }
            hosts.deinit(allocator);
        }

        var result = try self.db.query(sql, .{});
        defer result.deinit();

        // an error while fetching the next row ends the listing early
        while (result.nextUnsafe() catch null) |row| {
            // reserve the slot first so nothing can fail once both strings are copied
            try hosts.ensureUnusedCapacity(allocator, 1);
            const hostname = try allocator.dupe(u8, row.get([]const u8, 1));
            errdefer allocator.free(hostname);
            const status = try allocator.dupe(u8, row.get([]const u8, 2));

            hosts.appendAssumeCapacity(.{
                .id = @intCast(row.get(i64, 0)),
                .hostname = hostname,
                .status = status,
                .last_seq = @intCast(row.get(i64, 3)),
                .failed_attempts = @intCast(row.get(i32, 4)),
            });
        }

        return try hosts.toOwnedSlice(allocator);
    }

    /// increment failure count and return new value (0 if the host does not exist).
    /// the increment and the read happen in one statement, so the returned
    /// value is the one produced by this call.
    pub fn incrementHostFailures(self: *DiskPersist, host_id: u64) !u32 {
        var row = (try self.db.rowUnsafe(
            "UPDATE host SET failed_attempts = failed_attempts + 1, updated_at = now() WHERE id = $1 RETURNING failed_attempts",
            .{@as(i64, @intCast(host_id))},
        )) orelse return 0;
        defer row.deinit() catch {};
        return @intCast(row.get(i32, 0));
    }

    /// check if a hostname (or any parent domain) is banned.
    /// Go relay: domain_ban.go DomainIsBanned — suffix-based check.
    /// a lookup failure is reported as not banned.
    pub fn isDomainBanned(self: *DiskPersist, hostname: []const u8) bool {
        // check each suffix: "pds.host.example.com", "host.example.com", "example.com"
        var offset: usize = 0;
        while (offset < hostname.len) {
            const suffix = hostname[offset..];
            var row = self.db.rowUnsafe(
                "SELECT 1 FROM domain_ban WHERE domain = $1",
                .{suffix},
            ) catch return false;
            if (row) |*r| {
                r.deinit() catch {};
                return true;
            }
            // advance past next dot
            const dot = std.mem.indexOfScalarPos(u8, hostname, offset, '.') orelse break;
            offset = dot + 1;
        }
        return false;
    }

    /// reset failure count (on successful connection)
    pub fn resetHostFailures(self: *DiskPersist, host_id: u64) !void {
        _ = try self.db.exec(
            "UPDATE host SET failed_attempts = 0, updated_at = now() WHERE id = $1",
            .{@as(i64, @intCast(host_id))},
        );
    }

    /// persist an event. assigns a sequence number. returns the assigned seq.
    /// the event is buffered and will be flushed to disk asynchronously.
pub fn persist(self: *DiskPersist, kind: EvtKind, uid: u64, payload: []const u8) !u64 {
        // the header stores the payload length in 32 bits
        const payload_len = std.math.cast(u32, payload.len) orelse return error.PayloadTooLarge;

        // build the on-disk record: header + payload
        const data = try self.allocator.alloc(u8, header_size + payload.len);
        // `data` belongs to this function only until it is stored in evtbuf;
        // after that the buffer owns it and it must not be freed here.
        var owned_by_evtbuf = false;
        errdefer if (!owned_by_evtbuf) self.allocator.free(data);

        // write header (seq filled in later under lock)
        const header = EvtHeader{
            .flags = 0,
            .kind = @intFromEnum(kind),
            .len = payload_len,
            .uid = uid,
            .seq = 0, // placeholder
        };
        header.encode(data[0..header_size]);
        @memcpy(data[header_size..], payload);

        self.mutex.lock();
        defer self.mutex.unlock();

        // reserve room in both buffers first, so that no allocation can fail
        // after a sequence number has been consumed
        try self.evtbuf.ensureUnusedCapacity(self.allocator, 1);
        try self.outbuf.ensureUnusedCapacity(self.allocator, data.len);

        // assign seq
        const seq = self.cur_seq;
        self.cur_seq += 1;
        std.mem.writeInt(u64, data[20..28], seq, .little);

        self.evtbuf.appendAssumeCapacity(.{ .data = data, .seq = seq });
        owned_by_evtbuf = true;
        self.outbuf.appendSliceAssumeCapacity(data);
        self.event_counter += 1;

        // flush if threshold hit. a flush error is still returned to the
        // caller, but the event stays buffered and is retried by the next flush.
        if (self.evtbuf.items.len >= default_flush_threshold) {
            try self.flushLocked();
        }

        return seq;
    }

    /// playback events with seq > since. appends each event to `entries`;
    /// every entry's `data` is allocated with `allocator` and owned by the caller.
pub fn playback(self: *DiskPersist, since: u64, allocator: Allocator, entries: *std.ArrayListUnmanaged(PlaybackEntry)) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        const since_i: i64 = @intCast(since);

        // log files to read, in ascending seq order. registered up front so
        // the path copies are released on every exit path.
        var start_files: std.ArrayListUnmanaged(LogFileRef) = .{};
        defer {
            for (start_files.items) |f| allocator.free(f.path);
            start_files.deinit(allocator);
        }

        if (since > 0) {
            // find file whose seq_start is just before `since`
            if (try self.db.rowUnsafe(
                "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start <= $1 ORDER BY seq_start DESC LIMIT 1",
                .{since_i},
            )) |row| {
                var r = row;
                defer r.deinit() catch {};
                try appendLogFileRef(allocator, &start_files, r.get([]const u8, 1), @intCast(r.get(i64, 2)));
            }
        }

        // find all subsequent files
        {
            var result = try self.db.query(
                "SELECT id, path, seq_start FROM log_file_refs WHERE seq_start > $1 ORDER BY seq_start ASC",
                .{since_i},
            );
            defer result.deinit();

            while (result.nextUnsafe() catch null) |r| {
                try appendLogFileRef(allocator, &start_files, r.get([]const u8, 1), @intCast(r.get(i64, 2)));
            }
        }

        // read events from each file; files that cannot be opened are skipped
        for (start_files.items) |ref| {
            var file = self.dir.openFile(ref.path, .{}) catch continue;
            defer file.close();
            try readEventsFrom(allocator, file, since, entries);
        }
    }

    /// last assigned sequence number, or null if empty
    pub fn lastSeq(self: *DiskPersist) ?u64 {
        if (self.cur_seq <= 1) return null;
        return self.cur_seq - 1;
    }

    /// garbage-collect log files older than the retention period.
    /// the file currently being written is never removed.
    pub fn gc(self: *DiskPersist) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        const cutoff_interval = try std.fmt.allocPrint(self.allocator, "{d} hours", .{self.retention_hours});
        defer self.allocator.free(cutoff_interval);

        // find expired refs
        var expired: std.ArrayListUnmanaged(GcRef) = .{};
        defer {
            for (expired.items) |e| self.allocator.free(e.path);
            expired.deinit(self.allocator);
        }

        {
            var result = try self.db.query(
                "SELECT id, path FROM log_file_refs WHERE created_at < now() - $1::interval",
                .{cutoff_interval},
            );
            defer result.deinit();

            while (result.nextUnsafe() catch null) |r| {
                // reserve first so the path copy cannot be orphaned by a failed append
                try expired.ensureUnusedCapacity(self.allocator, 1);
                const path = try self.allocator.dupe(u8, r.get([]const u8, 1));
                expired.appendAssumeCapacity(.{ .id = r.get(i64, 0), .path = path });
            }
        }

        // count only files that were actually removed
        var removed: usize = 0;
        for (expired.items) |ref| {
            // skip current file
            if (self.current_file_path) |cur| {
                if (std.mem.eql(u8, ref.path, cur)) continue;
            }

            // delete db record first (prevents playback from finding it)
            _ = self.db.exec("DELETE FROM log_file_refs WHERE id = $1", .{ref.id}) catch |err| {
                log.warn("gc: failed to delete db record {d}: {s}", .{ ref.id, @errorName(err) });
                continue;
            };

            // delete file
            self.dir.deleteFile(ref.path) catch |err| {
                log.warn("gc: failed to delete {s}: {s}", .{ ref.path, @errorName(err) });
                continue;
            };
            removed += 1;
        }

        if (removed > 0) {
            log.info("gc: removed {d} expired log files", .{removed});
        }
    }

    /// take down all events for a user (set flag + zero payload)
    pub fn takeDownUser(self: *DiskPersist, uid: u64) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        // events still sitting in the write buffer would otherwise reach disk
        // later without the takedown flag
        self.flushLocked() catch |err| {
            log.warn("takedown: flush before takedown failed: {s}", .{@errorName(err)});
        };

        // iterate all log files
        var refs: std.ArrayListUnmanaged([]const u8) = .{};
        defer {
            for (refs.items) |p| self.allocator.free(p);
            refs.deinit(self.allocator);
        }

        {
            var result = try self.db.query("SELECT path FROM log_file_refs ORDER BY seq_start DESC", .{});
            defer result.deinit();
            while (result.nextUnsafe() catch null) |r| {
                try refs.ensureUnusedCapacity(self.allocator, 1);
                refs.appendAssumeCapacity(try self.allocator.dupe(u8, r.get([]const u8, 0)));
            }
        }

        for (refs.items) |path| {
            var file = self.dir.openFile(path, .{ .mode = .read_write }) catch continue;
            defer file.close();
            mutateUserEventsInFile(file, uid) catch |err| {
                log.warn("takedown: failed to process {s}: {s}", .{ path, @errorName(err) });
            };
        }
    }

    // --- internals ---

    /// reopen the newest registered log file and restore `cur_seq` from it,
    /// or create the first log file when none is registered.
    /// never restarts the sequence below the newest file's seq_start.
    fn resumeLog(self: *DiskPersist) !void {
        // find most recent log file
        const newest = try self.db.rowUnsafe(
            "SELECT id, path, seq_start FROM log_file_refs ORDER BY seq_start DESC LIMIT 1",
            .{},
        );
        var r = newest orelse {
            try self.initLogFile();
            return;
        };
        defer r.deinit() catch {};

        const path = r.get([]const u8, 1);
        const seq_start: u64 = @intCast(r.get(i64, 2));

        var file = self.dir.openFile(path, .{ .mode = .read_write }) catch |open_err| blk: {
            // recreate the file under the registered name: the existing ref
            // stays valid and no duplicate ref row is inserted
            log.warn("resume: cannot open {s} ({s}), recreating it", .{ path, @errorName(open_err) });
            break :blk try self.dir.createFile(path, .{ .read = true, .truncate = false });
        };
        errdefer file.close();

        // scan for last seq. a scan error is a real I/O failure; continuing
        // would risk reusing sequence numbers, so it is reported instead.
        const scan = scanLog(file) catch |scan_err| {
            log.err("resume: failed to scan {s}: {s}", .{ path, @errorName(scan_err) });
            return scan_err;
        };

        if (scan.last_seq) |ls| {
            self.cur_seq = ls + 1;
            log.info("recovered seq from disk: last_seq={d}", .{ls});
        } else {
            self.cur_seq = if (seq_start > 0) seq_start else 1;
        }

        // drop an incomplete trailing record (e.g. from a crash mid-write) so
        // that appended records stay reachable when the file is scanned
        const stat = try file.stat();
        if (stat.size > scan.valid_len) {
            log.warn("resume: dropping {d} bytes of incomplete trailing record from {s}", .{ stat.size - scan.valid_len, path });
            try file.setEndPos(scan.valid_len);
        }

        // position at the end of the valid data for appending
        try file.seekTo(scan.valid_len);

        const owned_path = try self.allocator.dupe(u8, path);
        self.current_file = file;
        self.current_file_path = owned_path;
    }

    /// create the log file that starts at the current sequence number.
    fn initLogFile(self: *DiskPersist) !void {
        try self.createLogFile(self.cur_seq);
    }

    /// create and register the log file `evts-{start_seq}` and make it current.
    /// the previous file stays current (and open) when any step fails.
    fn createLogFile(self: *DiskPersist, start_seq: u64) !void {
        var name_buf: [64]u8 = undefined;
        // "evts-" plus at most 20 digits always fits in 64 bytes
        const name = std.fmt.bufPrint(&name_buf, "evts-{d}", .{start_seq}) catch unreachable;

        const new_file = try self.dir.createFile(name, .{ .truncate = false });
        errdefer new_file.close();
        // the file may already exist (truncate = false): append, never overwrite
        try new_file.seekFromEnd(0);

        const new_path = try self.allocator.dupe(u8, name);
        errdefer self.allocator.free(new_path);

        // register in Postgres
        _ = try self.db.exec(
            "INSERT INTO log_file_refs (path, seq_start) VALUES ($1, $2)",
            .{ name, @as(i64, @intCast(start_seq)) },
        );

        // everything succeeded: swap the new file in
        if (self.current_file) |f| f.close();
        if (self.current_file_path) |p| self.allocator.free(p);
        self.current_file = new_file;
        self.current_file_path = new_path;
        self.event_counter = 0;
    }

    /// write all buffered records to the current file and rotate the file
    /// once `events_per_file` events were written. caller must hold `mutex`.
    /// when the write fails, the buffers are kept so the flush is retried.
    /// NOTE(review): a write that fails part-way may leave a partial copy of
    /// the buffer on disk before the retry — confirm whether that needs handling.
    fn flushLocked(self: *DiskPersist) !void {
        if (self.evtbuf.items.len == 0) return;

        // write buffered bytes to current file
        const file = self.current_file orelse return;
        file.writeAll(self.outbuf.items) catch |err| {
            log.err("flush: write failed: {s}", .{@errorName(err)});
            return err;
        };

        // clear buffers
        self.outbuf.clearRetainingCapacity();

        // free job data
        for (self.evtbuf.items) |job| {
            self.allocator.free(job.data);
        }
        self.evtbuf.clearRetainingCapacity();

        // check if we need to rotate; on failure the current file keeps being
        // used and rotation is attempted again by the next flush
        if (self.event_counter >= self.events_per_file) {
            self.createLogFile(self.cur_seq) catch |err| {
                log.err("flush: log rotation failed: {s}", .{@errorName(err)});
            };
        }
    }

    /// body of the background flush thread: flush every flush interval (or
    /// when signalled) until `alive` is cleared.
    fn flushLoop(self: *DiskPersist) void {
        while (self.alive.load(.acquire)) {
            // wait for flush interval or signal
            self.mutex.lock();
            defer self.mutex.unlock();
            // a timeout is the normal wake-up reason
            self.flush_cond.timedWait(&self.mutex, default_flush_interval_ms * std.time.ns_per_ms) catch {};
            // flushLocked logs write failures itself; the next round retries
            self.flushLocked() catch {};
        }
    }
};

// --- playback types ---

pub const PlaybackEntry = struct {
    seq: u64,
    kind: u32,
    uid: u64,
    data: []const u8, // payload only (owned by caller)
};

const LogFileRef = struct {
    path: []const u8,
    seq_start: u64,
};

const GcRef = struct {
    id: i64,
    path: []const u8,
};

/// result of scanning a log file for complete records
const LogScan = struct {
    /// seq of the last complete record, null when there is none
    last_seq: ?u64,
    /// byte offset just past the last complete record
    valid_len: u64,
};

// --- file-level operations ---

/// append a ref with its own copy of `path`; nothing is leaked on failure.
fn appendLogFileRef(allocator: Allocator, refs: *std.ArrayListUnmanaged(LogFileRef), path: []const u8, seq_start: u64) !void {
    try refs.ensureUnusedCapacity(allocator, 1);
    refs.appendAssumeCapacity(.{
        .path = try allocator.dupe(u8, path),
        .seq_start = seq_start,
    });
}

/// append every event of `file` with seq > since to `result`, skipping
/// records flagged as taken down or rebased. reading stops quietly at a
/// truncated record or read error; allocation failures are returned.
/// each entry's `data` is allocated with `allocator`.
fn readEventsFrom(allocator: Allocator, file: std.fs.File, since: u64, result: *std.ArrayListUnmanaged(PlaybackEntry)) !void {
    const file_size = (try file.stat()).size;

    // if since > 0, scan to the right position
    if (since > 0) {
        seekToSeq(file, since, file_size) catch return;
    }

    // read events
    while (true) {
        var hdr_buf: [header_size]u8 = undefined;
        const n = file.readAll(&hdr_buf) catch break;
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);

        // skip taken down / rebased events and events at or before the cursor
        const hidden = hdr.flags & (EvtFlags.takedown | EvtFlags.rebased) != 0;
        if (hidden or hdr.seq <= since) {
            file.seekBy(@intCast(hdr.len)) catch break;
            continue;
        }

        // a record whose payload runs past the end of the file is truncated;
        // stop before allocating for a length that cannot be satisfied
        const payload_pos = file.getPos() catch break;
        if (payload_pos + hdr.len > file_size) break;

        // read payload
        const data = try allocator.alloc(u8, hdr.len);
        errdefer allocator.free(data);
        const read_n = file.readAll(data) catch {
            allocator.free(data);
            break;
        };
        if (read_n < hdr.len) {
            allocator.free(data);
            break;
        }

        try result.append(allocator, .{
            .seq = hdr.seq,
            .kind = hdr.kind,
            .uid = hdr.uid,
            .data = data,
        });
    }
}

/// scan file headers to seek to the first event with seq > target
fn seekToSeq(file: std.fs.File, target: u64, file_size: u64) !void {
    try file.seekTo(0);
    var pos: u64 = 0;

    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);
        if (hdr.seq > target) {
            // seek back to start of this header
            try file.seekTo(pos);
            return;
        }

        pos += header_size + hdr.len;
        try file.seekTo(pos);
    }
}

/// walk the records of a file and report the last complete one.
/// a trailing record whose payload is cut short is not counted.
fn scanLog(file: std.fs.File) !LogScan {
    const file_size = (try file.stat()).size;
    var scan: LogScan = .{ .last_seq = null, .valid_len = 0 };
    var pos: u64 = 0;

    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        try file.seekTo(pos);
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);
        const next = pos + header_size + hdr.len;
        if (next > file_size) break; // payload cut short

        scan.last_seq = hdr.seq;
        scan.valid_len = next;
        pos = next;
    }

    return scan;
}

/// scan a file for the last sequence number (of a complete record)
fn scanForLastSeq(file: std.fs.File) !?u64 {
    return (try scanLog(file)).last_seq;
}

/// set takedown flag + zero payload for all events belonging to uid
fn mutateUserEventsInFile(file: std.fs.File, uid: u64) !void {
    const file_size = (try file.stat()).size;
    const zeros = [_]u8{0} ** 4096;
    var pos: u64 = 0;

    while (pos + header_size <= file_size) {
        var hdr_buf: [header_size]u8 = undefined;
        try file.seekTo(pos);
        const n = try file.readAll(&hdr_buf);
        if (n < header_size) break;

        const hdr = EvtHeader.decode(&hdr_buf);
        // stop at a truncated record instead of growing the file with zeros
        if (pos + header_size + hdr.len > file_size) break;

        if (hdr.uid == uid and (hdr.flags & EvtFlags.takedown) == 0) {
            // set takedown flag (flags are the first 4 bytes of the header)
            var flags_buf: [4]u8 = undefined;
            std.mem.writeInt(u32, &flags_buf, hdr.flags | EvtFlags.takedown, .little);
            try file.seekTo(pos);
            try file.writeAll(&flags_buf);

            // zero the payload
            try file.seekTo(pos + header_size);
            var remaining: u64 = hdr.len;
            while (remaining > 0) {
                const chunk = @min(remaining, zeros.len);
                try file.writeAll(zeros[0..chunk]);
                remaining -= chunk;
            }
        }

        pos += header_size + hdr.len;
    }
}

// === tests ===

test "header encode/decode roundtrip" {
    const hdr = EvtHeader{
        .flags = 0,
        .kind = @intFromEnum(EvtKind.commit),
        .len = 1234,
        .uid = 42,
        .seq = 99999,
    };

    var buf: [header_size]u8 = undefined;
    hdr.encode(&buf);
    const decoded = EvtHeader.decode(&buf);

    try std.testing.expectEqual(hdr.flags, decoded.flags);
    try std.testing.expectEqual(hdr.kind, decoded.kind);
    try std.testing.expectEqual(hdr.len, decoded.len);
    try std.testing.expectEqual(hdr.uid, decoded.uid);
    try std.testing.expectEqual(hdr.seq, decoded.seq);
}

test "header is little-endian" {
    const hdr = EvtHeader{ .flags = 0, .kind = 1, .len = 256, .uid = 0, .seq = 1 };
    var buf: [header_size]u8 = undefined;
    hdr.encode(&buf);

    // len=256 in LE is 0x00 0x01 0x00 0x00
    try std.testing.expectEqual(@as(u8, 0x00), buf[8]);
    try std.testing.expectEqual(@as(u8, 0x01), buf[9]);
}

test "scanLog ignores a truncated trailing record" {
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    var file = try tmp.dir.createFile("evts-1", .{ .read = true });
    defer file.close();

    var hdr_buf: [header_size]u8 = undefined;
    (EvtHeader{ .flags = 0, .kind = 1, .len = 3, .uid = 7, .seq = 1 }).encode(&hdr_buf);
    try file.writeAll(&hdr_buf);
    try file.writeAll("abc");

    // second record announces 10 payload bytes but only 2 were written
    (EvtHeader{ .flags = 0, .kind = 1, .len = 10, .uid = 7, .seq = 2 }).encode(&hdr_buf);
    try file.writeAll(&hdr_buf);
    try file.writeAll("xy");

    const scan = try scanLog(file);
    try std.testing.expectEqual(@as(?u64, 1), scan.last_seq);
    try std.testing.expectEqual(@as(u64, header_size + 3), scan.valid_len);
}

fn requireDatabaseUrl() ![]const u8 {
    return std.posix.getenv("DATABASE_URL") orelse return error.SkipZigTest;
}

test "persist and playback" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // persist some events (sync flush, no background thread)
    const seq1 = try dp.persist(.commit, 1, "payload-one");
    const seq2 = try dp.persist(.identity, 2, "payload-two");
    const seq3 = try dp.persist(.commit, 1, "payload-three");

    try std.testing.expectEqual(@as(u64, 1), seq1);
    try std.testing.expectEqual(@as(u64, 2), seq2);
    try std.testing.expectEqual(@as(u64, 3), seq3);

    // flush manually
    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=0 → all events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }

    try dp.playback(0, std.testing.allocator, &entries);
    try std.testing.expectEqual(@as(usize, 3), entries.items.len);
    try std.testing.expectEqualStrings("payload-one", entries.items[0].data);
    try std.testing.expectEqual(@as(u64, 1), entries.items[0].seq);
    try std.testing.expectEqualStrings("payload-two", entries.items[1].data);
    try std.testing.expectEqualStrings("payload-three", entries.items[2].data);
}

test "playback with cursor" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    _ = try dp.persist(.commit, 1, "a");
    _ = try dp.persist(.commit, 2, "b");
    _ = try dp.persist(.commit, 3, "c");

    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // playback from cursor=2 → only seq 3
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }

    try dp.playback(2, std.testing.allocator, &entries);
    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqual(@as(u64, 3), entries.items[0].seq);
    try std.testing.expectEqualStrings("c", entries.items[0].data);
}

test "seq recovery after reinit" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    // write some events
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();

        _ = try dp.persist(.commit, 1, "x");
        _ = try dp.persist(.commit, 2, "y");
        _ = try dp.persist(.account, 3, "z");

        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // reinit — should recover seq
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();

        try std.testing.expectEqual(@as(u64, 3), dp.lastSeq().?);

        const seq4 = try dp.persist(.commit, 1, "w");
        try std.testing.expectEqual(@as(u64, 4), seq4);
    }
}

test "takedown zeros payload" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    _ = try dp.persist(.commit, 42, "secret-data");
    _ = try dp.persist(.commit, 99, "other-data");

    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down user 42
    try dp.takeDownUser(42);

    // playback should skip user 42's events
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }

    try dp.playback(0, std.testing.allocator, &entries);
    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("other-data", entries.items[0].data);
}

test "uidForDid assigns and caches UIDs" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    // first call creates the account
    const uid1 = try dp.uidForDid("did:plc:alice");
    try std.testing.expect(uid1 > 0);

    // same DID returns same UID (from cache)
    const uid1_again = try dp.uidForDid("did:plc:alice");
    try std.testing.expectEqual(uid1, uid1_again);

    // different DID gets a different UID
    const uid2 = try dp.uidForDid("did:plc:bob");
    try std.testing.expect(uid2 > 0);
    try std.testing.expect(uid1 != uid2);
}

test "uidForDid survives reinit" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var uid1: u64 = undefined;
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        uid1 = try dp.uidForDid("did:plc:carol");
    }

    // reinit — UID should be the same from database
    {
        var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
        defer dp.deinit();
        const uid1_again = try dp.uidForDid("did:plc:carol");
        try std.testing.expectEqual(uid1, uid1_again);
    }
}

test "takedown with real UIDs" {
    const database_url = try requireDatabaseUrl();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const dir_path = try tmpDirRealPath(std.testing.allocator, tmp);
    defer std.testing.allocator.free(dir_path);

    var dp = try DiskPersist.init(std.testing.allocator, dir_path, database_url);
    defer dp.deinit();

    const alice_uid = try dp.uidForDid("did:plc:alice");
    const bob_uid = try dp.uidForDid("did:plc:bob");

    _ = try dp.persist(.commit, alice_uid, "alice-post");
    _ = try dp.persist(.commit, bob_uid, "bob-post");
    _ = try dp.persist(.commit, alice_uid, "alice-post-2");

    {
        dp.mutex.lock();
        defer dp.mutex.unlock();
        try dp.flushLocked();
    }

    // take down alice
    try dp.takeDownUser(alice_uid);

    // playback should only have bob's event
    var entries: std.ArrayListUnmanaged(PlaybackEntry) = .{};
    defer {
        for (entries.items) |e| std.testing.allocator.free(e.data);
        entries.deinit(std.testing.allocator);
    }

    try dp.playback(0, std.testing.allocator, &entries);
    try std.testing.expectEqual(@as(usize, 1), entries.items.len);
    try std.testing.expectEqualStrings("bob-post", entries.items[0].data);
    try std.testing.expectEqual(bob_uid, entries.items[0].uid);
}

/// resolve the absolute path of a test temp dir; caller frees the result.
fn tmpDirRealPath(allocator: Allocator, tmp: std.testing.TmpDir) ![]const u8 {
    var buf: [std.fs.max_path_bytes]u8 = undefined;
    const real = try tmp.dir.realpath(".", &buf);
    return try allocator.dupe(u8, real);
}