//! relay subscriber — per-host firehose worker
//!
//! connects to a single PDS or relay upstream, decodes firehose frames
//! using the zat SDK's CBOR codec, validates commit frames, and persists
//! all events to disk with relay-assigned sequence numbers before broadcast.
//!
//! managed by the Slurper, which spawns one Subscriber per tracked host.

const std = @import("std");
const websocket = @import("websocket");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const frame_worker_mod = @import("frame_worker.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

// after this many consecutive connect failures the host is marked "exhausted" and abandoned
const max_consecutive_failures = 15;
const cursor_flush_interval_sec = 4; // flush cursor to DB every N seconds (Go relay: 4s)

// per-host rate limits — backpressure thresholds (matches indigo: 50/s, 2500/hr, 20k/day)
// blocking instead of dropping means these trigger TCP backpressure on the upstream PDS
const default_per_second_limit: u64 = 50;
const default_per_hour_limit: u64 = 2_500;
const default_per_day_limit: u64 = 20_000;

// trusted hosts get much higher limits (Go relay: 5000/s, 50M/hr, 500M/day)
const trusted_per_second_limit: u64 = 5_000;
const trusted_per_hour_limit: u64 = 50_000_000;
const trusted_per_day_limit: u64 = 500_000_000;

// Go relay: TrustedDomains config — hosts matching these suffixes get trusted limits
const trusted_suffixes: []const []const u8 = &.{".host.bsky.network"};

/// true when `hostname` ends with any suffix in `trusted_suffixes`.
/// suffix match only — no normalization, so callers must pass a lowercase hostname.
fn isTrustedHost(hostname: []const u8) bool {
    for (trusted_suffixes) |suffix| {
        if (std.mem.endsWith(u8, hostname, suffix)) return true;
    }
    return false;
}

/// compute rate limits scaled by account count (matches Go relay: slurper.go ComputeLimiterCounts)
/// trusted hosts get flat (large) limits; untrusted limits grow with the host's account count
/// so large PDSes are not starved (+1/s per 1000 accounts, +1/hr per account, +10/day per account).
fn computeLimits(trusted: bool, account_count: u64) struct { sec: u64, hour: u64, day: u64 } {
    if (trusted) return .{
        .sec = trusted_per_second_limit,
        .hour = trusted_per_hour_limit,
        .day = trusted_per_day_limit,
    };
    return .{
        .sec = default_per_second_limit + account_count / 1000,
        .hour = default_per_hour_limit + account_count,
        .day = default_per_day_limit + account_count * 10,
    };
}

/// per-subscriber configuration, supplied by the Slurper.
pub const Options = struct {
    hostname: []const u8 = "bsky.network", // upstream host to dial (wss on port 443)
    max_message_size: usize = 5 * 1024 * 1024, // websocket frame size cap passed to the client
    host_id: u64 = 0, // DB row id for this host; 0 = no persistence of cursor/failures
    account_count: u64 = 0, // used to scale untrusted rate limits (see computeLimits)
    ca_bundle: ?std.crypto.Certificate.Bundle = null, // optional custom TLS roots
};

/// Sliding window rate limiter tracking event counts per second/hour/day
/// (same algorithm as Go relay's github.com/RussellLuo/slidingwindow).
/// Uses millisecond timestamps for sub-second precision (critical for the 1-second window).
/// Effective count = weight * prev_count + curr_count
/// where weight = (window_size - elapsed) / window_size.
const RateLimiter = struct {
    sec: SlidingWindow = .{ .size_ms = 1_000 },
    hour: SlidingWindow = .{ .size_ms = 3_600_000 },
    day: SlidingWindow = .{ .size_ms = 86_400_000 },
    sec_limit: u64 = default_per_second_limit,
    hour_limit: u64 = default_per_hour_limit,
    day_limit: u64 = default_per_day_limit,

    /// one fixed-size window: counts for the current bucket plus the previous
    /// bucket's total, interpolated by elapsed time in effectiveCount.
    const SlidingWindow = struct {
        size_ms: i64, // window length in milliseconds; also the bucket alignment
        curr_start: i64 = 0, // start timestamp of the current bucket (aligned to size_ms)
        curr_count: u64 = 0, // events counted in the current bucket
        prev_count: u64 = 0, // total of the immediately preceding bucket (0 if a gap elapsed)

        /// roll the window forward to the bucket containing `now_ms`.
        /// prev_count carries over only when exactly one bucket has passed;
        /// any larger gap means the previous bucket is empty.
        fn advance(self: *SlidingWindow, now_ms: i64) void {
            const new_start = now_ms - @mod(now_ms, self.size_ms);
            if (new_start <= self.curr_start) return;
            const diff = @divTrunc(new_start - self.curr_start, self.size_ms);
            self.prev_count = if (diff == 1) self.curr_count else 0;
            self.curr_start = new_start;
            self.curr_count = 0;
        }

        /// weighted count over the sliding window: the previous bucket decays
        /// linearly as the current bucket fills. Callers must advance() first.
        fn effectiveCount(self: *const SlidingWindow, now_ms: i64) u64 {
            const elapsed = now_ms - self.curr_start;
            const remaining = self.size_ms - elapsed;
            if (remaining <= 0 or self.prev_count == 0) return self.curr_count;
            const weighted_prev = self.prev_count * @as(u64, @intCast(remaining)) / @as(u64, @intCast(self.size_ms));
            return weighted_prev + self.curr_count;
        }
    };

    /// outcome of an admission check: .allowed, or the first tier that blocked.
    const Result = enum { allowed, sec, hour, day };

    /// non-blocking admission check at an explicit timestamp (used by tests).
    /// counts the event in all three windows only when every tier allows it.
    fn allow(self: *RateLimiter, now_ms: i64) Result {
        self.sec.advance(now_ms);
        self.hour.advance(now_ms);
        self.day.advance(now_ms);
        if (self.sec.effectiveCount(now_ms) >= self.sec_limit) return .sec;
        if (self.hour.effectiveCount(now_ms) >= self.hour_limit) return .hour;
        if (self.day.effectiveCount(now_ms) >= self.day_limit) return .day;
        self.sec.curr_count += 1;
        self.hour.curr_count += 1;
        self.day.curr_count += 1;
        return .allowed;
    }

    /// Block until all rate limit windows allow the event.
    /// Checks every 100ms, matching indigo's waitForLimiter behavior.
    /// Returns which tier (if any) caused a wait, for metrics.
    /// NOTE(review): on shutdown this returns the last blocking tier (never
    /// .allowed) without counting the event — callers treat any result as
    /// "proceed", so the shutdown path relies on the caller checking the flag too.
    fn waitForAllow(self: *RateLimiter, shutdown: *std.atomic.Value(bool)) Result {
        // fast path: no waiting needed
        const now_ms = std.time.milliTimestamp();
        self.sec.advance(now_ms);
        self.hour.advance(now_ms);
        self.day.advance(now_ms);
        if (self.sec.effectiveCount(now_ms) < self.sec_limit and
            self.hour.effectiveCount(now_ms) < self.hour_limit and
            self.day.effectiveCount(now_ms) < self.day_limit)
        {
            self.sec.curr_count += 1;
            self.hour.curr_count += 1;
            self.day.curr_count += 1;
            return .allowed;
        }
        // slow path: poll every 100ms until allowed (creates TCP backpressure)
        var waited: Result = .sec;
        while (!shutdown.load(.acquire)) {
            std.posix.nanosleep(0, 100 * std.time.ns_per_ms);
            const t = std.time.milliTimestamp();
            self.sec.advance(t);
            self.hour.advance(t);
            self.day.advance(t);
            if (self.sec.effectiveCount(t) >= self.sec_limit) {
                waited = .sec;
                continue;
            }
            if (self.hour.effectiveCount(t) >= self.hour_limit) {
                waited = .hour;
                continue;
            }
            if (self.day.effectiveCount(t) >= self.day_limit) {
                waited = .day;
                continue;
            }
            self.sec.curr_count += 1;
            self.hour.curr_count += 1;
            self.day.curr_count += 1;
            return waited;
        }
        return waited;
    }
};

/// one firehose connection to a single upstream host.
/// owns the reconnect loop, cursor tracking, and per-host rate limiting;
/// frame handling is delegated to FrameHandler via the websocket read loop.
pub const Subscriber = struct {
    allocator: Allocator,
    options: Options,
    bc: *broadcaster.Broadcaster, // outbound fan-out + stats counters
    validator: *validator_mod.Validator, // commit/sync signature + authority checks
    persist: ?*event_log_mod.DiskPersist, // null = broadcast-only mode (no disk, no cursor)
    collection_index: ?*collection_index_mod.CollectionIndex = null,
    pool: ?*frame_worker_mod.FramePool = null, // when set, heavy processing is offloaded per-DID
    shutdown: *std.atomic.Value(bool), // global relay shutdown flag (shared)
    last_upstream_seq: ?u64 = null, // cursor: highest upstream seq we have fully accepted
    last_cursor_flush: i64 = 0, // unix seconds of the last cursor write to the host table
    rate_limiter: RateLimiter = .{},
    // per-host shutdown (e.g. FutureCursor — stops only this subscriber)
    host_shutdown: std.atomic.Value(bool) = .{ .raw = false },

    /// build a Subscriber with rate limits derived from the host's trust level
    /// and account count. Does not connect; call run() to start.
    pub fn init(
        allocator: Allocator,
        bc: *broadcaster.Broadcaster,
        val: *validator_mod.Validator,
        persist: ?*event_log_mod.DiskPersist,
        shutdown: *std.atomic.Value(bool),
        options: Options,
    ) Subscriber {
        const trusted = isTrustedHost(options.hostname);
        const limits = computeLimits(trusted, options.account_count);
        return .{
            .allocator = allocator,
            .options = options,
            .bc = bc,
            .validator = val,
            .persist = persist,
            .shutdown = shutdown,
            .rate_limiter = .{
                .sec_limit = limits.sec,
                .hour_limit = limits.hour,
                .day_limit = limits.day,
            },
        };
    }

    /// check if this subscriber should stop (global or per-host shutdown)
    fn shouldStop(self: *Subscriber) bool {
        return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire);
    }

    /// run the subscriber loop. reconnects with exponential backoff.
    /// blocks until shutdown flag is set or host is exhausted.
    /// NOTE(review): backoff doubles up to 60s but is never reset to 1 after a
    /// successful connection, so a long-lived link that later drops still waits
    /// the accumulated backoff — confirm this is intended (failures ARE reset in DB).
    pub fn run(self: *Subscriber) void {
        var backoff: u64 = 1;
        const max_backoff: u64 = 60;
        // load cursor from DB if we have a host_id
        if (self.options.host_id > 0) {
            if (self.persist) |dp| {
                const host_info = dp.getOrCreateHost(self.options.hostname) catch null;
                if (host_info) |info| {
                    if (info.last_seq > 0) {
                        self.last_upstream_seq = info.last_seq;
                        log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, info.last_seq });
                    }
                }
            }
        }
        while (!self.shouldStop()) {
            log.info("host {s}: connecting...", .{self.options.hostname});
            self.connectAndRead() catch |err| {
                if (self.shouldStop()) return;
                log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), backoff });
            };
            if (self.shouldStop()) return;
            // track failures for this host
            // note: this also runs after a clean disconnect (connectAndRead
            // returning without error) — failures are incremented either way,
            // and cleared only on a successful handshake in connectAndRead.
            if (self.options.host_id > 0) {
                if (self.persist) |dp| {
                    const failures = dp.incrementHostFailures(self.options.host_id) catch 0;
                    if (failures >= max_consecutive_failures) {
                        log.warn("host {s}: exhausted after {d} failures, stopping", .{ self.options.hostname, failures });
                        dp.updateHostStatus(self.options.host_id, "exhausted") catch {};
                        return;
                    }
                }
            }
            // backoff sleep in small increments so we can check shutdown
            var remaining: u64 = backoff;
            while (remaining > 0 and !self.shouldStop()) {
                const chunk = @min(remaining, 1);
                std.posix.nanosleep(chunk, 0);
                remaining -= chunk;
            }
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// flush cursor position to the host table
    /// no-op when there is no host row, no cursor yet, or no persistence.
    fn flushCursor(self: *Subscriber) void {
        if (self.options.host_id == 0) return;
        const seq = self.last_upstream_seq orelse return;
        if (self.persist) |dp| {
            dp.updateHostSeq(self.options.host_id, seq) catch |err| {
                log.debug("host {s}: cursor flush failed: {s}", .{ self.options.hostname, @errorName(err) });
            };
        }
    }

    /// dial the upstream over wss, perform the subscribeRepos handshake
    /// (resuming from last_upstream_seq when set), then hand the socket to
    /// FrameHandler via the client read loop. Returns on disconnect or error.
    fn connectAndRead(self: *Subscriber) !void {
        var path_buf: [256]u8 = undefined;
        var w: std.Io.Writer = .fixed(&path_buf);
        try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
        if (self.last_upstream_seq) |cursor| {
            try w.print("?cursor={d}", .{cursor});
        }
        const path = w.buffered();
        var client = try websocket.Client.init(self.allocator, .{
            .host = self.options.hostname,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
            .ca_bundle = self.options.ca_bundle,
        });
        defer client.deinit();
        var host_header_buf: [256]u8 = undefined;
        // NOTE(review): if bufPrint fails (hostname > ~249 bytes) the fallback
        // passes the bare hostname — without the "Host: " prefix or CRLF — as
        // the raw headers string; confirm this degenerate case is acceptable.
        const host_header = std.fmt.bufPrint(
            &host_header_buf,
            "Host: {s}\r\n",
            .{self.options.hostname},
        ) catch self.options.hostname;
        try client.handshake(path, .{ .headers = host_header });
        log.info("host {s}: connected", .{self.options.hostname});
        // reset failures on successful connect
        if (self.options.host_id > 0) {
            if (self.persist) |dp| {
                dp.resetHostFailures(self.options.host_id) catch {};
            }
        }
        var handler = FrameHandler{
            .subscriber = self,
        };
        var handler_ref = &handler;
        _ = &handler_ref;
        try client.readLoop(&handler);
    }
};

