An ATProtocol powered blogging engine.
at main 420 lines 13 kB view raw
1use std::collections::HashMap; 2use std::sync::Arc; 3 4use crate::errors::Result; 5use async_trait::async_trait; 6use atproto_identity::model::Document; 7use atproto_identity::storage::DidDocumentStorage; 8use chrono::Utc; 9use sqlx::Row; 10use sqlx::postgres::PgPool; 11 12use super::{Identity, IdentityStorage, Post, PostReference, PostStorage, Storage}; 13 14/// PostgreSQL storage implementation for blog storage operations 15#[derive(Debug, Clone)] 16pub struct PostgresStorage { 17 pool: PgPool, 18} 19 20impl PostgresStorage { 21 /// Create a new PostgreSQL storage instance with the given connection pool. 22 pub fn new(pool: PgPool) -> Self { 23 Self { pool } 24 } 25} 26 27#[async_trait] 28impl PostStorage for PostgresStorage { 29 async fn upsert_post(&self, post: &Post) -> Result<()> { 30 sqlx::query( 31 r#" 32 INSERT INTO posts (aturi, cid, title, slug, content, record_key, created_at, updated_at, record) 33 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 34 ON CONFLICT(aturi) DO UPDATE SET 35 cid = EXCLUDED.cid, 36 title = EXCLUDED.title, 37 slug = EXCLUDED.slug, 38 content = EXCLUDED.content, 39 record_key = EXCLUDED.record_key, 40 created_at = EXCLUDED.created_at, 41 updated_at = EXCLUDED.updated_at, 42 record = EXCLUDED.record 43 "#, 44 ) 45 .bind(&post.aturi) 46 .bind(&post.cid) 47 .bind(&post.title) 48 .bind(&post.slug) 49 .bind(&post.content) 50 .bind(&post.record_key) 51 .bind(post.created_at) 52 .bind(post.updated_at) 53 .bind(&post.record) 54 .execute(&self.pool) 55 .await?; 56 57 Ok(()) 58 } 59 60 async fn get_post(&self, aturi: &str) -> Result<Option<Post>> { 61 let row = sqlx::query_as::<_, Post>("SELECT * FROM posts WHERE aturi = $1") 62 .bind(aturi) 63 .fetch_optional(&self.pool) 64 .await?; 65 66 Ok(row) 67 } 68 69 async fn get_posts(&self) -> Result<Vec<Post>> { 70 let rows = sqlx::query_as::<_, Post>("SELECT * FROM posts ORDER BY created_at DESC") 71 .fetch_all(&self.pool) 72 .await?; 73 74 Ok(rows) 75 } 76 77 async fn delete_post(&self, aturi: &str) -> Result<Option<Post>> { 78 let post = self.get_post(aturi).await?; 79 80 if post.is_some() { 81 sqlx::query("DELETE FROM posts WHERE aturi = $1") 82 .bind(aturi) 83 .execute(&self.pool) 84 .await?; 85 } 86 87 Ok(post) 88 } 89 90 async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool> { 91 let existing = 92 sqlx::query_as::<_, PostReference>("SELECT * FROM post_references WHERE aturi = $1") 93 .bind(&post_reference.aturi) 94 .fetch_optional(&self.pool) 95 .await?; 96 97 let is_new = existing.is_none(); 98 99 sqlx::query( 100 r#" 101 INSERT INTO post_references (aturi, cid, did, collection, post_aturi, discovered_at, record) 102 VALUES ($1, $2, $3, $4, $5, $6, $7) 103 ON CONFLICT(aturi) DO UPDATE SET 104 cid = EXCLUDED.cid, 105 did = EXCLUDED.did, 106 collection = EXCLUDED.collection, 107 post_aturi = EXCLUDED.post_aturi, 108 discovered_at = EXCLUDED.discovered_at, 109 record = EXCLUDED.record 110 "#, 111 ) 112 .bind(&post_reference.aturi) 113 .bind(&post_reference.cid) 114 .bind(&post_reference.did) 115 .bind(&post_reference.collection) 116 .bind(&post_reference.post_aturi) 117 .bind(post_reference.discovered_at) 118 .bind(&post_reference.record) 119 .execute(&self.pool) 120 .await?; 121 122 Ok(is_new) 123 } 124 125 async fn delete_post_reference(&self, aturi: &str) -> Result<()> { 126 sqlx::query("DELETE FROM post_references WHERE aturi = $1") 127 .bind(aturi) 128 .execute(&self.pool) 129 .await?; 130 131 Ok(()) 132 } 133 134 async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>> { 135 let rows = sqlx::query( 136 r#" 137 SELECT collection, COUNT(*) as count 138 FROM post_references 139 WHERE post_aturi = $1 140 GROUP BY collection 141 "#, 142 ) 143 .bind(post_aturi) 144 .fetch_all(&self.pool) 145 .await?; 146 147 let mut count_map = HashMap::new(); 148 for row in rows { 149 let collection: String = row.get("collection"); 150 let count: i64 = row.get("count"); 151 count_map.insert(collection, count); 152 } 153 154 Ok(count_map) 155 } 156 157 async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>> { 158 let rows = sqlx::query_as::<_, PostReference>( 159 "SELECT * FROM post_references WHERE post_aturi = $1 ORDER BY discovered_at DESC", 160 ) 161 .bind(post_aturi) 162 .fetch_all(&self.pool) 163 .await?; 164 165 Ok(rows) 166 } 167 168 async fn get_post_references_for_post_for_collection( 169 &self, 170 post_aturi: &str, 171 collection: &str, 172 ) -> Result<Vec<PostReference>> { 173 let rows = sqlx::query_as::<_, PostReference>( 174 "SELECT * FROM post_references WHERE post_aturi = $1 AND collection = $2 ORDER BY discovered_at DESC", 175 ) 176 .bind(post_aturi) 177 .bind(collection) 178 .fetch_all(&self.pool) 179 .await?; 180 181 Ok(rows) 182 } 183} 184 185#[async_trait] 186impl IdentityStorage for PostgresStorage { 187 async fn upsert_identity(&self, identity: &Identity) -> Result<()> { 188 sqlx::query( 189 r#" 190 INSERT INTO identities (did, handle, record, created_at, updated_at) 191 VALUES ($1, $2, $3, $4, $5) 192 ON CONFLICT(did) DO UPDATE SET 193 handle = EXCLUDED.handle, 194 record = EXCLUDED.record, 195 updated_at = EXCLUDED.updated_at 196 "#, 197 ) 198 .bind(&identity.did) 199 .bind(&identity.handle) 200 .bind(&identity.record) 201 .bind(identity.created_at) 202 .bind(identity.updated_at) 203 .execute(&self.pool) 204 .await?; 205 206 Ok(()) 207 } 208 209 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> { 210 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1") 211 .bind(did) 212 .fetch_optional(&self.pool) 213 .await?; 214 215 Ok(row) 216 } 217 218 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> { 219 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1") 220 .bind(handle) 221 .fetch_optional(&self.pool) 222 .await?; 223 224 Ok(row) 225 } 226 227 async fn delete_identity(&self, did: &str) -> Result<Option<Identity>> { 228 let identity = self.get_identity_by_did(did).await?; 229 230 if identity.is_some() { 231 sqlx::query("DELETE FROM identities WHERE did = $1") 232 .bind(did) 233 .execute(&self.pool) 234 .await?; 235 } 236 237 Ok(identity) 238 } 239} 240 241#[async_trait] 242impl Storage for PostgresStorage { 243 async fn migrate(&self) -> Result<()> { 244 // Create identities table with JSONB for better JSON performance 245 sqlx::query( 246 r#" 247 CREATE TABLE IF NOT EXISTS identities ( 248 did TEXT PRIMARY KEY, 249 handle TEXT NOT NULL, 250 record JSONB NOT NULL, 251 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 252 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 253 ); 254 "#, 255 ) 256 .execute(&self.pool) 257 .await?; 258 259 // Create indexes for identities table 260 sqlx::query("CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle)") 261 .execute(&self.pool) 262 .await?; 263 264 sqlx::query( 265 "CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at)", 266 ) 267 .execute(&self.pool) 268 .await?; 269 270 sqlx::query( 271 "CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at)", 272 ) 273 .execute(&self.pool) 274 .await?; 275 276 // Create posts table with JSONB 277 sqlx::query( 278 r#" 279 CREATE TABLE IF NOT EXISTS posts ( 280 aturi TEXT PRIMARY KEY, 281 cid TEXT NOT NULL, 282 title TEXT NOT NULL, 283 slug TEXT NOT NULL UNIQUE, 284 content TEXT NOT NULL, 285 record_key TEXT NOT NULL, 286 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 287 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 288 record JSONB NOT NULL 289 ); 290 "#, 291 ) 292 .execute(&self.pool) 293 .await?; 294 295 // Create indexes for posts table 296 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_cid ON posts(cid)") 297 .execute(&self.pool) 298 .await?; 299 300 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_slug ON posts(slug)") 301 .execute(&self.pool) 302 .await?; 303 304 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_created_at ON posts(created_at)") 305 .execute(&self.pool) 306 .await?; 307 308 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_updated_at ON posts(updated_at)") 309 .execute(&self.pool) 310 .await?; 311 312 sqlx::query("CREATE INDEX IF NOT EXISTS idx_posts_record_key ON posts(record_key)") 313 .execute(&self.pool) 314 .await?; 315 316 // Create post_references table with JSONB 317 sqlx::query( 318 r#" 319 CREATE TABLE IF NOT EXISTS post_references ( 320 aturi TEXT PRIMARY KEY, 321 cid TEXT NOT NULL, 322 did TEXT NOT NULL, 323 collection TEXT NOT NULL, 324 post_aturi TEXT NOT NULL, 325 discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 326 record JSONB NOT NULL 327 ); 328 "#, 329 ) 330 .execute(&self.pool) 331 .await?; 332 333 // Create indexes for post_references table 334 sqlx::query("CREATE INDEX IF NOT EXISTS idx_post_references_cid ON post_references(cid)") 335 .execute(&self.pool) 336 .await?; 337 338 sqlx::query("CREATE INDEX IF NOT EXISTS idx_post_references_did ON post_references(did)") 339 .execute(&self.pool) 340 .await?; 341 342 sqlx::query( 343 "CREATE INDEX IF NOT EXISTS idx_post_references_collection ON post_references(collection)", 344 ) 345 .execute(&self.pool) 346 .await?; 347 348 sqlx::query( 349 "CREATE INDEX IF NOT EXISTS idx_post_references_discovered_at ON post_references(discovered_at)", 350 ) 351 .execute(&self.pool) 352 .await?; 353 354 sqlx::query( 355 "CREATE INDEX IF NOT EXISTS idx_post_references_post_aturi ON post_references(post_aturi)", 356 ) 357 .execute(&self.pool) 358 .await?; 359 360 Ok(()) 361 } 362} 363 364/// PostgreSQL-specific DID document storage adapter 365pub struct PostgresStorageDidDocumentStorage { 366 storage: Arc<PostgresStorage>, 367} 368 369impl PostgresStorageDidDocumentStorage { 370 /// Create a new DID document storage instance backed by PostgreSQL. 371 pub fn new(storage: Arc<PostgresStorage>) -> Self { 372 Self { storage } 373 } 374} 375 376#[async_trait] 377impl DidDocumentStorage for PostgresStorageDidDocumentStorage { 378 async fn get_document_by_did(&self, did: &str) -> anyhow::Result<Option<Document>> { 379 if let Some(identity) = self 380 .storage 381 .get_identity_by_did(did) 382 .await 383 .map_err(anyhow::Error::new)? 384 { 385 let document: Document = serde_json::from_value(identity.record)?; 386 Ok(Some(document)) 387 } else { 388 Ok(None) 389 } 390 } 391 392 async fn store_document(&self, doc: Document) -> anyhow::Result<()> { 393 let handle = doc 394 .also_known_as 395 .first() 396 .and_then(|aka| aka.strip_prefix("at://")) 397 .unwrap_or("unknown.handle") 398 .to_string(); 399 400 // Create a simple JSON representation of the document 401 let record = serde_json::json!(doc); 402 403 let identity = Identity { 404 did: doc.id.clone(), 405 handle, 406 record, 407 created_at: Utc::now(), 408 updated_at: Utc::now(), 409 }; 410 411 self.storage 412 .upsert_identity(&identity) 413 .await 414 .map_err(anyhow::Error::new) 415 } 416 417 async fn delete_document_by_did(&self, _did: &str) -> anyhow::Result<()> { 418 Ok(()) 419 } 420}