this repo has no description
1use crate::plc::{ 2 create_update_op, sign_operation, signing_key_to_did_key, validate_plc_operation, 3 PlcClient, PlcError, PlcService, 4}; 5use crate::state::AppState; 6use axum::{ 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10 Json, 11}; 12use chrono::{Duration, Utc}; 13use k256::ecdsa::SigningKey; 14use rand::Rng; 15use serde::{Deserialize, Serialize}; 16use serde_json::{json, Value}; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19 20fn generate_plc_token() -> String { 21 let mut rng = rand::thread_rng(); 22 let chars: Vec<char> = "abcdefghijklmnopqrstuvwxyz234567".chars().collect(); 23 let part1: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect(); 24 let part2: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect(); 25 format!("{}-{}", part1, part2) 26} 27 28pub async fn request_plc_operation_signature( 29 State(state): State<AppState>, 30 headers: axum::http::HeaderMap, 31) -> Response { 32 let token = match crate::auth::extract_bearer_token_from_header( 33 headers.get("Authorization").and_then(|h| h.to_str().ok()), 34 ) { 35 Some(t) => t, 36 None => { 37 return ( 38 StatusCode::UNAUTHORIZED, 39 Json(json!({"error": "AuthenticationRequired"})), 40 ) 41 .into_response(); 42 } 43 }; 44 45 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 46 Ok(user) => user, 47 Err(e) => { 48 return ( 49 StatusCode::UNAUTHORIZED, 50 Json(json!({"error": "AuthenticationFailed", "message": e})), 51 ) 52 .into_response(); 53 } 54 }; 55 56 let did = &auth_user.did; 57 58 let user = match sqlx::query!( 59 "SELECT id FROM users WHERE did = $1", 60 did 61 ) 62 .fetch_optional(&state.db) 63 .await 64 { 65 Ok(Some(row)) => row, 66 Ok(None) => { 67 return ( 68 StatusCode::NOT_FOUND, 69 Json(json!({"error": "AccountNotFound"})), 70 ) 71 .into_response(); 72 } 73 Err(e) => { 74 error!("DB error: {:?}", e); 75 return ( 76 StatusCode::INTERNAL_SERVER_ERROR, 77 Json(json!({"error": "InternalError"})), 78 ) 79 .into_response(); 80 } 81 }; 82 83 let _ = sqlx::query!( 84 "DELETE FROM plc_operation_tokens WHERE user_id = $1 OR expires_at < NOW()", 85 user.id 86 ) 87 .execute(&state.db) 88 .await; 89 90 let plc_token = generate_plc_token(); 91 let expires_at = Utc::now() + Duration::minutes(10); 92 93 if let Err(e) = sqlx::query!( 94 r#" 95 INSERT INTO plc_operation_tokens (user_id, token, expires_at) 96 VALUES ($1, $2, $3) 97 "#, 98 user.id, 99 plc_token, 100 expires_at 101 ) 102 .execute(&state.db) 103 .await 104 { 105 error!("Failed to create PLC token: {:?}", e); 106 return ( 107 StatusCode::INTERNAL_SERVER_ERROR, 108 Json(json!({"error": "InternalError"})), 109 ) 110 .into_response(); 111 } 112 113 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 114 115 if let Err(e) = crate::notifications::enqueue_plc_operation( 116 &state.db, 117 user.id, 118 &plc_token, 119 &hostname, 120 ) 121 .await 122 { 123 warn!("Failed to enqueue PLC operation notification: {:?}", e); 124 } 125 126 info!("PLC operation signature requested for user {}", did); 127 128 (StatusCode::OK, Json(json!({}))).into_response() 129} 130 131#[derive(Debug, Deserialize)] 132#[serde(rename_all = "camelCase")] 133pub struct SignPlcOperationInput { 134 pub token: Option<String>, 135 pub rotation_keys: Option<Vec<String>>, 136 pub also_known_as: Option<Vec<String>>, 137 pub verification_methods: Option<HashMap<String, String>>, 138 pub services: Option<HashMap<String, ServiceInput>>, 139} 140 141#[derive(Debug, Deserialize, Clone)] 142pub struct ServiceInput { 143 #[serde(rename = "type")] 144 pub service_type: String, 145 pub endpoint: String, 146} 147 148#[derive(Debug, Serialize)] 149pub struct SignPlcOperationOutput { 150 pub operation: Value, 151} 152 153pub async fn sign_plc_operation( 154 State(state): State<AppState>, 155 headers: axum::http::HeaderMap, 156 Json(input): Json<SignPlcOperationInput>, 157) -> Response { 158 let bearer = match crate::auth::extract_bearer_token_from_header( 159 headers.get("Authorization").and_then(|h| h.to_str().ok()), 160 ) { 161 Some(t) => t, 162 None => { 163 return ( 164 StatusCode::UNAUTHORIZED, 165 Json(json!({"error": "AuthenticationRequired"})), 166 ) 167 .into_response(); 168 } 169 }; 170 171 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 172 Ok(user) => user, 173 Err(e) => { 174 return ( 175 StatusCode::UNAUTHORIZED, 176 Json(json!({"error": "AuthenticationFailed", "message": e})), 177 ) 178 .into_response(); 179 } 180 }; 181 182 let did = &auth_user.did; 183 184 let token = match &input.token { 185 Some(t) => t, 186 None => { 187 return ( 188 StatusCode::BAD_REQUEST, 189 Json(json!({ 190 "error": "InvalidRequest", 191 "message": "Email confirmation token required to sign PLC operations" 192 })), 193 ) 194 .into_response(); 195 } 196 }; 197 198 let user = match sqlx::query!("SELECT id FROM users WHERE did = $1", did) 199 .fetch_optional(&state.db) 200 .await 201 { 202 Ok(Some(row)) => row, 203 _ => { 204 return ( 205 StatusCode::NOT_FOUND, 206 Json(json!({"error": "AccountNotFound"})), 207 ) 208 .into_response(); 209 } 210 }; 211 212 let token_row = match sqlx::query!( 213 "SELECT id, expires_at FROM plc_operation_tokens WHERE user_id = $1 AND token = $2", 214 user.id, 215 token 216 ) 217 .fetch_optional(&state.db) 218 .await 219 { 220 Ok(Some(row)) => row, 221 Ok(None) => { 222 return ( 223 StatusCode::BAD_REQUEST, 224 Json(json!({ 225 "error": "InvalidToken", 226 "message": "Invalid or expired token" 227 })), 228 ) 229 .into_response(); 230 } 231 Err(e) => { 232 error!("DB error: {:?}", e); 233 return ( 234 StatusCode::INTERNAL_SERVER_ERROR, 235 Json(json!({"error": "InternalError"})), 236 ) 237 .into_response(); 238 } 239 }; 240 241 if Utc::now() > token_row.expires_at { 242 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 243 .execute(&state.db) 244 .await; 245 return ( 246 StatusCode::BAD_REQUEST, 247 Json(json!({ 248 "error": "ExpiredToken", 249 "message": "Token has expired" 250 })), 251 ) 252 .into_response(); 253 } 254 255 let key_row = match sqlx::query!( 256 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 257 user.id 258 ) 259 .fetch_optional(&state.db) 260 .await 261 { 262 Ok(Some(row)) => row, 263 _ => { 264 return ( 265 StatusCode::INTERNAL_SERVER_ERROR, 266 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 267 ) 268 .into_response(); 269 } 270 }; 271 272 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 273 { 274 Ok(k) => k, 275 Err(e) => { 276 error!("Failed to decrypt user key: {}", e); 277 return ( 278 StatusCode::INTERNAL_SERVER_ERROR, 279 Json(json!({"error": "InternalError"})), 280 ) 281 .into_response(); 282 } 283 }; 284 285 let signing_key = match SigningKey::from_slice(&key_bytes) { 286 Ok(k) => k, 287 Err(e) => { 288 error!("Failed to create signing key: {:?}", e); 289 return ( 290 StatusCode::INTERNAL_SERVER_ERROR, 291 Json(json!({"error": "InternalError"})), 292 ) 293 .into_response(); 294 } 295 }; 296 297 let plc_client = PlcClient::new(None); 298 let last_op = match plc_client.get_last_op(did).await { 299 Ok(op) => op, 300 Err(PlcError::NotFound) => { 301 return ( 302 StatusCode::NOT_FOUND, 303 Json(json!({ 304 "error": "NotFound", 305 "message": "DID not found in PLC directory" 306 })), 307 ) 308 .into_response(); 309 } 310 Err(e) => { 311 error!("Failed to fetch PLC operation: {:?}", e); 312 return ( 313 StatusCode::BAD_GATEWAY, 314 Json(json!({ 315 "error": "UpstreamError", 316 "message": "Failed to communicate with PLC directory" 317 })), 318 ) 319 .into_response(); 320 } 321 }; 322 323 if last_op.is_tombstone() { 324 return ( 325 StatusCode::BAD_REQUEST, 326 Json(json!({ 327 "error": "InvalidRequest", 328 "message": "DID is tombstoned" 329 })), 330 ) 331 .into_response(); 332 } 333 334 let services = input.services.map(|s| { 335 s.into_iter() 336 .map(|(k, v)| { 337 ( 338 k, 339 PlcService { 340 service_type: v.service_type, 341 endpoint: v.endpoint, 342 }, 343 ) 344 }) 345 .collect() 346 }); 347 348 let unsigned_op = match create_update_op( 349 &last_op, 350 input.rotation_keys, 351 input.verification_methods, 352 input.also_known_as, 353 services, 354 ) { 355 Ok(op) => op, 356 Err(PlcError::Tombstoned) => { 357 return ( 358 StatusCode::BAD_REQUEST, 359 Json(json!({ 360 "error": "InvalidRequest", 361 "message": "Cannot update tombstoned DID" 362 })), 363 ) 364 .into_response(); 365 } 366 Err(e) => { 367 error!("Failed to create PLC operation: {:?}", e); 368 return ( 369 StatusCode::INTERNAL_SERVER_ERROR, 370 Json(json!({"error": "InternalError"})), 371 ) 372 .into_response(); 373 } 374 }; 375 376 let signed_op = match sign_operation(&unsigned_op, &signing_key) { 377 Ok(op) => op, 378 Err(e) => { 379 error!("Failed to sign PLC operation: {:?}", e); 380 return ( 381 StatusCode::INTERNAL_SERVER_ERROR, 382 Json(json!({"error": "InternalError"})), 383 ) 384 .into_response(); 385 } 386 }; 387 388 let _ = sqlx::query!("DELETE FROM plc_operation_tokens WHERE id = $1", token_row.id) 389 .execute(&state.db) 390 .await; 391 392 info!("Signed PLC operation for user {}", did); 393 394 ( 395 StatusCode::OK, 396 Json(SignPlcOperationOutput { 397 operation: signed_op, 398 }), 399 ) 400 .into_response() 401} 402 403#[derive(Debug, Deserialize)] 404pub struct SubmitPlcOperationInput { 405 pub operation: Value, 406} 407 408pub async fn submit_plc_operation( 409 State(state): State<AppState>, 410 headers: axum::http::HeaderMap, 411 Json(input): Json<SubmitPlcOperationInput>, 412) -> Response { 413 let bearer = match crate::auth::extract_bearer_token_from_header( 414 headers.get("Authorization").and_then(|h| h.to_str().ok()), 415 ) { 416 Some(t) => t, 417 None => { 418 return ( 419 StatusCode::UNAUTHORIZED, 420 Json(json!({"error": "AuthenticationRequired"})), 421 ) 422 .into_response(); 423 } 424 }; 425 426 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await { 427 Ok(user) => user, 428 Err(e) => { 429 return ( 430 StatusCode::UNAUTHORIZED, 431 Json(json!({"error": "AuthenticationFailed", "message": e})), 432 ) 433 .into_response(); 434 } 435 }; 436 437 let did = &auth_user.did; 438 439 if let Err(e) = validate_plc_operation(&input.operation) { 440 return ( 441 StatusCode::BAD_REQUEST, 442 Json(json!({ 443 "error": "InvalidRequest", 444 "message": format!("Invalid operation: {}", e) 445 })), 446 ) 447 .into_response(); 448 } 449 450 let op = &input.operation; 451 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 452 let public_url = format!("https://{}", hostname); 453 454 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did) 455 .fetch_optional(&state.db) 456 .await 457 { 458 Ok(Some(row)) => row, 459 _ => { 460 return ( 461 StatusCode::NOT_FOUND, 462 Json(json!({"error": "AccountNotFound"})), 463 ) 464 .into_response(); 465 } 466 }; 467 468 let key_row = match sqlx::query!( 469 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 470 user.id 471 ) 472 .fetch_optional(&state.db) 473 .await 474 { 475 Ok(Some(row)) => row, 476 _ => { 477 return ( 478 StatusCode::INTERNAL_SERVER_ERROR, 479 Json(json!({"error": "InternalError", "message": "User signing key not found"})), 480 ) 481 .into_response(); 482 } 483 }; 484 485 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 486 { 487 Ok(k) => k, 488 Err(e) => { 489 error!("Failed to decrypt user key: {}", e); 490 return ( 491 StatusCode::INTERNAL_SERVER_ERROR, 492 Json(json!({"error": "InternalError"})), 493 ) 494 .into_response(); 495 } 496 }; 497 498 let signing_key = match SigningKey::from_slice(&key_bytes) { 499 Ok(k) => k, 500 Err(e) => { 501 error!("Failed to create signing key: {:?}", e); 502 return ( 503 StatusCode::INTERNAL_SERVER_ERROR, 504 Json(json!({"error": "InternalError"})), 505 ) 506 .into_response(); 507 } 508 }; 509 510 let user_did_key = signing_key_to_did_key(&signing_key); 511 512 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) { 513 let server_rotation_key = 514 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone()); 515 516 let has_server_key = rotation_keys 517 .iter() 518 .any(|k| k.as_str() == Some(&server_rotation_key)); 519 520 if !has_server_key { 521 return ( 522 StatusCode::BAD_REQUEST, 523 Json(json!({ 524 "error": "InvalidRequest", 525 "message": "Rotation keys do not include server's rotation key" 526 })), 527 ) 528 .into_response(); 529 } 530 } 531 532 if let Some(services) = op.get("services").and_then(|v| v.as_object()) { 533 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) { 534 let service_type = pds.get("type").and_then(|v| v.as_str()); 535 let endpoint = pds.get("endpoint").and_then(|v| v.as_str()); 536 537 if service_type != Some("AtprotoPersonalDataServer") { 538 return ( 539 StatusCode::BAD_REQUEST, 540 Json(json!({ 541 "error": "InvalidRequest", 542 "message": "Incorrect type on atproto_pds service" 543 })), 544 ) 545 .into_response(); 546 } 547 548 if endpoint != Some(&public_url) { 549 return ( 550 StatusCode::BAD_REQUEST, 551 Json(json!({ 552 "error": "InvalidRequest", 553 "message": "Incorrect endpoint on atproto_pds service" 554 })), 555 ) 556 .into_response(); 557 } 558 } 559 } 560 561 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) { 562 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) { 563 if atproto_key != user_did_key { 564 return ( 565 StatusCode::BAD_REQUEST, 566 Json(json!({ 567 "error": "InvalidRequest", 568 "message": "Incorrect signing key in verificationMethods" 569 })), 570 ) 571 .into_response(); 572 } 573 } 574 } 575 576 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) { 577 let expected_handle = format!("at://{}", user.handle); 578 let first_aka = also_known_as.first().and_then(|v| v.as_str()); 579 580 if first_aka != Some(&expected_handle) { 581 return ( 582 StatusCode::BAD_REQUEST, 583 Json(json!({ 584 "error": "InvalidRequest", 585 "message": "Incorrect handle in alsoKnownAs" 586 })), 587 ) 588 .into_response(); 589 } 590 } 591 592 let plc_client = PlcClient::new(None); 593 if let Err(e) = plc_client.send_operation(did, &input.operation).await { 594 error!("Failed to submit PLC operation: {:?}", e); 595 return ( 596 StatusCode::BAD_GATEWAY, 597 Json(json!({ 598 "error": "UpstreamError", 599 "message": format!("Failed to submit to PLC directory: {}", e) 600 })), 601 ) 602 .into_response(); 603 } 604 605 if let Err(e) = sqlx::query!( 606 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity')", 607 did 608 ) 609 .execute(&state.db) 610 .await 611 { 612 warn!("Failed to sequence identity event: {:?}", e); 613 } 614 615 info!("Submitted PLC operation for user {}", did); 616 617 (StatusCode::OK, Json(json!({}))).into_response() 618}