this repo has no description

More local methods to not proxy

lewis 6404b1dc 387cc447

Changed files
+187 -15
frontend
src
api
storage
+21
Cargo.lock
··· 1110 ] 1111 1112 [[package]] 1113 name = "cfg-if" 1114 version = "1.0.4" 1115 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3003 checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706" 3004 dependencies = [ 3005 "rustversion", 3006 ] 3007 3008 [[package]] ··· 6323 "hmac", 6324 "http 1.4.0", 6325 "image", 6326 "ipld-core", 6327 "iroh-car", 6328 "jacquard",
··· 1110 ] 1111 1112 [[package]] 1113 + name = "cfb" 1114 + version = "0.7.3" 1115 + source = "registry+https://github.com/rust-lang/crates.io-index" 1116 + checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f" 1117 + dependencies = [ 1118 + "byteorder", 1119 + "fnv", 1120 + "uuid", 1121 + ] 1122 + 1123 + [[package]] 1124 name = "cfg-if" 1125 version = "1.0.4" 1126 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3014 checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706" 3015 dependencies = [ 3016 "rustversion", 3017 + ] 3018 + 3019 + [[package]] 3020 + name = "infer" 3021 + version = "0.19.0" 3022 + source = "registry+https://github.com/rust-lang/crates.io-index" 3023 + checksum = "a588916bfdfd92e71cacef98a63d9b1f0d74d6599980d11894290e7ddefffcf7" 3024 + dependencies = [ 3025 + "cfb", 3026 ] 3027 3028 [[package]] ··· 6343 "hmac", 6344 "http 1.4.0", 6345 "image", 6346 + "infer", 6347 "ipld-core", 6348 "iroh-car", 6349 "jacquard",
+1
Cargo.toml
··· 22 hex = "0.4" 23 hkdf = "0.12" 24 hmac = "0.12" 25 aes-gcm = "0.10" 26 jacquard = { version = "0.9.5", default-features = false, features = ["api", "api_bluesky", "api_full", "derive", "dns"] } 27 jacquard-axum = "0.9.6"
··· 22 hex = "0.4" 23 hkdf = "0.12" 24 hmac = "0.12" 25 + infer = "0.19" 26 aes-gcm = "0.10" 27 jacquard = { version = "0.9.5", default-features = false, features = ["api", "api_bluesky", "api_full", "derive", "dns"] } 28 jacquard-axum = "0.9.6"
+22
frontend/src/lib/migration/atproto-client.ts
··· 227 }); 228 } 229 230 async uploadBlob( 231 data: Uint8Array, 232 mimeType: string,
··· 227 }); 228 } 229 230 + async getBlobWithContentType( 231 + did: string, 232 + cid: string, 233 + ): Promise<{ data: Uint8Array; contentType: string }> { 234 + const url = `${this.baseUrl}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cid)}`; 235 + const headers: Record<string, string> = {}; 236 + if (this.accessToken) { 237 + headers["Authorization"] = `Bearer ${this.accessToken}`; 238 + } 239 + const res = await fetch(url, { headers }); 240 + if (!res.ok) { 241 + const err = await res.json().catch(() => ({ 242 + error: "Unknown", 243 + message: res.statusText, 244 + })); 245 + throw new Error(err.message || err.error || res.statusText); 246 + } 247 + const contentType = res.headers.get("content-type") || "application/octet-stream"; 248 + const data = new Uint8Array(await res.arrayBuffer()); 249 + return { data, contentType }; 250 + } 251 + 252 async uploadBlob( 253 data: Uint8Array, 254 mimeType: string,
+5 -3
frontend/src/lib/migration/blob-migration.ts
··· 87 }); 88 89 console.log("[blob-migration] Fetching blob", cid, "from source"); 90 - const blobData = await sourceClient.getBlob(userDid, cid); 91 console.log( 92 "[blob-migration] Got blob", 93 cid, 94 "size:", 95 blobData.byteLength, 96 ); 97 - await localClient.uploadBlob(blobData, "application/octet-stream"); 98 - console.log("[blob-migration] Uploaded blob", cid); 99 migrated++; 100 onProgress({ blobsMigrated: migrated }); 101 } catch (e) {
··· 87 }); 88 89 console.log("[blob-migration] Fetching blob", cid, "from source"); 90 + const { data: blobData, contentType } = await sourceClient.getBlobWithContentType(userDid, cid); 91 console.log( 92 "[blob-migration] Got blob", 93 cid, 94 "size:", 95 blobData.byteLength, 96 + "contentType:", 97 + contentType, 98 ); 99 + await localClient.uploadBlob(blobData, contentType); 100 + console.log("[blob-migration] Uploaded blob", cid, "with contentType:", contentType); 101 migrated++; 102 onProgress({ blobsMigrated: migrated }); 103 } catch (e) {
+1 -1
src/api/error.rs
··· 480 Self::AuthenticationFailed(None) 481 } 482 crate::auth::extractor::AuthError::TokenExpired => { 483 - Self::AuthenticationFailed(Some("Token has expired".to_string())) 484 } 485 crate::auth::extractor::AuthError::AccountDeactivated => Self::AccountDeactivated, 486 crate::auth::extractor::AuthError::AccountTakedown => Self::AccountTakedown,
··· 480 Self::AuthenticationFailed(None) 481 } 482 crate::auth::extractor::AuthError::TokenExpired => { 483 + Self::ExpiredToken(Some("Token has expired".to_string())) 484 } 485 crate::auth::extractor::AuthError::AccountDeactivated => Self::AccountDeactivated, 486 crate::auth::extractor::AuthError::AccountTakedown => Self::AccountTakedown,
+76 -7
src/api/proxy.rs
··· 15 use tracing::{error, info, warn}; 16 17 const PROTECTED_METHODS: &[&str] = &[ 18 "com.atproto.admin.sendEmail", 19 "com.atproto.identity.requestPlcOperationSignature", 20 "com.atproto.identity.signPlcOperation", 21 "com.atproto.identity.updateHandle", 22 "com.atproto.server.activateAccount", 23 "com.atproto.server.confirmEmail", 24 "com.atproto.server.createAppPassword", 25 "com.atproto.server.deactivateAccount", 26 "com.atproto.server.getAccountInviteCodes", 27 "com.atproto.server.getSession", 28 "com.atproto.server.listAppPasswords", 29 "com.atproto.server.requestAccountDelete", 30 "com.atproto.server.requestEmailConfirmation", 31 "com.atproto.server.requestEmailUpdate", 32 "com.atproto.server.revokeAppPassword", 33 "com.atproto.server.updateEmail", 34 ]; 35 36 fn is_protected_method(method: &str) -> bool { ··· 89 .headers() 90 .contains_key(http::HeaderName::from(jacquard::xrpc::Header::AtprotoProxy)) 91 { 92 - // If the age assurance override is set and this is an age assurance call then we dont want to proxy even if the client requests it. 93 - if !std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_err() 94 - && (req.uri().path().ends_with("app.bsky.ageassurance.getState") 95 - || req 96 - .uri() 97 - .path() 98 - .ends_with("app.bsky.unspecced.getAgeAssuranceState")) 99 { 100 return Either::Right(self.inner.call(req)); 101 }
··· 15 use tracing::{error, info, warn}; 16 17 const PROTECTED_METHODS: &[&str] = &[ 18 + "app.bsky.actor.getPreferences", 19 + "app.bsky.actor.putPreferences", 20 + "com.atproto.admin.deleteAccount", 21 + "com.atproto.admin.disableAccountInvites", 22 + "com.atproto.admin.disableInviteCodes", 23 + "com.atproto.admin.enableAccountInvites", 24 + "com.atproto.admin.getAccountInfo", 25 + "com.atproto.admin.getAccountInfos", 26 + "com.atproto.admin.getInviteCodes", 27 + "com.atproto.admin.getSubjectStatus", 28 + "com.atproto.admin.searchAccounts", 29 "com.atproto.admin.sendEmail", 30 + "com.atproto.admin.updateAccountEmail", 31 + "com.atproto.admin.updateAccountHandle", 32 + "com.atproto.admin.updateAccountPassword", 33 + "com.atproto.admin.updateSubjectStatus", 34 + "com.atproto.identity.getRecommendedDidCredentials", 35 "com.atproto.identity.requestPlcOperationSignature", 36 "com.atproto.identity.signPlcOperation", 37 + "com.atproto.identity.submitPlcOperation", 38 "com.atproto.identity.updateHandle", 39 + "com.atproto.repo.applyWrites", 40 + "com.atproto.repo.createRecord", 41 + "com.atproto.repo.deleteRecord", 42 + "com.atproto.repo.importRepo", 43 + "com.atproto.repo.putRecord", 44 + "com.atproto.repo.uploadBlob", 45 "com.atproto.server.activateAccount", 46 + "com.atproto.server.checkAccountStatus", 47 "com.atproto.server.confirmEmail", 48 + "com.atproto.server.confirmSignup", 49 + "com.atproto.server.createAccount", 50 "com.atproto.server.createAppPassword", 51 + "com.atproto.server.createInviteCode", 52 + "com.atproto.server.createInviteCodes", 53 + "com.atproto.server.createSession", 54 + "com.atproto.server.createTotpSecret", 55 "com.atproto.server.deactivateAccount", 56 + "com.atproto.server.deleteAccount", 57 + "com.atproto.server.deletePasskey", 58 + "com.atproto.server.deleteSession", 59 + "com.atproto.server.describeServer", 60 + "com.atproto.server.disableTotp", 61 + "com.atproto.server.enableTotp", 62 + "com.atproto.server.finishPasskeyRegistration", 63 "com.atproto.server.getAccountInviteCodes", 64 + "com.atproto.server.getServiceAuth", 65 "com.atproto.server.getSession", 66 + "com.atproto.server.getTotpStatus", 67 "com.atproto.server.listAppPasswords", 68 + "com.atproto.server.listPasskeys", 69 + "com.atproto.server.refreshSession", 70 + "com.atproto.server.regenerateBackupCodes", 71 "com.atproto.server.requestAccountDelete", 72 "com.atproto.server.requestEmailConfirmation", 73 "com.atproto.server.requestEmailUpdate", 74 + "com.atproto.server.requestPasswordReset", 75 + "com.atproto.server.resendMigrationVerification", 76 + "com.atproto.server.resendVerification", 77 + "com.atproto.server.reserveSigningKey", 78 + "com.atproto.server.resetPassword", 79 "com.atproto.server.revokeAppPassword", 80 + "com.atproto.server.startPasskeyRegistration", 81 "com.atproto.server.updateEmail", 82 + "com.atproto.server.updatePasskey", 83 + "com.atproto.server.verifyMigrationEmail", 84 + "com.atproto.sync.getBlob", 85 + "com.atproto.sync.getBlocks", 86 + "com.atproto.sync.getCheckout", 87 + "com.atproto.sync.getHead", 88 + "com.atproto.sync.getLatestCommit", 89 + "com.atproto.sync.getRecord", 90 + "com.atproto.sync.getRepo", 91 + "com.atproto.sync.getRepoStatus", 92 + "com.atproto.sync.listBlobs", 93 + "com.atproto.sync.listRepos", 94 + "com.atproto.sync.notifyOfUpdate", 95 + "com.atproto.sync.requestCrawl", 96 + "com.atproto.sync.subscribeRepos", 97 + "com.atproto.temp.checkSignupQueue", 98 + "com.atproto.temp.dereferenceScope", 99 ]; 100 101 fn is_protected_method(method: &str) -> bool { ··· 154 .headers() 155 .contains_key(http::HeaderName::from(jacquard::xrpc::Header::AtprotoProxy)) 156 { 157 + let path = req.uri().path(); 158 + let method = path.trim_start_matches("/"); 159 + 160 + if is_protected_method(method) { 161 + return Either::Right(self.inner.call(req)); 162 + } 163 + 164 + // If the age assurance override is set and this is an age assurance call then we dont want to proxy even if the client requests it 165 + if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() 166 + && (path.ends_with("app.bsky.ageassurance.getState") 167 + || path.ends_with("app.bsky.unspecced.getAgeAssuranceState")) 168 { 169 return Either::Right(self.inner.call(req)); 170 }
+31 -4
src/api/repo/blob.rs
··· 17 use serde::{Deserialize, Serialize}; 18 use serde_json::json; 19 use std::pin::Pin; 20 - use tracing::{debug, error, info}; 21 22 pub async fn upload_blob( 23 State(state): State<AppState>, ··· 91 return ApiError::Forbidden.into_response(); 92 } 93 94 - let mime_type = headers 95 .get("content-type") 96 .and_then(|h| h.to_str().ok()) 97 - .unwrap_or("application/octet-stream") 98 - .to_string(); 99 100 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 101 .fetch_optional(&state.db) ··· 135 )) 136 .into_response(); 137 } 138 139 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 140 Ok(mh) => mh,
··· 17 use serde::{Deserialize, Serialize}; 18 use serde_json::json; 19 use std::pin::Pin; 20 + use tracing::{debug, error, info, warn}; 21 + 22 + fn detect_mime_type(data: &[u8], client_hint: &str) -> String { 23 + if let Some(kind) = infer::get(data) { 24 + let detected = kind.mime_type().to_string(); 25 + if detected != client_hint { 26 + debug!( 27 + "MIME type detection: client sent '{}', detected '{}'", 28 + client_hint, detected 29 + ); 30 + } 31 + detected 32 + } else { 33 + if client_hint == "*/*" || client_hint.is_empty() { 34 + warn!("Could not detect MIME type and client sent invalid hint: '{}'", client_hint); 35 + "application/octet-stream".to_string() 36 + } else { 37 + client_hint.to_string() 38 + } 39 + } 40 + } 41 42 pub async fn upload_blob( 43 State(state): State<AppState>, ··· 111 return ApiError::Forbidden.into_response(); 112 } 113 114 + let client_mime_hint = headers 115 .get("content-type") 116 .and_then(|h| h.to_str().ok()) 117 + .unwrap_or("application/octet-stream"); 118 119 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 120 .fetch_optional(&state.db) ··· 154 )) 155 .into_response(); 156 } 157 + 158 + let mime_type = match state.blob_store.get_head(&temp_key, 8192).await { 159 + Ok(head_bytes) => detect_mime_type(&head_bytes, client_mime_hint), 160 + Err(e) => { 161 + warn!("Failed to read blob head for MIME detection: {:?}", e); 162 + client_mime_hint.to_string() 163 + } 164 + }; 165 166 let multihash = match Multihash::wrap(0x12, &upload_result.sha256_hash) { 167 Ok(mh) => mh,
+30
src/storage/mod.rs
··· 34 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 35 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 36 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 37 async fn delete(&self, key: &str) -> Result<(), StorageError>; 38 async fn put_stream( 39 &self, ··· 230 .into_bytes(); 231 232 crate::metrics::record_s3_operation("get", "success"); 233 Ok(data) 234 } 235
··· 34 async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), StorageError>; 35 async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError>; 36 async fn get_bytes(&self, key: &str) -> Result<Bytes, StorageError>; 37 + async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError>; 38 async fn delete(&self, key: &str) -> Result<(), StorageError>; 39 async fn put_stream( 40 &self, ··· 231 .into_bytes(); 232 233 crate::metrics::record_s3_operation("get", "success"); 234 + Ok(data) 235 + } 236 + 237 + async fn get_head(&self, key: &str, size: usize) -> Result<Bytes, StorageError> { 238 + let range = format!("bytes=0-{}", size.saturating_sub(1)); 239 + let resp = self 240 + .client 241 + .get_object() 242 + .bucket(&self.bucket) 243 + .key(key) 244 + .range(range) 245 + .send() 246 + .await 247 + .map_err(|e| { 248 + crate::metrics::record_s3_operation("get_head", "error"); 249 + StorageError::S3(e.to_string()) 250 + })?; 251 + 252 + let data = resp 253 + .body 254 + .collect() 255 + .await 256 + .map_err(|e| { 257 + crate::metrics::record_s3_operation("get_head", "error"); 258 + StorageError::S3(e.to_string()) 259 + })? 260 + .into_bytes(); 261 + 262 + crate::metrics::record_s3_operation("get_head", "success"); 263 Ok(data) 264 } 265