//! Storage for generic AT Protocol records. //! //! This module provides storage for arbitrary AT Protocol records that don't require //! specialized handling. Records are stored as JSONB with minimal metadata. use std::hash::{Hash, Hasher}; use chrono::{DateTime, Utc}; use ordermap::OrderSet; use serde::{Deserialize, Serialize}; use serde_json::json; use sqlx::FromRow; use super::StoragePool; use super::errors::StorageError; /// Model for a generic AT Protocol record #[derive(Clone, FromRow, Debug)] pub struct AtprotoRecord { pub aturi: String, pub did: String, pub cid: String, pub collection: String, pub indexed_at: DateTime, pub record: sqlx::types::Json, } /// Insert or update a generic AT Protocol record pub async fn atproto_record_upsert( pool: &StoragePool, aturi: &str, did: &str, cid: &str, collection: &str, record: &serde_json::Value, ) -> Result<(), StorageError> { let mut tx = pool .begin() .await .map_err(StorageError::CannotBeginDatabaseTransaction)?; let now = Utc::now(); sqlx::query( "INSERT INTO atproto_records (aturi, did, cid, collection, indexed_at, record) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (aturi) DO UPDATE SET cid = $3, indexed_at = $5, record = $6", ) .bind(aturi) .bind(did) .bind(cid) .bind(collection) .bind(now) .bind(json!(record)) .execute(tx.as_mut()) .await .map_err(StorageError::UnableToExecuteQuery)?; tx.commit() .await .map_err(StorageError::CannotCommitDatabaseTransaction) } /// Delete a generic AT Protocol record by AT-URI pub async fn atproto_record_delete(pool: &StoragePool, aturi: &str) -> Result<(), StorageError> { let mut tx = pool .begin() .await .map_err(StorageError::CannotBeginDatabaseTransaction)?; sqlx::query("DELETE FROM atproto_records WHERE aturi = $1") .bind(aturi) .execute(tx.as_mut()) .await .map_err(StorageError::UnableToExecuteQuery)?; tx.commit() .await .map_err(StorageError::CannotCommitDatabaseTransaction) } /// A location suggestion extracted from AT Protocol records. 
///
/// Contains both address fields and geo coordinates when available.
///
/// Equality and hashing deliberately ignore `source`: two suggestions that describe the
/// same place but come from different records compare equal and hash identically.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocationSuggestion {
    /// AT-URI of the record the suggestion was extracted from.
    pub source: Option<String>,
    // Address fields
    pub name: Option<String>,
    pub street: Option<String>,
    pub locality: Option<String>,
    pub region: Option<String>,
    pub postal_code: Option<String>,
    pub country: Option<String>,
    // Geo coordinates
    pub latitude: Option<String>,
    pub longitude: Option<String>,
}

impl LocationSuggestion {
    /// The fields that define a suggestion's identity: every field except `source`.
    ///
    /// `PartialEq` and `Hash` are both built on this one list, so the two
    /// implementations cannot drift apart when a field is added.
    fn location_fields(&self) -> [Option<&str>; 8] {
        [
            self.name.as_deref(),
            self.street.as_deref(),
            self.locality.as_deref(),
            self.region.as_deref(),
            self.postal_code.as_deref(),
            self.country.as_deref(),
            self.latitude.as_deref(),
            self.longitude.as_deref(),
        ]
    }
}

impl PartialEq for LocationSuggestion {
    fn eq(&self, other: &Self) -> bool {
        // Compare only location fields, not source
        self.location_fields() == other.location_fields()
    }
}

impl Eq for LocationSuggestion {}

impl Hash for LocationSuggestion {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash only location fields, not source
        self.location_fields().hash(state);
    }
}

