this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{
4 PlcClient, PlcError, PlcOpOrTombstone, PlcService, create_update_op, sign_operation,
5};
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use chrono::Utc;
14use k256::ecdsa::SigningKey;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19
20#[derive(Debug, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct SignPlcOperationInput {
23 pub token: Option<String>,
24 pub rotation_keys: Option<Vec<String>>,
25 pub also_known_as: Option<Vec<String>>,
26 pub verification_methods: Option<HashMap<String, String>>,
27 pub services: Option<HashMap<String, ServiceInput>>,
28}
29
30#[derive(Debug, Deserialize, Clone)]
31pub struct ServiceInput {
32 #[serde(rename = "type")]
33 pub service_type: String,
34 pub endpoint: String,
35}
36
37#[derive(Debug, Serialize)]
38pub struct SignPlcOperationOutput {
39 pub operation: Value,
40}
41
42pub async fn sign_plc_operation(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<SignPlcOperationInput>,
46) -> Response {
47 let bearer = match crate::auth::extract_bearer_token_from_header(
48 headers.get("Authorization").and_then(|h| h.to_str().ok()),
49 ) {
50 Some(t) => t,
51 None => return ApiError::AuthenticationRequired.into_response(),
52 };
53 let auth_user =
54 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
55 Ok(user) => user,
56 Err(e) => return ApiError::from(e).into_response(),
57 };
58 if let Err(e) = crate::auth::scope_check::check_identity_scope(
59 auth_user.is_oauth,
60 auth_user.scope.as_deref(),
61 crate::oauth::scopes::IdentityAttr::Wildcard,
62 ) {
63 return e;
64 }
65 let did = &auth_user.did;
66 let token = match &input.token {
67 Some(t) => t,
68 None => {
69 return ApiError::InvalidRequest(
70 "Email confirmation token required to sign PLC operations".into(),
71 )
72 .into_response();
73 }
74 };
75 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did)
76 .fetch_optional(&state.db)
77 .await
78 {
79 Ok(Some(row)) => row,
80 _ => {
81 return (
82 StatusCode::NOT_FOUND,
83 Json(json!({"error": "AccountNotFound"})),
84 )
85 .into_response();
86 }
87 };
88 let token_row = match sqlx::query!(
89 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2",
90 user.id,
91 token
92 )
93 .fetch_optional(&state.db)
94 .await
95 {
96 Ok(Some(row)) => row,
97 Ok(None) => {
98 return (
99 StatusCode::BAD_REQUEST,
100 Json(json!({
101 "error": "InvalidToken",
102 "message": "Invalid or expired token"
103 })),
104 )
105 .into_response();
106 }
107 Err(e) => {
108 error!("DB error: {:?}", e);
109 return (
110 StatusCode::INTERNAL_SERVER_ERROR,
111 Json(json!({"error": "InternalError"})),
112 )
113 .into_response();
114 }
115 };
116 if Utc::now() > token_row.expires_at {
117 let _ = sqlx::query!(
118 "DELETE FROM plc_operation_tokens WHERE id = $1",
119 token_row.id
120 )
121 .execute(&state.db)
122 .await;
123 return (
124 StatusCode::BAD_REQUEST,
125 Json(json!({
126 "error": "ExpiredToken",
127 "message": "Token has expired"
128 })),
129 )
130 .into_response();
131 }
132 let key_row = match sqlx::query!(
133 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
134 user.id
135 )
136 .fetch_optional(&state.db)
137 .await
138 {
139 Ok(Some(row)) => row,
140 _ => {
141 return (
142 StatusCode::INTERNAL_SERVER_ERROR,
143 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
144 )
145 .into_response();
146 }
147 };
148 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
149 {
150 Ok(k) => k,
151 Err(e) => {
152 error!("Failed to decrypt user key: {}", e);
153 return (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError"})),
156 )
157 .into_response();
158 }
159 };
160 let signing_key = match SigningKey::from_slice(&key_bytes) {
161 Ok(k) => k,
162 Err(e) => {
163 error!("Failed to create signing key: {:?}", e);
164 return (
165 StatusCode::INTERNAL_SERVER_ERROR,
166 Json(json!({"error": "InternalError"})),
167 )
168 .into_response();
169 }
170 };
171 let plc_client = PlcClient::new(None);
172 let did_clone = did.clone();
173 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> =
174 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
175 plc_client.get_last_op(&did_clone).await
176 })
177 .await;
178 let last_op = match result {
179 Ok(op) => op,
180 Err(CircuitBreakerError::CircuitOpen(e)) => {
181 warn!("PLC directory circuit breaker open: {}", e);
182 return (
183 StatusCode::SERVICE_UNAVAILABLE,
184 Json(json!({
185 "error": "ServiceUnavailable",
186 "message": "PLC directory service temporarily unavailable"
187 })),
188 )
189 .into_response();
190 }
191 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => {
192 return (
193 StatusCode::NOT_FOUND,
194 Json(json!({
195 "error": "NotFound",
196 "message": "DID not found in PLC directory"
197 })),
198 )
199 .into_response();
200 }
201 Err(CircuitBreakerError::OperationFailed(e)) => {
202 error!("Failed to fetch PLC operation: {:?}", e);
203 return (
204 StatusCode::BAD_GATEWAY,
205 Json(json!({
206 "error": "UpstreamError",
207 "message": "Failed to communicate with PLC directory"
208 })),
209 )
210 .into_response();
211 }
212 };
213 if last_op.is_tombstone() {
214 return (
215 StatusCode::BAD_REQUEST,
216 Json(json!({
217 "error": "InvalidRequest",
218 "message": "DID is tombstoned"
219 })),
220 )
221 .into_response();
222 }
223 let services = input.services.map(|s| {
224 s.into_iter()
225 .map(|(k, v)| {
226 (
227 k,
228 PlcService {
229 service_type: v.service_type,
230 endpoint: v.endpoint,
231 },
232 )
233 })
234 .collect()
235 });
236 let unsigned_op = match create_update_op(
237 &last_op,
238 input.rotation_keys,
239 input.verification_methods,
240 input.also_known_as,
241 services,
242 ) {
243 Ok(op) => op,
244 Err(PlcError::Tombstoned) => {
245 return (
246 StatusCode::BAD_REQUEST,
247 Json(json!({
248 "error": "InvalidRequest",
249 "message": "Cannot update tombstoned DID"
250 })),
251 )
252 .into_response();
253 }
254 Err(e) => {
255 error!("Failed to create PLC operation: {:?}", e);
256 return (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 Json(json!({"error": "InternalError"})),
259 )
260 .into_response();
261 }
262 };
263 let signed_op = match sign_operation(&unsigned_op, &signing_key) {
264 Ok(op) => op,
265 Err(e) => {
266 error!("Failed to sign PLC operation: {:?}", e);
267 return (
268 StatusCode::INTERNAL_SERVER_ERROR,
269 Json(json!({"error": "InternalError"})),
270 )
271 .into_response();
272 }
273 };
274 let _ = sqlx::query!(
275 "DELETE FROM plc_operation_tokens WHERE id = $1",
276 token_row.id
277 )
278 .execute(&state.db)
279 .await;
280 info!("Signed PLC operation for user {}", did);
281 (
282 StatusCode::OK,
283 Json(SignPlcOperationOutput {
284 operation: signed_op,
285 }),
286 )
287 .into_response()
288}