Auto-indexing service and GraphQL API for AT Protocol Records

skip record insert if cid exists

+241 -65
+241 -65
server/src/database.gleam
··· 1 1 import cursor 2 + import gleam/dict.{type Dict} 2 3 import gleam/dynamic/decode 3 4 import gleam/int 4 5 import gleam/list ··· 253 254 sqlight.exec(create_table_sql, conn) 254 255 } 255 256 257 + /// Migration v3: Add CID index for deduplication 258 + fn migration_v3(conn: sqlight.Connection) -> Result(Nil, sqlight.Error) { 259 + logging.log(logging.Info, "Running migration v3 (CID index)...") 260 + 261 + let create_cid_index_sql = 262 + " 263 + CREATE INDEX IF NOT EXISTS idx_record_cid 264 + ON record(cid) 265 + " 266 + 267 + sqlight.exec(create_cid_index_sql, conn) 268 + } 269 + 256 270 /// Runs all pending migrations based on current schema version 257 271 fn run_migrations(conn: sqlight.Connection) -> Result(Nil, sqlight.Error) { 258 272 use current_version <- result.try(get_current_version(conn)) ··· 267 281 // Fresh database or pre-migration database - run v1 268 282 0 -> { 269 283 use _ <- result.try(apply_migration(conn, 1, migration_v1)) 270 - apply_migration(conn, 2, migration_v2) 284 + use _ <- result.try(apply_migration(conn, 2, migration_v2)) 285 + apply_migration(conn, 3, migration_v3) 271 286 } 272 287 273 - // Run v2 migration 274 - 1 -> apply_migration(conn, 2, migration_v2) 288 + // Run v2 and v3 migrations 289 + 1 -> { 290 + use _ <- result.try(apply_migration(conn, 2, migration_v2)) 291 + apply_migration(conn, 3, migration_v3) 292 + } 293 + 294 + // Run v3 migration 295 + 2 -> apply_migration(conn, 3, migration_v3) 275 296 276 297 // Already at latest version 277 - 2 -> { 278 - logging.log(logging.Info, "Schema is up to date (v2)") 298 + 3 -> { 299 + logging.log(logging.Info, "Schema is up to date (v3)") 279 300 Ok(Nil) 280 301 } 281 302 282 303 // Future versions would be handled here: 283 - // 2 -> apply_migration(conn, 3, migration_v3) 284 304 // 3 -> apply_migration(conn, 4, migration_v4) 305 + // 4 -> apply_migration(conn, 5, migration_v5) 285 306 _ -> { 286 307 logging.log( 287 308 logging.Error, ··· 455 476 456 477 // ===== Record Functions ===== 457 478 479 + /// Gets existing CIDs for a list of URIs 480 + /// Returns a Dict mapping URI -> CID for records that exist in the database 481 + fn get_existing_cids( 482 + conn: sqlight.Connection, 483 + uris: List(String), 484 + ) -> Result(Dict(String, String), sqlight.Error) { 485 + case uris { 486 + [] -> Ok(dict.new()) 487 + _ -> { 488 + // Build placeholders for SQL IN clause 489 + let placeholders = 490 + list.map(uris, fn(_) { "?" }) 491 + |> string.join(", ") 492 + 493 + let sql = 494 + " 495 + SELECT uri, cid 496 + FROM record 497 + WHERE uri IN (" <> placeholders <> ") 498 + " 499 + 500 + // Convert URIs to sqlight.Value list 501 + let params = list.map(uris, sqlight.text) 502 + 503 + let decoder = { 504 + use uri <- decode.field(0, decode.string) 505 + use cid <- decode.field(1, decode.string) 506 + decode.success(#(uri, cid)) 507 + } 508 + 509 + use results <- result.try(sqlight.query( 510 + sql, 511 + on: conn, 512 + with: params, 513 + expecting: decoder, 514 + )) 515 + 516 + // Convert list of tuples to Dict 517 + Ok(dict.from_list(results)) 518 + } 519 + } 520 + } 521 + 458 522 /// Inserts or updates a record in the database 523 + /// Skips insertion if the CID already exists in the database (for any URI) 524 + /// Also skips update if the URI exists with the same CID (content unchanged) 459 525 pub fn insert_record( 460 526 conn: sqlight.Connection, 461 527 uri: String, ··· 464 530 collection: String, 465 531 json: String, 466 532 ) -> Result(Nil, sqlight.Error) { 467 - let sql = 468 - " 469 - INSERT INTO record (uri, cid, did, collection, json) 470 - VALUES (?, ?, ?, ?, ?) 471 - ON CONFLICT(uri) DO UPDATE SET 472 - cid = excluded.cid, 473 - json = excluded.json, 474 - indexed_at = datetime('now') 475 - " 533 + // Check if this CID already exists in the database 534 + use existing_cids <- result.try(get_existing_cids(conn, [uri])) 476 535 477 - use _ <- result.try(sqlight.query( 478 - sql, 479 - on: conn, 480 - with: [ 481 - sqlight.text(uri), 482 - sqlight.text(cid), 483 - sqlight.text(did), 484 - sqlight.text(collection), 485 - sqlight.text(json), 486 - ], 487 - expecting: decode.string, 488 - )) 489 - Ok(Nil) 536 + case dict.get(existing_cids, uri) { 537 + // URI exists with same CID - skip update (content unchanged) 538 + Ok(existing_cid) if existing_cid == cid -> Ok(Nil) 539 + // URI exists with different CID - proceed with update 540 + // URI doesn't exist - proceed with insert 541 + _ -> { 542 + // Check if this CID exists for any other URI 543 + let check_cid_sql = 544 + " 545 + SELECT COUNT(*) as count 546 + FROM record 547 + WHERE cid = ? 548 + " 549 + 550 + let count_decoder = { 551 + use count <- decode.field(0, decode.int) 552 + decode.success(count) 553 + } 554 + 555 + use cid_exists <- result.try(case 556 + sqlight.query( 557 + check_cid_sql, 558 + on: conn, 559 + with: [sqlight.text(cid)], 560 + expecting: count_decoder, 561 + ) 562 + { 563 + Ok([count]) if count > 0 -> Ok(True) 564 + Ok(_) -> Ok(False) 565 + Error(err) -> Error(err) 566 + }) 567 + 568 + case cid_exists { 569 + True -> Ok(Nil) 570 + False -> { 571 + let sql = 572 + " 573 + INSERT INTO record (uri, cid, did, collection, json) 574 + VALUES (?, ?, ?, ?, ?) 575 + ON CONFLICT(uri) DO UPDATE SET 576 + cid = excluded.cid, 577 + json = excluded.json, 578 + indexed_at = datetime('now') 579 + " 580 + 581 + use _ <- result.try(sqlight.query( 582 + sql, 583 + on: conn, 584 + with: [ 585 + sqlight.text(uri), 586 + sqlight.text(cid), 587 + sqlight.text(did), 588 + sqlight.text(collection), 589 + sqlight.text(json), 590 + ], 591 + expecting: decode.string, 592 + )) 593 + Ok(Nil) 594 + } 595 + } 596 + } 597 + } 490 598 } 491 599 492 600 /// Batch inserts or updates multiple records in the database 493 601 /// More efficient than individual inserts for large datasets 602 + /// Filters out records where CID already exists or is unchanged 494 603 pub fn batch_insert_records( 495 604 conn: sqlight.Connection, 496 605 records: List(Record), 497 606 ) -> Result(Nil, sqlight.Error) { 498 - // Process records in smaller batches to avoid SQL parameter limits 499 - // SQLite has a default limit of 999 parameters 500 - // Each record uses 5 parameters, so we can safely do 100 records at a time (500 params) 501 - let batch_size = 100 607 + case records { 608 + [] -> Ok(Nil) 609 + _ -> { 610 + // Get all URIs from the incoming records 611 + let uris = list.map(records, fn(record) { record.uri }) 612 + 613 + // Fetch existing CIDs for these URIs 614 + use existing_cids <- result.try(get_existing_cids(conn, uris)) 615 + 616 + // Get all CIDs that already exist in the database (for any URI) 617 + let all_incoming_cids = list.map(records, fn(record) { record.cid }) 618 + let check_all_cids_sql = 619 + " 620 + SELECT cid 621 + FROM record 622 + WHERE cid IN (" 623 + <> string.join(list.map(all_incoming_cids, fn(_) { "?" }), ", ") 624 + <> ") 625 + " 626 + 627 + let cid_decoder = { 628 + use cid <- decode.field(0, decode.string) 629 + decode.success(cid) 630 + } 631 + 632 + use existing_cids_in_db <- result.try(sqlight.query( 633 + check_all_cids_sql, 634 + on: conn, 635 + with: list.map(all_incoming_cids, sqlight.text), 636 + expecting: cid_decoder, 637 + )) 638 + 639 + // Create a set of existing CIDs for fast lookup 640 + let existing_cid_set = dict.from_list( 641 + list.map(existing_cids_in_db, fn(cid) { #(cid, True) }), 642 + ) 643 + 644 + // Filter out records where: 645 + // 1. URI exists with same CID (unchanged) 646 + // 2. CID already exists for a different URI (duplicate content) 647 + let filtered_records = 648 + list.filter(records, fn(record) { 649 + case dict.get(existing_cids, record.uri) { 650 + // URI exists with same CID - skip 651 + Ok(existing_cid) if existing_cid == record.cid -> False 652 + // URI exists with different CID - include (content changed) 653 + Ok(_) -> 654 + case dict.get(existing_cid_set, record.cid) { 655 + Ok(_) -> False 656 + Error(_) -> True 657 + } 658 + // URI doesn't exist - check if CID exists elsewhere 659 + Error(_) -> 660 + case dict.get(existing_cid_set, record.cid) { 661 + Ok(_) -> False 662 + Error(_) -> True 663 + } 664 + } 665 + }) 666 + 667 + case filtered_records { 668 + [] -> Ok(Nil) 669 + _ -> { 670 + // Process records in smaller batches to avoid SQL parameter limits 671 + // SQLite has a default limit of 999 parameters 672 + // Each record uses 5 parameters, so we can safely do 100 records at a time (500 params) 673 + let batch_size = 100 502 674 503 - list.sized_chunk(records, batch_size) 504 - |> list.try_each(fn(batch) { 505 - // Build the SQL with multiple value sets 506 - let value_placeholders = 507 - list.repeat("(?, ?, ?, ?, ?)", list.length(batch)) 508 - |> string.join(", ") 675 + list.sized_chunk(filtered_records, batch_size) 676 + |> list.try_each(fn(batch) { 677 + // Build the SQL with multiple value sets 678 + let value_placeholders = 679 + list.repeat("(?, ?, ?, ?, ?)", list.length(batch)) 680 + |> string.join(", ") 509 681 510 - let sql = " 511 - INSERT INTO record (uri, cid, did, collection, json) 512 - VALUES " <> value_placeholders <> " 513 - ON CONFLICT(uri) DO UPDATE SET 514 - cid = excluded.cid, 515 - json = excluded.json, 516 - indexed_at = datetime('now') 517 - " 682 + let sql = " 683 + INSERT INTO record (uri, cid, did, collection, json) 684 + VALUES " <> value_placeholders <> " 685 + ON CONFLICT(uri) DO UPDATE SET 686 + cid = excluded.cid, 687 + json = excluded.json, 688 + indexed_at = datetime('now') 689 + " 518 690 519 - // Flatten all record parameters into a single list 520 - let params = 521 - list.flat_map(batch, fn(record) { 522 - [ 523 - sqlight.text(record.uri), 524 - sqlight.text(record.cid), 525 - sqlight.text(record.did), 526 - sqlight.text(record.collection), 527 - sqlight.text(record.json), 528 - ] 529 - }) 691 + // Flatten all record parameters into a single list 692 + let params = 693 + list.flat_map(batch, fn(record) { 694 + [ 695 + sqlight.text(record.uri), 696 + sqlight.text(record.cid), 697 + sqlight.text(record.did), 698 + sqlight.text(record.collection), 699 + sqlight.text(record.json), 700 + ] 701 + }) 530 702 531 - use _ <- result.try(sqlight.query( 532 - sql, 533 - on: conn, 534 - with: params, 535 - expecting: decode.string, 536 - )) 537 - Ok(Nil) 538 - }) 703 + use _ <- result.try(sqlight.query( 704 + sql, 705 + on: conn, 706 + with: params, 707 + expecting: decode.string, 708 + )) 709 + Ok(Nil) 710 + }) 711 + } 712 + } 713 + } 714 + } 539 715 } 540 716 541 717 /// Gets a record by URI