zlay — atproto relay implementation in Zig (zlay.waow.tech)
# zlay memory allocation audit

linear RSS growth: ~290 MiB/hour, 0 → 3.5 GiB in 12h (~117 bytes/frame at 700 frames/sec)

all allocations use `std.heap.c_allocator` (glibc malloc) unless noted.

---

## 1. per-frame hot path (~700 frames/sec across 16 workers)

each incoming firehose frame passes through: subscriber → thread pool → frame worker.
the following allocations happen FOR EVERY FRAME.

### 1a. subscriber header decode arena
- **file**: subscriber.zig:260-261
- **what**: `ArenaAllocator.init(sub.allocator)` — lightweight CBOR header+payload decode
- **size**: ~32KB chunk (default arena chunk size), actual used ~1-5KB
- **freed**: `defer arena.deinit()` — same function
- **status**: CLEAN

### 1b. frame data dupe (subscriber → worker handoff)
- **file**: subscriber.zig:341
- **what**: `sub.allocator.dupe(u8, data)` — copies raw frame bytes for async processing
- **size**: ~2-5KB per frame (full websocket message)
- **freed**: frame_worker.zig:34 `defer work.allocator.free(work.data)`
- **status**: CLEAN (also freed on backpressure: subscriber.zig:353)

### 1c. frame worker processing arena
- **file**: frame_worker.zig:36-37
- **what**: `ArenaAllocator.init(work.allocator)` — full CBOR re-decode, multibase encode, resequence
- **size**: ~32KB chunk, actual used ~5-20KB (depends on frame complexity)
- **freed**: `defer arena.deinit()` — same function
- **status**: CLEAN

### 1d. persist data allocation
- **file**: event_log.zig:592
- **what**: `self.allocator.alloc(u8, header_size + payload.len)` — 28-byte LE header + raw CBOR payload
- **size**: 28 + payload_len (~2-5KB)
- **freed**: event_log.zig:845-846 in `flushLocked()`: `for (self.evtbuf.items) |job| self.allocator.free(job.data)`
- **flush trigger**: every 400 events OR every 100ms (whichever comes first)
- **status**: CLEAN — but see note on outbuf/evtbuf below
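the per-frame arena pattern in 1a/1c amounts to one bump allocator per frame whose temporaries are all released in a single `deinit`. a minimal sketch of the shape (not zlay's actual code; `handleFrame` and the dupe stand in for the real decode work):

```zig
const std = @import("std");

fn handleFrame(base: std.mem.Allocator, data: []const u8) !void {
    // one arena per frame: every decode temporary shares its chunks
    var arena = std.heap.ArenaAllocator.init(base);
    defer arena.deinit(); // frees all allocations below in one call
    const a = arena.allocator();

    // temporaries land on the arena and are never freed individually
    const copy = try a.dupe(u8, data);
    _ = copy; // ... CBOR decode, multibase encode, etc. would go here
}
```

the trade-off noted in the audit: the arena reserves a ~32KB chunk even when only a few KB are used, but all of it is returned to the child allocator on `deinit`.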
### 1e. outbuf/evtbuf ArrayList growth
- **file**: event_log.zig:99-100
- **what**: `outbuf: ArrayListUnmanaged(u8)` and `evtbuf: ArrayListUnmanaged(PersistJob)`
- **growth**: ArrayList uses 2x growth factor. outbuf accumulates raw bytes, evtbuf accumulates job structs.
- **freed**: `clearRetainingCapacity()` on flush — backing memory is KEPT, only len reset to 0
- **bounded**: yes — max ~400 entries before flush. outbuf max ~400 × 5KB = ~2MB. backing array ~4MB max.
- **status**: CLEAN (bounded, retains capacity)

### 1f. SharedFrame for broadcast
- **file**: broadcaster.zig:50-59 (`SharedFrame.create`)
- **what**: allocates SharedFrame struct + dupes frame data
- **size**: sizeof(SharedFrame) (~48 bytes) + data.len (~2-5KB)
- **freed**: ref-counted. broadcaster releases immediately (`defer frame.release()` line 372). each consumer releases after send (writeLoop line 231). freed when refcount hits 0 (line 66-70).
- **status**: CLEAN (if no consumers, created+freed immediately)

### 1g. ring buffer history entry
- **file**: ring_buffer.zig:55 via broadcaster.zig:368
- **what**: `self.allocator.dupe(u8, data)` — copies frame data into history
- **size**: ~2-5KB per frame (resequenced CBOR)
- **freed**: on overwrite when buffer is full (ring_buffer.zig:60-62)
- **bounded**: 50,000 entries. once full, every push frees the oldest.
- **steady state**: 50K × ~3KB avg = ~150MB
- **status**: CLEAN (bounded)

### 1h. resequenceFrame temporaries
- **file**: broadcaster.zig:117-156, called from frame_worker.zig:232
- **what**: CBOR decode + re-encode with new seq. allocates ArrayList, CBOR encode buffers, result slice.
- **size**: ~5-10KB temporaries
- **freed**: allocated on frame worker's arena (1c), freed when arena deinits
- **status**: CLEAN
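the ref-counted lifetime in 1f — "freed when refcount hits 0" — can be sketched as follows. this is an illustrative reconstruction, not broadcaster.zig's actual code, and it assumes a recent Zig std (`std.atomic.Value` and lowercase ordering enums):

```zig
const std = @import("std");

const SharedFrame = struct {
    refs: std.atomic.Value(u32),
    data: []u8,
    allocator: std.mem.Allocator,

    fn create(allocator: std.mem.Allocator, data: []const u8) !*SharedFrame {
        const self = try allocator.create(SharedFrame);
        errdefer allocator.destroy(self);
        self.* = .{
            .refs = std.atomic.Value(u32).init(1), // creator holds the first ref
            .data = try allocator.dupe(u8, data),
            .allocator = allocator,
        };
        return self;
    }

    fn retain(self: *SharedFrame) void {
        _ = self.refs.fetchAdd(1, .monotonic);
    }

    fn release(self: *SharedFrame) void {
        // acq_rel so the last releaser observes all prior writes before freeing
        if (self.refs.fetchSub(1, .acq_rel) == 1) {
            self.allocator.free(self.data);
            self.allocator.destroy(self);
        }
    }
};
```

with this shape, the broadcaster's immediate `defer frame.release()` is safe: consumers `retain` before the broadcaster's release runs, and the frame is freed exactly once, by whoever releases last.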
### 1i. collection index keys (per commit with ops)
- **file**: collection_index.zig:79-82 via trackCommitOps → addCollection
- **what**: `makeKey()` allocates 2 keys: `collection\0did` and `did\0collection`
- **size**: ~100-200 bytes per key pair
- **freed**: `defer self.allocator.free(rbc_key)` / `defer self.allocator.free(cbr_key)` — same function
- **status**: CLEAN

### 1j. postgres queries per frame (via pg.zig)
each frame triggers 3-5 postgres queries. each query allocates internally:

| query | file | method | per-frame? |
|-------|------|--------|------------|
| DID→UID lookup | event_log.zig:294 | rowUnsafe | yes (cache miss only) |
| DID→UID create | event_log.zig:306 | exec | yes (first encounter only) |
| DID→UID readback | event_log.zig:315 | rowUnsafe | yes (first encounter only) |
| isAccountActive | event_log.zig:398 | rowUnsafe | yes (commits/syncs) |
| getAccountState | event_log.zig:339 | rowUnsafe | yes (commits, for rev check) |
| updateAccountState | event_log.zig:358 | exec | yes (validated commits) |
| updateUpstreamStatus | event_log.zig:389 | exec | yes (#account events) |
| updateHostSeq | event_log.zig:458 | exec | every 4s per host |
| getAccountHostId | event_log.zig:369 | rowUnsafe | yes (uidForDidFromHost) |

**pg.zig internals** (from karlseguin/pg.zig):
- `Pool.rowUnsafe()`: acquires connection from pool, creates a `Result` with its own ArenaAllocator
- `Result.deinit()`: releases connection back to pool, destroys arena
- `QueryRowUnsafe.deinit()`: calls `result.drain()` THEN `result.deinit()`

**POTENTIAL ISSUE — pg.zig deinit error path**:
```zig
// pg.zig QueryRowUnsafe.deinit():
pub fn deinit(self: *QueryRowUnsafe) !void {
    try self.result.drain(); // if this errors...
    self.result.deinit(); // ...this NEVER runs → arena leak + connection leak
}
```
zlay callers use `defer row.deinit() catch {};` — if drain() errors, the Result arena and connection are LEAKED. drain() errors on network failure to postgres.

- **likelihood**: low per-frame (postgres is local), but at 2,000+ queries/sec, even 0.01% failure = 0.2 leaks/sec
- **size per leak**: ~1-4KB (pg arena) + 1 connection slot
- **impact at 0.01%**: ~0.7 MB/hour (doesn't explain 290 MB/hour)
- **impact at 0.1%**: ~7 MB/hour (partial explanation?)

### 1k. validator arenas (per validated commit/sync)
- **file**: validator.zig:161-163 (validateSync) and validator.zig:242-244 (verifyCommit)
- **what**: `ArenaAllocator.init(self.allocator)` for CAR parsing + signature verification
- **size**: depends on CAR size. typical commit: ~5-50KB used.
- **freed**: `defer arena.deinit()` — same function
- **note**: these were NOT changed in the page_allocator experiment
- **status**: CLEAN (assuming arena.deinit works correctly)

### 1l. getAccountState string dupes
- **file**: event_log.zig:348-349
- **what**: `allocator.dupe(u8, rev)` and `allocator.dupe(u8, data_cid)` — copies from pg result into caller's arena
- **size**: ~20-50 bytes each (TID rev + multibase CID)
- **freed**: allocated on frame worker's arena (1c), freed when arena deinits
- **status**: CLEAN

---

## 2. per-connection allocations (~2,750 PDS hosts)

### 2a. subscriber struct + hostname
- **file**: slurper.zig:426-427, 423
- **what**: `allocator.create(Subscriber)` + `allocator.dupe(u8, hostname)`
- **size**: sizeof(Subscriber) + hostname (~20-50 bytes)
- **freed**: slurper.zig:468-469 (runWorker cleanup on exit)
- **status**: CLEAN (bounded by host count)
### 2b. websocket.Client per connection
- **file**: subscriber.zig:220-227
- **what**: `websocket.Client.init(self.allocator, ...)` — TLS buffers, read buffers
- **size**: TLS handshake buffers (~134KB), static read buffer (4096 bytes)
- **freed**: `defer client.deinit()` — subscriber.zig:227
- **status**: CLEAN (created/freed per connection attempt)

### 2c. websocket dynamic read buffers
- **what**: for messages > 4096 bytes, BufferProvider allocates a dynamic buffer
- **freed**: `reader.done()` → `restoreStatic()` → `provider.release()` → `allocator.free()`
- **status**: CLEAN (freed after each message)

### 2d. shared TLS CA bundle
- **file**: slurper.zig:251-253
- **what**: `bundle.rescan(self.allocator)` — loads system CA certs
- **size**: ~100-200KB (one-time)
- **freed**: slurper.zig:575 `b.deinit(self.allocator)`
- **status**: CLEAN (one-time, shared)

### 2e. thread stacks
- **what**: each subscriber thread gets 8MB virtual stack (`default_stack_size`)
- **count**: ~2,750 subscriber threads + 16 worker threads + 4 resolver threads + 3 background threads = ~2,773 threads
- **total virtual**: ~22 GiB (but only touched pages count as RSS)
- **status**: BOUNDED (pages are returned when threads exit)

---

## 3. bounded caches and data structures

### 3a. DID → UID cache (LRU)
- **file**: event_log.zig:96, lru.zig
- **capacity**: 500,000 entries
- **per entry**: duped key string (~30 bytes DID) + Node struct (~80 bytes) + u64 value + HashMap entry
- **steady state**: ~55-65 MB
- **status**: BOUNDED (evicts LRU on put when full)

### 3b. validator signing key cache (LRU)
- **file**: validator.zig:50, lru.zig
- **capacity**: 250,000 entries
- **per entry**: duped key string (~30 bytes DID) + Node struct (~80 bytes) + CachedKey (fixed 42 bytes) + HashMap entry
- **steady state**: ~30-40 MB
- **status**: BOUNDED (evicts LRU on put when full)
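the invariant that keeps 3a/3b bounded is "evict one entry (and free its duped key) before inserting when full". a simplified sketch of that shape — eviction here picks an arbitrary entry rather than the least-recently-used one that lru.zig presumably maintains:

```zig
const std = @import("std");

const BoundedCache = struct {
    map: std.StringHashMapUnmanaged(u64) = .{},
    capacity: usize,

    fn put(self: *BoundedCache, allocator: std.mem.Allocator, did: []const u8, uid: u64) !void {
        // existing key: just update the value, no new allocation
        if (self.map.getPtr(did)) |v| {
            v.* = uid;
            return;
        }
        if (self.map.count() >= self.capacity) {
            // evict one entry and free its duped key so the cache stays bounded
            var it = self.map.iterator();
            if (it.next()) |entry| {
                const victim = entry.key_ptr.*;
                _ = self.map.remove(victim);
                allocator.free(victim);
            }
        }
        const key = try allocator.dupe(u8, did); // cache owns its key
        errdefer allocator.free(key);
        try self.map.put(allocator, key, uid);
    }
};
```

because keys are duped on insert and freed on eviction, steady-state memory is capacity × (key + node + entry overhead) — the ~55-65 MB and ~30-40 MB figures above.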
### 3c. resolve queue
- **file**: validator.zig:52-54
- **what**: `queue: ArrayListUnmanaged([]const u8)` + `queued_set: StringHashMapUnmanaged(void)`
- **per entry**: duped DID string (~30 bytes) + HashMap entry
- **max size**: 100,000 (validator.zig:63)
- **freed**: DID freed after resolution (validator.zig:419)
- **status**: BOUNDED

### 3d. workers map
- **file**: slurper.zig:217
- **what**: `std.AutoHashMapUnmanaged(u64, WorkerEntry)` — host_id → thread+subscriber
- **size**: bounded by host count (~2,750)
- **status**: BOUNDED

### 3e. crawl request queue
- **file**: slurper.zig:221
- **what**: `ArrayListUnmanaged([]const u8)` — duped hostnames waiting for processing
- **freed**: slurper.zig:522 `defer self.allocator.free(h)` after processing
- **status**: BOUNDED (grows slowly, processed continuously)

### 3f. frame pool ring buffers
- **file**: thread_pool.zig
- **what**: pre-allocated ring buffers per worker. zero alloc per submit.
- **size**: 16 workers × 4096 capacity × sizeof(FrameWork) per entry
- **status**: FIXED SIZE (no growth)

### 3g. consumer list
- **file**: broadcaster.zig:291
- **what**: `ArrayListUnmanaged(*Consumer)` — active downstream WebSocket consumers
- **per consumer**: Consumer struct (~66KB for 8192-entry SharedFrame pointer buffer) + write thread
- **status**: BOUNDED by connected consumers (typically 0-10)

---

## 4. API handler allocations (cold paths)

### 4a. handleAdminListHosts — ALLOCATES ON c_allocator
- **file**: api/admin.zig:89-99
- **what**: `persist.listAllHosts(persist.allocator)` — dupes hostname+status for each host
- **freed**: defer block frees all hostname/status strings + slice
- **status**: CLEAN
### 4b. handleBan — LEAKS (cold path)
- **file**: api/admin.zig:70-78
- **what**: `buildAccountFrame(ctx.persist.allocator, did)` → allocates CBOR frame on c_allocator
- **what**: `broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq)` → allocates resequenced frame on c_allocator
- **freed**: NEITHER frame_bytes NOR broadcast_data is freed after broadcast
- **size**: ~200-500 bytes per ban
- **status**: **LEAK** — but cold path (admin-only, negligible impact)

### 4c. handleAdminBackfillStatus
- **file**: api/admin.zig:197-201
- **what**: `backfiller.getStatus(backfiller.allocator)` → builds JSON string on c_allocator
- **freed**: `defer backfiller.allocator.free(body)`
- **status**: CLEAN

### 4d. xrpc handlers (listRepos, getRepoStatus, etc.)
- **what**: all use stack-allocated fixed buffers (65536 bytes) or pg query iteration
- **no heap allocation** in the handler code itself
- **pg queries**: each creates/destroys internal pg arenas (same pattern as 1j)
- **status**: CLEAN

### 4e. handleRequestCrawl
- **file**: api/xrpc.zig:429
- **what**: `std.json.parseFromSlice(...)` on `slurper.allocator` — parses JSON body
- **freed**: `defer parsed.deinit()`
- **what**: `validateHostname(slurper.allocator, ...)` — allocates normalized hostname
- **freed**: `defer slurper.allocator.free(hostname)`
- **what**: `slurper.addCrawlRequest(hostname)` → dupes hostname
- **freed**: by crawl processor (slurper.zig:522)
- **status**: CLEAN
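the 4b leak above looks like a two-line fix: free both buffers once the broadcast has taken its own copy. a hypothetical sketch — the `Ctx` type, signatures, and the `broadcast` call are assumed from the audit's description, not verified against zlay:

```zig
// hypothetical fix sketch for 4b (error handling abbreviated)
fn handleBan(ctx: *Ctx, did: []const u8, relay_seq: u64) !void {
    const frame_bytes = try buildAccountFrame(ctx.persist.allocator, did);
    defer ctx.persist.allocator.free(frame_bytes); // previously never freed

    const broadcast_data = try ctx.broadcaster.resequenceFrame(
        ctx.persist.allocator,
        frame_bytes,
        relay_seq,
    );
    defer ctx.persist.allocator.free(broadcast_data); // previously never freed

    // freeing after this call should be safe: SharedFrame.create (1f)
    // dupes the frame data rather than taking ownership of it
    try ctx.broadcaster.broadcast(broadcast_data);
}
```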
## 5. one-time startup allocations

| what | file | size |
|------|------|------|
| Broadcaster struct | main.zig:140 | ~400KB (history array) |
| Validator struct | main.zig:143 | ~100 bytes |
| DiskPersist (+ pg pool) | main.zig:149 | ~10KB + pg connections |
| CollectionIndex (RocksDB) | main.zig:169 | RocksDB internal (~50-100MB) |
| Backfiller struct | main.zig:176 | ~100 bytes |
| CA bundle | slurper.zig:251 | ~100-200KB |
| Frame pool | slurper.zig:257 | pre-allocated ring buffers |
| Error frame (CBOR) | broadcaster.zig:306 | ~50 bytes |
| MetricsServer struct | main.zig:216 | ~100 bytes |
| build_options module | (compile-time) | ~100 bytes |

---

## 6. backfill allocations (when running)

### 6a. discoverCollections
- **file**: backfill.zig:101-148
- **what**: fetches lexicon garden llms.txt + RBC scan, deduplicates
- **freed**: all temporaries freed via defer blocks
- **status**: CLEAN (only runs when admin triggers backfill)

### 6b. backfillCollection per-page
- **file**: backfill.zig:269-318
- **what**: per-page: HTTP fetch → JSON parse → dupe DIDs → addCollection to RocksDB
- **freed**: all duped strings freed in defer blocks
- **http client**: reused across pages for one collection, freed after collection done
- **status**: CLEAN

---

## 7. dependency internal allocations

### 7a. RocksDB (via rocksdb-zig)
- internal memory managed by RocksDB C library (block cache, memtables, etc.)
- not tracked by c_allocator — uses its own allocator
- bounded by RocksDB options (write_buffer_size, max_open_files, block_cache_size)

### 7b. pg.zig connection pool
- **file**: initialized in event_log.zig:132 with `size = 5`
- 5 connections, each with internal read/write buffers
- **status**: BOUNDED (5 connections)
### 7c. zat.DidResolver (per resolver thread)
- **file**: validator.zig:401
- **what**: `zat.DidResolver.init(self.allocator)` — creates HTTP client for DID resolution
- **long-lived**: one per resolver thread (4 total), lives until shutdown
- **internal**: likely holds std.http.Client with connection pool
- **potential issue**: if std.http.Client pools connections to many unique hosts (PLC server, PDS endpoints), the pool could grow. but PLC is typically one host (plc.directory).

### 7d. zat CBOR decode
- all CBOR decode operations are on arena allocators (subscriber arena or worker arena)
- freed when arena deinits
- **status**: CLEAN

---

## 8. summary of potential issues

### confirmed leak (cold path, negligible):
- **admin handleBan**: leaks frame_bytes + broadcast_data (~500 bytes per ban)

### potential leak under network errors:
- **pg.zig QueryRowUnsafe.deinit()**: if drain() errors, Result arena + connection leak
- at 2,000+ queries/sec, even rare failures accumulate
- needs investigation: check postgres error rate in logs

### fragmentation concerns:
- ~700 frames/sec × ~10 alloc/free cycles per frame = ~7,000 alloc/free operations per second on c_allocator
- many different sizes (28-byte headers, 2-5KB frames, 100-byte keys, 1-4KB pg arenas)
- glibc with MALLOC_ARENA_MAX=2 concentrates fragmentation into 2 arenas
- mallinfo() only reports the MAIN arena — second arena is invisible
- malloc_trim(0) only trims the main arena — second arena is untrimmed

### items NOT investigated:
- zat library internals (CBOR allocator patterns, DID resolver HTTP client connection pooling)
- rocksdb-zig binding allocations (WriteBatch, Iterator internal state)
- std.http.Client internal connection/TLS buffer retention within zat.DidResolver

---
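if the pg.zig error path from 1j is confirmed, the natural upstream-style mitigation is to make the cleanup unconditional with `defer`. a sketch of the idea — this is not pg.zig's actual code:

```zig
// sketched fix: deinit always runs, even when drain() fails
pub fn deinit(self: *QueryRowUnsafe) !void {
    defer self.result.deinit(); // release arena + connection unconditionally
    try self.result.drain(); // the error still propagates to the caller
}
```

the same shape works at zlay's call sites if patching the dependency is undesirable, as long as the caller has access to the underlying `Result`.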
## 9. per-frame allocation count summary

for a typical validated #commit frame, approximately:

| step | allocs | frees | net | size |
|------|--------|-------|-----|------|
| subscriber arena init | 1 chunk | 0 | +1 | ~32KB |
| subscriber CBOR decode | ~5 | 0 | +5 | ~2KB |
| subscriber arena deinit | 0 | 1 chunk | -1 | ~32KB |
| frame data dupe | 1 | 0 | +1 | ~3KB |
| worker arena init | 1 chunk | 0 | +1 | ~32KB |
| worker CBOR re-decode | ~5 | 0 | +5 | ~2KB |
| pg: uidForDid (cache hit) | 0 | 0 | 0 | 0 |
| pg: isAccountActive | 1 arena | 1 arena | 0 | ~2KB |
| pg: getAccountState | 1 arena | 1 arena | 0 | ~2KB |
| multibase encode | 1 | 0 | +1 | ~50B |
| validator arena init | 1 chunk | 0 | +1 | ~32KB |
| verifyCommitCar | ~10 | 0 | +10 | ~20KB |
| validator arena deinit | 0 | 1 chunk | -1 | ~32KB |
| persist data alloc | 1 | 0 | +1 | ~3KB |
| resequenceFrame | ~5 | 0 | +5 | ~5KB |
| SharedFrame.create | 2 (struct+data) | 0 | +2 | ~3KB |
| ring buffer dupe | 1 | 1 (overwrite) | 0 | ~3KB |
| collection index keys | 2 | 2 | 0 | ~200B |
| worker arena deinit | 0 | 1 chunk | -1 | ~32KB |
| frame data free | 0 | 1 | -1 | ~3KB |
| persist flush (batched) | 0 | 1 | -1 | ~3KB |
| SharedFrame release | 0 | 2 | -2 | ~3KB |
| **TOTAL per frame** | **~37** | **~12** | 0 | 0 |

(the alloc and free counts don't match row-by-row because each arena deinit frees all of that arena's sub-allocations in one operation; counting those bulk frees, everything pairs up.)

all allocations balance out. yet RSS grows linearly. the remaining hypotheses are:

1. glibc malloc fragmentation in the per-thread arenas (invisible to mallinfo/malloc_trim)
2. a leak in a dependency (pg.zig error path, zat internals, rocksdb-zig)
3. a leak we haven't found in the zig code
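since mallinfo() hides everything outside the main arena, one concrete next diagnostic is glibc's `malloc_info()`, which emits per-arena statistics as XML for ALL arenas. a sketch of wiring it into zlay via cImport — glibc-specific, assumes the binary links libc (it already uses `c_allocator`), and the output path is arbitrary:

```zig
const c = @cImport({
    @cInclude("malloc.h"); // glibc: malloc_info, malloc_trim
    @cInclude("stdio.h");
});

// dump glibc malloc statistics for every arena to a file;
// unlike mallinfo(), malloc_info() covers all arenas in its XML output
pub fn dumpMallocStats(path: [*:0]const u8) void {
    const f = c.fopen(path, "w") orelse return;
    defer _ = c.fclose(f);
    _ = c.malloc_info(0, f);
}
```

comparing two dumps a few hours apart would show whether the growth sits in arena free lists (fragmentation, hypothesis 1) or is genuinely unaccounted for (hypotheses 2-3).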