search for standard sites pub-search.waow.tech
search zig blog atproto

fix: decouple TAP ACKs from turso writes via processing queue

root cause: processMessage (which writes to turso via HTTP) ran
synchronously in the websocket readLoop callback. when turso was
slow or hung, the readLoop blocked — no messages read, no ACKs
sent, TAP outbox grew unboundedly (4222 events stuck).

fix: send ACK immediately upon receipt, push message data to a
bounded queue, and process it on a separate worker thread so the
readLoop never blocks on turso. if turso is slow the queue fills
and the oldest messages are dropped — they were already ACK'd, so
TAP will not re-deliver them (trading completeness for liveness);
re-processing any duplicates is safe because indexing is
idempotent (ON CONFLICT DO UPDATE).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+121 -12
backend/src/ingest/tap.zig
··· 44 44 return getTapPort() == 443; 45 45 } 46 46 47 + /// Bounded queue for decoupling websocket readLoop from turso writes. 48 + /// ACKs are sent immediately in the readLoop; processing happens in a worker thread. 49 + /// If the queue is full (turso is slow), new messages are dropped (already ACK'd). 50 + const QUEUE_CAPACITY = 256; 51 + 52 + const ProcessQueue = struct { 53 + mutex: std.Thread.Mutex = .{}, 54 + cond: std.Thread.Condition = .{}, 55 + items: [QUEUE_CAPACITY]?[]u8 = .{null} ** QUEUE_CAPACITY, 56 + head: usize = 0, 57 + tail: usize = 0, 58 + len: usize = 0, 59 + stopped: bool = false, 60 + allocator: Allocator, 61 + dropped: usize = 0, 62 + processed: usize = 0, 63 + 64 + fn push(self: *ProcessQueue, data: []u8) void { 65 + self.mutex.lock(); 66 + defer self.mutex.unlock(); 67 + 68 + if (self.len == QUEUE_CAPACITY) { 69 + // queue full — drop oldest (already ACK'd) 70 + if (self.items[self.head]) |old| { 71 + self.allocator.free(old); 72 + } 73 + self.head = (self.head + 1) % QUEUE_CAPACITY; 74 + self.len -= 1; 75 + self.dropped += 1; 76 + if (self.dropped <= 5 or self.dropped % 100 == 0) { 77 + logfire.warn("tap: queue full, dropped {d} messages total", .{self.dropped}); 78 + } 79 + } 80 + 81 + self.items[self.tail] = data; 82 + self.tail = (self.tail + 1) % QUEUE_CAPACITY; 83 + self.len += 1; 84 + self.cond.signal(); 85 + } 86 + 87 + fn pop(self: *ProcessQueue) ?[]u8 { 88 + self.mutex.lock(); 89 + defer self.mutex.unlock(); 90 + 91 + while (self.len == 0 and !self.stopped) { 92 + self.cond.wait(&self.mutex); 93 + } 94 + 95 + if (self.len == 0) return null; // stopped with empty queue 96 + 97 + const data = self.items[self.head].?; 98 + self.items[self.head] = null; 99 + self.head = (self.head + 1) % QUEUE_CAPACITY; 100 + self.len -= 1; 101 + return data; 102 + } 103 + 104 + fn stop(self: *ProcessQueue) void { 105 + self.mutex.lock(); 106 + defer self.mutex.unlock(); 107 + self.stopped = true; 108 + self.cond.signal(); 109 + } 110 + 111 + fn 
drain(self: *ProcessQueue) void { 112 + self.mutex.lock(); 113 + defer self.mutex.unlock(); 114 + for (&self.items) |*item| { 115 + if (item.*) |data| { 116 + self.allocator.free(data); 117 + item.* = null; 118 + } 119 + } 120 + } 121 + }; 122 + 123 + fn processWorker(queue: *ProcessQueue) void { 124 + logfire.info("tap: process worker started", .{}); 125 + while (queue.pop()) |data| { 126 + defer queue.allocator.free(data); 127 + processMessage(queue.allocator, data) catch |err| { 128 + logfire.err("message processing error: {}", .{err}); 129 + }; 130 + queue.mutex.lock(); 131 + queue.processed += 1; 132 + queue.mutex.unlock(); 133 + } 134 + logfire.info("tap: process worker stopped (processed {d}, dropped {d})", .{ queue.processed, queue.dropped }); 135 + } 136 + 47 137 pub fn consumer(allocator: Allocator) void { 48 138 var backoff: u64 = 1; 49 139 const max_backoff: u64 = 30; ··· 66 156 const Handler = struct { 67 157 allocator: Allocator, 68 158 client: *websocket.Client, 159 + queue: *ProcessQueue, 69 160 msg_count: usize = 0, 70 161 ack_count: usize = 0, 71 162 no_id_count: usize = 0, ··· 74 165 pub fn serverMessage(self: *Handler, data: []const u8) !void { 75 166 self.msg_count += 1; 76 167 if (self.msg_count % 1000 == 0) { 77 - logfire.info("tap: processed {d} messages, acks sent: {d}, no-id: {d}", .{ self.msg_count, self.ack_count, self.no_id_count }); 168 + logfire.info("tap: recv {d}, acks {d}, processed {d}, dropped {d}, queued {d}", .{ 169 + self.msg_count, self.ack_count, self.queue.processed, self.queue.dropped, self.queue.len, 170 + }); 78 171 } 79 172 80 173 // extract message ID for ACK 81 174 const msg_id = extractMessageId(self.allocator, data); 82 175 83 - // process the message 84 - processMessage(self.allocator, data) catch |err| { 85 - logfire.err("message processing error: {}", .{err}); 86 - // still ACK even on error to avoid infinite retries 87 - }; 88 - 89 - // send ACK if we have a message ID 176 + // ACK immediately — before 
processing — to keep TAP outbox draining. 177 + // processing happens asynchronously in the worker thread. 90 178 if (msg_id) |id| { 91 179 self.sendAck(id); 92 180 } else { ··· 95 183 logfire.warn("tap: message has no id, first {d} bytes: {s}", .{ @min(data.len, 100), data[0..@min(data.len, 100)] }); 96 184 } 97 185 } 186 + 187 + // dupe message data (websocket reuses the buffer) and push to processing queue 188 + const data_copy = self.allocator.dupe(u8, data) catch |err| { 189 + logfire.err("tap: failed to dupe message: {}", .{err}); 190 + return; 191 + }; 192 + self.queue.push(data_copy); 98 193 } 99 194 100 195 fn sendAck(self: *Handler, msg_id: i64) void { ··· 102 197 logfire.err("tap: ACK format error: {}", .{err}); 103 198 return; 104 199 }; 200 + // log before write — websocket.zig masks the buffer in-place 201 + if (self.ack_count < 3) { 202 + logfire.info("tap: sending ACK for id={d}", .{msg_id}); 203 + } 105 204 self.client.write(@constCast(ack_json)) catch |err| { 106 205 logfire.err("tap: failed to send ACK: {}", .{err}); 107 206 return; 108 207 }; 109 208 self.ack_count += 1; 110 - if (self.ack_count <= 3) { 111 - logfire.info("tap: ACK sent for id={d}, ack_json={s}", .{ msg_id, ack_json }); 112 - } 113 209 } 114 210 115 211 pub fn close(_: *Handler) void {} ··· 150 246 151 247 logfire.info("tap connected", .{}); 152 248 153 - var handler = Handler{ .allocator = allocator, .client = &client }; 249 + // processing queue + worker thread: decouples readLoop from turso writes 250 + // so a slow/hung turso request never blocks ACKs 251 + var queue = ProcessQueue{ .allocator = allocator }; 252 + const worker = std.Thread.spawn(.{}, processWorker, .{&queue}) catch |err| { 253 + logfire.err("tap: failed to spawn process worker: {}", .{err}); 254 + return err; 255 + }; 256 + defer { 257 + queue.stop(); 258 + worker.join(); 259 + queue.drain(); 260 + } 261 + 262 + var handler = Handler{ .allocator = allocator, .client = &client, .queue = &queue }; 154 263 
client.readLoop(&handler) catch |err| { 155 264 logfire.err("websocket read loop error: {}", .{err}); 156 265 return err;