atproto relay implementation in zig
zlay.waow.tech
# zlay memory allocation audit

linear RSS growth: ~290 MiB/hour, 0 → 3.5 GiB in 12h (~117 bytes/frame at 700 frames/sec)

all allocations use `std.heap.c_allocator` (glibc malloc) unless noted.

---

## 1. per-frame hot path (~700 frames/sec across 16 workers)

each incoming firehose frame passes through: subscriber → thread pool → frame worker.
the following allocations happen FOR EVERY FRAME.

### 1a. subscriber header decode arena
15- **file**: subscriber.zig:260-261
16- **what**: `ArenaAllocator.init(sub.allocator)` — lightweight CBOR header+payload decode
17- **size**: ~32KB chunk (default arena chunk size), actual used ~1-5KB
18- **freed**: `defer arena.deinit()` — same function
19- **status**: CLEAN
20
21### 1b. frame data dupe (subscriber → worker handoff)
22- **file**: subscriber.zig:341
23- **what**: `sub.allocator.dupe(u8, data)` — copies raw frame bytes for async processing
24- **size**: ~2-5KB per frame (full websocket message)
25- **freed**: frame_worker.zig:34 `defer work.allocator.free(work.data)`
26- **status**: CLEAN (also freed on backpressure: subscriber.zig:353)
27
28### 1c. frame worker processing arena
29- **file**: frame_worker.zig:36-37
30- **what**: `ArenaAllocator.init(work.allocator)` — full CBOR re-decode, multibase encode, resequence
31- **size**: ~32KB chunk, actual used ~5-20KB (depends on frame complexity)
32- **freed**: `defer arena.deinit()` — same function
33- **status**: CLEAN
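
the per-frame arena pattern in 1a/1c, sketched as a self-contained program (assumes 0.12-era std APIs; `processFrame` and its body are illustrative stand-ins, not the real subscriber/frame_worker code):

```zig
const std = @import("std");

// one fresh arena per frame: all decode temporaries land on it, and a
// single deinit returns everything at once.
fn processFrame(backing: std.mem.Allocator, data: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(backing);
    defer arena.deinit(); // frees every allocation below in one call
    const alloc = arena.allocator();

    // stand-in for the CBOR decode: any slice allocated here lives
    // exactly as long as the arena, so no per-item frees are needed.
    const copy = try alloc.dupe(u8, data);
    _ = copy;
}

pub fn main() !void {
    try processFrame(std.heap.page_allocator, "frame bytes");
    std.debug.print("ok\n", .{});
}
```

the ~32KB chunk cost is paid even when only 1-5KB is used, which is why the audit distinguishes chunk size from "actual used".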
34
35### 1d. persist data allocation
36- **file**: event_log.zig:592
37- **what**: `self.allocator.alloc(u8, header_size + payload.len)` — 28-byte LE header + raw CBOR payload
38- **size**: 28 + payload_len (~2-5KB)
39- **freed**: event_log.zig:845-846 in `flushLocked()` — `for (self.evtbuf.items) |job| self.allocator.free(job.data)`
40- **flush trigger**: every 400 events OR every 100ms (whichever comes first)
41- **status**: CLEAN — but see note on outbuf/evtbuf below
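
the dual flush trigger in 1d can be sketched like this (names and structure hypothetical, not the actual event_log.zig code; a simulated clock stands in for real time):

```zig
const std = @import("std");

// flush at 400 buffered events or after 100 ms, whichever comes first.
const Batcher = struct {
    pending: usize = 0,
    last_flush_ms: u64 = 0,
    flushes: usize = 0,

    fn add(b: *Batcher, now_ms: u64) void {
        b.pending += 1;
        if (b.pending >= 400 or now_ms - b.last_flush_ms >= 100) {
            // flushLocked() would write outbuf and free each job.data here
            b.pending = 0;
            b.last_flush_ms = now_ms;
            b.flushes += 1;
        }
    }
};

pub fn main() void {
    var b = Batcher{};
    var i: u64 = 0;
    while (i < 1000) : (i += 1) b.add(i / 10); // ~10 events per simulated ms
    std.debug.print("{} flushes\n", .{b.flushes});
}
```

at this simulated rate the 400-event threshold always wins; the 100ms timer only matters when the firehose goes quiet.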
42
43### 1e. outbuf/evtbuf ArrayList growth
44- **file**: event_log.zig:99-100
45- **what**: `outbuf: ArrayListUnmanaged(u8)` and `evtbuf: ArrayListUnmanaged(PersistJob)`
46- **growth**: ArrayList uses 2x growth factor. outbuf accumulates raw bytes, evtbuf accumulates job structs.
47- **freed**: `clearRetainingCapacity()` on flush — backing memory is KEPT, only len reset to 0
48- **bounded**: yes — max ~400 entries before flush. outbuf max ~400 × 5KB = ~2MB. backing array ~4MB max.
49- **status**: CLEAN (bounded, retains capacity)
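
the retained-capacity behavior in 1e, demonstrated minimally (0.12-era std; `outbuf` here is a stand-in, not the real event_log buffer):

```zig
const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var outbuf: std.ArrayListUnmanaged(u8) = .{};
    defer outbuf.deinit(alloc);

    try outbuf.appendSlice(alloc, "a batch of raw event bytes");
    const cap_before = outbuf.capacity;

    outbuf.clearRetainingCapacity(); // len -> 0, backing memory kept

    std.debug.print("{}\n", .{outbuf.items.len == 0 and outbuf.capacity == cap_before});
}
```

the growth cost is paid once and amortized across every later flush — which is also why a single oversized batch permanently raises the ~4MB bound noted above.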
50
51### 1f. SharedFrame for broadcast
52- **file**: broadcaster.zig:50-59 (`SharedFrame.create`)
53- **what**: allocates SharedFrame struct + dupes frame data
54- **size**: sizeof(SharedFrame) (~48 bytes) + data.len (~2-5KB)
55- **freed**: ref-counted. broadcaster releases immediately (`defer frame.release()` line 372). each consumer releases after send (writeLoop line 231). freed when refcount hits 0 (line 66-70).
56- **status**: CLEAN (if no consumers, created+freed immediately)
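
a minimal sketch of the ref-counting scheme described above (names and layout hypothetical, assuming 0.12-era `std.atomic.Value`; the real SharedFrame lives in broadcaster.zig):

```zig
const std = @import("std");

const SharedFrame = struct {
    refs: std.atomic.Value(u32),
    data: []u8,
    allocator: std.mem.Allocator,

    fn create(allocator: std.mem.Allocator, data: []const u8) !*SharedFrame {
        const f = try allocator.create(SharedFrame);
        errdefer allocator.destroy(f);
        f.* = .{
            .refs = std.atomic.Value(u32).init(1),
            .data = try allocator.dupe(u8, data),
            .allocator = allocator,
        };
        return f;
    }

    fn retain(f: *SharedFrame) void {
        _ = f.refs.fetchAdd(1, .monotonic);
    }

    fn release(f: *SharedFrame) void {
        if (f.refs.fetchSub(1, .acq_rel) == 1) { // we held the last ref
            f.allocator.free(f.data);
            f.allocator.destroy(f);
        }
    }
};

pub fn main() !void {
    const f = try SharedFrame.create(std.heap.page_allocator, "frame");
    f.retain(); // a consumer takes a reference
    f.release(); // consumer done after send
    f.release(); // broadcaster's deferred release -> freed here
    std.debug.print("ok\n", .{});
}
```

with zero consumers the broadcaster's own deferred release drops the count from 1 to 0 immediately, matching the "created+freed immediately" note.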
57
58### 1g. ring buffer history entry
59- **file**: ring_buffer.zig:55 via broadcaster.zig:368
60- **what**: `self.allocator.dupe(u8, data)` — copies frame data into history
61- **size**: ~2-5KB per frame (resequenced CBOR)
62- **freed**: on overwrite when buffer is full (ring_buffer.zig:60-62)
63- **bounded**: 50,000 entries. once full, every push frees the oldest.
64- **steady state**: 50K × ~3KB avg = ~150MB
65- **status**: CLEAN (bounded)
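
the free-on-overwrite behavior in 1g, as a tiny stand-alone sketch (structure hypothetical, not the real ring_buffer.zig):

```zig
const std = @import("std");

// fixed slot array: pushing into a full buffer frees the entry it
// overwrites, so live memory is bounded by slot count.
const Ring = struct {
    slots: []?[]u8,
    next: usize = 0,
    allocator: std.mem.Allocator,

    fn push(r: *Ring, data: []const u8) !void {
        if (r.slots[r.next]) |old| r.allocator.free(old); // evict oldest
        r.slots[r.next] = try r.allocator.dupe(u8, data);
        r.next = (r.next + 1) % r.slots.len;
    }
};

pub fn main() !void {
    const alloc = std.heap.page_allocator;
    var slots = [_]?[]u8{ null, null, null };
    var ring = Ring{ .slots = &slots, .allocator = alloc };

    var i: usize = 0;
    while (i < 10) : (i += 1) try ring.push("frame"); // steady state: 3 live copies
    for (slots) |s| if (s) |old| alloc.free(old);
    std.debug.print("ok\n", .{});
}
```

scaled to 50,000 slots at ~3KB each this gives the ~150MB steady state above — constant once warm, so it cannot produce linear growth.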
66
67### 1h. resequenceFrame temporaries
68- **file**: broadcaster.zig:117-156, called from frame_worker.zig:232
69- **what**: CBOR decode + re-encode with new seq. allocates ArrayList, CBOR encode buffers, result slice.
70- **size**: ~5-10KB temporaries
71- **freed**: allocated on frame worker's arena (1c), freed when arena deinits
72- **status**: CLEAN
73
74### 1i. collection index keys (per commit with ops)
75- **file**: collection_index.zig:79-82 via trackCommitOps → addCollection
76- **what**: `makeKey()` allocates 2 keys: `collection\0did` and `did\0collection`
77- **size**: ~100-200 bytes per key pair
78- **freed**: `defer self.allocator.free(rbc_key)` / `defer self.allocator.free(cbr_key)` — same function
79- **status**: CLEAN
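
a sketch of the key layout described above (inferred from the description, not read from collection_index.zig):

```zig
const std = @import("std");

// two composite keys per op: `collection\0did` and `did\0collection`,
// NUL-separated so either component can be prefix-scanned.
fn makeKey(alloc: std.mem.Allocator, a: []const u8, b: []const u8) ![]u8 {
    return std.mem.join(alloc, "\x00", &.{ a, b });
}

pub fn main() !void {
    const alloc = std.heap.page_allocator;
    const rbc_key = try makeKey(alloc, "app.bsky.feed.post", "did:plc:abc");
    defer alloc.free(rbc_key); // freed in the same function, as audited
    const cbr_key = try makeKey(alloc, "did:plc:abc", "app.bsky.feed.post");
    defer alloc.free(cbr_key);
    std.debug.print("{} {}\n", .{ rbc_key.len, cbr_key.len });
}
```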
80
81### 1j. postgres queries per frame (via pg.zig)
82each frame triggers 3-5 postgres queries. each query allocates internally:
83
84| query | file | method | per-frame? |
85|-------|------|--------|------------|
86| DID→UID lookup | event_log.zig:294 | rowUnsafe | yes (cache miss only) |
87| DID→UID create | event_log.zig:306 | exec | yes (first encounter only) |
88| DID→UID readback | event_log.zig:315 | rowUnsafe | yes (first encounter only) |
89| isAccountActive | event_log.zig:398 | rowUnsafe | yes (commits/syncs) |
90| getAccountState | event_log.zig:339 | rowUnsafe | yes (commits, for rev check) |
91| updateAccountState | event_log.zig:358 | exec | yes (validated commits) |
92| updateUpstreamStatus | event_log.zig:389 | exec | yes (#account events) |
93| updateHostSeq | event_log.zig:458 | exec | every 4s per host |
94| getAccountHostId | event_log.zig:369 | rowUnsafe | yes (uidForDidFromHost) |
95
96**pg.zig internals** (from karlseguin/pg.zig):
97- `Pool.rowUnsafe()`: acquires connection from pool, creates a `Result` with its own ArenaAllocator
98- `Result.deinit()`: releases connection back to pool, destroys arena
99- `QueryRowUnsafe.deinit()`: calls `result.drain()` THEN `result.deinit()`
100
101**POTENTIAL ISSUE — pg.zig deinit error path**:
102```zig
103// pg.zig QueryRowUnsafe.deinit():
104pub fn deinit(self: *QueryRowUnsafe) !void {
    try self.result.drain(); // if this errors...
    self.result.deinit(); // ...this NEVER runs → arena leak + connection leak
107}
108```
109zlay callers use `defer row.deinit() catch {};` — if drain() errors, the Result arena and connection are LEAKED. drain() errors on network failure to postgres.
110
111- **likelihood**: low per-frame (postgres is local), but at 2,000+ queries/sec, even 0.01% failure = 0.2 leaks/sec
112- **size per leak**: ~1-4KB (pg arena) + 1 connection slot
113- **impact at 0.01%**: ~0.7 MB/hour (doesn't explain 290 MB/hour)
114- **impact at 0.1%**: ~7 MB/hour (partial explanation?)
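
a self-contained demonstration of the failure mode and the shape of a fix: `deinit` must not let a `drain` error skip the release. the `Result` stub below is hypothetical, standing in for pg.zig's real type:

```zig
const std = @import("std");

// stand-in type (for illustration only, not pg.zig's real internals):
// drain() can fail on network errors; deinit() must always run.
const Result = struct {
    released: *bool,
    fail_drain: bool,

    fn drain(r: *Result) !void {
        if (r.fail_drain) return error.NetworkError;
    }
    fn deinit(r: *Result) void {
        r.released.* = true; // arena destroyed + connection returned to pool
    }
};

// safer shape for the caller: swallow the drain error, never skip release.
fn finish(r: *Result) void {
    r.drain() catch {}; // best effort...
    r.deinit(); // ...but arena + connection are released unconditionally
}

pub fn main() void {
    var released = false;
    var r = Result{ .released = &released, .fail_drain = true };
    finish(&r);
    std.debug.print("{}\n", .{released}); // true even when drain fails
}
```

the same reordering could be applied as a wrapper around pg.zig's `QueryRowUnsafe` without patching the dependency.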
115
116### 1k. validator arenas (per validated commit/sync)
117- **file**: validator.zig:161-163 (validateSync) and validator.zig:242-244 (verifyCommit)
118- **what**: `ArenaAllocator.init(self.allocator)` for CAR parsing + signature verification
119- **size**: depends on CAR size. typical commit: ~5-50KB used.
120- **freed**: `defer arena.deinit()` — same function
121- **note**: these were NOT changed in the page_allocator experiment
122- **status**: CLEAN (assuming arena.deinit works correctly)
123
124### 1l. getAccountState string dupes
125- **file**: event_log.zig:348-349
126- **what**: `allocator.dupe(u8, rev)` and `allocator.dupe(u8, data_cid)` — copies from pg result into caller's arena
127- **size**: ~20-50 bytes each (TID rev + multibase CID)
128- **freed**: allocated on frame worker's arena (1c), freed when arena deinits
129- **status**: CLEAN
130
131---
132
133## 2. per-connection allocations (~2,750 PDS hosts)
134
135### 2a. subscriber struct + hostname
136- **file**: slurper.zig:426-427, 423
137- **what**: `allocator.create(Subscriber)` + `allocator.dupe(u8, hostname)`
138- **size**: sizeof(Subscriber) + hostname (~20-50 bytes)
139- **freed**: slurper.zig:468-469 (runWorker cleanup on exit)
140- **status**: CLEAN (bounded by host count)
141
142### 2b. websocket.Client per connection
143- **file**: subscriber.zig:220-227
144- **what**: `websocket.Client.init(self.allocator, ...)` — TLS buffers, read buffers
145- **size**: TLS handshake buffers (~134KB), static read buffer (4096 bytes)
146- **freed**: `defer client.deinit()` — subscriber.zig:227
147- **status**: CLEAN (created/freed per connection attempt)
148
149### 2c. websocket dynamic read buffers
150- **what**: for messages > 4096 bytes, BufferProvider allocates a dynamic buffer
151- **freed**: `reader.done()` → `restoreStatic()` → `provider.release()` → `allocator.free()`
152- **status**: CLEAN (freed after each message)
153
154### 2d. shared TLS CA bundle
155- **file**: slurper.zig:251-253
156- **what**: `bundle.rescan(self.allocator)` — loads system CA certs
157- **size**: ~100-200KB (one-time)
158- **freed**: slurper.zig:575 `b.deinit(self.allocator)`
159- **status**: CLEAN (one-time, shared)
160
161### 2e. thread stacks
162- **what**: each subscriber thread gets 8MB virtual stack (`default_stack_size`)
163- **count**: ~2,750 subscriber threads + 16 worker threads + 4 resolver threads + 3 background threads = ~2,773 threads
164- **total virtual**: ~22 GiB (but only touched pages count as RSS)
165- **status**: BOUNDED (pages are returned when threads exit)
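
if the 8MB default ever needs tightening, `std.Thread.SpawnConfig` takes an explicit stack size (the 256 KiB figure below is illustrative, not a tuned value — subscriber threads would need auditing for deep stacks first):

```zig
const std = @import("std");

fn worker() void {}

pub fn main() !void {
    // default_stack_size gives each thread ~8 MiB of virtual stack; with
    // ~2,750 subscriber threads, an explicit size caps the virtual footprint.
    const t = try std.Thread.spawn(.{ .stack_size = 256 * 1024 }, worker, .{});
    t.join();
    std.debug.print("ok\n", .{});
}
```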
166
167---
168
169## 3. bounded caches and data structures
170
171### 3a. DID → UID cache (LRU)
172- **file**: event_log.zig:96, lru.zig
173- **capacity**: 500,000 entries
174- **per entry**: duped key string (~30 bytes DID) + Node struct (~80 bytes) + u64 value + HashMap entry
175- **steady state**: ~55-65 MB
176- **status**: BOUNDED (evicts LRU on put when full)
177
178### 3b. validator signing key cache (LRU)
179- **file**: validator.zig:50, lru.zig
180- **capacity**: 250,000 entries
181- **per entry**: duped key string (~30 bytes DID) + Node struct (~80 bytes) + CachedKey (fixed 42 bytes) + HashMap entry
182- **steady state**: ~30-40 MB
183- **status**: BOUNDED (evicts LRU on put when full)
184
185### 3c. resolve queue
186- **file**: validator.zig:52-54
187- **what**: `queue: ArrayListUnmanaged([]const u8)` + `queued_set: StringHashMapUnmanaged(void)`
188- **per entry**: duped DID string (~30 bytes) + HashMap entry
189- **max size**: 100,000 (validator.zig:63)
190- **freed**: DID freed after resolution (validator.zig:419)
191- **status**: BOUNDED
192
193### 3d. workers map
194- **file**: slurper.zig:217
195- **what**: `std.AutoHashMapUnmanaged(u64, WorkerEntry)` — host_id → thread+subscriber
196- **size**: bounded by host count (~2,750)
197- **status**: BOUNDED
198
199### 3e. crawl request queue
200- **file**: slurper.zig:221
201- **what**: `ArrayListUnmanaged([]const u8)` — duped hostnames waiting for processing
202- **freed**: slurper.zig:522 `defer self.allocator.free(h)` after processing
203- **status**: BOUNDED (grows slowly, processed continuously)
204
205### 3f. frame pool ring buffers
206- **file**: thread_pool.zig
207- **what**: pre-allocated ring buffers per worker. zero alloc per submit.
208- **size**: 16 workers × 4096 capacity × sizeof(FrameWork) per entry
209- **status**: FIXED SIZE (no growth)
210
211### 3g. consumer list
212- **file**: broadcaster.zig:291
213- **what**: `ArrayListUnmanaged(*Consumer)` — active downstream WebSocket consumers
214- **per consumer**: Consumer struct (~66KB for 8192-entry SharedFrame pointer buffer) + write thread
215- **status**: BOUNDED by connected consumers (typically 0-10)
216
217---
218
219## 4. API handler allocations (cold paths)
220
221### 4a. handleAdminListHosts — ALLOCATES ON c_allocator
222- **file**: api/admin.zig:89-99
223- **what**: `persist.listAllHosts(persist.allocator)` — dupes hostname+status for each host
224- **freed**: defer block frees all hostname/status strings + slice
225- **status**: CLEAN
226
227### 4b. handleBan — LEAKS (cold path)
228- **file**: api/admin.zig:70-78
229- **what**: `buildAccountFrame(ctx.persist.allocator, did)` → allocates CBOR frame on c_allocator
230- **what**: `broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq)` → allocates resequenced frame on c_allocator
231- **freed**: NEITHER frame_bytes NOR broadcast_data is freed after broadcast
232- **size**: ~200-500 bytes per ban
233- **status**: **LEAK** — but cold path (admin-only, negligible impact)
234
235### 4c. handleAdminBackfillStatus
236- **file**: api/admin.zig:197-201
237- **what**: `backfiller.getStatus(backfiller.allocator)` → builds JSON string on c_allocator
238- **freed**: `defer backfiller.allocator.free(body)`
239- **status**: CLEAN
240
241### 4d. xrpc handlers (listRepos, getRepoStatus, etc.)
242- **what**: all use stack-allocated fixed buffers (65536 bytes) or pg query iteration
243- **no heap allocation** in the handler code itself
244- **pg queries**: each creates/destroys internal pg arenas (same pattern as 1j)
245- **status**: CLEAN
246
247### 4e. handleRequestCrawl
248- **file**: api/xrpc.zig:429
249- **what**: `std.json.parseFromSlice(...)` on `slurper.allocator` — parses JSON body
250- **freed**: `defer parsed.deinit()`
251- **what**: `validateHostname(slurper.allocator, ...)` — allocates normalized hostname
252- **freed**: `defer slurper.allocator.free(hostname)`
253- **what**: `slurper.addCrawlRequest(hostname)` → dupes hostname
254- **freed**: by crawl processor (slurper.zig:522)
255- **status**: CLEAN
256
257---
258
259## 5. one-time startup allocations
260
261| what | file | size |
262|------|------|------|
263| Broadcaster struct | main.zig:140 | ~400KB (history array) |
264| Validator struct | main.zig:143 | ~100 bytes |
265| DiskPersist (+ pg pool) | main.zig:149 | ~10KB + pg connections |
266| CollectionIndex (RocksDB) | main.zig:169 | RocksDB internal (~50-100MB) |
267| Backfiller struct | main.zig:176 | ~100 bytes |
268| CA bundle | slurper.zig:251 | ~100-200KB |
269| Frame pool | slurper.zig:257 | pre-allocated ring buffers |
270| Error frame (CBOR) | broadcaster.zig:306 | ~50 bytes |
271| MetricsServer struct | main.zig:216 | ~100 bytes |
272| build_options module | (compile-time) | ~100 bytes |
273
274---
275
276## 6. backfill allocations (when running)
277
278### 6a. discoverCollections
279- **file**: backfill.zig:101-148
280- **what**: fetches lexicon garden llms.txt + RBC scan, deduplicates
281- **freed**: all temporaries freed via defer blocks
282- **status**: CLEAN (only runs when admin triggers backfill)
283
284### 6b. backfillCollection per-page
285- **file**: backfill.zig:269-318
286- **what**: per-page: HTTP fetch → JSON parse → dupe DIDs → addCollection to RocksDB
287- **freed**: all duped strings freed in defer blocks
288- **http client**: reused across pages for one collection, freed after collection done
289- **status**: CLEAN
290
291---
292
293## 7. dependency internal allocations
294
295### 7a. RocksDB (via rocksdb-zig)
296- internal memory managed by RocksDB C library (block cache, memtables, etc.)
297- not tracked by c_allocator — uses its own allocator
298- bounded by RocksDB options (write_buffer_size, max_open_files, block_cache_size)
299
300### 7b. pg.zig connection pool
301- **file**: initialized in event_log.zig:132 with `size = 5`
302- 5 connections, each with internal read/write buffers
303- **status**: BOUNDED (5 connections)
304
305### 7c. zat.DidResolver (per resolver thread)
306- **file**: validator.zig:401
307- **what**: `zat.DidResolver.init(self.allocator)` — creates HTTP client for DID resolution
308- **long-lived**: one per resolver thread (4 total), lives until shutdown
309- **internal**: likely holds std.http.Client with connection pool
310- **potential issue**: if std.http.Client pools connections to many unique hosts (PLC server, PDS endpoints), the pool could grow. but PLC is typically one host (plc.directory).
311
312### 7d. zat CBOR decode
313- all CBOR decode operations are on arena allocators (subscriber arena or worker arena)
314- freed when arena deinits
315- **status**: CLEAN
316
317---
318
319## 8. summary of potential issues
320
321### confirmed leak (cold path, negligible):
322- **admin handleBan**: leaks frame_bytes + broadcast_data (~500 bytes per ban)
323
324### potential leak under network errors:
325- **pg.zig QueryRowUnsafe.deinit()**: if drain() errors, Result arena + connection leak
326- at 2,000+ queries/sec, even rare failures accumulate
327- needs investigation: check postgres error rate in logs
328
329### fragmentation concerns:
330- ~700 frames/sec × ~10 alloc/free cycles per frame = ~7,000 alloc/free operations per second on c_allocator
331- many different sizes (28-byte headers, 2-5KB frames, 100-byte keys, 1-4KB pg arenas)
332- glibc with MALLOC_ARENA_MAX=2 concentrates fragmentation into 2 arenas
333- mallinfo() only reports the MAIN arena — second arena is invisible
334- malloc_trim(0) only trims the main arena — second arena is untrimmed
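
one glibc-specific knob worth knowing while investigating (extern signature assumed to match glibc's malloc.h; requires linking libc, and per the note above it only reaches the main arena):

```zig
const std = @import("std");

// glibc-only: ask malloc to return free heap pages to the OS.
// returns 1 if memory was actually released, 0 otherwise.
extern "c" fn malloc_trim(pad: usize) c_int;

pub fn main() void {
    const released_something = malloc_trim(0) == 1;
    std.debug.print("malloc_trim ran, released: {}\n", .{released_something});
}
```

running this periodically and watching RSS would show whether growth is trimmable main-arena free space or genuinely live (or second-arena) memory.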
335
336### items NOT investigated:
337- zat library internals (CBOR allocator patterns, DID resolver HTTP client connection pooling)
338- rocksdb-zig binding allocations (WriteBatch, Iterator internal state)
339- std.http.Client internal connection/TLS buffer retention within zat.DidResolver
340
341---
342
343## 9. per-frame allocation count summary
344
345for a typical validated #commit frame, approximately:
346
347| step | allocs | frees | net | size |
348|------|--------|-------|-----|------|
349| subscriber arena init | 1 chunk | 0 | +1 | ~32KB |
350| subscriber CBOR decode | ~5 | 0 | +5 | ~2KB |
351| subscriber arena deinit | 0 | 1 chunk | -1 | ~32KB |
352| frame data dupe | 1 | 0 | +1 | ~3KB |
353| worker arena init | 1 chunk | 0 | +1 | ~32KB |
354| worker CBOR re-decode | ~5 | 0 | +5 | ~2KB |
355| pg: uidForDid (cache hit) | 0 | 0 | 0 | 0 |
356| pg: isAccountActive | 1 arena | 1 arena | 0 | ~2KB |
357| pg: getAccountState | 1 arena | 1 arena | 0 | ~2KB |
358| multibase encode | 1 | 0 | +1 | ~50B |
359| validator arena init | 1 chunk | 0 | +1 | ~32KB |
360| verifyCommitCar | ~10 | 0 | +10 | ~20KB |
361| validator arena deinit | 0 | 1 chunk | -1 | ~32KB |
362| persist data alloc | 1 | 0 | +1 | ~3KB |
363| resequenceFrame | ~5 | 0 | +5 | ~5KB |
364| SharedFrame.create | 2 (struct+data) | 0 | +2 | ~3KB |
365| ring buffer dupe | 1 | 1 (overwrite) | 0 | ~3KB |
366| collection index keys | 2 | 2 | 0 | ~200B |
367| worker arena deinit | 0 | 1 chunk | -1 | ~32KB |
368| frame data free | 0 | 1 | -1 | ~3KB |
369| persist flush (batched) | 0 | 1 | -1 | ~3KB |
370| SharedFrame release | 0 | 2 | -2 | ~3KB |
371| **TOTAL per frame** | **~37** | **~12** | 0 | 0 |

all allocations balance out. yet RSS grows linearly. the remaining hypotheses are:
1. glibc malloc fragmentation in the per-thread arenas (invisible to mallinfo/malloc_trim)
2. a leak in a dependency (pg.zig error path, zat internals, rocksdb-zig)
3. a leak we haven't found in the zig code
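
one way to separate hypothesis 1 from 2/3 is a short capture run under `GeneralPurposeAllocator`, which reports every live allocation with a stack trace — far too slow for 700 frames/sec in production, but fine for a bounded test (sketch, assuming a 0.12-era std where the type still carries this name):

```zig
const std = @import("std");

pub fn main() void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const alloc = gpa.allocator();

    const p = alloc.alloc(u8, 64) catch return;
    alloc.free(p); // balanced, so the check below finds nothing

    // prints a stack trace for every allocation still live; a real leak
    // shows up here, while pure glibc fragmentation would not.
    const leaked = gpa.detectLeaks();
    std.debug.print("leaks detected: {}\n", .{leaked});
}
```

a build flag swapping `c_allocator` for this during a 10-minute replay would either name the leaking call site or point the finger firmly at fragmentation.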