// this repo has no description coral.waow.tech
// at main 322 lines 11 kB view raw
const std = @import("std");
const net = std.net;
const http = std.http;
const mem = std.mem;
const json = std.json;
const zat = @import("zat");
const lattice = @import("lattice.zig");
const entity_graph = @import("entity_graph.zig");
const ner = @import("ner.zig");

const log = std.log.scoped(.http);

const HTTP_BUF_SIZE = 8192;

// threshold below which we consider firehose degraded (normal is ~50/s)
const FIREHOSE_DEGRADED_THRESHOLD: f32 = 10.0;

// upper bound on the size of a POST /groups body; larger requests are rejected
const MAX_GROUPS_BODY_SIZE = 1024 * 1024;

// longest group name / entity text copied into a Group; longer input is truncated
// NOTE(review): assumes the Group text buffers hold at least this many bytes — confirm in entity_graph.zig
const MAX_GROUP_TEXT_LEN = 64;

/// Serves HTTP requests on `conn` until the peer disconnects, a request
/// fails, or the client does not ask for keep-alive. The stream is always
/// closed on return.
pub fn handleConnection(conn: net.Server.Connection) void {
    defer conn.stream.close();

    var read_buffer: [HTTP_BUF_SIZE]u8 = undefined;
    var write_buffer: [HTTP_BUF_SIZE]u8 = undefined;

    var reader = conn.stream.reader(&read_buffer);
    var writer = conn.stream.writer(&write_buffer);

    var server = http.Server.init(reader.interface(), &writer.interface);

    while (true) {
        var request = server.receiveHead() catch |err| {
            // a closed connection is the normal way for a keep-alive session to end
            if (err != error.HttpConnectionClosing and err != error.EndOfStream) {
                log.debug("http receive error: {}", .{err});
            }
            return;
        };

        handleRequest(&request) catch |err| {
            log.err("request error: {}", .{err});
            return;
        };

        if (!request.head.keep_alive) return;
    }
}

/// Dispatches a request to the matching endpoint handler.
/// Unknown paths get a 404 JSON response.
fn handleRequest(request: *http.Server.Request) !void {
    // cors preflight
    if (request.head.method == .OPTIONS) {
        try sendCorsHeaders(request, "");
        return;
    }

    // route on the path component only, so "/stats?t=123" still reaches /stats
    const target = request.head.target;
    const path = target[0 .. mem.indexOfScalar(u8, target, '?') orelse target.len];

    if (mem.eql(u8, path, "/health")) {
        try sendJson(request, "{\"status\":\"ok\"}");
    } else if (mem.eql(u8, path, "/stats")) {
        try handleStats(request);
    } else if (mem.startsWith(u8, path, "/state/")) {
        try handleState(request, path);
    } else if (mem.eql(u8, path, "/entity-graph")) {
        try handleEntityGraph(request);
    } else if (mem.eql(u8, path, "/groups")) {
        try handleGroups(request);
    } else if (mem.eql(u8, path, "/diagnostics")) {
        try handleDiagnostics(request);
    } else if (mem.eql(u8, path, "/")) {
        try sendJson(request, "{\"name\":\"coral\",\"endpoints\":[\"/stats\",\"/state/32\",\"/state/128\",\"/state/512\",\"/entity-graph\",\"/groups\",\"/diagnostics\"]}");
    } else {
        try sendNotFound(request);
    }
}

/// Writes one `"<key>":{...}` lattice stats object to `w`.
/// `stats` must expose `occupied`, `clusters`, `largest`, `percolates` and `density()`.
fn writeLatticeStats(w: anytype, comptime key: []const u8, stats: anytype) !void {
    try w.writeAll("\"" ++ key ++ "\":{");
    try w.print("\"occupied\":{d},", .{stats.occupied});
    try w.print("\"density\":{d:.4},", .{stats.density()});
    try w.print("\"clusters\":{d},", .{stats.clusters});
    try w.print("\"largest\":{d},", .{stats.largest});
    try w.print("\"percolates\":{}", .{stats.percolates});
    try w.writeAll("}");
}

