this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError}; 3use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient, PlcError}; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{json, Value}; 14use tracing::{error, info, warn}; 15#[derive(Debug, Deserialize)] 16pub struct SubmitPlcOperationInput { 17 pub operation: Value, 18} 19pub async fn submit_plc_operation( 20 State(state): State<AppState>, 21 headers: axum::http::HeaderMap, 22 Json(input): Json<SubmitPlcOperationInput>, 23) -> Response { 24 let bearer = match crate::auth::extract_bearer_token_from_header( 25 headers.get("Authorization").and_then(|h| h.to_str().ok()), 26 ) { 27 Some(t) => t, 28 None => return ApiError::AuthenticationRequired.into_response(), 29 }; 30 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 31 Ok(user) => user, 32 Err(e) => return ApiError::from(e).into_response(), 33 }; 34 let did = &auth_user.did; 35 if let Err(e) = validate_plc_operation(&input.operation) { 36 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 37 } 38 let op = &input.operation; 39 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 40 let public_url = format!("https://{}", hostname); 41 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 42 .fetch_optional(&state.db) 43 .await 44 { 45 Ok(Some(row)) => row, 46 _ => { 47 return ( 48 StatusCode::NOT_FOUND, 49 Json(json!({"error": "AccountNotFound"})), 50 ) 51 .into_response(); 52 } 53 }; 54 let key_row = match sqlx::query!( 55 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 56 user.id 57 ) 58 .fetch_optional(&state.db) 59 .await 60 { 61 Ok(Some(row)) => row, 62 _ => { 63 return ( 64 StatusCode::INTERNAL_SERVER_ERROR, 65 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 66 ) 67 .into_response(); 68 } 69 }; 70 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 71 { 72 Ok(k) => k, 73 Err(e) => { 74 error!("Failed to decrypt user key: {}", e); 75 return ( 76 StatusCode::INTERNAL_SERVER_ERROR, 77 Json(json!({"error": "InternalError"})), 78 ) 79 .into_response(); 80 } 81 }; 82 let signing_key = match SigningKey::from_slice(&key_bytes) { 83 Ok(k) => k, 84 Err(e) => { 85 error!("Failed to create signing key: {:?}", e); 86 return ( 87 StatusCode::INTERNAL_SERVER_ERROR, 88 Json(json!({"error": "InternalError"})), 89 ) 90 .into_response(); 91 } 92 }; 93 let user_did_key = signing_key_to_did_key(&signing_key); 94 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 95 let server_rotation_key = 96 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 97 let has_server_key = rotation_keys 98 .iter() 99 .any(|k| k.as_str() == Some(&server_rotation_key)); 100 if !has_server_key { 101 return ( 102 StatusCode::BAD_REQUEST, 103 Json(json!({ 104 "error": "InvalidRequest", 105 "message": "Rotation keys do not include server's rotation key" 106 })), 107 ) 108 .into_response(); 109 } 110 } 111 if let Some(services) = op.get("services").and_then(|v| v.as_object()) { 112 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) { 113 let service_type = pds.get("type").and_then(|v| v.as_str()); 114 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 115 if service_type != Some("AtprotoPersonalDataServer") { 116 return ( 117 StatusCode::BAD_REQUEST, 118 Json(json!({ 119 "error": "InvalidRequest", 120 "message": "Incorrect type on atproto_pds service" 121 })), 122 ) 123 .into_response(); 124 } 125 if endpoint != Some(&public_url) { 126 return ( 127 StatusCode::BAD_REQUEST, 128 Json(json!({ 129 "error": "InvalidRequest", 130 "message": "Incorrect endpoint on atproto_pds service" 131 })), 132 ) 133 .into_response(); 134 } 135 } 136 } 137 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) { 138 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) { 139 if atproto_key != user_did_key { 140 return ( 141 StatusCode::BAD_REQUEST, 142 Json(json!({ 143 "error": "InvalidRequest", 144 "message": "Incorrect signing key in verificationMethods" 145 })), 146 ) 147 .into_response(); 148 } 149 } 150 } 151 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 152 let expected_handle = format!("at://{}", user.handle); 153 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 154 if first_aka != Some(&expected_handle) { 155 return ( 156 StatusCode::BAD_REQUEST, 157 Json(json!({ 158 "error": "InvalidRequest", 159 "message": "Incorrect handle in alsoKnownAs" 160 })), 161 ) 162 .into_response(); 163 } 164 } 165 let plc_client = PlcClient::new(None); 166 let operation_clone = input.operation.clone(); 167 let did_clone = did.clone(); 168 let result: Result<(), CircuitBreakerError<PlcError>> = with_circuit_breaker( 169 &state.circuit_breakers.plc_directory, 170 || async { plc_client.send_operation(&did_clone, &operation_clone).await }, 171 ) 172 .await; 173 match result { 174 Ok(()) => {} 175 Err(CircuitBreakerError::CircuitOpen(e)) => { 176 warn!("PLC directory circuit breaker open: {}", e); 177 return ( 178 StatusCode::SERVICE_UNAVAILABLE, 179 Json(json!({ 180 "error": "ServiceUnavailable", 181 "message": "PLC directory service temporarily unavailable" 182 })), 183 ) 184 .into_response(); 185 } 186 Err(CircuitBreakerError::OperationFailed(e)) => { 187 error!("Failed to submit PLC operation: {:?}", e); 188 return ( 189 StatusCode::BAD_GATEWAY, 190 Json(json!({ 191 "error": "UpstreamError", 192 "message": format!("Failed to submit to PLC directory: {}", e) 193 })), 194 ) 195 .into_response(); 196 } 197 } 198 match sqlx::query!( 199 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq", 200 did 201 ) 202 .fetch_one(&state.db) 203 .await 204 { 205 Ok(row) => { 206 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 207 .execute(&state.db) 208 .await 209 { 210 warn!("Failed to notify identity event: {:?}", e); 211 } 212 } 213 Err(e) => { 214 warn!("Failed to sequence identity event: {:?}", e); 215 } 216 } 217 info!("Submitted PLC operation for user {}", did); 218 (StatusCode::OK, Json(json!({}))).into_response() 219}