this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError};
3use crate::plc::{
4 create_update_op, sign_operation, PlcClient, PlcError, PlcOpOrTombstone, PlcService,
5};
6use crate::state::AppState;
7use axum::{
8 extract::State,
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 Json,
12};
13use chrono::Utc;
14use k256::ecdsa::SigningKey;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19
20#[derive(Debug, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct SignPlcOperationInput {
23 pub token: Option<String>,
24 pub rotation_keys: Option<Vec<String>>,
25 pub also_known_as: Option<Vec<String>>,
26 pub verification_methods: Option<HashMap<String, String>>,
27 pub services: Option<HashMap<String, ServiceInput>>,
28}
29
30#[derive(Debug, Deserialize, Clone)]
31pub struct ServiceInput {
32 #[serde(rename = "type")]
33 pub service_type: String,
34 pub endpoint: String,
35}
36
37#[derive(Debug, Serialize)]
38pub struct SignPlcOperationOutput {
39 pub operation: Value,
40}
41
42pub async fn sign_plc_operation(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<SignPlcOperationInput>,
46) -> Response {
47 let bearer = match crate::auth::extract_bearer_token_from_header(
48 headers.get("Authorization").and_then(|h| h.to_str().ok()),
49 ) {
50 Some(t) => t,
51 None => return ApiError::AuthenticationRequired.into_response(),
52 };
53
54 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
55 Ok(user) => user,
56 Err(e) => return ApiError::from(e).into_response(),
57 };
58
59 let did = &auth_user.did;
60
61 let token = match &input.token {
62 Some(t) => t,
63 None => {
64 return ApiError::InvalidRequest(
65 "Email confirmation token required to sign PLC operations".into()
66 ).into_response();
67 }
68 };
69
70 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did)
71 .fetch_optional(&state.db)
72 .await
73 {
74 Ok(Some(row)) => row,
75 _ => {
76 return (
77 StatusCode::NOT_FOUND,
78 Json(json!({"error": "AccountNotFound"})),
79 )
80 .into_response();
81 }
82 };
83
84 let token_row = match sqlx::query!(
85 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2",
86 user.id,
87 token
88 )
89 .fetch_optional(&state.db)
90 .await
91 {
92 Ok(Some(row)) => row,
93 Ok(None) => {
94 return (
95 StatusCode::BAD_REQUEST,
96 Json(json!({
97 "error": "InvalidToken",
98 "message": "Invalid or expired token"
99 })),
100 )
101 .into_response();
102 }
103 Err(e) => {
104 error!("DB error: {:?}", e);
105 return (
106 StatusCode::INTERNAL_SERVER_ERROR,
107 Json(json!({"error": "InternalError"})),
108 )
109 .into_response();
110 }
111 };
112
113 if Utc::now() > token_row.expires_at {
114 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id)
115 .execute(&state.db)
116 .await;
117 return (
118 StatusCode::BAD_REQUEST,
119 Json(json!({
120 "error": "ExpiredToken",
121 "message": "Token has expired"
122 })),
123 )
124 .into_response();
125 }
126
127 let key_row = match sqlx::query!(
128 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
129 user.id
130 )
131 .fetch_optional(&state.db)
132 .await
133 {
134 Ok(Some(row)) => row,
135 _ => {
136 return (
137 StatusCode::INTERNAL_SERVER_ERROR,
138 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
139 )
140 .into_response();
141 }
142 };
143
144 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
145 {
146 Ok(k) => k,
147 Err(e) => {
148 error!("Failed to decrypt user key: {}", e);
149 return (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({"error": "InternalError"})),
152 )
153 .into_response();
154 }
155 };
156
157 let signing_key = match SigningKey::from_slice(&key_bytes) {
158 Ok(k) => k,
159 Err(e) => {
160 error!("Failed to create signing key: {:?}", e);
161 return (
162 StatusCode::INTERNAL_SERVER_ERROR,
163 Json(json!({"error": "InternalError"})),
164 )
165 .into_response();
166 }
167 };
168
169 let plc_client = PlcClient::new(None);
170 let did_clone = did.clone();
171 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> = with_circuit_breaker(
172 &state.circuit_breakers.plc_directory,
173 || async { plc_client.get_last_op(&did_clone).await },
174 )
175 .await;
176
177 let last_op = match result {
178 Ok(op) => op,
179 Err(CircuitBreakerError::CircuitOpen(e)) => {
180 warn!("PLC directory circuit breaker open: {}", e);
181 return (
182 StatusCode::SERVICE_UNAVAILABLE,
183 Json(json!({
184 "error": "ServiceUnavailable",
185 "message": "PLC directory service temporarily unavailable"
186 })),
187 )
188 .into_response();
189 }
190 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => {
191 return (
192 StatusCode::NOT_FOUND,
193 Json(json!({
194 "error": "NotFound",
195 "message": "DID not found in PLC directory"
196 })),
197 )
198 .into_response();
199 }
200 Err(CircuitBreakerError::OperationFailed(e)) => {
201 error!("Failed to fetch PLC operation: {:?}", e);
202 return (
203 StatusCode::BAD_GATEWAY,
204 Json(json!({
205 "error": "UpstreamError",
206 "message": "Failed to communicate with PLC directory"
207 })),
208 )
209 .into_response();
210 }
211 };
212
213 if last_op.is_tombstone() {
214 return (
215 StatusCode::BAD_REQUEST,
216 Json(json!({
217 "error": "InvalidRequest",
218 "message": "DID is tombstoned"
219 })),
220 )
221 .into_response();
222 }
223
224 let services = input.services.map(|s| {
225 s.into_iter()
226 .map(|(k, v)| {
227 (
228 k,
229 PlcService {
230 service_type: v.service_type,
231 endpoint: v.endpoint,
232 },
233 )
234 })
235 .collect()
236 });
237
238 let unsigned_op = match create_update_op(
239 &last_op,
240 input.rotation_keys,
241 input.verification_methods,
242 input.also_known_as,
243 services,
244 ) {
245 Ok(op) => op,
246 Err(PlcError::Tombstoned) => {
247 return (
248 StatusCode::BAD_REQUEST,
249 Json(json!({
250 "error": "InvalidRequest",
251 "message": "Cannot update tombstoned DID"
252 })),
253 )
254 .into_response();
255 }
256 Err(e) => {
257 error!("Failed to create PLC operation: {:?}", e);
258 return (
259 StatusCode::INTERNAL_SERVER_ERROR,
260 Json(json!({"error": "InternalError"})),
261 )
262 .into_response();
263 }
264 };
265
266 let signed_op = match sign_operation(&unsigned_op, &signing_key) {
267 Ok(op) => op,
268 Err(e) => {
269 error!("Failed to sign PLC operation: {:?}", e);
270 return (
271 StatusCode::INTERNAL_SERVER_ERROR,
272 Json(json!({"error": "InternalError"})),
273 )
274 .into_response();
275 }
276 };
277
278 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id)
279 .execute(&state.db)
280 .await;
281
282 info!("Signed PLC operation for user {}", did);
283
284 (
285 StatusCode::OK,
286 Json(SignPlcOperationOutput {
287 operation: signed_op,
288 }),
289 )
290 .into_response()
291}