Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 688 lines 25 kB view raw
import actor_validator
import backfill
import database/executor.{type Executor}
import database/repositories/actors
import database/repositories/jetstream_activity
import database/repositories/lexicons
import database/repositories/records
import database/types.{Inserted, Skipped}
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/json
import gleam/list
import gleam/option
import gleam/result
import gleam/string
import goose
import honk
import honk/errors
import logging
import pubsub
import stats_pubsub
import timestamp

/// Whether a finalised activity status is also announced on `stats_pubsub`
/// (as an `ActivityLogged` event) or only written to the activity log.
type Broadcast {
  Publish
  UpdateOnly
}

/// Callback used by the commit helpers to finalise the activity-log entry of
/// the event being processed: `(status, optional message, broadcast mode)`.
type ActivityReporter =
  fn(String, option.Option(String), Broadcast) -> Nil

/// Convert a Dynamic value (Erlang term) to JSON string
fn dynamic_to_json(value: Dynamic) -> String {
  // Erlang's json:encode returns an iolist, so it is flattened to a string.
  value
  |> do_json_encode
  |> iolist_to_string
}

/// Encode a dynamic value to JSON (returns iolist)
@external(erlang, "json", "encode")
fn do_json_encode(value: Dynamic) -> Dynamic

/// Convert an iolist to a binary
@external(erlang, "erlang", "iolist_to_binary")
fn iolist_to_binary(iolist: Dynamic) -> Dynamic

/// Flatten an iolist into a Gleam `String`.
///
/// If the flattened binary does not decode as a string, a warning is logged
/// and the `string.inspect` rendering of the iolist is returned instead.
fn iolist_to_string(iolist: Dynamic) -> String {
  let binary = iolist_to_binary(iolist)
  case decode.run(binary, decode.string) {
    Ok(text) -> text
    Error(_) -> {
      logging.log(
        logging.Warning,
        "[jetstream] Failed to convert iolist to string",
      )
      string.inspect(iolist)
    }
  }
}

/// Serialize a commit event to a JSON string for activity logging.
///
/// Note that `record` is emitted as a JSON *string* holding the encoded
/// record (not as a nested object); a missing record or cid becomes `null`.
fn serialize_commit_event(
  did: String,
  time_us: Int,
  commit: goose.CommitData,
) -> String {
  let record_field = case commit.record {
    option.Some(record_data) -> json.string(dynamic_to_json(record_data))
    option.None -> json.null()
  }

  let cid_field = case commit.cid {
    option.Some(cid) -> json.string(cid)
    option.None -> json.null()
  }

  let commit_object =
    json.object([
      #("rev", json.string(commit.rev)),
      #("operation", json.string(commit.operation)),
      #("collection", json.string(commit.collection)),
      #("rkey", json.string(commit.rkey)),
      #("record", record_field),
      #("cid", cid_field),
    ])

  json.object([
    #("did", json.string(did)),
    #("time_us", json.int(time_us)),
    #("commit", commit_object),
  ])
  |> json.to_string
}

/// Write the final status of an activity-log entry and optionally announce it.
///
/// Does nothing when `activity_id` is `None` (the initial log write failed).
/// With `Publish`, a `stats_pubsub.ActivityLogged` event is emitted, but only
/// after the status update succeeded. A failed status update is logged as a
/// warning; it never aborts event processing.
///
/// `activity_id` is intentionally left unannotated so it follows whatever id
/// type `jetstream_activity.log_activity` returns.
fn finish_activity(
  db: Executor,
  activity_id,
  event_time: String,
  commit: goose.CommitData,
  did: String,
  event_json: String,
  status: String,
  message: option.Option(String),
  broadcast: Broadcast,
) -> Nil {
  case activity_id {
    option.None -> Nil
    option.Some(id) ->
      case jetstream_activity.update_status(db, id, status, message), broadcast {
        Ok(_), Publish ->
          // Publish activity event for real-time UI updates
          stats_pubsub.publish(stats_pubsub.ActivityLogged(
            id,
            event_time,
            commit.operation,
            commit.collection,
            did,
            status,
            message,
            event_json,
          ))
        Ok(_), UpdateOnly -> Nil
        Error(err), _ ->
          logging.log(
            logging.Warning,
            "[jetstream] Failed to update activity status: "
              <> string.inspect(err),
          )
      }
  }
}

