atproto relay implementation in zig zlay.waow.tech
at main 969 lines 41 kB view raw
//! relay subscriber — per-host firehose worker
//!
//! connects to a single PDS or relay upstream, decodes firehose frames
//! using the zat SDK's CBOR codec, validates commit frames, and persists
//! all events to disk with relay-assigned sequence numbers before broadcast.
//!
//! managed by the Slurper, which spawns one Subscriber per tracked host.

const std = @import("std");
const websocket = @import("websocket");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const frame_worker_mod = @import("frame_worker.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

const max_consecutive_failures = 15;
const cursor_flush_interval_sec = 4; // flush cursor to DB every N seconds (Go relay: 4s)

// per-host rate limits — backpressure thresholds (matches indigo: 50/s, 2500/hr, 20k/day)
// blocking instead of dropping means these trigger TCP backpressure on the upstream PDS
const default_per_second_limit: u64 = 50;
const default_per_hour_limit: u64 = 2_500;
const default_per_day_limit: u64 = 20_000;

// trusted hosts get much higher limits (Go relay: 5000/s, 50M/hr, 500M/day)
const trusted_per_second_limit: u64 = 5_000;
const trusted_per_hour_limit: u64 = 50_000_000;
const trusted_per_day_limit: u64 = 500_000_000;

// Go relay: TrustedDomains config — hosts matching these suffixes get trusted limits
const trusted_suffixes: []const []const u8 = &.{".host.bsky.network"};

/// true when `hostname` ends with one of `trusted_suffixes`.
pub fn isTrustedHost(hostname: []const u8) bool {
    for (trusted_suffixes) |suffix| {
        if (std.mem.endsWith(u8, hostname, suffix)) return true;
    }
    return false;
}

/// compute rate limits scaled by account count (matches Go relay: slurper.go ComputeLimiterCounts)
pub fn computeLimits(trusted: bool, account_count: u64) struct { sec: u64, hour: u64, day: u64 } {
    if (trusted) return .{
        .sec = trusted_per_second_limit,
        .hour = trusted_per_hour_limit,
        .day = trusted_per_day_limit,
    };
    return .{
        .sec = default_per_second_limit + account_count / 1000,
        .hour = default_per_hour_limit + account_count,
        .day = default_per_day_limit + account_count * 10,
    };
}

pub const Options = struct {
    hostname: []const u8 = "bsky.network",
    max_message_size: usize = 5 * 1024 * 1024,
    host_id: u64 = 0,
    account_count: u64 = 0,
    ca_bundle: ?std.crypto.Certificate.Bundle = null,
};

/// Sliding-window rate limiter tracking event counts per second/hour/day
/// (same algorithm as Go relay's github.com/RussellLuo/slidingwindow).
/// Uses millisecond timestamps for sub-second precision (critical for the 1-second window).
/// Effective count = weight * prev_count + curr_count,
/// where weight = (window_size - elapsed) / window_size.
const RateLimiter = struct {
    sec: SlidingWindow = .{ .size_ms = 1_000 },
    hour: SlidingWindow = .{ .size_ms = 3_600_000 },
    day: SlidingWindow = .{ .size_ms = 86_400_000 },

    // limits are atomics so another thread (admin API) can update them live
    sec_limit: std.atomic.Value(u64) = .{ .raw = default_per_second_limit },
    hour_limit: std.atomic.Value(u64) = .{ .raw = default_per_hour_limit },
    day_limit: std.atomic.Value(u64) = .{ .raw = default_per_day_limit },

    const SlidingWindow = struct {
        size_ms: i64,
        curr_start: i64 = 0,
        curr_count: u64 = 0,
        prev_count: u64 = 0,

        /// rotate the window so `curr_start` is the bucket containing `now_ms`.
        /// if exactly one bucket elapsed, the old current count becomes `prev_count`
        /// (for interpolation); if more elapsed, the previous bucket is forgotten.
        fn advance(self: *SlidingWindow, now_ms: i64) void {
            const new_start = now_ms - @mod(now_ms, self.size_ms);
            if (new_start <= self.curr_start) return;
            const diff = @divTrunc(new_start - self.curr_start, self.size_ms);
            self.prev_count = if (diff == 1) self.curr_count else 0;
            self.curr_start = new_start;
            self.curr_count = 0;
        }

        /// interpolated count: previous bucket weighted by how much of it still
        /// overlaps the sliding window, plus the current bucket.
        fn effectiveCount(self: *const SlidingWindow, now_ms: i64) u64 {
            const elapsed = now_ms - self.curr_start;
            const remaining = self.size_ms - elapsed;
            if (remaining <= 0 or self.prev_count == 0) return self.curr_count;
            const weighted_prev = self.prev_count * @as(u64, @intCast(remaining)) / @as(u64, @intCast(self.size_ms));
            return weighted_prev + self.curr_count;
        }
    };

    const Result = enum { allowed, sec, hour, day };

    /// non-blocking check: returns .allowed and counts the event, or names the
    /// first tier (sec/hour/day) whose limit is currently exceeded.
    fn allow(self: *RateLimiter, now_ms: i64) Result {
        self.sec.advance(now_ms);
        self.hour.advance(now_ms);
        self.day.advance(now_ms);

        if (self.sec.effectiveCount(now_ms) >= self.sec_limit.load(.monotonic)) return .sec;
        if (self.hour.effectiveCount(now_ms) >= self.hour_limit.load(.monotonic)) return .hour;
        if (self.day.effectiveCount(now_ms) >= self.day_limit.load(.monotonic)) return .day;

        self.sec.curr_count += 1;
        self.hour.curr_count += 1;
        self.day.curr_count += 1;
        return .allowed;
    }

    /// Block until all rate limit windows allow the event.
    /// Checks every 100ms, matching indigo's waitForLimiter behavior.
    /// Returns which tier (if any) caused a wait, for metrics.
    fn waitForAllow(self: *RateLimiter, shutdown: *std.atomic.Value(bool)) Result {
        // fast path: no waiting needed
        const now_ms = std.time.milliTimestamp();
        self.sec.advance(now_ms);
        self.hour.advance(now_ms);
        self.day.advance(now_ms);

        if (self.sec.effectiveCount(now_ms) < self.sec_limit.load(.monotonic) and
            self.hour.effectiveCount(now_ms) < self.hour_limit.load(.monotonic) and
            self.day.effectiveCount(now_ms) < self.day_limit.load(.monotonic))
        {
            self.sec.curr_count += 1;
            self.hour.curr_count += 1;
            self.day.curr_count += 1;
            return .allowed;
        }

        // slow path: poll every 100ms until allowed (creates TCP backpressure)
        var waited: Result = .sec;
        while (!shutdown.load(.acquire)) {
            std.posix.nanosleep(0, 100 * std.time.ns_per_ms);

            const t = std.time.milliTimestamp();
            self.sec.advance(t);
            self.hour.advance(t);
            self.day.advance(t);

            if (self.sec.effectiveCount(t) >= self.sec_limit.load(.monotonic)) {
                waited = .sec;
                continue;
            }
            if (self.hour.effectiveCount(t) >= self.hour_limit.load(.monotonic)) {
                waited = .hour;
                continue;
            }
            if (self.day.effectiveCount(t) >= self.day_limit.load(.monotonic)) {
                waited = .day;
                continue;
            }

            self.sec.curr_count += 1;
            self.hour.curr_count += 1;
            self.day.curr_count += 1;
            return waited;
        }
        return waited;
    }

    /// update rate limits from another thread (e.g. admin API).
    pub fn updateLimits(self: *RateLimiter, sec: u64, hour: u64, day: u64) void {
        self.sec_limit.store(sec, .monotonic);
        self.hour_limit.store(hour, .monotonic);
        self.day_limit.store(day, .monotonic);
    }
};

pub const Subscriber = struct {
    allocator: Allocator,
    options: Options,
    bc: *broadcaster.Broadcaster,
    validator: *validator_mod.Validator,
    persist: ?*event_log_mod.DiskPersist,
    collection_index: ?*collection_index_mod.CollectionIndex = null,
    pool: ?*frame_worker_mod.FramePool = null,
    shutdown: *std.atomic.Value(bool),
    last_upstream_seq: ?u64 = null,
    last_cursor_flush: i64 = 0,
    rate_limiter: RateLimiter = .{},

    // per-host shutdown (e.g. FutureCursor — stops only this subscriber)
    host_shutdown: std.atomic.Value(bool) = .{ .raw = false },

    /// build a Subscriber with rate limits derived from trust + account count.
    pub fn init(
        allocator: Allocator,
        bc: *broadcaster.Broadcaster,
        val: *validator_mod.Validator,
        persist: ?*event_log_mod.DiskPersist,
        shutdown: *std.atomic.Value(bool),
        options: Options,
    ) Subscriber {
        const trusted = isTrustedHost(options.hostname);
        const limits = computeLimits(trusted, options.account_count);
        return .{
            .allocator = allocator,
            .options = options,
            .bc = bc,
            .validator = val,
            .persist = persist,
            .shutdown = shutdown,
            .rate_limiter = .{
                .sec_limit = .{ .raw = limits.sec },
                .hour_limit = .{ .raw = limits.hour },
                .day_limit = .{ .raw = limits.day },
            },
        };
    }

    /// check if this subscriber should stop (global or per-host shutdown)
    fn shouldStop(self: *Subscriber) bool {
        return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire);
    }

    /// run the subscriber loop. reconnects with exponential backoff.
    /// blocks until shutdown flag is set or host is exhausted.
229 pub fn run(self: *Subscriber) void { 230 var backoff: u64 = 1; 231 const max_backoff: u64 = 60; 232 233 // load cursor from DB if we have a host_id 234 if (self.options.host_id > 0) { 235 if (self.persist) |dp| { 236 const host_info = dp.getOrCreateHost(self.options.hostname) catch null; 237 if (host_info) |info| { 238 if (info.last_seq > 0) { 239 self.last_upstream_seq = info.last_seq; 240 log.info("host {s}: resuming from cursor {d}", .{ self.options.hostname, info.last_seq }); 241 } 242 } 243 } 244 } 245 246 while (!self.shouldStop()) { 247 log.info("host {s}: connecting...", .{self.options.hostname}); 248 249 self.connectAndRead() catch |err| { 250 if (self.shouldStop()) return; 251 log.err("host {s}: error: {s}, reconnecting in {d}s...", .{ self.options.hostname, @errorName(err), backoff }); 252 }; 253 254 if (self.shouldStop()) return; 255 256 // track failures for this host 257 if (self.options.host_id > 0) { 258 if (self.persist) |dp| { 259 const failures = dp.incrementHostFailures(self.options.host_id) catch 0; 260 if (failures >= max_consecutive_failures) { 261 log.warn("host {s}: exhausted after {d} failures, stopping", .{ self.options.hostname, failures }); 262 dp.updateHostStatus(self.options.host_id, "exhausted") catch {}; 263 return; 264 } 265 } 266 } 267 268 // backoff sleep in small increments so we can check shutdown 269 var remaining: u64 = backoff; 270 while (remaining > 0 and !self.shouldStop()) { 271 const chunk = @min(remaining, 1); 272 std.posix.nanosleep(chunk, 0); 273 remaining -= chunk; 274 } 275 backoff = @min(backoff * 2, max_backoff); 276 } 277 } 278 279 /// flush cursor position to the host table 280 fn flushCursor(self: *Subscriber) void { 281 if (self.options.host_id == 0) return; 282 const seq = self.last_upstream_seq orelse return; 283 if (self.persist) |dp| { 284 dp.updateHostSeq(self.options.host_id, seq) catch |err| { 285 log.debug("host {s}: cursor flush failed: {s}", .{ self.options.hostname, @errorName(err) }); 286 }; 
287 } 288 } 289 290 fn connectAndRead(self: *Subscriber) !void { 291 var path_buf: [256]u8 = undefined; 292 var w: std.Io.Writer = .fixed(&path_buf); 293 294 try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos"); 295 if (self.last_upstream_seq) |cursor| { 296 try w.print("?cursor={d}", .{cursor}); 297 } 298 const path = w.buffered(); 299 300 var client = try websocket.Client.init(self.allocator, .{ 301 .host = self.options.hostname, 302 .port = 443, 303 .tls = true, 304 .max_size = self.options.max_message_size, 305 .ca_bundle = self.options.ca_bundle, 306 }); 307 defer client.deinit(); 308 309 var host_header_buf: [256]u8 = undefined; 310 const host_header = std.fmt.bufPrint( 311 &host_header_buf, 312 "Host: {s}\r\n", 313 .{self.options.hostname}, 314 ) catch self.options.hostname; 315 316 try client.handshake(path, .{ .headers = host_header }); 317 log.info("host {s}: connected", .{self.options.hostname}); 318 319 // reset failures on successful connect 320 if (self.options.host_id > 0) { 321 if (self.persist) |dp| { 322 dp.resetHostFailures(self.options.host_id) catch {}; 323 } 324 } 325 326 var handler = FrameHandler{ 327 .subscriber = self, 328 }; 329 try client.readLoop(&handler); 330 } 331}; 332 333const FrameHandler = struct { 334 subscriber: *Subscriber, 335 336 pub fn serverMessage(self: *FrameHandler, data: []const u8) !void { 337 const sub = self.subscriber; 338 339 // lightweight header decode for cursor tracking + routing 340 var arena = std.heap.ArenaAllocator.init(sub.allocator); 341 defer arena.deinit(); 342 const alloc = arena.allocator(); 343 344 const header_result = zat.cbor.decode(alloc, data) catch |err| { 345 log.debug("frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len }); 346 _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic); 347 return; 348 }; 349 const header = header_result.value; 350 const payload_data = data[header_result.consumed..]; 351 352 // check op field (1 = message, -1 = error) 353 const op = 
header.getInt("op") orelse return;
        if (op == -1) {
            // error frame from upstream — check for FutureCursor
            // Go relay: slurper.go — sets host to idle and disconnects
            if (zat.cbor.decodeAll(alloc, payload_data) catch null) |err_payload| {
                const err_name = err_payload.getString("error") orelse "unknown";
                const err_msg = err_payload.getString("message") orelse "";
                log.warn("host {s}: error frame: {s}: {s}", .{ sub.options.hostname, err_name, err_msg });
                if (std.mem.eql(u8, err_name, "FutureCursor")) {
                    // our cursor is ahead of the PDS — set host to idle, stop this subscriber only
                    if (sub.persist) |dp| {
                        if (sub.options.host_id > 0) {
                            dp.updateHostStatus(sub.options.host_id, "idle") catch {};
                        }
                    }
                    sub.host_shutdown.store(true, .release);
                }
            }
            return;
        }

        const frame_type = header.getString("t") orelse return;
        const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
            log.debug("frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
            _ = sub.bc.stats.decode_errors.fetchAdd(1, .monotonic);
            return;
        };

        // count every successfully decoded event (matches Go relay's events_received_counter)
        _ = sub.bc.stats.frames_in.fetchAdd(1, .monotonic);

        // extract seq for cursor tracking (deferred until after pool accepts)
        const upstream_seq = payload.getUint("seq");

        // time-based cursor flush (Go relay: every 4 seconds)
        {
            const now = std.time.timestamp();
            if (now - sub.last_cursor_flush >= cursor_flush_interval_sec) {
                sub.flushCursor();
                sub.last_cursor_flush = now;
            }
        }

        // per-host rate limiting — block until window opens (matches indigo's waitForLimiter)
        // blocking here stalls the websocket reader → TCP backpressure → PDS slows down
        switch (sub.rate_limiter.waitForAllow(sub.shutdown)) {
            .allowed => {},
            .sec => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_sec.fetchAdd(1, .monotonic);
            },
            .hour => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_hour.fetchAdd(1, .monotonic);
            },
            .day => {
                _ = sub.bc.stats.rate_limited.fetchAdd(1, .monotonic);
                _ = sub.bc.stats.rate_limited_day.fetchAdd(1, .monotonic);
            },
        }

        // filter unknown frame types before submitting to pool (forward-compat)
        const is_commit = std.mem.eql(u8, frame_type, "#commit");
        const is_sync = std.mem.eql(u8, frame_type, "#sync");
        const is_account = std.mem.eql(u8, frame_type, "#account");
        const is_identity = std.mem.eql(u8, frame_type, "#identity");

        if (!is_commit and !is_sync and !is_account and !is_identity) {
            // advance cursor for intentionally skipped frames —
            // we won't process these on reconnect either
            if (upstream_seq) |s| sub.last_upstream_seq = s;
            return;
        }

        // submit to frame pool for heavy processing (CBOR re-decode, validation, persist, broadcast)
        // route by DID so commits for the same account serialize (prevents chain break races)
        // blocks when queue is full — TCP backpressure propagates to upstream PDS (matches indigo)
        if (sub.pool) |pool| {
            const did_key: u64 = blk: {
                const d = payload.getString("repo") orelse payload.getString("did");
                break :blk if (d) |s| std.hash.Wyhash.hash(0, s) else sub.options.host_id;
            };
            const duped = sub.allocator.dupe(u8, data) catch return;
            const t0 = std.time.nanoTimestamp();
            if (pool.submit(did_key, .{
                .data = duped,
                .host_id = sub.options.host_id,
                .hostname = sub.options.hostname,
                .allocator = sub.allocator,
                .bc = sub.bc,
                .validator = sub.validator,
                .persist = sub.persist,
                .collection_index = sub.collection_index,
            }, sub.shutdown)) {
                // pool accepted — advance cursor past this frame
                if (upstream_seq) |s| sub.last_upstream_seq = s;
                if (std.time.nanoTimestamp() - t0 > 1_000_000) { // >1ms = had to wait
                    _ = sub.bc.stats.pool_backpressure.fetchAdd(1, .monotonic);
                }
            } else {
                // shutdown requested — don't advance cursor so reconnect replays this frame
                sub.allocator.free(duped);
            }
            return;
        }

        // fallback: no pool, process inline (original path for tests / standalone use)
        self.processInline(sub, alloc, data, payload, upstream_seq, frame_type, is_commit, is_sync, is_account, is_identity);
    }

    /// inline processing path — used when no frame pool is configured (tests, standalone).
    /// this is the original serverMessage heavy processing logic.
    fn processInline(
        _: *FrameHandler,
        sub: *Subscriber,
        alloc: Allocator,
        data: []const u8,
        payload: zat.cbor.Value,
        upstream_seq: ?u64,
        _: []const u8,
        is_commit: bool,
        is_sync: bool,
        is_account: bool,
        is_identity: bool,
    ) void {
        // extract DID: "repo" for commits, "did" for identity/account
        const did: ?[]const u8 = if (is_commit)
            payload.getString("repo")
        else
            payload.getString("did");

        // on #identity event, evict cached signing key so next commit re-resolves
        if (is_identity) {
            if (did) |d| sub.validator.evictKey(d);
        }

        // resolve DID → numeric UID for event header (host-aware)
        const uid: u64 = if (sub.persist) |dp| blk: {
            if (did) |d| {
                const result = dp.uidForDidFromHost(d, sub.options.host_id) catch break :blk @as(u64, 0);

                // host authority enforcement (mirrors frame_worker path)
                if ((result.is_new or result.host_changed) and !is_identity) {
                    switch (sub.validator.resolveHostAuthority(d, sub.options.host_id)) {
                        .migrate => {
                            dp.setAccountHostId(result.uid, sub.options.host_id) catch {};
                            if (result.host_changed) {
                                log.info("host {s}: account migrated uid={d} did={s}", .{
                                    sub.options.hostname, result.uid, d,
                                });
                            }
                        },
                        .reject => {
                            log.info("host {s}: dropping event, host authority failed uid={d} did={s}", .{
                                sub.options.hostname, result.uid, d,
                            });
                            _ = sub.bc.stats.failed.fetchAdd(1, .monotonic);
                            _ = sub.bc.stats.failed_host_authority.fetchAdd(1, .monotonic);
                            return;
                        },
                        .accept => {},
                    }
                }

                break :blk result.uid;
            } else break :blk @as(u64, 0);
        } else 0;

        // process #account events: update upstream status
        // Go relay: ingest.go processAccountEvent
        if (is_account) {
            if (sub.persist) |dp| {
                if (uid > 0) {
                    const is_active = payload.getBool("active") orelse false;
                    const status_str = payload.getString("status");
                    const new_status: []const u8 = if (is_active)
                        "active"
                    else
                        (status_str orelse "inactive");
                    dp.updateAccountUpstreamStatus(uid, new_status) catch |err| {
                        log.debug("upstream status update failed: {s}", .{@errorName(err)});
                    };

                    // on account tombstone/deletion, remove all collection index entries
                    if (std.mem.eql(u8, new_status, "deleted") or std.mem.eql(u8, new_status, "takendown")) {
                        if (sub.collection_index) |ci| {
                            if (did) |d| {
                                ci.removeAll(d) catch |err| {
                                    log.debug("collection removeAll failed: {s}", .{@errorName(err)});
                                };
                            }
                        }
                    }
                }
            }
        }

        // for commits and syncs: check account is active, validate, extract state
        var commit_data_cid: ?[]const u8 = null;
        var commit_rev: ?[]const u8 = null;
        if (is_commit or is_sync) {
            // drop for inactive accounts
            if (sub.persist) |dp| {
                if (uid > 0) {
                    const active = dp.isAccountActive(uid) catch true;
                    if (!active) {
                        _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic);
                        return;
                    }
                }
            }

            // rev checks: stale, future, chain continuity
            if (is_commit and uid > 0) {
                if (payload.getString("rev")) |incoming_rev| {
                    // future-rev rejection
                    if (zat.Tid.parse(incoming_rev)) |tid| {
                        const rev_us: i64 = @intCast(tid.timestamp());
                        const now_us = std.time.microTimestamp();
                        const skew_us: i64 = sub.validator.config.rev_clock_skew * 1_000_000;
                        if (rev_us > now_us + skew_us) {
                            log.info("host {s}: dropping future rev uid={d} rev={s}", .{
                                sub.options.hostname, uid, incoming_rev,
                            });
                            _ = sub.bc.stats.failed.fetchAdd(1, .monotonic);
                            _ = sub.bc.stats.failed_future_rev.fetchAdd(1, .monotonic);
                            return;
                        }
                    }

                    if (sub.persist) |dp| {
                        if (dp.getAccountState(uid, alloc) catch null) |prev| {
                            // stale rev check
                            if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) {
                                log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{
                                    sub.options.hostname, uid, incoming_rev, prev.rev,
                                });
                                _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic);
                                return;
                            }

                            // chain continuity: since should match stored rev
                            if (payload.getString("since")) |since| {
                                if (!std.mem.eql(u8, since, prev.rev)) {
                                    log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{
                                        sub.options.hostname, uid, since, prev.rev,
                                    });
                                    _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                }
                            }

                            // chain continuity: prevData CID should match stored data_cid
                            if (payload.get("prevData")) |pd| {
                                if (pd == .cid) {
                                    const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch "";
                                    if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and
                                        !std.mem.eql(u8, prev_data_encoded, prev.data_cid))
                                    {
                                        log.info("host {s}: chain break uid={d} prevData mismatch", .{
                                            sub.options.hostname, uid,
                                        });
                                        _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            if (is_commit) {
                const result = sub.validator.validateCommit(payload);
                if (!result.valid) return;
                commit_data_cid = result.data_cid;
                commit_rev = result.commit_rev;

                // track collections from commit ops (phase 1 live indexing)
                if (sub.collection_index) |ci| {
                    if (did) |d| {
                        if (payload.get("ops")) |ops| {
                            ci.trackCommitOps(d, ops);
                        }
                    }
                }
            } else {
                // #sync: signature verification only, no ops/MST
                const result = sub.validator.validateSync(payload);
                if (!result.valid) return;
                commit_data_cid = result.data_cid;
                commit_rev = result.commit_rev;
            }
        }

        // determine event kind for persistence
        const kind: event_log_mod.EvtKind = if (is_commit)
            .commit
        else if (is_sync)
            .sync
        else if (is_account)
            .account
        else // is_identity (unknown types already filtered above)
            .identity;

        // persist and get relay-assigned seq, broadcast raw bytes.
        // ordering mutex ensures frames are broadcast in seq order —
        // without it, concurrent subscriber threads can interleave
        // persist (seq assignment) and broadcast, delivering out-of-order.
        if (sub.persist) |dp| {
            const relay_seq = blk: {
                sub.bc.broadcast_order.lock();
                defer sub.bc.broadcast_order.unlock();

                const seq = dp.persist(kind, uid, data) catch |err| {
                    log.warn("persist failed: {s}", .{@errorName(err)});
                    return;
                };
                sub.bc.stats.relay_seq.store(seq, .release);
                const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data;
                sub.bc.broadcast(seq, broadcast_data);
                break :blk seq;
            };
            _ = relay_seq;

            // update per-DID state outside the ordering lock (Postgres round-trip)
            if ((is_commit or is_sync) and uid > 0) {
                if (commit_rev) |rev| {
                    const cid_str: []const u8 = if (commit_data_cid) |cid_raw|
                        zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
                    else
                        "";
                    if (dp.updateAccountState(uid, rev, cid_str)) |updated| {
                        if (!updated) {
                            log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{
                                sub.options.hostname, uid, rev,
                            });
                        }
                    } else |err| {
                        log.debug("account state update failed: {s}", .{@errorName(err)});
                    }
                }
            }
        } else {
            sub.bc.broadcast(upstream_seq orelse 0, data);
        }
    }

    pub fn close(self: *FrameHandler) void {
        log.info("host {s}: connection closed", .{self.subscriber.options.hostname});
        // flush cursor on disconnect
        self.subscriber.flushCursor();
    }
};

