this repo has no description
1use crate::cache::Cache; 2use base32::Alphabet; 3use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; 4use k256::ecdsa::{Signature, SigningKey, signature::Signer}; 5use reqwest::Client; 6use serde::{Deserialize, Serialize}; 7use serde_json::{Value, json}; 8use sha2::{Digest, Sha256}; 9use std::collections::HashMap; 10use std::sync::Arc; 11use std::time::Duration; 12use thiserror::Error; 13 14#[derive(Error, Debug)] 15pub enum PlcError { 16 #[error("HTTP request failed: {0}")] 17 Http(#[from] reqwest::Error), 18 #[error("Invalid response: {0}")] 19 InvalidResponse(String), 20 #[error("DID not found")] 21 NotFound, 22 #[error("DID is tombstoned")] 23 Tombstoned, 24 #[error("Serialization error: {0}")] 25 Serialization(String), 26 #[error("Signing error: {0}")] 27 Signing(String), 28 #[error("Request timeout")] 29 Timeout, 30 #[error("Service unavailable (circuit breaker open)")] 31 CircuitBreakerOpen, 32} 33 34#[derive(Debug, Clone, Serialize, Deserialize)] 35pub struct PlcOperation { 36 #[serde(rename = "type")] 37 pub op_type: String, 38 #[serde(rename = "rotationKeys")] 39 pub rotation_keys: Vec<String>, 40 #[serde(rename = "verificationMethods")] 41 pub verification_methods: HashMap<String, String>, 42 #[serde(rename = "alsoKnownAs")] 43 pub also_known_as: Vec<String>, 44 pub services: HashMap<String, PlcService>, 45 pub prev: Option<String>, 46 #[serde(skip_serializing_if = "Option::is_none")] 47 pub sig: Option<String>, 48} 49 50#[derive(Debug, Clone, Serialize, Deserialize)] 51pub struct PlcService { 52 #[serde(rename = "type")] 53 pub service_type: String, 54 pub endpoint: String, 55} 56 57#[derive(Debug, Clone, Serialize, Deserialize)] 58pub struct PlcTombstone { 59 #[serde(rename = "type")] 60 pub op_type: String, 61 pub prev: String, 62 #[serde(skip_serializing_if = "Option::is_none")] 63 pub sig: Option<String>, 64} 65 66#[derive(Debug, Clone, Serialize, Deserialize)] 67#[serde(untagged)] 68pub enum PlcOpOrTombstone { 69 Operation(PlcOperation), 70 Tombstone(PlcTombstone), 71} 72 73impl PlcOpOrTombstone { 74 pub fn is_tombstone(&self) -> bool { 75 match self { 76 PlcOpOrTombstone::Tombstone(_) => true, 77 PlcOpOrTombstone::Operation(op) => op.op_type == "plc_tombstone", 78 } 79 } 80} 81 82const PLC_CACHE_TTL_SECS: u64 = 300; 83 84pub struct PlcClient { 85 base_url: String, 86 client: Client, 87 cache: Option<Arc<dyn Cache>>, 88} 89 90impl PlcClient { 91 pub fn new(base_url: Option<String>) -> Self { 92 Self::with_cache(base_url, None) 93 } 94 95 pub fn with_cache(base_url: Option<String>, cache: Option<Arc<dyn Cache>>) -> Self { 96 let base_url = base_url.unwrap_or_else(|| { 97 std::env::var("PLC_DIRECTORY_URL") 98 .unwrap_or_else(|_| "https://plc.directory".to_string()) 99 }); 100 let timeout_secs: u64 = std::env::var("PLC_TIMEOUT_SECS") 101 .ok() 102 .and_then(|v| v.parse().ok()) 103 .unwrap_or(10); 104 let connect_timeout_secs: u64 = std::env::var("PLC_CONNECT_TIMEOUT_SECS") 105 .ok() 106 .and_then(|v| v.parse().ok()) 107 .unwrap_or(5); 108 let client = Client::builder() 109 .timeout(Duration::from_secs(timeout_secs)) 110 .connect_timeout(Duration::from_secs(connect_timeout_secs)) 111 .pool_max_idle_per_host(5) 112 .pool_idle_timeout(Duration::from_secs(90)) 113 .build() 114 .unwrap_or_else(|_| Client::new()); 115 Self { 116 base_url, 117 client, 118 cache, 119 } 120 } 121 122 fn encode_did(did: &str) -> String { 123 urlencoding::encode(did).to_string() 124 } 125 126 pub async fn get_document(&self, did: &str) -> Result<Value, PlcError> { 127 let cache_key = format!("plc:doc:{}", did); 128 if let Some(ref cache) = self.cache 129 && let Some(cached) = cache.get(&cache_key).await 130 && let Ok(value) = serde_json::from_str(&cached) 131 { 132 return Ok(value); 133 } 134 let url = format!("{}/{}", self.base_url, Self::encode_did(did)); 135 let response = self.client.get(&url).send().await?; 136 if response.status() == reqwest::StatusCode::NOT_FOUND { 137 return Err(PlcError::NotFound); 138 } 139 if !response.status().is_success() { 140 let status = response.status(); 141 let body = response.text().await.unwrap_or_default(); 142 return Err(PlcError::InvalidResponse(format!( 143 "HTTP {}: {}", 144 status, body 145 ))); 146 } 147 let value: Value = response 148 .json() 149 .await 150 .map_err(|e| PlcError::InvalidResponse(e.to_string()))?; 151 if let Some(ref cache) = self.cache 152 && let Ok(json_str) = serde_json::to_string(&value) 153 { 154 let _ = cache 155 .set( 156 &cache_key, 157 &json_str, 158 Duration::from_secs(PLC_CACHE_TTL_SECS), 159 ) 160 .await; 161 } 162 Ok(value) 163 } 164 165 pub async fn get_document_data(&self, did: &str) -> Result<Value, PlcError> { 166 let cache_key = format!("plc:data:{}", did); 167 if let Some(ref cache) = self.cache 168 && let Some(cached) = cache.get(&cache_key).await 169 && let Ok(value) = serde_json::from_str(&cached) 170 { 171 return Ok(value); 172 } 173 let url = format!("{}/{}/data", self.base_url, Self::encode_did(did)); 174 let response = self.client.get(&url).send().await?; 175 if response.status() == reqwest::StatusCode::NOT_FOUND { 176 return Err(PlcError::NotFound); 177 } 178 if !response.status().is_success() { 179 let status = response.status(); 180 let body = response.text().await.unwrap_or_default(); 181 return Err(PlcError::InvalidResponse(format!( 182 "HTTP {}: {}", 183 status, body 184 ))); 185 } 186 let value: Value = response 187 .json() 188 .await 189 .map_err(|e| PlcError::InvalidResponse(e.to_string()))?; 190 if let Some(ref cache) = self.cache 191 && let Ok(json_str) = serde_json::to_string(&value) 192 { 193 let _ = cache 194 .set( 195 &cache_key, 196 &json_str, 197 Duration::from_secs(PLC_CACHE_TTL_SECS), 198 ) 199 .await; 200 } 201 Ok(value) 202 } 203 204 pub async fn get_last_op(&self, did: &str) -> Result<PlcOpOrTombstone, PlcError> { 205 let url = format!("{}/{}/log/last", self.base_url, Self::encode_did(did)); 206 let response = self.client.get(&url).send().await?; 207 if response.status() == reqwest::StatusCode::NOT_FOUND { 208 return Err(PlcError::NotFound); 209 } 210 if !response.status().is_success() { 211 let status = response.status(); 212 let body = response.text().await.unwrap_or_default(); 213 return Err(PlcError::InvalidResponse(format!( 214 "HTTP {}: {}", 215 status, body 216 ))); 217 } 218 response 219 .json() 220 .await 221 .map_err(|e| PlcError::InvalidResponse(e.to_string())) 222 } 223 224 pub async fn get_audit_log(&self, did: &str) -> Result<Vec<Value>, PlcError> { 225 let url = format!("{}/{}/log/audit", self.base_url, Self::encode_did(did)); 226 let response = self.client.get(&url).send().await?; 227 if response.status() == reqwest::StatusCode::NOT_FOUND { 228 return Err(PlcError::NotFound); 229 } 230 if !response.status().is_success() { 231 let status = response.status(); 232 let body = response.text().await.unwrap_or_default(); 233 return Err(PlcError::InvalidResponse(format!( 234 "HTTP {}: {}", 235 status, body 236 ))); 237 } 238 response 239 .json() 240 .await 241 .map_err(|e| PlcError::InvalidResponse(e.to_string())) 242 } 243 244 pub async fn send_operation(&self, did: &str, operation: &Value) -> Result<(), PlcError> { 245 let url = format!("{}/{}", self.base_url, Self::encode_did(did)); 246 let response = self.client.post(&url).json(operation).send().await?; 247 if !response.status().is_success() { 248 let status = response.status(); 249 let body = response.text().await.unwrap_or_default(); 250 return Err(PlcError::InvalidResponse(format!( 251 "HTTP {}: {}", 252 status, body 253 ))); 254 } 255 Ok(()) 256 } 257} 258 259pub fn cid_for_cbor(value: &Value) -> Result<String, PlcError> { 260 let cbor_bytes = 261 serde_ipld_dagcbor::to_vec(value).map_err(|e| PlcError::Serialization(e.to_string()))?; 262 let mut hasher = Sha256::new(); 263 hasher.update(&cbor_bytes); 264 let hash = hasher.finalize(); 265 let multihash = multihash::Multihash::wrap(0x12, &hash) 266 .map_err(|e| PlcError::Serialization(e.to_string()))?; 267 let cid = cid::Cid::new_v1(0x71, multihash); 268 Ok(cid.to_string()) 269} 270 271pub fn sign_operation(operation: &Value, signing_key: &SigningKey) -> Result<Value, PlcError> { 272 let mut op = operation.clone(); 273 if let Some(obj) = op.as_object_mut() { 274 obj.remove("sig"); 275 } 276 let cbor_bytes = 277 serde_ipld_dagcbor::to_vec(&op).map_err(|e| PlcError::Serialization(e.to_string()))?; 278 let signature: Signature = signing_key.sign(&cbor_bytes); 279 let sig_bytes = signature.to_bytes(); 280 let sig_b64 = URL_SAFE_NO_PAD.encode(sig_bytes); 281 if let Some(obj) = op.as_object_mut() { 282 obj.insert("sig".to_string(), json!(sig_b64)); 283 } 284 Ok(op) 285} 286 287pub fn create_update_op( 288 last_op: &PlcOpOrTombstone, 289 rotation_keys: Option<Vec<String>>, 290 verification_methods: Option<HashMap<String, String>>, 291 also_known_as: Option<Vec<String>>, 292 services: Option<HashMap<String, PlcService>>, 293) -> Result<Value, PlcError> { 294 let prev_value = match last_op { 295 PlcOpOrTombstone::Operation(op) => { 296 serde_json::to_value(op).map_err(|e| PlcError::Serialization(e.to_string()))? 297 } 298 PlcOpOrTombstone::Tombstone(t) => { 299 serde_json::to_value(t).map_err(|e| PlcError::Serialization(e.to_string()))? 300 } 301 }; 302 let prev_cid = cid_for_cbor(&prev_value)?; 303 let (base_rotation_keys, base_verification_methods, base_also_known_as, base_services) = 304 match last_op { 305 PlcOpOrTombstone::Operation(op) => ( 306 op.rotation_keys.clone(), 307 op.verification_methods.clone(), 308 op.also_known_as.clone(), 309 op.services.clone(), 310 ), 311 PlcOpOrTombstone::Tombstone(_) => { 312 return Err(PlcError::Tombstoned); 313 } 314 }; 315 let new_op = PlcOperation { 316 op_type: "plc_operation".to_string(), 317 rotation_keys: rotation_keys.unwrap_or(base_rotation_keys), 318 verification_methods: verification_methods.unwrap_or(base_verification_methods), 319 also_known_as: also_known_as.unwrap_or(base_also_known_as), 320 services: services.unwrap_or(base_services), 321 prev: Some(prev_cid), 322 sig: None, 323 }; 324 serde_json::to_value(new_op).map_err(|e| PlcError::Serialization(e.to_string())) 325} 326 327pub fn signing_key_to_did_key(signing_key: &SigningKey) -> String { 328 let verifying_key = signing_key.verifying_key(); 329 let point = verifying_key.to_encoded_point(true); 330 let compressed_bytes = point.as_bytes(); 331 let mut prefixed = vec![0xe7, 0x01]; 332 prefixed.extend_from_slice(compressed_bytes); 333 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed); 334 format!("did:key:{}", encoded) 335} 336 337pub struct GenesisResult { 338 pub did: String, 339 pub signed_operation: Value, 340} 341 342pub fn create_genesis_operation( 343 signing_key: &SigningKey, 344 rotation_key: &str, 345 handle: &str, 346 pds_endpoint: &str, 347) -> Result<GenesisResult, PlcError> { 348 let signing_did_key = signing_key_to_did_key(signing_key); 349 let mut verification_methods = HashMap::new(); 350 verification_methods.insert("atproto".to_string(), signing_did_key.clone()); 351 let mut services = HashMap::new(); 352 services.insert( 353 "atproto_pds".to_string(), 354 PlcService { 355 service_type: "AtprotoPersonalDataServer".to_string(), 356 endpoint: pds_endpoint.to_string(), 357 }, 358 ); 359 let genesis_op = PlcOperation { 360 op_type: "plc_operation".to_string(), 361 rotation_keys: vec![rotation_key.to_string()], 362 verification_methods, 363 also_known_as: vec![format!("at://{}", handle)], 364 services, 365 prev: None, 366 sig: None, 367 }; 368 let genesis_value = 369 serde_json::to_value(&genesis_op).map_err(|e| PlcError::Serialization(e.to_string()))?; 370 let signed_op = sign_operation(&genesis_value, signing_key)?; 371 let did = did_for_genesis_op(&signed_op)?; 372 Ok(GenesisResult { 373 did, 374 signed_operation: signed_op, 375 }) 376} 377 378pub fn did_for_genesis_op(signed_op: &Value) -> Result<String, PlcError> { 379 let cbor_bytes = serde_ipld_dagcbor::to_vec(signed_op) 380 .map_err(|e| PlcError::Serialization(e.to_string()))?; 381 let mut hasher = Sha256::new(); 382 hasher.update(&cbor_bytes); 383 let hash = hasher.finalize(); 384 let encoded = base32::encode(Alphabet::Rfc4648Lower { padding: false }, &hash); 385 let truncated = &encoded[..24]; 386 Ok(format!("did:plc:{}", truncated)) 387} 388 389pub fn validate_plc_operation(op: &Value) -> Result<(), PlcError> { 390 let obj = op 391 .as_object() 392 .ok_or_else(|| PlcError::InvalidResponse("Operation must be an object".to_string()))?; 393 let op_type = obj 394 .get("type") 395 .and_then(|v| v.as_str()) 396 .ok_or_else(|| PlcError::InvalidResponse("Missing type field".to_string()))?; 397 if op_type != "plc_operation" && op_type != "plc_tombstone" { 398 return Err(PlcError::InvalidResponse(format!( 399 "Invalid type: {}", 400 op_type 401 ))); 402 } 403 if op_type == "plc_operation" { 404 if obj.get("rotationKeys").is_none() { 405 return Err(PlcError::InvalidResponse( 406 "Missing rotationKeys".to_string(), 407 )); 408 } 409 if obj.get("verificationMethods").is_none() { 410 return Err(PlcError::InvalidResponse( 411 "Missing verificationMethods".to_string(), 412 )); 413 } 414 if obj.get("alsoKnownAs").is_none() { 415 return Err(PlcError::InvalidResponse("Missing alsoKnownAs".to_string())); 416 } 417 if obj.get("services").is_none() { 418 return Err(PlcError::InvalidResponse("Missing services".to_string())); 419 } 420 } 421 if obj.get("sig").is_none() { 422 return Err(PlcError::InvalidResponse("Missing sig".to_string())); 423 } 424 Ok(()) 425} 426 427pub struct PlcValidationContext { 428 pub server_rotation_key: String, 429 pub expected_signing_key: String, 430 pub expected_handle: String, 431 pub expected_pds_endpoint: String, 432} 433 434pub fn validate_plc_operation_for_submission( 435 op: &Value, 436 ctx: &PlcValidationContext, 437) -> Result<(), PlcError> { 438 validate_plc_operation(op)?; 439 let obj = op 440 .as_object() 441 .ok_or_else(|| PlcError::InvalidResponse("Operation must be an object".to_string()))?; 442 let op_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or(""); 443 if op_type != "plc_operation" { 444 return Ok(()); 445 } 446 let rotation_keys = obj 447 .get("rotationKeys") 448 .and_then(|v| v.as_array()) 449 .ok_or_else(|| PlcError::InvalidResponse("rotationKeys must be an array".to_string()))?; 450 let rotation_key_strings: Vec<&str> = rotation_keys.iter().filter_map(|v| v.as_str()).collect(); 451 if !rotation_key_strings.contains(&ctx.server_rotation_key.as_str()) { 452 return Err(PlcError::InvalidResponse( 453 "Rotation keys do not include server's rotation key".to_string(), 454 )); 455 } 456 let verification_methods = obj 457 .get("verificationMethods") 458 .and_then(|v| v.as_object()) 459 .ok_or_else(|| { 460 PlcError::InvalidResponse("verificationMethods must be an object".to_string()) 461 })?; 462 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) 463 && atproto_key != ctx.expected_signing_key 464 { 465 return Err(PlcError::InvalidResponse( 466 "Incorrect signing key".to_string(), 467 )); 468 } 469 let also_known_as = obj 470 .get("alsoKnownAs") 471 .and_then(|v| v.as_array()) 472 .ok_or_else(|| PlcError::InvalidResponse("alsoKnownAs must be an array".to_string()))?; 473 let expected_handle_uri = format!("at://{}", ctx.expected_handle); 474 let has_correct_handle = also_known_as 475 .iter() 476 .filter_map(|v| v.as_str()) 477 .any(|s| s == expected_handle_uri); 478 if !has_correct_handle && !also_known_as.is_empty() { 479 return Err(PlcError::InvalidResponse( 480 "Incorrect handle in alsoKnownAs".to_string(), 481 )); 482 } 483 let services = obj 484 .get("services") 485 .and_then(|v| v.as_object()) 486 .ok_or_else(|| PlcError::InvalidResponse("services must be an object".to_string()))?; 487 if let Some(pds_service) = services.get("atproto_pds").and_then(|v| v.as_object()) { 488 let service_type = pds_service 489 .get("type") 490 .and_then(|v| v.as_str()) 491 .unwrap_or(""); 492 if service_type != "AtprotoPersonalDataServer" { 493 return Err(PlcError::InvalidResponse( 494 "Incorrect type on atproto_pds service".to_string(), 495 )); 496 } 497 let endpoint = pds_service 498 .get("endpoint") 499 .and_then(|v| v.as_str()) 500 .unwrap_or(""); 501 if endpoint != ctx.expected_pds_endpoint { 502 return Err(PlcError::InvalidResponse( 503 "Incorrect endpoint on atproto_pds service".to_string(), 504 )); 505 } 506 } 507 Ok(()) 508} 509 510pub fn verify_operation_signature(op: &Value, rotation_keys: &[String]) -> Result<bool, PlcError> { 511 let obj = op 512 .as_object() 513 .ok_or_else(|| PlcError::InvalidResponse("Operation must be an object".to_string()))?; 514 let sig_b64 = obj 515 .get("sig") 516 .and_then(|v| v.as_str()) 517 .ok_or_else(|| PlcError::InvalidResponse("Missing sig".to_string()))?; 518 let sig_bytes = URL_SAFE_NO_PAD 519 .decode(sig_b64) 520 .map_err(|e| PlcError::InvalidResponse(format!("Invalid signature encoding: {}", e)))?; 521 let signature = Signature::from_slice(&sig_bytes) 522 .map_err(|e| PlcError::InvalidResponse(format!("Invalid signature format: {}", e)))?; 523 let mut unsigned_op = op.clone(); 524 if let Some(unsigned_obj) = unsigned_op.as_object_mut() { 525 unsigned_obj.remove("sig"); 526 } 527 let cbor_bytes = serde_ipld_dagcbor::to_vec(&unsigned_op) 528 .map_err(|e| PlcError::Serialization(e.to_string()))?; 529 for key_did in rotation_keys { 530 if let Ok(true) = verify_signature_with_did_key(key_did, &cbor_bytes, &signature) { 531 return Ok(true); 532 } 533 } 534 Ok(false) 535} 536 537fn verify_signature_with_did_key( 538 did_key: &str, 539 message: &[u8], 540 signature: &Signature, 541) -> Result<bool, PlcError> { 542 use k256::ecdsa::{VerifyingKey, signature::Verifier}; 543 if !did_key.starts_with("did:key:z") { 544 return Err(PlcError::InvalidResponse( 545 "Invalid did:key format".to_string(), 546 )); 547 } 548 let multibase_part = &did_key[8..]; 549 let (_, decoded) = multibase::decode(multibase_part) 550 .map_err(|e| PlcError::InvalidResponse(format!("Failed to decode did:key: {}", e)))?; 551 if decoded.len() < 2 { 552 return Err(PlcError::InvalidResponse( 553 "Invalid did:key data".to_string(), 554 )); 555 } 556 let (codec, key_bytes) = if decoded[0] == 0xe7 && decoded[1] == 0x01 { 557 (0xe701u16, &decoded[2..]) 558 } else { 559 return Err(PlcError::InvalidResponse( 560 "Unsupported key type in did:key".to_string(), 561 )); 562 }; 563 if codec != 0xe701 { 564 return Err(PlcError::InvalidResponse( 565 "Only secp256k1 keys are supported".to_string(), 566 )); 567 } 568 let verifying_key = VerifyingKey::from_sec1_bytes(key_bytes) 569 .map_err(|e| PlcError::InvalidResponse(format!("Invalid public key: {}", e)))?; 570 Ok(verifying_key.verify(message, signature).is_ok()) 571} 572 573#[cfg(test)] 574mod tests { 575 use super::*; 576 577 #[test] 578 fn test_signing_key_to_did_key() { 579 let key = SigningKey::random(&mut rand::thread_rng()); 580 let did_key = signing_key_to_did_key(&key); 581 assert!(did_key.starts_with("did:key:z")); 582 } 583 584 #[test] 585 fn test_cid_for_cbor() { 586 let value = json!({ 587 "test": "data", 588 "number": 42 589 }); 590 let cid = cid_for_cbor(&value).unwrap(); 591 assert!(cid.starts_with("bafyrei")); 592 } 593 594 #[test] 595 fn test_sign_operation() { 596 let key = SigningKey::random(&mut rand::thread_rng()); 597 let op = json!({ 598 "type": "plc_operation", 599 "rotationKeys": [], 600 "verificationMethods": {}, 601 "alsoKnownAs": [], 602 "services": {}, 603 "prev": null 604 }); 605 let signed = sign_operation(&op, &key).unwrap(); 606 assert!(signed.get("sig").is_some()); 607 } 608}