//! Alternative ATProto PDS implementation
1//! Blob operations for the actor store
2//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/blob/mod.rs
3//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
4//!
5//! Modified for SQLite backend
6
7use crate::models::actor_store as models;
8use anyhow::{Result, bail};
9use axum::body::Bytes;
10use cidv10::Cid;
11use diesel::dsl::{count_distinct, exists, not};
12use diesel::sql_types::{Integer, Nullable, Text};
13use diesel::*;
14use futures::{
15 stream::{self, StreamExt},
16 try_join,
17};
18use rsky_common::ipld::sha256_raw_to_cid;
19use rsky_common::now;
20use rsky_lexicon::blob_refs::BlobRef;
21use rsky_lexicon::com::atproto::admin::StatusAttr;
22use rsky_lexicon::com::atproto::repo::ListMissingBlobsRefRecordBlob;
23use rsky_pds::actor_store::blob::{
24 BlobMetadata, GetBlobMetadataOutput, ListBlobsOpts, ListMissingBlobsOpts, accepted_mime,
25 sha256_stream,
26};
27use rsky_pds::image;
28use rsky_repo::error::BlobError;
29use rsky_repo::types::{PreparedBlobRef, PreparedWrite};
30use std::str::FromStr as _;
31
32use super::blob_fs::{BlobStoreFs, ByteStream};
33
/// Result of fetching a blob: stored metadata plus a readable byte stream.
pub struct GetBlobOutput {
    /// Size of the blob in bytes, as recorded in the `blob` table.
    pub size: i32,
    /// MIME type recorded for the blob, if known.
    pub mime_type: Option<String>,
    /// Streaming handle to the blob's content.
    pub stream: ByteStream,
}
39
/// Handles blob operations for an actor store
pub struct BlobReader {
    /// Blob storage backend (filesystem-based `BlobStoreFs`)
    pub blobstore: BlobStoreFs,
    /// DID of the actor
    pub did: String,
    /// Pooled SQLite connection used for blob metadata queries
    pub db: deadpool_diesel::Pool<
        deadpool_diesel::Manager<SqliteConnection>,
        deadpool_diesel::sqlite::Object,
    >,
}
52
53impl BlobReader {
54 /// Create a new blob reader
55 pub fn new(
56 blobstore: BlobStoreFs,
57 db: deadpool_diesel::Pool<
58 deadpool_diesel::Manager<SqliteConnection>,
59 deadpool_diesel::sqlite::Object,
60 >,
61 ) -> Self {
62 Self {
63 did: blobstore.did.clone(),
64 blobstore,
65 db,
66 }
67 }
68
69 /// Get metadata for a blob by CID
70 pub async fn get_blob_metadata(&self, cid: Cid) -> Result<GetBlobMetadataOutput> {
71 use crate::schema::actor_store::blob::dsl as BlobSchema;
72
73 let did = self.did.clone();
74 let found = self
75 .db
76 .get()
77 .await?
78 .interact(move |conn| {
79 BlobSchema::blob
80 .filter(BlobSchema::did.eq(did))
81 .filter(BlobSchema::cid.eq(cid.to_string()))
82 .filter(BlobSchema::takedownRef.is_null())
83 .select(models::Blob::as_select())
84 .first(conn)
85 .optional()
86 })
87 .await
88 .expect("Failed to get blob metadata")?;
89
90 match found {
91 None => bail!("Blob not found"),
92 Some(found) => Ok(GetBlobMetadataOutput {
93 size: found.size,
94 mime_type: Some(found.mime_type),
95 }),
96 }
97 }
98
99 /// Get a blob by CID with metadata and content
100 pub async fn get_blob(&self, cid: Cid) -> Result<GetBlobOutput> {
101 let metadata = self.get_blob_metadata(cid).await?;
102 let blob_stream = match self.blobstore.get_stream(cid).await {
103 Ok(stream) => stream,
104 Err(e) => bail!("Failed to get blob: {}", e),
105 };
106
107 Ok(GetBlobOutput {
108 size: metadata.size,
109 mime_type: metadata.mime_type,
110 stream: blob_stream,
111 })
112 }
113
114 /// Get all records that reference a specific blob
115 pub async fn get_records_for_blob(&self, cid: Cid) -> Result<Vec<String>> {
116 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema;
117
118 let did = self.did.clone();
119 let res = self
120 .db
121 .get()
122 .await?
123 .interact(move |conn| {
124 let results = RecordBlobSchema::record_blob
125 .filter(RecordBlobSchema::blobCid.eq(cid.to_string()))
126 .filter(RecordBlobSchema::did.eq(did))
127 .select(models::RecordBlob::as_select())
128 .get_results(conn)?;
129 Ok::<_, result::Error>(results.into_iter().map(|row| row.record_uri))
130 })
131 .await
132 .expect("Failed to get records for blob")?
133 .collect::<Vec<String>>();
134
135 Ok(res)
136 }
137
138 /// Upload a blob and get its metadata
139 pub async fn upload_blob_and_get_metadata(
140 &self,
141 user_suggested_mime: String,
142 blob: Bytes,
143 ) -> Result<BlobMetadata> {
144 let bytes = blob;
145 let size = bytes.len() as i64;
146
147 let (temp_key, sha256, img_info, sniffed_mime) = try_join!(
148 self.blobstore.put_temp(bytes.clone()),
149 // TODO: reimpl funcs to use Bytes instead of Vec<u8>
150 sha256_stream(bytes.to_vec()),
151 image::maybe_get_info(bytes.to_vec()),
152 image::mime_type_from_bytes(bytes.to_vec())
153 )?;
154
155 let cid = sha256_raw_to_cid(sha256);
156 let mime_type = sniffed_mime.unwrap_or(user_suggested_mime);
157
158 Ok(BlobMetadata {
159 temp_key,
160 size,
161 cid,
162 mime_type,
163 width: img_info.as_ref().map(|info| info.width as i32),
164 height: if let Some(info) = img_info {
165 Some(info.height as i32)
166 } else {
167 None
168 },
169 })
170 }
171
    /// Track a blob that hasn't been associated with any records yet
    ///
    /// Upserts a row into the `blob` table keyed on `(cid, did)`, retaining
    /// the temp-storage key so the bytes can later be promoted to permanent
    /// storage. Re-uploading a taken-down blob is rejected.
    pub async fn track_untethered_blob(&self, metadata: BlobMetadata) -> Result<BlobRef> {
        use crate::schema::actor_store::blob::dsl as BlobSchema;

        let did = self.did.clone();
        self.db.get().await?.interact(move |conn| {
            let BlobMetadata {
                temp_key,
                size,
                cid,
                mime_type,
                width,
                height,
            } = metadata;
            let created_at = now();

            // Look up any existing row for this (did, cid) so a takedown can
            // block the re-upload below.
            let found = BlobSchema::blob
                .filter(BlobSchema::did.eq(&did))
                .filter(BlobSchema::cid.eq(&cid.to_string()))
                .select(models::Blob::as_select())
                .first(conn)
                .optional()?;

            if let Some(found) = found {
                if found.takedown_ref.is_some() {
                    bail!("Blob has been takendown, cannot re-upload")
                }
            }

            // On conflict only refresh tempKey, and only while the existing
            // row still has a tempKey (i.e. the blob was never made permanent).
            // NOTE(review): this raw SQL uses `$N` placeholders and a `pds.`
            // table qualifier. SQLite accepts `$NNN` parameter syntax, but the
            // `pds.` prefix only resolves if a database named `pds` is
            // attached — confirm against the SQLite setup for this backend.
            let upsert = sql_query("INSERT INTO pds.blob (cid, did, \"mimeType\", size, \"tempKey\", width, height, \"createdAt\", \"takedownRef\") \
            VALUES \
            ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
            ON CONFLICT (cid, did) DO UPDATE \
            SET \"tempKey\" = EXCLUDED.\"tempKey\" \
            WHERE pds.blob.\"tempKey\" is not null;");
            #[expect(trivial_casts)]
            let _ = upsert
                .bind::<Text, _>(&cid.to_string())
                .bind::<Text, _>(&did)
                .bind::<Text, _>(&mime_type)
                // `size` is stored in a 32-bit integer column; blobs larger
                // than i32::MAX bytes would truncate here.
                .bind::<Integer, _>(size as i32)
                .bind::<Nullable<Text>, _>(Some(temp_key))
                .bind::<Nullable<Integer>, _>(width)
                .bind::<Nullable<Integer>, _>(height)
                .bind::<Text, _>(created_at)
                .bind::<Nullable<Text>, _>(None as Option<String>)
                .execute(conn)?;

            // The returned BlobRef carries the true i64 byte size, not the
            // possibly-truncated column value.
            Ok(BlobRef::new(cid, mime_type, size, None))
        }).await.expect("Failed to track untethered blob")
    }
223
224 /// Process blobs associated with writes
225 pub async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> {
226 self.delete_dereferenced_blobs(writes.clone()).await?;
227
228 drop(
229 stream::iter(writes)
230 .then(async move |write| {
231 match write {
232 PreparedWrite::Create(w) => {
233 for blob in w.blobs {
234 self.verify_blob_and_make_permanent(blob.clone()).await?;
235 self.associate_blob(blob, w.uri.clone()).await?;
236 }
237 }
238 PreparedWrite::Update(w) => {
239 for blob in w.blobs {
240 self.verify_blob_and_make_permanent(blob.clone()).await?;
241 self.associate_blob(blob, w.uri.clone()).await?;
242 }
243 }
244 _ => (),
245 };
246 Ok::<(), anyhow::Error>(())
247 })
248 .collect::<Vec<_>>()
249 .await
250 .into_iter()
251 .collect::<Result<Vec<_>, _>>()?,
252 );
253
254 Ok(())
255 }
256
    /// Delete blobs that are no longer referenced by any records
    ///
    /// Given a batch of prepared writes, removes `record_blob` rows for every
    /// deleted/updated record URI, then deletes the blobs whose last reference
    /// went away — unless a blob is still referenced by another record, or is
    /// re-introduced by a create/update within the same batch. Finally the
    /// bytes are removed from blob storage.
    pub async fn delete_dereferenced_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> {
        use crate::schema::actor_store::blob::dsl as BlobSchema;
        use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema;

        // Extract URIs
        let uris: Vec<String> = writes
            .iter()
            .filter_map(|w| match w {
                PreparedWrite::Delete(w) => Some(w.uri.clone()),
                PreparedWrite::Update(w) => Some(w.uri.clone()),
                _ => None,
            })
            .collect();

        if uris.is_empty() {
            return Ok(());
        }

        // In SQLite, we can't do DELETE...RETURNING
        // So we need to fetch the records first, then delete
        let did = self.did.clone();
        let uris_clone = uris.clone();
        let deleted_repo_blobs: Vec<models::RecordBlob> = self
            .db
            .get()
            .await?
            .interact(move |conn| {
                RecordBlobSchema::record_blob
                    .filter(RecordBlobSchema::recordUri.eq_any(&uris_clone))
                    .filter(RecordBlobSchema::did.eq(&did))
                    .load::<models::RecordBlob>(conn)
            })
            .await
            .expect("Failed to get deleted repo blobs")?;

        if deleted_repo_blobs.is_empty() {
            return Ok(());
        }

        // Now perform the delete
        // NOTE(review): unlike the fetch above, this delete is not filtered
        // by DID — confirm whether this store is strictly single-actor.
        let uris_clone = uris.clone();
        _ = self
            .db
            .get()
            .await?
            .interact(move |conn| {
                delete(RecordBlobSchema::record_blob)
                    .filter(RecordBlobSchema::recordUri.eq_any(uris_clone))
                    .execute(conn)
            })
            .await
            .expect("Failed to delete repo blobs")?;

        // Extract blob cids from the deleted records
        let deleted_repo_blob_cids: Vec<String> = deleted_repo_blobs
            .into_iter()
            .map(|row| row.blob_cid)
            .collect();

        // Find duplicates (blobs referenced by other records)
        let cids_clone = deleted_repo_blob_cids.clone();
        let did_clone = self.did.clone();
        let duplicated_cids: Vec<String> = self
            .db
            .get()
            .await?
            .interact(move |conn| {
                RecordBlobSchema::record_blob
                    .filter(RecordBlobSchema::blobCid.eq_any(cids_clone))
                    .filter(RecordBlobSchema::did.eq(did_clone))
                    .select(RecordBlobSchema::blobCid)
                    .load::<String>(conn)
            })
            .await
            .expect("Failed to get duplicated cids")?;

        // Extract new blob cids from writes (creates and updates)
        let new_blob_cids: Vec<String> = writes
            .iter()
            .flat_map(|w| match w {
                PreparedWrite::Create(w) => w.blobs.clone(),
                PreparedWrite::Update(w) => w.blobs.clone(),
                _ => Vec::new(),
            })
            .map(|b| b.cid.to_string())
            .collect();

        // Determine which blobs to keep vs delete
        // (linear `contains` scan per cid — fine for typical batch sizes)
        let cids_to_keep: Vec<String> = [&new_blob_cids[..], &duplicated_cids[..]].concat();
        let cids_to_delete: Vec<String> = deleted_repo_blob_cids
            .into_iter()
            .filter(|cid| !cids_to_keep.contains(cid))
            .collect();

        if cids_to_delete.is_empty() {
            return Ok(());
        }

        // Delete from the blob table
        let cids = cids_to_delete.clone();
        let did_clone = self.did.clone();
        _ = self
            .db
            .get()
            .await?
            .interact(move |conn| {
                delete(BlobSchema::blob)
                    .filter(BlobSchema::cid.eq_any(cids))
                    .filter(BlobSchema::did.eq(did_clone))
                    .execute(conn)
            })
            .await
            .expect("Failed to delete blobs")?;

        // Delete from blob storage
        // Ideally we'd use a background queue here, but for now:
        // (runs sequentially; every delete is attempted, then the first
        // error, if any, is surfaced via the Result collect)
        drop(
            stream::iter(cids_to_delete)
                .then(async move |cid| match Cid::from_str(&cid) {
                    Ok(cid) => self.blobstore.delete(cid.to_string()).await,
                    Err(e) => Err(anyhow::Error::new(e)),
                })
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>, _>>()?,
        );

        Ok(())
    }
388
389 /// Verify a blob and make it permanent
390 pub async fn verify_blob_and_make_permanent(&self, blob: PreparedBlobRef) -> Result<()> {
391 use crate::schema::actor_store::blob::dsl as BlobSchema;
392
393 let found = self
394 .db
395 .get()
396 .await?
397 .interact(move |conn| {
398 BlobSchema::blob
399 .filter(
400 BlobSchema::cid
401 .eq(blob.cid.to_string())
402 .and(BlobSchema::takedownRef.is_null()),
403 )
404 .select(models::Blob::as_select())
405 .first(conn)
406 .optional()
407 })
408 .await
409 .expect("Failed to verify blob")?;
410
411 if let Some(found) = found {
412 verify_blob(&blob, &found).await?;
413 if let Some(ref temp_key) = found.temp_key {
414 self.blobstore
415 .make_permanent(temp_key.clone(), blob.cid)
416 .await?;
417 }
418 _ = self
419 .db
420 .get()
421 .await?
422 .interact(move |conn| {
423 update(BlobSchema::blob)
424 .filter(BlobSchema::tempKey.eq(found.temp_key))
425 .set(BlobSchema::tempKey.eq::<Option<String>>(None))
426 .execute(conn)
427 })
428 .await
429 .expect("Failed to update blob")?;
430 Ok(())
431 } else {
432 bail!("Could not find blob: {:?}", blob.cid.to_string())
433 }
434 }
435
436 /// Associate a blob with a record
437 pub async fn associate_blob(&self, blob: PreparedBlobRef, record_uri: String) -> Result<()> {
438 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema;
439
440 let cid = blob.cid.to_string();
441 let did = self.did.clone();
442
443 _ = self
444 .db
445 .get()
446 .await?
447 .interact(move |conn| {
448 insert_into(RecordBlobSchema::record_blob)
449 .values((
450 RecordBlobSchema::blobCid.eq(cid),
451 RecordBlobSchema::recordUri.eq(record_uri),
452 RecordBlobSchema::did.eq(&did),
453 ))
454 .on_conflict_do_nothing()
455 .execute(conn)
456 })
457 .await
458 .expect("Failed to associate blob")?;
459
460 Ok(())
461 }
462
463 /// Count all blobs for this actor
464 pub async fn blob_count(&self) -> Result<i64> {
465 use crate::schema::actor_store::blob::dsl as BlobSchema;
466
467 let did = self.did.clone();
468 self.db
469 .get()
470 .await?
471 .interact(move |conn| {
472 let res = BlobSchema::blob
473 .filter(BlobSchema::did.eq(&did))
474 .count()
475 .get_result(conn)?;
476 Ok(res)
477 })
478 .await
479 .expect("Failed to count blobs")
480 }
481
482 /// Count blobs associated with records
483 pub async fn record_blob_count(&self) -> Result<i64> {
484 use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema;
485
486 let did = self.did.clone();
487 self.db
488 .get()
489 .await?
490 .interact(move |conn| {
491 let res: i64 = RecordBlobSchema::record_blob
492 .filter(RecordBlobSchema::did.eq(&did))
493 .select(count_distinct(RecordBlobSchema::blobCid))
494 .get_result(conn)?;
495 Ok(res)
496 })
497 .await
498 .expect("Failed to count record blobs")
499 }
500
    /// List blobs that are referenced but missing
    ///
    /// Returns record→blob references whose CID has no row in the `blob`
    /// table for this actor, ordered by blob CID ascending. `cursor` is an
    /// exclusive lower bound on the CID; `limit` must be at most 1000.
    pub async fn list_missing_blobs(
        &self,
        opts: ListMissingBlobsOpts,
    ) -> Result<Vec<ListMissingBlobsRefRecordBlob>> {
        use crate::schema::actor_store::blob::dsl as BlobSchema;
        use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema;

        let did = self.did.clone();
        self.db
            .get()
            .await?
            .interact(move |conn| {
                let ListMissingBlobsOpts { cursor, limit } = opts;

                if limit > 1000 {
                    bail!("Limit too high. Max: 1000.");
                }

                // TODO: Improve this query

                // SQLite doesn't support DISTINCT ON, so we use GROUP BY instead
                // Anti-join: keep record_blob rows with no matching blob row
                // for this actor.
                let query = RecordBlobSchema::record_blob
                    .filter(not(exists(
                        BlobSchema::blob
                            .filter(BlobSchema::cid.eq(RecordBlobSchema::blobCid))
                            .filter(BlobSchema::did.eq(&did)),
                    )))
                    .filter(RecordBlobSchema::did.eq(&did))
                    .into_boxed();

                // Apply cursor filtering if provided
                let query = if let Some(cursor) = cursor {
                    query.filter(RecordBlobSchema::blobCid.gt(cursor))
                } else {
                    query
                };

                // For SQLite, use a simplified approach without GROUP BY to avoid recursion limit issues
                // NOTE: `limit` caps raw (cid, uri) rows *before* dedup below,
                // so a page may contain fewer than `limit` distinct CIDs.
                let res = query
                    .select((RecordBlobSchema::blobCid, RecordBlobSchema::recordUri))
                    .order(RecordBlobSchema::blobCid.asc())
                    .limit(limit as i64)
                    .load::<(String, String)>(conn)?;

                // Process results to get distinct cids with their first record URI
                // (rows are CID-sorted, so comparing with the previous CID
                // is enough to deduplicate)
                let mut result = Vec::new();
                let mut last_cid = None;

                for (cid, uri) in res {
                    if last_cid.as_ref() != Some(&cid) {
                        result.push(ListMissingBlobsRefRecordBlob {
                            cid: cid.clone(),
                            record_uri: uri,
                        });
                        last_cid = Some(cid);
                    }
                }

                Ok(result)
            })
            .await
            .expect("Failed to list missing blobs")
    }
565
    /// List all blobs with optional filtering
    ///
    /// With `since`, only blob CIDs referenced by records whose repo revision
    /// is greater than `since` are returned; otherwise all referenced blob
    /// CIDs. Results are distinct and CID-ordered; `cursor` is an exclusive
    /// CID lower bound.
    pub async fn list_blobs(&self, opts: ListBlobsOpts) -> Result<Vec<String>> {
        use crate::schema::actor_store::record::dsl as RecordSchema;
        use crate::schema::actor_store::record_blob::dsl as RecordBlobSchema;

        let ListBlobsOpts {
            since,
            cursor,
            limit,
        } = opts;

        // NOTE(review): neither branch filters by DID — presumably the store
        // holds a single actor; confirm.
        let res: Vec<String> = if let Some(since) = since {
            // Join through `record` so the repo revision can be filtered on.
            let mut builder = RecordBlobSchema::record_blob
                .inner_join(
                    RecordSchema::record.on(RecordSchema::uri.eq(RecordBlobSchema::recordUri)),
                )
                .filter(RecordSchema::repoRev.gt(since))
                .select(RecordBlobSchema::blobCid)
                .distinct()
                .order(RecordBlobSchema::blobCid.asc())
                .limit(limit as i64)
                .into_boxed();

            if let Some(cursor) = cursor {
                builder = builder.filter(RecordBlobSchema::blobCid.gt(cursor));
            }
            self.db
                .get()
                .await?
                .interact(move |conn| builder.load(conn))
                .await
                .expect("Failed to list blobs")?
        } else {
            let mut builder = RecordBlobSchema::record_blob
                .select(RecordBlobSchema::blobCid)
                .distinct()
                .order(RecordBlobSchema::blobCid.asc())
                .limit(limit as i64)
                .into_boxed();

            if let Some(cursor) = cursor {
                builder = builder.filter(RecordBlobSchema::blobCid.gt(cursor));
            }
            self.db
                .get()
                .await?
                .interact(move |conn| builder.load(conn))
                .await
                .expect("Failed to list blobs")?
        };

        Ok(res)
    }
619
620 /// Get the takedown status of a blob
621 pub async fn get_blob_takedown_status(&self, cid: Cid) -> Result<Option<StatusAttr>> {
622 use crate::schema::actor_store::blob::dsl as BlobSchema;
623
624 self.db
625 .get()
626 .await?
627 .interact(move |conn| {
628 let res = BlobSchema::blob
629 .filter(BlobSchema::cid.eq(cid.to_string()))
630 .select(models::Blob::as_select())
631 .first(conn)
632 .optional()?;
633
634 match res {
635 None => Ok(None),
636 Some(res) => res.takedown_ref.map_or_else(
637 || {
638 Ok(Some(StatusAttr {
639 applied: false,
640 r#ref: None,
641 }))
642 },
643 |takedown_ref| {
644 Ok(Some(StatusAttr {
645 applied: true,
646 r#ref: Some(takedown_ref),
647 }))
648 },
649 ),
650 }
651 })
652 .await
653 .expect("Failed to get blob takedown status")
654 }
655
    /// Update the takedown status of a blob
    ///
    /// Persists the takedown ref on the blob row, then quarantines (or
    /// unquarantines) the bytes in blob storage. A blob missing from storage
    /// is tolerated so the database state still gets updated.
    pub async fn update_blob_takedown_status(&self, blob: Cid, takedown: StatusAttr) -> Result<()> {
        use crate::schema::actor_store::blob::dsl as BlobSchema;

        // Applying a takedown without an explicit ref stamps the current time.
        let takedown_ref: Option<String> = match takedown.applied {
            true => takedown.r#ref.map_or_else(|| Some(now()), Some),
            false => None,
        };

        let blob_cid = blob.to_string();
        let did_clone = self.did.clone();

        _ = self
            .db
            .get()
            .await?
            .interact(move |conn| {
                _ = update(BlobSchema::blob)
                    .filter(BlobSchema::cid.eq(blob_cid))
                    .filter(BlobSchema::did.eq(did_clone))
                    .set(BlobSchema::takedownRef.eq(takedown_ref))
                    .execute(conn)?;
                Ok::<_, result::Error>(blob)
            })
            .await
            .expect("Failed to update blob takedown status")?;

        // Mirror the DB state in blob storage.
        let res = match takedown.applied {
            true => self.blobstore.quarantine(blob).await,
            false => self.blobstore.unquarantine(blob).await,
        };

        // A not-found blob in storage is not an error here — the row update
        // above already took effect.
        match res {
            Ok(_) => Ok(()),
            Err(e) => match e.downcast_ref() {
                Some(BlobError::BlobNotFoundError) => Ok(()),
                None => Err(e),
            },
        }
    }
696}
697
698pub async fn verify_blob(blob: &PreparedBlobRef, found: &models::Blob) -> Result<()> {
699 if let Some(max_size) = blob.constraints.max_size {
700 if found.size as usize > max_size {
701 bail!(
702 "BlobTooLarge: This file is too large. It is {:?} but the maximum size is {:?}",
703 found.size,
704 max_size
705 )
706 }
707 }
708 if blob.mime_type != found.mime_type {
709 bail!(
710 "InvalidMimeType: Referenced MimeType does not match stored blob. Expected: {:?}, Got: {:?}",
711 found.mime_type,
712 blob.mime_type
713 )
714 }
715 if let Some(ref accept) = blob.constraints.accept {
716 if !accepted_mime(blob.mime_type.clone(), accept.clone()).await {
717 bail!(
718 "Wrong type of file. It is {:?} but it must match {:?}.",
719 blob.mime_type,
720 accept
721 )
722 }
723 }
724 Ok(())
725}