this repo has no description
coral.waow.tech
1//! jetstream consumer + spacez NER.
2//!
3//! consumes the Bluesky firehose via zat's JetstreamClient,
4//! runs spacez NER on post text, and feeds entities directly
5//! into the entity graph — no HTTP round-trip.
6
7const std = @import("std");
8const zat = @import("zat");
9const spacez = @import("spacez");
10
11const lattice = @import("lattice.zig");
12const entities = @import("entities.zig");
13const entity_graph = @import("entity_graph.zig");
14
15const log = std.log.scoped(.ner);
16
17const MAX_TEXT_LEN = 500;
18
/// entity labels that coral keeps; `processPost` skips spans with any other label.
const KEEP_LABELS = [_]spacez.Label{
    .PERSON,
    .ORG,
    .GPE,
    .PRODUCT,
    .EVENT,
    .WORK_OF_ART,
    .FAC,
    .NORP,
    .LOC,
};

/// reports whether `label` is listed in `KEEP_LABELS`.
fn isKeptLabel(label: spacez.Label) bool {
    return std.mem.indexOfScalar(spacez.Label, &KEEP_LABELS, label) != null;
}
38
39// ── firehose rate tracking ──
40
/// total jetstream events seen; bumped in `Handler.onEvent`, sampled by `rateTracker`.
var msg_count = std.atomic.Value(u64).init(0);
/// last computed messages/sec; written by `rateTracker`, read by `firehoseRate`.
var rate = std.atomic.Value(f32).init(0);

// ── current host tracking ──

/// bytes of the connected host name; only the first `current_host_len` bytes are meaningful.
var current_host_buf: [256]u8 = undefined;
/// number of valid bytes in `current_host_buf`; 0 means no host has been recorded yet.
/// stored with release ordering after the buffer is filled in `Handler.onConnect`.
var current_host_len = std.atomic.Value(usize).init(0);
/// true when the most recent connection was to a host other than `HOSTS[0]`.
var is_failover = std.atomic.Value(bool).init(false);
49
/// most recently published firehose message rate (messages/sec).
/// stays 0 until `rateTracker` has stored its first sample.
pub fn firehoseRate() f32 {
    const latest = rate.load(.acquire);
    return latest;
}
54
/// currently connected jetstream host (e.g. "jetstream.waow.tech"),
/// or null when no connection has been recorded yet.
/// the returned slice points into the module-level host buffer; it is not a copy.
pub fn currentHost() ?[]const u8 {
    const n = current_host_len.load(.acquire);
    return if (n == 0) null else current_host_buf[0..n];
}
61
/// whether the latest connection is to a host other than the primary one.
pub fn isFailover() bool {
    const failed_over = is_failover.load(.acquire);
    return failed_over;
}
66
67// ── model state ──
68
/// loaded spacez model; remains null until `init` succeeds.
var model: ?spacez.Model = null;

/// embedded weight file (~6MB, compiled into binary).
const weight_bytes = @embedFile("weights/en_core_web_sm.bin");

/// loads the embedded weights into `model` and logs their size.
/// propagates any error returned by `spacez.Model.load`, leaving `model` unchanged.
pub fn init() !void {
    const loaded = try spacez.Model.load(weight_bytes);
    model = loaded;
    log.info("spacez model loaded ({d} bytes)", .{weight_bytes.len});
}
78
79// ── jetstream handler ──
80
/// callback receiver passed to the jetstream client in `start`.
const Handler = struct {
    /// counts every event, then forwards the text of newly created
    /// `app.bsky.feed.post` records to `processPost`. all other events are ignored.
    pub fn onEvent(self: *Handler, event: zat.JetstreamEvent) void {
        _ = self;
        _ = msg_count.fetchAdd(1, .monotonic);

        // only commit events carry records we care about
        const commit = switch (event) {
            .commit => |c| c,
            else => return,
        };

        if (commit.operation != .create or
            !std.mem.eql(u8, commit.collection, "app.bsky.feed.post")) return;

        const record = commit.record orelse return;
        const text = zat.json.getString(record, "text") orelse return;
        if (text.len > 0) processPost(text, commit.did);
    }

    /// records the connected host name (truncated to the buffer size) and
    /// whether it differs from the primary host, then logs the connection.
    pub fn onConnect(_: *Handler, host: []const u8) void {
        const copy_len = @min(host.len, current_host_buf.len);
        @memcpy(current_host_buf[0..copy_len], host[0..copy_len]);
        // publish the length only after the bytes are in place
        current_host_len.store(copy_len, .release);

        // failover = not the primary (first) host
        const failover = !std.mem.eql(u8, host, HOSTS[0]);
        is_failover.store(failover, .release);

        const suffix: []const u8 = if (failover) " (failover)" else "";
        log.info("connected to {s}{s}", .{ host, suffix });
    }

    /// logs the name of an error reported by the jetstream client.
    pub fn onError(_: *Handler, err: anyerror) void {
        log.err("jetstream error: {s}", .{@errorName(err)});
    }
};
114
115// ── entity normalization ──
116// matches bridge.py: whitespace collapse, punctuation trim, NORP plural collapse.
117
/// bytes stripped from both ends of an entity: ASCII whitespace, quotes,
/// sentence punctuation and bracket characters.
const TRIM_CHARS = " \t\r\n\"'`.,:;!?()[]{}<>";

