this repo has no description coral.waow.tech
at main 304 lines 9.1 kB view raw
//! NER ingestion path: a jetstream (Bluesky firehose) consumer wired to spacez.
//!
//! posts arrive through zat's JetstreamClient, their text is run through the
//! spacez named-entity recognizer in-process, and the resulting entities are
//! pushed straight into the entity graph — there is no HTTP hop in between.

const std = @import("std");
const zat = @import("zat");
const spacez = @import("spacez");

const lattice = @import("lattice.zig");
const entities = @import("entities.zig");
const entity_graph = @import("entity_graph.zig");

const log = std.log.scoped(.ner);

/// maximum number of post-text bytes handed to the recognizer.
const MAX_TEXT_LEN = 500;

/// the spacez labels worth keeping; entities with any other label are dropped.
const KEEP_LABELS = [_]spacez.Label{
    .PERSON,
    .ORG,
    .GPE,
    .PRODUCT,
    .EVENT,
    .WORK_OF_ART,
    .FAC,
    .NORP,
    .LOC,
};

/// true when `label` appears in KEEP_LABELS.
fn isKeptLabel(label: spacez.Label) bool {
    return std.mem.indexOfScalar(spacez.Label, &KEEP_LABELS, label) != null;
}

// ── firehose rate tracking ──

/// number of jetstream events received so far.
var msg_count: std.atomic.Value(u64) = .init(0);
/// most recently sampled event rate, in messages/sec.
var rate: std.atomic.Value(f32) = .init(0);

// ── current host tracking ──

/// bytes of the most recently connected host name; only the first
/// `current_host_len` bytes are meaningful.
var current_host_buf: [256]u8 = undefined;
var current_host_len: std.atomic.Value(usize) = .init(0);
var is_failover: std.atomic.Value(bool) = .init(false);

/// current firehose message rate (messages/sec).
pub fn firehoseRate() f32 {
    return rate.load(.acquire);
}

/// currently connected jetstream host (e.g. "jetstream.waow.tech"), or null
/// while no host name has been recorded yet.
pub fn currentHost() ?[]const u8 {
    const n = current_host_len.load(.acquire);
    return if (n == 0) null else current_host_buf[0..n];
}

/// whether the active connection is to a fallback host rather than the primary.
pub fn isFailover() bool {
    return is_failover.load(.acquire);
}

// ── model state ──

/// spacez model; null until `init` succeeds.
var model: ?spacez.Model = null;

/// embedded weight file (~6MB, compiled into binary).
const weight_bytes = @embedFile("weights/en_core_web_sm.bin");

/// parse the embedded weights into the global `model`.
/// until this succeeds, `processPost` drops every post.
pub fn init() !void {
    const loaded = try spacez.Model.load(weight_bytes);
    model = loaded;
    log.info("spacez model loaded ({d} bytes)", .{weight_bytes.len});
}

// ── jetstream handler ──

/// jetstream callback sink: counts every event and routes newly created
/// posts with non-empty text into the NER pipeline.
const Handler = struct {
    pub fn onEvent(self: *Handler, event: zat.JetstreamEvent) void {
        _ = self;
        // every event counts toward the firehose rate, whatever its kind
        _ = msg_count.fetchAdd(1, .monotonic);

        const commit = switch (event) {
            .commit => |c| c,
            else => return,
        };
        if (commit.operation != .create) return;
        if (!std.mem.eql(u8, commit.collection, "app.bsky.feed.post")) return;

        const record = commit.record orelse return;
        const text = zat.json.getString(record, "text") orelse return;
        if (text.len != 0) processPost(text, commit.did);
    }

    pub fn onConnect(_: *Handler, host: []const u8) void {
        // publish the host bytes (clipped to the fixed buffer) before the length.
        // NOTE(review): the buffer is rewritten in place on reconnect, so a
        // concurrent currentHost() caller can observe a mix of old and new bytes.
        const n = @min(host.len, current_host_buf.len);
        @memcpy(current_host_buf[0..n], host[0..n]);
        current_host_len.store(n, .release);

        // anything other than the first configured host counts as failover
        const failover = !std.mem.eql(u8, host, HOSTS[0]);
        is_failover.store(failover, .release);

        const suffix: []const u8 = if (failover) " (failover)" else "";
        log.info("connected to {s}{s}", .{ host, suffix });
    }

    pub fn onError(_: *Handler, err: anyerror) void {
        log.err("jetstream error: {s}", .{@errorName(err)});
    }
};

