this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError}; 3use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient, PlcError}; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{json, Value}; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 let bearer = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => return ApiError::AuthenticationRequired.into_response(), 31 }; 32 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 33 Ok(user) => user, 34 Err(e) => return ApiError::from(e).into_response(), 35 }; 36 let did = &auth_user.did; 37 if let Err(e) = validate_plc_operation(&input.operation) { 38 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 39 } 40 let op = &input.operation; 41 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 42 let public_url = format!("https://{}", hostname); 43 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 44 .fetch_optional(&state.db) 45 .await 46 { 47 Ok(Some(row)) => row, 48 _ => { 49 return ( 50 StatusCode::NOT_FOUND, 51 Json(json!({"error": "AccountNotFound"})), 52 ) 53 .into_response(); 54 } 55 }; 56 let key_row = match sqlx::query!( 57 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 58 user.id 59 ) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(row)) => row, 64 _ => { 65 return ( 66 StatusCode::INTERNAL_SERVER_ERROR, 67 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 68 ) 69 .into_response(); 70 } 71 }; 72 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 73 { 74 Ok(k) => k, 75 Err(e) => { 76 error!("Failed to decrypt user key: {}", e); 77 return ( 78 StatusCode::INTERNAL_SERVER_ERROR, 79 Json(json!({"error": "InternalError"})), 80 ) 81 .into_response(); 82 } 83 }; 84 let signing_key = match SigningKey::from_slice(&key_bytes) { 85 Ok(k) => k, 86 Err(e) => { 87 error!("Failed to create signing key: {:?}", e); 88 return ( 89 StatusCode::INTERNAL_SERVER_ERROR, 90 Json(json!({"error": "InternalError"})), 91 ) 92 .into_response(); 93 } 94 }; 95 let user_did_key = signing_key_to_did_key(&signing_key); 96 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 97 let server_rotation_key = 98 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 99 let has_server_key = rotation_keys 100 .iter() 101 .any(|k| k.as_str() == Some(&server_rotation_key)); 102 if !has_server_key { 103 return ( 104 StatusCode::BAD_REQUEST, 105 Json(json!({ 106 "error": "InvalidRequest", 107 "message": "Rotation keys do not include server's rotation key" 108 })), 109 ) 110 .into_response(); 111 } 112 } 113 if let Some(services) = op.get("services").and_then(|v| v.as_object()) { 114 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) { 115 let service_type = pds.get("type").and_then(|v| v.as_str()); 116 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 117 if service_type != Some("AtprotoPersonalDataServer") { 118 return ( 119 StatusCode::BAD_REQUEST, 120 Json(json!({ 121 "error": "InvalidRequest", 122 "message": "Incorrect type on atproto_pds service" 123 })), 124 ) 125 .into_response(); 126 } 127 if endpoint != Some(&public_url) { 128 return ( 129 StatusCode::BAD_REQUEST, 130 Json(json!({ 131 "error": "InvalidRequest", 132 "message": "Incorrect endpoint on atproto_pds service" 133 })), 134 ) 135 .into_response(); 136 } 137 } 138 } 139 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) { 140 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) { 141 if atproto_key != user_did_key { 142 return ( 143 StatusCode::BAD_REQUEST, 144 Json(json!({ 145 "error": "InvalidRequest", 146 "message": "Incorrect signing key in verificationMethods" 147 })), 148 ) 149 .into_response(); 150 } 151 } 152 } 153 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 154 let expected_handle = format!("at://{}", user.handle); 155 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 156 if first_aka != Some(&expected_handle) { 157 return ( 158 StatusCode::BAD_REQUEST, 159 Json(json!({ 160 "error": "InvalidRequest", 161 "message": "Incorrect handle in alsoKnownAs" 162 })), 163 ) 164 .into_response(); 165 } 166 } 167 let plc_client = PlcClient::new(None); 168 let operation_clone = input.operation.clone(); 169 let did_clone = did.clone(); 170 let result: Result<(), CircuitBreakerError<PlcError>> = with_circuit_breaker( 171 &state.circuit_breakers.plc_directory, 172 || async { plc_client.send_operation(&did_clone, &operation_clone).await }, 173 ) 174 .await; 175 match result { 176 Ok(()) => {} 177 Err(CircuitBreakerError::CircuitOpen(e)) => { 178 warn!("PLC directory circuit breaker open: {}", e); 179 return ( 180 StatusCode::SERVICE_UNAVAILABLE, 181 Json(json!({ 182 "error": "ServiceUnavailable", 183 "message": "PLC directory service temporarily unavailable" 184 })), 185 ) 186 .into_response(); 187 } 188 Err(CircuitBreakerError::OperationFailed(e)) => { 189 error!("Failed to submit PLC operation: {:?}", e); 190 return ( 191 StatusCode::BAD_GATEWAY, 192 Json(json!({ 193 "error": "UpstreamError", 194 "message": format!("Failed to submit to PLC directory: {}", e) 195 })), 196 ) 197 .into_response(); 198 } 199 } 200 match sqlx::query!( 201 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq", 202 did 203 ) 204 .fetch_one(&state.db) 205 .await 206 { 207 Ok(row) => { 208 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 209 .execute(&state.db) 210 .await 211 { 212 warn!("Failed to notify identity event: {:?}", e); 213 } 214 } 215 Err(e) => { 216 warn!("Failed to sequence identity event: {:?}", e); 217 } 218 } 219 info!("Submitted PLC operation for user {}", did); 220 (StatusCode::OK, Json(json!({}))).into_response() 221}