The smokesignal.events web application

feature: index beaconbits-app records

+279 -11
+14
migrations/20251229000000_atproto_records.sql
-- Generic storage for AT Protocol records that have no dedicated table.
-- Each row is one record, keyed by its AT-URI, with the record body kept as JSONB.
CREATE TABLE atproto_records (
    aturi      VARCHAR(1024) PRIMARY KEY,
    did        VARCHAR(256) NOT NULL,
    cid        VARCHAR(256) NOT NULL,
    collection VARCHAR(256) NOT NULL,
    indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    record     JSONB NOT NULL
);

-- Secondary indexes for lookups by repository, content id, collection and index time.
CREATE INDEX idx_atproto_records_did ON atproto_records (did);
CREATE INDEX idx_atproto_records_cid ON atproto_records (cid);
CREATE INDEX idx_atproto_records_collection ON atproto_records (collection);
CREATE INDEX idx_atproto_records_indexed_at ON atproto_records (indexed_at);
+12 -7
src/http/handle_xrpc_search_events.rs
··· 17 17 #[derive(Debug, Deserialize)] 18 18 pub struct SearchEventsParams { 19 19 repository: Option<String>, 20 - query: String, 21 - 22 - #[allow(dead_code)] 20 + query: Option<String>, 21 + #[serde(default)] 22 + location: Vec<String>, 23 23 limit: Option<u32>, 24 24 } 25 25 ··· 53 53 State(web_context): State<WebContext>, 54 54 Query(params): Query<SearchEventsParams>, 55 55 ) -> Result<impl IntoResponse, WebError> { 56 + // Return empty list when no search parameters provided 57 + if params.query.is_none() && params.repository.is_none() && params.location.is_empty() { 58 + return Ok(Json(SearchEventsResponse { results: vec![] }).into_response()); 59 + } 60 + 56 61 let did = if let Some(repository) = params.repository.as_ref() { 57 62 match parse_input(repository) { 58 63 Ok(InputType::Plc(value)) | Ok(InputType::Web(value)) => Some(value), ··· 105 110 106 111 // Perform search using centralized methods 107 112 let limit = params.limit.unwrap_or(10); 108 - let is_upcoming = params.query == "upcoming"; 113 + let is_upcoming = params.query.as_deref() == Some("upcoming"); 109 114 110 115 let event_ids = if is_upcoming { 111 116 // Search for upcoming events ··· 127 132 } 128 133 } 129 134 } else { 130 - // Search by query string 135 + // Search with optional query, DID filter, and location CIDs 131 136 match manager 132 - .search_events_by_query(&params.query, did.as_deref(), limit) 137 + .search_events(params.query.as_deref(), did.as_deref(), &params.location, limit) 133 138 .await 134 139 { 135 140 Ok(ids) => ids, 136 141 Err(err) => { 137 - tracing::error!(?err, "Failed to search events by query"); 142 + tracing::error!(?err, "Failed to search events"); 138 143 return Ok(( 139 144 StatusCode::INTERNAL_SERVER_ERROR, 140 145 Json(ErrorResponse {
+38
src/processor.rs
··· 52 52 use crate::storage::event::event_insert_with_metadata; 53 53 use crate::storage::event::rsvp_delete; 54 54 use crate::storage::event::rsvp_insert_with_metadata; 55 + use crate::storage::atproto_record::{atproto_record_delete, atproto_record_upsert}; 55 56 use crate::storage::profile::profile_delete; 56 57 use crate::storage::profile::profile_insert; 57 58 use atproto_record::lexicon::community::lexicon::calendar::event::{ ··· 60 61 use atproto_record::lexicon::community::lexicon::calendar::rsvp::{ 61 62 NSID as LexiconCommunityRSVPNSID, Rsvp, RsvpStatus, 62 63 }; 64 + 65 + const BEACONBITS_BOOKMARK_NSID: &str = "app.beaconbits.bookmark.item"; 66 + const BEACONBITS_BEACON_NSID: &str = "app.beaconbits.beacon"; 63 67 64 68 pub struct ContentFetcher { 65 69 pool: StoragePool, ··· 132 136 "events.smokesignal.calendar.acceptance" => { 133 137 self.handle_acceptance_commit(did, rkey, cid, record).await 134 138 } 139 + BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID => { 140 + self.handle_atproto_record_commit(did, collection, rkey, cid, record) 141 + .await 142 + } 135 143 _ => Ok(()), 136 144 } 137 145 } ··· 154 162 "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await, 155 163 "events.smokesignal.calendar.acceptance" => { 156 164 self.handle_acceptance_delete(did, rkey).await 165 + } 166 + BEACONBITS_BOOKMARK_NSID | BEACONBITS_BEACON_NSID => { 167 + self.handle_atproto_record_delete(did, collection, rkey) 168 + .await 157 169 } 158 170 _ => Ok(()), 159 171 } ··· 431 443 let aturi = build_aturi(did, AcceptanceNSID, rkey); 432 444 acceptance_record_delete(&self.pool, &aturi).await?; 433 445 tracing::info!("Acceptance deleted: {}", aturi); 446 + Ok(()) 447 + } 448 + 449 + async fn handle_atproto_record_commit( 450 + &self, 451 + did: &str, 452 + collection: &str, 453 + rkey: &str, 454 + cid: &str, 455 + record: Value, 456 + ) -> Result<()> { 457 + let aturi = build_aturi(did, collection, rkey); 458 + atproto_record_upsert(&self.pool, 
&aturi, did, cid, collection, &record).await?; 459 + tracing::info!("Stored atproto record: {} ({})", aturi, collection); 460 + Ok(()) 461 + } 462 + 463 + async fn handle_atproto_record_delete( 464 + &self, 465 + did: &str, 466 + collection: &str, 467 + rkey: &str, 468 + ) -> Result<()> { 469 + let aturi = build_aturi(did, collection, rkey); 470 + atproto_record_delete(&self.pool, &aturi).await?; 471 + tracing::info!("Deleted atproto record: {}", aturi); 434 472 Ok(()) 435 473 } 436 474
+100
src/search_index.rs
··· 1 1 use anyhow::Result; 2 + use atproto_attestation::create_dagbor_cid; 2 3 use atproto_identity::resolve::IdentityResolver; 3 4 use opensearch::{ 4 5 DeleteParts, IndexParts, OpenSearch, SearchParts, ··· 12 13 use crate::atproto::utils::get_event_hashtags; 13 14 use crate::storage::{event::event_list, identity_profile::handle_for_did, StoragePool}; 14 15 use atproto_record::lexicon::community::lexicon::calendar::event::Event; 16 + use atproto_record::lexicon::community::lexicon::location::LocationOrRef; 17 + 18 + /// Generate a DAG-CBOR CID from a LocationOrRef value. 19 + fn generate_location_cid(location: &LocationOrRef) -> Option<String> { 20 + let json_value = serde_json::to_value(location).ok()?; 21 + let cid = create_dagbor_cid(&json_value).ok()?; 22 + Some(cid.to_string()) 23 + } 15 24 16 25 const INDEX_NAME: &str = "smokesignal-events"; 17 26 ··· 23 32 pub name: String, 24 33 pub description: Option<String>, 25 34 pub tags: Vec<String>, 35 + #[serde(default)] 36 + pub location_cids: Vec<String>, 26 37 pub start_time: Option<String>, 27 38 pub end_time: Option<String>, 28 39 pub created_at: String, ··· 268 279 "name": { "type": "text" }, 269 280 "description": { "type": "text" }, 270 281 "tags": { "type": "keyword" }, 282 + "location_cids": { "type": "keyword" }, 271 283 "start_time": { "type": "date" }, 272 284 "end_time": { "type": "date" }, 273 285 "created_at": { "type": "date" }, ··· 318 330 // Extract hashtags from facets 319 331 let tags = get_event_hashtags(&event_record); 320 332 333 + // Generate location CIDs 334 + let location_cids: Vec<String> = event_record 335 + .locations 336 + .iter() 337 + .filter_map(generate_location_cid) 338 + .collect(); 339 + 321 340 let mut doc = json!({ 322 341 "aturi": event.aturi, 323 342 "did": event.did, ··· 325 344 "name": event_record.name, 326 345 "description": event_record.description, 327 346 "tags": tags, 347 + "location_cids": location_cids, 328 348 "created_at": json!(event_record.created_at), 329 
349 "updated_at": json!(chrono::Utc::now()) 330 350 }); ··· 453 473 }) 454 474 } 455 475 }; 476 + 477 + let response = self 478 + .client 479 + .search(SearchParts::Index(&[INDEX_NAME])) 480 + .from(0) 481 + .size(limit as i64) 482 + .body(search_body) 483 + .send() 484 + .await?; 485 + 486 + if !response.status_code().is_success() { 487 + return Err(anyhow::anyhow!("Search query failed")); 488 + } 489 + 490 + let body = response.json::<Value>().await?; 491 + 492 + let mut event_uris = Vec::new(); 493 + if let Some(outer_hits) = body.get("hits") 494 + && let Some(inner_hits) = outer_hits.get("hits") 495 + && let Some(hits) = inner_hits.as_array() 496 + { 497 + for hit in hits { 498 + if let Some(document_id) = hit.get("_id").and_then(|v| v.as_str()) { 499 + event_uris.push(document_id.to_string()); 500 + } 501 + } 502 + } 503 + 504 + Ok(event_uris) 505 + } 506 + 507 + /// Search events with optional query, DID filter, and location CIDs. 508 + /// 509 + /// Returns a list of AT-URIs matching the search criteria. 
510 + /// - Multiple location CIDs are OR'd together (match any of the locations) 511 + /// - Location filter is AND'd with text query and DID filter 512 + /// 513 + /// # Arguments 514 + /// * `query` - Optional text query to search for 515 + /// * `did_filter` - Optional DID to filter events by repository 516 + /// * `location_cids` - Location CIDs to filter by (OR'd together) 517 + /// * `limit` - Maximum number of results to return 518 + pub async fn search_events( 519 + &self, 520 + query: Option<&str>, 521 + did_filter: Option<&str>, 522 + location_cids: &[String], 523 + limit: u32, 524 + ) -> Result<Vec<String>> { 525 + // Build the query based on provided parameters 526 + let mut must_clauses: Vec<Value> = Vec::new(); 527 + 528 + // Add DID filter if provided 529 + if let Some(did) = did_filter { 530 + must_clauses.push(json!({"term": {"did": did}})); 531 + } 532 + 533 + // Add text query if provided 534 + if let Some(q) = query { 535 + must_clauses.push(json!({ 536 + "simple_query_string": { 537 + "query": q, 538 + "fields": ["name", "did", "handle", "description"] 539 + } 540 + })); 541 + } 542 + 543 + // Add location CIDs filter if provided (terms query uses OR behavior) 544 + if !location_cids.is_empty() { 545 + must_clauses.push(json!({"terms": {"location_cids": location_cids}})); 546 + } 547 + 548 + let search_body = json!({ 549 + "_source": false, 550 + "query": { 551 + "bool": { 552 + "must": must_clauses 553 + } 554 + } 555 + }); 456 556 457 557 let response = self 458 558 .client
+77
src/storage/atproto_record.rs
··· 1 + //! Storage for generic AT Protocol records. 2 + //! 3 + //! This module provides storage for arbitrary AT Protocol records that don't require 4 + //! specialized handling. Records are stored as JSONB with minimal metadata. 5 + 6 + use chrono::{DateTime, Utc}; 7 + use serde_json::json; 8 + use sqlx::FromRow; 9 + 10 + use super::StoragePool; 11 + use super::errors::StorageError; 12 + 13 + /// Model for a generic AT Protocol record 14 + #[derive(Clone, FromRow, Debug)] 15 + pub struct AtprotoRecord { 16 + pub aturi: String, 17 + pub did: String, 18 + pub cid: String, 19 + pub collection: String, 20 + pub indexed_at: DateTime<Utc>, 21 + pub record: sqlx::types::Json<serde_json::Value>, 22 + } 23 + 24 + /// Insert or update a generic AT Protocol record 25 + pub async fn atproto_record_upsert( 26 + pool: &StoragePool, 27 + aturi: &str, 28 + did: &str, 29 + cid: &str, 30 + collection: &str, 31 + record: &serde_json::Value, 32 + ) -> Result<(), StorageError> { 33 + let mut tx = pool 34 + .begin() 35 + .await 36 + .map_err(StorageError::CannotBeginDatabaseTransaction)?; 37 + 38 + let now = Utc::now(); 39 + 40 + sqlx::query( 41 + "INSERT INTO atproto_records (aturi, did, cid, collection, indexed_at, record) 42 + VALUES ($1, $2, $3, $4, $5, $6) 43 + ON CONFLICT (aturi) DO UPDATE 44 + SET cid = $3, indexed_at = $5, record = $6", 45 + ) 46 + .bind(aturi) 47 + .bind(did) 48 + .bind(cid) 49 + .bind(collection) 50 + .bind(now) 51 + .bind(json!(record)) 52 + .execute(tx.as_mut()) 53 + .await 54 + .map_err(StorageError::UnableToExecuteQuery)?; 55 + 56 + tx.commit() 57 + .await 58 + .map_err(StorageError::CannotCommitDatabaseTransaction) 59 + } 60 + 61 + /// Delete a generic AT Protocol record by AT-URI 62 + pub async fn atproto_record_delete(pool: &StoragePool, aturi: &str) -> Result<(), StorageError> { 63 + let mut tx = pool 64 + .begin() 65 + .await 66 + .map_err(StorageError::CannotBeginDatabaseTransaction)?; 67 + 68 + sqlx::query("DELETE FROM atproto_records WHERE aturi 
= $1") 69 + .bind(aturi) 70 + .execute(tx.as_mut()) 71 + .await 72 + .map_err(StorageError::UnableToExecuteQuery)?; 73 + 74 + tx.commit() 75 + .await 76 + .map_err(StorageError::CannotCommitDatabaseTransaction) 77 + }
+1
src/storage/mod.rs
··· 1 1 pub mod acceptance; 2 2 pub mod atproto; 3 + pub mod atproto_record; 3 4 pub mod cache; 4 5 pub mod content; 5 6 pub mod denylist;
+2
src/tap_processor.rs
··· 207 207 | "community.lexicon.calendar.event" 208 208 | "events.smokesignal.profile" 209 209 | "events.smokesignal.calendar.acceptance" 210 + | "app.beaconbits.bookmark.item" 211 + | "app.beaconbits.beacon" 210 212 ) 211 213 } 212 214 }
+28 -4
src/task_search_indexer.rs
··· 1 1 use anyhow::Result; 2 + use atproto_attestation::create_dagbor_cid; 2 3 use atproto_identity::{model::Document, resolve::IdentityResolver, traits::DidDocumentStorage}; 3 4 use atproto_record::lexicon::app::bsky::richtext::facet::{Facet, FacetFeature}; 4 5 use atproto_record::lexicon::community::lexicon::calendar::event::NSID as LexiconCommunityEventNSID; ··· 29 30 uri 30 31 } 31 32 32 - /// A lightweight event struct for search indexing that excludes problematic fields like locations. 33 - /// This avoids deserialization errors when event data contains location types not supported 34 - /// by the LocationOrRef enum (e.g., new or unknown location formats). 33 + /// Generate a DAG-CBOR CID from a JSON value. 34 + /// 35 + /// Creates a CIDv1 with DAG-CBOR codec (0x71) and SHA-256 hash (0x12), 36 + /// following the AT Protocol specification for content addressing. 37 + fn generate_location_cid(value: &Value) -> Result<String, SearchIndexerError> { 38 + let cid = create_dagbor_cid(value).map_err(|e| SearchIndexerError::CidGenerationFailed { 39 + error: e.to_string(), 40 + })?; 41 + Ok(cid.to_string()) 42 + } 43 + 44 + /// A lightweight event struct for search indexing. 45 + /// 46 + /// Uses serde_json::Value for locations to avoid deserialization errors 47 + /// when event data contains location types not supported by the LocationOrRef enum. 35 48 #[derive(Deserialize)] 36 49 struct IndexableEvent { 37 50 name: String, ··· 44 57 ends_at: Option<chrono::DateTime<chrono::Utc>>, 45 58 #[serde(rename = "descriptionFacets")] 46 59 facets: Option<Vec<Facet>>, 60 + /// Locations stored as raw JSON values for CID generation. 
61 + #[serde(default)] 62 + locations: Vec<Value>, 47 63 } 48 64 49 65 impl IndexableEvent { ··· 134 150 "name": { "type": "text" }, 135 151 "description": { "type": "text" }, 136 152 "tags": { "type": "keyword" }, 153 + "location_cids": { "type": "keyword" }, 137 154 "start_time": { "type": "date" }, 138 155 "end_time": { "type": "date" }, 139 156 "created_at": { "type": "date" }, ··· 235 252 } 236 253 237 254 async fn index_event(&self, did: &str, rkey: &str, record: Value) -> Result<()> { 238 - // Use IndexableEvent which excludes problematic fields like locations 239 255 let event: IndexableEvent = serde_json::from_value(record)?; 240 256 241 257 let document = self.ensure_identity_stored(did).await?; ··· 253 269 // Extract hashtags from facets 254 270 let tags = event.get_hashtags(); 255 271 272 + // Generate CIDs for each location 273 + let location_cids: Vec<String> = event 274 + .locations 275 + .iter() 276 + .filter_map(|loc| generate_location_cid(loc).ok()) 277 + .collect(); 278 + 256 279 let mut doc = json!({ 257 280 "did": did, 258 281 "handle": handle, 259 282 "name": name, 260 283 "description": description, 261 284 "tags": tags, 285 + "location_cids": location_cids, 262 286 "created_at": json!(created_at), 263 287 "updated_at": json!(chrono::Utc::now()) 264 288 });
+7
src/task_search_indexer_errors.rs
··· 12 12 /// and the operation fails with a server error response. 13 13 #[error("error-smokesignal-search-indexer-1 Failed to create index: {error_body}")] 14 14 IndexCreationFailed { error_body: String }, 15 + 16 + /// Error when CID generation fails. 17 + /// 18 + /// This error occurs when serializing location data to DAG-CBOR 19 + /// or generating the multihash for a CID fails. 20 + #[error("error-smokesignal-search-indexer-2 Failed to generate CID: {error}")] 21 + CidGenerationFailed { error: String }, 15 22 }