// --- tests ---

test "decode frame via SDK and extract fields" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // build a commit frame using SDK encoder
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#commit" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 12345 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);

    var frame = try alloc.alloc(u8, header_bytes.len + payload_bytes.len);
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);

    // decode using SDK (same path as FrameHandler.serverMessage)
    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    const p_data = frame[h_result.consumed..];
    const p = try cbor.decodeAll(alloc, p_data);

    try std.testing.expectEqualStrings("#commit", h.getString("t").?);
    try std.testing.expectEqual(@as(i64, 1), h.getInt("op").?);
    try std.testing.expectEqual(@as(i64, 12345), p.getInt("seq").?);
    try std.testing.expectEqualStrings("did:plc:test123", p.getString("repo").?);
}

test "decode identity frame via SDK" {
    const cbor = zat.cbor;

    var arena =
std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#identity" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:alice" } },
        .{ .key = "seq", .value = .{ .unsigned = 99 } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);

    var frame = try alloc.alloc(u8, header_bytes.len + payload_bytes.len);
    @memcpy(frame[0..header_bytes.len], header_bytes);
    @memcpy(frame[header_bytes.len..], payload_bytes);

    const h_result = try cbor.decode(alloc, frame);
    const h = h_result.value;
    const p_data = frame[h_result.consumed..];
    const p = try cbor.decodeAll(alloc, p_data);

    try std.testing.expectEqualStrings("#identity", h.getString("t").?);
    try std.testing.expectEqualStrings("did:plc:alice", p.getString("did").?);
    try std.testing.expectEqual(@as(i64, 99), p.getInt("seq").?);
}

