this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError}; 3use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient, PlcError}; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{json, Value}; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 let bearer = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => return ApiError::AuthenticationRequired.into_response(), 31 }; 32 33 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 34 Ok(user) => user, 35 Err(e) => return ApiError::from(e).into_response(), 36 }; 37 38 let did = &auth_user.did; 39 40 if let Err(e) = validate_plc_operation(&input.operation) { 41 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 42 } 43 44 let op = &input.operation; 45 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 46 let public_url = format!("https://{}", hostname); 47 48 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 49 .fetch_optional(&state.db) 50 .await 51 { 52 Ok(Some(row)) => row, 53 _ => { 54 return ( 55 StatusCode::NOT_FOUND, 56 Json(json!({"error": "AccountNotFound"})), 57 ) 58 .into_response(); 59 } 60 }; 61 62 let key_row = match sqlx::query!( 63 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 64 user.id 65 ) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(row)) => row, 70 _ => { 71 return ( 72 StatusCode::INTERNAL_SERVER_ERROR, 73 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 74 ) 75 .into_response(); 76 } 77 }; 78 79 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 80 { 81 Ok(k) => k, 82 Err(e) => { 83 error!("Failed to decrypt user key: {}", e); 84 return ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError"})), 87 ) 88 .into_response(); 89 } 90 }; 91 92 let signing_key = match SigningKey::from_slice(&key_bytes) { 93 Ok(k) => k, 94 Err(e) => { 95 error!("Failed to create signing key: {:?}", e); 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError"})), 99 ) 100 .into_response(); 101 } 102 }; 103 104 let user_did_key = signing_key_to_did_key(&signing_key); 105 106 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 107 let server_rotation_key = 108 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 109 110 let has_server_key = rotation_keys 111 .iter() 112 .any(|k| k.as_str() == Some(&server_rotation_key)); 113 114 if !has_server_key { 115 return ( 116 StatusCode::BAD_REQUEST, 117 Json(json!({ 118 "error": "InvalidRequest", 119 "message": "Rotation keys do not include server's rotation key" 120 })), 121 ) 122 .into_response(); 123 } 124 } 125 126 if let Some(services) = op.get("services").and_then(|v| v.as_object()) { 127 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) { 128 let service_type = pds.get("type").and_then(|v| v.as_str()); 129 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 130 131 if service_type != Some("AtprotoPersonalDataServer") { 132 return ( 133 StatusCode::BAD_REQUEST, 134 Json(json!({ 135 "error": "InvalidRequest", 136 "message": "Incorrect type on atproto_pds service" 137 })), 138 ) 139 .into_response(); 140 } 141 142 if endpoint != Some(&public_url) { 143 return ( 144 StatusCode::BAD_REQUEST, 145 Json(json!({ 146 "error": "InvalidRequest", 147 "message": "Incorrect endpoint on atproto_pds service" 148 })), 149 ) 150 .into_response(); 151 } 152 } 153 } 154 155 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) { 156 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) { 157 if atproto_key != user_did_key { 158 return ( 159 StatusCode::BAD_REQUEST, 160 Json(json!({ 161 "error": "InvalidRequest", 162 "message": "Incorrect signing key in verificationMethods" 163 })), 164 ) 165 .into_response(); 166 } 167 } 168 } 169 170 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 171 let expected_handle = format!("at://{}", user.handle); 172 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 173 174 if first_aka != Some(&expected_handle) { 175 return ( 176 StatusCode::BAD_REQUEST, 177 Json(json!({ 178 "error": "InvalidRequest", 179 "message": "Incorrect handle in alsoKnownAs" 180 })), 181 ) 182 .into_response(); 183 } 184 } 185 186 let plc_client = PlcClient::new(None); 187 let operation_clone = input.operation.clone(); 188 let did_clone = did.clone(); 189 let result: Result<(), CircuitBreakerError<PlcError>> = with_circuit_breaker( 190 &state.circuit_breakers.plc_directory, 191 || async { plc_client.send_operation(&did_clone, &operation_clone).await }, 192 ) 193 .await; 194 195 match result { 196 Ok(()) => {} 197 Err(CircuitBreakerError::CircuitOpen(e)) => { 198 warn!("PLC directory circuit breaker open: {}", e); 199 return ( 200 StatusCode::SERVICE_UNAVAILABLE, 201 Json(json!({ 202 "error": "ServiceUnavailable", 203 "message": "PLC directory service temporarily unavailable" 204 })), 205 ) 206 .into_response(); 207 } 208 Err(CircuitBreakerError::OperationFailed(e)) => { 209 error!("Failed to submit PLC operation: {:?}", e); 210 return ( 211 StatusCode::BAD_GATEWAY, 212 Json(json!({ 213 "error": "UpstreamError", 214 "message": format!("Failed to submit to PLC directory: {}", e) 215 })), 216 ) 217 .into_response(); 218 } 219 } 220 221 match sqlx::query!( 222 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq", 223 did 224 ) 225 .fetch_one(&state.db) 226 .await 227 { 228 Ok(row) => { 229 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 230 .execute(&state.db) 231 .await 232 { 233 warn!("Failed to notify identity event: {:?}", e); 234 } 235 } 236 Err(e) => { 237 warn!("Failed to sequence identity event: {:?}", e); 238 } 239 } 240 241 info!("Submitted PLC operation for user {}", did); 242 243 (StatusCode::OK, Json(json!({}))).into_response() 244}