this repo has no description coral.waow.tech
at main 787 lines 26 kB view raw
const std = @import("std");
const zqlite = @import("zqlite");
const Mutex = std.Thread.Mutex;

const log = std.log.scoped(.db);

// Global database state. All access goes through `mutex`; `conn` is only
// non-null between a successful init() and close().
var conn: ?zqlite.Conn = null;
var mutex: Mutex = .{};
var initialized: bool = false;

/// Initialize the SQLite database connection and run pending migrations.
/// Idempotent: once initialization has succeeded, later calls return
/// immediately. Thread-safe via double-checked locking on `mutex`.
/// Errors from open or migration are propagated; pragma failures are
/// deliberately best-effort.
pub fn init(path: [*:0]const u8) !void {
    if (initialized) return;

    mutex.lock();
    defer mutex.unlock();

    if (initialized) return; // double-check after acquiring the lock

    const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
    conn = zqlite.open(path, flags) catch |err| {
        log.err("failed to open database: {}", .{err});
        return err;
    };
    // BUGFIX: if migration (or anything below) fails, close and clear the
    // connection. Previously a failed init left `conn` set with
    // `initialized` false, so a retry would open a second handle and leak
    // the first.
    errdefer {
        if (conn) |c| c.close();
        conn = null;
    }

    // SQLite pragmas for performance and reliability (best-effort; a
    // failure here degrades performance but is not fatal).
    _ = conn.?.exec("PRAGMA journal_mode=WAL", .{}) catch {};
    _ = conn.?.exec("PRAGMA busy_timeout=5000", .{}) catch {};
    _ = conn.?.exec("PRAGMA foreign_keys=ON", .{}) catch {};
    _ = conn.?.exec("PRAGMA synchronous=NORMAL", .{}) catch {};

    // run migrations (holds `mutex` already; applyMigrations reads `conn`
    // directly and does not re-lock)
    try applyMigrations();

    initialized = true;
    log.info("database initialized at {s}", .{std.mem.span(path)});
}

/// Close the database connection. Safe to call when never initialized or
/// already closed.
pub fn close() void {
    mutex.lock();
    defer mutex.unlock();

    if (conn) |c| {
        c.close();
        conn = null;
    }
    initialized = false;
}

/// Execute SQL with no result rows.
/// Returns error.NotInitialized if the database has not been opened.
pub fn exec(sql: []const u8, args: anytype) !void {
    mutex.lock();
    defer mutex.unlock();

    if (conn) |c| {
        c.exec(sql, args) catch |err| {
            log.err("exec failed: {} sql={s}", .{ err, sql });
            return err;
        };
    } else {
        return error.NotInitialized;
    }
}

/// Run a query and return a row iterator; caller must deinit() the rows.
/// Returns error.NotInitialized if the database has not been opened.
/// NOTE(review): the mutex is released before the caller iterates the
/// returned rows — assumes single-threaded iteration; confirm callers.
pub fn query(sql: []const u8, args: anytype) !zqlite.Rows {
    mutex.lock();
    defer mutex.unlock();

    if (conn) |c| {
        return c.query(sql, args) catch |err| {
            log.err("query failed: {} sql={s}", .{ err, sql });
            return err;
        };
    } else {
        return error.NotInitialized;
    }
}
// ==================== migrations ====================

/// Highest migration id known to this build.
const SCHEMA_VERSION = 8;

