this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker}; 3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation}; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{Value, json}; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 let bearer = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => return ApiError::AuthenticationRequired.into_response(), 31 }; 32 let auth_user = 33 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await { 34 Ok(user) => user, 35 Err(e) => return ApiError::from(e).into_response(), 36 }; 37 if let Err(e) = crate::auth::scope_check::check_identity_scope( 38 auth_user.is_oauth, 39 auth_user.scope.as_deref(), 40 crate::oauth::scopes::IdentityAttr::Wildcard, 41 ) { 42 return e; 43 } 44 let did = &auth_user.did; 45 if did.starts_with("did:web:") { 46 return ApiError::InvalidRequest( 47 "PLC operations are only valid for did:plc identities".into(), 48 ) 49 .into_response(); 50 } 51 if let Err(e) = validate_plc_operation(&input.operation) { 52 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 53 } 54 let op = &input.operation; 55 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 56 let public_url = format!("https://{}", hostname); 57 let user = match sqlx::query!( 58 "SELECT id, handle, deactivated_at FROM users WHERE did = $1", 59 did 60 ) 61 .fetch_optional(&state.db) 62 .await 63 { 64 Ok(Some(row)) => row, 65 _ => { 66 return ( 67 StatusCode::NOT_FOUND, 68 Json(json!({"error": "AccountNotFound"})), 69 ) 70 .into_response(); 71 } 72 }; 73 let is_migration = user.deactivated_at.is_some(); 74 let key_row = match sqlx::query!( 75 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 76 user.id 77 ) 78 .fetch_optional(&state.db) 79 .await 80 { 81 Ok(Some(row)) => row, 82 _ => { 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 86 ) 87 .into_response(); 88 } 89 }; 90 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 91 { 92 Ok(k) => k, 93 Err(e) => { 94 error!("Failed to decrypt user key: {}", e); 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError"})), 98 ) 99 .into_response(); 100 } 101 }; 102 let signing_key = match SigningKey::from_slice(&key_bytes) { 103 Ok(k) => k, 104 Err(e) => { 105 error!("Failed to create signing key: {:?}", e); 106 return ( 107 StatusCode::INTERNAL_SERVER_ERROR, 108 Json(json!({"error": "InternalError"})), 109 ) 110 .into_response(); 111 } 112 }; 113 let user_did_key = signing_key_to_did_key(&signing_key); 114 if !is_migration && let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) 115 { 116 let server_rotation_key = 117 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 118 let has_server_key = rotation_keys 119 .iter() 120 .any(|k| k.as_str() == Some(&server_rotation_key)); 121 if !has_server_key { 122 return ( 123 StatusCode::BAD_REQUEST, 124 Json(json!({ 125 "error": "InvalidRequest", 126 "message": "Rotation keys do not include server's rotation key" 127 })), 128 ) 129 .into_response(); 130 } 131 } 132 if let Some(services) = op.get("services").and_then(|v| v.as_object()) 133 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) 134 { 135 let service_type = pds.get("type").and_then(|v| v.as_str()); 136 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 137 if service_type != Some("AtprotoPersonalDataServer") { 138 return ( 139 StatusCode::BAD_REQUEST, 140 Json(json!({ 141 "error": "InvalidRequest", 142 "message": "Incorrect type on atproto_pds service" 143 })), 144 ) 145 .into_response(); 146 } 147 if endpoint != Some(&public_url) { 148 return ( 149 StatusCode::BAD_REQUEST, 150 Json(json!({ 151 "error": "InvalidRequest", 152 "message": "Incorrect endpoint on atproto_pds service" 153 })), 154 ) 155 .into_response(); 156 } 157 } 158 if !is_migration { 159 if let Some(verification_methods) = 160 op.get("verificationMethods").and_then(|v| v.as_object()) 161 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 162 && atproto_key != user_did_key 163 { 164 return ( 165 StatusCode::BAD_REQUEST, 166 Json(json!({ 167 "error": "InvalidRequest", 168 "message": "Incorrect signing key in verificationMethods" 169 })), 170 ) 171 .into_response(); 172 } 173 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 174 let expected_handle = format!("at://{}", user.handle); 175 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 176 if first_aka != Some(&expected_handle) { 177 return ( 178 StatusCode::BAD_REQUEST, 179 Json(json!({ 180 "error": "InvalidRequest", 181 "message": "Incorrect handle in alsoKnownAs" 182 })), 183 ) 184 .into_response(); 185 } 186 } 187 } 188 let plc_client = PlcClient::new(None); 189 let operation_clone = input.operation.clone(); 190 let did_clone = did.clone(); 191 let result: Result<(), CircuitBreakerError<PlcError>> = 192 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 193 plc_client 194 .send_operation(&did_clone, &operation_clone) 195 .await 196 }) 197 .await; 198 match result { 199 Ok(()) => {} 200 Err(CircuitBreakerError::CircuitOpen(e)) => { 201 warn!("PLC directory circuit breaker open: {}", e); 202 return ( 203 StatusCode::SERVICE_UNAVAILABLE, 204 Json(json!({ 205 "error": "ServiceUnavailable", 206 "message": "PLC directory service temporarily unavailable" 207 })), 208 ) 209 .into_response(); 210 } 211 Err(CircuitBreakerError::OperationFailed(e)) => { 212 error!("Failed to submit PLC operation: {:?}", e); 213 return ( 214 StatusCode::BAD_GATEWAY, 215 Json(json!({ 216 "error": "UpstreamError", 217 "message": format!("Failed to submit to PLC directory: {}", e) 218 })), 219 ) 220 .into_response(); 221 } 222 } 223 match sqlx::query!( 224 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq", 225 did 226 ) 227 .fetch_one(&state.db) 228 .await 229 { 230 Ok(row) => { 231 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 232 .execute(&state.db) 233 .await 234 { 235 warn!("Failed to notify identity event: {:?}", e); 236 } 237 } 238 Err(e) => { 239 warn!("Failed to sequence identity event: {:?}", e); 240 } 241 } 242 info!("Submitted PLC operation for user {}", did); 243 (StatusCode::OK, Json(json!({}))).into_response() 244}