/// websocket read-loop callback target: decodes each firehose frame and either
/// submits it to the frame pool (pooled path) or processes it inline.
const FrameHandler = struct {
    subscriber: *Subscriber,

    /// handle one websocket message (one firehose frame).
    /// lightweight path: decode header + payload, track cursor, rate limit,
    /// then route — op=-1 error frames are handled here; known event types go
    /// to the pool (keyed by DID) or to processInline when no pool is set.
    /// errors from this handler propagate out of the client read loop.
    pub fn serverMessage(self: *FrameHandler, data: []const u8) !void {
        const sub = self.subscriber;
        // lightweight header decode for cursor tracking + routing
        // arena scopes all decode allocations to this one frame
        var arena = std.heap.ArenaAllocator.init(sub.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();
        const header_result = zat.cbor.decode(alloc, data) catch |err| {
            log.debug("frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };
        const header = header_result.value;
        const payload_data = data[header_result.consumed..];
        // check op field (1 = message, -1 = error)
        const op = header.getInt("op") orelse return;
        if (op == -1) {
            // error frame from upstream — check for FutureCursor
            // Go relay: slurper.go — sets host to idle and disconnects
            if (zat.cbor.decodeAll(alloc, payload_data) catch null) |err_payload| {
                const err_name = err_payload.getString("error") orelse "unknown";
                const err_msg = err_payload.getString("message") orelse "";
                log.warn("host {s}: error frame: {s}: {s}", .{ sub.options.hostname, err_name, err_msg });
                if (std.mem.eql(u8, err_name, "FutureCursor")) {
                    // our cursor is ahead of the PDS — set host to idle, stop this subscriber only
                    if (sub.persist) |dp| {
                        if (sub.options.host_id > 0) {
                            dp.updateHostStatus(sub.options.host_id, "idle") catch {};
                        }
                    }
                    sub.host_shutdown.store(true, .release);
                }
            }
            return;
        }
        const frame_type = header.getString("t") orelse return;
        const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
            log.debug("frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };
        // count every successfully decoded event (matches Go relay's events_received_counter)
        _ = sub.bc.stats.frames_in.fetchAdd(1, .monotonic);
        // extract seq for cursor tracking (deferred until after pool accepts)
        const upstream_seq = payload.getUint("seq");
        // time-based cursor flush (Go relay: every 4 seconds)
        // flushes the cursor as of the PREVIOUS accepted frame; this frame's
        // seq is only recorded after it is accepted below.
        {
            const now = std.time.timestamp();
            if (now - sub.last_cursor_flush >= cursor_flush_interval_sec) {
                sub.flushCursor();
                sub.last_cursor_flush = now;
            }
        }
        // per-host rate limiting — block until window opens (matches indigo's waitForLimiter)
        // blocking here stalls the websocket reader → TCP backpressure → PDS slows down
        switch (sub.rate_limiter.waitForAllow(sub.shutdown)) {
            .allowed => {},
            .sec => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_sec.fetchAdd(1, .monotonic);
            },
            .hour => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_hour.fetchAdd(1, .monotonic);
            },
            .day => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_day.fetchAdd(1, .monotonic);
            },
        }
        // filter unknown frame types before submitting to pool (forward-compat)
        const is_commit = std.mem.eql(u8, frame_type, "#commit");
        const is_sync = std.mem.eql(u8, frame_type, "#sync");
        const is_account = std.mem.eql(u8, frame_type, "#account");
        const is_identity = std.mem.eql(u8, frame_type, "#identity");
        if (!is_commit and !is_sync and !is_account and !is_identity) {
            // advance cursor for intentionally skipped frames —
            // we won't process these on reconnect either
            if (upstream_seq) |s| sub.last_upstream_seq = s;
            return;
        }
        // submit to frame pool for heavy processing (CBOR re-decode, validation, persist, broadcast)
        // route by DID so commits for the same account serialize (prevents chain break races)
        // blocks when queue is full — TCP backpressure propagates to upstream PDS (matches indigo)
        if (sub.pool) |pool| {
            const did_key: u64 = blk: {
                const d = payload.getString("repo") orelse payload.getString("did");
                break :blk if (d) |s| std.hash.Wyhash.hash(0, s) else sub.options.host_id;
            };
            // copy the frame: `data` is only valid for this callback, but the
            // pool worker outlives it (worker owns/frees `duped`)
            const duped = sub.allocator.dupe(u8, data) catch return;
            const t0 = std.time.nanoTimestamp();
            if (pool.submit(did_key, .{
                .data = duped,
                .host_id = sub.options.host_id,
                .hostname = sub.options.hostname,
                .allocator = sub.allocator,
                .bc = sub.bc,
                .validator = sub.validator,
                .persist = sub.persist,
                .collection_index = sub.collection_index,
            }, sub.shutdown)) {
                // pool accepted — advance cursor past this frame
                if (upstream_seq) |s| sub.last_upstream_seq = s;
                if (std.time.nanoTimestamp() - t0 > 1_000_000) { // >1ms = had to wait
                    _ = sub.bc.stats.pool_backpressure.fetchAdd(1, .monotonic);
                }
            } else {
                // shutdown requested — don't advance cursor so reconnect replays this frame
                sub.allocator.free(duped);
            }
            return;
        }
        // fallback: no pool, process inline (original path for tests / standalone use)
        self.processInline(sub, alloc, data, payload, upstream_seq, frame_type, is_commit, is_sync, is_account, is_identity);
    }

    /// inline processing path — used when no frame pool is configured (tests, standalone).
    /// this is the original serverMessage heavy processing logic.
    /// `alloc` is the caller's per-frame arena (nothing allocated here outlives
    /// the frame); the unused params (handler, frame_type) are kept so the
    /// call site mirrors the pool-submit signature.
    fn processInline(
        _: *FrameHandler,
        sub: *Subscriber,
        alloc: Allocator,
        data: []const u8,
        payload: zat.cbor.Value,
        upstream_seq: ?u64,
        _: []const u8,
        is_commit: bool,
        is_sync: bool,
        is_account: bool,
        is_identity: bool,
    ) void {
        // extract DID: "repo" for commits, "did" for identity/account
        const did: ?[]const u8 = if (is_commit) payload.getString("repo") else payload.getString("did");
        // on #identity event, evict cached signing key so next commit re-resolves
        if (is_identity) {
            if (did) |d| sub.validator.evictKey(d);
        }
        // resolve DID → numeric UID for event header (host-aware)
        // uid stays 0 when there is no persistence, no DID, or lookup fails
        const uid: u64 = if (sub.persist) |dp| blk: {
            if (did) |d| {
                const result = dp.uidForDidFromHost(d, sub.options.host_id) catch break :blk @as(u64, 0);
                // host authority enforcement (mirrors frame_worker path)
                // skipped for #identity so a key rotation can always land
                if ((result.is_new or result.host_changed) and !is_identity) {
                    switch (sub.validator.resolveHostAuthority(d, sub.options.host_id)) {
                        .migrate => {
                            dp.setAccountHostId(result.uid, sub.options.host_id) catch {};
                            if (result.host_changed) {
                                log.info("host {s}: account migrated uid={d} did={s}", .{
                                    sub.options.hostname, result.uid, d,
                                });
                            }
                        },
                        .reject => {
                            log.info("host {s}: dropping event, host authority failed uid={d} did={s}", .{
                                sub.options.hostname, result.uid, d,
                            });
                            _ = sub.bc.stats.failed.fetchAdd(1, .monotonic);
                            return;
                        },
                        .accept => {},
                    }
                }
                break :blk result.uid;
            } else break :blk @as(u64, 0);
        } else 0;
        // process #account events: update upstream status
        // Go relay: ingest.go processAccountEvent
        if (is_account) {
            if (sub.persist) |dp| {
                if (uid > 0) {
                    const is_active = payload.getBool("active") orelse false;
                    const status_str = payload.getString("status");
                    const new_status: []const u8 = if (is_active) "active" else (status_str orelse "inactive");
                    dp.updateAccountUpstreamStatus(uid, new_status) catch |err| {
                        log.debug("upstream status update failed: {s}", .{@errorName(err)});
                    };
                    // on account tombstone/deletion, remove all collection index entries
                    if (std.mem.eql(u8, new_status, "deleted") or std.mem.eql(u8, new_status, "takendown")) {
                        if (sub.collection_index) |ci| {
                            if (did) |d| {
                                ci.removeAll(d) catch |err| {
                                    log.debug("collection removeAll failed: {s}", .{@errorName(err)});
                                };
                            }
                        }
                    }
                }
            }
        }
        // for commits and syncs: check account is active, validate, extract state
        var commit_data_cid: ?[]const u8 = null;
        var commit_rev: ?[]const u8 = null;
        if (is_commit or is_sync) {
            // drop for inactive accounts
            if (sub.persist) |dp| {
                if (uid > 0) {
                    // on lookup error, assume active rather than drop the event
                    const active = dp.isAccountActive(uid) catch true;
                    if (!active) {
                        _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic);
                        return;
                    }
                }
            }
            // rev checks: stale, future, chain continuity
            if (is_commit and uid > 0) {
                if (payload.getString("rev")) |incoming_rev| {
                    // future-rev rejection: drop commits whose TID timestamp is
                    // further in the future than the configured clock skew
                    if (zat.Tid.parse(incoming_rev)) |tid| {
                        const rev_us: i64 = @intCast(tid.timestamp());
                        const now_us = std.time.microTimestamp();
                        const skew_us: i64 = sub.validator.config.rev_clock_skew * 1_000_000;
                        if (rev_us > now_us + skew_us) {
                            log.info("host {s}: dropping future rev uid={d} rev={s}", .{
                                sub.options.hostname, uid, incoming_rev,
                            });
                            _ = sub.bc.stats.failed.fetchAdd(1, .monotonic);
                            return;
                        }
                    }
                    if (sub.persist) |dp| {
                        if (dp.getAccountState(uid, alloc) catch null) |prev| {
                            // stale rev check — revs are TIDs, so lexicographic
                            // order matches temporal order; drop non-increasing
                            if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) {
                                log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{
                                    sub.options.hostname, uid, incoming_rev, prev.rev,
                                });
                                _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic);
                                return;
                            }
                            // chain continuity: since should match stored rev
                            // (logged + counted only; the event still flows through)
                            if (payload.getString("since")) |since| {
                                if (!std.mem.eql(u8, since, prev.rev)) {
                                    log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{
                                        sub.options.hostname, uid, since, prev.rev,
                                    });
                                    _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                }
                            }
                            // chain continuity: prevData CID should match stored data_cid
                            if (payload.get("prevData")) |pd| {
                                if (pd == .cid) {
                                    const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch "";
                                    if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and !std.mem.eql(u8, prev_data_encoded, prev.data_cid)) {
                                        log.info("host {s}: chain break uid={d} prevData mismatch", .{
                                            sub.options.hostname, uid,
                                        });
                                        _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                    }
                                }
                            }
                        }
                    }
                }
            }
            if (is_commit) {
                const result = sub.validator.validateCommit(payload);
                if (!result.valid) return;
                commit_data_cid = result.data_cid;
                commit_rev = result.commit_rev;
                // track collections from commit ops (phase 1 live indexing)
                if (sub.collection_index) |ci| {
                    if (did) |d| {
                        if (payload.get("ops")) |ops| {
                            ci.trackCommitOps(d, ops);
                        }
                    }
                }
            } else {
                // #sync: signature verification only, no ops/MST
                const result = sub.validator.validateSync(payload);
                if (!result.valid) return;
                commit_data_cid = result.data_cid;
                commit_rev = result.commit_rev;
            }
        }
        // determine event kind for persistence
        const kind: event_log_mod.EvtKind = if (is_commit)
            .commit
        else if (is_sync)
            .sync
        else if (is_account)
            .account
        else // is_identity (unknown types already filtered above)
            .identity;
        // persist and get relay-assigned seq, broadcast raw bytes.
        // ordering mutex ensures frames are broadcast in seq order —
        // without it, concurrent subscriber threads can interleave
        // persist (seq assignment) and broadcast, delivering out-of-order.
        if (sub.persist) |dp| {
            const relay_seq = blk: {
                sub.bc.broadcast_order.lock();
                defer sub.bc.broadcast_order.unlock();
                const seq = dp.persist(kind, uid, data) catch |err| {
                    log.warn("persist failed: {s}", .{@errorName(err)});
                    return;
                };
                sub.bc.stats.relay_seq.store(seq, .release);
                // rewrite the frame's seq to the relay-assigned one;
                // on failure fall back to broadcasting the original bytes
                const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data;
                sub.bc.broadcast(seq, broadcast_data);
                break :blk seq;
            };
            _ = relay_seq;
            // update per-DID state outside the ordering lock (Postgres round-trip)
            if ((is_commit or is_sync) and uid > 0) {
                if (commit_rev) |rev| {
                    const cid_str: []const u8 = if (commit_data_cid) |cid_raw|
                        zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
                    else
                        "";
                    if (dp.updateAccountState(uid, rev, cid_str)) |updated| {
                        if (!updated) {
                            log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{
                                sub.options.hostname, uid, rev,
                            });
                        }
                    } else |err| {
                        log.debug("account state update failed: {s}", .{@errorName(err)});
                    }
                }
            }
        } else {
            // broadcast-only mode: no seq assignment, pass upstream seq through
            sub.bc.broadcast(upstream_seq orelse 0, data);
        }
    }

    /// read-loop disconnect callback.
    pub fn close(self: *FrameHandler) void {
        log.info("host {s}: connection closed", .{self.subscriber.options.hostname});
        // flush cursor on disconnect
        self.subscriber.flushCursor();
    }
};