test "rate limiter enforces per-second limit" {
    var rl: RateLimiter = .{ .sec_limit = .{ .raw = 3 }, .hour_limit = .{ .raw = 1000 }, .day_limit = .{ .raw = 10000 } };

    const now: i64 = 1_000_000_000;
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(now));

    // at start of next second, prev carries over fully → still blocked
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(now + 1000));

    // two seconds later prev is forgotten
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now + 2000));
}

test "rate limiter enforces per-hour limit" {
    var rl: RateLimiter = .{ .sec_limit = .{ .raw = 1000 }, .hour_limit = .{ .raw = 5 }, .day_limit = .{ .raw = 10000 } };

    const now: i64 = 3_600_000 * 100;
    for (0..5) |_| {
        try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now));
    }
    try std.testing.expectEqual(RateLimiter.Result.hour, rl.allow(now + 100));

    // next hour, prev carries over → still blocked
    try std.testing.expectEqual(RateLimiter.Result.hour, rl.allow(now + 3_600_000));

    // two hours later prev is forgotten
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(now + 7_200_000));
}

test "sliding window interpolates previous count by elapsed time" {
    var rl: RateLimiter = .{ .sec_limit = .{ .raw = 10 }, .hour_limit = .{ .raw = 1_000_000 }, .day_limit = .{ .raw = 1_000_000 } };

    const base: i64 = 1_000_000_000;
    for (0..8) |_| {
        try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base));
    }

    // next second: prev=8, curr=0. effective=8, room for 2
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1000));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1000));
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(base + 1000));

    // halfway through: prev weight = 500/1000 = 0.5, so weighted_prev = 4. curr=2. eff=6, room for 4
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.allow(base + 1500));
    try std.testing.expectEqual(RateLimiter.Result.sec, rl.allow(base + 1500));
}

