A Rust application to showcase badge awards in the AT Protocol ecosystem.
1use std::sync::Arc;
2
3use crate::errors::Result;
4use async_trait::async_trait;
5use atproto_identity::model::Document;
6use atproto_identity::storage::DidDocumentStorage;
7use chrono::Utc;
8use sqlx::sqlite::SqlitePool;
9
10use super::{Award, AwardWithBadge, Badge, Identity, Storage};
11
12/// SQLite storage implementation for badges and awards.
13#[derive(Debug, Clone)]
14pub struct SqliteStorage {
15 pool: SqlitePool,
16}
17
18impl SqliteStorage {
19 /// Create a new SQLite storage instance with the given connection pool.
20 pub fn new(pool: SqlitePool) -> Self {
21 Self { pool }
22 }
23
24 async fn migrate(&self) -> Result<()> {
25 sqlx::query(
26 r#"
27 CREATE TABLE IF NOT EXISTS identities (
28 did TEXT PRIMARY KEY,
29 handle TEXT NOT NULL,
30 record JSON NOT NULL,
31 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
32 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
33 );
34 "#,
35 )
36 .execute(&self.pool)
37 .await?;
38
39 sqlx::query(
40 r#"
41 CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle);
42 CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at);
43 CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at);
44 "#,
45 )
46 .execute(&self.pool)
47 .await?;
48
49 sqlx::query(
50 r#"
51 CREATE TABLE IF NOT EXISTS awards (
52 aturi TEXT PRIMARY KEY,
53 cid TEXT NOT NULL,
54 did TEXT NOT NULL,
55 badge TEXT NOT NULL,
56 badge_cid TEXT NOT NULL,
57 badge_name TEXT NOT NULL,
58 validated_issuers JSON NOT NULL,
59 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
60 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
61 record JSON NOT NULL
62 );
63 "#,
64 )
65 .execute(&self.pool)
66 .await?;
67
68 sqlx::query(
69 r#"
70 CREATE INDEX IF NOT EXISTS idx_awards_did ON awards(did);
71 CREATE INDEX IF NOT EXISTS idx_awards_badge ON awards(badge);
72 CREATE INDEX IF NOT EXISTS idx_awards_badge_cid ON awards(badge_cid);
73 CREATE INDEX IF NOT EXISTS idx_awards_created_at ON awards(created_at);
74 CREATE INDEX IF NOT EXISTS idx_awards_updated_at ON awards(updated_at);
75 "#,
76 )
77 .execute(&self.pool)
78 .await?;
79
80 sqlx::query(
81 r#"
82 CREATE TABLE IF NOT EXISTS badges (
83 aturi TEXT NOT NULL,
84 cid TEXT NOT NULL,
85 name TEXT NOT NULL,
86 image TEXT,
87 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
88 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
89 count INTEGER NOT NULL DEFAULT 0,
90 record JSON NOT NULL DEFAULT '{}',
91 PRIMARY KEY (aturi, cid)
92 );
93 "#,
94 )
95 .execute(&self.pool)
96 .await?;
97
98 // Add record column to existing badges table if it doesn't exist
99 sqlx::query(
100 r#"
101 ALTER TABLE badges ADD COLUMN record JSON NOT NULL DEFAULT '{}';
102 "#,
103 )
104 .execute(&self.pool)
105 .await
106 .ok(); // Ignore error if column already exists
107
108 sqlx::query(
109 r#"
110 CREATE INDEX IF NOT EXISTS idx_badges_aturi ON badges(aturi);
111 CREATE INDEX IF NOT EXISTS idx_badges_cid ON badges(cid);
112 CREATE INDEX IF NOT EXISTS idx_badges_created_at ON badges(created_at);
113 CREATE INDEX IF NOT EXISTS idx_badges_updated_at ON badges(updated_at);
114 CREATE INDEX IF NOT EXISTS idx_badges_record ON badges(json_extract(record, '$.name'));
115 "#,
116 )
117 .execute(&self.pool)
118 .await?;
119
120 Ok(())
121 }
122
123 async fn upsert_identity(&self, identity: &Identity) -> Result<()> {
124 sqlx::query(
125 r#"
126 INSERT INTO identities (did, handle, record, created_at, updated_at)
127 VALUES ($1, $2, $3, $4, $5)
128 ON CONFLICT(did) DO UPDATE SET
129 handle = EXCLUDED.handle,
130 record = EXCLUDED.record,
131 updated_at = EXCLUDED.updated_at
132 "#,
133 )
134 .bind(&identity.did)
135 .bind(&identity.handle)
136 .bind(&identity.record)
137 .bind(identity.created_at)
138 .bind(identity.updated_at)
139 .execute(&self.pool)
140 .await?;
141
142 Ok(())
143 }
144
145 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> {
146 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1")
147 .bind(did)
148 .fetch_optional(&self.pool)
149 .await?;
150
151 Ok(row)
152 }
153
154 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> {
155 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1")
156 .bind(handle)
157 .fetch_optional(&self.pool)
158 .await?;
159
160 Ok(row)
161 }
162
163 async fn upsert_badge(&self, badge: &Badge) -> Result<()> {
164 sqlx::query(
165 r#"
166 INSERT INTO badges (aturi, cid, name, image, created_at, updated_at, count, record)
167 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
168 ON CONFLICT(aturi, cid) DO UPDATE SET
169 name = EXCLUDED.name,
170 image = EXCLUDED.image,
171 updated_at = EXCLUDED.updated_at,
172 record = EXCLUDED.record
173 "#,
174 )
175 .bind(&badge.aturi)
176 .bind(&badge.cid)
177 .bind(&badge.name)
178 .bind(&badge.image)
179 .bind(badge.created_at)
180 .bind(badge.updated_at)
181 .bind(badge.count)
182 .bind(&badge.record)
183 .execute(&self.pool)
184 .await?;
185
186 Ok(())
187 }
188
189 async fn get_badge(&self, aturi: &str, cid: &str) -> Result<Option<Badge>> {
190 let row = sqlx::query_as::<_, Badge>("SELECT * FROM badges WHERE aturi = $1 AND cid = $2")
191 .bind(aturi)
192 .bind(cid)
193 .fetch_optional(&self.pool)
194 .await?;
195
196 Ok(row)
197 }
198
199 async fn increment_badge_count(&self, aturi: &str, cid: &str) -> Result<()> {
200 sqlx::query(
201 r#"
202 UPDATE badges
203 SET count = count + 1, updated_at = CURRENT_TIMESTAMP
204 WHERE aturi = $1 AND cid = $2
205 "#,
206 )
207 .bind(aturi)
208 .bind(cid)
209 .execute(&self.pool)
210 .await?;
211
212 Ok(())
213 }
214
215 async fn decrement_badge_count(&self, aturi: &str, cid: &str) -> Result<()> {
216 sqlx::query(
217 r#"
218 UPDATE badges
219 SET count = GREATEST(0, count - 1), updated_at = CURRENT_TIMESTAMP
220 WHERE aturi = $1 AND cid = $2
221 "#,
222 )
223 .bind(aturi)
224 .bind(cid)
225 .execute(&self.pool)
226 .await?;
227
228 Ok(())
229 }
230
231 async fn upsert_award(&self, award: &Award) -> Result<bool> {
232 let existing = self.get_award(&award.aturi).await?;
233 let is_new = existing.is_none();
234
235 sqlx::query(
236 r#"
237 INSERT INTO awards (aturi, cid, did, badge, badge_cid, badge_name, validated_issuers, created_at, updated_at, record)
238 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
239 ON CONFLICT(aturi) DO UPDATE SET
240 cid = EXCLUDED.cid,
241 badge = EXCLUDED.badge,
242 badge_cid = EXCLUDED.badge_cid,
243 badge_name = EXCLUDED.badge_name,
244 validated_issuers = EXCLUDED.validated_issuers,
245 updated_at = EXCLUDED.updated_at,
246 record = EXCLUDED.record
247 "#,
248 )
249 .bind(&award.aturi)
250 .bind(&award.cid)
251 .bind(&award.did)
252 .bind(&award.badge)
253 .bind(&award.badge_cid)
254 .bind(&award.badge_name)
255 .bind(&award.validated_issuers)
256 .bind(award.created_at)
257 .bind(award.updated_at)
258 .bind(&award.record)
259 .execute(&self.pool)
260 .await?;
261
262 Ok(is_new)
263 }
264
265 async fn get_award(&self, aturi: &str) -> Result<Option<Award>> {
266 let row = sqlx::query_as::<_, Award>("SELECT * FROM awards WHERE aturi = $1")
267 .bind(aturi)
268 .fetch_optional(&self.pool)
269 .await?;
270
271 Ok(row)
272 }
273
274 async fn delete_award(&self, aturi: &str) -> Result<Option<Award>> {
275 let award = self.get_award(aturi).await?;
276
277 if award.is_some() {
278 sqlx::query("DELETE FROM awards WHERE aturi = $1")
279 .bind(aturi)
280 .execute(&self.pool)
281 .await?;
282 }
283
284 Ok(award)
285 }
286
287 async fn trim_awards_for_did(&self, did: &str, max_count: i64) -> Result<()> {
288 sqlx::query(
289 r#"
290 DELETE FROM awards
291 WHERE aturi IN (
292 SELECT aturi FROM awards
293 WHERE did = $1
294 ORDER BY updated_at DESC
295 LIMIT -1 OFFSET $2
296 )
297 "#,
298 )
299 .bind(did)
300 .bind(max_count)
301 .execute(&self.pool)
302 .await?;
303
304 Ok(())
305 }
306
307 async fn get_recent_awards(&self, limit: i64) -> Result<Vec<AwardWithBadge>> {
308 let awards =
309 sqlx::query_as::<_, Award>("SELECT * FROM awards ORDER BY updated_at DESC LIMIT $1")
310 .bind(limit)
311 .fetch_all(&self.pool)
312 .await?;
313
314 self.enrich_awards_with_details(awards).await
315 }
316
317 async fn get_awards_for_did(&self, did: &str, limit: i64) -> Result<Vec<AwardWithBadge>> {
318 let awards = sqlx::query_as::<_, Award>(
319 "SELECT * FROM awards WHERE did = $1 ORDER BY updated_at DESC LIMIT $2",
320 )
321 .bind(did)
322 .bind(limit)
323 .fetch_all(&self.pool)
324 .await?;
325
326 self.enrich_awards_with_details(awards).await
327 }
328
329 async fn enrich_awards_with_details(&self, awards: Vec<Award>) -> Result<Vec<AwardWithBadge>> {
330 let mut result = Vec::new();
331
332 for award in awards {
333 let badge = self.get_badge(&award.badge, &award.badge_cid).await?;
334 let identity = self.get_identity_by_did(&award.did).await?;
335
336 let validated_issuers: Vec<String> =
337 serde_json::from_value(award.validated_issuers.clone()).unwrap_or_default();
338
339 let mut signer_identities = Vec::new();
340 for issuer in validated_issuers {
341 if let Ok(Some(signer_identity)) = self.get_identity_by_did(&issuer).await {
342 signer_identities.push(signer_identity);
343 }
344 }
345
346 result.push(AwardWithBadge {
347 award,
348 badge,
349 identity,
350 signer_identities,
351 });
352 }
353
354 Ok(result)
355 }
356}
357
358#[async_trait]
359impl Storage for SqliteStorage {
360 async fn migrate(&self) -> Result<()> {
361 self.migrate().await
362 }
363
364 async fn upsert_identity(&self, identity: &Identity) -> Result<()> {
365 self.upsert_identity(identity).await
366 }
367
368 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> {
369 self.get_identity_by_did(did).await
370 }
371
372 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> {
373 self.get_identity_by_handle(handle).await
374 }
375
376 async fn upsert_badge(&self, badge: &Badge) -> Result<()> {
377 self.upsert_badge(badge).await
378 }
379
380 async fn get_badge(&self, aturi: &str, cid: &str) -> Result<Option<Badge>> {
381 self.get_badge(aturi, cid).await
382 }
383
384 async fn increment_badge_count(&self, aturi: &str, cid: &str) -> Result<()> {
385 self.increment_badge_count(aturi, cid).await
386 }
387
388 async fn decrement_badge_count(&self, aturi: &str, cid: &str) -> Result<()> {
389 self.decrement_badge_count(aturi, cid).await
390 }
391
392 async fn upsert_award(&self, award: &Award) -> Result<bool> {
393 self.upsert_award(award).await
394 }
395
396 async fn get_award(&self, aturi: &str) -> Result<Option<Award>> {
397 self.get_award(aturi).await
398 }
399
400 async fn delete_award(&self, aturi: &str) -> Result<Option<Award>> {
401 self.delete_award(aturi).await
402 }
403
404 async fn trim_awards_for_did(&self, did: &str, max_count: i64) -> Result<()> {
405 self.trim_awards_for_did(did, max_count).await
406 }
407
408 async fn get_recent_awards(&self, limit: i64) -> Result<Vec<AwardWithBadge>> {
409 self.get_recent_awards(limit).await
410 }
411
412 async fn get_awards_for_did(&self, did: &str, limit: i64) -> Result<Vec<AwardWithBadge>> {
413 self.get_awards_for_did(did, limit).await
414 }
415}
416
417/// DID document storage implementation using SQLite.
418pub struct SqliteStorageDidDocumentStorage {
419 storage: Arc<SqliteStorage>,
420}
421
422impl SqliteStorageDidDocumentStorage {
423 /// Create a new DID document storage instance backed by SQLite.
424 pub fn new(storage: Arc<SqliteStorage>) -> Self {
425 Self { storage }
426 }
427}
428
429#[async_trait]
430impl DidDocumentStorage for SqliteStorageDidDocumentStorage {
431 async fn get_document_by_did(&self, did: &str) -> anyhow::Result<Option<Document>> {
432 if let Some(identity) = self
433 .storage
434 .get_identity_by_did(did)
435 .await
436 .map_err(anyhow::Error::new)?
437 {
438 let document: Document = serde_json::from_value(identity.record)?;
439 Ok(Some(document))
440 } else {
441 Ok(None)
442 }
443 }
444
445 async fn store_document(&self, doc: Document) -> anyhow::Result<()> {
446 let handle = doc
447 .also_known_as
448 .first()
449 .and_then(|aka| aka.strip_prefix("at://"))
450 .unwrap_or("unknown.handle")
451 .to_string();
452
453 // Create a simple JSON representation of the document
454 let record = serde_json::json!(doc);
455
456 let identity = Identity {
457 did: doc.id.clone(),
458 handle,
459 record,
460 created_at: Utc::now(),
461 updated_at: Utc::now(),
462 };
463
464 self.storage
465 .upsert_identity(&identity)
466 .await
467 .map_err(anyhow::Error::new)
468 }
469
470 async fn delete_document_by_did(&self, _did: &str) -> anyhow::Result<()> {
471 Ok(())
472 }
473}