this repo has no description
1use crate::api::{ApiError, EmptyResponse}; 2use crate::circuit_breaker::with_circuit_breaker; 3use crate::plc::{PlcClient, signing_key_to_did_key, validate_plc_operation}; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 response::{IntoResponse, Response}, 9}; 10use k256::ecdsa::SigningKey; 11use serde::Deserialize; 12use serde_json::Value; 13use tracing::{error, info, warn}; 14 15#[derive(Debug, Deserialize)] 16pub struct SubmitPlcOperationInput { 17 pub operation: Value, 18} 19 20pub async fn submit_plc_operation( 21 State(state): State<AppState>, 22 headers: axum::http::HeaderMap, 23 Json(input): Json<SubmitPlcOperationInput>, 24) -> Response { 25 let bearer = match crate::auth::extract_bearer_token_from_header( 26 headers.get("Authorization").and_then(|h| h.to_str().ok()), 27 ) { 28 Some(t) => t, 29 None => { 30 return ApiError::AuthenticationRequired.into_response(); 31 } 32 }; 33 let auth_user = 34 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await { 35 Ok(user) => user, 36 Err(e) => { 37 return ApiError::from(e).into_response(); 38 } 39 }; 40 if let Err(e) = crate::auth::scope_check::check_identity_scope( 41 auth_user.is_oauth, 42 auth_user.scope.as_deref(), 43 crate::oauth::scopes::IdentityAttr::Wildcard, 44 ) { 45 return e; 46 } 47 let did = &auth_user.did; 48 if did.starts_with("did:web:") { 49 return ApiError::InvalidRequest( 50 "PLC operations are only valid for did:plc identities".into(), 51 ) 52 .into_response(); 53 } 54 if let Err(e) = validate_plc_operation(&input.operation) { 55 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 56 } 57 let op = &input.operation; 58 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 59 let public_url = format!("https://{}", hostname); 60 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 61 .fetch_optional(&state.db) 62 .await 63 { 64 Ok(Some(row)) => row, 65 _ => { 66 return ApiError::AccountNotFound.into_response(); 67 } 68 }; 69 let key_row = match sqlx::query!( 70 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 71 user.id 72 ) 73 .fetch_optional(&state.db) 74 .await 75 { 76 Ok(Some(row)) => row, 77 _ => { 78 return ApiError::InternalError(Some("User signing key not found".into())) 79 .into_response(); 80 } 81 }; 82 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 83 { 84 Ok(k) => k, 85 Err(e) => { 86 error!("Failed to decrypt user key: {}", e); 87 return ApiError::InternalError(None).into_response(); 88 } 89 }; 90 let signing_key = match SigningKey::from_slice(&key_bytes) { 91 Ok(k) => k, 92 Err(e) => { 93 error!("Failed to create signing key: {:?}", e); 94 return ApiError::InternalError(None).into_response(); 95 } 96 }; 97 let user_did_key = signing_key_to_did_key(&signing_key); 98 let server_rotation_key = 99 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 100 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 101 let has_server_key = rotation_keys 102 .iter() 103 .any(|k| k.as_str() == Some(&server_rotation_key)); 104 if !has_server_key { 105 return ApiError::InvalidRequest( 106 "Rotation keys do not include server's rotation key".into(), 107 ) 108 .into_response(); 109 } 110 } 111 if let Some(services) = op.get("services").and_then(|v| v.as_object()) 112 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) 113 { 114 let service_type = pds.get("type").and_then(|v| v.as_str()); 115 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 116 if service_type != Some("AtprotoPersonalDataServer") { 117 return ApiError::InvalidRequest("Incorrect type on atproto_pds service".into()) 118 .into_response(); 119 } 120 if endpoint != Some(&public_url) { 121 return ApiError::InvalidRequest("Incorrect endpoint on atproto_pds service".into()) 122 .into_response(); 123 } 124 } 125 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) 126 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 127 && atproto_key != user_did_key 128 { 129 return ApiError::InvalidRequest("Incorrect signing key in verificationMethods".into()) 130 .into_response(); 131 } 132 if let Some(also_known_as) = (!user.handle.is_empty()) 133 .then(|| op.get("alsoKnownAs").and_then(|v| v.as_array())) 134 .flatten() 135 { 136 let expected_handle = format!("at://{}", user.handle); 137 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 138 if first_aka != Some(&expected_handle) { 139 return ApiError::InvalidRequest("Incorrect handle in alsoKnownAs".into()) 140 .into_response(); 141 } 142 } 143 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 144 let operation_clone = input.operation.clone(); 145 let did_clone = did.clone(); 146 if let Err(e) = with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 147 plc_client 148 .send_operation(&did_clone, &operation_clone) 149 .await 150 }) 151 .await 152 { 153 return ApiError::from(e).into_response(); 154 } 155 match sqlx::query!( 156 "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq", 157 did, 158 user.handle 159 ) 160 .fetch_one(&state.db) 161 .await 162 { 163 Ok(row) => { 164 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 165 .execute(&state.db) 166 .await 167 { 168 warn!("Failed to notify identity event: {:?}", e); 169 } 170 } 171 Err(e) => { 172 warn!("Failed to sequence identity event: {:?}", e); 173 } 174 } 175 let _ = state.cache.delete(&format!("handle:{}", user.handle)).await; 176 if state.did_resolver.refresh_did(did).await.is_none() { 177 warn!(did = %did, "Failed to refresh DID cache after PLC update"); 178 } 179 info!(did = %did, "PLC operation submitted successfully"); 180 EmptyResponse::ok().into_response() 181}