this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker}; 3use crate::plc::{ 4 PlcClient, PlcError, PlcOpOrTombstone, PlcService, create_update_op, sign_operation, 5}; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use chrono::Utc; 14use k256::ecdsa::SigningKey; 15use serde::{Deserialize, Serialize}; 16use serde_json::{Value, json}; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19 20#[derive(Debug, Deserialize)] 21#[serde(rename_all = "camelCase")] 22pub struct SignPlcOperationInput { 23 pub token: Option<String>, 24 pub rotation_keys: Option<Vec<String>>, 25 pub also_known_as: Option<Vec<String>>, 26 pub verification_methods: Option<HashMap<String, String>>, 27 pub services: Option<HashMap<String, ServiceInput>>, 28} 29 30#[derive(Debug, Deserialize, Clone)] 31pub struct ServiceInput { 32 #[serde(rename = "type")] 33 pub service_type: String, 34 pub endpoint: String, 35} 36 37#[derive(Debug, Serialize)] 38pub struct SignPlcOperationOutput { 39 pub operation: Value, 40} 41 42pub async fn sign_plc_operation( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<SignPlcOperationInput>, 46) -> Response { 47 let bearer = match crate::auth::extract_bearer_token_from_header( 48 headers.get("Authorization").and_then(|h| h.to_str().ok()), 49 ) { 50 Some(t) => t, 51 None => return ApiError::AuthenticationRequired.into_response(), 52 }; 53 let auth_user = 54 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await { 55 Ok(user) => user, 56 Err(e) => return ApiError::from(e).into_response(), 57 }; 58 if let Err(e) = crate::auth::scope_check::check_identity_scope( 59 auth_user.is_oauth, 60 auth_user.scope.as_deref(), 61 crate::oauth::scopes::IdentityAttr::Wildcard, 62 ) { 63 return e; 64 } 65 let did = &auth_user.did; 66 if did.starts_with("did:web:") { 67 return ApiError::InvalidRequest( 68 "PLC operations are only valid for did:plc identities".into(), 69 ) 70 .into_response(); 71 } 72 let token = match &input.token { 73 Some(t) => t, 74 None => { 75 return ApiError::InvalidRequest( 76 "Email confirmation token required to sign PLC operations".into(), 77 ) 78 .into_response(); 79 } 80 }; 81 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did) 82 .fetch_optional(&state.db) 83 .await 84 { 85 Ok(Some(row)) => row, 86 _ => { 87 return ( 88 StatusCode::NOT_FOUND, 89 Json(json!({"error": "AccountNotFound"})), 90 ) 91 .into_response(); 92 } 93 }; 94 let token_row = match sqlx::query!( 95 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2", 96 user.id, 97 token 98 ) 99 .fetch_optional(&state.db) 100 .await 101 { 102 Ok(Some(row)) => row, 103 Ok(None) => { 104 return ( 105 StatusCode::BAD_REQUEST, 106 Json(json!({ 107 "error": "InvalidToken", 108 "message": "Invalid or expired token" 109 })), 110 ) 111 .into_response(); 112 } 113 Err(e) => { 114 error!("DB error: {:?}", e); 115 return ( 116 StatusCode::INTERNAL_SERVER_ERROR, 117 Json(json!({"error": "InternalError"})), 118 ) 119 .into_response(); 120 } 121 }; 122 if Utc::now() > token_row.expires_at { 123 let _ = sqlx::query!( 124 "DELETE FROM plc_operation_tokens WHERE id = $1", 125 token_row.id 126 ) 127 .execute(&state.db) 128 .await; 129 return ( 130 StatusCode::BAD_REQUEST, 131 Json(json!({ 132 "error": "ExpiredToken", 133 "message": "Token has expired" 134 })), 135 ) 136 .into_response(); 137 } 138 let key_row = match sqlx::query!( 139 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 140 user.id 141 ) 142 .fetch_optional(&state.db) 143 .await 144 { 145 Ok(Some(row)) => row, 146 _ => { 147 return ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 150 ) 151 .into_response(); 152 } 153 }; 154 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 155 { 156 Ok(k) => k, 157 Err(e) => { 158 error!("Failed to decrypt user key: {}", e); 159 return ( 160 StatusCode::INTERNAL_SERVER_ERROR, 161 Json(json!({"error": "InternalError"})), 162 ) 163 .into_response(); 164 } 165 }; 166 let signing_key = match SigningKey::from_slice(&key_bytes) { 167 Ok(k) => k, 168 Err(e) => { 169 error!("Failed to create signing key: {:?}", e); 170 return ( 171 StatusCode::INTERNAL_SERVER_ERROR, 172 Json(json!({"error": "InternalError"})), 173 ) 174 .into_response(); 175 } 176 }; 177 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone())); 178 let did_clone = did.clone(); 179 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = 180 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 181 plc_client.get_last_op(&did_clone).await 182 }) 183 .await; 184 let last_op = match result { 185 Ok(op) => op, 186 Err(CircuitBreakerError::CircuitOpen(e)) => { 187 warn!("PLC directory circuit breaker open: {}", e); 188 return ( 189 StatusCode::SERVICE_UNAVAILABLE, 190 Json(json!({ 191 "error": "ServiceUnavailable", 192 "message": "PLC directory service temporarily unavailable" 193 })), 194 ) 195 .into_response(); 196 } 197 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => { 198 return ( 199 StatusCode::NOT_FOUND, 200 Json(json!({ 201 "error": "NotFound", 202 "message": "DID not found in PLC directory" 203 })), 204 ) 205 .into_response(); 206 } 207 Err(CircuitBreakerError::OperationFailed(e)) => { 208 error!("Failed to fetch PLC operation: {:?}", e); 209 return ( 210 StatusCode::BAD_GATEWAY, 211 Json(json!({ 212 "error": "UpstreamError", 213 "message": "Failed to communicate with PLC directory" 214 })), 215 ) 216 .into_response(); 217 } 218 }; 219 if last_op.is_tombstone() { 220 return ( 221 StatusCode::BAD_REQUEST, 222 Json(json!({ 223 "error": "InvalidRequest", 224 "message": "DID is tombstoned" 225 })), 226 ) 227 .into_response(); 228 } 229 let services = input.services.map(|s| { 230 s.into_iter() 231 .map(|(k, v)| { 232 ( 233 k, 234 PlcService { 235 service_type: v.service_type, 236 endpoint: v.endpoint, 237 }, 238 ) 239 }) 240 .collect() 241 }); 242 let unsigned_op = match create_update_op( 243 &last_op, 244 input.rotation_keys, 245 input.verification_methods, 246 input.also_known_as, 247 services, 248 ) { 249 Ok(op) => op, 250 Err(PlcError::Tombstoned) => { 251 return ( 252 StatusCode::BAD_REQUEST, 253 Json(json!({ 254 "error": "InvalidRequest", 255 "message": "Cannot update tombstoned DID" 256 })), 257 ) 258 .into_response(); 259 } 260 Err(e) => { 261 error!("Failed to create PLC operation: {:?}", e); 262 return ( 263 StatusCode::INTERNAL_SERVER_ERROR, 264 Json(json!({"error": "InternalError"})), 265 ) 266 .into_response(); 267 } 268 }; 269 let signed_op = match sign_operation(&unsigned_op, &signing_key) { 270 Ok(op) => op, 271 Err(e) => { 272 error!("Failed to sign PLC operation: {:?}", e); 273 return ( 274 StatusCode::INTERNAL_SERVER_ERROR, 275 Json(json!({"error": "InternalError"})), 276 ) 277 .into_response(); 278 } 279 }; 280 let _ = sqlx::query!( 281 "DELETE FROM plc_operation_tokens WHERE id = $1", 282 token_row.id 283 ) 284 .execute(&state.db) 285 .await; 286 info!("Signed PLC operation for user {}", did); 287 ( 288 StatusCode::OK, 289 Json(SignPlcOperationOutput { 290 operation: signed_op, 291 }), 292 ) 293 .into_response() 294}