/// Create the migration bookkeeping table, read the current schema
/// version, and apply every migration above it in order.
/// Caller must already hold `mutex` (called from init()).
fn applyMigrations() !void {
    const c = conn orelse return error.NotInitialized;

    // bookkeeping table: one row per applied migration
    c.exec(
        \\CREATE TABLE IF NOT EXISTS _migrations (
        \\    id INTEGER PRIMARY KEY,
        \\    applied_at TEXT DEFAULT (datetime('now'))
        \\)
    , .{}) catch |err| {
        log.err("failed to create migrations table: {}", .{err});
        return err;
    };

    // Read the current version in its own scope so the SELECT statement is
    // finalized before any migration DDL runs (an open statement can
    // conflict with schema changes on the same connection).
    const current_version: i64 = blk: {
        var rows = c.rows("SELECT COALESCE(MAX(id), 0) FROM _migrations", .{}) catch |err| {
            log.err("failed to check migration version: {}", .{err});
            return err;
        };
        defer rows.deinit();
        break :blk if (rows.next()) |row| row.int(0) else 0;
    };

    if (current_version < 1) try applyOne(c, 1, migration_001_entity_baselines);
    if (current_version < 2) try applyOne(c, 2, migration_002_full_persistence);
    if (current_version < 3) try applyOne(c, 3, migration_003_edge_timestamps);
    if (current_version < 4) try applyOne(c, 4, migration_004_posts);
    if (current_version < 5) try applyOne(c, 5, migration_005_fix_entity_posts_fk);
    if (current_version < 6) try applyOne(c, 6, migration_006_last_seen);
    if (current_version < 7) try applyOne(c, 7, migration_007_drop_mentions);
    if (current_version < 8) try applyOne(c, 8, migration_008_drop_posts);

    log.info("migrations complete, version={d}", .{SCHEMA_VERSION});
}

/// Run one migration and record its id in `_migrations` inside a single
/// transaction, so a crash mid-migration can never record a step that did
/// not fully apply. (Previously each migration's DDL and its record were
/// separate autocommit statements, allowing partial migrations — the old
/// "may already exist" warnings worked around exactly that.)
fn applyOne(c: zqlite.Conn, id: i64, migrate: anytype) !void {
    c.exec("BEGIN TRANSACTION", .{}) catch |err| {
        log.err("failed to begin migration {d}: {}", .{ id, err });
        return err;
    };
    errdefer c.exec("ROLLBACK", .{}) catch {};

    try migrate(c);

    c.exec("INSERT INTO _migrations (id) VALUES (?)", .{id}) catch |err| {
        log.err("failed to record migration {d}: {}", .{ id, err });
        return err;
    };
    c.exec("COMMIT", .{}) catch |err| {
        log.err("failed to commit migration {d}: {}", .{ id, err });
        return err;
    };
}

/// 001: entity_baseline table — EMA baseline rates for trend detection.
fn migration_001_entity_baselines(c: zqlite.Conn) !void {
    log.info("applying migration 001: entity_baselines", .{});

    c.exec(
        \\CREATE TABLE IF NOT EXISTS entity_baseline (
        \\    text_hash INTEGER PRIMARY KEY,
        \\    text TEXT NOT NULL,
        \\    label TEXT NOT NULL,
        \\    baseline_rate REAL DEFAULT 0.0,
        \\    last_rate REAL DEFAULT 0.0,
        \\    mention_count INTEGER DEFAULT 0,
        \\    first_seen TEXT DEFAULT (datetime('now')),
        \\    last_seen TEXT DEFAULT (datetime('now'))
        \\)
    , .{}) catch |err| {
        log.err("failed to create entity_baseline table: {}", .{err});
        return err;
    };

    // secondary lookup by display text (primary key is the hash)
    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_entity_baseline_text ON entity_baseline(text)
    , .{}) catch {};
}