/// reports whether `c` is one of the bytes in `TRIM_CHARS`.
fn isTrimChar(c: u8) bool {
    return std.mem.indexOfScalar(u8, TRIM_CHARS, c) != null;
}
126
/// count ASCII letters (a-z, A-Z) in `text`; every other byte is ignored.
fn countAlpha(text: []const u8) usize {
    var total: usize = 0;
    for (text) |byte| {
        total += @intFromBool(std.ascii.isAlphabetic(byte));
    }
    return total;
}
135
/// normalize entity text in three steps:
///   1. collapse every run of space/tab/CR/LF into a single space,
///   2. strip `TRIM_CHARS` from both ends,
///   3. for NORP labels, drop a trailing plural "s" from single words.
/// output is written into `buf` and silently cut off once `buf` is full.
/// returns a slice of `buf`, or null when nothing is left after trimming.
fn normalizeEntity(raw: []const u8, label: spacez.Label, buf: *[MAX_TEXT_LEN]u8) ?[]const u8 {
    var written: usize = 0;
    // set when whitespace has been seen since the last copied byte
    var pending_gap = false;

    for (raw) |byte| {
        switch (byte) {
            ' ', '\t', '\r', '\n' => pending_gap = true,
            else => {
                // emit one separator for the whole whitespace run, but never a leading one
                if (pending_gap and written > 0) {
                    if (written >= buf.len) break;
                    buf[written] = ' ';
                    written += 1;
                }
                pending_gap = false;

                if (written >= buf.len) break;
                buf[written] = byte;
                written += 1;
            },
        }
    }

    // TRIM_CHARS includes the space, so this also removes a separator
    // left dangling when the buffer filled up right after it.
    const trimmed = std.mem.trim(u8, buf[0..written], TRIM_CHARS);
    if (trimmed.len == 0) return null;

    // NORP plural collapse: "Americans" -> "American"
    // (single word, >3 chars, ends in s/S but not ss/SS)
    if (label != .NORP or trimmed.len <= 3) return trimmed;
    if (std.mem.indexOfScalar(u8, trimmed, ' ') != null) return trimmed;

    const last = trimmed[trimmed.len - 1];
    const before_last = trimmed[trimmed.len - 2];
    const is_plural = (last == 's' or last == 'S') and before_last != last;

    return if (is_plural) trimmed[0 .. trimmed.len - 1] else trimmed;
}
192
/// returns `text` cut to at most `max_len` bytes without splitting a UTF-8 sequence.
fn truncateUtf8(text: []const u8, max_len: usize) []const u8 {
    if (text.len <= max_len) return text;

    var end = max_len;
    var backed_off: usize = 0;
    // if the first dropped byte is a continuation byte (0b10xxxxxx) the cut
    // lands inside a sequence; step back to its lead byte. a sequence has at
    // most 3 continuation bytes, so the back-off is bounded for malformed input.
    while (end > 0 and backed_off < 3 and (text[end] & 0xC0) == 0x80) {
        end -= 1;
        backed_off += 1;
    }
    return text[0..end];
}