/// Handle a commit event (create, update, or delete).
///
/// The event is first written to the activity log; the entry is then
/// finalised with the outcome of processing:
/// - `create` / `update`: the record is validated against the stored lexicons
///   and inserted (see `index_record`).
/// - `delete`: the record is removed (see `delete_record`).
/// - anything else: logged as an unknown operation.
///
/// Failures are logged and recorded on the activity entry; nothing is
/// returned to the caller.
pub fn handle_commit_event(
  db: Executor,
  did: String,
  time_us: Int,
  commit: goose.CommitData,
  plc_url: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
) -> Nil {
  let uri = "at://" <> did <> "/" <> commit.collection <> "/" <> commit.rkey

  // Log activity at entry point - serialize the commit event to JSON
  let event_json = serialize_commit_event(did, time_us, commit)
  let event_time = timestamp.microseconds_to_iso8601(time_us)

  let activity_id = case
    jetstream_activity.log_activity(
      db,
      event_time,
      commit.operation,
      commit.collection,
      did,
      event_json,
    )
  {
    Ok(id) -> option.Some(id)
    Error(err) -> {
      logging.log(
        logging.Warning,
        "[jetstream] Failed to log activity: " <> string.inspect(err),
      )
      option.None
    }
  }

  // Everything except status/message/broadcast is fixed for this event, so
  // the helpers only need this small callback.
  let report = fn(status, message, broadcast) {
    finish_activity(
      db,
      activity_id,
      event_time,
      commit,
      did,
      event_json,
      status,
      message,
      broadcast,
    )
  }

  case commit.operation {
    "create" | "update" ->
      case commit.record, commit.cid {
        option.Some(record_data), option.Some(cid_value) ->
          index_record(
            db,
            did,
            uri,
            event_time,
            commit,
            // Convert the dynamic record to a JSON string (Erlang json:encode)
            dynamic_to_json(record_data),
            cid_value,
            plc_url,
            collection_ids,
            external_collection_ids,
            report,
          )
        _, _ -> {
          logging.log(
            logging.Warning,
            "[jetstream] "
              <> commit.operation
              <> " event missing record or cid for "
              <> uri,
          )
          report(
            "error",
            option.Some("Event missing record or cid"),
            UpdateOnly,
          )
        }
      }
    "delete" -> delete_record(db, did, uri, event_time, commit, report)
    _ -> {
      logging.log(
        logging.Warning,
        "[jetstream] Unknown operation: " <> commit.operation,
      )
      report(
        "error",
        option.Some("Unknown operation: " <> commit.operation),
        UpdateOnly,
      )
    }
  }
}

/// Validate a create/update record against the stored lexicons and, when it
/// passes, hand it to `store_validated_record`.
///
/// Lexicon fetch failures and JSON parse failures are recorded as `error`
/// (status update only); a lexicon validation failure is recorded and
/// published as `validation_error`.
fn index_record(
  db: Executor,
  did: String,
  uri: String,
  event_time: String,
  commit: goose.CommitData,
  record_json: String,
  cid: String,
  plc_url: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
  report: ActivityReporter,
) -> Nil {
  case lexicons.get_all(db) {
    Error(db_err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to fetch lexicons for validation: "
          <> string.inspect(db_err),
      )
      report(
        "error",
        option.Some("Failed to fetch lexicons: " <> string.inspect(db_err)),
        UpdateOnly,
      )
    }
    Ok(stored_lexicons) -> {
      // Parse lexicon JSON strings to Json objects
      let parsed_lexicons =
        stored_lexicons
        |> list.try_map(fn(lexicon) {
          honk.parse_json_string(lexicon.json)
          |> result.map_error(fn(e) { errors.to_string(e) })
        })

      // Parse record JSON string to Json object
      let parsed_record =
        honk.parse_json_string(record_json)
        |> result.map_error(fn(e) { errors.to_string(e) })

      case parsed_lexicons, parsed_record {
        Ok(lexicon_jsons), Ok(record) ->
          case honk.validate_record(lexicon_jsons, commit.collection, record) {
            Ok(_) ->
              store_validated_record(
                db,
                did,
                uri,
                event_time,
                commit,
                record_json,
                cid,
                plc_url,
                collection_ids,
                external_collection_ids,
                report,
              )
            Error(validation_error) -> {
              let reason = errors.to_string(validation_error)
              logging.log(
                logging.Warning,
                "[jetstream] Validation failed for " <> uri <> ": " <> reason,
              )
              report("validation_error", option.Some(reason), Publish)
            }
          }
        // Either the lexicons or the record failed to parse.
        _, _ -> {
          logging.log(
            logging.Error,
            "[jetstream] Failed to parse JSON for validation: " <> uri,
          )
          report("error", option.Some("Failed to parse JSON"), UpdateOnly)
        }
      }
    }
  }
}

