this repo has no description
1use crate::api::ApiError; 2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError}; 3use crate::plc::{ 4 create_update_op, sign_operation, PlcClient, PlcError, PlcOpOrTombstone, PlcService, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use chrono::Utc; 14use k256::ecdsa::SigningKey; 15use serde::{Deserialize, Serialize}; 16use serde_json::{json, Value}; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19 20#[derive(Debug, Deserialize)] 21#[serde(rename_all = "camelCase")] 22pub struct SignPlcOperationInput { 23 pub token: Option<String>, 24 pub rotation_keys: Option<Vec<String>>, 25 pub also_known_as: Option<Vec<String>>, 26 pub verification_methods: Option<HashMap<String, String>>, 27 pub services: Option<HashMap<String, ServiceInput>>, 28} 29 30#[derive(Debug, Deserialize, Clone)] 31pub struct ServiceInput { 32 #[serde(rename = "type")] 33 pub service_type: String, 34 pub endpoint: String, 35} 36 37#[derive(Debug, Serialize)] 38pub struct SignPlcOperationOutput { 39 pub operation: Value, 40} 41 42pub async fn sign_plc_operation( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<SignPlcOperationInput>, 46) -> Response { 47 let bearer = match crate::auth::extract_bearer_token_from_header( 48 headers.get("Authorization").and_then(|h| h.to_str().ok()), 49 ) { 50 Some(t) => t, 51 None => return ApiError::AuthenticationRequired.into_response(), 52 }; 53 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 54 Ok(user) => user, 55 Err(e) => return ApiError::from(e).into_response(), 56 }; 57 let did = &auth_user.did; 58 let token = match &input.token { 59 Some(t) => t, 60 None => { 61 return ApiError::InvalidRequest( 62 "Email confirmation token required to sign PLC operations".into() 63 ).into_response(); 64 } 65 }; 66 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did) 67 .fetch_optional(&state.db) 68 .await 69 { 70 Ok(Some(row)) => row, 71 _ => { 72 return ( 73 StatusCode::NOT_FOUND, 74 Json(json!({"error": "AccountNotFound"})), 75 ) 76 .into_response(); 77 } 78 }; 79 let token_row = match sqlx::query!( 80 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2", 81 user.id, 82 token 83 ) 84 .fetch_optional(&state.db) 85 .await 86 { 87 Ok(Some(row)) => row, 88 Ok(None) => { 89 return ( 90 StatusCode::BAD_REQUEST, 91 Json(json!({ 92 "error": "InvalidToken", 93 "message": "Invalid or expired token" 94 })), 95 ) 96 .into_response(); 97 } 98 Err(e) => { 99 error!("DB error: {:?}", e); 100 return ( 101 StatusCode::INTERNAL_SERVER_ERROR, 102 Json(json!({"error": "InternalError"})), 103 ) 104 .into_response(); 105 } 106 }; 107 if Utc::now() > token_row.expires_at { 108 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 109 .execute(&state.db) 110 .await; 111 return ( 112 StatusCode::BAD_REQUEST, 113 Json(json!({ 114 "error": "ExpiredToken", 115 "message": "Token has expired" 116 })), 117 ) 118 .into_response(); 119 } 120 let key_row = match sqlx::query!( 121 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 122 user.id 123 ) 124 .fetch_optional(&state.db) 125 .await 126 { 127 Ok(Some(row)) => row, 128 _ => { 129 return ( 130 StatusCode::INTERNAL_SERVER_ERROR, 131 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 132 ) 133 .into_response(); 134 } 135 }; 136 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 137 { 138 Ok(k) => k, 139 Err(e) => { 140 error!("Failed to decrypt user key: {}", e); 141 return ( 142 StatusCode::INTERNAL_SERVER_ERROR, 143 Json(json!({"error": "InternalError"})), 144 ) 145 .into_response(); 146 } 147 }; 148 let signing_key = match SigningKey::from_slice(&key_bytes) { 149 Ok(k) => k, 150 Err(e) => { 151 error!("Failed to create signing key: {:?}", e); 152 return ( 153 StatusCode::INTERNAL_SERVER_ERROR, 154 Json(json!({"error": "InternalError"})), 155 ) 156 .into_response(); 157 } 158 }; 159 let plc_client = PlcClient::new(None); 160 let did_clone = did.clone(); 161 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = with_circuit_breaker( 162 &state.circuit_breakers.plc_directory, 163 || async { plc_client.get_last_op(&did_clone).await }, 164 ) 165 .await; 166 let last_op = match result { 167 Ok(op) => op, 168 Err(CircuitBreakerError::CircuitOpen(e)) => { 169 warn!("PLC directory circuit breaker open: {}", e); 170 return ( 171 StatusCode::SERVICE_UNAVAILABLE, 172 Json(json!({ 173 "error": "ServiceUnavailable", 174 "message": "PLC directory service temporarily unavailable" 175 })), 176 ) 177 .into_response(); 178 } 179 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => { 180 return ( 181 StatusCode::NOT_FOUND, 182 Json(json!({ 183 "error": "NotFound", 184 "message": "DID not found in PLC directory" 185 })), 186 ) 187 .into_response(); 188 } 189 Err(CircuitBreakerError::OperationFailed(e)) => { 190 error!("Failed to fetch PLC operation: {:?}", e); 191 return ( 192 StatusCode::BAD_GATEWAY, 193 Json(json!({ 194 "error": "UpstreamError", 195 "message": "Failed to communicate with PLC directory" 196 })), 197 ) 198 .into_response(); 199 } 200 }; 201 if last_op.is_tombstone() { 202 return ( 203 StatusCode::BAD_REQUEST, 204 Json(json!({ 205 "error": "InvalidRequest", 206 "message": "DID is tombstoned" 207 })), 208 ) 209 .into_response(); 210 } 211 let services = input.services.map(|s| { 212 s.into_iter() 213 .map(|(k, v)| { 214 ( 215 k, 216 PlcService { 217 service_type: v.service_type, 218 endpoint: v.endpoint, 219 }, 220 ) 221 }) 222 .collect() 223 }); 224 let unsigned_op = match create_update_op( 225 &last_op, 226 input.rotation_keys, 227 input.verification_methods, 228 input.also_known_as, 229 services, 230 ) { 231 Ok(op) => op, 232 Err(PlcError::Tombstoned) => { 233 return ( 234 StatusCode::BAD_REQUEST, 235 Json(json!({ 236 "error": "InvalidRequest", 237 "message": "Cannot update tombstoned DID" 238 })), 239 ) 240 .into_response(); 241 } 242 Err(e) => { 243 error!("Failed to create PLC operation: {:?}", e); 244 return ( 245 StatusCode::INTERNAL_SERVER_ERROR, 246 Json(json!({"error": "InternalError"})), 247 ) 248 .into_response(); 249 } 250 }; 251 let signed_op = match sign_operation(&unsigned_op, &signing_key) { 252 Ok(op) => op, 253 Err(e) => { 254 error!("Failed to sign PLC operation: {:?}", e); 255 return ( 256 StatusCode::INTERNAL_SERVER_ERROR, 257 Json(json!({"error": "InternalError"})), 258 ) 259 .into_response(); 260 } 261 }; 262 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 263 .execute(&state.db) 264 .await; 265 info!("Signed PLC operation for user {}", did); 266 ( 267 StatusCode::OK, 268 Json(SignPlcOperationOutput { 269 operation: signed_op, 270 }), 271 ) 272 .into_response() 273}