this repo has no description
1use crate::api::ApiError; 2use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient}; 3use crate::state::AppState; 4use axum::{ 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8 Json, 9}; 10use k256::ecdsa::SigningKey; 11use serde::Deserialize; 12use serde_json::{json, Value}; 13use tracing::{error, info, warn}; 14 15#[derive(Debug, Deserialize)] 16pub struct SubmitPlcOperationInput { 17 pub operation: Value, 18} 19 20pub async fn submit_plc_operation( 21 State(state): State<AppState>, 22 headers: axum::http::HeaderMap, 23 Json(input): Json<SubmitPlcOperationInput>, 24) -> Response { 25 let bearer = match crate::auth::extract_bearer_token_from_header( 26 headers.get("Authorization").and_then(|h| h.to_str().ok()), 27 ) { 28 Some(t) => t, 29 None => return ApiError::AuthenticationRequired.into_response(), 30 }; 31 32 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 33 Ok(user) => user, 34 Err(e) => return ApiError::from(e).into_response(), 35 }; 36 37 let did = &auth_user.did; 38 39 if let Err(e) = validate_plc_operation(&input.operation) { 40 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response(); 41 } 42 43 let op = &input.operation; 44 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 45 let public_url = format!("https://{}", hostname); 46 47 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 48 .fetch_optional(&state.db) 49 .await 50 { 51 Ok(Some(row)) => row, 52 _ => { 53 return ( 54 StatusCode::NOT_FOUND, 55 Json(json!({"error": "AccountNotFound"})), 56 ) 57 .into_response(); 58 } 59 }; 60 61 let key_row = match sqlx::query!( 62 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 63 user.id 64 ) 65 .fetch_optional(&state.db) 66 .await 67 { 68 Ok(Some(row)) => row, 69 _ => { 70 return ( 71 StatusCode::INTERNAL_SERVER_ERROR, 72 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 73 ) 74 .into_response(); 75 } 76 }; 77 78 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 79 { 80 Ok(k) => k, 81 Err(e) => { 82 error!("Failed to decrypt user key: {}", e); 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError"})), 86 ) 87 .into_response(); 88 } 89 }; 90 91 let signing_key = match SigningKey::from_slice(&key_bytes) { 92 Ok(k) => k, 93 Err(e) => { 94 error!("Failed to create signing key: {:?}", e); 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError"})), 98 ) 99 .into_response(); 100 } 101 }; 102 103 let user_did_key = signing_key_to_did_key(&signing_key); 104 105 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 106 let server_rotation_key = 107 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 108 109 let has_server_key = rotation_keys 110 .iter() 111 .any(|k| k.as_str() == Some(&server_rotation_key)); 112 113 if !has_server_key { 114 return ( 115 StatusCode::BAD_REQUEST, 116 Json(json!({ 117 "error": "InvalidRequest", 118 "message": "Rotation keys do not include server's rotation key" 119 })), 120 ) 121 .into_response(); 122 } 123 } 124 125 if let Some(services) = op.get("services").and_then(|v| v.as_object()) { 126 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) { 127 let service_type = pds.get("type").and_then(|v| v.as_str()); 128 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 129 130 if service_type != Some("AtprotoPersonalDataServer") { 131 return ( 132 StatusCode::BAD_REQUEST, 133 Json(json!({ 134 "error": "InvalidRequest", 135 "message": "Incorrect type on atproto_pds service" 136 })), 137 ) 138 .into_response(); 139 } 140 141 if endpoint != Some(&public_url) { 142 return ( 143 StatusCode::BAD_REQUEST, 144 Json(json!({ 145 "error": "InvalidRequest", 146 "message": "Incorrect endpoint on atproto_pds service" 147 })), 148 ) 149 .into_response(); 150 } 151 } 152 } 153 154 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) { 155 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) { 156 if atproto_key != user_did_key { 157 return ( 158 StatusCode::BAD_REQUEST, 159 Json(json!({ 160 "error": "InvalidRequest", 161 "message": "Incorrect signing key in verificationMethods" 162 })), 163 ) 164 .into_response(); 165 } 166 } 167 } 168 169 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 170 let expected_handle = format!("at://{}", user.handle); 171 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 172 173 if first_aka != Some(&expected_handle) { 174 return ( 175 StatusCode::BAD_REQUEST, 176 Json(json!({ 177 "error": "InvalidRequest", 178 "message": "Incorrect handle in alsoKnownAs" 179 })), 180 ) 181 .into_response(); 182 } 183 } 184 185 let plc_client = PlcClient::new(None); 186 if let Err(e) = plc_client.send_operation(did, &input.operation).await { 187 error!("Failed to submit PLC operation: {:?}", e); 188 return ( 189 StatusCode::BAD_GATEWAY, 190 Json(json!({ 191 "error": "UpstreamError", 192 "message": format!("Failed to submit to PLC directory: {}", e) 193 })), 194 ) 195 .into_response(); 196 } 197 198 if let Err(e) = sqlx::query!( 199 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity')", 200 did 201 ) 202 .execute(&state.db) 203 .await 204 { 205 warn!("Failed to sequence identity event: {:?}", e); 206 } 207 208 info!("Submitted PLC operation for user {}", did); 209 210 (StatusCode::OK, Json(json!({}))).into_response() 211}