/// GET /stats - returns event counters, firehose status, per-lattice stats
/// and medium-lattice transition tracking as JSON
fn handleStats(request: *http.Server.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const world = lattice.get();
    const small = world.small.getStats();
    const medium = world.medium.getStats();
    const large = world.large.getStats();
    const events = world.getEventCount();
    const event_rate = world.getEventRate();
    const fh_rate = ner.firehoseRate();

    var buf: std.ArrayList(u8) = .empty;
    defer buf.deinit(alloc);
    const w = buf.writer(alloc);

    try w.writeAll("{");

    try w.print("\"events\":{d},", .{events});
    try w.print("\"events_per_sec\":{d:.2},", .{event_rate});
    try w.print("\"firehose_rate\":{d:.2},", .{fh_rate});
    try w.print("\"firehose_degraded\":{},", .{fh_rate < FIREHOSE_DEGRADED_THRESHOLD});

    // the jetstream fields are emitted only when a host is currently known
    if (ner.currentHost()) |host| {
        try w.print("\"jetstream_url\":\"wss://{s}/subscribe\",", .{host});
        try w.print("\"jetstream_failover\":{},", .{ner.isFailover()});
    }

    // lattices are keyed by the same sizes that /state/{size} accepts
    try w.writeAll("\"lattices\":{");
    try writeLatticeStats(w, "32", small);
    try w.writeAll(",");
    try writeLatticeStats(w, "128", medium);
    try w.writeAll(",");
    try writeLatticeStats(w, "512", large);
    try w.writeAll("},");

    // transition tracking (medium lattice - near critical)
    try w.writeAll("\"transitions\":{");
    try w.print("\"count\":{d},", .{world.medium_tracker.transitions});
    try w.print("\"events_since\":{d},", .{world.medium_tracker.events_since_transition});
    try w.print("\"currently_percolating\":{}", .{world.medium_tracker.last_percolated});
    try w.writeAll("}");

    try w.writeAll("}");

    try sendJson(request, buf.items);
}

/// GET /entity-graph - returns entity graph state as JSON
fn handleEntityGraph(request: *http.Server.Request) !void {
    // update clusters before returning stats
    entity_graph.graph.updateClusters();

    // serialize to JSON (1MB buffer — 272+ active entities with edges can exceed 256KB)
    var buf: [1024 * 1024]u8 = undefined;
    const json_data = entity_graph.graph.toJson(&buf);

    try sendJson(request, json_data);
}

/// POST /groups - accept LLM-curated named groups from NER bridge
///
/// Expects a JSON object with a "groups" array of `{"name": ..., "entities": [...]}`
/// objects and an optional "haiku" string. Group entries without a string "name"
/// and non-string entities are skipped; names and entity texts are truncated to
/// MAX_GROUP_TEXT_LEN bytes. Responds with `{"set":N}` on success, 405 for other
/// methods, 413 for oversized bodies and 400 for malformed input.
fn handleGroups(request: *http.Server.Request) !void {
    if (request.head.method != .POST) {
        try sendJsonStatus(request, .method_not_allowed, "{\"error\":\"method not allowed\"}");
        return;
    }

    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // readerExpectContinue answers "Expect: 100-continue" (sent by e.g. curl for
    // larger bodies); readerExpectNone would assert that no such header exists.
    var body_buffer: [1024 * 16]u8 = undefined;
    const body_reader = try request.readerExpectContinue(&body_buffer);

    // Read the body to its end. A single `stream` call may return after only part
    // of the body, so the whole payload is collected here, capped in size because
    // it comes from the network.
    const body = body_reader.allocRemaining(alloc, .limited(MAX_GROUPS_BODY_SIZE)) catch |err| {
        log.err("failed to read groups body: {}", .{err});
        const status: http.Status = if (err == error.StreamTooLong) .payload_too_large else .bad_request;
        try sendJsonStatus(request, status, "{\"error\":\"failed to read body\"}");
        // the body was not fully consumed, so the connection cannot be reused:
        // propagate the error so handleConnection closes it
        return err;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        try sendJsonStatus(request, .bad_request, "{\"error\":\"invalid JSON\"}");
        return;
    };

    const groups_items = zat.json.getArray(parsed.value, "groups") orelse {
        try sendJsonStatus(request, .bad_request, "{\"error\":\"missing or invalid groups array\"}");
        return;
    };

    // parse groups into fixed-size structs
    var groups: [entity_graph.MAX_GROUPS]entity_graph.Group = [_]entity_graph.Group{.{}} ** entity_graph.MAX_GROUPS;
    var group_count: u8 = 0;

    for (groups_items) |group_val| {
        if (group_count >= entity_graph.MAX_GROUPS) break;

        const name_str = zat.json.getString(group_val, "name") orelse continue;

        var grp = entity_graph.Group{};

        // set name
        const name_len: u8 = @intCast(@min(name_str.len, MAX_GROUP_TEXT_LEN));
        @memcpy(grp.name[0..name_len], name_str[0..name_len]);
        grp.name_len = name_len;

        // set entities (array of raw strings)
        if (zat.json.getArray(group_val, "entities")) |ent_items| {
            for (ent_items) |ent_val| {
                if (grp.entity_count >= entity_graph.MAX_GROUP_ENTITIES) break;
                if (ent_val != .string) continue;
                const ent_len: u8 = @intCast(@min(ent_val.string.len, MAX_GROUP_TEXT_LEN));
                @memcpy(grp.entity_texts[grp.entity_count][0..ent_len], ent_val.string[0..ent_len]);
                grp.entity_text_lens[grp.entity_count] = ent_len;
                grp.entity_count += 1;
            }
        }

        groups[group_count] = grp;
        group_count += 1;
    }

    {
        // hold the graph lock only while mutating, not while writing the
        // response to a possibly slow client
        entity_graph.graph.mutex.lock();
        defer entity_graph.graph.mutex.unlock();

        entity_graph.graph.setGroups(&groups, group_count);

        // optional haiku field
        if (zat.json.getString(parsed.value, "haiku")) |haiku_str| {
            entity_graph.graph.setHaiku(haiku_str);
        }
    }

    log.info("groups: set {d} groups", .{group_count});

    var buf: [64]u8 = undefined;
    const response = std.fmt.bufPrint(&buf, "{{\"set\":{d}}}", .{group_count}) catch "{\"set\":0}";
    try sendJson(request, response);
}

