An ATProtocol powered blogging engine.
1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::errors::Result;
5use async_trait::async_trait;
6use atproto_identity::model::Document;
7use atproto_identity::storage::DidDocumentStorage;
8use chrono::Utc;
9use sqlx::Row;
10use sqlx::postgres::PgPool;
11
12use super::{Identity, IdentityStorage, Post, PostReference, PostStorage, Storage};
13
14/// PostgreSQL storage implementation for blog storage operations
15#[derive(Debug, Clone)]
16pub struct PostgresStorage {
17 pool: PgPool,
18}
19
20impl PostgresStorage {
21 /// Create a new PostgreSQL storage instance with the given connection pool.
22 pub fn new(pool: PgPool) -> Self {
23 Self { pool }
24 }
25}
26
27#[async_trait]
28impl PostStorage for PostgresStorage {
29 async fn upsert_post(&self, post: &Post) -> Result<()> {
30 sqlx::query(
31 r#"
32 INSERT INTO posts (aturi, cid, title, slug, content, record_key, created_at, updated_at, record)
33 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
34 ON CONFLICT(aturi) DO UPDATE SET
35 cid = EXCLUDED.cid,
36 title = EXCLUDED.title,
37 slug = EXCLUDED.slug,
38 content = EXCLUDED.content,
39 record_key = EXCLUDED.record_key,
40 created_at = EXCLUDED.created_at,
41 updated_at = EXCLUDED.updated_at,
42 record = EXCLUDED.record
43 "#,
44 )
45 .bind(&post.aturi)
46 .bind(&post.cid)
47 .bind(&post.title)
48 .bind(&post.slug)
49 .bind(&post.content)
50 .bind(&post.record_key)
51 .bind(post.created_at)
52 .bind(post.updated_at)
53 .bind(&post.record)
54 .execute(&self.pool)
55 .await?;
56
57 Ok(())
58 }
59
60 async fn get_post(&self, aturi: &str) -> Result<Option<Post>> {
61 let row = sqlx::query_as::<_, Post>("SELECT * FROM posts WHERE aturi = $1")
62 .bind(aturi)
63 .fetch_optional(&self.pool)
64 .await?;
65
66 Ok(row)
67 }
68
69 async fn get_posts(&self) -> Result<Vec<Post>> {
70 let rows = sqlx::query_as::<_, Post>("SELECT * FROM posts ORDER BY created_at DESC")
71 .fetch_all(&self.pool)
72 .await?;
73
74 Ok(rows)
75 }
76
77 async fn delete_post(&self, aturi: &str) -> Result<Option<Post>> {
78 let post = self.get_post(aturi).await?;
79
80 if post.is_some() {
81 sqlx::query("DELETE FROM posts WHERE aturi = $1")
82 .bind(aturi)
83 .execute(&self.pool)
84 .await?;
85 }
86
87 Ok(post)
88 }
89
90 async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool> {
91 let existing =
92 sqlx::query_as::<_, PostReference>("SELECT * FROM post_references WHERE aturi = $1")
93 .bind(&post_reference.aturi)
94 .fetch_optional(&self.pool)
95 .await?;
96
97 let is_new = existing.is_none();
98
99 sqlx::query(
100 r#"
101 INSERT INTO post_references (aturi, cid, did, collection, post_aturi, discovered_at, record)
102 VALUES ($1, $2, $3, $4, $5, $6, $7)
103 ON CONFLICT(aturi) DO UPDATE SET
104 cid = EXCLUDED.cid,
105 did = EXCLUDED.did,
106 collection = EXCLUDED.collection,
107 post_aturi = EXCLUDED.post_aturi,
108 discovered_at = EXCLUDED.discovered_at,
109 record = EXCLUDED.record
110 "#,
111 )
112 .bind(&post_reference.aturi)
113 .bind(&post_reference.cid)
114 .bind(&post_reference.did)
115 .bind(&post_reference.collection)
116 .bind(&post_reference.post_aturi)
117 .bind(post_reference.discovered_at)
118 .bind(&post_reference.record)
119 .execute(&self.pool)
120 .await?;
121
122 Ok(is_new)
123 }
124
125 async fn delete_post_reference(&self, aturi: &str) -> Result<()> {
126 sqlx::query("DELETE FROM post_references WHERE aturi = $1")
127 .bind(aturi)
128 .execute(&self.pool)
129 .await?;
130
131 Ok(())
132 }
133
134 async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>> {
135 let rows = sqlx::query(
136 r#"
137 SELECT collection, COUNT(*) as count
138 FROM post_references
139 WHERE post_aturi = $1
140 GROUP BY collection
141 "#,
142 )
143 .bind(post_aturi)
144 .fetch_all(&self.pool)
145 .await?;
146
147 let mut count_map = HashMap::new();
148 for row in rows {
149 let collection: String = row.get("collection");
150 let count: i64 = row.get("count");
151 count_map.insert(collection, count);
152 }
153
154 Ok(count_map)
155 }
156
157 async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>> {
158 let rows = sqlx::query_as::<_, PostReference>(
159 "SELECT * FROM post_references WHERE post_aturi = $1 ORDER BY discovered_at DESC",
160 )
161 .bind(post_aturi)
162 .fetch_all(&self.pool)
163 .await?;
164
165 Ok(rows)
166 }
167
168 async fn get_post_references_for_post_for_collection(
169 &self,
170 post_aturi: &str,
171 collection: &str,
172 ) -> Result<Vec<PostReference>> {
173 let rows = sqlx::query_as::<_, PostReference>(
174 "SELECT * FROM post_references WHERE post_aturi = $1 AND collection = $2 ORDER BY discovered_at DESC",
175 )
176 .bind(post_aturi)
177 .bind(collection)
178 .fetch_all(&self.pool)
179 .await?;
180
181 Ok(rows)
182 }
183}
184
185#[async_trait]
186impl IdentityStorage for PostgresStorage {
187 async fn upsert_identity(&self, identity: &Identity) -> Result<()> {
188 sqlx::query(
189 r#"
190 INSERT INTO identities (did, handle, record, created_at, updated_at)
191 VALUES ($1, $2, $3, $4, $5)
192 ON CONFLICT(did) DO UPDATE SET
193 handle = EXCLUDED.handle,
194 record = EXCLUDED.record,
195 updated_at = EXCLUDED.updated_at
196 "#,
197 )
198 .bind(&identity.did)
199 .bind(&identity.handle)
200 .bind(&identity.record)
201 .bind(identity.created_at)
202 .bind(identity.updated_at)
203 .execute(&self.pool)
204 .await?;
205
206 Ok(())
207 }
208
209 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> {
210 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1")
211 .bind(did)
212 .fetch_optional(&self.pool)
213 .await?;
214
215 Ok(row)
216 }
217
218 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> {
219 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1")
220 .bind(handle)
221 .fetch_optional(&self.pool)
222 .await?;
223
224 Ok(row)
225 }
226
227 async fn delete_identity(&self, did: &str) -> Result<Option<Identity>> {
228 let identity = self.get_identity_by_did(did).await?;
229
230 if identity.is_some() {
231 sqlx::query("DELETE FROM identities WHERE did = $1")
232 .bind(did)
233 .execute(&self.pool)
234 .await?;
235 }
236
237 Ok(identity)
238 }
239}
240
241#[async_trait]
242impl Storage for PostgresStorage {
243 async fn migrate(&self) -> Result<()> {
244 // Create identities table with JSONB for better JSON performance
245 sqlx::query(
246 r#"
247 CREATE TABLE IF NOT EXISTS identities (
248 did TEXT PRIMARY KEY,
249 handle TEXT NOT NULL,
250 record JSONB NOT NULL,
251 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
252 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
253 );
254 "#,
255 )
256 .execute(&self.pool)
257 .await?;
258
259 // Create indexes for identities table
260 sqlx::query("CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle)")
261 .execute(&self.pool)
262 .await?;
263
264 sqlx::query(
265 "CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at)",
266 )
267 .execute(&self.pool)
268 .await?;
269
270 sqlx::query(
271 "CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at)",
272 )
273 .execute(&self.pool)
274 .await?;
275
276 // Create posts table with JSONB
277 sqlx::query(
278 r#"
279 CREATE TABLE IF NOT EXISTS posts (
280 aturi TEXT PRIMARY KEY,
281 cid TEXT NOT NULL,
282 title TEXT NOT NULL,
283 slug TEXT NOT NULL UNIQUE,
284 content TEXT NOT NULL,
285 record_key TEXT NOT NULL,
286 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
287 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
288 record JSONB NOT NULL
289 );
290 "#,
291 )
292 .execute(&self.pool)
293 .await?;
294
295 // Create indexes for posts table
296 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_cid ON posts(cid)")
297 .execute(&self.pool)
298 .await?;
299
300 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_slug ON posts(slug)")
301 .execute(&self.pool)
302 .await?;
303
304 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_created_at ON posts(created_at)")
305 .execute(&self.pool)
306 .await?;
307
308 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_updated_at ON posts(updated_at)")
309 .execute(&self.pool)
310 .await?;
311
312 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_record_key ON posts(record_key)")
313 .execute(&self.pool)
314 .await?;
315
316 // Create post_references table with JSONB
317 sqlx::query(
318 r#"
319 CREATE TABLE IF NOT EXISTS post_references (
320 aturi TEXT PRIMARY KEY,
321 cid TEXT NOT NULL,
322 did TEXT NOT NULL,
323 collection TEXT NOT NULL,
324 post_aturi TEXT NOT NULL,
325 discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
326 record JSONB NOT NULL
327 );
328 "#,
329 )
330 .execute(&self.pool)
331 .await?;
332
333 // Create indexes for post_references table
334 sqlx::query("CREATE INDEX IF NOT EXISTS idx_post_references_cid ON post_references(cid)")
335 .execute(&self.pool)
336 .await?;
337
338 sqlx::query("CREATE INDEX IF NOT EXISTS idx_post_references_did ON post_references(did)")
339 .execute(&self.pool)
340 .await?;
341
342 sqlx::query(
343 "CREATE INDEX IF NOT EXISTS idx_post_references_collection ON post_references(collection)",
344 )
345 .execute(&self.pool)
346 .await?;
347
348 sqlx::query(
349 "CREATE INDEX IF NOT EXISTS idx_post_references_discovered_at ON post_references(discovered_at)",
350 )
351 .execute(&self.pool)
352 .await?;
353
354 sqlx::query(
355 "CREATE INDEX IF NOT EXISTS idx_post_references_post_aturi ON post_references(post_aturi)",
356 )
357 .execute(&self.pool)
358 .await?;
359
360 Ok(())
361 }
362}
363
364/// PostgreSQL-specific DID document storage adapter
365pub struct PostgresStorageDidDocumentStorage {
366 storage: Arc<PostgresStorage>,
367}
368
369impl PostgresStorageDidDocumentStorage {
370 /// Create a new DID document storage instance backed by PostgreSQL.
371 pub fn new(storage: Arc<PostgresStorage>) -> Self {
372 Self { storage }
373 }
374}
375
376#[async_trait]
377impl DidDocumentStorage for PostgresStorageDidDocumentStorage {
378 async fn get_document_by_did(&self, did: &str) -> anyhow::Result<Option<Document>> {
379 if let Some(identity) = self
380 .storage
381 .get_identity_by_did(did)
382 .await
383 .map_err(anyhow::Error::new)?
384 {
385 let document: Document = serde_json::from_value(identity.record)?;
386 Ok(Some(document))
387 } else {
388 Ok(None)
389 }
390 }
391
392 async fn store_document(&self, doc: Document) -> anyhow::Result<()> {
393 let handle = doc
394 .also_known_as
395 .first()
396 .and_then(|aka| aka.strip_prefix("at://"))
397 .unwrap_or("unknown.handle")
398 .to_string();
399
400 // Create a simple JSON representation of the document
401 let record = serde_json::json!(doc);
402
403 let identity = Identity {
404 did: doc.id.clone(),
405 handle,
406 record,
407 created_at: Utc::now(),
408 updated_at: Utc::now(),
409 };
410
411 self.storage
412 .upsert_identity(&identity)
413 .await
414 .map_err(anyhow::Error::new)
415 }
416
417 async fn delete_document_by_did(&self, _did: &str) -> anyhow::Result<()> {
418 Ok(())
419 }
420}