this repo has no description
at main 6.2 kB view raw
1use crate::api::{ApiError, EmptyResponse}; 2use crate::auth::BearerAuthAllowDeactivated; 3use crate::circuit_breaker::with_circuit_breaker; 4use crate::plc::{PlcClient, signing_key_to_did_key, validate_plc_operation}; 5use crate::state::AppState; 6use axum::{ 7 Json, 8 extract::State, 9 response::{IntoResponse, Response}, 10}; 11use k256::ecdsa::SigningKey; 12use serde::Deserialize; 13use serde_json::Value; 14use tracing::{error, info, warn}; 15 16#[derive(Debug, Deserialize)] 17pub struct SubmitPlcOperationInput { 18 pub operation: Value, 19} 20 21pub async fn submit_plc_operation( 22 State(state): State<AppState>, 23 auth: BearerAuthAllowDeactivated, 24 Json(input): Json<SubmitPlcOperationInput>, 25) -> Response { 26 let auth_user = auth.0; 27 if let Err(e) = crate::auth::scope_check::check_identity_scope( 28 auth_user.is_oauth, 29 auth_user.scope.as_deref(), 30 crate::oauth::scopes::IdentityAttr::Wildcard, 31 ) { 32 return e; 33 } 34 let did = &auth_user.did; 35 if did.starts_with("did:web:") { 36 return ApiError::InvalidRequest( 37 "PLC operations are only valid for did:plc identities".into(), 38 ) 39 .into_response(); 40 } 41 if let Err(e) = validate_plc_operation(&input.operation) { 42 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 43 } 44 let op = &input.operation; 45 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 46 let public_url = format!("https://{}", hostname); 47 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 48 .fetch_optional(&state.db) 49 .await 50 { 51 Ok(Some(row)) => row, 52 _ => { 53 return ApiError::AccountNotFound.into_response(); 54 } 55 }; 56 let key_row = match sqlx::query!( 57 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 58 user.id 59 ) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(row)) => row, 64 _ => { 65 return ApiError::InternalError(Some("User signing key not found".into())) 66 .into_response(); 67 } 68 }; 69 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 70 { 71 Ok(k) => k, 72 Err(e) => { 73 error!("Failed to decrypt user key: {}", e); 74 return ApiError::InternalError(None).into_response(); 75 } 76 }; 77 let signing_key = match SigningKey::from_slice(&key_bytes) { 78 Ok(k) => k, 79 Err(e) => { 80 error!("Failed to create signing key: {:?}", e); 81 return ApiError::InternalError(None).into_response(); 82 } 83 }; 84 let user_did_key = signing_key_to_did_key(&signing_key); 85 let server_rotation_key = 86 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 87 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 88 let has_server_key = rotation_keys 89 .iter() 90 .any(|k| k.as_str() == Some(&server_rotation_key)); 91 if !has_server_key { 92 return ApiError::InvalidRequest( 93 "Rotation keys do not include server's rotation key".into(), 94 ) 95 .into_response(); 96 } 97 } 98 if let Some(services) = op.get("services").and_then(|v| v.as_object()) 99 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) 100 { 101 let service_type = pds.get("type").and_then(|v| v.as_str()); 102 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 103 if service_type != Some("AtprotoPersonalDataServer") { 104 return ApiError::InvalidRequest("Incorrect type on atproto_pds service".into()) 105 .into_response(); 106 } 107 if endpoint != Some(&public_url) { 108 return ApiError::InvalidRequest("Incorrect endpoint on atproto_pds service".into()) 109 .into_response(); 110 } 111 } 112 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) 113 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 114 && atproto_key != user_did_key 115 { 116 return ApiError::InvalidRequest("Incorrect signing key in verificationMethods".into()) 117 .into_response(); 118 } 119 if let Some(also_known_as) = (!user.handle.is_empty()) 120 .then(|| op.get("alsoKnownAs").and_then(|v| v.as_array())) 121 .flatten() 122 { 123 let expected_handle = format!("at://{}", user.handle); 124 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 125 if first_aka != Some(&expected_handle) { 126 return ApiError::InvalidRequest("Incorrect handle in alsoKnownAs".into()) 127 .into_response(); 128 } 129 } 130 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 131 let operation_clone = input.operation.clone(); 132 let did_clone = did.clone(); 133 if let Err(e) = with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 134 plc_client 135 .send_operation(&did_clone, &operation_clone) 136 .await 137 }) 138 .await 139 { 140 return ApiError::from(e).into_response(); 141 } 142 match sqlx::query!( 143 "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq", 144 did, 145 user.handle 146 ) 147 .fetch_one(&state.db) 148 .await 149 { 150 Ok(row) => { 151 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq)) 152 .execute(&state.db) 153 .await 154 { 155 warn!("Failed to notify identity event: {:?}", e); 156 } 157 } 158 Err(e) => { 159 warn!("Failed to sequence identity event: {:?}", e); 160 } 161 } 162 let _ = state.cache.delete(&format!("handle:{}", user.handle)).await; 163 if state.did_resolver.refresh_did(did).await.is_none() { 164 warn!(did = %did, "Failed to refresh DID cache after PLC update"); 165 } 166 info!(did = %did, "PLC operation submitted successfully"); 167 EmptyResponse::ok().into_response() 168}