//! atproto relay implementation in zig
//! zlay.waow.tech
//! relay subscriber — per-host firehose worker
//!
//! connects to a single PDS or relay upstream, decodes firehose frames
//! using the zat SDK's CBOR codec, validates commit frames, and persists
//! all events to disk with relay-assigned sequence numbers before broadcast.
//!
//! managed by the Slurper, which spawns one Subscriber per tracked host.

const std = @import("std");
const websocket = @import("websocket");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const frame_worker_mod = @import("frame_worker.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);
20
// a host is marked "exhausted" and abandoned after this many consecutive
// failed connection attempts (see Subscriber.run)
const max_consecutive_failures = 15;
const cursor_flush_interval_sec = 4; // flush cursor to DB every N seconds (Go relay: 4s)

// per-host rate limits — backpressure thresholds (matches indigo: 50/s, 2500/hr, 20k/day)
// blocking instead of dropping means these trigger TCP backpressure on the upstream PDS
const default_per_second_limit: u64 = 50;
const default_per_hour_limit: u64 = 2_500;
const default_per_day_limit: u64 = 20_000;

// trusted hosts get much higher limits (Go relay: 5000/s, 50M/hr, 500M/day)
const trusted_per_second_limit: u64 = 5_000;
const trusted_per_hour_limit: u64 = 50_000_000;
const trusted_per_day_limit: u64 = 500_000_000;

// Go relay: TrustedDomains config — hosts matching these suffixes get trusted limits
const trusted_suffixes: []const []const u8 = &.{".host.bsky.network"};
37
/// returns true when `hostname` ends with one of the configured trusted
/// domain suffixes (e.g. ".host.bsky.network"), granting it elevated limits.
pub fn isTrustedHost(hostname: []const u8) bool {
    var i: usize = 0;
    while (i < trusted_suffixes.len) : (i += 1) {
        if (std.mem.endsWith(u8, hostname, trusted_suffixes[i])) return true;
    }
    return false;
}
44
/// compute rate limits scaled by account count (matches Go relay: slurper.go ComputeLimiterCounts).
/// trusted hosts get flat elevated limits; untrusted limits grow with account count.
pub fn computeLimits(trusted: bool, account_count: u64) struct { sec: u64, hour: u64, day: u64 } {
    if (trusted) {
        return .{
            .sec = trusted_per_second_limit,
            .hour = trusted_per_hour_limit,
            .day = trusted_per_day_limit,
        };
    }
    // untrusted: base limits plus an account-count-proportional bonus
    const per_sec = default_per_second_limit + account_count / 1000;
    const per_hour = default_per_hour_limit + account_count;
    const per_day = default_per_day_limit + account_count * 10;
    return .{ .sec = per_sec, .hour = per_hour, .day = per_day };
}
58
/// configuration for a single Subscriber (one upstream host connection).
pub const Options = struct {
    // upstream PDS/relay hostname; connection is TLS websocket on port 443
    hostname: []const u8 = "bsky.network",
    // maximum websocket message size accepted from upstream (bytes)
    max_message_size: usize = 5 * 1024 * 1024,
    // DB row id for this host; 0 disables cursor persistence and failure tracking
    host_id: u64 = 0,
    // known account count on this host — scales the default rate limits (computeLimits)
    account_count: u64 = 0,
    // optional CA bundle for TLS verification; null uses the websocket client's default
    ca_bundle: ?std.crypto.Certificate.Bundle = null,
};
66
/// Sliding window rate limiter — tracks event counts per second/hour/day
/// (same algorithm as Go relay's github.com/RussellLuo/slidingwindow).
/// Uses millisecond timestamps for sub-second precision (critical for the 1-second window).
/// Effective count = weight * prev_count + curr_count
/// where weight = (window_size - elapsed) / window_size.
///
/// NOTE(review): the window counters are not synchronized — they appear to be
/// owned by a single subscriber thread; only the *_limit atomics are safe to
/// touch from other threads (via updateLimits).
const RateLimiter = struct {
    sec: SlidingWindow = .{ .size_ms = 1_000 },
    hour: SlidingWindow = .{ .size_ms = 3_600_000 },
    day: SlidingWindow = .{ .size_ms = 86_400_000 },

    // limits are atomics so they can be adjusted at runtime (updateLimits)
    sec_limit: std.atomic.Value(u64) = .{ .raw = default_per_second_limit },
    hour_limit: std.atomic.Value(u64) = .{ .raw = default_per_hour_limit },
    day_limit: std.atomic.Value(u64) = .{ .raw = default_per_day_limit },

    // one bucketed window: counts events in the current fixed-size bucket and
    // keeps the previous bucket's total for linear interpolation.
    const SlidingWindow = struct {
        size_ms: i64, // bucket length in milliseconds
        curr_start: i64 = 0, // start of the current bucket (aligned down to size_ms)
        curr_count: u64 = 0, // events counted in the current bucket
        prev_count: u64 = 0, // total of the immediately preceding bucket

        // roll forward so curr_start is the bucket containing now_ms.
        // prev_count survives only when exactly one bucket elapsed; a larger
        // gap means the old bucket is stale and is dropped entirely.
        fn advance(self: *SlidingWindow, now_ms: i64) void {
            const new_start = now_ms - @mod(now_ms, self.size_ms);
            if (new_start <= self.curr_start) return;
            const diff = @divTrunc(new_start - self.curr_start, self.size_ms);
            self.prev_count = if (diff == 1) self.curr_count else 0;
            self.curr_start = new_start;
            self.curr_count = 0;
        }

        // interpolated count: the previous bucket decays linearly as the
        // current bucket ages (integer math, rounds the weighted part down).
        fn effectiveCount(self: *const SlidingWindow, now_ms: i64) u64 {
            const elapsed = now_ms - self.curr_start;
            const remaining = self.size_ms - elapsed;
            if (remaining <= 0 or self.prev_count == 0) return self.curr_count;
            const weighted_prev = self.prev_count * @as(u64, @intCast(remaining)) / @as(u64, @intCast(self.size_ms));
            return weighted_prev + self.curr_count;
        }
    };

    // .allowed, or the first window tier that rejected the event
    const Result = enum { allowed, sec, hour, day };

    // non-blocking check: counts the event and returns .allowed, or names the
    // tightest window currently at/over its limit (event is NOT counted then).
    fn allow(self: *RateLimiter, now_ms: i64) Result {
        self.sec.advance(now_ms);
        self.hour.advance(now_ms);
        self.day.advance(now_ms);

        if (self.sec.effectiveCount(now_ms) >= self.sec_limit.load(.monotonic)) return .sec;
        if (self.hour.effectiveCount(now_ms) >= self.hour_limit.load(.monotonic)) return .hour;
        if (self.day.effectiveCount(now_ms) >= self.day_limit.load(.monotonic)) return .day;

        self.sec.curr_count += 1;
        self.hour.curr_count += 1;
        self.day.curr_count += 1;
        return .allowed;
    }

    /// Block until all rate limit windows allow the event.
    /// Checks every 100ms, matching indigo's waitForLimiter behavior.
    /// Returns which tier (if any) caused a wait, for metrics.
    /// On shutdown, returns the last blocking tier without counting an event.
    fn waitForAllow(self: *RateLimiter, shutdown: *std.atomic.Value(bool)) Result {
        // fast path: no waiting needed
        const now_ms = std.time.milliTimestamp();
        self.sec.advance(now_ms);
        self.hour.advance(now_ms);
        self.day.advance(now_ms);

        if (self.sec.effectiveCount(now_ms) < self.sec_limit.load(.monotonic) and
            self.hour.effectiveCount(now_ms) < self.hour_limit.load(.monotonic) and
            self.day.effectiveCount(now_ms) < self.day_limit.load(.monotonic))
        {
            self.sec.curr_count += 1;
            self.hour.curr_count += 1;
            self.day.curr_count += 1;
            return .allowed;
        }

        // slow path: poll every 100ms until allowed (creates TCP backpressure)
        var waited: Result = .sec;
        while (!shutdown.load(.acquire)) {
            std.posix.nanosleep(0, 100 * std.time.ns_per_ms);

            const t = std.time.milliTimestamp();
            self.sec.advance(t);
            self.hour.advance(t);
            self.day.advance(t);

            if (self.sec.effectiveCount(t) >= self.sec_limit.load(.monotonic)) {
                waited = .sec;
                continue;
            }
            if (self.hour.effectiveCount(t) >= self.hour_limit.load(.monotonic)) {
                waited = .hour;
                continue;
            }
            if (self.day.effectiveCount(t) >= self.day_limit.load(.monotonic)) {
                waited = .day;
                continue;
            }

            self.sec.curr_count += 1;
            self.hour.curr_count += 1;
            self.day.curr_count += 1;
            return waited;
        }
        return waited;
    }

    /// update rate limits from another thread (e.g. admin API).
    pub fn updateLimits(self: *RateLimiter, sec: u64, hour: u64, day: u64) void {
        self.sec_limit.store(sec, .monotonic);
        self.hour_limit.store(hour, .monotonic);
        self.day_limit.store(day, .monotonic);
    }
};
180
/// one upstream firehose connection: connects, reads frames via FrameHandler,
/// reconnects with exponential backoff, and tracks the upstream cursor.
pub const Subscriber = struct {
    allocator: Allocator,
    options: Options,
    bc: *broadcaster.Broadcaster,
    validator: *validator_mod.Validator,
    persist: ?*event_log_mod.DiskPersist,
    collection_index: ?*collection_index_mod.CollectionIndex = null,
    pool: ?*frame_worker_mod.FramePool = null,
    shutdown: *std.atomic.Value(bool),
    last_upstream_seq: ?u64 = null,
    last_cursor_flush: i64 = 0,
    rate_limiter: RateLimiter = .{},

    // per-host shutdown (e.g. FutureCursor — stops only this subscriber)
    host_shutdown: std.atomic.Value(bool) = .{ .raw = false },

    // set by connectAndRead() once the websocket handshake succeeds; read by
    // run() to reset the reconnect backoff after a session that actually
    // connected (only touched by this subscriber's own thread)
    handshake_ok: bool = false,

    /// construct a Subscriber; rate limits are derived from the host's trust
    /// status and known account count (see isTrustedHost / computeLimits).
    pub fn init(
        allocator: Allocator,
        bc: *broadcaster.Broadcaster,
        val: *validator_mod.Validator,
        persist: ?*event_log_mod.DiskPersist,
        shutdown: *std.atomic.Value(bool),
        options: Options,
    ) Subscriber {
        const trusted = isTrustedHost(options.hostname);
        const limits = computeLimits(trusted, options.account_count);
        return .{
            .allocator = allocator,
            .options = options,
            .bc = bc,
            .validator = val,
            .persist = persist,
            .shutdown = shutdown,
            .rate_limiter = .{
                .sec_limit = .{ .raw = limits.sec },
                .hour_limit = .{ .raw = limits.hour },
                .day_limit = .{ .raw = limits.day },
            },
        };
    }

    /// check if this subscriber should stop (global or per-host shutdown)
    fn shouldStop(self: *Subscriber) bool {
        return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire);
    }

    /// run the subscriber loop. reconnects with exponential backoff.
    /// blocks until shutdown flag is set or host is exhausted.
    pub fn run(self: *Subscriber) void {
        var backoff: u64 = 1;
        const max_backoff: u64 = 60;

        // load cursor from DB if we have a host_id
        if (self.options.host_id > 0) {
            if (self.persist) |dp| {
                const host_info = dp.getOrCreateHost(self.options.hostname) catch null;
                if (host_info) |info| {
                    if (info.last_seq > 0) {
                        self.last_upstream_seq = info.last_seq;
                        log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, info.last_seq });
                    }
                }
            }
        }

        while (!self.shouldStop()) {
            log.info("host {s}: connecting...", .{self.options.hostname});

            self.handshake_ok = false;
            const result = self.connectAndRead();

            if (self.shouldStop()) return;

            // bugfix: reset backoff after any session that completed its
            // handshake. previously backoff only ever grew, so a host that
            // stayed connected for hours still paid up to max_backoff (60s)
            // on its next reconnect attempt.
            if (self.handshake_ok) backoff = 1;

            result catch |err| {
                log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), backoff });
            };

            // track failures for this host
            if (self.options.host_id > 0) {
                if (self.persist) |dp| {
                    const failures = dp.incrementHostFailures(self.options.host_id) catch 0;
                    if (failures >= max_consecutive_failures) {
                        log.warn("host {s}: exhausted after {d} failures, stopping", .{ self.options.hostname, failures });
                        dp.updateHostStatus(self.options.host_id, "exhausted") catch {};
                        return;
                    }
                }
            }

            // backoff sleep in small increments so we can check shutdown
            var remaining: u64 = backoff;
            while (remaining > 0 and !self.shouldStop()) {
                const chunk = @min(remaining, 1);
                std.posix.nanosleep(chunk, 0);
                remaining -= chunk;
            }
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// flush cursor position to the host table (best-effort; failure is debug-logged)
    fn flushCursor(self: *Subscriber) void {
        if (self.options.host_id == 0) return;
        const seq = self.last_upstream_seq orelse return;
        if (self.persist) |dp| {
            dp.updateHostSeq(self.options.host_id, seq) catch |err| {
                log.debug("host {s}: cursor flush failed: {s}", .{ self.options.hostname, @errorName(err) });
            };
        }
    }

    /// open a TLS websocket to the host's subscribeRepos endpoint (resuming
    /// from the stored cursor, if any) and block in the read loop until the
    /// connection drops or shutdown is requested.
    fn connectAndRead(self: *Subscriber) !void {
        var path_buf: [256]u8 = undefined;
        var w: std.Io.Writer = .fixed(&path_buf);

        try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
        if (self.last_upstream_seq) |cursor| {
            try w.print("?cursor={d}", .{cursor});
        }
        const path = w.buffered();

        var client = try websocket.Client.init(self.allocator, .{
            .host = self.options.hostname,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
            .ca_bundle = self.options.ca_bundle,
        });
        defer client.deinit();

        var host_header_buf: [256]u8 = undefined;
        // NOTE(review): on bufPrint overflow this falls back to the bare
        // hostname with no "Host: " prefix — hostnames fit in 256 bytes in
        // practice, so the fallback should be unreachable; confirm intent.
        const host_header = std.fmt.bufPrint(
            &host_header_buf,
            "Host: {s}\r\n",
            .{self.options.hostname},
        ) catch self.options.hostname;

        try client.handshake(path, .{ .headers = host_header });
        self.handshake_ok = true;
        log.info("host {s}: connected", .{self.options.hostname});

        // reset failures on successful connect
        if (self.options.host_id > 0) {
            if (self.persist) |dp| {
                dp.resetHostFailures(self.options.host_id) catch {};
            }
        }

        var handler = FrameHandler{
            .subscriber = self,
        };
        try client.readLoop(&handler);
    }
};
332
/// websocket read-loop handler for one Subscriber connection.
/// the websocket client calls serverMessage() for each complete frame and
/// close() on disconnect. `data` is only valid for the duration of the call,
/// so it is duped before being handed to the frame pool.
const FrameHandler = struct {
    subscriber: *Subscriber,

    /// process one raw firehose frame:
    ///   1. decode the CBOR header (op + type) using a per-frame arena
    ///   2. route error frames (op=-1), handling FutureCursor specially
    ///   3. rate-limit (blocking → TCP backpressure on the upstream)
    ///   4. submit known frame types to the frame pool, or process inline
    pub fn serverMessage(self: *FrameHandler, data: []const u8) !void {
        const sub = self.subscriber;

        // lightweight header decode for cursor tracking + routing
        var arena = std.heap.ArenaAllocator.init(sub.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        const header_result = zat.cbor.decode(alloc, data) catch |err| {
            log.debug("frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };
        const header = header_result.value;
        const payload_data = data[header_result.consumed..];

        // check op field (1 = message, -1 = error)
        const op = header.getInt("op") orelse return;
        if (op == -1) {
            // error frame from upstream — check for FutureCursor
            // Go relay: slurper.go — sets host to idle and disconnects
            if (zat.cbor.decodeAll(alloc, payload_data) catch null) |err_payload| {
                const err_name = err_payload.getString("error") orelse "unknown";
                const err_msg = err_payload.getString("message") orelse "";
                log.warn("host {s}: error frame: {s}: {s}", .{ sub.options.hostname, err_name, err_msg });
                if (std.mem.eql(u8, err_name, "FutureCursor")) {
                    // our cursor is ahead of the PDS — set host to idle, stop this subscriber only
                    if (sub.persist) |dp| {
                        if (sub.options.host_id > 0) {
                            dp.updateHostStatus(sub.options.host_id, "idle") catch {};
                        }
                    }
                    sub.host_shutdown.store(true, .release);
                }
            }
            return;
        }

        const frame_type = header.getString("t") orelse return;
        const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
            log.debug("frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };

        // count every successfully decoded event (matches Go relay's events_received_counter)
        _ = sub.bc.stats.frames_in.fetchAdd(1, .monotonic);

        // extract seq for cursor tracking (deferred until after pool accepts)
        const upstream_seq = payload.getUint("seq");

        // time-based cursor flush (Go relay: every 4 seconds)
        {
            const now = std.time.timestamp();
            if (now - sub.last_cursor_flush >= cursor_flush_interval_sec) {
                sub.flushCursor();
                sub.last_cursor_flush = now;
            }
        }

        // per-host rate limiting — block until window opens (matches indigo's waitForLimiter)
        // blocking here stalls the websocket reader → TCP backpressure → PDS slows down
        switch (sub.rate_limiter.waitForAllow(sub.shutdown)) {
            .allowed => {},
            .sec => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_sec.fetchAdd(1, .monotonic);
            },
            .hour => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_hour.fetchAdd(1, .monotonic);
            },
            .day => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_day.fetchAdd(1, .monotonic);
            },
        }

        // filter unknown frame types before submitting to pool (forward-compat)
        const is_commit = std.mem.eql(u8, frame_type, "#commit");
        const is_sync = std.mem.eql(u8, frame_type, "#sync");
        const is_account = std.mem.eql(u8, frame_type, "#account");
        const is_identity = std.mem.eql(u8, frame_type, "#identity");

        if (!is_commit and !is_sync and !is_account and !is_identity) {
            // advance cursor for intentionally skipped frames —
            // we won't process these on reconnect either
            if (upstream_seq) |s| sub.last_upstream_seq = s;
            return;
        }

        // submit to frame pool for heavy processing (CBOR re-decode, validation, persist, broadcast)
        // route by DID so commits for the same account serialize (prevents chain break races)
        // blocks when queue is full — TCP backpressure propagates to upstream PDS (matches indigo)
        if (sub.pool) |pool| {
            const did_key: u64 = blk: {
                const d = payload.getString("repo") orelse payload.getString("did");
                break :blk if (d) |s| std.hash.Wyhash.hash(0, s) else sub.options.host_id;
            };
            // duped copy outlives this callback; owned by the pool worker on success
            const duped = sub.allocator.dupe(u8, data) catch return;
            const t0 = std.time.nanoTimestamp();
            if (pool.submit(did_key, .{
                .data = duped,
                .host_id = sub.options.host_id,
                .hostname = sub.options.hostname,
                .allocator = sub.allocator,
                .bc = sub.bc,
                .validator = sub.validator,
                .persist = sub.persist,
                .collection_index = sub.collection_index,
            }, sub.shutdown)) {
                // pool accepted — advance cursor past this frame
                if (upstream_seq) |s| sub.last_upstream_seq = s;
                if (std.time.nanoTimestamp() - t0 > 1_000_000) { // >1ms = had to wait
                    _ = sub.bc.stats.pool_backpressure.fetchAdd(1, .monotonic);
                }
            } else {
                // shutdown requested — don't advance cursor so reconnect replays this frame
                sub.allocator.free(duped);
            }
            return;
        }

        // fallback: no pool, process inline (original path for tests / standalone use)
        self.processInline(sub, alloc, data, payload, upstream_seq, frame_type, is_commit, is_sync, is_account, is_identity);
    }

    /// inline processing path — used when no frame pool is configured (tests, standalone).
    /// this is the original serverMessage heavy processing logic:
    /// DID→uid resolution with host authority checks, #account status updates,
    /// rev/chain validation for commits, signature validation, then persist +
    /// in-order broadcast under the broadcast ordering mutex.
    fn processInline(
        _: *FrameHandler,
        sub: *Subscriber,
        alloc: Allocator,
        data: []const u8,
        payload: zat.cbor.Value,
        upstream_seq: ?u64,
        _: []const u8,
        is_commit: bool,
        is_sync: bool,
        is_account: bool,
        is_identity: bool,
    ) void {
        // extract DID: "repo" for commits, "did" for identity/account
        const did: ?[]const u8 = if (is_commit)
            payload.getString("repo")
        else
            payload.getString("did");

        // on #identity event, evict cached signing key so next commit re-resolves
        if (is_identity) {
            if (did) |d| sub.validator.evictKey(d);
        }

        // resolve DID → numeric UID for event header (host-aware);
        // uid 0 means "unresolved" and skips all uid-gated checks below
        const uid: u64 = if (sub.persist) |dp| blk: {
            if (did) |d| {
                const result = dp.uidForDidFromHost(d, sub.options.host_id) catch break :blk @as(u64, 0);

                // host authority enforcement (mirrors frame_worker path)
                if ((result.is_new or result.host_changed) and !is_identity) {
                    switch (sub.validator.resolveHostAuthority(d, sub.options.host_id)) {
                        .migrate => {
                            dp.setAccountHostId(result.uid, sub.options.host_id) catch {};
                            if (result.host_changed) {
                                log.info("host {s}: account migrated uid={d} did={s}", .{
                                    sub.options.hostname, result.uid, d,
                                });
                            }
                        },
                        .reject => {
                            log.info("host {s}: dropping event, host authority failed uid={d} did={s}", .{
                                sub.options.hostname, result.uid, d,
                            });
                            _ = sub.bc.stats.failed.fetchAdd(1, .monotonic);
                            _ = sub.bc.stats.failed_host_authority.fetchAdd(1, .monotonic);
                            return;
                        },
                        .accept => {},
                    }
                }

                break :blk result.uid;
            } else break :blk @as(u64, 0);
        } else 0;

        // process #account events: update upstream status
        // Go relay: ingest.go processAccountEvent
        if (is_account) {
            if (sub.persist) |dp| {
                if (uid > 0) {
                    const is_active = payload.getBool("active") orelse false;
                    const status_str = payload.getString("status");
                    const new_status: []const u8 = if (is_active)
                        "active"
                    else
                        (status_str orelse "inactive");
                    dp.updateAccountUpstreamStatus(uid, new_status) catch |err| {
                        log.debug("upstream status update failed: {s}", .{@errorName(err)});
                    };

                    // on account tombstone/deletion, remove all collection index entries
                    if (std.mem.eql(u8, new_status, "deleted") or std.mem.eql(u8, new_status, "takendown")) {
                        if (sub.collection_index) |ci| {
                            if (did) |d| {
                                ci.removeAll(d) catch |err| {
                                    log.debug("collection removeAll failed: {s}", .{@errorName(err)});
                                };
                            }
                        }
                    }
                }
            }
        }

        // for commits and syncs: check account is active, validate, extract state
        var commit_data_cid: ?[]const u8 = null;
        var commit_rev: ?[]const u8 = null;
        if (is_commit or is_sync) {
            // drop for inactive accounts
            if (sub.persist) |dp| {
                if (uid > 0) {
                    const active = dp.isAccountActive(uid) catch true;
                    if (!active) {
                        _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic);
                        return;
                    }
                }
            }

            // rev checks: stale, future, chain continuity
            if (is_commit and uid > 0) {
                if (payload.getString("rev")) |incoming_rev| {
                    // future-rev rejection: drop commits whose TID timestamp is
                    // ahead of our clock beyond the configured skew allowance
                    if (zat.Tid.parse(incoming_rev)) |tid| {
                        const rev_us: i64 = @intCast(tid.timestamp());
                        const now_us = std.time.microTimestamp();
                        const skew_us: i64 = sub.validator.config.rev_clock_skew * 1_000_000;
                        if (rev_us > now_us + skew_us) {
                            log.info("host {s}: dropping future rev uid={d} rev={s}", .{
                                sub.options.hostname, uid, incoming_rev,
                            });
                            _ = sub.bc.stats.failed.fetchAdd(1, .monotonic);
                            _ = sub.bc.stats.failed_future_rev.fetchAdd(1, .monotonic);
                            return;
                        }
                    }

                    if (sub.persist) |dp| {
                        if (dp.getAccountState(uid, alloc) catch null) |prev| {
                            // stale rev check — revs are TIDs, so lexicographic
                            // byte order matches chronological order
                            if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) {
                                log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{
                                    sub.options.hostname, uid, incoming_rev, prev.rev,
                                });
                                _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic);
                                return;
                            }

                            // chain continuity: since should match stored rev
                            // (logged + counted, but the event is still processed)
                            if (payload.getString("since")) |since| {
                                if (!std.mem.eql(u8, since, prev.rev)) {
                                    log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{
                                        sub.options.hostname, uid, since, prev.rev,
                                    });
                                    _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                }
                            }

                            // chain continuity: prevData CID should match stored data_cid
                            if (payload.get("prevData")) |pd| {
                                if (pd == .cid) {
                                    const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch "";
                                    if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and
                                        !std.mem.eql(u8, prev_data_encoded, prev.data_cid))
                                    {
                                        log.info("host {s}: chain break uid={d} prevData mismatch", .{
                                            sub.options.hostname, uid,
                                        });
                                        _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            if (is_commit) {
                const result = sub.validator.validateCommit(payload);
                if (!result.valid) return;
                commit_data_cid = result.data_cid;
                commit_rev = result.commit_rev;

                // track collections from commit ops (phase 1 live indexing)
                if (sub.collection_index) |ci| {
                    if (did) |d| {
                        if (payload.get("ops")) |ops| {
                            ci.trackCommitOps(d, ops);
                        }
                    }
                }
            } else {
                // #sync: signature verification only, no ops/MST
                const result = sub.validator.validateSync(payload);
                if (!result.valid) return;
                commit_data_cid = result.data_cid;
                commit_rev = result.commit_rev;
            }
        }

        // determine event kind for persistence
        const kind: event_log_mod.EvtKind = if (is_commit)
            .commit
        else if (is_sync)
            .sync
        else if (is_account)
            .account
        else // is_identity (unknown types already filtered above)
            .identity;

        // persist and get relay-assigned seq, broadcast raw bytes.
        // ordering mutex ensures frames are broadcast in seq order —
        // without it, concurrent subscriber threads can interleave
        // persist (seq assignment) and broadcast, delivering out-of-order.
        if (sub.persist) |dp| {
            const relay_seq = blk: {
                sub.bc.broadcast_order.lock();
                defer sub.bc.broadcast_order.unlock();

                const seq = dp.persist(kind, uid, data) catch |err| {
                    log.warn("persist failed: {s}", .{@errorName(err)});
                    return;
                };
                sub.bc.stats.relay_seq.store(seq, .release);
                const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data;
                sub.bc.broadcast(seq, broadcast_data);
                break :blk seq;
            };
            _ = relay_seq;

            // update per-DID state outside the ordering lock (Postgres round-trip)
            if ((is_commit or is_sync) and uid > 0) {
                if (commit_rev) |rev| {
                    const cid_str: []const u8 = if (commit_data_cid) |cid_raw|
                        zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
                    else
                        "";
                    if (dp.updateAccountState(uid, rev, cid_str)) |updated| {
                        if (!updated) {
                            log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{
                                sub.options.hostname, uid, rev,
                            });
                        }
                    } else |err| {
                        log.debug("account state update failed: {s}", .{@errorName(err)});
                    }
                }
            }
        } else {
            // no persistence configured: pass through with the upstream seq
            sub.bc.broadcast(upstream_seq orelse 0, data);
        }
    }

    /// called by the websocket client when the connection closes;
    /// flushes the cursor so reconnect resumes from the right place.
    pub fn close(self: *FrameHandler) void {
        log.info("host {s}: connection closed", .{self.subscriber.options.hostname});
        // flush cursor on disconnect
        self.subscriber.flushCursor();
    }
};
705
706// --- tests ---
707
test "decode frame via SDK and extract fields" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const a = arena.allocator();

    // assemble a #commit frame: CBOR header map immediately followed by payload map
    const hdr: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#commit" } },
    } };
    const body: cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 12345 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const hdr_bytes = try cbor.encodeAlloc(a, hdr);
    const body_bytes = try cbor.encodeAlloc(a, body);
    const frame = try std.mem.concat(a, u8, &.{ hdr_bytes, body_bytes });

    // decode using SDK (same path as FrameHandler.serverMessage)
    const decoded_hdr = try cbor.decode(a, frame);
    const decoded_body = try cbor.decodeAll(a, frame[decoded_hdr.consumed..]);

    try std.testing.expectEqualStrings("#commit", decoded_hdr.value.getString("t").?);
    try std.testing.expectEqual(@as(i64, 1), decoded_hdr.value.getInt("op").?);
    try std.testing.expectEqual(@as(i64, 12345), decoded_body.getInt("seq").?);
    try std.testing.expectEqualStrings("did:plc:test123", decoded_body.getString("repo").?);
}
745
test "decode identity frame via SDK" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const a = arena.allocator();

    // header + payload for an #identity frame
    const hdr: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#identity" } },
    } };
    const body: cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:alice" } },
        .{ .key = "seq", .value = .{ .unsigned = 99 } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const hdr_bytes = try cbor.encodeAlloc(a, hdr);
    const body_bytes = try cbor.encodeAlloc(a, body);
    const frame = try std.mem.concat(a, u8, &.{ hdr_bytes, body_bytes });

    const decoded_hdr = try cbor.decode(a, frame);
    const decoded_body = try cbor.decodeAll(a, frame[decoded_hdr.consumed..]);

    try std.testing.expectEqualStrings("#identity", decoded_hdr.value.getString("t").?);
    try std.testing.expectEqualStrings("did:plc:alice", decoded_body.getString("did").?);
    try std.testing.expectEqual(@as(i64, 99), decoded_body.getInt("seq").?);
}
779
test "rate limiter enforces per-second limit" {
    var limiter: RateLimiter = .{
        .sec_limit = .{ .raw = 3 },
        .hour_limit = .{ .raw = 1000 },
        .day_limit = .{ .raw = 10000 },
    };

    const t0: i64 = 1_000_000_000;

    // first three events in the same instant pass; the fourth trips the limit
    var n: usize = 0;
    while (n < 3) : (n += 1) {
        try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0));
    }
    try std.testing.expectEqual(RateLimiter.Result.sec, limiter.allow(t0));

    // at start of next second, prev carries over fully → still blocked
    try std.testing.expectEqual(RateLimiter.Result.sec, limiter.allow(t0 + 1000));

    // two seconds later prev is forgotten
    try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0 + 2000));
}
795
test "rate limiter enforces per-hour limit" {
    var limiter: RateLimiter = .{
        .sec_limit = .{ .raw = 1000 },
        .hour_limit = .{ .raw = 5 },
        .day_limit = .{ .raw = 10000 },
    };

    const t0: i64 = 3_600_000 * 100; // aligned to an hour boundary

    var n: usize = 0;
    while (n < 5) : (n += 1) {
        try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0));
    }
    try std.testing.expectEqual(RateLimiter.Result.hour, limiter.allow(t0 + 100));

    // next hour, prev carries over → still blocked
    try std.testing.expectEqual(RateLimiter.Result.hour, limiter.allow(t0 + 3_600_000));

    // two hours later prev is forgotten
    try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0 + 7_200_000));
}
811
test "sliding window interpolates previous count by elapsed time" {
    var limiter: RateLimiter = .{
        .sec_limit = .{ .raw = 10 },
        .hour_limit = .{ .raw = 1_000_000 },
        .day_limit = .{ .raw = 1_000_000 },
    };

    const t0: i64 = 1_000_000_000;
    var n: usize = 0;
    while (n < 8) : (n += 1) {
        try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0));
    }

    // start of next second: prev=8 at full weight, curr=0 → effective 8, room for 2
    try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0 + 1000));
    try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0 + 1000));
    try std.testing.expectEqual(RateLimiter.Result.sec, limiter.allow(t0 + 1000));

    // halfway through: weighted_prev = 8 * 0.5 = 4, curr=2 → effective 6, room for 4
    var extra: usize = 0;
    while (extra < 4) : (extra += 1) {
        try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.allow(t0 + 1500));
    }
    try std.testing.expectEqual(RateLimiter.Result.sec, limiter.allow(t0 + 1500));
}
832
test "waitForAllow blocks then allows after window advances" {
    // with sec limit = 1, the first call exhausts the fast path; the second
    // must poll (at least one 100ms sleep) and report .sec as the waited tier.
    var limiter: RateLimiter = .{
        .sec_limit = .{ .raw = 1 },
        .hour_limit = .{ .raw = 1000 },
        .day_limit = .{ .raw = 10000 },
    };
    var stop = std.atomic.Value(bool){ .raw = false };

    // first call takes the fast path
    try std.testing.expectEqual(RateLimiter.Result.allowed, limiter.waitForAllow(&stop));

    const start_ms = std.time.milliTimestamp();
    const outcome = limiter.waitForAllow(&stop);
    const waited_ms = std.time.milliTimestamp() - start_ms;

    try std.testing.expectEqual(RateLimiter.Result.sec, outcome);
    try std.testing.expect(waited_ms >= 100); // must have slept at least one 100ms poll
}
851
test "waitForAllow respects shutdown" {
    var limiter: RateLimiter = .{
        .sec_limit = .{ .raw = 1 },
        .hour_limit = .{ .raw = 1000 },
        .day_limit = .{ .raw = 10000 },
    };
    var stop = std.atomic.Value(bool){ .raw = false };

    // consume the single per-second slot
    _ = limiter.waitForAllow(&stop);

    // with shutdown already set, the next call must bail out without polling
    stop.store(true, .release);

    const start_ms = std.time.milliTimestamp();
    _ = limiter.waitForAllow(&stop);
    const waited_ms = std.time.milliTimestamp() - start_ms;

    try std.testing.expect(waited_ms < 50); // should not have slept
}
869
test "trusted host detection" {
    // positive: any hostname ending in a configured trusted suffix
    try std.testing.expect(isTrustedHost("pds-123.host.bsky.network"));
    try std.testing.expect(isTrustedHost("abc.host.bsky.network"));
    // negative: related-looking domains that do not carry the full suffix
    try std.testing.expect(!isTrustedHost("bsky.network"));
    try std.testing.expect(!isTrustedHost("evil.bsky.network"));
    try std.testing.expect(!isTrustedHost("pds.example.com"));
    // boundary: the suffix without its leading dot must not match
    // (".host.bsky.network" vs "host.bsky.network" — endsWith is exact)
    try std.testing.expect(!isTrustedHost("host.bsky.network"));
    // degenerate: an empty hostname is never trusted
    try std.testing.expect(!isTrustedHost(""));
}
877
test "error frame (op=-1) is detected" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const a = arena.allocator();

    // build a header carrying a negative op, round-trip it through the codec,
    // and confirm the negative value survives decoding
    const encoded = try cbor.encodeAlloc(a, .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#info" } },
    } });
    const decoded = (try cbor.decode(a, encoded)).value;

    try std.testing.expectEqual(@as(i64, -1), decoded.getInt("op").?);
}
896
897// --- spec conformance tests ---
898
test "spec: unknown frame type (op=1, t=#unknown) is ignored" {
    // event stream spec: unknown t values must be ignored for forward-compat
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const a = arena.allocator();

    const header_bytes = try cbor.encodeAlloc(a, .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#unknown" } },
    } });
    const payload_bytes = try cbor.encodeAlloc(a, .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 1 } },
    } });

    // the header decodes as a well-formed frame carrying an unrecognized type
    const header = (try cbor.decode(a, header_bytes)).value;
    try std.testing.expectEqual(@as(i64, 1), header.getInt("op").?);
    const frame_type = header.getString("t").?;
    try std.testing.expectEqualStrings("#unknown", frame_type);

    // the filter logic: the type matches none of the known frame kinds
    const known_types = [_][]const u8{ "#commit", "#sync", "#account", "#identity" };
    var recognized = false;
    for (known_types) |known| {
        if (std.mem.eql(u8, frame_type, known)) recognized = true;
    }
    try std.testing.expect(!recognized);

    // the payload is still valid CBOR — the frame is ignored, not rejected
    const payload = try cbor.decodeAll(a, payload_bytes);
    try std.testing.expectEqualStrings("did:plc:test123", payload.getString("did").?);
}
938
test "spec: error frame (op=-1) is handled, not persisted" {
    // event stream spec: op=-1 frames are error notifications from upstream
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const a = arena.allocator();

    const header_bytes = try cbor.encodeAlloc(a, .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#error" } },
    } });
    const payload_bytes = try cbor.encodeAlloc(a, .{ .map = &.{
        .{ .key = "error", .value = .{ .text = "FutureCursor" } },
        .{ .key = "message", .value = .{ .text = "cursor is ahead of server" } },
    } });

    // the negative op marks this as an error frame
    const header = (try cbor.decode(a, header_bytes)).value;
    try std.testing.expectEqual(@as(i64, -1), header.getInt("op").?);

    // its error body round-trips intact
    const payload = try cbor.decodeAll(a, payload_bytes);
    try std.testing.expectEqualStrings("FutureCursor", payload.getString("error").?);
    try std.testing.expectEqualStrings("cursor is ahead of server", payload.getString("message").?);
}