// ── entity normalization ──
// matches bridge.py: whitespace collapse, punctuation trim, NORP plural collapse.

/// characters stripped from both ends of an entity (whitespace, quotes, punctuation, brackets).
const TRIM_CHARS = " \t\r\n\"'`.,:;!?()[]{}<>";

/// true when `c` is one of TRIM_CHARS.
fn isTrimChar(c: u8) bool {
    return std.mem.indexOfScalar(u8, TRIM_CHARS, c) != null;
}

/// count ASCII letters (a-z, A-Z) in text.
fn countAlpha(text: []const u8) usize {
    var letters: usize = 0;
    for (text) |ch| letters += @intFromBool(std.ascii.isAlphabetic(ch));
    return letters;
}

/// normalize entity text: collapse whitespace, trim punctuation, NORP plural collapse.
/// returns the normalized slice within `buf`, or null if the result is empty/invalid.
///
/// steps (mirroring bridge.py):
///   1. collapse runs of space/tab/CR/LF into single spaces (`" ".join(text.split())`);
///   2. strip TRIM_CHARS from both ends;
///   3. NORP only: drop a trailing `s`/`S` from a single word longer than 3 bytes,
///      unless it ends in a doubled `ss`/`SS` ("Americans" -> "American").
/// input longer than `buf` is clipped.
fn normalizeEntity(raw: []const u8, label: spacez.Label, buf: *[MAX_TEXT_LEN]u8) ?[]const u8 {
    // collapse whitespace: " ".join(text.split())
    var len: usize = 0;
    var in_space = true;
    for (raw) |c| {
        if (c == ' ' or c == '\t' or c == '\r' or c == '\n') {
            if (!in_space and len > 0) {
                if (len >= buf.len) break;
                buf[len] = ' ';
                len += 1;
            }
            in_space = true;
        } else {
            in_space = false;
            if (len >= buf.len) break;
            buf[len] = c;
            len += 1;
        }
    }

    // the collapse can leave one trailing space
    while (len > 0 and buf[len - 1] == ' ') len -= 1;

    // trim punctuation (and any residual whitespace) from both ends
    var trim_start: usize = 0;
    while (trim_start < len and isTrimChar(buf[trim_start])) trim_start += 1;
    while (len > trim_start and isTrimChar(buf[len - 1])) len -= 1;

    if (trim_start >= len) return null;
    const result = buf[trim_start..len];

    // NORP plural collapse: single word, >3 bytes, ends in s/S but not ss/SS
    if (label == .NORP and result.len > 3 and std.mem.indexOfScalar(u8, result, ' ') == null) {
        const last = result[result.len - 1];
        const penult = result[result.len - 2];
        if ((last == 's' and penult != 's') or (last == 'S' and penult != 'S')) {
            return result[0 .. result.len - 1];
        }
    }

    return result;
}

/// return the longest prefix of `text` that is at most `max_len` bytes and does
/// not end inside a multi-byte UTF-8 sequence.
fn clampToUtf8Boundary(text: []const u8, max_len: usize) []const u8 {
    if (text.len <= max_len) return text;
    var end = max_len;
    // text[end] is the first excluded byte. While it is a continuation byte
    // (0b10xxxxxx), its character began before `end`, so back off. A UTF-8
    // sequence has at most 3 continuation bytes, so stop after 3 steps on
    // malformed input.
    while (end > 0 and max_len - end < 3 and (text[end] & 0xC0) == 0x80) end -= 1;
    return text[0..end];
}

