this repo has no description
coral.waow.tech
1const std = @import("std");
2const zqlite = @import("zqlite");
3const Mutex = std.Thread.Mutex;
4
5const log = std.log.scoped(.db);
6
7// global database connection
8var conn: ?zqlite.Conn = null;
9var mutex: Mutex = .{};
10var initialized: bool = false;
11
/// initialize SQLite database connection.
/// Safe to call from multiple threads; only the first successful call opens
/// the database. Returns the underlying open/migration error on failure, in
/// which case a later call will retry with a fresh connection.
pub fn init(path: [*:0]const u8) !void {
    mutex.lock();
    defer mutex.unlock();

    // check under the lock only: the previous unlocked fast-path read of
    // `initialized` raced with the store below (broken double-checked locking)
    if (initialized) return;

    const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
    conn = zqlite.open(path, flags) catch |err| {
        log.err("failed to open database: {}", .{err});
        return err;
    };
    // if migrations fail below, close and clear the half-initialized
    // connection so a retry does not leak the previous handle
    errdefer {
        conn.?.close();
        conn = null;
    }

    // SQLite pragmas for performance and reliability.
    // non-fatal, but log failures instead of swallowing them silently
    const pragmas = [_][]const u8{
        "PRAGMA journal_mode=WAL",
        "PRAGMA busy_timeout=5000",
        "PRAGMA foreign_keys=ON",
        "PRAGMA synchronous=NORMAL",
    };
    for (pragmas) |pragma| {
        _ = conn.?.exec(pragma, .{}) catch |err| {
            log.warn("pragma failed: {} sql={s}", .{ err, pragma });
        };
    }

    // run migrations
    try applyMigrations();

    initialized = true;
    log.info("database initialized at {s}", .{std.mem.span(path)});
}
39
/// close database connection.
/// Idempotent: closing an already-closed (or never-opened) connection is a no-op.
pub fn close() void {
    mutex.lock();
    defer mutex.unlock();

    if (conn) |active| {
        active.close();
        conn = null;
    }
    initialized = false;
}
51
/// execute SQL with no return value.
/// Returns error.NotInitialized when init() has not completed; SQL failures
/// are logged with the offending statement and propagated.
pub fn exec(sql: []const u8, args: anytype) !void {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;
    c.exec(sql, args) catch |err| {
        log.err("exec failed: {} sql={s}", .{ err, sql });
        return err;
    };
}
66
/// query and iterate rows.
/// Returns error.NotInitialized when init() has not completed.
/// NOTE(review): the returned Rows is iterated by the caller AFTER the mutex
/// is released here — assumes zqlite statement iteration is safe outside the
/// lock; confirm against concurrent writers.
pub fn query(sql: []const u8, args: anytype) !zqlite.Rows {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;
    return c.query(sql, args) catch |err| {
        log.err("query failed: {} sql={s}", .{ err, sql });
        return err;
    };
}
81
82// ==================== migrations ====================
83
84const SCHEMA_VERSION = 8;
85
/// run all pending schema migrations, tracked in the _migrations table.
/// Caller must hold no statement open on the connection; returns the first
/// migration error encountered.
fn applyMigrations() !void {
    const c = conn orelse return error.NotInitialized;

    // create migrations table if not exists
    _ = c.exec(
        \\CREATE TABLE IF NOT EXISTS _migrations (
        \\  id INTEGER PRIMARY KEY,
        \\  applied_at TEXT DEFAULT (datetime('now'))
        \\)
    , .{}) catch |err| {
        log.err("failed to create migrations table: {}", .{err});
        return err;
    };

    // check current version.
    // scoped block so rows.deinit() finalizes the SELECT statement BEFORE any
    // migration runs DDL — previously the statement stayed open (deinit was
    // deferred to function end) across ALTER/DROP TABLE, which SQLite can
    // reject while a read statement is outstanding.
    const current_version: i64 = blk: {
        var rows = c.rows("SELECT COALESCE(MAX(id), 0) FROM _migrations", .{}) catch |err| {
            log.err("failed to check migration version: {}", .{err});
            return err;
        };
        defer rows.deinit();
        break :blk if (rows.next()) |row| row.int(0) else 0;
    };

    if (current_version < 1) try migration_001_entity_baselines(c);
    if (current_version < 2) try migration_002_full_persistence(c);
    if (current_version < 3) try migration_003_edge_timestamps(c);
    if (current_version < 4) try migration_004_posts(c);
    if (current_version < 5) try migration_005_fix_entity_posts_fk(c);
    if (current_version < 6) try migration_006_last_seen(c);
    if (current_version < 7) try migration_007_drop_mentions(c);
    if (current_version < 8) try migration_008_drop_posts(c);

    log.info("migrations complete, version={d}", .{SCHEMA_VERSION});
}
136
/// migration 001: create the entity_baseline table (EMA baseline rates
/// used for trend detection) plus its text lookup index.
fn migration_001_entity_baselines(c: zqlite.Conn) !void {
    log.info("applying migration 001: entity_baselines", .{});

    const create_table =
        \\CREATE TABLE IF NOT EXISTS entity_baseline (
        \\  text_hash INTEGER PRIMARY KEY,
        \\  text TEXT NOT NULL,
        \\  label TEXT NOT NULL,
        \\  baseline_rate REAL DEFAULT 0.0,
        \\  last_rate REAL DEFAULT 0.0,
        \\  mention_count INTEGER DEFAULT 0,
        \\  first_seen TEXT DEFAULT (datetime('now')),
        \\  last_seen TEXT DEFAULT (datetime('now'))
        \\)
    ;
    _ = c.exec(create_table, .{}) catch |err| {
        log.err("failed to create entity_baseline table: {}", .{err});
        return err;
    };

    // index for looking up by text; best-effort, failure is non-fatal
    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_entity_baseline_text ON entity_baseline(text)", .{}) catch {};

    // record migration
    _ = c.exec("INSERT INTO _migrations (id) VALUES (1)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
168
/// migration 002: create the entities, users, mentions, and edges tables
/// so the full graph state can be persisted across restarts.
fn migration_002_full_persistence(c: zqlite.Conn) !void {
    log.info("applying migration 002: full persistence (entities, users, mentions, edges)", .{});

    // entities table - full entity state for fast restart
    const create_entities =
        \\CREATE TABLE IF NOT EXISTS entities (
        \\  id INTEGER PRIMARY KEY,
        \\  text TEXT NOT NULL,
        \\  label TEXT NOT NULL,
        \\  grid_x INTEGER NOT NULL,
        \\  grid_y INTEGER NOT NULL,
        \\  baseline_rate REAL DEFAULT 0.0,
        \\  created_at INTEGER NOT NULL,
        \\  UNIQUE(text COLLATE NOCASE)
        \\)
    ;
    _ = c.exec(create_entities, .{}) catch |err| {
        log.err("failed to create entities table: {}", .{err});
        return err;
    };

    // users table - user state for activity tracking
    const create_users =
        \\CREATE TABLE IF NOT EXISTS users (
        \\  id INTEGER PRIMARY KEY,
        \\  did_hash INTEGER NOT NULL UNIQUE,
        \\  created_at INTEGER NOT NULL
        \\)
    ;
    _ = c.exec(create_users, .{}) catch |err| {
        log.err("failed to create users table: {}", .{err});
        return err;
    };

    // mentions table - append-only event log of recent mentions,
    // used to reconstruct activity on restart
    const create_mentions =
        \\CREATE TABLE IF NOT EXISTS mentions (
        \\  id INTEGER PRIMARY KEY,
        \\  entity_id INTEGER NOT NULL,
        \\  user_id INTEGER,
        \\  timestamp INTEGER NOT NULL,
        \\  FOREIGN KEY (entity_id) REFERENCES entities(id),
        \\  FOREIGN KEY (user_id) REFERENCES users(id)
        \\)
    ;
    _ = c.exec(create_mentions, .{}) catch |err| {
        log.err("failed to create mentions table: {}", .{err});
        return err;
    };

    // indexes for loading recent mentions efficiently; best-effort
    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_mentions_timestamp ON mentions(timestamp DESC)", .{}) catch {};
    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_mentions_entity ON mentions(entity_id)", .{}) catch {};

    // edges table - co-occurrence relationships
    const create_edges =
        \\CREATE TABLE IF NOT EXISTS edges (
        \\  entity_a INTEGER NOT NULL,
        \\  entity_b INTEGER NOT NULL,
        \\  created_at INTEGER NOT NULL,
        \\  PRIMARY KEY (entity_a, entity_b),
        \\  FOREIGN KEY (entity_a) REFERENCES entities(id),
        \\  FOREIGN KEY (entity_b) REFERENCES entities(id)
        \\)
    ;
    _ = c.exec(create_edges, .{}) catch |err| {
        log.err("failed to create edges table: {}", .{err});
        return err;
    };

    // record migration
    _ = c.exec("INSERT INTO _migrations (id) VALUES (2)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
246
/// migration 003: add a last_seen column to edges so stale edges can decay.
fn migration_003_edge_timestamps(c: zqlite.Conn) !void {
    log.info("applying migration 003: edge timestamps for decay", .{});

    // SQLite doesn't support ADD COLUMN with NOT NULL without a default,
    // so use DEFAULT 0. A failure here usually means the column already
    // exists from a partial migration, so warn and continue.
    _ = c.exec("ALTER TABLE edges ADD COLUMN last_seen INTEGER DEFAULT 0", .{}) catch |err| {
        log.warn("alter table edges add last_seen: {} (may already exist)", .{err});
    };

    // record migration
    _ = c.exec("INSERT INTO _migrations (id) VALUES (3)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
265
/// migration 004: create posts and entity_posts tables backing the
/// "top post per topic" feature (later dropped by migration 008).
fn migration_004_posts(c: zqlite.Conn) !void {
    log.info("applying migration 004: posts for top-post-per-topic", .{});

    // posts table - stores post metadata for "top post per topic"
    const create_posts =
        \\CREATE TABLE IF NOT EXISTS posts (
        \\  id INTEGER PRIMARY KEY,
        \\  at_uri TEXT NOT NULL UNIQUE,
        \\  author_did TEXT NOT NULL,
        \\  author_handle TEXT,
        \\  author_followers INTEGER DEFAULT 0,
        \\  score REAL DEFAULT 0,
        \\  timestamp INTEGER NOT NULL
        \\)
    ;
    _ = c.exec(create_posts, .{}) catch |err| {
        log.err("failed to create posts table: {}", .{err});
        return err;
    };

    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_posts_timestamp ON posts(timestamp DESC)", .{}) catch {};
    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_posts_author_did ON posts(author_did)", .{}) catch {};

    // entity_posts - links entities to their top posts (curated, not full log)
    // NOTE: no FK on entity_id because entities are persisted async (every 30s)
    // and posts arrive continuously. FK on post_id is fine since we insert post first.
    const create_entity_posts =
        \\CREATE TABLE IF NOT EXISTS entity_posts (
        \\  entity_id INTEGER NOT NULL,
        \\  post_id INTEGER NOT NULL,
        \\  score REAL DEFAULT 0,
        \\  timestamp INTEGER NOT NULL,
        \\  PRIMARY KEY (entity_id, post_id),
        \\  FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
        \\)
    ;
    _ = c.exec(create_entity_posts, .{}) catch |err| {
        log.err("failed to create entity_posts table: {}", .{err});
        return err;
    };

    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_entity_posts_entity_score ON entity_posts(entity_id, score DESC)", .{}) catch {};
    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_entity_posts_post ON entity_posts(post_id)", .{}) catch {};

    // record migration
    _ = c.exec("INSERT INTO _migrations (id) VALUES (4)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
322
/// migration 005: recreate entity_posts without the entity_id foreign key.
/// SQLite cannot drop a constraint in place, so the table is dropped and
/// rebuilt (it is new at this point and holds minimal data).
fn migration_005_fix_entity_posts_fk(c: zqlite.Conn) !void {
    log.info("applying migration 005: remove entity_id FK from entity_posts", .{});

    _ = c.exec("DROP TABLE IF EXISTS entity_posts", .{}) catch |err| {
        log.err("failed to drop entity_posts table: {}", .{err});
        return err;
    };

    const recreate =
        \\CREATE TABLE entity_posts (
        \\  entity_id INTEGER NOT NULL,
        \\  post_id INTEGER NOT NULL,
        \\  score REAL DEFAULT 0,
        \\  timestamp INTEGER NOT NULL,
        \\  PRIMARY KEY (entity_id, post_id),
        \\  FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
        \\)
    ;
    _ = c.exec(recreate, .{}) catch |err| {
        log.err("failed to recreate entity_posts table: {}", .{err});
        return err;
    };

    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_entity_posts_entity_score ON entity_posts(entity_id, score DESC)", .{}) catch {};
    _ = c.exec("CREATE INDEX IF NOT EXISTS idx_entity_posts_post ON entity_posts(post_id)", .{}) catch {};

    _ = c.exec("INSERT INTO _migrations (id) VALUES (5)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
359
/// migration 006: add last_seen columns to entities and users for eviction,
/// seeding existing rows with the current time.
fn migration_006_last_seen(c: zqlite.Conn) !void {
    log.info("applying migration 006: last_seen for entity/user eviction", .{});

    // ALTER failures usually mean the column already exists (partial
    // migration), so warn and keep going rather than aborting
    _ = c.exec("ALTER TABLE entities ADD COLUMN last_seen INTEGER DEFAULT 0", .{}) catch |err| {
        log.warn("alter table entities add last_seen: {} (may already exist)", .{err});
    };
    _ = c.exec("ALTER TABLE users ADD COLUMN last_seen INTEGER DEFAULT 0", .{}) catch |err| {
        log.warn("alter table users add last_seen: {} (may already exist)", .{err});
    };

    // initialize existing rows to current time so they're not immediately evictable
    const now = std.time.milliTimestamp();
    _ = c.exec("UPDATE entities SET last_seen = ? WHERE last_seen = 0", .{now}) catch {};
    _ = c.exec("UPDATE users SET last_seen = ? WHERE last_seen = 0", .{now}) catch {};

    _ = c.exec("INSERT INTO _migrations (id) VALUES (6)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
387
/// migration 007: drop the legacy mentions table.
/// mentions fed the old ring buffer; activity buckets are now in-memory only.
fn migration_007_drop_mentions(c: zqlite.Conn) !void {
    log.info("applying migration 007: drop legacy mentions table", .{});

    _ = c.exec("DROP TABLE IF EXISTS mentions", .{}) catch |err| {
        log.err("failed to drop mentions table: {}", .{err});
        return err;
    };

    _ = c.exec("INSERT INTO _migrations (id) VALUES (7)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
402
/// migration 008: drop posts and entity_posts (top-post feature removed).
/// entity_posts goes first since its FK references posts.
fn migration_008_drop_posts(c: zqlite.Conn) !void {
    log.info("applying migration 008: drop posts and entity_posts tables (top-post feature removed)", .{});

    _ = c.exec("DROP TABLE IF EXISTS entity_posts", .{}) catch |err| {
        log.err("failed to drop entity_posts table: {}", .{err});
        return err;
    };
    _ = c.exec("DROP TABLE IF EXISTS posts", .{}) catch |err| {
        log.err("failed to drop posts table: {}", .{err});
        return err;
    };

    _ = c.exec("INSERT INTO _migrations (id) VALUES (8)", .{}) catch |err| {
        log.err("failed to record migration: {}", .{err});
        return err;
    };
}
420
// ==================== entity baseline operations ====================

/// hash entity text for database key (same as entity_graph).
/// Wyhash with seed 0; the u64 digest is bitcast because SQLite INTEGER
/// is a signed 64-bit value.
fn hashText(text: []const u8) i64 {
    const digest = std.hash.Wyhash.hash(0, text);
    return @bitCast(digest);
}
430
/// normalize text to lowercase for consistent lookups.
/// Truncates to the buffer capacity; returns a slice of `buf` (valid only
/// while `buf` lives).
fn normalizeText(text: []const u8, buf: *[64]u8) []const u8 {
    // derive the limit from the buffer itself instead of a hard-coded 64,
    // so a future capacity change cannot silently desynchronize the two
    const len = @min(text.len, buf.len);
    for (text[0..len], buf[0..len]) |ch, *out| {
        out.* = std.ascii.toLower(ch);
    }
    return buf[0..len];
}
439
/// get baseline rate for an entity, returns null if not found.
/// Lookup key is the Wyhash of the lowercased text (truncated to 64 bytes);
/// any database error is treated as "not found".
pub fn getBaseline(text: []const u8) ?f32 {
    var norm_buf: [64]u8 = undefined;
    const key = hashText(normalizeText(text, &norm_buf));

    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return null;
    var rows = c.rows(
        "SELECT baseline_rate FROM entity_baseline WHERE text_hash = ?",
        .{key},
    ) catch return null;
    defer rows.deinit();

    const row = rows.next() orelse return null;
    return @floatCast(row.float(0));
}
461
/// update baseline for an entity (upsert).
/// Keyed by the hash of the lowercased text; on conflict the rates are
/// replaced, mention_count is incremented, and last_seen is refreshed.
pub fn updateBaseline(text: []const u8, label: []const u8, current_rate: f32, baseline_rate: f32) !void {
    var norm_buf: [64]u8 = undefined;
    const key = hashText(normalizeText(text, &norm_buf));

    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    const upsert_sql =
        \\INSERT INTO entity_baseline (text_hash, text, label, baseline_rate, last_rate, mention_count, last_seen)
        \\VALUES (?, ?, ?, ?, ?, 1, datetime('now'))
        \\ON CONFLICT(text_hash) DO UPDATE SET
        \\  baseline_rate = excluded.baseline_rate,
        \\  last_rate = excluded.last_rate,
        \\  mention_count = mention_count + 1,
        \\  last_seen = datetime('now')
    ;
    _ = c.exec(upsert_sql, .{ key, text, label, baseline_rate, current_rate }) catch |err| {
        log.err("failed to update baseline: {}", .{err});
        return err;
    };
}
487
/// load all baselines into memory (for startup).
/// Caller owns the returned map AND every key slice (duped with `allocator`):
/// free each key, then deinit the map.
pub fn loadAllBaselines(allocator: std.mem.Allocator) !std.StringHashMap(f32) {
    var baselines = std.StringHashMap(f32).init(allocator);
    // on the error path, free the duped keys already inserted —
    // deinit() alone would leak them
    errdefer {
        var it = baselines.keyIterator();
        while (it.next()) |key| allocator.free(key.*);
        baselines.deinit();
    }

    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    var rows = c.rows(
        "SELECT text, baseline_rate FROM entity_baseline WHERE baseline_rate > 0",
        .{},
    ) catch |err| {
        log.err("failed to load baselines: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |row| {
        const text = row.text(0);
        const rate: f32 = @floatCast(row.float(1));

        // dupe the text since it's owned by the row
        const text_dupe = try allocator.dupe(u8, text);
        // if put fails, the dupe is not yet owned by the map — free it here
        errdefer allocator.free(text_dupe);
        try baselines.put(text_dupe, rate);
    }

    log.info("loaded {d} entity baselines from database", .{baselines.count()});
    return baselines;
}
519
/// get stats about stored baselines.
/// Returns zeros when the database is not initialized or the query fails.
pub fn getStats() struct { entity_count: i64, avg_baseline: f64 } {
    mutex.lock();
    defer mutex.unlock();

    // any failure along the way falls through to the zeroed result
    fallback: {
        const c = conn orelse break :fallback;
        var rows = c.rows(
            "SELECT COUNT(*), COALESCE(AVG(baseline_rate), 0) FROM entity_baseline",
            .{},
        ) catch break :fallback;
        defer rows.deinit();

        const row = rows.next() orelse break :fallback;
        return .{
            .entity_count = row.int(0),
            .avg_baseline = row.float(1),
        };
    }
    return .{ .entity_count = 0, .avg_baseline = 0 };
}
541
542// ==================== full state persistence ====================
543
544const entity_graph = @import("entity_graph.zig");
545
/// save complete graph state to SQLite (called periodically)
///
/// Holds the global mutex for the whole write. All rows are written inside a
/// single transaction; on any error after BEGIN the errdefer issues a ROLLBACK.
/// Per-row insert failures are skipped (`continue`) rather than aborting the
/// save — the authoritative state lives in memory and re-syncs next pass.
/// Slot index `i` in the in-memory arrays doubles as the SQLite row id.
pub fn saveState(graph: *entity_graph.EntityGraph) !void {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;
    // one timestamp shared as created_at for every row written this pass
    const now = std.time.milliTimestamp();

    // begin transaction for atomicity
    _ = c.exec("BEGIN TRANSACTION", .{}) catch |err| {
        log.err("failed to begin transaction: {}", .{err});
        return err;
    };
    // runs only on error exit, undoing the partial write
    errdefer _ = c.exec("ROLLBACK", .{}) catch {};

    // save entities (in two steps for backward compat with pre-migration-006 schema)
    for (0..graph.count) |i| {
        const e = &graph.entities[i];
        // skip cleared entities (evicted slots)
        if (e.text_len == 0) continue;

        // insert or update core fields
        // text UNIQUE conflicts happen when slots are reused (eviction)
        // these are rare and non-critical - entity state is in memory
        _ = c.exec(
            \\INSERT INTO entities (id, text, label, grid_x, grid_y, baseline_rate, created_at)
            \\VALUES (?, ?, ?, ?, ?, ?, ?)
            \\ON CONFLICT(id) DO UPDATE SET
            \\  text = excluded.text,
            \\  label = excluded.label,
            \\  baseline_rate = excluded.baseline_rate
        , .{
            @as(i64, @intCast(i)),
            e.getText(),
            e.getLabel(),
            @as(i64, e.grid_x),
            @as(i64, e.grid_y),
            e.baseline_rate,
            now,
        }) catch {
            // text UNIQUE conflict - slot reused with different text
            // non-critical: entity is in memory, will sync on restart
            continue;
        };

        // step 2: update last_seen if column exists (migration 006)
        // kept separate from the INSERT so the statement still works against
        // a pre-migration-006 schema that lacks the column
        _ = c.exec(
            "UPDATE entities SET last_seen = ? WHERE id = ?",
            .{ e.last_seen, @as(i64, @intCast(i)) },
        ) catch {}; // ignore if column doesn't exist

        // NOTE: activity counts use time buckets now, reset on restart (no persistence)
    }

    // save users (in two steps for backward compat with pre-migration-006 schema)
    for (0..graph.user_count) |i| {
        const u = &graph.users[i];
        // skip cleared users (evicted slots)
        if (u.did_hash == 0) continue;

        // step 1: insert/update core fields (works with old schema)
        _ = c.exec(
            \\INSERT INTO users (id, did_hash, created_at)
            \\VALUES (?, ?, ?)
            \\ON CONFLICT(id) DO NOTHING
        , .{
            @as(i64, @intCast(i)),
            @as(i64, @bitCast(u.did_hash)),
            now,
        }) catch continue;

        // step 2: update last_seen if column exists (migration 006)
        _ = c.exec(
            "UPDATE users SET last_seen = ? WHERE id = ?",
            .{ u.last_seen, @as(i64, @intCast(i)) },
        ) catch {}; // ignore if column doesn't exist
    }

    // save edges with timestamps
    for (0..graph.count) |i| {
        const e = &graph.entities[i];
        for (0..e.edges.count) |j| {
            const target = e.edges.edges[j];
            const last_seen = e.edges.last_seen[j];
            // only save each edge once (a < b); the in-memory graph stores
            // each undirected edge on both endpoints
            if (i >= target) continue;
            _ = c.exec(
                \\INSERT INTO edges (entity_a, entity_b, created_at, last_seen)
                \\VALUES (?, ?, ?, ?)
                \\ON CONFLICT(entity_a, entity_b) DO UPDATE SET
                \\  last_seen = excluded.last_seen
            , .{ @as(i64, @intCast(i)), @as(i64, @intCast(target)), now, last_seen }) catch continue;
        }
    }

    // commit transaction
    _ = c.exec("COMMIT", .{}) catch |err| {
        log.err("failed to commit transaction: {}", .{err});
        return err;
    };

    log.info("saved state: {d} entities, {d} users", .{ graph.count, graph.user_count });
}
649
/// load complete graph state from SQLite (called on startup)
///
/// Rebuilds graph slots from row ids (slot index == row id, as written by
/// saveState); rows whose id exceeds the compile-time capacity are skipped.
/// `graph.count` / `graph.user_count` are set to highest-loaded-id + 1, so
/// gaps from evicted slots remain as zeroed entries.
pub fn loadState(graph: *entity_graph.EntityGraph) !void {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    // load entities
    var entity_rows = c.rows(
        "SELECT id, text, label, grid_x, grid_y, baseline_rate, COALESCE(last_seen, 0) FROM entities ORDER BY id",
        .{},
    ) catch |err| {
        log.err("failed to load entities: {}", .{err});
        return err;
    };
    defer entity_rows.deinit();

    const now = std.time.milliTimestamp();
    var entity_count: u32 = 0;
    while (entity_rows.next()) |row| {
        const id: u32 = @intCast(row.int(0));
        // skip rows beyond the fixed-capacity entity array
        if (id >= entity_graph.MAX_ENTITIES) continue;

        const e = &graph.entities[id];
        const text = row.text(1);
        const label = row.text(2);

        // set text (copied into the fixed 64-byte inline buffer, truncating)
        const text_len: u8 = @intCast(@min(text.len, 64));
        @memcpy(e.text[0..text_len], text[0..text_len]);
        e.text_len = text_len;

        // set label (copied into the fixed 16-byte inline buffer, truncating)
        const label_len: u8 = @intCast(@min(label.len, 16));
        @memcpy(e.label[0..label_len], label[0..label_len]);
        e.label_len = label_len;

        e.grid_x = @intCast(row.int(3));
        e.grid_y = @intCast(row.int(4));
        e.baseline_rate = @floatCast(row.float(5));
        // seed smoothed_rate from baseline to prevent negative trends after restart
        // (activity buckets aren't persisted, so current_rate starts at 0)
        e.smoothed_rate = e.baseline_rate;
        // restore is_active based on seeded smoothed_rate
        e.is_active = e.smoothed_rate >= entity_graph.ACTIVITY_THRESHOLD_ENTER;
        // load last_seen, default to now if 0 (backward compat)
        const db_last_seen: i64 = row.int(6);
        e.last_seen = if (db_last_seen > 0) db_last_seen else now;

        // track highest id so count covers every loaded slot
        if (id >= entity_count) {
            entity_count = id + 1;
        }
    }
    graph.count = entity_count;

    // load users
    var user_rows = c.rows(
        "SELECT id, did_hash, COALESCE(last_seen, 0) FROM users ORDER BY id",
        .{},
    ) catch |err| {
        log.err("failed to load users: {}", .{err});
        return err;
    };
    defer user_rows.deinit();

    var user_count: u32 = 0;
    while (user_rows.next()) |row| {
        const id: u32 = @intCast(row.int(0));
        // skip rows beyond the fixed-capacity user array
        if (id >= entity_graph.MAX_USERS) continue;

        graph.users[id].did_hash = @bitCast(row.int(1));
        // load last_seen, default to now if 0 (backward compat)
        const db_last_seen: i64 = row.int(2);
        graph.users[id].last_seen = if (db_last_seen > 0) db_last_seen else now;

        if (id >= user_count) {
            user_count = id + 1;
        }
    }
    graph.user_count = user_count;

    // load edges with timestamps (falling back to created_at pre-migration-003)
    var edge_rows = c.rows(
        "SELECT entity_a, entity_b, COALESCE(last_seen, created_at) FROM edges",
        .{},
    ) catch |err| {
        log.err("failed to load edges: {}", .{err});
        return err;
    };
    defer edge_rows.deinit();

    var edge_count: u32 = 0;
    while (edge_rows.next()) |row| {
        const a: u32 = @intCast(row.int(0));
        const b: u32 = @intCast(row.int(1));
        const last_seen: entity_graph.Timestamp = row.int(2);
        // drop edges whose endpoints weren't loaded above
        if (a >= graph.count or b >= graph.count) continue;

        // re-add on both endpoints: the in-memory graph stores each
        // undirected edge twice, while the DB stores it once (a < b)
        _ = graph.entities[a].edges.add(b, last_seen);
        _ = graph.entities[b].edges.add(a, last_seen);
        edge_count += 1;
    }

    // NOTE: activity counts use time buckets now, reset on restart (no persistence)
    // entities will be inactive until new mentions come in

    log.info("loaded state: {d} entities, {d} users, {d} edges (activity resets on restart)", .{
        graph.count,
        graph.user_count,
        edge_count,
    });
}
762
/// delete stale entity baselines (not seen in 7 days).
/// Returns the number of rows remaining after the prune (for logging);
/// the count query is best-effort and falls back to 0 on failure.
pub fn pruneStaleBaselines() !usize {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    const delete_sql =
        \\DELETE FROM entity_baseline
        \\WHERE last_seen < datetime('now', '-7 days')
    ;
    _ = c.exec(delete_sql, .{}) catch |err| {
        log.err("failed to prune stale baselines: {}", .{err});
        return err;
    };

    var rows = c.rows("SELECT COUNT(*) FROM entity_baseline", .{}) catch return 0;
    defer rows.deinit();

    const row = rows.next() orelse return 0;
    return @intCast(row.int(0));
}