test "waitForAllow blocks then allows after window advances" {
    // verify that waitForAllow returns a non-.allowed result when the limit was hit,
    // indicating it had to wait. We use a tiny limit so the fast path is exhausted.
    var rl: RateLimiter = .{ .sec_limit = .{ .raw = 1 }, .hour_limit = .{ .raw = 1000 }, .day_limit = .{ .raw = 10000 } };
    var shutdown = std.atomic.Value(bool){ .raw = false };

    // first call takes the fast path
    try std.testing.expectEqual(RateLimiter.Result.allowed, rl.waitForAllow(&shutdown));

    // second call must block (sec limit = 1), then return .sec after the window advances
    // this will sleep ~100ms+ until the sliding window allows it
    const before = std.time.milliTimestamp();
    const result = rl.waitForAllow(&shutdown);
    const elapsed = std.time.milliTimestamp() - before;

    try std.testing.expectEqual(RateLimiter.Result.sec, result);
    try std.testing.expect(elapsed >= 100); // must have slept at least one 100ms poll
}

test "waitForAllow respects shutdown" {
    var rl: RateLimiter = .{ .sec_limit = .{ .raw = 1 }, .hour_limit = .{ .raw = 1000 }, .day_limit = .{ .raw = 10000 } };
    var shutdown = std.atomic.Value(bool){ .raw = false };

    // exhaust the limit
    _ = rl.waitForAllow(&shutdown);

    // set shutdown before the next call
    shutdown.store(true, .release);

    // should return immediately without blocking
    const before = std.time.milliTimestamp();
    _ = rl.waitForAllow(&shutdown);
    const elapsed = std.time.milliTimestamp() - before;

    try std.testing.expect(elapsed < 50); // should not have slept
}

