this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError};
3use crate::plc::{
4 create_update_op, sign_operation, PlcClient, PlcError, PlcOpOrTombstone, PlcService,
5};
6use crate::state::AppState;
7use axum::{
8 extract::State,
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 Json,
12};
13use chrono::Utc;
14use k256::ecdsa::SigningKey;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19#[derive(Debug, Deserialize)]
20#[serde(rename_all = "camelCase")]
21pub struct SignPlcOperationInput {
22 pub token: Option<String>,
23 pub rotation_keys: Option<Vec<String>>,
24 pub also_known_as: Option<Vec<String>>,
25 pub verification_methods: Option<HashMap<String, String>>,
26 pub services: Option<HashMap<String, ServiceInput>>,
27}
28#[derive(Debug, Deserialize, Clone)]
29pub struct ServiceInput {
30 #[serde(rename = "type")]
31 pub service_type: String,
32 pub endpoint: String,
33}
34#[derive(Debug, Serialize)]
35pub struct SignPlcOperationOutput {
36 pub operation: Value,
37}
38pub async fn sign_plc_operation(
39 State(state): State<AppState>,
40 headers: axum::http::HeaderMap,
41 Json(input): Json<SignPlcOperationInput>,
42) -> Response {
43 let bearer = match crate::auth::extract_bearer_token_from_header(
44 headers.get("Authorization").and_then(|h| h.to_str().ok()),
45 ) {
46 Some(t) => t,
47 None => return ApiError::AuthenticationRequired.into_response(),
48 };
49 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
50 Ok(user) => user,
51 Err(e) => return ApiError::from(e).into_response(),
52 };
53 let did = &auth_user.did;
54 let token = match &input.token {
55 Some(t) => t,
56 None => {
57 return ApiError::InvalidRequest(
58 "Email confirmation token required to sign PLC operations".into()
59 ).into_response();
60 }
61 };
62 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did)
63 .fetch_optional(&state.db)
64 .await
65 {
66 Ok(Some(row)) => row,
67 _ => {
68 return (
69 StatusCode::NOT_FOUND,
70 Json(json!({"error": "AccountNotFound"})),
71 )
72 .into_response();
73 }
74 };
75 let token_row = match sqlx::query!(
76 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2",
77 user.id,
78 token
79 )
80 .fetch_optional(&state.db)
81 .await
82 {
83 Ok(Some(row)) => row,
84 Ok(None) => {
85 return (
86 StatusCode::BAD_REQUEST,
87 Json(json!({
88 "error": "InvalidToken",
89 "message": "Invalid or expired token"
90 })),
91 )
92 .into_response();
93 }
94 Err(e) => {
95 error!("DB error: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 };
103 if Utc::now() > token_row.expires_at {
104 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id)
105 .execute(&state.db)
106 .await;
107 return (
108 StatusCode::BAD_REQUEST,
109 Json(json!({
110 "error": "ExpiredToken",
111 "message": "Token has expired"
112 })),
113 )
114 .into_response();
115 }
116 let key_row = match sqlx::query!(
117 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
118 user.id
119 )
120 .fetch_optional(&state.db)
121 .await
122 {
123 Ok(Some(row)) => row,
124 _ => {
125 return (
126 StatusCode::INTERNAL_SERVER_ERROR,
127 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
128 )
129 .into_response();
130 }
131 };
132 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
133 {
134 Ok(k) => k,
135 Err(e) => {
136 error!("Failed to decrypt user key: {}", e);
137 return (
138 StatusCode::INTERNAL_SERVER_ERROR,
139 Json(json!({"error": "InternalError"})),
140 )
141 .into_response();
142 }
143 };
144 let signing_key = match SigningKey::from_slice(&key_bytes) {
145 Ok(k) => k,
146 Err(e) => {
147 error!("Failed to create signing key: {:?}", e);
148 return (
149 StatusCode::INTERNAL_SERVER_ERROR,
150 Json(json!({"error": "InternalError"})),
151 )
152 .into_response();
153 }
154 };
155 let plc_client = PlcClient::new(None);
156 let did_clone = did.clone();
157 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = with_circuit_breaker(
158 &state.circuit_breakers.plc_directory,
159 || async { plc_client.get_last_op(&did_clone).await },
160 )
161 .await;
162 let last_op = match result {
163 Ok(op) => op,
164 Err(CircuitBreakerError::CircuitOpen(e)) => {
165 warn!("PLC directory circuit breaker open: {}", e);
166 return (
167 StatusCode::SERVICE_UNAVAILABLE,
168 Json(json!({
169 "error": "ServiceUnavailable",
170 "message": "PLC directory service temporarily unavailable"
171 })),
172 )
173 .into_response();
174 }
175 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => {
176 return (
177 StatusCode::NOT_FOUND,
178 Json(json!({
179 "error": "NotFound",
180 "message": "DID not found in PLC directory"
181 })),
182 )
183 .into_response();
184 }
185 Err(CircuitBreakerError::OperationFailed(e)) => {
186 error!("Failed to fetch PLC operation: {:?}", e);
187 return (
188 StatusCode::BAD_GATEWAY,
189 Json(json!({
190 "error": "UpstreamError",
191 "message": "Failed to communicate with PLC directory"
192 })),
193 )
194 .into_response();
195 }
196 };
197 if last_op.is_tombstone() {
198 return (
199 StatusCode::BAD_REQUEST,
200 Json(json!({
201 "error": "InvalidRequest",
202 "message": "DID is tombstoned"
203 })),
204 )
205 .into_response();
206 }
207 let services = input.services.map(|s| {
208 s.into_iter()
209 .map(|(k, v)| {
210 (
211 k,
212 PlcService {
213 service_type: v.service_type,
214 endpoint: v.endpoint,
215 },
216 )
217 })
218 .collect()
219 });
220 let unsigned_op = match create_update_op(
221 &last_op,
222 input.rotation_keys,
223 input.verification_methods,
224 input.also_known_as,
225 services,
226 ) {
227 Ok(op) => op,
228 Err(PlcError::Tombstoned) => {
229 return (
230 StatusCode::BAD_REQUEST,
231 Json(json!({
232 "error": "InvalidRequest",
233 "message": "Cannot update tombstoned DID"
234 })),
235 )
236 .into_response();
237 }
238 Err(e) => {
239 error!("Failed to create PLC operation: {:?}", e);
240 return (
241 StatusCode::INTERNAL_SERVER_ERROR,
242 Json(json!({"error": "InternalError"})),
243 )
244 .into_response();
245 }
246 };
247 let signed_op = match sign_operation(&unsigned_op, &signing_key) {
248 Ok(op) => op,
249 Err(e) => {
250 error!("Failed to sign PLC operation: {:?}", e);
251 return (
252 StatusCode::INTERNAL_SERVER_ERROR,
253 Json(json!({"error": "InternalError"})),
254 )
255 .into_response();
256 }
257 };
258 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id)
259 .execute(&state.db)
260 .await;
261 info!("Signed PLC operation for user {}", did);
262 (
263 StatusCode::OK,
264 Json(SignPlcOperationOutput {
265 operation: signed_op,
266 }),
267 )
268 .into_response()
269}