this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{
4 PlcClient, PlcError, PlcOpOrTombstone, PlcService, create_update_op, sign_operation,
5};
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use chrono::Utc;
14use k256::ecdsa::SigningKey;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19
20#[derive(Debug, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct SignPlcOperationInput {
23 pub token: Option<String>,
24 pub rotation_keys: Option<Vec<String>>,
25 pub also_known_as: Option<Vec<String>>,
26 pub verification_methods: Option<HashMap<String, String>>,
27 pub services: Option<HashMap<String, ServiceInput>>,
28}
29
30#[derive(Debug, Deserialize, Clone)]
31pub struct ServiceInput {
32 #[serde(rename = "type")]
33 pub service_type: String,
34 pub endpoint: String,
35}
36
37#[derive(Debug, Serialize)]
38pub struct SignPlcOperationOutput {
39 pub operation: Value,
40}
41
42pub async fn sign_plc_operation(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<SignPlcOperationInput>,
46) -> Response {
47 let bearer = match crate::auth::extract_bearer_token_from_header(
48 headers.get("Authorization").and_then(|h| h.to_str().ok()),
49 ) {
50 Some(t) => t,
51 None => return ApiError::AuthenticationRequired.into_response(),
52 };
53 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
54 Ok(user) => user,
55 Err(e) => return ApiError::from(e).into_response(),
56 };
57 let did = &auth_user.did;
58 let token = match &input.token {
59 Some(t) => t,
60 None => {
61 return ApiError::InvalidRequest(
62 "Email confirmation token required to sign PLC operations".into(),
63 )
64 .into_response();
65 }
66 };
67 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did)
68 .fetch_optional(&state.db)
69 .await
70 {
71 Ok(Some(row)) => row,
72 _ => {
73 return (
74 StatusCode::NOT_FOUND,
75 Json(json!({"error": "AccountNotFound"})),
76 )
77 .into_response();
78 }
79 };
80 let token_row = match sqlx::query!(
81 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2",
82 user.id,
83 token
84 )
85 .fetch_optional(&state.db)
86 .await
87 {
88 Ok(Some(row)) => row,
89 Ok(None) => {
90 return (
91 StatusCode::BAD_REQUEST,
92 Json(json!({
93 "error": "InvalidToken",
94 "message": "Invalid or expired token"
95 })),
96 )
97 .into_response();
98 }
99 Err(e) => {
100 error!("DB error: {:?}", e);
101 return (
102 StatusCode::INTERNAL_SERVER_ERROR,
103 Json(json!({"error": "InternalError"})),
104 )
105 .into_response();
106 }
107 };
108 if Utc::now() > token_row.expires_at {
109 let _ = sqlx::query!(
110 "DELETE FROM plc_operation_tokens WHERE id = $1",
111 token_row.id
112 )
113 .execute(&state.db)
114 .await;
115 return (
116 StatusCode::BAD_REQUEST,
117 Json(json!({
118 "error": "ExpiredToken",
119 "message": "Token has expired"
120 })),
121 )
122 .into_response();
123 }
124 let key_row = match sqlx::query!(
125 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
126 user.id
127 )
128 .fetch_optional(&state.db)
129 .await
130 {
131 Ok(Some(row)) => row,
132 _ => {
133 return (
134 StatusCode::INTERNAL_SERVER_ERROR,
135 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
136 )
137 .into_response();
138 }
139 };
140 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
141 {
142 Ok(k) => k,
143 Err(e) => {
144 error!("Failed to decrypt user key: {}", e);
145 return (
146 StatusCode::INTERNAL_SERVER_ERROR,
147 Json(json!({"error": "InternalError"})),
148 )
149 .into_response();
150 }
151 };
152 let signing_key = match SigningKey::from_slice(&key_bytes) {
153 Ok(k) => k,
154 Err(e) => {
155 error!("Failed to create signing key: {:?}", e);
156 return (
157 StatusCode::INTERNAL_SERVER_ERROR,
158 Json(json!({"error": "InternalError"})),
159 )
160 .into_response();
161 }
162 };
163 let plc_client = PlcClient::new(None);
164 let did_clone = did.clone();
165 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> =
166 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
167 plc_client.get_last_op(&did_clone).await
168 })
169 .await;
170 let last_op = match result {
171 Ok(op) => op,
172 Err(CircuitBreakerError::CircuitOpen(e)) => {
173 warn!("PLC directory circuit breaker open: {}", e);
174 return (
175 StatusCode::SERVICE_UNAVAILABLE,
176 Json(json!({
177 "error": "ServiceUnavailable",
178 "message": "PLC directory service temporarily unavailable"
179 })),
180 )
181 .into_response();
182 }
183 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => {
184 return (
185 StatusCode::NOT_FOUND,
186 Json(json!({
187 "error": "NotFound",
188 "message": "DID not found in PLC directory"
189 })),
190 )
191 .into_response();
192 }
193 Err(CircuitBreakerError::OperationFailed(e)) => {
194 error!("Failed to fetch PLC operation: {:?}", e);
195 return (
196 StatusCode::BAD_GATEWAY,
197 Json(json!({
198 "error": "UpstreamError",
199 "message": "Failed to communicate with PLC directory"
200 })),
201 )
202 .into_response();
203 }
204 };
205 if last_op.is_tombstone() {
206 return (
207 StatusCode::BAD_REQUEST,
208 Json(json!({
209 "error": "InvalidRequest",
210 "message": "DID is tombstoned"
211 })),
212 )
213 .into_response();
214 }
215 let services = input.services.map(|s| {
216 s.into_iter()
217 .map(|(k, v)| {
218 (
219 k,
220 PlcService {
221 service_type: v.service_type,
222 endpoint: v.endpoint,
223 },
224 )
225 })
226 .collect()
227 });
228 let unsigned_op = match create_update_op(
229 &last_op,
230 input.rotation_keys,
231 input.verification_methods,
232 input.also_known_as,
233 services,
234 ) {
235 Ok(op) => op,
236 Err(PlcError::Tombstoned) => {
237 return (
238 StatusCode::BAD_REQUEST,
239 Json(json!({
240 "error": "InvalidRequest",
241 "message": "Cannot update tombstoned DID"
242 })),
243 )
244 .into_response();
245 }
246 Err(e) => {
247 error!("Failed to create PLC operation: {:?}", e);
248 return (
249 StatusCode::INTERNAL_SERVER_ERROR,
250 Json(json!({"error": "InternalError"})),
251 )
252 .into_response();
253 }
254 };
255 let signed_op = match sign_operation(&unsigned_op, &signing_key) {
256 Ok(op) => op,
257 Err(e) => {
258 error!("Failed to sign PLC operation: {:?}", e);
259 return (
260 StatusCode::INTERNAL_SERVER_ERROR,
261 Json(json!({"error": "InternalError"})),
262 )
263 .into_response();
264 }
265 };
266 let _ = sqlx::query!(
267 "DELETE FROM plc_operation_tokens WHERE id = $1",
268 token_row.id
269 )
270 .execute(&state.db)
271 .await;
272 info!("Signed PLC operation for user {}", did);
273 (
274 StatusCode::OK,
275 Json(SignPlcOperationOutput {
276 operation: signed_op,
277 }),
278 )
279 .into_response()
280}