// --- tests ---

test "decode frame via SDK and extract fields" {
    const cbor = zat.cbor;
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    // build a commit frame using SDK encoder
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#commit" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 12345 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };
    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);
    var frame = try alloc.alloc(u8, header_bytes.len + payload_bytes.len);
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);
    // decode using SDK (same path as FrameHandler.serverMessage)
    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    const p_data = frame[h_result.consumed..];
    const p = try cbor.decodeAll(alloc, p_data);
    try std.testing.expectEqualStrings("#commit", h.getString("t").?);
    try std.testing.expectEqual(@as(i64, 1), h.getInt("op").?);
    try std.testing.expectEqual(@as(i64, 12345), p.getInt("seq").?);
    try std.testing.expectEqualStrings("did:plc:test123", p.getString("repo").?);
}

test "decode identity frame via SDK" {
    const cbor = zat.cbor;
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#identity" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:alice" } },
        .{ .key = "seq", .value = .{ .unsigned = 99 } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };
    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);
    var frame = try alloc.alloc(u8, header_bytes.len + payload_bytes.len);
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);
    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    const p_data = frame[h_result.consumed..];
    const p = try cbor.decodeAll(alloc, p_data);
    try std.testing.expectEqualStrings("#identity", h.getString("t").?);
    try std.testing.expectEqualStrings("did:plc:alice", p.getString("did").?);
    try std.testing.expectEqual(@as(i64, 99), p.getInt("seq").?);
}

