this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker}; 3use crate::plc::{ 4 PlcClient, PlcError, PlcOpOrTombstone, PlcService, create_update_op, sign_operation, 5}; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use chrono::Utc; 14use k256::ecdsa::SigningKey; 15use serde::{Deserialize, Serialize}; 16use serde_json::{Value, json}; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19 20#[derive(Debug, Deserialize)] 21#[serde(rename_all = "camelCase")] 22pub struct SignPlcOperationInput { 23 pub token: Option<String>, 24 pub rotation_keys: Option<Vec<String>>, 25 pub also_known_as: Option<Vec<String>>, 26 pub verification_methods: Option<HashMap<String, String>>, 27 pub services: Option<HashMap<String, ServiceInput>>, 28} 29 30#[derive(Debug, Deserialize, Clone)] 31pub struct ServiceInput { 32 #[serde(rename = "type")] 33 pub service_type: String, 34 pub endpoint: String, 35} 36 37#[derive(Debug, Serialize)] 38pub struct SignPlcOperationOutput { 39 pub operation: Value, 40} 41 42pub async fn sign_plc_operation( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<SignPlcOperationInput>, 46) -> Response { 47 let bearer = match crate::auth::extract_bearer_token_from_header( 48 headers.get("Authorization").and_then(|h| h.to_str().ok()), 49 ) { 50 Some(t) => t, 51 None => return ApiError::AuthenticationRequired.into_response(), 52 }; 53 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 54 Ok(user) => user, 55 Err(e) => return ApiError::from(e).into_response(), 56 }; 57 let did = &auth_user.did; 58 let token = match &input.token { 59 Some(t) => t, 60 None => { 61 return ApiError::InvalidRequest( 62 "Email confirmation token required to sign PLC operations".into(), 63 ) 64 .into_response(); 65 } 66 }; 67 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did) 68 .fetch_optional(&state.db) 69 .await 70 { 71 Ok(Some(row)) => row, 72 _ => { 73 return ( 74 StatusCode::NOT_FOUND, 75 Json(json!({"error": "AccountNotFound"})), 76 ) 77 .into_response(); 78 } 79 }; 80 let token_row = match sqlx::query!( 81 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2", 82 user.id, 83 token 84 ) 85 .fetch_optional(&state.db) 86 .await 87 { 88 Ok(Some(row)) => row, 89 Ok(None) => { 90 return ( 91 StatusCode::BAD_REQUEST, 92 Json(json!({ 93 "error": "InvalidToken", 94 "message": "Invalid or expired token" 95 })), 96 ) 97 .into_response(); 98 } 99 Err(e) => { 100 error!("DB error: {:?}", e); 101 return ( 102 StatusCode::INTERNAL_SERVER_ERROR, 103 Json(json!({"error": "InternalError"})), 104 ) 105 .into_response(); 106 } 107 }; 108 if Utc::now() > token_row.expires_at { 109 let _ = sqlx::query!( 110 "DELETE FROM plc_operation_tokens WHERE id = $1", 111 token_row.id 112 ) 113 .execute(&state.db) 114 .await; 115 return ( 116 StatusCode::BAD_REQUEST, 117 Json(json!({ 118 "error": "ExpiredToken", 119 "message": "Token has expired" 120 })), 121 ) 122 .into_response(); 123 } 124 let key_row = match sqlx::query!( 125 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 126 user.id 127 ) 128 .fetch_optional(&state.db) 129 .await 130 { 131 Ok(Some(row)) => row, 132 _ => { 133 return ( 134 StatusCode::INTERNAL_SERVER_ERROR, 135 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 136 ) 137 .into_response(); 138 } 139 }; 140 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 141 { 142 Ok(k) => k, 143 Err(e) => { 144 error!("Failed to decrypt user key: {}", e); 145 return ( 146 StatusCode::INTERNAL_SERVER_ERROR, 147 Json(json!({"error": "InternalError"})), 148 ) 149 .into_response(); 150 } 151 }; 152 let signing_key = match SigningKey::from_slice(&key_bytes) { 153 Ok(k) => k, 154 Err(e) => { 155 error!("Failed to create signing key: {:?}", e); 156 return ( 157 StatusCode::INTERNAL_SERVER_ERROR, 158 Json(json!({"error": "InternalError"})), 159 ) 160 .into_response(); 161 } 162 }; 163 let plc_client = PlcClient::new(None); 164 let did_clone = did.clone(); 165 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = 166 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async { 167 plc_client.get_last_op(&did_clone).await 168 }) 169 .await; 170 let last_op = match result { 171 Ok(op) => op, 172 Err(CircuitBreakerError::CircuitOpen(e)) => { 173 warn!("PLC directory circuit breaker open: {}", e); 174 return ( 175 StatusCode::SERVICE_UNAVAILABLE, 176 Json(json!({ 177 "error": "ServiceUnavailable", 178 "message": "PLC directory service temporarily unavailable" 179 })), 180 ) 181 .into_response(); 182 } 183 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => { 184 return ( 185 StatusCode::NOT_FOUND, 186 Json(json!({ 187 "error": "NotFound", 188 "message": "DID not found in PLC directory" 189 })), 190 ) 191 .into_response(); 192 } 193 Err(CircuitBreakerError::OperationFailed(e)) => { 194 error!("Failed to fetch PLC operation: {:?}", e); 195 return ( 196 StatusCode::BAD_GATEWAY, 197 Json(json!({ 198 "error": "UpstreamError", 199 "message": "Failed to communicate with PLC directory" 200 })), 201 ) 202 .into_response(); 203 } 204 }; 205 if last_op.is_tombstone() { 206 return ( 207 StatusCode::BAD_REQUEST, 208 Json(json!({ 209 "error": "InvalidRequest", 210 "message": "DID is tombstoned" 211 })), 212 ) 213 .into_response(); 214 } 215 let services = input.services.map(|s| { 216 s.into_iter() 217 .map(|(k, v)| { 218 ( 219 k, 220 PlcService { 221 service_type: v.service_type, 222 endpoint: v.endpoint, 223 }, 224 ) 225 }) 226 .collect() 227 }); 228 let unsigned_op = match create_update_op( 229 &last_op, 230 input.rotation_keys, 231 input.verification_methods, 232 input.also_known_as, 233 services, 234 ) { 235 Ok(op) => op, 236 Err(PlcError::Tombstoned) => { 237 return ( 238 StatusCode::BAD_REQUEST, 239 Json(json!({ 240 "error": "InvalidRequest", 241 "message": "Cannot update tombstoned DID" 242 })), 243 ) 244 .into_response(); 245 } 246 Err(e) => { 247 error!("Failed to create PLC operation: {:?}", e); 248 return ( 249 StatusCode::INTERNAL_SERVER_ERROR, 250 Json(json!({"error": "InternalError"})), 251 ) 252 .into_response(); 253 } 254 }; 255 let signed_op = match sign_operation(&unsigned_op, &signing_key) { 256 Ok(op) => op, 257 Err(e) => { 258 error!("Failed to sign PLC operation: {:?}", e); 259 return ( 260 StatusCode::INTERNAL_SERVER_ERROR, 261 Json(json!({"error": "InternalError"})), 262 ) 263 .into_response(); 264 } 265 }; 266 let _ = sqlx::query!( 267 "DELETE FROM plc_operation_tokens WHERE id = $1", 268 token_row.id 269 ) 270 .execute(&state.db) 271 .await; 272 info!("Signed PLC operation for user {}", did); 273 ( 274 StatusCode::OK, 275 Json(SignPlcOperationOutput { 276 operation: signed_op, 277 }), 278 ) 279 .into_response() 280}