this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker}; 3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation}; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::{Value, json}; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 let bearer = match crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()), 28 ) { 29 Some(t) => t, 30 None => { 31 return ApiError::AuthenticationRequired.into_response(); 32 } 33 }; 34 let auth_user = 35 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await { 36 Ok(user) => user, 37 Err(e) => { 38 return ApiError::from(e).into_response(); 39 } 40 }; 41 if let Err(e) = crate::auth::scope_check::check_identity_scope( 42 auth_user.is_oauth, 43 auth_user.scope.as_deref(), 44 crate::oauth::scopes::IdentityAttr::Wildcard, 45 ) { 46 return e; 47 } 48 let did = &auth_user.did; 49 if did.starts_with("did:web:") { 50 return ApiError::InvalidRequest( 51 "PLC operations are only valid for did:plc identities".into(), 52 ) 53 .into_response(); 54 } 55 if let Err(e) = validate_plc_operation(&input.operation) { 56 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 57 } 58 let op = &input.operation; 59 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 60 let public_url = format!("https://{}", hostname); 61 let user = match sqlx::query!( 62 "SELECT id, handle FROM users WHERE did = $1", 63 did 64 ) 65 .fetch_optional(&state.db) 66 .await 67 { 68 Ok(Some(row)) => row, 69 _ => { 70 return ( 71 StatusCode::NOT_FOUND, 72 Json(json!({"error": "AccountNotFound"})), 73 ) 74 .into_response(); 75 } 76 }; 77 let key_row = match sqlx::query!( 78 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 79 user.id 80 ) 81 .fetch_optional(&state.db) 82 .await 83 { 84 Ok(Some(row)) => row, 85 _ => { 86 return ( 87 StatusCode::INTERNAL_SERVER_ERROR, 88 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 89 ) 90 .into_response(); 91 } 92 }; 93 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 94 { 95 Ok(k) => k, 96 Err(e) => { 97 error!("Failed to decrypt user key: {}", e); 98 return ( 99 StatusCode::INTERNAL_SERVER_ERROR, 100 Json(json!({"error": "InternalError"})), 101 ) 102 .into_response(); 103 } 104 }; 105 let signing_key = match SigningKey::from_slice(&key_bytes) { 106 Ok(k) => k, 107 Err(e) => { 108 error!("Failed to create signing key: {:?}", e); 109 return ( 110 StatusCode::INTERNAL_SERVER_ERROR, 111 Json(json!({"error": "InternalError"})), 112 ) 113 .into_response(); 114 } 115 }; 116 let user_did_key = signing_key_to_did_key(&signing_key); 117 let server_rotation_key = 118 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 119 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 120 let has_server_key = rotation_keys 121 .iter() 122 .any(|k| k.as_str() == Some(&server_rotation_key)); 123 if !has_server_key { 124 return ( 125 StatusCode::BAD_REQUEST, 126 Json(json!({ 127 "error": "InvalidRequest", 128 "message": "Rotation keys do not include server's rotation key" 129 })), 130 ) 131 .into_response(); 132 } 133 } 134 if let Some(services) = op.get("services").and_then(|v| v.as_object()) 135 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) 136 { 137 let service_type = pds.get("type").and_then(|v| v.as_str()); 138 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 139 if service_type != Some("AtprotoPersonalDataServer") { 140 return ( 141 StatusCode::BAD_REQUEST, 142 Json(json!({ 143 "error": "InvalidRequest", 144 "message": "Incorrect type on atproto_pds service" 145 })), 146 ) 147 .into_response(); 148 } 149 if endpoint != Some(&public_url) { 150 return ( 151 StatusCode::BAD_REQUEST, 152 Json(json!({ 153 "error": "InvalidRequest", 154 "message": "Incorrect endpoint on atproto_pds service" 155 })), 156 ) 157 .into_response(); 158 } 159 } 160 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) 161 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 162 && atproto_key != user_did_key 163 { 164 return ( 165 StatusCode::BAD_REQUEST, 166 Json(json!({ 167 "error": "InvalidRequest", 168 "message": "Incorrect signing key in verificationMethods" 169 })), 170 ) 171 .into_response(); 172 } 173 if !user.handle.is_empty() { 174 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 175 let expected_handle = format!("at://{}", user.handle); 176 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 177 if first_aka != Some(&expected_handle) { 178 return ( 179 StatusCode::BAD_REQUEST, 180 Json(json!({ 181 "error": "InvalidRequest", 182 "message": "Incorrect handle in alsoKnownAs" 183 })), 184 ) 185 .into_response(); 186 } 187 } 188 } 189 let plc_client = PlcClient::new(None); 190 let operation_clone = input.operation.clone(); 191 let did_clone = did.clone(); 192 let result: Result<(), CircuitBreakerError<PlcError>> = 193 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 194 plc_client 195 .send_operation(&did_clone, &operation_clone) 196 .await 197 }) 198 .await; 199 match result { 200 Ok(()) => {} 201 Err(CircuitBreakerError::CircuitOpen(e)) => { 202 warn!("PLC directory circuit breaker open: {}", e); 203 return ( 204 StatusCode::SERVICE_UNAVAILABLE, 205 Json(json!({ 206 "error": "ServiceUnavailable", 207 "message": "PLC directory service temporarily unavailable" 208 })), 209 ) 210 .into_response(); 211 } 212 Err(CircuitBreakerError::OperationFailed(e)) => { 213 error!("PLC operation failed: {:?}", e); 214 return ( 215 StatusCode::BAD_GATEWAY, 216 Json(json!({ 217 "error": "UpstreamError", 218 "message": format!("Failed to submit to PLC directory: {}", e) 219 })), 220 ) 221 .into_response(); 222 } 223 } 224 match sqlx::query!( 225 "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq", 226 did, 227 user.handle 228 ) 229 .fetch_one(&state.db) 230 .await 231 { 232 Ok(row) => { 233 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 234 .execute(&state.db) 235 .await 236 { 237 warn!("Failed to notify identity event: {:?}", e); 238 } 239 } 240 Err(e) => { 241 warn!("Failed to sequence identity event: {:?}", e); 242 } 243 } 244 let _ = state.cache.delete(&format!("handle:{}", user.handle)).await; 245 if state.did_resolver.refresh_did(did).await.is_none() { 246 warn!(did = %did, "Failed to refresh DID cache after PLC update"); 247 } 248 info!(did = %did, "PLC operation submitted successfully"); 249 (StatusCode::OK, Json(json!({}))).into_response() 250}