this repo has no description
coral.waow.tech
1const std = @import("std");
2const net = std.net;
3const http = std.http;
4const mem = std.mem;
5const json = std.json;
6const zat = @import("zat");
7const lattice = @import("lattice.zig");
8const entity_graph = @import("entity_graph.zig");
9const ner = @import("ner.zig");
10
11const log = std.log.scoped(.http);
12
/// Size in bytes of each per-connection read buffer and write buffer (8 KiB).
const HTTP_BUF_SIZE = 8 * 1024;

/// Firehose rate under which /stats reports `firehose_degraded` as true
/// (the normal rate is around 50/s).
const FIREHOSE_DEGRADED_THRESHOLD: f32 = 10.0;
17
/// Serves HTTP requests on one accepted connection.
///
/// Loops over requests until receiving a head fails, a handler returns an
/// error, or the request does not ask for keep-alive. The stream is closed
/// when this function returns.
pub fn handleConnection(conn: net.Server.Connection) void {
    defer conn.stream.close();

    var recv_buf: [HTTP_BUF_SIZE]u8 = undefined;
    var send_buf: [HTTP_BUF_SIZE]u8 = undefined;

    var stream_reader = conn.stream.reader(&recv_buf);
    var stream_writer = conn.stream.writer(&send_buf);

    var server = http.Server.init(stream_reader.interface(), &stream_writer.interface);

    var keep_going = true;
    while (keep_going) {
        var request = server.receiveHead() catch |err| {
            // a peer closing the connection is routine; only log other failures
            const routine_close = err == error.HttpConnectionClosing or err == error.EndOfStream;
            if (!routine_close) {
                log.debug("http receive error: {}", .{err});
            }
            return;
        };

        handleRequest(&request) catch |err| {
            log.err("request error: {}", .{err});
            return;
        };

        keep_going = request.head.keep_alive;
    }
}
45
/// Routes one request to its handler based on the method and exact target path.
///
/// OPTIONS requests are answered as CORS preflights regardless of path.
/// Unknown targets get a JSON 404.
fn handleRequest(request: *http.Server.Request) !void {
    // cors preflight
    if (request.head.method == .OPTIONS) return sendCorsHeaders(request, "");

    const path = request.head.target;

    if (mem.eql(u8, path, "/health")) return sendJson(request, "{\"status\":\"ok\"}");
    if (mem.eql(u8, path, "/stats")) return handleStats(request);
    if (mem.startsWith(u8, path, "/state/")) return handleState(request, path);
    if (mem.eql(u8, path, "/entity-graph")) return handleEntityGraph(request);
    if (mem.eql(u8, path, "/groups")) return handleGroups(request);
    if (mem.eql(u8, path, "/diagnostics")) return handleDiagnostics(request);

    // service index listing the available endpoints
    if (mem.eql(u8, path, "/")) {
        return sendJson(request, "{\"name\":\"coral\",\"endpoints\":[\"/stats\",\"/state/32\",\"/state/128\",\"/state/512\",\"/entity-graph\",\"/groups\",\"/diagnostics\"]}");
    }

    return sendNotFound(request);
}
73
/// Writes one lattice's stats as a `"<label>":{...}` JSON member, without a
/// trailing comma. `stats` must expose `occupied`, `clusters`, `largest`,
/// `percolates` and a `density()` method.
fn writeLatticeStats(w: anytype, comptime label: []const u8, stats: anytype) !void {
    try w.writeAll("\"" ++ label ++ "\":{");
    try w.print("\"occupied\":{d},", .{stats.occupied});
    try w.print("\"density\":{d:.4},", .{stats.density()});
    try w.print("\"clusters\":{d},", .{stats.clusters});
    try w.print("\"largest\":{d},", .{stats.largest});
    try w.print("\"percolates\":{}", .{stats.percolates});
    try w.writeAll("}");
}

