The smokesignal.events web application
1//! Storage for generic AT Protocol records.
2//!
3//! This module provides storage for arbitrary AT Protocol records that don't require
4//! specialized handling. Records are stored as JSONB with minimal metadata.
5
6use std::hash::{Hash, Hasher};
7
8use chrono::{DateTime, Utc};
9use ordermap::OrderSet;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use sqlx::FromRow;
13
14use super::StoragePool;
15use super::errors::StorageError;
16
17/// Model for a generic AT Protocol record
18#[derive(Clone, FromRow, Debug)]
19pub struct AtprotoRecord {
20 pub aturi: String,
21 pub did: String,
22 pub cid: String,
23 pub collection: String,
24 pub indexed_at: DateTime<Utc>,
25 pub record: sqlx::types::Json<serde_json::Value>,
26}
27
28/// Insert or update a generic AT Protocol record
29pub async fn atproto_record_upsert(
30 pool: &StoragePool,
31 aturi: &str,
32 did: &str,
33 cid: &str,
34 collection: &str,
35 record: &serde_json::Value,
36) -> Result<(), StorageError> {
37 let mut tx = pool
38 .begin()
39 .await
40 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
41
42 let now = Utc::now();
43
44 sqlx::query(
45 "INSERT INTO atproto_records (aturi, did, cid, collection, indexed_at, record)
46 VALUES ($1, $2, $3, $4, $5, $6)
47 ON CONFLICT (aturi) DO UPDATE
48 SET cid = $3, indexed_at = $5, record = $6",
49 )
50 .bind(aturi)
51 .bind(did)
52 .bind(cid)
53 .bind(collection)
54 .bind(now)
55 .bind(json!(record))
56 .execute(tx.as_mut())
57 .await
58 .map_err(StorageError::UnableToExecuteQuery)?;
59
60 tx.commit()
61 .await
62 .map_err(StorageError::CannotCommitDatabaseTransaction)
63}
64
65/// Delete a generic AT Protocol record by AT-URI
66pub async fn atproto_record_delete(pool: &StoragePool, aturi: &str) -> Result<(), StorageError> {
67 let mut tx = pool
68 .begin()
69 .await
70 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
71
72 sqlx::query("DELETE FROM atproto_records WHERE aturi = $1")
73 .bind(aturi)
74 .execute(tx.as_mut())
75 .await
76 .map_err(StorageError::UnableToExecuteQuery)?;
77
78 tx.commit()
79 .await
80 .map_err(StorageError::CannotCommitDatabaseTransaction)
81}
82
83/// A location suggestion extracted from AT Protocol records.
84///
85/// Contains both address fields and geo coordinates when available.
86#[derive(Clone, Debug, Serialize, Deserialize)]
87pub struct LocationSuggestion {
88 pub source: Option<String>,
89 // Address fields
90 pub name: Option<String>,
91 pub street: Option<String>,
92 pub locality: Option<String>,
93 pub region: Option<String>,
94 pub postal_code: Option<String>,
95 pub country: Option<String>,
96 // Geo coordinates
97 pub latitude: Option<String>,
98 pub longitude: Option<String>,
99}
100
101impl PartialEq for LocationSuggestion {
102 fn eq(&self, other: &Self) -> bool {
103 // Compare only location fields, not source
104 self.name == other.name
105 && self.street == other.street
106 && self.locality == other.locality
107 && self.region == other.region
108 && self.postal_code == other.postal_code
109 && self.country == other.country
110 && self.latitude == other.latitude
111 && self.longitude == other.longitude
112 }
113}
114
115impl Eq for LocationSuggestion {}
116
117impl Hash for LocationSuggestion {
118 fn hash<H: Hasher>(&self, state: &mut H) {
119 // Hash only location fields, not source
120 self.name.hash(state);
121 self.street.hash(state);
122 self.locality.hash(state);
123 self.region.hash(state);
124 self.postal_code.hash(state);
125 self.country.hash(state);
126 self.latitude.hash(state);
127 self.longitude.hash(state);
128 }
129}
130
131/// Query location records for a user, returning most recent locations.
132///
133/// Queries locations from multiple sources:
134/// - Up to 100 most recent `app.beaconbits.bookmark.item`, `app.beaconbits.beacon`, `app.dropanchor.checkin` from atproto_records
135/// - Locations from up to 100 most recent events created by the user
136///
137/// Results are ordered by timestamp descending and deduplicated based on location fields
138/// (name, street, locality, region, postal_code, country, latitude, longitude).
139pub async fn atproto_record_get_location_suggestions(
140 pool: &StoragePool,
141 did: &str,
142) -> Result<OrderSet<LocationSuggestion>, StorageError> {
143 // Fetch up to 100 records from each source
144 let fetch_limit: i64 = 100;
145
146 // Query 1: beaconbits/dropanchor from atproto_records
147 let atproto_rows: Vec<(String, DateTime<Utc>, sqlx::types::Json<serde_json::Value>)> =
148 sqlx::query_as(
149 r#"
150 SELECT aturi, indexed_at, record
151 FROM atproto_records
152 WHERE did = $1
153 AND collection IN (
154 'app.beaconbits.bookmark.item',
155 'app.beaconbits.beacon',
156 'app.dropanchor.checkin'
157 )
158 ORDER BY indexed_at DESC
159 LIMIT $2
160 "#,
161 )
162 .bind(did)
163 .bind(fetch_limit)
164 .fetch_all(pool)
165 .await
166 .map_err(StorageError::UnableToExecuteQuery)?;
167
168 // Query 2: events with locations created by this user
169 let event_rows: Vec<(
170 String,
171 Option<DateTime<Utc>>,
172 sqlx::types::Json<serde_json::Value>,
173 )> = sqlx::query_as(
174 r#"
175 SELECT aturi, updated_at, record
176 FROM events
177 WHERE did = $1
178 AND json_array_length(COALESCE(record->'locations', '[]'::json)) > 0
179 ORDER BY updated_at DESC NULLS LAST
180 LIMIT $2
181 "#,
182 )
183 .bind(did)
184 .bind(fetch_limit)
185 .fetch_all(pool)
186 .await
187 .map_err(StorageError::UnableToExecuteQuery)?;
188
189 // Collect all suggestions with (priority, timestamp) for sorting
190 // Priority: 0 = beaconbits/dropanchor (higher priority), 1 = smokesignal events
191 let mut suggestions_with_priority: Vec<(u8, DateTime<Utc>, LocationSuggestion)> = Vec::new();
192
193 // Process atproto records (beaconbits/dropanchor) - priority 0
194 for (aturi, indexed_at, record) in atproto_rows {
195 let r = &record.0;
196 // Try addressDetails (beaconbits) first, then address (dropanchor)
197 let address = r.get("addressDetails").or_else(|| r.get("address"));
198 // Try location (beaconbits) first, then geo (dropanchor)
199 let geo = r.get("location").or_else(|| r.get("geo"));
200
201 let suggestion = LocationSuggestion {
202 source: Some(aturi),
203 name: address
204 .and_then(|a| a.get("name"))
205 .and_then(|v| v.as_str())
206 .map(String::from),
207 street: address
208 .and_then(|a| a.get("street"))
209 .and_then(|v| v.as_str())
210 .map(String::from),
211 locality: address
212 .and_then(|a| a.get("locality"))
213 .and_then(|v| v.as_str())
214 .map(String::from),
215 region: address
216 .and_then(|a| a.get("region"))
217 .and_then(|v| v.as_str())
218 .map(String::from),
219 postal_code: address
220 .and_then(|a| a.get("postalCode"))
221 .and_then(|v| v.as_str())
222 .map(String::from),
223 country: address
224 .and_then(|a| a.get("country"))
225 .and_then(|v| v.as_str())
226 .map(String::from),
227 latitude: geo
228 .and_then(|g| g.get("latitude"))
229 .and_then(|v| v.as_str())
230 .map(String::from),
231 longitude: geo
232 .and_then(|g| g.get("longitude"))
233 .and_then(|v| v.as_str())
234 .map(String::from),
235 };
236 suggestions_with_priority.push((0, indexed_at, suggestion));
237 }
238
239 // Process event locations - priority 1 (lower than beaconbits/dropanchor)
240 for (aturi, updated_at, record) in event_rows {
241 let timestamp = updated_at.unwrap_or_else(Utc::now);
242 let locations = extract_locations_from_event(&aturi, &record.0);
243 for suggestion in locations {
244 suggestions_with_priority.push((1, timestamp, suggestion));
245 }
246 }
247
248 // Sort by priority ascending (0 first), then by timestamp descending
249 suggestions_with_priority.sort_by(|a, b| {
250 match a.0.cmp(&b.0) {
251 std::cmp::Ordering::Equal => b.1.cmp(&a.1), // Same priority: newer first
252 other => other, // Different priority: lower first
253 }
254 });
255
256 // Collect into OrderSet (deduplicates based on location fields)
257 let suggestions: OrderSet<LocationSuggestion> = suggestions_with_priority
258 .into_iter()
259 .map(|(_, _, s)| s)
260 .collect();
261
262 Ok(suggestions)
263}
264
265/// Extract locations from an event record's locations array.
266///
267/// Handles both address and geo location types based on the `$type` field.
268fn extract_locations_from_event(
269 aturi: &str,
270 record: &serde_json::Value,
271) -> Vec<LocationSuggestion> {
272 let Some(locations) = record.get("locations").and_then(|l| l.as_array()) else {
273 return vec![];
274 };
275
276 locations
277 .iter()
278 .filter_map(|loc| {
279 let loc_type = loc.get("$type").and_then(|t| t.as_str())?;
280
281 match loc_type {
282 "community.lexicon.location.address" => Some(LocationSuggestion {
283 source: Some(aturi.to_string()),
284 name: loc.get("name").and_then(|v| v.as_str()).map(String::from),
285 street: loc.get("street").and_then(|v| v.as_str()).map(String::from),
286 locality: loc
287 .get("locality")
288 .and_then(|v| v.as_str())
289 .map(String::from),
290 region: loc.get("region").and_then(|v| v.as_str()).map(String::from),
291 postal_code: loc
292 .get("postalCode")
293 .and_then(|v| v.as_str())
294 .map(String::from),
295 country: loc
296 .get("country")
297 .and_then(|v| v.as_str())
298 .map(String::from),
299 latitude: None,
300 longitude: None,
301 }),
302 "community.lexicon.location.geo" => Some(LocationSuggestion {
303 source: Some(aturi.to_string()),
304 name: loc.get("name").and_then(|v| v.as_str()).map(String::from),
305 street: None,
306 locality: None,
307 region: None,
308 postal_code: None,
309 country: None,
310 latitude: loc
311 .get("latitude")
312 .and_then(|v| v.as_str())
313 .map(String::from),
314 longitude: loc
315 .get("longitude")
316 .and_then(|v| v.as_str())
317 .map(String::from),
318 }),
319 _ => None,
320 }
321 })
322 .collect()
323}