this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker}; 3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation}; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{Value, json}; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 info!("[MIGRATION] submitPlcOperation called"); 27 let bearer = match crate::auth::extract_bearer_token_from_header( 28 headers.get("Authorization").and_then(|h| h.to_str().ok()), 29 ) { 30 Some(t) => t, 31 None => { 32 info!("[MIGRATION] submitPlcOperation: No bearer token"); 33 return ApiError::AuthenticationRequired.into_response(); 34 } 35 }; 36 let auth_user = 37 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await { 38 Ok(user) => user, 39 Err(e) => { 40 info!("[MIGRATION] submitPlcOperation: Auth failed: {:?}", e); 41 return ApiError::from(e).into_response(); 42 } 43 }; 44 info!( 45 "[MIGRATION] submitPlcOperation: Authenticated user did={}", 46 auth_user.did 47 ); 48 if let Err(e) = crate::auth::scope_check::check_identity_scope( 49 auth_user.is_oauth, 50 auth_user.scope.as_deref(), 51 crate::oauth::scopes::IdentityAttr::Wildcard, 52 ) { 53 info!("[MIGRATION] submitPlcOperation: Scope check failed"); 54 return e; 55 } 56 let did = &auth_user.did; 57 if did.starts_with("did:web:") { 58 return ApiError::InvalidRequest( 59 "PLC operations are only valid for did:plc identities".into(), 60 ) 61 .into_response(); 62 } 63 if let Err(e) = validate_plc_operation(&input.operation) { 64 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 65 } 66 let op = &input.operation; 67 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 68 let public_url = format!("https://{}", hostname); 69 let user = match sqlx::query!( 70 "SELECT id, handle, deactivated_at FROM users WHERE did = $1", 71 did 72 ) 73 .fetch_optional(&state.db) 74 .await 75 { 76 Ok(Some(row)) => row, 77 _ => { 78 return ( 79 StatusCode::NOT_FOUND, 80 Json(json!({"error": "AccountNotFound"})), 81 ) 82 .into_response(); 83 } 84 }; 85 let is_migration = user.deactivated_at.is_some(); 86 let key_row = match sqlx::query!( 87 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 88 user.id 89 ) 90 .fetch_optional(&state.db) 91 .await 92 { 93 Ok(Some(row)) => row, 94 _ => { 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 98 ) 99 .into_response(); 100 } 101 }; 102 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 103 { 104 Ok(k) => k, 105 Err(e) => { 106 error!("Failed to decrypt user key: {}", e); 107 return ( 108 StatusCode::INTERNAL_SERVER_ERROR, 109 Json(json!({"error": "InternalError"})), 110 ) 111 .into_response(); 112 } 113 }; 114 let signing_key = match SigningKey::from_slice(&key_bytes) { 115 Ok(k) => k, 116 Err(e) => { 117 error!("Failed to create signing key: {:?}", e); 118 return ( 119 StatusCode::INTERNAL_SERVER_ERROR, 120 Json(json!({"error": "InternalError"})), 121 ) 122 .into_response(); 123 } 124 }; 125 let user_did_key = signing_key_to_did_key(&signing_key); 126 if !is_migration && let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) 127 { 128 let server_rotation_key = 129 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 130 let has_server_key = rotation_keys 131 .iter() 132 .any(|k| k.as_str() == Some(&server_rotation_key)); 133 if !has_server_key { 134 return ( 135 StatusCode::BAD_REQUEST, 136 Json(json!({ 137 "error": "InvalidRequest", 138 "message": "Rotation keys do not include server's rotation key" 139 })), 140 ) 141 .into_response(); 142 } 143 } 144 if let Some(services) = op.get("services").and_then(|v| v.as_object()) 145 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) 146 { 147 let service_type = pds.get("type").and_then(|v| v.as_str()); 148 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 149 if service_type != Some("AtprotoPersonalDataServer") { 150 return ( 151 StatusCode::BAD_REQUEST, 152 Json(json!({ 153 "error": "InvalidRequest", 154 "message": "Incorrect type on atproto_pds service" 155 })), 156 ) 157 .into_response(); 158 } 159 if endpoint != Some(&public_url) { 160 return ( 161 StatusCode::BAD_REQUEST, 162 Json(json!({ 163 "error": "InvalidRequest", 164 "message": "Incorrect endpoint on atproto_pds service" 165 })), 166 ) 167 .into_response(); 168 } 169 } 170 if !is_migration { 171 if let Some(verification_methods) = 172 op.get("verificationMethods").and_then(|v| v.as_object()) 173 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 174 && atproto_key != user_did_key 175 { 176 return ( 177 StatusCode::BAD_REQUEST, 178 Json(json!({ 179 "error": "InvalidRequest", 180 "message": "Incorrect signing key in verificationMethods" 181 })), 182 ) 183 .into_response(); 184 } 185 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 186 let expected_handle = format!("at://{}", user.handle); 187 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 188 if first_aka != Some(&expected_handle) { 189 return ( 190 StatusCode::BAD_REQUEST, 191 Json(json!({ 192 "error": "InvalidRequest", 193 "message": "Incorrect handle in alsoKnownAs" 194 })), 195 ) 196 .into_response(); 197 } 198 } 199 } 200 let plc_client = PlcClient::new(None); 201 let operation_clone = input.operation.clone(); 202 let did_clone = did.clone(); 203 info!( 204 "[MIGRATION] submitPlcOperation: Sending operation to PLC directory for did={}", 205 did 206 ); 207 let plc_start = std::time::Instant::now(); 208 let result: Result<(), CircuitBreakerError<PlcError>> = 209 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 210 plc_client 211 .send_operation(&did_clone, &operation_clone) 212 .await 213 }) 214 .await; 215 match result { 216 Ok(()) => { 217 info!( 218 "[MIGRATION] submitPlcOperation: PLC directory accepted operation in {:?}", 219 plc_start.elapsed() 220 ); 221 } 222 Err(CircuitBreakerError::CircuitOpen(e)) => { 223 warn!( 224 "[MIGRATION] submitPlcOperation: PLC directory circuit breaker open: {}", 225 e 226 ); 227 return ( 228 StatusCode::SERVICE_UNAVAILABLE, 229 Json(json!({ 230 "error": "ServiceUnavailable", 231 "message": "PLC directory service temporarily unavailable" 232 })), 233 ) 234 .into_response(); 235 } 236 Err(CircuitBreakerError::OperationFailed(e)) => { 237 error!( 238 "[MIGRATION] submitPlcOperation: PLC operation failed: {:?}", 239 e 240 ); 241 return ( 242 StatusCode::BAD_GATEWAY, 243 Json(json!({ 244 "error": "UpstreamError", 245 "message": format!("Failed to submit to PLC directory: {}", e) 246 })), 247 ) 248 .into_response(); 249 } 250 } 251 info!( 252 "[MIGRATION] submitPlcOperation: Sequencing identity event for did={}", 253 did 254 ); 255 match sqlx::query!( 256 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq", 257 did 258 ) 259 .fetch_one(&state.db) 260 .await 261 { 262 Ok(row) => { 263 info!( 264 "[MIGRATION] submitPlcOperation: Identity event sequenced with seq={}", 265 row.seq 266 ); 267 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 268 .execute(&state.db) 269 .await 270 { 271 warn!( 272 "[MIGRATION] submitPlcOperation: Failed to notify identity event: {:?}", 273 e 274 ); 275 } 276 } 277 Err(e) => { 278 warn!( 279 "[MIGRATION] submitPlcOperation: Failed to sequence identity event: {:?}", 280 e 281 ); 282 } 283 } 284 info!("[MIGRATION] submitPlcOperation: SUCCESS for did={}", did); 285 (StatusCode::OK, Json(json!({}))).into_response() 286}