test "trusted host detection" {
    try std.testing.expect(isTrustedHost("pds-123.host.bsky.network"));
    try std.testing.expect(isTrustedHost("abc.host.bsky.network"));
    try std.testing.expect(!isTrustedHost("bsky.network"));
    try std.testing.expect(!isTrustedHost("evil.bsky.network"));
    try std.testing.expect(!isTrustedHost("pds.example.com"));
}

test "error frame (op=-1) is detected" {
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#info" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;

    try std.testing.expectEqual(@as(i64, -1), h.getInt("op").?);
}

// --- spec conformance tests ---

test "spec: unknown frame type (op=1, t=#unknown) is ignored" {
    // event stream spec: unknown t values must be ignored for forward-compat
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = "#unknown" } },
    } };
    const payload: cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 1 } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const payload_bytes = try cbor.encodeAlloc(alloc, payload);

    // decode header — verify it's a valid message with unknown type
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;

    try std.testing.expectEqual(@as(i64, 1), h.getInt("op").?);
    const frame_type = h.getString("t").?;
    try std.testing.expectEqualStrings("#unknown", frame_type);

    // verify unknown type is NOT one of the known types (this is the filter logic)
    const is_commit = std.mem.eql(u8, frame_type, "#commit");
    const is_sync = std.mem.eql(u8, frame_type, "#sync");
    const is_account = std.mem.eql(u8, frame_type, "#account");
    const is_identity = std.mem.eql(u8, frame_type, "#identity");
    try std.testing.expect(!is_commit and !is_sync and !is_account and !is_identity);

    // verify payload still decodes (frame is valid, just ignored)
    const p = try cbor.decodeAll(alloc, payload_bytes);
    try std.testing.expectEqualStrings("did:plc:test123", p.getString("did").?);
}

test "spec: error frame (op=-1) is handled, not persisted" {
    // event stream spec: op=-1 frames are error notifications from upstream
    const cbor = zat.cbor;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .negative = -1 } },
        .{ .key = "t", .value = .{ .text = "#error" } },
    } };
    const err_payload: cbor.Value = .{ .map = &.{
        .{ .key = "error", .value = .{ .text = "FutureCursor" } },
        .{ .key = "message", .value = .{ .text = "cursor is ahead of server" } },
    } };

    const header_bytes = try cbor.encodeAlloc(alloc, header);
    const h_result = try cbor.decode(alloc, header_bytes);
    const h = h_result.value;

    // verify op=-1 is detected
    const op = h.getInt("op").?;
    try std.testing.expectEqual(@as(i64, -1), op);

    // verify error payload decodes correctly
    const payload_bytes = try cbor.encodeAlloc(alloc, err_payload);
    const p = try cbor.decodeAll(alloc, payload_bytes);
967 try std.testing.expectEqualStrings("FutureCursor", p.getString("error").?); 968 try std.testing.expectEqualStrings("cursor is ahead of server", p.getString("message").?); 969}