test "rate limiter enforces per-second limit" {
    var rl: RateLimiter = .{ .sec_limit = 3, .hour_limit = 1000, .day_limit = 10000 };
    const now: i64 = 1_000_000_000;
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(now));
    // at start of next second, prev carries over fully → still blocked
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(now + 1000));
    // two seconds later prev is forgotten
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now + 2000));
}

test "rate limiter enforces per-hour limit" {
    var rl: RateLimiter = .{ .sec_limit = 1000, .hour_limit = 5, .day_limit = 10000 };
    const now: i64 = 3_600_000 * 100;
    for (0..5) |_| {
        try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    }
    try std.testing.expectEqual(RateLimiter.Result.hour, rl.allow(now + 100));
    // next hour, prev carries over → still blocked
    try std.testing.expectEqual(RateLimiter.Result.hour, rl.allow(now + 3_600_000));
    // two hours later prev is forgotten
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now + 7_200_000));
}

test "sliding window interpolates previous count by elapsed time" {
    var rl: RateLimiter = .{ .sec_limit = 10, .hour_limit = 1_000_000, .day_limit = 1_000_000 };
    const base: i64 = 1_000_000_000;
    for (0..8) |_| {
        try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base));
    }
    // next second: prev=8, curr=0. effective=8, room for 2
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1000));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1000));
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(base + 1000));
    // halfway through: prev weight = 500/1000 = 0.5, so weighted_prev = 4. curr=2. eff=6, room for 4
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(base + 1500));
}