/// GET /diagnostics - returns distributions and analysis data
fn handleDiagnostics(request: *http.Server.Request) !void {
    var buf: [64 * 1024]u8 = undefined;
    const json_data = entity_graph.graph.toDiagnosticsJson(&buf);
    try sendJson(request, json_data);
}

/// GET /state/{size} - returns the lattice state bitmap, base64-encoded, as
/// `{"size":N,"bitmap":"..."}`. `target` must start with "/state/"; sizes
/// other than 32, 128 and 512 (or a non-numeric size) get a 404.
fn handleState(request: *http.Server.Request, target: []const u8) !void {
    // parse size from /state/{size}
    const size_str = target["/state/".len..];
    const size = std.fmt.parseInt(usize, size_str, 10) catch {
        try sendNotFound(request);
        return;
    };

    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const world = lattice.get();

    // get bitmap based on requested size
    const bitmap = switch (size) {
        32 => try world.small.getStateBitmap(alloc),
        128 => try world.medium.getStateBitmap(alloc),
        512 => try world.large.getStateBitmap(alloc),
        else => {
            try sendNotFound(request);
            return;
        },
    };

    // encode as base64 for JSON transport
    const encoded_len = std.base64.standard.Encoder.calcSize(bitmap.len);
    const encoded = try alloc.alloc(u8, encoded_len);
    _ = std.base64.standard.Encoder.encode(encoded, bitmap);

    // build JSON response
    var buf: std.ArrayList(u8) = .empty;
    defer buf.deinit(alloc);
    const w = buf.writer(alloc);

    try w.print("{{\"size\":{d},\"bitmap\":\"{s}\"}}", .{ size, encoded });

    try sendJson(request, buf.items);
}

/// Sends `body` as a JSON response with the given status, CORS headers and
/// caching disabled.
fn sendJsonStatus(request: *http.Server.Request, status: http.Status, body: []const u8) !void {
    try request.respond(body, .{
        .status = status,
        .extra_headers = &.{
            .{ .name = "content-type", .value = "application/json" },
            .{ .name = "access-control-allow-origin", .value = "*" },
            .{ .name = "access-control-allow-methods", .value = "GET, POST, OPTIONS" },
            .{ .name = "access-control-allow-headers", .value = "content-type" },
            .{ .name = "cache-control", .value = "no-cache" },
        },
    });
}

/// Sends `body` as a `200 OK` JSON response.
fn sendJson(request: *http.Server.Request, body: []const u8) !void {
    try sendJsonStatus(request, .ok, body);
}

/// Answers a CORS preflight request with `204 No Content` and the allow-* headers.
fn sendCorsHeaders(request: *http.Server.Request, body: []const u8) !void {
    try request.respond(body, .{
        .status = .no_content,
        .extra_headers = &.{
            .{ .name = "access-control-allow-origin", .value = "*" },
            .{ .name = "access-control-allow-methods", .value = "GET, POST, OPTIONS" },
            .{ .name = "access-control-allow-headers", .value = "content-type" },
        },
    });
}

/// Sends a `404 Not Found` JSON error response.
fn sendNotFound(request: *http.Server.Request) !void {
    try request.respond("{\"error\":\"not found\"}", .{
        .status = .not_found,
        .extra_headers = &.{
            .{ .name = "content-type", .value = "application/json" },
            .{ .name = "access-control-allow-origin", .value = "*" },
        },
    });
}