A Rust application to showcase badge awards in the AT Protocol ecosystem.
at main 473 lines 15 kB view raw
1use std::sync::Arc; 2 3use crate::errors::Result; 4use async_trait::async_trait; 5use atproto_identity::model::Document; 6use atproto_identity::storage::DidDocumentStorage; 7use chrono::Utc; 8use sqlx::sqlite::SqlitePool; 9 10use super::{Award, AwardWithBadge, Badge, Identity, Storage}; 11 12/// SQLite storage implementation for badges and awards. 13#[derive(Debug, Clone)] 14pub struct SqliteStorage { 15 pool: SqlitePool, 16} 17 18impl SqliteStorage { 19 /// Create a new SQLite storage instance with the given connection pool. 20 pub fn new(pool: SqlitePool) -> Self { 21 Self { pool } 22 } 23 24 async fn migrate(&self) -> Result<()> { 25 sqlx::query( 26 r#" 27 CREATE TABLE IF NOT EXISTS identities ( 28 did TEXT PRIMARY KEY, 29 handle TEXT NOT NULL, 30 record JSON NOT NULL, 31 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 32 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 33 ); 34 "#, 35 ) 36 .execute(&self.pool) 37 .await?; 38 39 sqlx::query( 40 r#" 41 CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle); 42 CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at); 43 CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at); 44 "#, 45 ) 46 .execute(&self.pool) 47 .await?; 48 49 sqlx::query( 50 r#" 51 CREATE TABLE IF NOT EXISTS awards ( 52 aturi TEXT PRIMARY KEY, 53 cid TEXT NOT NULL, 54 did TEXT NOT NULL, 55 badge TEXT NOT NULL, 56 badge_cid TEXT NOT NULL, 57 badge_name TEXT NOT NULL, 58 validated_issuers JSON NOT NULL, 59 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 60 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 61 record JSON NOT NULL 62 ); 63 "#, 64 ) 65 .execute(&self.pool) 66 .await?; 67 68 sqlx::query( 69 r#" 70 CREATE INDEX IF NOT EXISTS idx_awards_did ON awards(did); 71 CREATE INDEX IF NOT EXISTS idx_awards_badge ON awards(badge); 72 CREATE INDEX IF NOT EXISTS idx_awards_badge_cid ON awards(badge_cid); 73 CREATE INDEX IF NOT EXISTS idx_awards_created_at ON awards(created_at); 74 CREATE INDEX IF NOT EXISTS idx_awards_updated_at ON awards(updated_at); 75 "#, 76 ) 77 .execute(&self.pool) 78 .await?; 79 80 sqlx::query( 81 r#" 82 CREATE TABLE IF NOT EXISTS badges ( 83 aturi TEXT NOT NULL, 84 cid TEXT NOT NULL, 85 name TEXT NOT NULL, 86 image TEXT, 87 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 88 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 89 count INTEGER NOT NULL DEFAULT 0, 90 record JSON NOT NULL DEFAULT '{}', 91 PRIMARY KEY (aturi, cid) 92 ); 93 "#, 94 ) 95 .execute(&self.pool) 96 .await?; 97 98 // Add record column to existing badges table if it doesn't exist 99 sqlx::query( 100 r#" 101 ALTER TABLE badges ADD COLUMN record JSON NOT NULL DEFAULT '{}'; 102 "#, 103 ) 104 .execute(&self.pool) 105 .await 106 .ok(); // Ignore error if column already exists 107 108 sqlx::query( 109 r#" 110 CREATE INDEX IF NOT EXISTS idx_badges_aturi ON badges(aturi); 111 CREATE INDEX IF NOT EXISTS idx_badges_cid ON badges(cid); 112 CREATE INDEX IF NOT EXISTS idx_badges_created_at ON badges(created_at); 113 CREATE INDEX IF NOT EXISTS idx_badges_updated_at ON badges(updated_at); 114 CREATE INDEX IF NOT EXISTS idx_badges_record ON badges(json_extract(record, '$.name')); 115 "#, 116 ) 117 .execute(&self.pool) 118 .await?; 119 120 Ok(()) 121 } 122 123 async fn upsert_identity(&self, identity: &Identity) -> Result<()> { 124 sqlx::query( 125 r#" 126 INSERT INTO identities (did, handle, record, created_at, updated_at) 127 VALUES ($1, $2, $3, $4, $5) 128 ON CONFLICT(did) DO UPDATE SET 129 handle = EXCLUDED.handle, 130 record = EXCLUDED.record, 131 updated_at = EXCLUDED.updated_at 132 "#, 133 ) 134 .bind(&identity.did) 135 .bind(&identity.handle) 136 .bind(&identity.record) 137 .bind(identity.created_at) 138 .bind(identity.updated_at) 139 .execute(&self.pool) 140 .await?; 141 142 Ok(()) 143 } 144 145 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> { 146 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1") 147 .bind(did) 148 .fetch_optional(&self.pool) 149 .await?; 150 151 Ok(row) 152 } 153 154 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> { 155 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1") 156 .bind(handle) 157 .fetch_optional(&self.pool) 158 .await?; 159 160 Ok(row) 161 } 162 163 async fn upsert_badge(&self, badge: &Badge) -> Result<()> { 164 sqlx::query( 165 r#" 166 INSERT INTO badges (aturi, cid, name, image, created_at, updated_at, count, record) 167 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 168 ON CONFLICT(aturi, cid) DO UPDATE SET 169 name = EXCLUDED.name, 170 image = EXCLUDED.image, 171 updated_at = EXCLUDED.updated_at, 172 record = EXCLUDED.record 173 "#, 174 ) 175 .bind(&badge.aturi) 176 .bind(&badge.cid) 177 .bind(&badge.name) 178 .bind(&badge.image) 179 .bind(badge.created_at) 180 .bind(badge.updated_at) 181 .bind(badge.count) 182 .bind(&badge.record) 183 .execute(&self.pool) 184 .await?; 185 186 Ok(()) 187 } 188 189 async fn get_badge(&self, aturi: &str, cid: &str) -> Result<Option<Badge>> { 190 let row = sqlx::query_as::<_, Badge>("SELECT * FROM badges WHERE aturi = $1 AND cid = $2") 191 .bind(aturi) 192 .bind(cid) 193 .fetch_optional(&self.pool) 194 .await?; 195 196 Ok(row) 197 } 198 199 async fn increment_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { 200 sqlx::query( 201 r#" 202 UPDATE badges 203 SET count = count + 1, updated_at = CURRENT_TIMESTAMP 204 WHERE aturi = $1 AND cid = $2 205 "#, 206 ) 207 .bind(aturi) 208 .bind(cid) 209 .execute(&self.pool) 210 .await?; 211 212 Ok(()) 213 } 214 215 async fn decrement_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { 216 sqlx::query( 217 r#" 218 UPDATE badges 219 SET count = GREATEST(0, count - 1), updated_at = CURRENT_TIMESTAMP 220 WHERE aturi = $1 AND cid = $2 221 "#, 222 ) 223 .bind(aturi) 224 .bind(cid) 225 .execute(&self.pool) 226 .await?; 227 228 Ok(()) 229 } 230 231 async fn upsert_award(&self, award: &Award) -> Result<bool> { 232 let existing = self.get_award(&award.aturi).await?; 233 let is_new = existing.is_none(); 234 235 sqlx::query( 236 r#" 237 INSERT INTO awards (aturi, cid, did, badge, badge_cid, badge_name, validated_issuers, created_at, updated_at, record) 238 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 239 ON CONFLICT(aturi) DO UPDATE SET 240 cid = EXCLUDED.cid, 241 badge = EXCLUDED.badge, 242 badge_cid = EXCLUDED.badge_cid, 243 badge_name = EXCLUDED.badge_name, 244 validated_issuers = EXCLUDED.validated_issuers, 245 updated_at = EXCLUDED.updated_at, 246 record = EXCLUDED.record 247 "#, 248 ) 249 .bind(&award.aturi) 250 .bind(&award.cid) 251 .bind(&award.did) 252 .bind(&award.badge) 253 .bind(&award.badge_cid) 254 .bind(&award.badge_name) 255 .bind(&award.validated_issuers) 256 .bind(award.created_at) 257 .bind(award.updated_at) 258 .bind(&award.record) 259 .execute(&self.pool) 260 .await?; 261 262 Ok(is_new) 263 } 264 265 async fn get_award(&self, aturi: &str) -> Result<Option<Award>> { 266 let row = sqlx::query_as::<_, Award>("SELECT * FROM awards WHERE aturi = $1") 267 .bind(aturi) 268 .fetch_optional(&self.pool) 269 .await?; 270 271 Ok(row) 272 } 273 274 async fn delete_award(&self, aturi: &str) -> Result<Option<Award>> { 275 let award = self.get_award(aturi).await?; 276 277 if award.is_some() { 278 sqlx::query("DELETE FROM awards WHERE aturi = $1") 279 .bind(aturi) 280 .execute(&self.pool) 281 .await?; 282 } 283 284 Ok(award) 285 } 286 287 async fn trim_awards_for_did(&self, did: &str, max_count: i64) -> Result<()> { 288 sqlx::query( 289 r#" 290 DELETE FROM awards 291 WHERE aturi IN ( 292 SELECT aturi FROM awards 293 WHERE did = $1 294 ORDER BY updated_at DESC 295 LIMIT -1 OFFSET $2 296 ) 297 "#, 298 ) 299 .bind(did) 300 .bind(max_count) 301 .execute(&self.pool) 302 .await?; 303 304 Ok(()) 305 } 306 307 async fn get_recent_awards(&self, limit: i64) -> Result<Vec<AwardWithBadge>> { 308 let awards = 309 sqlx::query_as::<_, Award>("SELECT * FROM awards ORDER BY updated_at DESC LIMIT $1") 310 .bind(limit) 311 .fetch_all(&self.pool) 312 .await?; 313 314 self.enrich_awards_with_details(awards).await 315 } 316 317 async fn get_awards_for_did(&self, did: &str, limit: i64) -> Result<Vec<AwardWithBadge>> { 318 let awards = sqlx::query_as::<_, Award>( 319 "SELECT * FROM awards WHERE did = $1 ORDER BY updated_at DESC LIMIT $2", 320 ) 321 .bind(did) 322 .bind(limit) 323 .fetch_all(&self.pool) 324 .await?; 325 326 self.enrich_awards_with_details(awards).await 327 } 328 329 async fn enrich_awards_with_details(&self, awards: Vec<Award>) -> Result<Vec<AwardWithBadge>> { 330 let mut result = Vec::new(); 331 332 for award in awards { 333 let badge = self.get_badge(&award.badge, &award.badge_cid).await?; 334 let identity = self.get_identity_by_did(&award.did).await?; 335 336 let validated_issuers: Vec<String> = 337 serde_json::from_value(award.validated_issuers.clone()).unwrap_or_default(); 338 339 let mut signer_identities = Vec::new(); 340 for issuer in validated_issuers { 341 if let Ok(Some(signer_identity)) = self.get_identity_by_did(&issuer).await { 342 signer_identities.push(signer_identity); 343 } 344 } 345 346 result.push(AwardWithBadge { 347 award, 348 badge, 349 identity, 350 signer_identities, 351 }); 352 } 353 354 Ok(result) 355 } 356} 357 358#[async_trait] 359impl Storage for SqliteStorage { 360 async fn migrate(&self) -> Result<()> { 361 self.migrate().await 362 } 363 364 async fn upsert_identity(&self, identity: &Identity) -> Result<()> { 365 self.upsert_identity(identity).await 366 } 367 368 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> { 369 self.get_identity_by_did(did).await 370 } 371 372 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> { 373 self.get_identity_by_handle(handle).await 374 } 375 376 async fn upsert_badge(&self, badge: &Badge) -> Result<()> { 377 self.upsert_badge(badge).await 378 } 379 380 async fn get_badge(&self, aturi: &str, cid: &str) -> Result<Option<Badge>> { 381 self.get_badge(aturi, cid).await 382 } 383 384 async fn increment_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { 385 self.increment_badge_count(aturi, cid).await 386 } 387 388 async fn decrement_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { 389 self.decrement_badge_count(aturi, cid).await 390 } 391 392 async fn upsert_award(&self, award: &Award) -> Result<bool> { 393 self.upsert_award(award).await 394 } 395 396 async fn get_award(&self, aturi: &str) -> Result<Option<Award>> { 397 self.get_award(aturi).await 398 } 399 400 async fn delete_award(&self, aturi: &str) -> Result<Option<Award>> { 401 self.delete_award(aturi).await 402 } 403 404 async fn trim_awards_for_did(&self, did: &str, max_count: i64) -> Result<()> { 405 self.trim_awards_for_did(did, max_count).await 406 } 407 408 async fn get_recent_awards(&self, limit: i64) -> Result<Vec<AwardWithBadge>> { 409 self.get_recent_awards(limit).await 410 } 411 412 async fn get_awards_for_did(&self, did: &str, limit: i64) -> Result<Vec<AwardWithBadge>> { 413 self.get_awards_for_did(did, limit).await 414 } 415} 416 417/// DID document storage implementation using SQLite. 418pub struct SqliteStorageDidDocumentStorage { 419 storage: Arc<SqliteStorage>, 420} 421 422impl SqliteStorageDidDocumentStorage { 423 /// Create a new DID document storage instance backed by SQLite. 424 pub fn new(storage: Arc<SqliteStorage>) -> Self { 425 Self { storage } 426 } 427} 428 429#[async_trait] 430impl DidDocumentStorage for SqliteStorageDidDocumentStorage { 431 async fn get_document_by_did(&self, did: &str) -> anyhow::Result<Option<Document>> { 432 if let Some(identity) = self 433 .storage 434 .get_identity_by_did(did) 435 .await 436 .map_err(anyhow::Error::new)? 437 { 438 let document: Document = serde_json::from_value(identity.record)?; 439 Ok(Some(document)) 440 } else { 441 Ok(None) 442 } 443 } 444 445 async fn store_document(&self, doc: Document) -> anyhow::Result<()> { 446 let handle = doc 447 .also_known_as 448 .first() 449 .and_then(|aka| aka.strip_prefix("at://")) 450 .unwrap_or("unknown.handle") 451 .to_string(); 452 453 // Create a simple JSON representation of the document 454 let record = serde_json::json!(doc); 455 456 let identity = Identity { 457 did: doc.id.clone(), 458 handle, 459 record, 460 created_at: Utc::now(), 461 updated_at: Utc::now(), 462 }; 463 464 self.storage 465 .upsert_identity(&identity) 466 .await 467 .map_err(anyhow::Error::new) 468 } 469 470 async fn delete_document_by_did(&self, _did: &str) -> anyhow::Result<()> { 471 Ok(()) 472 } 473}