this repo has no description
1use crate::auth::{ServiceTokenVerifier, is_service_token};
2use crate::delegation::{self, DelegationActionType};
3use crate::state::AppState;
4use axum::body::Bytes;
5use axum::{
6 Json,
7 extract::{Query, State},
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use cid::Cid;
12use multihash::Multihash;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use sha2::{Digest, Sha256};
16use tracing::{debug, error};
17
18const MAX_BLOB_SIZE: usize = 10_000_000_000;
19const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000;
20
21pub async fn upload_blob(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 body: Bytes,
25) -> Response {
26 let token = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => {
31 return (
32 StatusCode::UNAUTHORIZED,
33 Json(json!({"error": "AuthenticationRequired"})),
34 )
35 .into_response();
36 }
37 };
38
39 let is_service_auth = is_service_token(&token);
40
41 let (did, is_migration, controller_did) = if is_service_auth {
42 debug!("Verifying service token for blob upload");
43 let verifier = ServiceTokenVerifier::new();
44 match verifier
45 .verify_service_token(&token, Some("com.atproto.repo.uploadBlob"))
46 .await
47 {
48 Ok(claims) => {
49 debug!("Service token verified for DID: {}", claims.iss);
50 (claims.iss, false, None)
51 }
52 Err(e) => {
53 error!("Service token verification failed: {:?}", e);
54 return (
55 StatusCode::UNAUTHORIZED,
56 Json(json!({"error": "AuthenticationFailed", "message": format!("Service token verification failed: {}", e)})),
57 )
58 .into_response();
59 }
60 }
61 } else {
62 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
63 Ok(user) => {
64 let mime_type_for_check = headers
65 .get("content-type")
66 .and_then(|h| h.to_str().ok())
67 .unwrap_or("application/octet-stream");
68 if let Err(e) = crate::auth::scope_check::check_blob_scope(
69 user.is_oauth,
70 user.scope.as_deref(),
71 mime_type_for_check,
72 ) {
73 return e;
74 }
75 let deactivated = sqlx::query_scalar!(
76 "SELECT deactivated_at FROM users WHERE did = $1",
77 user.did
78 )
79 .fetch_optional(&state.db)
80 .await
81 .ok()
82 .flatten()
83 .flatten();
84 let ctrl_did = user.controller_did.clone();
85 (user.did, deactivated.is_some(), ctrl_did)
86 }
87 Err(_) => {
88 return (
89 StatusCode::UNAUTHORIZED,
90 Json(json!({"error": "AuthenticationFailed"})),
91 )
92 .into_response();
93 }
94 }
95 };
96
97 let max_size = if is_service_auth || is_migration {
98 MAX_VIDEO_BLOB_SIZE
99 } else {
100 MAX_BLOB_SIZE
101 };
102
103 if body.len() > max_size {
104 return (
105 StatusCode::PAYLOAD_TOO_LARGE,
106 Json(json!({"error": "BlobTooLarge", "message": format!("Blob size {} exceeds maximum of {} bytes", body.len(), max_size)})),
107 )
108 .into_response();
109 }
110 let mime_type = headers
111 .get("content-type")
112 .and_then(|h| h.to_str().ok())
113 .unwrap_or("application/octet-stream")
114 .to_string();
115 let size = body.len() as i64;
116 let data = body.to_vec();
117 let mut hasher = Sha256::new();
118 hasher.update(&data);
119 let hash = hasher.finalize();
120 let multihash = match Multihash::wrap(0x12, &hash) {
121 Ok(mh) => mh,
122 Err(e) => {
123 error!("Failed to create multihash for blob: {:?}", e);
124 return (
125 StatusCode::INTERNAL_SERVER_ERROR,
126 Json(json!({"error": "InternalError", "message": "Failed to hash blob"})),
127 )
128 .into_response();
129 }
130 };
131 let cid = Cid::new_v1(0x55, multihash);
132 let cid_str = cid.to_string();
133 let storage_key = format!("blobs/{}", cid_str);
134 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
135 .fetch_optional(&state.db)
136 .await;
137 let user_id = match user_query {
138 Ok(Some(row)) => row.id,
139 _ => {
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError"})),
143 )
144 .into_response();
145 }
146 };
147 let mut tx = match state.db.begin().await {
148 Ok(tx) => tx,
149 Err(e) => {
150 error!("Failed to begin transaction: {:?}", e);
151 return (
152 StatusCode::INTERNAL_SERVER_ERROR,
153 Json(json!({"error": "InternalError"})),
154 )
155 .into_response();
156 }
157 };
158 let insert = sqlx::query!(
159 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING RETURNING cid",
160 cid_str,
161 mime_type,
162 size,
163 user_id,
164 storage_key
165 )
166 .fetch_optional(&mut *tx)
167 .await;
168 let was_inserted = match insert {
169 Ok(Some(_)) => true,
170 Ok(None) => false,
171 Err(e) => {
172 error!("Failed to insert blob record: {:?}", e);
173 return (
174 StatusCode::INTERNAL_SERVER_ERROR,
175 Json(json!({"error": "InternalError"})),
176 )
177 .into_response();
178 }
179 };
180 if was_inserted
181 && let Err(e) = state
182 .blob_store
183 .put_bytes(&storage_key, bytes::Bytes::from(data))
184 .await
185 {
186 error!("Failed to upload blob to storage: {:?}", e);
187 return (
188 StatusCode::INTERNAL_SERVER_ERROR,
189 Json(json!({"error": "InternalError", "message": "Failed to store blob"})),
190 )
191 .into_response();
192 }
193 if let Err(e) = tx.commit().await {
194 error!("Failed to commit blob transaction: {:?}", e);
195 if was_inserted && let Err(cleanup_err) = state.blob_store.delete(&storage_key).await {
196 error!(
197 "Failed to cleanup orphaned blob {}: {:?}",
198 storage_key, cleanup_err
199 );
200 }
201 return (
202 StatusCode::INTERNAL_SERVER_ERROR,
203 Json(json!({"error": "InternalError"})),
204 )
205 .into_response();
206 }
207
208 if let Some(ref controller) = controller_did {
209 let _ = delegation::log_delegation_action(
210 &state.db,
211 &did,
212 controller,
213 Some(controller),
214 DelegationActionType::BlobUpload,
215 Some(json!({
216 "cid": cid_str,
217 "mime_type": mime_type,
218 "size": size
219 })),
220 None,
221 None,
222 )
223 .await;
224 }
225
226 Json(json!({
227 "blob": {
228 "$type": "blob",
229 "ref": {
230 "$link": cid_str
231 },
232 "mimeType": mime_type,
233 "size": size
234 }
235 }))
236 .into_response()
237}
238
239#[derive(Deserialize)]
240pub struct ListMissingBlobsParams {
241 pub limit: Option<i64>,
242 pub cursor: Option<String>,
243}
244
245#[derive(Serialize)]
246#[serde(rename_all = "camelCase")]
247pub struct RecordBlob {
248 pub cid: String,
249 pub record_uri: String,
250}
251
252#[derive(Serialize)]
253pub struct ListMissingBlobsOutput {
254 #[serde(skip_serializing_if = "Option::is_none")]
255 pub cursor: Option<String>,
256 pub blobs: Vec<RecordBlob>,
257}
258
259pub async fn list_missing_blobs(
260 State(state): State<AppState>,
261 headers: axum::http::HeaderMap,
262 Query(params): Query<ListMissingBlobsParams>,
263) -> Response {
264 let token = match crate::auth::extract_bearer_token_from_header(
265 headers.get("Authorization").and_then(|h| h.to_str().ok()),
266 ) {
267 Some(t) => t,
268 None => {
269 return (
270 StatusCode::UNAUTHORIZED,
271 Json(json!({"error": "AuthenticationRequired"})),
272 )
273 .into_response();
274 }
275 };
276 let auth_user =
277 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
278 Ok(user) => user,
279 Err(_) => {
280 return (
281 StatusCode::UNAUTHORIZED,
282 Json(json!({"error": "AuthenticationFailed"})),
283 )
284 .into_response();
285 }
286 };
287 let did = auth_user.did;
288 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
289 .fetch_optional(&state.db)
290 .await;
291 let user_id = match user_query {
292 Ok(Some(row)) => row.id,
293 _ => {
294 return (
295 StatusCode::INTERNAL_SERVER_ERROR,
296 Json(json!({"error": "InternalError"})),
297 )
298 .into_response();
299 }
300 };
301 let limit = params.limit.unwrap_or(500).clamp(1, 1000);
302 let cursor_cid = params.cursor.as_deref().unwrap_or("");
303 let missing_query = sqlx::query!(
304 r#"
305 SELECT rb.blob_cid, rb.record_uri
306 FROM record_blobs rb
307 LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id
308 WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
309 ORDER BY rb.blob_cid
310 LIMIT $3
311 "#,
312 user_id,
313 cursor_cid,
314 limit + 1
315 )
316 .fetch_all(&state.db)
317 .await;
318 let rows = match missing_query {
319 Ok(r) => r,
320 Err(e) => {
321 error!("DB error fetching missing blobs: {:?}", e);
322 return (
323 StatusCode::INTERNAL_SERVER_ERROR,
324 Json(json!({"error": "InternalError"})),
325 )
326 .into_response();
327 }
328 };
329 let has_more = rows.len() > limit as usize;
330 let blobs: Vec<RecordBlob> = rows
331 .into_iter()
332 .take(limit as usize)
333 .map(|row| RecordBlob {
334 cid: row.blob_cid,
335 record_uri: row.record_uri,
336 })
337 .collect();
338 let next_cursor = if has_more {
339 blobs.last().map(|b| b.cid.clone())
340 } else {
341 None
342 };
343 (
344 StatusCode::OK,
345 Json(ListMissingBlobsOutput {
346 cursor: next_cursor,
347 blobs,
348 }),
349 )
350 .into_response()
351}