/// GET /stats - returns event counters, firehose status, per-lattice stats
/// and medium-lattice transition tracking as a single JSON object.
///
/// The response is built in a per-request arena that is released on return.
fn handleStats(request: *http.Server.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const world = lattice.get();
    const small = world.small.getStats();
    const medium = world.medium.getStats();
    const large = world.large.getStats();
    const events = world.getEventCount();
    const event_rate = world.getEventRate();

    var buf: std.ArrayList(u8) = .empty;
    defer buf.deinit(alloc);
    const w = buf.writer(alloc);

    try w.writeAll("{");

    const fh_rate = ner.firehoseRate();
    try w.print("\"events\":{d},", .{events});
    try w.print("\"events_per_sec\":{d:.2},", .{event_rate});
    try w.print("\"firehose_rate\":{d:.2},", .{fh_rate});
    try w.print("\"firehose_degraded\":{},", .{fh_rate < FIREHOSE_DEGRADED_THRESHOLD});

    // jetstream fields are only emitted when a current host is known
    if (ner.currentHost()) |host| {
        try w.print("\"jetstream_url\":\"wss://{s}/subscribe\",", .{host});
        try w.print("\"jetstream_failover\":{},", .{ner.isFailover()});
    }

    // one entry per lattice, keyed by its size
    try w.writeAll("\"lattices\":{");
    try writeLatticeStats(w, "32", small);
    try w.writeAll(",");
    try writeLatticeStats(w, "128", medium);
    try w.writeAll(",");
    try writeLatticeStats(w, "512", large);
    try w.writeAll("},");

    // transition tracking (medium lattice - near critical)
    try w.writeAll("\"transitions\":{");
    try w.print("\"count\":{d},", .{world.medium_tracker.transitions});
    try w.print("\"events_since\":{d},", .{world.medium_tracker.events_since_transition});
    try w.print("\"currently_percolating\":{}", .{world.medium_tracker.last_percolated});
    try w.writeAll("}");

    try w.writeAll("}");

    try sendJson(request, buf.items);
}
145
/// GET /entity-graph - returns entity graph state as JSON
fn handleEntityGraph(request: *http.Server.Request) !void {
    // refresh cluster assignments so the response reflects current state
    entity_graph.graph.updateClusters();

    // 1MB of stack scratch for serialization — 272+ active entities with edges
    // can exceed 256KB
    var scratch: [1024 * 1024]u8 = undefined;
    return sendJson(request, entity_graph.graph.toJson(&scratch));
}
157
/// POST /groups - accept LLM-curated named groups from NER bridge
///
/// Expects a JSON object with a "groups" array whose items carry a "name"
/// string and an optional "entities" array of strings, plus an optional
/// top-level "haiku" string. Names and entity texts are truncated to 64
/// bytes; groups beyond `entity_graph.MAX_GROUPS` and entities beyond
/// `entity_graph.MAX_GROUP_ENTITIES` are ignored, as are groups without a
/// name and non-string entities.
///
/// Replies `{"set":N}` on success, or an `{"error":...}` object (sent through
/// `sendJson`) when the method, body, or JSON shape is rejected.
fn handleGroups(request: *http.Server.Request) !void {
    // upper bound on the request body so a client cannot make us buffer unbounded input
    const max_body_bytes = 1024 * 1024;
    // names and entity texts are truncated to this many bytes
    const max_text_len = 64;

    if (request.head.method != .POST) {
        try sendJson(request, "{\"error\":\"method not allowed\"}");
        return;
    }

    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    var body_buffer: [1024 * 16]u8 = undefined;
    // readerExpectNone asserts that the request has no `Expect` header, so a
    // client sending `Expect: 100-continue` would trip that assertion.
    // readerExpectContinue answers the expectation (or fails cleanly) instead.
    const body_reader = request.readerExpectContinue(&body_buffer) catch |err| {
        log.err("failed to read groups body: {}", .{err});
        try sendJson(request, "{\"error\":\"failed to read body\"}");
        return;
    };

    // allocRemaining reads until the end of the body (a single stream() call may
    // return after one chunk) and fails once the body exceeds the limit.
    const body = body_reader.allocRemaining(alloc, .limited(max_body_bytes)) catch |err| {
        log.err("failed to read groups body: {}", .{err});
        try sendJson(request, "{\"error\":\"failed to read body\"}");
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        try sendJson(request, "{\"error\":\"invalid JSON\"}");
        return;
    };

    const groups_items = zat.json.getArray(parsed.value, "groups") orelse {
        try sendJson(request, "{\"error\":\"missing or invalid groups array\"}");
        return;
    };

    // parse groups into fixed-size structs
    var groups: [entity_graph.MAX_GROUPS]entity_graph.Group = [_]entity_graph.Group{.{}} ** entity_graph.MAX_GROUPS;
    var group_count: u8 = 0;

    for (groups_items) |group_val| {
        if (group_count >= entity_graph.MAX_GROUPS) break;

        // groups without a string name are skipped
        const name_str = zat.json.getString(group_val, "name") orelse continue;

        var grp = entity_graph.Group{};

        // set name
        const name_len: u8 = @intCast(@min(name_str.len, max_text_len));
        @memcpy(grp.name[0..name_len], name_str[0..name_len]);
        grp.name_len = name_len;

        // set entities (array of raw strings)
        if (zat.json.getArray(group_val, "entities")) |ent_items| {
            for (ent_items) |ent_val| {
                if (grp.entity_count >= entity_graph.MAX_GROUP_ENTITIES) break;
                if (ent_val != .string) continue;
                const ent_len: u8 = @intCast(@min(ent_val.string.len, max_text_len));
                @memcpy(grp.entity_texts[grp.entity_count][0..ent_len], ent_val.string[0..ent_len]);
                grp.entity_text_lens[grp.entity_count] = ent_len;
                grp.entity_count += 1;
            }
        }

        groups[group_count] = grp;
        group_count += 1;
    }

    {
        // hold the graph lock only while mutating, so a slow client reading the
        // response below cannot keep the graph locked
        entity_graph.graph.mutex.lock();
        defer entity_graph.graph.mutex.unlock();

        entity_graph.graph.setGroups(&groups, group_count);

        // optional haiku field
        if (zat.json.getString(parsed.value, "haiku")) |haiku_str| {
            entity_graph.graph.setHaiku(haiku_str);
        }
    }

    log.info("groups: set {d} groups", .{group_count});

    var buf: [64]u8 = undefined;
    const response = std.fmt.bufPrint(&buf, "{{\"set\":{d}}}", .{group_count}) catch "{\"set\":0}";
    try sendJson(request, response);
}
241
/// GET /diagnostics - returns distributions and analysis data
fn handleDiagnostics(request: *http.Server.Request) !void {
    // 64 KiB of stack scratch for the serialized diagnostics
    var scratch: [64 * 1024]u8 = undefined;
    return sendJson(request, entity_graph.graph.toDiagnosticsJson(&scratch));
}
248
/// GET /state/{size} - returns the lattice state bitmap, base64-encoded, as
/// `{"size":N,"bitmap":"..."}`.
///
/// `target` must start with "/state/". Only sizes 32, 128 and 512 are served;
/// a non-numeric or unknown size gets a JSON 404.
fn handleState(request: *http.Server.Request, target: []const u8) !void {
    const prefix = "/state/";

    // everything after the prefix must be a decimal size
    const size = std.fmt.parseInt(usize, target[prefix.len..], 10) catch
        return sendNotFound(request);

    // per-request arena: bitmap, base64 text and response are all freed on return
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const world = lattice.get();

    // pick the lattice matching the requested size
    const bitmap = switch (size) {
        32 => try world.small.getStateBitmap(alloc),
        128 => try world.medium.getStateBitmap(alloc),
        512 => try world.large.getStateBitmap(alloc),
        else => return sendNotFound(request),
    };

    // base64 keeps the raw bitmap bytes safe inside a JSON string
    const b64 = std.base64.standard.Encoder;
    const encoded = try alloc.alloc(u8, b64.calcSize(bitmap.len));
    const bitmap_text = b64.encode(encoded, bitmap);

    const response = try std.fmt.allocPrint(
        alloc,
        "{{\"size\":{d},\"bitmap\":\"{s}\"}}",
        .{ size, bitmap_text },
    );

    return sendJson(request, response);
}
288
/// Responds 200 OK with `body` as `application/json`, adding permissive CORS
/// headers and `cache-control: no-cache`.
fn sendJson(request: *http.Server.Request, body: []const u8) !void {
    const headers = [_]http.Header{
        .{ .name = "content-type", .value = "application/json" },
        .{ .name = "access-control-allow-origin", .value = "*" },
        .{ .name = "access-control-allow-methods", .value = "GET, POST, OPTIONS" },
        .{ .name = "access-control-allow-headers", .value = "content-type" },
        .{ .name = "cache-control", .value = "no-cache" },
    };

    return request.respond(body, .{
        .status = .ok,
        .extra_headers = &headers,
    });
}
301
/// Responds 204 No Content with the CORS allow-origin, allow-methods and
/// allow-headers headers; used to answer preflight requests.
fn sendCorsHeaders(request: *http.Server.Request, body: []const u8) !void {
    const headers = [_]http.Header{
        .{ .name = "access-control-allow-origin", .value = "*" },
        .{ .name = "access-control-allow-methods", .value = "GET, POST, OPTIONS" },
        .{ .name = "access-control-allow-headers", .value = "content-type" },
    };

    return request.respond(body, .{
        .status = .no_content,
        .extra_headers = &headers,
    });
}
312
/// Responds 404 Not Found with a fixed JSON error body and an open CORS origin.
fn sendNotFound(request: *http.Server.Request) !void {
    const not_found_body = "{\"error\":\"not found\"}";
    const headers = [_]http.Header{
        .{ .name = "content-type", .value = "application/json" },
        .{ .name = "access-control-allow-origin", .value = "*" },
    };

    return request.respond(not_found_body, .{
        .status = .not_found,
        .extra_headers = &headers,
    });
}
322