this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{
4 PlcClient, PlcError, PlcOpOrTombstone, PlcService, create_update_op, sign_operation,
5};
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use chrono::Utc;
14use k256::ecdsa::SigningKey;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value, json};
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19
20#[derive(Debug, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct SignPlcOperationInput {
23 pub token: Option<String>,
24 pub rotation_keys: Option<Vec<String>>,
25 pub also_known_as: Option<Vec<String>>,
26 pub verification_methods: Option<HashMap<String, String>>,
27 pub services: Option<HashMap<String, ServiceInput>>,
28}
29
30#[derive(Debug, Deserialize, Clone)]
31pub struct ServiceInput {
32 #[serde(rename = "type")]
33 pub service_type: String,
34 pub endpoint: String,
35}
36
37#[derive(Debug, Serialize)]
38pub struct SignPlcOperationOutput {
39 pub operation: Value,
40}
41
42pub async fn sign_plc_operation(
43 State(state): State<AppState>,
44 headers: axum::http::HeaderMap,
45 Json(input): Json<SignPlcOperationInput>,
46) -> Response {
47 let bearer = match crate::auth::extract_bearer_token_from_header(
48 headers.get("Authorization").and_then(|h| h.to_str().ok()),
49 ) {
50 Some(t) => t,
51 None => return ApiError::AuthenticationRequired.into_response(),
52 };
53 let auth_user =
54 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
55 Ok(user) => user,
56 Err(e) => return ApiError::from(e).into_response(),
57 };
58 if let Err(e) = crate::auth::scope_check::check_identity_scope(
59 auth_user.is_oauth,
60 auth_user.scope.as_deref(),
61 crate::oauth::scopes::IdentityAttr::Wildcard,
62 ) {
63 return e;
64 }
65 let did = &auth_user.did;
66 if did.starts_with("did:web:") {
67 return ApiError::InvalidRequest(
68 "PLC operations are only valid for did:plc identities".into(),
69 )
70 .into_response();
71 }
72 let token = match &input.token {
73 Some(t) => t,
74 None => {
75 return ApiError::InvalidRequest(
76 "Email confirmation token required to sign PLC operations".into(),
77 )
78 .into_response();
79 }
80 };
81 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did)
82 .fetch_optional(&state.db)
83 .await
84 {
85 Ok(Some(row)) => row,
86 _ => {
87 return (
88 StatusCode::NOT_FOUND,
89 Json(json!({"error": "AccountNotFound"})),
90 )
91 .into_response();
92 }
93 };
94 let token_row = match sqlx::query!(
95 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2",
96 user.id,
97 token
98 )
99 .fetch_optional(&state.db)
100 .await
101 {
102 Ok(Some(row)) => row,
103 Ok(None) => {
104 return (
105 StatusCode::BAD_REQUEST,
106 Json(json!({
107 "error": "InvalidToken",
108 "message": "Invalid or expired token"
109 })),
110 )
111 .into_response();
112 }
113 Err(e) => {
114 error!("DB error: {:?}", e);
115 return (
116 StatusCode::INTERNAL_SERVER_ERROR,
117 Json(json!({"error": "InternalError"})),
118 )
119 .into_response();
120 }
121 };
122 if Utc::now() > token_row.expires_at {
123 let _ = sqlx::query!(
124 "DELETE FROM plc_operation_tokens WHERE id = $1",
125 token_row.id
126 )
127 .execute(&state.db)
128 .await;
129 return (
130 StatusCode::BAD_REQUEST,
131 Json(json!({
132 "error": "ExpiredToken",
133 "message": "Token has expired"
134 })),
135 )
136 .into_response();
137 }
138 let key_row = match sqlx::query!(
139 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
140 user.id
141 )
142 .fetch_optional(&state.db)
143 .await
144 {
145 Ok(Some(row)) => row,
146 _ => {
147 return (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
150 )
151 .into_response();
152 }
153 };
154 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
155 {
156 Ok(k) => k,
157 Err(e) => {
158 error!("Failed to decrypt user key: {}", e);
159 return (
160 StatusCode::INTERNAL_SERVER_ERROR,
161 Json(json!({"error": "InternalError"})),
162 )
163 .into_response();
164 }
165 };
166 let signing_key = match SigningKey::from_slice(&key_bytes) {
167 Ok(k) => k,
168 Err(e) => {
169 error!("Failed to create signing key: {:?}", e);
170 return (
171 StatusCode::INTERNAL_SERVER_ERROR,
172 Json(json!({"error": "InternalError"})),
173 )
174 .into_response();
175 }
176 };
177 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
178 let did_clone = did.clone();
179 let result: Result<PlcOpOrTombstone, CircuitBreakerError<PlcError>> =
180 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
181 plc_client.get_last_op(&did_clone).await
182 })
183 .await;
184 let last_op = match result {
185 Ok(op) => op,
186 Err(CircuitBreakerError::CircuitOpen(e)) => {
187 warn!("PLC directory circuit breaker open: {}", e);
188 return (
189 StatusCode::SERVICE_UNAVAILABLE,
190 Json(json!({
191 "error": "ServiceUnavailable",
192 "message": "PLC directory service temporarily unavailable"
193 })),
194 )
195 .into_response();
196 }
197 Err(CircuitBreakerError::OperationFailed(PlcError::NotFound)) => {
198 return (
199 StatusCode::NOT_FOUND,
200 Json(json!({
201 "error": "NotFound",
202 "message": "DID not found in PLC directory"
203 })),
204 )
205 .into_response();
206 }
207 Err(CircuitBreakerError::OperationFailed(e)) => {
208 error!("Failed to fetch PLC operation: {:?}", e);
209 return (
210 StatusCode::BAD_GATEWAY,
211 Json(json!({
212 "error": "UpstreamError",
213 "message": "Failed to communicate with PLC directory"
214 })),
215 )
216 .into_response();
217 }
218 };
219 if last_op.is_tombstone() {
220 return (
221 StatusCode::BAD_REQUEST,
222 Json(json!({
223 "error": "InvalidRequest",
224 "message": "DID is tombstoned"
225 })),
226 )
227 .into_response();
228 }
229 let services = input.services.map(|s| {
230 s.into_iter()
231 .map(|(k, v)| {
232 (
233 k,
234 PlcService {
235 service_type: v.service_type,
236 endpoint: v.endpoint,
237 },
238 )
239 })
240 .collect()
241 });
242 let unsigned_op = match create_update_op(
243 &last_op,
244 input.rotation_keys,
245 input.verification_methods,
246 input.also_known_as,
247 services,
248 ) {
249 Ok(op) => op,
250 Err(PlcError::Tombstoned) => {
251 return (
252 StatusCode::BAD_REQUEST,
253 Json(json!({
254 "error": "InvalidRequest",
255 "message": "Cannot update tombstoned DID"
256 })),
257 )
258 .into_response();
259 }
260 Err(e) => {
261 error!("Failed to create PLC operation: {:?}", e);
262 return (
263 StatusCode::INTERNAL_SERVER_ERROR,
264 Json(json!({"error": "InternalError"})),
265 )
266 .into_response();
267 }
268 };
269 let signed_op = match sign_operation(&unsigned_op, &signing_key) {
270 Ok(op) => op,
271 Err(e) => {
272 error!("Failed to sign PLC operation: {:?}", e);
273 return (
274 StatusCode::INTERNAL_SERVER_ERROR,
275 Json(json!({"error": "InternalError"})),
276 )
277 .into_response();
278 }
279 };
280 let _ = sqlx::query!(
281 "DELETE FROM plc_operation_tokens WHERE id = $1",
282 token_row.id
283 )
284 .execute(&state.db)
285 .await;
286 info!("Signed PLC operation for user {}", did);
287 (
288 StatusCode::OK,
289 Json(SignPlcOperationOutput {
290 operation: signed_op,
291 }),
292 )
293 .into_response()
294}