this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker}; 3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation}; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{Value, json}; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 let bearer = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => return ApiError::AuthenticationRequired.into_response(), 31 }; 32 let auth_user = 33 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await { 34 Ok(user) => user, 35 Err(e) => return ApiError::from(e).into_response(), 36 }; 37 if let Err(e) = crate::auth::scope_check::check_identity_scope( 38 auth_user.is_oauth, 39 auth_user.scope.as_deref(), 40 crate::oauth::scopes::IdentityAttr::Wildcard, 41 ) { 42 return e; 43 } 44 let did = &auth_user.did; 45 if let Err(e) = validate_plc_operation(&input.operation) { 46 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 47 } 48 let op = &input.operation; 49 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 50 let public_url = format!("https://{}", hostname); 51 let user = match sqlx::query!( 52 "SELECT id, handle, deactivated_at FROM users WHERE did = $1", 53 did 54 ) 55 .fetch_optional(&state.db) 56 .await 57 { 58 Ok(Some(row)) => row, 59 _ => { 60 return ( 61 StatusCode::NOT_FOUND, 62 Json(json!({"error": "AccountNotFound"})), 63 ) 64 .into_response(); 65 } 66 }; 67 let is_migration = user.deactivated_at.is_some(); 68 let key_row = match sqlx::query!( 69 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 70 user.id 71 ) 72 .fetch_optional(&state.db) 73 .await 74 { 75 Ok(Some(row)) => row, 76 _ => { 77 return ( 78 StatusCode::INTERNAL_SERVER_ERROR, 79 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 80 ) 81 .into_response(); 82 } 83 }; 84 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 85 { 86 Ok(k) => k, 87 Err(e) => { 88 error!("Failed to decrypt user key: {}", e); 89 return ( 90 StatusCode::INTERNAL_SERVER_ERROR, 91 Json(json!({"error": "InternalError"})), 92 ) 93 .into_response(); 94 } 95 }; 96 let signing_key = match SigningKey::from_slice(&key_bytes) { 97 Ok(k) => k, 98 Err(e) => { 99 error!("Failed to create signing key: {:?}", e); 100 return ( 101 StatusCode::INTERNAL_SERVER_ERROR, 102 Json(json!({"error": "InternalError"})), 103 ) 104 .into_response(); 105 } 106 }; 107 let user_did_key = signing_key_to_did_key(&signing_key); 108 if !is_migration && let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) 109 { 110 let server_rotation_key = 111 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 112 let has_server_key = rotation_keys 113 .iter() 114 .any(|k| k.as_str() == Some(&server_rotation_key)); 115 if !has_server_key { 116 return ( 117 StatusCode::BAD_REQUEST, 118 Json(json!({ 119 "error": "InvalidRequest", 120 "message": "Rotation keys do not include server's rotation key" 121 })), 122 ) 123 .into_response(); 124 } 125 } 126 if let Some(services) = op.get("services").and_then(|v| v.as_object()) 127 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) 128 { 129 let service_type = pds.get("type").and_then(|v| v.as_str()); 130 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 131 if service_type != Some("AtprotoPersonalDataServer") { 132 return ( 133 StatusCode::BAD_REQUEST, 134 Json(json!({ 135 "error": "InvalidRequest", 136 "message": "Incorrect type on atproto_pds service" 137 })), 138 ) 139 .into_response(); 140 } 141 if endpoint != Some(&public_url) { 142 return ( 143 StatusCode::BAD_REQUEST, 144 Json(json!({ 145 "error": "InvalidRequest", 146 "message": "Incorrect endpoint on atproto_pds service" 147 })), 148 ) 149 .into_response(); 150 } 151 } 152 if !is_migration { 153 if let Some(verification_methods) = 154 op.get("verificationMethods").and_then(|v| v.as_object()) 155 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 156 && atproto_key != user_did_key 157 { 158 return ( 159 StatusCode::BAD_REQUEST, 160 Json(json!({ 161 "error": "InvalidRequest", 162 "message": "Incorrect signing key in verificationMethods" 163 })), 164 ) 165 .into_response(); 166 } 167 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 168 let expected_handle = format!("at://{}", user.handle); 169 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 170 if first_aka != Some(&expected_handle) { 171 return ( 172 StatusCode::BAD_REQUEST, 173 Json(json!({ 174 "error": "InvalidRequest", 175 "message": "Incorrect handle in alsoKnownAs" 176 })), 177 ) 178 .into_response(); 179 } 180 } 181 } 182 let plc_client = PlcClient::new(None); 183 let operation_clone = input.operation.clone(); 184 let did_clone = did.clone(); 185 let result: Result<(), CircuitBreakerError<PlcError>> = 186 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 187 plc_client 188 .send_operation(&did_clone, &operation_clone) 189 .await 190 }) 191 .await; 192 match result { 193 Ok(()) => {} 194 Err(CircuitBreakerError::CircuitOpen(e)) => { 195 warn!("PLC directory circuit breaker open: {}", e); 196 return ( 197 StatusCode::SERVICE_UNAVAILABLE, 198 Json(json!({ 199 "error": "ServiceUnavailable", 200 "message": "PLC directory service temporarily unavailable" 201 })), 202 ) 203 .into_response(); 204 } 205 Err(CircuitBreakerError::OperationFailed(e)) => { 206 error!("Failed to submit PLC operation: {:?}", e); 207 return ( 208 StatusCode::BAD_GATEWAY, 209 Json(json!({ 210 "error": "UpstreamError", 211 "message": format!("Failed to submit to PLC directory: {}", e) 212 })), 213 ) 214 .into_response(); 215 } 216 } 217 match sqlx::query!( 218 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq", 219 did 220 ) 221 .fetch_one(&state.db) 222 .await 223 { 224 Ok(row) => { 225 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 226 .execute(&state.db) 227 .await 228 { 229 warn!("Failed to notify identity event: {:?}", e); 230 } 231 } 232 Err(e) => { 233 warn!("Failed to sequence identity event: {:?}", e); 234 } 235 } 236 info!("Submitted PLC operation for user {}", did); 237 (StatusCode::OK, Json(json!({}))).into_response() 238}