/// run NER over one post and fan the kept entities out to the trending
/// tracker, the live feed, the lattice, and the co-occurrence graph.
/// no-op until `init` has loaded the model.
fn processPost(text: []const u8, did: []const u8) void {
    const m = model orelse return;

    // cap the input at MAX_TEXT_LEN bytes without splitting a UTF-8 character;
    // a split tail would otherwise leak invalid UTF-8 into entity text.
    const bounded = clampToUtf8Boundary(text, MAX_TEXT_LEN);

    // run NER; the @min is defensive in case the recognizer reports a count
    // larger than the buffer it was given.
    var ent_buf: [32]spacez.SpanEntity = undefined;
    const n_ents = @min(spacez.recognize(&m, bounded, &ent_buf), ent_buf.len);
    if (n_ents == 0) return;

    // hash the DID (only when it parses as a DID) for per-user tracking in the graph
    var did_hash: ?u64 = null;
    if (zat.Did.parse(did)) |_| {
        var hasher = std.hash.Wyhash.init(0);
        hasher.update(did);
        did_hash = hasher.final();
    }

    // collect valid entities
    var graph_entities: [32]entity_graph.EntityData = undefined;
    var graph_count: usize = 0;
    const world = lattice.get();

    for (ent_buf[0..n_ents]) |ent| {
        if (!isKeptLabel(ent.label)) continue;
        // NOTE(review): defensive — skip spans outside `bounded` rather than panic
        if (ent.end > bounded.len or ent.start > ent.end) continue;

        const raw_text = bounded[ent.start..ent.end];

        // normalize: whitespace collapse, punctuation trim, NORP plural
        var norm_buf: [MAX_TEXT_LEN]u8 = undefined;
        const ent_text = normalizeEntity(raw_text, ent.label, &norm_buf) orelse continue;

        // must have ≥3 alphabetic characters (matches bridge.py)
        if (countAlpha(ent_text) < 3) continue;

        const label_str = @tagName(ent.label);

        // record in entity tracker (trending + live feed)
        entities.top.record(ent_text, label_str);
        entities.recent.push(ent_text, label_str);

        // feed lattice
        world.feedEvent(hashEntity(ent_text), true);

        // collect for graph
        if (graph_count < graph_entities.len) {
            graph_entities[graph_count] = .{ .text = ent_text, .label = label_str };
            graph_count += 1;
        }
    }

    // record co-occurrences in entity graph
    if (graph_count > 0) {
        entity_graph.graph.recordPost(graph_entities[0..graph_count], did_hash);
    }
}

/// case-insensitive (ASCII) Wyhash of `text`, seed 0.
/// lowercases into a small stack buffer and streams it in chunks; Wyhash's
/// streaming API yields the same digest regardless of how input is chunked.
fn hashEntity(text: []const u8) u64 {
    var hasher = std.hash.Wyhash.init(0);
    var chunk: [64]u8 = undefined;
    var rest = text;
    while (rest.len > 0) {
        const n = @min(rest.len, chunk.len);
        hasher.update(std.ascii.lowerString(&chunk, rest[0..n]));
        rest = rest[n..];
    }
    return hasher.final();
}

// ── rate tracking thread ──

/// every 3 s, publish the average messages/sec since the previous sample to `rate`.
/// never returns.
fn rateTracker() void {
    const interval_s = 3;
    var prev_count: u64 = 0;
    while (true) {
        std.Thread.sleep(interval_s * std.time.ns_per_s);
        const current = msg_count.load(.acquire);
        const delta = current -| prev_count;
        rate.store(@as(f32, @floatFromInt(delta)) / interval_s, .release);
        prev_count = current;
    }
}

// ── host configuration ──

/// jetstream hosts; the first is the primary, the rest count as failover.
const HOSTS = [_][]const u8{
    "jetstream.waow.tech",
    "jetstream1.us-east.bsky.network",
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
};

// ── public entry point ──

/// start the NER processing thread (jetstream consumer + spacez).
/// blocks forever — call from a detached thread.
pub fn start(allocator: std.mem.Allocator) void {
    // the rate tracker only feeds firehoseRate(). If it cannot be spawned, log
    // the cause and keep ingesting (the rate then stays 0) instead of
    // abandoning NER entirely.
    if (std.Thread.spawn(.{}, rateTracker, .{})) |tracker| {
        tracker.detach();
    } else |err| {
        log.err("failed to spawn rate tracker thread: {s}", .{@errorName(err)});
    }

    // connect to jetstream and process events
    var client = zat.JetstreamClient.init(allocator, .{
        .wanted_collections = &.{"app.bsky.feed.post"},
        .hosts = &HOSTS,
    });
    defer client.deinit();

    var handler = Handler{};
    client.subscribe(&handler); // blocks forever
}