this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError}; 3use crate::plc::{ 4 create_update_op, sign_operation, PlcClient, PlcError, PlcOpOrTombstone, PlcService, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use chrono::Utc; 14use k256::ecdsa::SigningKey; 15use serde::{Deserialize, Serialize}; 16use serde_json::{json, Value}; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19#[derive(Debug, Deserialize)] 20#[serde(rename_all = "camelCase")] 21pub struct SignPlcOperationInput { 22 pub token: Option<String>, 23 pub rotation_keys: Option<Vec<String>>, 24 pub also_known_as: Option<Vec<String>>, 25 pub verification_methods: Option<HashMap<String, String>>, 26 pub services: Option<HashMap<String, ServiceInput>>, 27} 28#[derive(Debug, Deserialize, Clone)] 29pub struct ServiceInput { 30 #[serde(rename = "type")] 31 pub service_type: String, 32 pub endpoint: String, 33} 34#[derive(Debug, Serialize)] 35pub struct SignPlcOperationOutput { 36 pub operation: Value, 37} 38pub async fn sign_plc_operation( 39 State(state): State<AppState>, 40 headers: axum::http::HeaderMap, 41 Json(input): Json<SignPlcOperationInput>, 42) -> Response { 43 let bearer = match crate::auth::extract_bearer_token_from_header( 44 headers.get("Authorization").and_then(|h| h.to_str().ok()), 45 ) { 46 Some(t) => t, 47 None => return ApiError::AuthenticationRequired.into_response(), 48 }; 49 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 50 Ok(user) => user, 51 Err(e) => return ApiError::from(e).into_response(), 52 }; 53 let did = &auth_user.did; 54 let token = match &input.token { 55 Some(t) => t, 56 None => { 57 return ApiError::InvalidRequest( 58 "Email confirmation token required to sign PLC operations".into() 59 ).into_response(); 60 } 61 }; 62 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did) 63 .fetch_optional(&state.db) 64 .await 65 { 66 Ok(Some(row)) => row, 67 _ => { 68 return ( 69 StatusCode::NOT_FOUND, 70 Json(json!({"error": "AccountNotFound"})), 71 ) 72 .into_response(); 73 } 74 }; 75 let token_row = match sqlx::query!( 76 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2", 77 user.id, 78 token 79 ) 80 .fetch_optional(&state.db) 81 .await 82 { 83 Ok(Some(row)) => row, 84 Ok(None) => { 85 return ( 86 StatusCode::BAD_REQUEST, 87 Json(json!({ 88 "error": "InvalidToken", 89 "message": "Invalid or expired token" 90 })), 91 ) 92 .into_response(); 93 } 94 Err(e) => { 95 error!("DB error: {:?}", e); 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError"})), 99 ) 100 .into_response(); 101 } 102 }; 103 if Utc::now() > token_row.expires_at { 104 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 105 .execute(&state.db) 106 .await; 107 return ( 108 StatusCode::BAD_REQUEST, 109 Json(json!({ 110 "error": "ExpiredToken", 111 "message": "Token has expired" 112 })), 113 ) 114 .into_response(); 115 } 116 let key_row = match sqlx::query!( 117 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 118 user.id 119 ) 120 .fetch_optional(&state.db) 121 .await 122 { 123 Ok(Some(row)) => row, 124 _ => { 125 return ( 126 StatusCode::INTERNAL_SERVER_ERROR, 127 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 128 ) 129 .into_response(); 130 } 131 }; 132 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 133 { 134 Ok(k) => k, 135 Err(e) => { 136 error!("Failed to decrypt user key: {}", e); 137 return ( 138 StatusCode::INTERNAL_SERVER_ERROR, 139 Json(json!({"error": "InternalError"})), 140 ) 141 .into_response(); 142 } 143 }; 144 let signing_key = match SigningKey::from_slice(&key_bytes) { 145 Ok(k) => k, 146 Err(e) => { 147 error!("Failed to create signing key: {:?}", e); 148 return ( 149 StatusCode::INTERNAL_SERVER_ERROR, 150 Json(json!({"error": "InternalError"})), 151 ) 152 .into_response(); 153 } 154 }; 155 let plc_client = PlcClient::new(None); 156 let did_clone = did.clone(); 157 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = with_circuit_breaker( 158 &state.circuit_breakers.plc_directory, 159 || async { plc_client.get_last_op(&did_clone).await }, 160 ) 161 .await; 162 let last_op = match result { 163 Ok(op) => op, 164 Err(CircuitBreakerError::CircuitOpen(e)) => { 165 warn!("PLC directory circuit breaker open: {}", e); 166 return ( 167 StatusCode::SERVICE_UNAVAILABLE, 168 Json(json!({ 169 "error": "ServiceUnavailable", 170 "message": "PLC directory service temporarily unavailable" 171 })), 172 ) 173 .into_response(); 174 } 175 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => { 176 return ( 177 StatusCode::NOT_FOUND, 178 Json(json!({ 179 "error": "NotFound", 180 "message": "DID not found in PLC directory" 181 })), 182 ) 183 .into_response(); 184 } 185 Err(CircuitBreakerError::OperationFailed(e)) => { 186 error!("Failed to fetch PLC operation: {:?}", e); 187 return ( 188 StatusCode::BAD_GATEWAY, 189 Json(json!({ 190 "error": "UpstreamError", 191 "message": "Failed to communicate with PLC directory" 192 })), 193 ) 194 .into_response(); 195 } 196 }; 197 if last_op.is_tombstone() { 198 return ( 199 StatusCode::BAD_REQUEST, 200 Json(json!({ 201 "error": "InvalidRequest", 202 "message": "DID is tombstoned" 203 })), 204 ) 205 .into_response(); 206 } 207 let services = input.services.map(|s| { 208 s.into_iter() 209 .map(|(k, v)| { 210 ( 211 k, 212 PlcService { 213 service_type: v.service_type, 214 endpoint: v.endpoint, 215 }, 216 ) 217 }) 218 .collect() 219 }); 220 let unsigned_op = match create_update_op( 221 &last_op, 222 input.rotation_keys, 223 input.verification_methods, 224 input.also_known_as, 225 services, 226 ) { 227 Ok(op) => op, 228 Err(PlcError::Tombstoned) => { 229 return ( 230 StatusCode::BAD_REQUEST, 231 Json(json!({ 232 "error": "InvalidRequest", 233 "message": "Cannot update tombstoned DID" 234 })), 235 ) 236 .into_response(); 237 } 238 Err(e) => { 239 error!("Failed to create PLC operation: {:?}", e); 240 return ( 241 StatusCode::INTERNAL_SERVER_ERROR, 242 Json(json!({"error": "InternalError"})), 243 ) 244 .into_response(); 245 } 246 }; 247 let signed_op = match sign_operation(&unsigned_op, &signing_key) { 248 Ok(op) => op, 249 Err(e) => { 250 error!("Failed to sign PLC operation: {:?}", e); 251 return ( 252 StatusCode::INTERNAL_SERVER_ERROR, 253 Json(json!({"error": "InternalError"})), 254 ) 255 .into_response(); 256 } 257 }; 258 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 259 .execute(&state.db) 260 .await; 261 info!("Signed PLC operation for user {}", did); 262 ( 263 StatusCode::OK, 264 Json(SignPlcOperationOutput { 265 operation: signed_op, 266 }), 267 ) 268 .into_response() 269}