this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError}; 3use crate::plc::{ 4 create_update_op, sign_operation, PlcClient, PlcError, PlcOpOrTombstone, PlcService, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use chrono::Utc; 14use k256::ecdsa::SigningKey; 15use serde::{Deserialize, Serialize}; 16use serde_json::{json, Value}; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19 20#[derive(Debug, Deserialize)] 21#[serde(rename_all = "camelCase")] 22pub struct SignPlcOperationInput { 23 pub token: Option<String>, 24 pub rotation_keys: Option<Vec<String>>, 25 pub also_known_as: Option<Vec<String>>, 26 pub verification_methods: Option<HashMap<String, String>>, 27 pub services: Option<HashMap<String, ServiceInput>>, 28} 29 30#[derive(Debug, Deserialize, Clone)] 31pub struct ServiceInput { 32 #[serde(rename = "type")] 33 pub service_type: String, 34 pub endpoint: String, 35} 36 37#[derive(Debug, Serialize)] 38pub struct SignPlcOperationOutput { 39 pub operation: Value, 40} 41 42pub async fn sign_plc_operation( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<SignPlcOperationInput>, 46) -> Response { 47 let bearer = match crate::auth::extract_bearer_token_from_header( 48 headers.get("Authorization").and_then(|h| h.to_str().ok()), 49 ) { 50 Some(t) => t, 51 None => return ApiError::AuthenticationRequired.into_response(), 52 }; 53 54 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 55 Ok(user) => user, 56 Err(e) => return ApiError::from(e).into_response(), 57 }; 58 59 let did = &auth_user.did; 60 61 let token = match &input.token { 62 Some(t) => t, 63 None => { 64 return ApiError::InvalidRequest( 65 "Email confirmation token required to sign PLC operations".into() 66 ).into_response(); 67 } 68 }; 69 70 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did) 71 .fetch_optional(&state.db) 72 .await 73 { 74 Ok(Some(row)) => row, 75 _ => { 76 return ( 77 StatusCode::NOT_FOUND, 78 Json(json!({"error": "AccountNotFound"})), 79 ) 80 .into_response(); 81 } 82 }; 83 84 let token_row = match sqlx::query!( 85 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2", 86 user.id, 87 token 88 ) 89 .fetch_optional(&state.db) 90 .await 91 { 92 Ok(Some(row)) => row, 93 Ok(None) => { 94 return ( 95 StatusCode::BAD_REQUEST, 96 Json(json!({ 97 "error": "InvalidToken", 98 "message": "Invalid or expired token" 99 })), 100 ) 101 .into_response(); 102 } 103 Err(e) => { 104 error!("DB error: {:?}", e); 105 return ( 106 StatusCode::INTERNAL_SERVER_ERROR, 107 Json(json!({"error": "InternalError"})), 108 ) 109 .into_response(); 110 } 111 }; 112 113 if Utc::now() > token_row.expires_at { 114 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 115 .execute(&state.db) 116 .await; 117 return ( 118 StatusCode::BAD_REQUEST, 119 Json(json!({ 120 "error": "ExpiredToken", 121 "message": "Token has expired" 122 })), 123 ) 124 .into_response(); 125 } 126 127 let key_row = match sqlx::query!( 128 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 129 user.id 130 ) 131 .fetch_optional(&state.db) 132 .await 133 { 134 Ok(Some(row)) => row, 135 _ => { 136 return ( 137 StatusCode::INTERNAL_SERVER_ERROR, 138 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 139 ) 140 .into_response(); 141 } 142 }; 143 144 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 145 { 146 Ok(k) => k, 147 Err(e) => { 148 error!("Failed to decrypt user key: {}", e); 149 return ( 150 StatusCode::INTERNAL_SERVER_ERROR, 151 Json(json!({"error": "InternalError"})), 152 ) 153 .into_response(); 154 } 155 }; 156 157 let signing_key = match SigningKey::from_slice(&key_bytes) { 158 Ok(k) => k, 159 Err(e) => { 160 error!("Failed to create signing key: {:?}", e); 161 return ( 162 StatusCode::INTERNAL_SERVER_ERROR, 163 Json(json!({"error": "InternalError"})), 164 ) 165 .into_response(); 166 } 167 }; 168 169 let plc_client = PlcClient::new(None); 170 let did_clone = did.clone(); 171 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = with_circuit_breaker( 172 &state.circuit_breakers.plc_directory, 173 || async { plc_client.get_last_op(&did_clone).await }, 174 ) 175 .await; 176 177 let last_op = match result { 178 Ok(op) => op, 179 Err(CircuitBreakerError::CircuitOpen(e)) => { 180 warn!("PLC directory circuit breaker open: {}", e); 181 return ( 182 StatusCode::SERVICE_UNAVAILABLE, 183 Json(json!({ 184 "error": "ServiceUnavailable", 185 "message": "PLC directory service temporarily unavailable" 186 })), 187 ) 188 .into_response(); 189 } 190 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => { 191 return ( 192 StatusCode::NOT_FOUND, 193 Json(json!({ 194 "error": "NotFound", 195 "message": "DID not found in PLC directory" 196 })), 197 ) 198 .into_response(); 199 } 200 Err(CircuitBreakerError::OperationFailed(e)) => { 201 error!("Failed to fetch PLC operation: {:?}", e); 202 return ( 203 StatusCode::BAD_GATEWAY, 204 Json(json!({ 205 "error": "UpstreamError", 206 "message": "Failed to communicate with PLC directory" 207 })), 208 ) 209 .into_response(); 210 } 211 }; 212 213 if last_op.is_tombstone() { 214 return ( 215 StatusCode::BAD_REQUEST, 216 Json(json!({ 217 "error": "InvalidRequest", 218 "message": "DID is tombstoned" 219 })), 220 ) 221 .into_response(); 222 } 223 224 let services = input.services.map(|s| { 225 s.into_iter() 226 .map(|(k, v)| { 227 ( 228 k, 229 PlcService { 230 service_type: v.service_type, 231 endpoint: v.endpoint, 232 }, 233 ) 234 }) 235 .collect() 236 }); 237 238 let unsigned_op = match create_update_op( 239 &last_op, 240 input.rotation_keys, 241 input.verification_methods, 242 input.also_known_as, 243 services, 244 ) { 245 Ok(op) => op, 246 Err(PlcError::Tombstoned) => { 247 return ( 248 StatusCode::BAD_REQUEST, 249 Json(json!({ 250 "error": "InvalidRequest", 251 "message": "Cannot update tombstoned DID" 252 })), 253 ) 254 .into_response(); 255 } 256 Err(e) => { 257 error!("Failed to create PLC operation: {:?}", e); 258 return ( 259 StatusCode::INTERNAL_SERVER_ERROR, 260 Json(json!({"error": "InternalError"})), 261 ) 262 .into_response(); 263 } 264 }; 265 266 let signed_op = match sign_operation(&unsigned_op, &signing_key) { 267 Ok(op) => op, 268 Err(e) => { 269 error!("Failed to sign PLC operation: {:?}", e); 270 return ( 271 StatusCode::INTERNAL_SERVER_ERROR, 272 Json(json!({"error": "InternalError"})), 273 ) 274 .into_response(); 275 } 276 }; 277 278 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 279 .execute(&state.db) 280 .await; 281 282 info!("Signed PLC operation for user {}", did); 283 284 ( 285 StatusCode::OK, 286 Json(SignPlcOperationOutput { 287 operation: signed_op, 288 }), 289 ) 290 .into_response() 291}