/// 002: full persistence — entities, users, mentions, edges.
fn migration_002_full_persistence(c: zqlite.Conn) !void {
    log.info("applying migration 002: full persistence (entities, users, mentions, edges)", .{});

    // entities table - full entity state for fast restart
    c.exec(
        \\CREATE TABLE IF NOT EXISTS entities (
        \\    id INTEGER PRIMARY KEY,
        \\    text TEXT NOT NULL,
        \\    label TEXT NOT NULL,
        \\    grid_x INTEGER NOT NULL,
        \\    grid_y INTEGER NOT NULL,
        \\    baseline_rate REAL DEFAULT 0.0,
        \\    created_at INTEGER NOT NULL,
        \\    UNIQUE(text COLLATE NOCASE)
        \\)
    , .{}) catch |err| {
        log.err("failed to create entities table: {}", .{err});
        return err;
    };

    // users table - user state for activity tracking
    c.exec(
        \\CREATE TABLE IF NOT EXISTS users (
        \\    id INTEGER PRIMARY KEY,
        \\    did_hash INTEGER NOT NULL UNIQUE,
        \\    created_at INTEGER NOT NULL
        \\)
    , .{}) catch |err| {
        log.err("failed to create users table: {}", .{err});
        return err;
    };

    // mentions table - append-only event log of recent mentions
    // (dropped again by migration 007)
    c.exec(
        \\CREATE TABLE IF NOT EXISTS mentions (
        \\    id INTEGER PRIMARY KEY,
        \\    entity_id INTEGER NOT NULL,
        \\    user_id INTEGER,
        \\    timestamp INTEGER NOT NULL,
        \\    FOREIGN KEY (entity_id) REFERENCES entities(id),
        \\    FOREIGN KEY (user_id) REFERENCES users(id)
        \\)
    , .{}) catch |err| {
        log.err("failed to create mentions table: {}", .{err});
        return err;
    };

    // indexes for loading recent mentions efficiently
    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_mentions_timestamp ON mentions(timestamp DESC)
    , .{}) catch {};
    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_mentions_entity ON mentions(entity_id)
    , .{}) catch {};

    // edges table - co-occurrence relationships, undirected (a < b)
    c.exec(
        \\CREATE TABLE IF NOT EXISTS edges (
        \\    entity_a INTEGER NOT NULL,
        \\    entity_b INTEGER NOT NULL,
        \\    created_at INTEGER NOT NULL,
        \\    PRIMARY KEY (entity_a, entity_b),
        \\    FOREIGN KEY (entity_a) REFERENCES entities(id),
        \\    FOREIGN KEY (entity_b) REFERENCES entities(id)
        \\)
    , .{}) catch |err| {
        log.err("failed to create edges table: {}", .{err});
        return err;
    };
}

/// 003: add last_seen to edges so stale edges can be decayed.
fn migration_003_edge_timestamps(c: zqlite.Conn) !void {
    log.info("applying migration 003: edge timestamps for decay", .{});

    // SQLite can't ADD COLUMN NOT NULL without a default, so default 0.
    // Failure is tolerated: the column may exist from a partial run.
    c.exec(
        \\ALTER TABLE edges ADD COLUMN last_seen INTEGER DEFAULT 0
    , .{}) catch |err| {
        log.warn("alter table edges add last_seen: {} (may already exist)", .{err});
    };
}

/// 004: posts + entity_posts for the "top post per topic" feature
/// (dropped again by migration 008).
fn migration_004_posts(c: zqlite.Conn) !void {
    log.info("applying migration 004: posts for top-post-per-topic", .{});

    c.exec(
        \\CREATE TABLE IF NOT EXISTS posts (
        \\    id INTEGER PRIMARY KEY,
        \\    at_uri TEXT NOT NULL UNIQUE,
        \\    author_did TEXT NOT NULL,
        \\    author_handle TEXT,
        \\    author_followers INTEGER DEFAULT 0,
        \\    score REAL DEFAULT 0,
        \\    timestamp INTEGER NOT NULL
        \\)
    , .{}) catch |err| {
        log.err("failed to create posts table: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_posts_timestamp ON posts(timestamp DESC)
    , .{}) catch {};
    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_posts_author_did ON posts(author_did)
    , .{}) catch {};

    // entity_posts - links entities to their top posts (curated, not full log)
    // NOTE: no FK on entity_id because entities are persisted async (every 30s)
    // and posts arrive continuously. FK on post_id is fine since we insert post first.
    c.exec(
        \\CREATE TABLE IF NOT EXISTS entity_posts (
        \\    entity_id INTEGER NOT NULL,
        \\    post_id INTEGER NOT NULL,
        \\    score REAL DEFAULT 0,
        \\    timestamp INTEGER NOT NULL,
        \\    PRIMARY KEY (entity_id, post_id),
        \\    FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
        \\)
    , .{}) catch |err| {
        log.err("failed to create entity_posts table: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_entity_posts_entity_score ON entity_posts(entity_id, score DESC)
    , .{}) catch {};
    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_entity_posts_post ON entity_posts(post_id)
    , .{}) catch {};
}