/// Persist a record that already passed lexicon validation.
///
/// Ensures the actor exists (backfilling its collections when it is new),
/// inserts the record, finalises the activity entry and, for a real insert,
/// publishes the record event for GraphQL subscriptions plus the stats event.
fn store_validated_record(
  db: Executor,
  did: String,
  uri: String,
  event_time: String,
  commit: goose.CommitData,
  record_json: String,
  cid: String,
  plc_url: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
  report: ActivityReporter,
) -> Nil {
  // Check if the record already exists BEFORE inserting, to tell a create
  // from an update.
  let is_create = case records.get(db, uri) {
    // Empty list means record doesn't exist
    Ok([]) -> True
    // Non-empty list means record exists
    Ok(_) -> False
    Error(_) -> {
      // Database error - log it and treat as update to be safe
      logging.log(
        logging.Warning,
        "[jetstream] Error checking existing record for " <> uri,
      )
      False
    }
  }

  // Ensure actor exists before inserting record
  case actor_validator.ensure_actor_exists(db, did, plc_url) {
    Error(actor_err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to validate/create actor for "
          <> uri
          <> ": "
          <> actor_err,
      )
      report(
        "error",
        option.Some("Actor validation failed: " <> actor_err),
        Publish,
      )
    }
    Ok(is_new_actor) -> {
      // If this is a new actor, synchronously backfill all collections.
      // This ensures subscription joins have complete data immediately.
      // We're already in a spawned process per event, so blocking is fine.
      case is_new_actor {
        True -> {
          // Publish stats event for new actor
          stats_pubsub.publish(stats_pubsub.ActorCreated)

          backfill.backfill_collections_for_actor(
            db,
            did,
            collection_ids,
            external_collection_ids,
            plc_url,
          )
        }
        False -> Nil
      }

      case records.insert(db, uri, cid, did, commit.collection, record_json) {
        Ok(Inserted) -> {
          let verb = case is_create {
            True -> "create"
            False -> "update"
          }
          logging.log(
            logging.Info,
            "[jetstream] "
              <> verb
              <> " "
              <> commit.collection
              <> " ("
              <> commit.rkey
              <> ") "
              <> did,
          )

          report("success", option.None, Publish)

          // Publish event to PubSub for GraphQL subscriptions
          let operation = case is_create {
            True -> pubsub.Create
            False -> pubsub.Update
          }

          pubsub.publish(pubsub.RecordEvent(
            uri: uri,
            cid: cid,
            did: did,
            collection: commit.collection,
            value: record_json,
            indexed_at: event_time,
            operation: operation,
          ))

          // Publish stats event for real-time stats updates
          case is_create {
            True -> stats_pubsub.publish(stats_pubsub.RecordCreated)
            False -> Nil
          }
        }
        Ok(Skipped) -> {
          logging.log(
            logging.Info,
            "[jetstream] skipped (duplicate CID) "
              <> commit.collection
              <> " ("
              <> commit.rkey
              <> ") "
              <> did,
          )

          // Success, but no RecordCreated event - nothing was actually created
          report("success", option.Some("Skipped: duplicate CID"), Publish)
        }
        Error(err) -> {
          logging.log(
            logging.Error,
            "[jetstream] Failed to insert record "
              <> uri
              <> ": "
              <> string.inspect(err),
          )
          report(
            "error",
            option.Some("Database insert failed: " <> string.inspect(err)),
            Publish,
          )
        }
      }
    }
  }
}

/// Delete the record at `uri`, finalise the activity entry and, on success,
/// publish the delete event for GraphQL subscriptions plus the stats event.
fn delete_record(
  db: Executor,
  did: String,
  uri: String,
  event_time: String,
  commit: goose.CommitData,
  report: ActivityReporter,
) -> Nil {
  logging.log(
    logging.Info,
    "[jetstream] delete "
      <> commit.collection
      <> " ("
      <> commit.rkey
      <> ") "
      <> did,
  )

  case records.delete(db, uri) {
    Ok(_) -> {
      report("success", option.None, Publish)

      // Publish delete event to PubSub for GraphQL subscriptions, stamped
      // with the event timestamp from the Jetstream event.
      pubsub.publish(pubsub.RecordEvent(
        uri: uri,
        cid: "",
        did: did,
        collection: commit.collection,
        value: "",
        indexed_at: event_time,
        operation: pubsub.Delete,
      ))

      // Publish stats event for real-time stats updates
      stats_pubsub.publish(stats_pubsub.RecordDeleted)
    }
    Error(err) -> {
      logging.log(
        logging.Error,
        "[jetstream] Failed to delete: " <> string.inspect(err),
      )
      report(
        "error",
        option.Some("Delete failed: " <> string.inspect(err)),
        UpdateOnly,
      )
    }
  }
}

/// Handle an identity event (update actor handle)
pub fn handle_identity_event(db: Executor, identity: goose.IdentityData) -> Nil {
  case actors.upsert(db, identity.did, identity.handle) {
    Ok(_) ->
      logging.log(
        logging.Info,
        "[jetstream] identity update: "
          <> identity.handle
          <> " ("
          <> identity.did
          <> ")",
      )
    Error(err) ->
      logging.log(
        logging.Error,
        "[jetstream] Failed to upsert actor "
          <> identity.did
          <> ": "
          <> string.inspect(err),
      )
  }
}

/// Handle an account event
pub fn handle_account_event(_db: Executor, account: goose.AccountData) -> Nil {
  // For now, just log account events - we could extend this in the future
  let status = case account.active {
    True -> "active"
    False -> "inactive"
  }
  logging.log(
    logging.Info,
    "[jetstream] account " <> status <> ": " <> account.did,
  )
}