/// run NER over one post and publish the kept entities.
///
/// does nothing when the model is not loaded or no entities are found.
/// for every entity with a kept label, whose normalized text has at least
/// 3 ASCII letters, this records it in `entities.top` / `entities.recent`
/// and feeds its hash to the lattice. the kept entities of the post are then
/// handed to `entity_graph.graph.recordPost` together with a hash of `did`
/// (null when `zat.Did.parse` rejects the DID).
fn processPost(text: []const u8, did: []const u8) void {
    const m = model orelse return;

    // upper bound on entities per post; sizes every per-entity buffer below
    const max_entities = 32;

    // truncate to MAX_TEXT_LEN on a UTF-8 boundary so the text given to the
    // recognizer never ends in half of a multi-byte character
    const bounded = truncateUtf8(text, MAX_TEXT_LEN);

    // run NER
    var ent_buf: [max_entities]spacez.SpanEntity = undefined;
    const n_ents = spacez.recognize(&m, bounded, &ent_buf);
    if (n_ents == 0) return;

    // hash DID for user tracking
    var did_hash: ?u64 = null;
    if (zat.Did.parse(did)) |_| {
        var hasher = std.hash.Wyhash.init(0);
        hasher.update(did);
        did_hash = hasher.final();
    }

    // one scratch buffer per graph slot: the normalized text stored in
    // `graph_entities` is read by `recordPost` after the loop, so each slice
    // needs its own backing memory that outlives the loop iteration.
    // (~16KB of stack with the current limits.)
    var norm_bufs: [max_entities][MAX_TEXT_LEN]u8 = undefined;

    // collect valid entities
    var graph_entities: [max_entities]entity_graph.EntityData = undefined;
    var graph_count: usize = 0;
    const world = lattice.get();

    for (ent_buf[0..n_ents]) |ent| {
        if (!isKeptLabel(ent.label)) continue;

        const raw_text = bounded[ent.start..ent.end];

        // normalize: whitespace collapse, punctuation trim, NORP plural.
        // `graph_count` is always a valid slot here: it grows by at most one
        // per iteration and there are at most `max_entities` iterations.
        // a rejected entity leaves the slot free for the next candidate.
        const ent_text = normalizeEntity(raw_text, ent.label, &norm_bufs[graph_count]) orelse continue;

        // must have ≥3 alphabetic characters (matches bridge.py)
        if (countAlpha(ent_text) < 3) continue;

        const label_str = @tagName(ent.label);

        // record in entity tracker (trending + live feed)
        entities.top.record(ent_text, label_str);
        entities.recent.push(ent_text, label_str);

        // feed lattice
        const hash = hashEntity(ent_text);
        world.feedEvent(hash, true);

        // collect for graph
        if (graph_count < max_entities) {
            graph_entities[graph_count] = .{ .text = ent_text, .label = label_str };
            graph_count += 1;
        }
    }

    // record co-occurrences in entity graph
    if (graph_count > 0) {
        entity_graph.graph.recordPost(graph_entities[0..graph_count], did_hash);
    }
}
251
/// case-insensitive (ASCII) Wyhash of `text` with seed 0.
/// each byte is lowercased with `std.ascii.toLower` and fed to the hasher one at a time.
fn hashEntity(text: []const u8) u64 {
    var state = std.hash.Wyhash.init(0);
    var i: usize = 0;
    while (i < text.len) : (i += 1) {
        const lowered = [1]u8{std.ascii.toLower(text[i])};
        state.update(&lowered);
    }
    return state.final();
}
259
260// ── rate tracking thread ──
261
/// thread body: every 3 seconds, publishes the average messages/sec seen
/// since the previous sample into `rate`. never returns.
fn rateTracker() void {
    // sampling window in seconds; used for both the sleep and the divisor
    const window_s = 3;

    var last_seen: u64 = 0;
    while (true) {
        std.Thread.sleep(window_s * std.time.ns_per_s);

        const total = msg_count.load(.acquire);
        // saturating subtraction: the delta can never go below zero
        const per_second = @as(f32, @floatFromInt(total -| last_seen)) / @as(f32, window_s);
        rate.store(per_second, .release);
        last_seen = total;
    }
}
272
273// ── host configuration ──
274
/// primary jetstream host; connecting to anything else counts as a failover.
const PRIMARY_HOST = "jetstream.waow.tech";

/// jetstream hosts passed to the client in `start`.
/// the primary must stay first: `Handler.onConnect` compares against `HOSTS[0]`.
const HOSTS = [_][]const u8{
    PRIMARY_HOST,
    "jetstream1.us-east.bsky.network",
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
};
282
283// ── public entry point ──
284
/// start the NER processing pipeline (jetstream consumer + spacez).
///
/// spawns the detached rate-tracker thread, then subscribes to jetstream with
/// a `Handler`. the subscribe call is expected to block, so call this from a
/// detached thread. returns early, after logging the cause, if the
/// rate-tracker thread cannot be spawned.
pub fn start(allocator: std.mem.Allocator) void {
    // start rate tracker
    const tracker = std.Thread.spawn(.{}, rateTracker, .{}) catch |err| {
        // include the cause so a spawn failure can be diagnosed from the log
        log.err("failed to spawn rate tracker thread: {s}", .{@errorName(err)});
        return;
    };
    tracker.detach();

    // connect to jetstream and process events
    var client = zat.JetstreamClient.init(allocator, .{
        .wanted_collections = &.{"app.bsky.feed.post"},
        .hosts = &HOSTS,
    });
    defer client.deinit();

    var handler = Handler{};
    client.subscribe(&handler); // blocks forever
}