/// 005: recreate entity_posts without the entity_id FK.
/// SQLite cannot drop constraints, so the table is dropped and rebuilt;
/// it was new at the time with minimal data, so this is safe.
fn migration_005_fix_entity_posts_fk(c: zqlite.Conn) !void {
    log.info("applying migration 005: remove entity_id FK from entity_posts", .{});

    c.exec("DROP TABLE IF EXISTS entity_posts", .{}) catch |err| {
        log.err("failed to drop entity_posts table: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE TABLE entity_posts (
        \\    entity_id INTEGER NOT NULL,
        \\    post_id INTEGER NOT NULL,
        \\    score REAL DEFAULT 0,
        \\    timestamp INTEGER NOT NULL,
        \\    PRIMARY KEY (entity_id, post_id),
        \\    FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE
        \\)
    , .{}) catch |err| {
        log.err("failed to recreate entity_posts table: {}", .{err});
        return err;
    };

    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_entity_posts_entity_score ON entity_posts(entity_id, score DESC)
    , .{}) catch {};
    c.exec(
        \\CREATE INDEX IF NOT EXISTS idx_entity_posts_post ON entity_posts(post_id)
    , .{}) catch {};
}

/// 006: add last_seen to entities and users for eviction.
fn migration_006_last_seen(c: zqlite.Conn) !void {
    log.info("applying migration 006: last_seen for entity/user eviction", .{});

    c.exec(
        \\ALTER TABLE entities ADD COLUMN last_seen INTEGER DEFAULT 0
    , .{}) catch |err| {
        log.warn("alter table entities add last_seen: {} (may already exist)", .{err});
    };

    c.exec(
        \\ALTER TABLE users ADD COLUMN last_seen INTEGER DEFAULT 0
    , .{}) catch |err| {
        log.warn("alter table users add last_seen: {} (may already exist)", .{err});
    };

    // backfill existing rows to "now" so they are not immediately evictable
    const now = std.time.milliTimestamp();
    c.exec("UPDATE entities SET last_seen = ? WHERE last_seen = 0", .{now}) catch {};
    c.exec("UPDATE users SET last_seen = ? WHERE last_seen = 0", .{now}) catch {};
}

/// 007: drop the legacy mentions table (activity buckets moved in-memory).
fn migration_007_drop_mentions(c: zqlite.Conn) !void {
    log.info("applying migration 007: drop legacy mentions table", .{});

    c.exec("DROP TABLE IF EXISTS mentions", .{}) catch |err| {
        log.err("failed to drop mentions table: {}", .{err});
        return err;
    };
}

/// 008: drop posts and entity_posts (top-post feature removed).
fn migration_008_drop_posts(c: zqlite.Conn) !void {
    log.info("applying migration 008: drop posts and entity_posts tables (top-post feature removed)", .{});

    // entity_posts first: its FK references posts
    c.exec("DROP TABLE IF EXISTS entity_posts", .{}) catch |err| {
        log.err("failed to drop entity_posts table: {}", .{err});
        return err;
    };
    c.exec("DROP TABLE IF EXISTS posts", .{}) catch |err| {
        log.err("failed to drop posts table: {}", .{err});
        return err;
    };
}

// ==================== entity baseline operations ====================

/// Hash entity text for use as a database key (same scheme as entity_graph).
fn hashText(text: []const u8) i64 {
    var hasher = std.hash.Wyhash.init(0);
    hasher.update(text);
    // SQLite INTEGER is signed 64-bit; reinterpret the u64 hash bits
    return @bitCast(hasher.final());
}

/// ASCII-lowercase `text` into `buf`, truncated to 64 bytes, for
/// case-insensitive lookups. The returned slice aliases `buf`.
/// NOTE(review): byte-wise truncation can split a multi-byte UTF-8
/// sequence — assumed acceptable since only the hash consumes the result.
fn normalizeText(text: []const u8, buf: *[64]u8) []const u8 {
    const len = @min(text.len, 64);
    for (0..len) |i| {
        buf[i] = std.ascii.toLower(text[i]);
    }
    return buf[0..len];
}
/// Get the stored baseline rate for an entity, or null if not found
/// (also null when the database is uninitialized or the query fails —
/// callers treat "no baseline" and "lookup failed" the same way).
pub fn getBaseline(text: []const u8) ?f32 {
    var norm_buf: [64]u8 = undefined;
    const norm_text = normalizeText(text, &norm_buf);
    const text_hash = hashText(norm_text);

    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return null;
    var rows = c.rows(
        "SELECT baseline_rate FROM entity_baseline WHERE text_hash = ?",
        .{text_hash},
    ) catch return null;
    defer rows.deinit();

    if (rows.next()) |row| {
        return @floatCast(row.float(0));
    }
    return null;
}

