// Alternative ATProto PDS implementation (at_oauth)
1//! Blob operations for the actor store 2//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/blob/mod.rs 3//! blacksky-algorithms/rsky is licensed under the Apache License 2.0 4//! 5//! Modified for SQLite backend 6 7use crate::models::actor_store as models; 8use anyhow::{Result, bail}; 9use axum::body::Bytes; 10use cidv10::Cid; 11use diesel::dsl::{count_distinct, exists, not}; 12use diesel::sql_types::{Integer, Nullable, Text}; 13use diesel::*; 14use futures::{ 15 stream::{self, StreamExt}, 16 try_join, 17}; 18use rsky_common::ipld::sha256_raw_to_cid; 19use rsky_common::now; 20use rsky_lexicon::blob_refs::BlobRef; 21use rsky_lexicon::com::atproto::admin::StatusAttr; 22use rsky_lexicon::com::atproto::repo::ListMissingBlobsRefRecordBlob; 23use rsky_pds::actor_store::blob::{ 24 BlobMetadata, GetBlobMetadataOutput, ListBlobsOpts, ListMissingBlobsOpts, accepted_mime, 25 sha256_stream, 26}; 27use rsky_pds::image; 28use rsky_repo::error::BlobError; 29use rsky_repo::types::{PreparedBlobRef, PreparedWrite}; 30use std::str::FromStr as _; 31 32use super::blob_fs::{BlobStoreFs, ByteStream}; 33 34pub struct GetBlobOutput { 35 pub size: i32, 36 pub mime_type: Option<String>, 37 pub stream: ByteStream, 38} 39 40/// Handles blob operations for an actor store 41pub struct BlobReader { 42 /// SQL-based blob storage 43 pub blobstore: BlobStoreFs, 44 /// DID of the actor 45 pub did: String, 46 /// Database connection 47 pub db: deadpool_diesel::Pool< 48 deadpool_diesel::Manager<SqliteConnection>, 49 deadpool_diesel::sqlite::Object, 50 >, 51} 52 53impl BlobReader { 54 /// Create a new blob reader 55 pub fn new( 56 blobstore: BlobStoreFs, 57 db: deadpool_diesel::Pool< 58 deadpool_diesel::Manager<SqliteConnection>, 59 deadpool_diesel::sqlite::Object, 60 >, 61 ) -> Self { 62 Self { 63 did: blobstore.did.clone(), 64 blobstore, 65 db, 66 } 67 } 68 69 /// Get metadata for a blob by CID 70 pub async fn get_blob_metadata(&self, cid: Cid) -> 
Result<GetBlobMetadataOutput> { 71 use crate::schema::actor_store::blob::dsl as BlobSchema; 72 73 let did = self.did.clone(); 74 let found = self 75 .db 76 .get() 77 .await? 78 .interact(move |conn| { 79 BlobSchema::blob 80 .filter(BlobSchema::did.eq(did)) 81 .filter(BlobSchema::cid.eq(cid.to_string())) 82 .filter(BlobSchema::takedownRef.is_null()) 83 .select(models::Blob::as_select()) 84 .first(conn) 85 .optional() 86 }) 87 .await 88 .expect("Failed to get blob metadata")?; 89 90 match found { 91 None => bail!("Blob not found"), 92 Some(found) => Ok(GetBlobMetadataOutput { 93 size: found.size, 94 mime_type: Some(found.mime_type), 95 }), 96 } 97 } 98 99 /// Get a blob by CID with metadata and content 100 pub async fn get_blob(&self, cid: Cid) -> Result<GetBlobOutput> { 101 let metadata = self.get_blob_metadata(cid).await?; 102 let blob_stream = match self.blobstore.get_stream(cid).await { 103 Ok(stream) => stream, 104 Err(e) => bail!("Failed to get blob: {}", e), 105 }; 106 107 Ok(GetBlobOutput { 108 size: metadata.size, 109 mime_type: metadata.mime_type, 110 stream: blob_stream, 111 }) 112 } 113 114 /// Get all records that reference a specific blob 115 pub async fn get_records_for_blob(&self, cid: Cid) -> Result<Vec<String>> { 116 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema; 117 118 let did = self.did.clone(); 119 let res = self 120 .db 121 .get() 122 .await? 123 .interact(move |conn| { 124 let results = RecordBlobSchema::record_blob 125 .filter(RecordBlobSchema::blobCid.eq(cid.to_string())) 126 .filter(RecordBlobSchema::did.eq(did)) 127 .select(models::RecordBlob::as_select()) 128 .get_results(conn)?; 129 Ok::<_, result::Error>(results.into_iter().map(|row| row.record_uri)) 130 }) 131 .await 132 .expect("Failed to get records for blob")? 
133 .collect::<Vec<String>>(); 134 135 Ok(res) 136 } 137 138 /// Upload a blob and get its metadata 139 pub async fn upload_blob_and_get_metadata( 140 &self, 141 user_suggested_mime: String, 142 blob: Bytes, 143 ) -> Result<BlobMetadata> { 144 let bytes = blob; 145 let size = bytes.len() as i64; 146 147 let (temp_key, sha256, img_info, sniffed_mime) = try_join!( 148 self.blobstore.put_temp(bytes.clone()), 149 // TODO: reimpl funcs to use Bytes instead of Vec<u8> 150 sha256_stream(bytes.to_vec()), 151 image::maybe_get_info(bytes.to_vec()), 152 image::mime_type_from_bytes(bytes.to_vec()) 153 )?; 154 155 let cid = sha256_raw_to_cid(sha256); 156 let mime_type = sniffed_mime.unwrap_or(user_suggested_mime); 157 158 Ok(BlobMetadata { 159 temp_key, 160 size, 161 cid, 162 mime_type, 163 width: img_info.as_ref().map(|info| info.width as i32), 164 height: if let Some(info) = img_info { 165 Some(info.height as i32) 166 } else { 167 None 168 }, 169 }) 170 } 171 172 /// Track a blob that hasn't been associated with any records yet 173 pub async fn track_untethered_blob(&self, metadata: BlobMetadata) -> Result<BlobRef> { 174 use crate::schema::actor_store::blob::dsl as BlobSchema; 175 176 let did = self.did.clone(); 177 self.db.get().await?.interact(move |conn| { 178 let BlobMetadata { 179 temp_key, 180 size, 181 cid, 182 mime_type, 183 width, 184 height, 185 } = metadata; 186 let created_at = now(); 187 188 let found = BlobSchema::blob 189 .filter(BlobSchema::did.eq(&did)) 190 .filter(BlobSchema::cid.eq(&cid.to_string())) 191 .select(models::Blob::as_select()) 192 .first(conn) 193 .optional()?; 194 195 if let Some(found) = found { 196 if found.takedown_ref.is_some() { 197 bail!("Blob has been takendown, cannot re-upload") 198 } 199 } 200 201 let upsert = sql_query("INSERT INTO pds.blob (cid, did, \"mimeType\", size, \"tempKey\", width, height, \"createdAt\", \"takedownRef\") \ 202 VALUES \ 203 ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ 204 ON CONFLICT (cid, did) DO UPDATE \ 205 SET 
\"tempKey\" = EXCLUDED.\"tempKey\" \ 206 WHERE pds.blob.\"tempKey\" is not null;"); 207 #[expect(trivial_casts)] 208 let _ = upsert 209 .bind::<Text, _>(&cid.to_string()) 210 .bind::<Text, _>(&did) 211 .bind::<Text, _>(&mime_type) 212 .bind::<Integer, _>(size as i32) 213 .bind::<Nullable<Text>, _>(Some(temp_key)) 214 .bind::<Nullable<Integer>, _>(width) 215 .bind::<Nullable<Integer>, _>(height) 216 .bind::<Text, _>(created_at) 217 .bind::<Nullable<Text>, _>(None as Option<String>) 218 .execute(conn)?; 219 220 Ok(BlobRef::new(cid, mime_type, size, None)) 221 }).await.expect("Failed to track untethered blob") 222 } 223 224 /// Process blobs associated with writes 225 pub async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> { 226 self.delete_dereferenced_blobs(writes.clone()).await?; 227 228 drop( 229 stream::iter(writes) 230 .then(async move |write| { 231 match write { 232 PreparedWrite::Create(w) => { 233 for blob in w.blobs { 234 self.verify_blob_and_make_permanent(blob.clone()).await?; 235 self.associate_blob(blob, w.uri.clone()).await?; 236 } 237 } 238 PreparedWrite::Update(w) => { 239 for blob in w.blobs { 240 self.verify_blob_and_make_permanent(blob.clone()).await?; 241 self.associate_blob(blob, w.uri.clone()).await?; 242 } 243 } 244 _ => (), 245 }; 246 Ok::<(), anyhow::Error>(()) 247 }) 248 .collect::<Vec<_>>() 249 .await 250 .into_iter() 251 .collect::<Result<Vec<_>, _>>()?, 252 ); 253 254 Ok(()) 255 } 256 257 /// Delete blobs that are no longer referenced by any records 258 pub async fn delete_dereferenced_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> { 259 use crate::schema::actor_store::blob::dsl as BlobSchema; 260 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema; 261 262 // Extract URIs 263 let uris: Vec<String> = writes 264 .iter() 265 .filter_map(|w| match w { 266 PreparedWrite::Delete(w) => Some(w.uri.clone()), 267 PreparedWrite::Update(w) => Some(w.uri.clone()), 268 _ => None, 269 }) 270 
.collect(); 271 272 if uris.is_empty() { 273 return Ok(()); 274 } 275 276 // In SQLite, we can't do DELETE...RETURNING 277 // So we need to fetch the records first, then delete 278 let did = self.did.clone(); 279 let uris_clone = uris.clone(); 280 let deleted_repo_blobs: Vec<models::RecordBlob> = self 281 .db 282 .get() 283 .await? 284 .interact(move |conn| { 285 RecordBlobSchema::record_blob 286 .filter(RecordBlobSchema::recordUri.eq_any(&uris_clone)) 287 .filter(RecordBlobSchema::did.eq(&did)) 288 .load::<models::RecordBlob>(conn) 289 }) 290 .await 291 .expect("Failed to get deleted repo blobs")?; 292 293 if deleted_repo_blobs.is_empty() { 294 return Ok(()); 295 } 296 297 // Now perform the delete 298 let uris_clone = uris.clone(); 299 _ = self 300 .db 301 .get() 302 .await? 303 .interact(move |conn| { 304 delete(RecordBlobSchema::record_blob) 305 .filter(RecordBlobSchema::recordUri.eq_any(uris_clone)) 306 .execute(conn) 307 }) 308 .await 309 .expect("Failed to delete repo blobs")?; 310 311 // Extract blob cids from the deleted records 312 let deleted_repo_blob_cids: Vec<String> = deleted_repo_blobs 313 .into_iter() 314 .map(|row| row.blob_cid) 315 .collect(); 316 317 // Find duplicates (blobs referenced by other records) 318 let cids_clone = deleted_repo_blob_cids.clone(); 319 let did_clone = self.did.clone(); 320 let duplicated_cids: Vec<String> = self 321 .db 322 .get() 323 .await? 
324 .interact(move |conn| { 325 RecordBlobSchema::record_blob 326 .filter(RecordBlobSchema::blobCid.eq_any(cids_clone)) 327 .filter(RecordBlobSchema::did.eq(did_clone)) 328 .select(RecordBlobSchema::blobCid) 329 .load::<String>(conn) 330 }) 331 .await 332 .expect("Failed to get duplicated cids")?; 333 334 // Extract new blob cids from writes (creates and updates) 335 let new_blob_cids: Vec<String> = writes 336 .iter() 337 .flat_map(|w| match w { 338 PreparedWrite::Create(w) => w.blobs.clone(), 339 PreparedWrite::Update(w) => w.blobs.clone(), 340 _ => Vec::new(), 341 }) 342 .map(|b| b.cid.to_string()) 343 .collect(); 344 345 // Determine which blobs to keep vs delete 346 let cids_to_keep: Vec<String> = [&new_blob_cids[..], &duplicated_cids[..]].concat(); 347 let cids_to_delete: Vec<String> = deleted_repo_blob_cids 348 .into_iter() 349 .filter(|cid| !cids_to_keep.contains(cid)) 350 .collect(); 351 352 if cids_to_delete.is_empty() { 353 return Ok(()); 354 } 355 356 // Delete from the blob table 357 let cids = cids_to_delete.clone(); 358 let did_clone = self.did.clone(); 359 _ = self 360 .db 361 .get() 362 .await? 
363 .interact(move |conn| { 364 delete(BlobSchema::blob) 365 .filter(BlobSchema::cid.eq_any(cids)) 366 .filter(BlobSchema::did.eq(did_clone)) 367 .execute(conn) 368 }) 369 .await 370 .expect("Failed to delete blobs")?; 371 372 // Delete from blob storage 373 // Ideally we'd use a background queue here, but for now: 374 drop( 375 stream::iter(cids_to_delete) 376 .then(async move |cid| match Cid::from_str(&cid) { 377 Ok(cid) => self.blobstore.delete(cid.to_string()).await, 378 Err(e) => Err(anyhow::Error::new(e)), 379 }) 380 .collect::<Vec<_>>() 381 .await 382 .into_iter() 383 .collect::<Result<Vec<_>, _>>()?, 384 ); 385 386 Ok(()) 387 } 388 389 /// Verify a blob and make it permanent 390 pub async fn verify_blob_and_make_permanent(&self, blob: PreparedBlobRef) -> Result<()> { 391 use crate::schema::actor_store::blob::dsl as BlobSchema; 392 393 let found = self 394 .db 395 .get() 396 .await? 397 .interact(move |conn| { 398 BlobSchema::blob 399 .filter( 400 BlobSchema::cid 401 .eq(blob.cid.to_string()) 402 .and(BlobSchema::takedownRef.is_null()), 403 ) 404 .select(models::Blob::as_select()) 405 .first(conn) 406 .optional() 407 }) 408 .await 409 .expect("Failed to verify blob")?; 410 411 if let Some(found) = found { 412 verify_blob(&blob, &found).await?; 413 if let Some(ref temp_key) = found.temp_key { 414 self.blobstore 415 .make_permanent(temp_key.clone(), blob.cid) 416 .await?; 417 } 418 _ = self 419 .db 420 .get() 421 .await? 
422 .interact(move |conn| { 423 update(BlobSchema::blob) 424 .filter(BlobSchema::tempKey.eq(found.temp_key)) 425 .set(BlobSchema::tempKey.eq::<Option<String>>(None)) 426 .execute(conn) 427 }) 428 .await 429 .expect("Failed to update blob")?; 430 Ok(()) 431 } else { 432 bail!("Could not find blob: {:?}", blob.cid.to_string()) 433 } 434 } 435 436 /// Associate a blob with a record 437 pub async fn associate_blob(&self, blob: PreparedBlobRef, record_uri: String) -> Result<()> { 438 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema; 439 440 let cid = blob.cid.to_string(); 441 let did = self.did.clone(); 442 443 _ = self 444 .db 445 .get() 446 .await? 447 .interact(move |conn| { 448 insert_into(RecordBlobSchema::record_blob) 449 .values(( 450 RecordBlobSchema::blobCid.eq(cid), 451 RecordBlobSchema::recordUri.eq(record_uri), 452 RecordBlobSchema::did.eq(&did), 453 )) 454 .on_conflict_do_nothing() 455 .execute(conn) 456 }) 457 .await 458 .expect("Failed to associate blob")?; 459 460 Ok(()) 461 } 462 463 /// Count all blobs for this actor 464 pub async fn blob_count(&self) -> Result<i64> { 465 use crate::schema::actor_store::blob::dsl as BlobSchema; 466 467 let did = self.did.clone(); 468 self.db 469 .get() 470 .await? 471 .interact(move |conn| { 472 let res = BlobSchema::blob 473 .filter(BlobSchema::did.eq(&did)) 474 .count() 475 .get_result(conn)?; 476 Ok(res) 477 }) 478 .await 479 .expect("Failed to count blobs") 480 } 481 482 /// Count blobs associated with records 483 pub async fn record_blob_count(&self) -> Result<i64> { 484 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema; 485 486 let did = self.did.clone(); 487 self.db 488 .get() 489 .await? 
490 .interact(move |conn| { 491 let res: i64 = RecordBlobSchema::record_blob 492 .filter(RecordBlobSchema::did.eq(&did)) 493 .select(count_distinct(RecordBlobSchema::blobCid)) 494 .get_result(conn)?; 495 Ok(res) 496 }) 497 .await 498 .expect("Failed to count record blobs") 499 } 500 501 /// List blobs that are referenced but missing 502 pub async fn list_missing_blobs( 503 &self, 504 opts: ListMissingBlobsOpts, 505 ) -> Result<Vec<ListMissingBlobsRefRecordBlob>> { 506 use crate::schema::actor_store::blob::dsl as BlobSchema; 507 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema; 508 509 let did = self.did.clone(); 510 self.db 511 .get() 512 .await? 513 .interact(move |conn| { 514 let ListMissingBlobsOpts { cursor, limit } = opts; 515 516 if limit > 1000 { 517 bail!("Limit too high. Max: 1000."); 518 } 519 520 // TODO: Improve this query 521 522 // SQLite doesn't support DISTINCT ON, so we use GROUP BY instead 523 let query = RecordBlobSchema::record_blob 524 .filter(not(exists( 525 BlobSchema::blob 526 .filter(BlobSchema::cid.eq(RecordBlobSchema::blobCid)) 527 .filter(BlobSchema::did.eq(&did)), 528 ))) 529 .filter(RecordBlobSchema::did.eq(&did)) 530 .into_boxed(); 531 532 // Apply cursor filtering if provided 533 let query = if let Some(cursor) = cursor { 534 query.filter(RecordBlobSchema::blobCid.gt(cursor)) 535 } else { 536 query 537 }; 538 539 // For SQLite, use a simplified approach without GROUP BY to avoid recursion limit issues 540 let res = query 541 .select((RecordBlobSchema::blobCid, RecordBlobSchema::recordUri)) 542 .order(RecordBlobSchema::blobCid.asc()) 543 .limit(limit as i64) 544 .load::<(String, String)>(conn)?; 545 546 // Process results to get distinct cids with their first record URI 547 let mut result = Vec::new(); 548 let mut last_cid = None; 549 550 for (cid, uri) in res { 551 if last_cid.as_ref() != Some(&cid) { 552 result.push(ListMissingBlobsRefRecordBlob { 553 cid: cid.clone(), 554 record_uri: uri, 555 }); 556 last_cid = 
Some(cid); 557 } 558 } 559 560 Ok(result) 561 }) 562 .await 563 .expect("Failed to list missing blobs") 564 } 565 566 /// List all blobs with optional filtering 567 pub async fn list_blobs(&self, opts: ListBlobsOpts) -> Result<Vec<String>> { 568 use crate::schema::actor_store::record::dsl as RecordSchema; 569 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema; 570 571 let ListBlobsOpts { 572 since, 573 cursor, 574 limit, 575 } = opts; 576 577 let res: Vec<String> = if let Some(since) = since { 578 let mut builder = RecordBlobSchema::record_blob 579 .inner_join( 580 RecordSchema::record.on(RecordSchema::uri.eq(RecordBlobSchema::recordUri)), 581 ) 582 .filter(RecordSchema::repoRev.gt(since)) 583 .select(RecordBlobSchema::blobCid) 584 .distinct() 585 .order(RecordBlobSchema::blobCid.asc()) 586 .limit(limit as i64) 587 .into_boxed(); 588 589 if let Some(cursor) = cursor { 590 builder = builder.filter(RecordBlobSchema::blobCid.gt(cursor)); 591 } 592 self.db 593 .get() 594 .await? 595 .interact(move |conn| builder.load(conn)) 596 .await 597 .expect("Failed to list blobs")? 598 } else { 599 let mut builder = RecordBlobSchema::record_blob 600 .select(RecordBlobSchema::blobCid) 601 .distinct() 602 .order(RecordBlobSchema::blobCid.asc()) 603 .limit(limit as i64) 604 .into_boxed(); 605 606 if let Some(cursor) = cursor { 607 builder = builder.filter(RecordBlobSchema::blobCid.gt(cursor)); 608 } 609 self.db 610 .get() 611 .await? 612 .interact(move |conn| builder.load(conn)) 613 .await 614 .expect("Failed to list blobs")? 615 }; 616 617 Ok(res) 618 } 619 620 /// Get the takedown status of a blob 621 pub async fn get_blob_takedown_status(&self, cid: Cid) -> Result<Option<StatusAttr>> { 622 use crate::schema::actor_store::blob::dsl as BlobSchema; 623 624 self.db 625 .get() 626 .await? 
627 .interact(move |conn| { 628 let res = BlobSchema::blob 629 .filter(BlobSchema::cid.eq(cid.to_string())) 630 .select(models::Blob::as_select()) 631 .first(conn) 632 .optional()?; 633 634 match res { 635 None => Ok(None), 636 Some(res) => res.takedown_ref.map_or_else( 637 || { 638 Ok(Some(StatusAttr { 639 applied: false, 640 r#ref: None, 641 })) 642 }, 643 |takedown_ref| { 644 Ok(Some(StatusAttr { 645 applied: true, 646 r#ref: Some(takedown_ref), 647 })) 648 }, 649 ), 650 } 651 }) 652 .await 653 .expect("Failed to get blob takedown status") 654 } 655 656 /// Update the takedown status of a blob 657 pub async fn update_blob_takedown_status(&self, blob: Cid, takedown: StatusAttr) -> Result<()> { 658 use crate::schema::actor_store::blob::dsl as BlobSchema; 659 660 let takedown_ref: Option<String> = match takedown.applied { 661 true => takedown.r#ref.map_or_else(|| Some(now()), Some), 662 false => None, 663 }; 664 665 let blob_cid = blob.to_string(); 666 let did_clone = self.did.clone(); 667 668 _ = self 669 .db 670 .get() 671 .await? 672 .interact(move |conn| { 673 _ = update(BlobSchema::blob) 674 .filter(BlobSchema::cid.eq(blob_cid)) 675 .filter(BlobSchema::did.eq(did_clone)) 676 .set(BlobSchema::takedownRef.eq(takedown_ref)) 677 .execute(conn)?; 678 Ok::<_, result::Error>(blob) 679 }) 680 .await 681 .expect("Failed to update blob takedown status")?; 682 683 let res = match takedown.applied { 684 true => self.blobstore.quarantine(blob).await, 685 false => self.blobstore.unquarantine(blob).await, 686 }; 687 688 match res { 689 Ok(_) => Ok(()), 690 Err(e) => match e.downcast_ref() { 691 Some(BlobError::BlobNotFoundError) => Ok(()), 692 None => Err(e), 693 }, 694 } 695 } 696} 697 698pub async fn verify_blob(blob: &PreparedBlobRef, found: &models::Blob) -> Result<()> { 699 if let Some(max_size) = blob.constraints.max_size { 700 if found.size as usize > max_size { 701 bail!( 702 "BlobTooLarge: This file is too large. 
It is {:?} but the maximum size is {:?}", 703 found.size, 704 max_size 705 ) 706 } 707 } 708 if blob.mime_type != found.mime_type { 709 bail!( 710 "InvalidMimeType: Referenced MimeType does not match stored blob. Expected: {:?}, Got: {:?}", 711 found.mime_type, 712 blob.mime_type 713 ) 714 } 715 if let Some(ref accept) = blob.constraints.accept { 716 if !accepted_mime(blob.mime_type.clone(), accept.clone()).await { 717 bail!( 718 "Wrong type of file. It is {:?} but it must match {:?}.", 719 blob.mime_type, 720 accept 721 ) 722 } 723 } 724 Ok(()) 725}