test "waitForAllow blocks then allows after window advances" {
    // verify that waitForAllow returns a non-.allowed result when the limit was hit,
    // indicating it had to wait. We use a tiny limit so the fast path is exhausted.
    var rl: RateLimiter = .{ .sec_limit = 1, .hour_limit = 1000, .day_limit = 10000 };
    var shutdown = std.atomic.Value(bool){ .raw = false };
    // first call takes the fast path
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.waitForAllow(&shutdown));
    // second call must block (sec limit = 1), then return .sec after the window advances
    // this will sleep ~100ms+ until the sliding window allows it
    const before = std.time.milliTimestamp();
    const result = rl.waitForAllow(&shutdown);
    const elapsed = std.time.milliTimestamp() - before;
    try std.testing.expectEqual(RateLimiter.Result.sec, result);
    try std.testing.expect(elapsed >= 100); // must have slept at least one 100ms poll
}

test "waitForAllow respects shutdown" {
    var rl: RateLimiter = .{ .sec_limit = 1, .hour_limit = 1000, .day_limit = 10000 };
    var shutdown = std.atomic.Value(bool){ .raw = false };
    // exhaust the limit
    _ = rl.waitForAllow(&shutdown);
    // set shutdown before the next call
    shutdown.store(true, .release);
    // should return immediately without blocking
    const before = std.time.milliTimestamp();
    _ = rl.waitForAllow(&shutdown);
    const elapsed = std.time.milliTimestamp() - before;
    try std.testing.expect(elapsed < 50); // should not have slept
}