/// Query location records for a user, returning most recent locations.
///
/// Queries locations from multiple sources:
/// - Up to 100 most recent `app.beaconbits.bookmark.item`, `app.beaconbits.beacon`,
///   `app.dropanchor.checkin` from atproto_records
/// - Locations from up to 100 most recent events created by the user
///
/// Suggestions from `atproto_records` are listed before suggestions from events; within
/// each group they are ordered by timestamp descending. Results are deduplicated based on
/// location fields (name, street, locality, region, postal_code, country, latitude,
/// longitude).
pub async fn atproto_record_get_location_suggestions( pool: &StoragePool, did: &str, ) -> Result, StorageError> { // Fetch up to 100 records from each source let fetch_limit: i64 = 100; // Query 1: beaconbits/dropanchor from atproto_records let atproto_rows: Vec<(String, DateTime, sqlx::types::Json)> = sqlx::query_as( r#" SELECT aturi, indexed_at, record FROM atproto_records WHERE did = $1 AND collection IN ( 'app.beaconbits.bookmark.item', 'app.beaconbits.beacon', 'app.dropanchor.checkin' ) ORDER BY indexed_at DESC LIMIT $2 "#, ) .bind(did) .bind(fetch_limit) .fetch_all(pool) .await .map_err(StorageError::UnableToExecuteQuery)?; // Query 2: events with locations created by this user let event_rows: Vec<( String, Option>, sqlx::types::Json, )> = sqlx::query_as( r#" SELECT aturi, updated_at, record FROM events WHERE did = $1 AND json_array_length(COALESCE(record->'locations', '[]'::json)) > 0 ORDER BY updated_at DESC NULLS LAST LIMIT $2 "#, ) .bind(did) .bind(fetch_limit) .fetch_all(pool) .await .map_err(StorageError::UnableToExecuteQuery)?; // Collect all suggestions with (priority, timestamp) for sorting // Priority: 0 = beaconbits/dropanchor (higher priority), 1 = smokesignal events let mut suggestions_with_priority: Vec<(u8, DateTime, LocationSuggestion)> = Vec::new(); // Process atproto records (beaconbits/dropanchor) - priority 0 for (aturi, indexed_at, record) in atproto_rows { let r = &record.0; // Try addressDetails (beaconbits) first, then address (dropanchor) let address = r.get("addressDetails").or_else(|| r.get("address")); // Try location (beaconbits) first, then geo (dropanchor) let geo = r.get("location").or_else(|| r.get("geo")); let suggestion = LocationSuggestion { source: Some(aturi), name: address .and_then(|a| a.get("name")) .and_then(|v| v.as_str()) .map(String::from), street: address .and_then(|a| a.get("street")) .and_then(|v| v.as_str()) .map(String::from), locality: address .and_then(|a| a.get("locality")) .and_then(|v| v.as_str()) 
.map(String::from), region: address .and_then(|a| a.get("region")) .and_then(|v| v.as_str()) .map(String::from), postal_code: address .and_then(|a| a.get("postalCode")) .and_then(|v| v.as_str()) .map(String::from), country: address .and_then(|a| a.get("country")) .and_then(|v| v.as_str()) .map(String::from), latitude: geo .and_then(|g| g.get("latitude")) .and_then(|v| v.as_str()) .map(String::from), longitude: geo .and_then(|g| g.get("longitude")) .and_then(|v| v.as_str()) .map(String::from), }; suggestions_with_priority.push((0, indexed_at, suggestion)); } // Process event locations - priority 1 (lower than beaconbits/dropanchor) for (aturi, updated_at, record) in event_rows { let timestamp = updated_at.unwrap_or_else(Utc::now); let locations = extract_locations_from_event(&aturi, &record.0); for suggestion in locations { suggestions_with_priority.push((1, timestamp, suggestion)); } } // Sort by priority ascending (0 first), then by timestamp descending suggestions_with_priority.sort_by(|a, b| { match a.0.cmp(&b.0) { std::cmp::Ordering::Equal => b.1.cmp(&a.1), // Same priority: newer first other => other, // Different priority: lower first } }); // Collect into OrderSet (deduplicates based on location fields) let suggestions: OrderSet = suggestions_with_priority .into_iter() .map(|(_, _, s)| s) .collect(); Ok(suggestions) } /// Extract locations from an event record's locations array. /// /// Handles both address and geo location types based on the `$type` field. 
fn extract_locations_from_event( aturi: &str, record: &serde_json::Value, ) -> Vec { let Some(locations) = record.get("locations").and_then(|l| l.as_array()) else { return vec![]; }; locations .iter() .filter_map(|loc| { let loc_type = loc.get("$type").and_then(|t| t.as_str())?; match loc_type { "community.lexicon.location.address" => Some(LocationSuggestion { source: Some(aturi.to_string()), name: loc.get("name").and_then(|v| v.as_str()).map(String::from), street: loc.get("street").and_then(|v| v.as_str()).map(String::from), locality: loc .get("locality") .and_then(|v| v.as_str()) .map(String::from), region: loc.get("region").and_then(|v| v.as_str()).map(String::from), postal_code: loc .get("postalCode") .and_then(|v| v.as_str()) .map(String::from), country: loc .get("country") .and_then(|v| v.as_str()) .map(String::from), latitude: None, longitude: None, }), "community.lexicon.location.geo" => Some(LocationSuggestion { source: Some(aturi.to_string()), name: loc.get("name").and_then(|v| v.as_str()).map(String::from), street: None, locality: None, region: None, postal_code: None, country: None, latitude: loc .get("latitude") .and_then(|v| v.as_str()) .map(String::from), longitude: loc .get("longitude") .and_then(|v| v.as_str()) .map(String::from), }), _ => None, } }) .collect() }