The smokesignal.events web application
at main 323 lines 11 kB view raw
1//! Storage for generic AT Protocol records. 2//! 3//! This module provides storage for arbitrary AT Protocol records that don't require 4//! specialized handling. Records are stored as JSONB with minimal metadata. 5 6use std::hash::{Hash, Hasher}; 7 8use chrono::{DateTime, Utc}; 9use ordermap::OrderSet; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use sqlx::FromRow; 13 14use super::StoragePool; 15use super::errors::StorageError; 16 17/// Model for a generic AT Protocol record 18#[derive(Clone, FromRow, Debug)] 19pub struct AtprotoRecord { 20 pub aturi: String, 21 pub did: String, 22 pub cid: String, 23 pub collection: String, 24 pub indexed_at: DateTime<Utc>, 25 pub record: sqlx::types::Json<serde_json::Value>, 26} 27 28/// Insert or update a generic AT Protocol record 29pub async fn atproto_record_upsert( 30 pool: &StoragePool, 31 aturi: &str, 32 did: &str, 33 cid: &str, 34 collection: &str, 35 record: &serde_json::Value, 36) -> Result<(), StorageError> { 37 let mut tx = pool 38 .begin() 39 .await 40 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 41 42 let now = Utc::now(); 43 44 sqlx::query( 45 "INSERT INTO atproto_records (aturi, did, cid, collection, indexed_at, record) 46 VALUES ($1, $2, $3, $4, $5, $6) 47 ON CONFLICT (aturi) DO UPDATE 48 SET cid = $3, indexed_at = $5, record = $6", 49 ) 50 .bind(aturi) 51 .bind(did) 52 .bind(cid) 53 .bind(collection) 54 .bind(now) 55 .bind(json!(record)) 56 .execute(tx.as_mut()) 57 .await 58 .map_err(StorageError::UnableToExecuteQuery)?; 59 60 tx.commit() 61 .await 62 .map_err(StorageError::CannotCommitDatabaseTransaction) 63} 64 65/// Delete a generic AT Protocol record by AT-URI 66pub async fn atproto_record_delete(pool: &StoragePool, aturi: &str) -> Result<(), StorageError> { 67 let mut tx = pool 68 .begin() 69 .await 70 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 71 72 sqlx::query("DELETE FROM atproto_records WHERE aturi = $1") 73 .bind(aturi) 74 .execute(tx.as_mut()) 75 .await 76 .map_err(StorageError::UnableToExecuteQuery)?; 77 78 tx.commit() 79 .await 80 .map_err(StorageError::CannotCommitDatabaseTransaction) 81} 82 83/// A location suggestion extracted from AT Protocol records. 84/// 85/// Contains both address fields and geo coordinates when available. 86#[derive(Clone, Debug, Serialize, Deserialize)] 87pub struct LocationSuggestion { 88 pub source: Option<String>, 89 // Address fields 90 pub name: Option<String>, 91 pub street: Option<String>, 92 pub locality: Option<String>, 93 pub region: Option<String>, 94 pub postal_code: Option<String>, 95 pub country: Option<String>, 96 // Geo coordinates 97 pub latitude: Option<String>, 98 pub longitude: Option<String>, 99} 100 101impl PartialEq for LocationSuggestion { 102 fn eq(&self, other: &Self) -> bool { 103 // Compare only location fields, not source 104 self.name == other.name 105 && self.street == other.street 106 && self.locality == other.locality 107 && self.region == other.region 108 && self.postal_code == other.postal_code 109 && self.country == other.country 110 && self.latitude == other.latitude 111 && self.longitude == other.longitude 112 } 113} 114 115impl Eq for LocationSuggestion {} 116 117impl Hash for LocationSuggestion { 118 fn hash<H: Hasher>(&self, state: &mut H) { 119 // Hash only location fields, not source 120 self.name.hash(state); 121 self.street.hash(state); 122 self.locality.hash(state); 123 self.region.hash(state); 124 self.postal_code.hash(state); 125 self.country.hash(state); 126 self.latitude.hash(state); 127 self.longitude.hash(state); 128 } 129} 130 131/// Query location records for a user, returning most recent locations. 132/// 133/// Queries locations from multiple sources: 134/// - Up to 100 most recent `app.beaconbits.bookmark.item`, `app.beaconbits.beacon`, `app.dropanchor.checkin` from atproto_records 135/// - Locations from up to 100 most recent events created by the user 136/// 137/// Results are ordered by timestamp descending and deduplicated based on location fields 138/// (name, street, locality, region, postal_code, country, latitude, longitude). 139pub async fn atproto_record_get_location_suggestions( 140 pool: &StoragePool, 141 did: &str, 142) -> Result<OrderSet<LocationSuggestion>, StorageError> { 143 // Fetch up to 100 records from each source 144 let fetch_limit: i64 = 100; 145 146 // Query 1: beaconbits/dropanchor from atproto_records 147 let atproto_rows: Vec<(String, DateTime<Utc>, sqlx::types::Json<serde_json::Value>)> = 148 sqlx::query_as( 149 r#" 150 SELECT aturi, indexed_at, record 151 FROM atproto_records 152 WHERE did = $1 153 AND collection IN ( 154 'app.beaconbits.bookmark.item', 155 'app.beaconbits.beacon', 156 'app.dropanchor.checkin' 157 ) 158 ORDER BY indexed_at DESC 159 LIMIT $2 160 "#, 161 ) 162 .bind(did) 163 .bind(fetch_limit) 164 .fetch_all(pool) 165 .await 166 .map_err(StorageError::UnableToExecuteQuery)?; 167 168 // Query 2: events with locations created by this user 169 let event_rows: Vec<( 170 String, 171 Option<DateTime<Utc>>, 172 sqlx::types::Json<serde_json::Value>, 173 )> = sqlx::query_as( 174 r#" 175 SELECT aturi, updated_at, record 176 FROM events 177 WHERE did = $1 178 AND json_array_length(COALESCE(record->'locations', '[]'::json)) > 0 179 ORDER BY updated_at DESC NULLS LAST 180 LIMIT $2 181 "#, 182 ) 183 .bind(did) 184 .bind(fetch_limit) 185 .fetch_all(pool) 186 .await 187 .map_err(StorageError::UnableToExecuteQuery)?; 188 189 // Collect all suggestions with (priority, timestamp) for sorting 190 // Priority: 0 = beaconbits/dropanchor (higher priority), 1 = smokesignal events 191 let mut suggestions_with_priority: Vec<(u8, DateTime<Utc>, LocationSuggestion)> = Vec::new(); 192 193 // Process atproto records (beaconbits/dropanchor) - priority 0 194 for (aturi, indexed_at, record) in atproto_rows { 195 let r = &record.0; 196 // Try addressDetails (beaconbits) first, then address (dropanchor) 197 let address = r.get("addressDetails").or_else(|| r.get("address")); 198 // Try location (beaconbits) first, then geo (dropanchor) 199 let geo = r.get("location").or_else(|| r.get("geo")); 200 201 let suggestion = LocationSuggestion { 202 source: Some(aturi), 203 name: address 204 .and_then(|a| a.get("name")) 205 .and_then(|v| v.as_str()) 206 .map(String::from), 207 street: address 208 .and_then(|a| a.get("street")) 209 .and_then(|v| v.as_str()) 210 .map(String::from), 211 locality: address 212 .and_then(|a| a.get("locality")) 213 .and_then(|v| v.as_str()) 214 .map(String::from), 215 region: address 216 .and_then(|a| a.get("region")) 217 .and_then(|v| v.as_str()) 218 .map(String::from), 219 postal_code: address 220 .and_then(|a| a.get("postalCode")) 221 .and_then(|v| v.as_str()) 222 .map(String::from), 223 country: address 224 .and_then(|a| a.get("country")) 225 .and_then(|v| v.as_str()) 226 .map(String::from), 227 latitude: geo 228 .and_then(|g| g.get("latitude")) 229 .and_then(|v| v.as_str()) 230 .map(String::from), 231 longitude: geo 232 .and_then(|g| g.get("longitude")) 233 .and_then(|v| v.as_str()) 234 .map(String::from), 235 }; 236 suggestions_with_priority.push((0, indexed_at, suggestion)); 237 } 238 239 // Process event locations - priority 1 (lower than beaconbits/dropanchor) 240 for (aturi, updated_at, record) in event_rows { 241 let timestamp = updated_at.unwrap_or_else(Utc::now); 242 let locations = extract_locations_from_event(&aturi, &record.0); 243 for suggestion in locations { 244 suggestions_with_priority.push((1, timestamp, suggestion)); 245 } 246 } 247 248 // Sort by priority ascending (0 first), then by timestamp descending 249 suggestions_with_priority.sort_by(|a, b| { 250 match a.0.cmp(&b.0) { 251 std::cmp::Ordering::Equal => b.1.cmp(&a.1), // Same priority: newer first 252 other => other, // Different priority: lower first 253 } 254 }); 255 256 // Collect into OrderSet (deduplicates based on location fields) 257 let suggestions: OrderSet<LocationSuggestion> = suggestions_with_priority 258 .into_iter() 259 .map(|(_, _, s)| s) 260 .collect(); 261 262 Ok(suggestions) 263} 264 265/// Extract locations from an event record's locations array. 266/// 267/// Handles both address and geo location types based on the `$type` field. 268fn extract_locations_from_event( 269 aturi: &str, 270 record: &serde_json::Value, 271) -> Vec<LocationSuggestion> { 272 let Some(locations) = record.get("locations").and_then(|l| l.as_array()) else { 273 return vec![]; 274 }; 275 276 locations 277 .iter() 278 .filter_map(|loc| { 279 let loc_type = loc.get("$type").and_then(|t| t.as_str())?; 280 281 match loc_type { 282 "community.lexicon.location.address" => Some(LocationSuggestion { 283 source: Some(aturi.to_string()), 284 name: loc.get("name").and_then(|v| v.as_str()).map(String::from), 285 street: loc.get("street").and_then(|v| v.as_str()).map(String::from), 286 locality: loc 287 .get("locality") 288 .and_then(|v| v.as_str()) 289 .map(String::from), 290 region: loc.get("region").and_then(|v| v.as_str()).map(String::from), 291 postal_code: loc 292 .get("postalCode") 293 .and_then(|v| v.as_str()) 294 .map(String::from), 295 country: loc 296 .get("country") 297 .and_then(|v| v.as_str()) 298 .map(String::from), 299 latitude: None, 300 longitude: None, 301 }), 302 "community.lexicon.location.geo" => Some(LocationSuggestion { 303 source: Some(aturi.to_string()), 304 name: loc.get("name").and_then(|v| v.as_str()).map(String::from), 305 street: None, 306 locality: None, 307 region: None, 308 postal_code: None, 309 country: None, 310 latitude: loc 311 .get("latitude") 312 .and_then(|v| v.as_str()) 313 .map(String::from), 314 longitude: loc 315 .get("longitude") 316 .and_then(|v| v.as_str()) 317 .map(String::from), 318 }), 319 _ => None, 320 } 321 }) 322 .collect() 323}