test "trusted host detection" {
    try std.testing.expect(isTrustedHost("pds-123.host.bsky.network"));
    try std.testing.expect(isTrustedHost("abc.host.bsky.network"));
    try std.testing.expect(!isTrustedHost("bsky.network"));
    try std.testing.expect(!isTrustedHost("evil.bsky.network"));
    try std.testing.expect(!isTrustedHost("pds.example.com"));
}

test "error frame (op=-1) is detected" {
    const cbor = zat.cbor;
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#info" } },
    } };
    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;
    try std.testing.expectEqual(@as(i64, -1), h.getInt("op").?);
}

// --- spec conformance tests ---

test "spec: unknown frame type (op=1, t=#unknown) is ignored" {
    // event stream spec: unknown t values must be ignored for forward-compat
    const cbor = zat.cbor;
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#unknown" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 1 } },
    } };
    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);
    // decode header — verify it's a valid message with unknown type
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;
    try std.testing.expectEqual(@as(i64, 1), h.getInt("op").?);
    const frame_type = h.getString("t").?;
    try std.testing.expectEqualStrings("#unknown", frame_type);
    // verify unknown type is NOT one of the known types (this is the filter logic)
    const is_commit = std.mem.eql(u8, frame_type, "#commit");
    const is_sync = std.mem.eql(u8, frame_type, "#sync");
    const is_account = std.mem.eql(u8, frame_type, "#account");
    const is_identity = std.mem.eql(u8, frame_type, "#identity");
    try std.testing.expect(!is_commit and !is_sync and !is_account and !is_identity);
    // verify payload still decodes (frame is valid, just ignored)
    const p = try cbor.decodeAll(alloc, payload_bytes);
    try std.testing.expectEqualStrings("did:plc:test123", p.getString("did").?);
}

test "spec: error frame (op=-1) is handled, not persisted" {
    // event stream spec: op=-1 frames are error notifications from upstream
    const cbor = zat.cbor;
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#error" } },
    } };
    const err_payload: cbor.Value = .{ .map = &.{
        .{ .key = "error", .value = .{ .text = "FutureCursor" } },
        .{ .key = "message", .value = .{ .text = "cursor is ahead of server" } },
    } };
    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;
    // verify op=-1 is detected
    const op = h.getInt("op").?;
    try std.testing.expectEqual(@as(i64, -1), op);
    // verify error payload decodes correctly
    const payload_bytes = try cbor.encodeAlloc(alloc, err_payload);
    const p = try cbor.decodeAll(alloc, payload_bytes);
    try std.testing.expectEqualStrings("FutureCursor", p.getString("error").?);
    try std.testing.expectEqualStrings("cursor is ahead of server", p.getString("message").?);
}