this repo has no description
1use crate::plc::{
2 create_update_op, sign_operation, signing_key_to_did_key, validate_plc_operation,
3 PlcClient, PlcError, PlcService,
4};
5use crate::state::AppState;
6use axum::{
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10 Json,
11};
12use chrono::{Duration, Utc};
13use k256::ecdsa::SigningKey;
14use rand::Rng;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19
20fn generate_plc_token() -> String {
21 let mut rng = rand::thread_rng();
22 let chars: Vec<char> = "abcdefghijklmnopqrstuvwxyz234567".chars().collect();
23 let part1: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
24 let part2: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
25 format!("{}-{}", part1, part2)
26}
27
28pub async fn request_plc_operation_signature(
29 State(state): State<AppState>,
30 headers: axum::http::HeaderMap,
31) -> Response {
32 let token = match crate::auth::extract_bearer_token_from_header(
33 headers.get("Authorization").and_then(|h| h.to_str().ok()),
34 ) {
35 Some(t) => t,
36 None => {
37 return (
38 StatusCode::UNAUTHORIZED,
39 Json(json!({"error": "AuthenticationRequired"})),
40 )
41 .into_response();
42 }
43 };
44
45 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
46 Ok(user) => user,
47 Err(e) => {
48 return (
49 StatusCode::UNAUTHORIZED,
50 Json(json!({"error": "AuthenticationFailed", "message": e})),
51 )
52 .into_response();
53 }
54 };
55
56 let did = &auth_user.did;
57
58 let user = match sqlx::query!(
59 "SELECT id FROM users WHERE did = $1",
60 did
61 )
62 .fetch_optional(&state.db)
63 .await
64 {
65 Ok(Some(row)) => row,
66 Ok(None) => {
67 return (
68 StatusCode::NOT_FOUND,
69 Json(json!({"error": "AccountNotFound"})),
70 )
71 .into_response();
72 }
73 Err(e) => {
74 error!("DB error: {:?}", e);
75 return (
76 StatusCode::INTERNAL_SERVER_ERROR,
77 Json(json!({"error": "InternalError"})),
78 )
79 .into_response();
80 }
81 };
82
83 let _ = sqlx::query!(
84 "DELETE FROM plc_operation_tokens WHERE user_id = $1 OR expires_at < NOW()",
85 user.id
86 )
87 .execute(&state.db)
88 .await;
89
90 let plc_token = generate_plc_token();
91 let expires_at = Utc::now() + Duration::minutes(10);
92
93 if let Err(e) = sqlx::query!(
94 r#"
95 INSERT INTO plc_operation_tokens (user_id, token, expires_at)
96 VALUES ($1, $2, $3)
97 "#,
98 user.id,
99 plc_token,
100 expires_at
101 )
102 .execute(&state.db)
103 .await
104 {
105 error!("Failed to create PLC token: {:?}", e);
106 return (
107 StatusCode::INTERNAL_SERVER_ERROR,
108 Json(json!({"error": "InternalError"})),
109 )
110 .into_response();
111 }
112
113 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
114
115 if let Err(e) = crate::notifications::enqueue_plc_operation(
116 &state.db,
117 user.id,
118 &plc_token,
119 &hostname,
120 )
121 .await
122 {
123 warn!("Failed to enqueue PLC operation notification: {:?}", e);
124 }
125
126 info!("PLC operation signature requested for user {}", did);
127
128 (StatusCode::OK, Json(json!({}))).into_response()
129}
130
131#[derive(Debug, Deserialize)]
132#[serde(rename_all = "camelCase")]
133pub struct SignPlcOperationInput {
134 pub token: Option<String>,
135 pub rotation_keys: Option<Vec<String>>,
136 pub also_known_as: Option<Vec<String>>,
137 pub verification_methods: Option<HashMap<String, String>>,
138 pub services: Option<HashMap<String, ServiceInput>>,
139}
140
141#[derive(Debug, Deserialize, Clone)]
142pub struct ServiceInput {
143 #[serde(rename = "type")]
144 pub service_type: String,
145 pub endpoint: String,
146}
147
148#[derive(Debug, Serialize)]
149pub struct SignPlcOperationOutput {
150 pub operation: Value,
151}
152
153pub async fn sign_plc_operation(
154 State(state): State<AppState>,
155 headers: axum::http::HeaderMap,
156 Json(input): Json<SignPlcOperationInput>,
157) -> Response {
158 let bearer = match crate::auth::extract_bearer_token_from_header(
159 headers.get("Authorization").and_then(|h| h.to_str().ok()),
160 ) {
161 Some(t) => t,
162 None => {
163 return (
164 StatusCode::UNAUTHORIZED,
165 Json(json!({"error": "AuthenticationRequired"})),
166 )
167 .into_response();
168 }
169 };
170
171 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
172 Ok(user) => user,
173 Err(e) => {
174 return (
175 StatusCode::UNAUTHORIZED,
176 Json(json!({"error": "AuthenticationFailed", "message": e})),
177 )
178 .into_response();
179 }
180 };
181
182 let did = &auth_user.did;
183
184 let token = match &input.token {
185 Some(t) => t,
186 None => {
187 return (
188 StatusCode::BAD_REQUEST,
189 Json(json!({
190 "error": "InvalidRequest",
191 "message": "Email confirmation token required to sign PLC operations"
192 })),
193 )
194 .into_response();
195 }
196 };
197
198 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did)
199 .fetch_optional(&state.db)
200 .await
201 {
202 Ok(Some(row)) => row,
203 _ => {
204 return (
205 StatusCode::NOT_FOUND,
206 Json(json!({"error": "AccountNotFound"})),
207 )
208 .into_response();
209 }
210 };
211
212 let token_row = match sqlx::query!(
213 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2",
214 user.id,
215 token
216 )
217 .fetch_optional(&state.db)
218 .await
219 {
220 Ok(Some(row)) => row,
221 Ok(None) => {
222 return (
223 StatusCode::BAD_REQUEST,
224 Json(json!({
225 "error": "InvalidToken",
226 "message": "Invalid or expired token"
227 })),
228 )
229 .into_response();
230 }
231 Err(e) => {
232 error!("DB error: {:?}", e);
233 return (
234 StatusCode::INTERNAL_SERVER_ERROR,
235 Json(json!({"error": "InternalError"})),
236 )
237 .into_response();
238 }
239 };
240
241 if Utc::now() > token_row.expires_at {
242 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id)
243 .execute(&state.db)
244 .await;
245 return (
246 StatusCode::BAD_REQUEST,
247 Json(json!({
248 "error": "ExpiredToken",
249 "message": "Token has expired"
250 })),
251 )
252 .into_response();
253 }
254
255 let key_row = match sqlx::query!(
256 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
257 user.id
258 )
259 .fetch_optional(&state.db)
260 .await
261 {
262 Ok(Some(row)) => row,
263 _ => {
264 return (
265 StatusCode::INTERNAL_SERVER_ERROR,
266 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
267 )
268 .into_response();
269 }
270 };
271
272 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
273 {
274 Ok(k) => k,
275 Err(e) => {
276 error!("Failed to decrypt user key: {}", e);
277 return (
278 StatusCode::INTERNAL_SERVER_ERROR,
279 Json(json!({"error": "InternalError"})),
280 )
281 .into_response();
282 }
283 };
284
285 let signing_key = match SigningKey::from_slice(&key_bytes) {
286 Ok(k) => k,
287 Err(e) => {
288 error!("Failed to create signing key: {:?}", e);
289 return (
290 StatusCode::INTERNAL_SERVER_ERROR,
291 Json(json!({"error": "InternalError"})),
292 )
293 .into_response();
294 }
295 };
296
297 let plc_client = PlcClient::new(None);
298 let last_op = match plc_client.get_last_op(did).await {
299 Ok(op) => op,
300 Err(PlcError::NotFound) => {
301 return (
302 StatusCode::NOT_FOUND,
303 Json(json!({
304 "error": "NotFound",
305 "message": "DID not found in PLC directory"
306 })),
307 )
308 .into_response();
309 }
310 Err(e) => {
311 error!("Failed to fetch PLC operation: {:?}", e);
312 return (
313 StatusCode::BAD_GATEWAY,
314 Json(json!({
315 "error": "UpstreamError",
316 "message": "Failed to communicate with PLC directory"
317 })),
318 )
319 .into_response();
320 }
321 };
322
323 if last_op.is_tombstone() {
324 return (
325 StatusCode::BAD_REQUEST,
326 Json(json!({
327 "error": "InvalidRequest",
328 "message": "DID is tombstoned"
329 })),
330 )
331 .into_response();
332 }
333
334 let services = input.services.map(|s| {
335 s.into_iter()
336 .map(|(k, v)| {
337 (
338 k,
339 PlcService {
340 service_type: v.service_type,
341 endpoint: v.endpoint,
342 },
343 )
344 })
345 .collect()
346 });
347
348 let unsigned_op = match create_update_op(
349 &last_op,
350 input.rotation_keys,
351 input.verification_methods,
352 input.also_known_as,
353 services,
354 ) {
355 Ok(op) => op,
356 Err(PlcError::Tombstoned) => {
357 return (
358 StatusCode::BAD_REQUEST,
359 Json(json!({
360 "error": "InvalidRequest",
361 "message": "Cannot update tombstoned DID"
362 })),
363 )
364 .into_response();
365 }
366 Err(e) => {
367 error!("Failed to create PLC operation: {:?}", e);
368 return (
369 StatusCode::INTERNAL_SERVER_ERROR,
370 Json(json!({"error": "InternalError"})),
371 )
372 .into_response();
373 }
374 };
375
376 let signed_op = match sign_operation(&unsigned_op, &signing_key) {
377 Ok(op) => op,
378 Err(e) => {
379 error!("Failed to sign PLC operation: {:?}", e);
380 return (
381 StatusCode::INTERNAL_SERVER_ERROR,
382 Json(json!({"error": "InternalError"})),
383 )
384 .into_response();
385 }
386 };
387
388 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id)
389 .execute(&state.db)
390 .await;
391
392 info!("Signed PLC operation for user {}", did);
393
394 (
395 StatusCode::OK,
396 Json(SignPlcOperationOutput {
397 operation: signed_op,
398 }),
399 )
400 .into_response()
401}
402
403#[derive(Debug, Deserialize)]
404pub struct SubmitPlcOperationInput {
405 pub operation: Value,
406}
407
408pub async fn submit_plc_operation(
409 State(state): State<AppState>,
410 headers: axum::http::HeaderMap,
411 Json(input): Json<SubmitPlcOperationInput>,
412) -> Response {
413 let bearer = match crate::auth::extract_bearer_token_from_header(
414 headers.get("Authorization").and_then(|h| h.to_str().ok()),
415 ) {
416 Some(t) => t,
417 None => {
418 return (
419 StatusCode::UNAUTHORIZED,
420 Json(json!({"error": "AuthenticationRequired"})),
421 )
422 .into_response();
423 }
424 };
425
426 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
427 Ok(user) => user,
428 Err(e) => {
429 return (
430 StatusCode::UNAUTHORIZED,
431 Json(json!({"error": "AuthenticationFailed", "message": e})),
432 )
433 .into_response();
434 }
435 };
436
437 let did = &auth_user.did;
438
439 if let Err(e) = validate_plc_operation(&input.operation) {
440 return (
441 StatusCode::BAD_REQUEST,
442 Json(json!({
443 "error": "InvalidRequest",
444 "message": format!("Invalid operation: {}", e)
445 })),
446 )
447 .into_response();
448 }
449
450 let op = &input.operation;
451 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
452 let public_url = format!("https://{}", hostname);
453
454 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
455 .fetch_optional(&state.db)
456 .await
457 {
458 Ok(Some(row)) => row,
459 _ => {
460 return (
461 StatusCode::NOT_FOUND,
462 Json(json!({"error": "AccountNotFound"})),
463 )
464 .into_response();
465 }
466 };
467
468 let key_row = match sqlx::query!(
469 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
470 user.id
471 )
472 .fetch_optional(&state.db)
473 .await
474 {
475 Ok(Some(row)) => row,
476 _ => {
477 return (
478 StatusCode::INTERNAL_SERVER_ERROR,
479 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
480 )
481 .into_response();
482 }
483 };
484
485 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
486 {
487 Ok(k) => k,
488 Err(e) => {
489 error!("Failed to decrypt user key: {}", e);
490 return (
491 StatusCode::INTERNAL_SERVER_ERROR,
492 Json(json!({"error": "InternalError"})),
493 )
494 .into_response();
495 }
496 };
497
498 let signing_key = match SigningKey::from_slice(&key_bytes) {
499 Ok(k) => k,
500 Err(e) => {
501 error!("Failed to create signing key: {:?}", e);
502 return (
503 StatusCode::INTERNAL_SERVER_ERROR,
504 Json(json!({"error": "InternalError"})),
505 )
506 .into_response();
507 }
508 };
509
510 let user_did_key = signing_key_to_did_key(&signing_key);
511
512 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
513 let server_rotation_key =
514 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
515
516 let has_server_key = rotation_keys
517 .iter()
518 .any(|k| k.as_str() == Some(&server_rotation_key));
519
520 if !has_server_key {
521 return (
522 StatusCode::BAD_REQUEST,
523 Json(json!({
524 "error": "InvalidRequest",
525 "message": "Rotation keys do not include server's rotation key"
526 })),
527 )
528 .into_response();
529 }
530 }
531
532 if let Some(services) = op.get("services").and_then(|v| v.as_object()) {
533 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) {
534 let service_type = pds.get("type").and_then(|v| v.as_str());
535 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
536
537 if service_type != Some("AtprotoPersonalDataServer") {
538 return (
539 StatusCode::BAD_REQUEST,
540 Json(json!({
541 "error": "InvalidRequest",
542 "message": "Incorrect type on atproto_pds service"
543 })),
544 )
545 .into_response();
546 }
547
548 if endpoint != Some(&public_url) {
549 return (
550 StatusCode::BAD_REQUEST,
551 Json(json!({
552 "error": "InvalidRequest",
553 "message": "Incorrect endpoint on atproto_pds service"
554 })),
555 )
556 .into_response();
557 }
558 }
559 }
560
561 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) {
562 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) {
563 if atproto_key != user_did_key {
564 return (
565 StatusCode::BAD_REQUEST,
566 Json(json!({
567 "error": "InvalidRequest",
568 "message": "Incorrect signing key in verificationMethods"
569 })),
570 )
571 .into_response();
572 }
573 }
574 }
575
576 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
577 let expected_handle = format!("at://{}", user.handle);
578 let first_aka = also_known_as.first().and_then(|v| v.as_str());
579
580 if first_aka != Some(&expected_handle) {
581 return (
582 StatusCode::BAD_REQUEST,
583 Json(json!({
584 "error": "InvalidRequest",
585 "message": "Incorrect handle in alsoKnownAs"
586 })),
587 )
588 .into_response();
589 }
590 }
591
592 let plc_client = PlcClient::new(None);
593 if let Err(e) = plc_client.send_operation(did, &input.operation).await {
594 error!("Failed to submit PLC operation: {:?}", e);
595 return (
596 StatusCode::BAD_GATEWAY,
597 Json(json!({
598 "error": "UpstreamError",
599 "message": format!("Failed to submit to PLC directory: {}", e)
600 })),
601 )
602 .into_response();
603 }
604
605 if let Err(e) = sqlx::query!(
606 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity')",
607 did
608 )
609 .execute(&state.db)
610 .await
611 {
612 warn!("Failed to sequence identity event: {:?}", e);
613 }
614
615 info!("Submitted PLC operation for user {}", did);
616
617 (StatusCode::OK, Json(json!({}))).into_response()
618}