/// Upsert the baseline for an entity.
/// The row is keyed on the hash of the normalized (lowercased) text;
/// NOTE(review): the stored `text` column keeps the caller's original
/// casing — presumably for display; confirm against loadAllBaselines users.
pub fn updateBaseline(text: []const u8, label: []const u8, current_rate: f32, baseline_rate: f32) !void {
    var norm_buf: [64]u8 = undefined;
    const norm_text = normalizeText(text, &norm_buf);
    const text_hash = hashText(norm_text);

    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    // upsert: insert or update on conflict
    c.exec(
        \\INSERT INTO entity_baseline (text_hash, text, label, baseline_rate, last_rate, mention_count, last_seen)
        \\VALUES (?, ?, ?, ?, ?, 1, datetime('now'))
        \\ON CONFLICT(text_hash) DO UPDATE SET
        \\    baseline_rate = excluded.baseline_rate,
        \\    last_rate = excluded.last_rate,
        \\    mention_count = mention_count + 1,
        \\    last_seen = datetime('now')
    , .{ text_hash, text, label, baseline_rate, current_rate }) catch |err| {
        log.err("failed to update baseline: {}", .{err});
        return err;
    };
}

/// Load all positive baselines into a map (for startup).
/// Caller owns the returned map AND every key slice (each key is
/// allocator.dupe'd); free the keys before deinit-ing the map.
pub fn loadAllBaselines(allocator: std.mem.Allocator) !std.StringHashMap(f32) {
    var baselines = std.StringHashMap(f32).init(allocator);
    // BUGFIX: on the error path, free the duped key strings as well as the
    // map — previously only `baselines.deinit()` ran, leaking every key
    // inserted before the failure.
    errdefer {
        var it = baselines.keyIterator();
        while (it.next()) |key| allocator.free(key.*);
        baselines.deinit();
    }

    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    var rows = c.rows(
        "SELECT text, baseline_rate FROM entity_baseline WHERE baseline_rate > 0",
        .{},
    ) catch |err| {
        log.err("failed to load baselines: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |row| {
        const text = row.text(0);
        const rate: f32 = @floatCast(row.float(1));

        // dupe: the row owns `text` only until the next iteration
        const text_dupe = try allocator.dupe(u8, text);
        // BUGFIX: if put() fails, the dupe is not yet owned by the map and
        // the outer errdefer would miss it
        errdefer allocator.free(text_dupe);
        try baselines.put(text_dupe, rate);
    }

    log.info("loaded {d} entity baselines from database", .{baselines.count()});
    return baselines;
}

/// Aggregate stats over stored baselines; zeros when the database is
/// uninitialized or the query fails.
pub fn getStats() struct { entity_count: i64, avg_baseline: f64 } {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return .{ .entity_count = 0, .avg_baseline = 0 };

    var rows = c.rows(
        "SELECT COUNT(*), COALESCE(AVG(baseline_rate), 0) FROM entity_baseline",
        .{},
    ) catch return .{ .entity_count = 0, .avg_baseline = 0 };
    defer rows.deinit();

    if (rows.next()) |row| {
        return .{
            .entity_count = row.int(0),
            .avg_baseline = row.float(1),
        };
    }
    return .{ .entity_count = 0, .avg_baseline = 0 };
}

// ==================== full state persistence ====================

const entity_graph = @import("entity_graph.zig");

/// Save the complete graph state to SQLite inside one transaction
/// (called periodically). Per-row failures are tolerated: in-memory state
/// is authoritative and will resync on the next save.
pub fn saveState(graph: *entity_graph.EntityGraph) !void {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;
    const now = std.time.milliTimestamp();

    // one transaction for atomicity (and to batch the many small writes)
    c.exec("BEGIN TRANSACTION", .{}) catch |err| {
        log.err("failed to begin transaction: {}", .{err});
        return err;
    };
    errdefer c.exec("ROLLBACK", .{}) catch {};

    // save entities (in two steps for backward compat with pre-migration-006 schema)
    for (0..graph.count) |i| {
        const e = &graph.entities[i];
        // skip cleared entities (evicted slots)
        if (e.text_len == 0) continue;

        // step 1: insert/update core fields.
        // text UNIQUE conflicts happen when slots are reused (eviction);
        // rare and non-critical — entity state lives in memory.
        c.exec(
            \\INSERT INTO entities (id, text, label, grid_x, grid_y, baseline_rate, created_at)
            \\VALUES (?, ?, ?, ?, ?, ?, ?)
            \\ON CONFLICT(id) DO UPDATE SET
            \\    text = excluded.text,
            \\    label = excluded.label,
            \\    baseline_rate = excluded.baseline_rate
        , .{
            @as(i64, @intCast(i)),
            e.getText(),
            e.getLabel(),
            @as(i64, e.grid_x),
            @as(i64, e.grid_y),
            e.baseline_rate,
            now,
        }) catch {
            // text UNIQUE conflict - slot reused with different text
            continue;
        };

        // step 2: update last_seen if the column exists (migration 006)
        c.exec(
            "UPDATE entities SET last_seen = ? WHERE id = ?",
            .{ e.last_seen, @as(i64, @intCast(i)) },
        ) catch {}; // ignore if column doesn't exist

        // NOTE: activity counts use time buckets now, reset on restart (no persistence)
    }

    // save users (same two-step scheme as entities)
    for (0..graph.user_count) |i| {
        const u = &graph.users[i];
        // skip cleared users (evicted slots)
        if (u.did_hash == 0) continue;

        c.exec(
            \\INSERT INTO users (id, did_hash, created_at)
            \\VALUES (?, ?, ?)
            \\ON CONFLICT(id) DO NOTHING
        , .{
            @as(i64, @intCast(i)),
            @as(i64, @bitCast(u.did_hash)),
            now,
        }) catch continue;

        c.exec(
            "UPDATE users SET last_seen = ? WHERE id = ?",
            .{ u.last_seen, @as(i64, @intCast(i)) },
        ) catch {}; // ignore if column doesn't exist
    }

    // save edges with timestamps; only persist each undirected edge once (a < b)
    for (0..graph.count) |i| {
        const e = &graph.entities[i];
        for (0..e.edges.count) |j| {
            const target = e.edges.edges[j];
            const last_seen = e.edges.last_seen[j];
            if (i >= target) continue;
            c.exec(
                \\INSERT INTO edges (entity_a, entity_b, created_at, last_seen)
                \\VALUES (?, ?, ?, ?)
                \\ON CONFLICT(entity_a, entity_b) DO UPDATE SET
                \\    last_seen = excluded.last_seen
            , .{ @as(i64, @intCast(i)), @as(i64, @intCast(target)), now, last_seen }) catch continue;
        }
    }

    c.exec("COMMIT", .{}) catch |err| {
        log.err("failed to commit transaction: {}", .{err});
        return err;
    };

    log.info("saved state: {d} entities, {d} users", .{ graph.count, graph.user_count });
}

/// Load the complete graph state from SQLite (called once on startup).
/// Rows whose ids exceed the in-memory capacity are skipped. Activity
/// buckets are not persisted, so smoothed_rate is seeded from the baseline
/// to avoid spurious negative trends right after restart.
pub fn loadState(graph: *entity_graph.EntityGraph) !void {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    // load entities
    var entity_rows = c.rows(
        "SELECT id, text, label, grid_x, grid_y, baseline_rate, COALESCE(last_seen, 0) FROM entities ORDER BY id",
        .{},
    ) catch |err| {
        log.err("failed to load entities: {}", .{err});
        return err;
    };
    defer entity_rows.deinit();

    const now = std.time.milliTimestamp();
    var entity_count: u32 = 0;
    while (entity_rows.next()) |row| {
        const id: u32 = @intCast(row.int(0));
        if (id >= entity_graph.MAX_ENTITIES) continue;

        const e = &graph.entities[id];
        const text = row.text(1);
        const label = row.text(2);

        // copy text/label into the fixed-size inline buffers, truncating
        const text_len: u8 = @intCast(@min(text.len, 64));
        @memcpy(e.text[0..text_len], text[0..text_len]);
        e.text_len = text_len;

        const label_len: u8 = @intCast(@min(label.len, 16));
        @memcpy(e.label[0..label_len], label[0..label_len]);
        e.label_len = label_len;

        e.grid_x = @intCast(row.int(3));
        e.grid_y = @intCast(row.int(4));
        e.baseline_rate = @floatCast(row.float(5));
        // seed smoothed_rate from baseline to prevent negative trends after
        // restart (activity buckets aren't persisted; current_rate starts at 0)
        e.smoothed_rate = e.baseline_rate;
        // restore is_active based on the seeded smoothed_rate
        e.is_active = e.smoothed_rate >= entity_graph.ACTIVITY_THRESHOLD_ENTER;
        // last_seen == 0 means pre-migration-006 data: default to now
        const db_last_seen: i64 = row.int(6);
        e.last_seen = if (db_last_seen > 0) db_last_seen else now;

        // count = highest id + 1 (slots are positional)
        if (id >= entity_count) {
            entity_count = id + 1;
        }
    }
    graph.count = entity_count;

    // load users
    var user_rows = c.rows(
        "SELECT id, did_hash, COALESCE(last_seen, 0) FROM users ORDER BY id",
        .{},
    ) catch |err| {
        log.err("failed to load users: {}", .{err});
        return err;
    };
    defer user_rows.deinit();

    var user_count: u32 = 0;
    while (user_rows.next()) |row| {
        const id: u32 = @intCast(row.int(0));
        if (id >= entity_graph.MAX_USERS) continue;

        graph.users[id].did_hash = @bitCast(row.int(1));
        // last_seen == 0 means pre-migration-006 data: default to now
        const db_last_seen: i64 = row.int(2);
        graph.users[id].last_seen = if (db_last_seen > 0) db_last_seen else now;

        if (id >= user_count) {
            user_count = id + 1;
        }
    }
    graph.user_count = user_count;

    // load edges; each stored row is mirrored into both adjacency lists
    var edge_rows = c.rows(
        "SELECT entity_a, entity_b, COALESCE(last_seen, created_at) FROM edges",
        .{},
    ) catch |err| {
        log.err("failed to load edges: {}", .{err});
        return err;
    };
    defer edge_rows.deinit();

    var edge_count: u32 = 0;
    while (edge_rows.next()) |row| {
        const a: u32 = @intCast(row.int(0));
        const b: u32 = @intCast(row.int(1));
        const last_seen: entity_graph.Timestamp = row.int(2);
        // skip edges referencing entities that were not loaded
        if (a >= graph.count or b >= graph.count) continue;

        _ = graph.entities[a].edges.add(b, last_seen);
        _ = graph.entities[b].edges.add(a, last_seen);
        edge_count += 1;
    }

    // NOTE: activity counts use time buckets now, reset on restart (no persistence)
    // entities will be inactive until new mentions come in

    log.info("loaded state: {d} entities, {d} users, {d} edges (activity resets on restart)", .{
        graph.count,
        graph.user_count,
        edge_count,
    });
}

/// Delete entity baselines not seen in the last 7 days.
/// Returns the number of rows REMAINING after the prune (for logging),
/// not the number deleted.
pub fn pruneStaleBaselines() !usize {
    mutex.lock();
    defer mutex.unlock();

    const c = conn orelse return error.NotInitialized;

    c.exec(
        \\DELETE FROM entity_baseline
        \\WHERE last_seen < datetime('now', '-7 days')
    , .{}) catch |err| {
        log.err("failed to prune stale baselines: {}", .{err});
        return err;
    };

    // report remaining rows; a failed count is non-fatal
    var rows = c.rows("SELECT COUNT(*) FROM entity_baseline", .{}) catch return 0;
    defer rows.deinit();

    if (rows.next()) |row| {
        return @intCast(row.int(0));
    }
    return 0;
}