atproto relay implementation in zig — zlay.waow.tech

fix: read full POST body when split across TCP segments

reverse proxies (traefik) may send headers and body in separate
TCP writes. the single stream.read() only got headers, leaving
the body empty — causing all POST endpoints (requestCrawl, ban,
block/unblock) to fail with "invalid JSON" through the ingress.

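now parses Content-Length from headers and keeps reading until the
full body is received or the 8192-byte request buffer is full. the
header terminator (\r\n\r\n) must still arrive in the first read;
only the body is re-read.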

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+45 -10
+45 -10
src/main.zig
··· 213 213 fn handleHttpConn(stream: std.net.Stream, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex) void { 214 214 defer stream.close(); 215 215 216 - // read request (headers + body for small POSTs) 216 + // read request — may need multiple reads if proxy splits headers/body 217 217 var buf: [8192]u8 = undefined; 218 - const n = stream.read(&buf) catch return; 219 - if (n == 0) return; 220 - const request = buf[0..n]; 218 + var total: usize = 0; 219 + total = stream.read(&buf) catch return; 220 + if (total == 0) return; 221 221 222 222 // parse first line: "METHOD /path HTTP/1.1" 223 - const line_end = std.mem.indexOfScalar(u8, request, '\n') orelse return; 224 - const first_line = request[0..line_end]; 223 + const line_end = std.mem.indexOfScalar(u8, buf[0..total], '\n') orelse return; 224 + const first_line = buf[0..line_end]; 225 225 226 226 const method_end = std.mem.indexOfScalar(u8, first_line, ' ') orelse return; 227 227 const method = first_line[0..method_end]; ··· 231 231 const path_end = std.mem.indexOfScalar(u8, rest, ' ') orelse rest.len; 232 232 const path = rest[0..path_end]; 233 233 234 - // find body (after \r\n\r\n) 235 - const header_end = std.mem.indexOf(u8, request, "\r\n\r\n"); 236 - const body: []const u8 = if (header_end) |he| request[he + 4 ..] 
else ""; 234 + // find end of headers 235 + const header_end = std.mem.indexOf(u8, buf[0..total], "\r\n\r\n"); 236 + 237 + // for POSTs: if we have headers, parse Content-Length and read remaining body 238 + if (std.mem.eql(u8, method, "POST")) { 239 + if (header_end) |he| { 240 + const headers = buf[0..he]; 241 + const content_length = parseContentLength(headers) orelse 0; 242 + const body_start = he + 4; 243 + const body_needed = body_start + content_length; 244 + 245 + // keep reading until we have the full body (or buffer is full) 246 + while (total < body_needed and total < buf.len) { 247 + const m = stream.read(buf[total..]) catch break; 248 + if (m == 0) break; 249 + total += m; 250 + } 251 + } 252 + } 253 + 254 + const request = buf[0..total]; 255 + // re-find header_end in case more data shifted things (it won't, but be safe) 256 + const he = std.mem.indexOf(u8, request, "\r\n\r\n"); 257 + const body: []const u8 = if (he) |h| request[h + 4 ..] else ""; 237 258 238 259 if (std.mem.eql(u8, method, "GET")) { 239 260 handleGet(stream, path, stats, persist, slurper, ci); 240 261 } else if (std.mem.eql(u8, method, "POST")) { 241 - handlePost(stream, path, request[0 .. header_end orelse n], body, persist, slurper); 262 + handlePost(stream, path, request[0 .. 
he orelse total], body, persist, slurper); 242 263 } else { 243 264 httpRespond(stream, "405 Method Not Allowed", "text/plain", "method not allowed"); 244 265 } 266 + } 267 + 268 + fn parseContentLength(headers: []const u8) ?usize { 269 + var iter = std.mem.splitScalar(u8, headers, '\n'); 270 + while (iter.next()) |line| { 271 + const trimmed = std.mem.trimRight(u8, line, "\r"); 272 + const colon = std.mem.indexOfScalar(u8, trimmed, ':') orelse continue; 273 + const key = std.mem.trim(u8, trimmed[0..colon], " "); 274 + if (std.ascii.eqlIgnoreCase(key, "content-length")) { 275 + const val = std.mem.trim(u8, trimmed[colon + 1 ..], " "); 276 + return std.fmt.parseInt(usize, val, 10) catch null; 277 + } 278 + } 279 + return null; 245 280 } 246 281 247 282 fn handleGet(stream: std.net.Stream, full_path: []const u8, stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, ci: *collection_index_mod.CollectionIndex) void {