this repo has no description
1use crate::cache::Cache;
2use base32::Alphabet;
3use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
4use k256::ecdsa::{Signature, SigningKey, signature::Signer};
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7use serde_json::{Value, json};
8use sha2::{Digest, Sha256};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12use thiserror::Error;
13
14#[derive(Error, Debug)]
15pub enum PlcError {
16 #[error("HTTP request failed: {0}")]
17 Http(#[from] reqwest::Error),
18 #[error("Invalid response: {0}")]
19 InvalidResponse(String),
20 #[error("DID not found")]
21 NotFound,
22 #[error("DID is tombstoned")]
23 Tombstoned,
24 #[error("Serialization error: {0}")]
25 Serialization(String),
26 #[error("Signing error: {0}")]
27 Signing(String),
28 #[error("Request timeout")]
29 Timeout,
30 #[error("Service unavailable (circuit breaker open)")]
31 CircuitBreakerOpen,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct PlcOperation {
36 #[serde(rename = "type")]
37 pub op_type: String,
38 #[serde(rename = "rotationKeys")]
39 pub rotation_keys: Vec<String>,
40 #[serde(rename = "verificationMethods")]
41 pub verification_methods: HashMap<String, String>,
42 #[serde(rename = "alsoKnownAs")]
43 pub also_known_as: Vec<String>,
44 pub services: HashMap<String, PlcService>,
45 pub prev: Option<String>,
46 #[serde(skip_serializing_if = "Option::is_none")]
47 pub sig: Option<String>,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct PlcService {
52 #[serde(rename = "type")]
53 pub service_type: String,
54 pub endpoint: String,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct PlcTombstone {
59 #[serde(rename = "type")]
60 pub op_type: String,
61 pub prev: String,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub sig: Option<String>,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(untagged)]
68pub enum PlcOpOrTombstone {
69 Operation(PlcOperation),
70 Tombstone(PlcTombstone),
71}
72
73impl PlcOpOrTombstone {
74 pub fn is_tombstone(&self) -> bool {
75 match self {
76 PlcOpOrTombstone::Tombstone(_) => true,
77 PlcOpOrTombstone::Operation(op) => op.op_type == "plc_tombstone",
78 }
79 }
80}
81
/// Lifetime, in seconds, of cached PLC directory responses (five minutes).
const PLC_CACHE_TTL_SECS: u64 = 5 * 60;
83
84pub struct PlcClient {
85 base_url: String,
86 client: Client,
87 cache: Option<Arc<dyn Cache>>,
88}
89
90impl PlcClient {
91 pub fn new(base_url: Option<String>) -> Self {
92 Self::with_cache(base_url, None)
93 }
94
95 pub fn with_cache(base_url: Option<String>, cache: Option<Arc<dyn Cache>>) -> Self {
96 let base_url = base_url.unwrap_or_else(|| {
97 std::env::var("PLC_DIRECTORY_URL")
98 .unwrap_or_else(|_| "https://plc.directory".to_string())
99 });
100 let timeout_secs: u64 = std::env::var("PLC_TIMEOUT_SECS")
101 .ok()
102 .and_then(|v| v.parse().ok())
103 .unwrap_or(10);
104 let connect_timeout_secs: u64 = std::env::var("PLC_CONNECT_TIMEOUT_SECS")
105 .ok()
106 .and_then(|v| v.parse().ok())
107 .unwrap_or(5);
108 let client = Client::builder()
109 .timeout(Duration::from_secs(timeout_secs))
110 .connect_timeout(Duration::from_secs(connect_timeout_secs))
111 .pool_max_idle_per_host(5)
112 .pool_idle_timeout(Duration::from_secs(90))
113 .build()
114 .unwrap_or_else(|_| Client::new());
115 Self {
116 base_url,
117 client,
118 cache,
119 }
120 }
121
122 fn encode_did(did: &str) -> String {
123 urlencoding::encode(did).to_string()
124 }
125
126 pub async fn get_document(&self, did: &str) -> Result<Value, PlcError> {
127 let cache_key = format!("plc:doc:{}", did);
128 if let Some(ref cache) = self.cache
129 && let Some(cached) = cache.get(&cache_key).await
130 && let Ok(value) = serde_json::from_str(&cached)
131 {
132 return Ok(value);
133 }
134 let url = format!("{}/{}", self.base_url, Self::encode_did(did));
135 let response = self.client.get(&url).send().await?;
136 if response.status() == reqwest::StatusCode::NOT_FOUND {
137 return Err(PlcError::NotFound);
138 }
139 if !response.status().is_success() {
140 let status = response.status();
141 let body = response.text().await.unwrap_or_default();
142 return Err(PlcError::InvalidResponse(format!(
143 "HTTP {}: {}",
144 status, body
145 )));
146 }
147 let value: Value = response
148 .json()
149 .await
150 .map_err(|e| PlcError::InvalidResponse(e.to_string()))?;
151 if let Some(ref cache) = self.cache
152 && let Ok(json_str) = serde_json::to_string(&value)
153 {
154 let _ = cache
155 .set(
156 &cache_key,
157 &json_str,
158 Duration::from_secs(PLC_CACHE_TTL_SECS),
159 )
160 .await;
161 }
162 Ok(value)
163 }
164
165 pub async fn get_document_data(&self, did: &str) -> Result<Value, PlcError> {
166 let cache_key = format!("plc:data:{}", did);
167 if let Some(ref cache) = self.cache
168 && let Some(cached) = cache.get(&cache_key).await
169 && let Ok(value) = serde_json::from_str(&cached)
170 {
171 return Ok(value);
172 }
173 let url = format!("{}/{}/data", self.base_url, Self::encode_did(did));
174 let response = self.client.get(&url).send().await?;
175 if response.status() == reqwest::StatusCode::NOT_FOUND {
176 return Err(PlcError::NotFound);
177 }
178 if !response.status().is_success() {
179 let status = response.status();
180 let body = response.text().await.unwrap_or_default();
181 return Err(PlcError::InvalidResponse(format!(
182 "HTTP {}: {}",
183 status, body
184 )));
185 }
186 let value: Value = response
187 .json()
188 .await
189 .map_err(|e| PlcError::InvalidResponse(e.to_string()))?;
190 if let Some(ref cache) = self.cache
191 && let Ok(json_str) = serde_json::to_string(&value)
192 {
193 let _ = cache
194 .set(
195 &cache_key,
196 &json_str,
197 Duration::from_secs(PLC_CACHE_TTL_SECS),
198 )
199 .await;
200 }
201 Ok(value)
202 }
203
204 pub async fn get_last_op(&self, did: &str) -> Result<PlcOpOrTombstone, PlcError> {
205 let url = format!("{}/{}/log/last", self.base_url, Self::encode_did(did));
206 let response = self.client.get(&url).send().await?;
207 if response.status() == reqwest::StatusCode::NOT_FOUND {
208 return Err(PlcError::NotFound);
209 }
210 if !response.status().is_success() {
211 let status = response.status();
212 let body = response.text().await.unwrap_or_default();
213 return Err(PlcError::InvalidResponse(format!(
214 "HTTP {}: {}",
215 status, body
216 )));
217 }
218 response
219 .json()
220 .await
221 .map_err(|e| PlcError::InvalidResponse(e.to_string()))
222 }
223
224 pub async fn get_audit_log(&self, did: &str) -> Result<Vec<Value>, PlcError> {
225 let url = format!("{}/{}/log/audit", self.base_url, Self::encode_did(did));
226 let response = self.client.get(&url).send().await?;
227 if response.status() == reqwest::StatusCode::NOT_FOUND {
228 return Err(PlcError::NotFound);
229 }
230 if !response.status().is_success() {
231 let status = response.status();
232 let body = response.text().await.unwrap_or_default();
233 return Err(PlcError::InvalidResponse(format!(
234 "HTTP {}: {}",
235 status, body
236 )));
237 }
238 response
239 .json()
240 .await
241 .map_err(|e| PlcError::InvalidResponse(e.to_string()))
242 }
243
244 pub async fn send_operation(&self, did: &str, operation: &Value) -> Result<(), PlcError> {
245 let url = format!("{}/{}", self.base_url, Self::encode_did(did));
246 let response = self.client.post(&url).json(operation).send().await?;
247 if !response.status().is_success() {
248 let status = response.status();
249 let body = response.text().await.unwrap_or_default();
250 return Err(PlcError::InvalidResponse(format!(
251 "HTTP {}: {}",
252 status, body
253 )));
254 }
255 Ok(())
256 }
257}
258
259pub fn cid_for_cbor(value: &Value) -> Result<String, PlcError> {
260 let cbor_bytes =
261 serde_ipld_dagcbor::to_vec(value).map_err(|e| PlcError::Serialization(e.to_string()))?;
262 let mut hasher = Sha256::new();
263 hasher.update(&cbor_bytes);
264 let hash = hasher.finalize();
265 let multihash = multihash::Multihash::wrap(0x12, &hash)
266 .map_err(|e| PlcError::Serialization(e.to_string()))?;
267 let cid = cid::Cid::new_v1(0x71, multihash);
268 Ok(cid.to_string())
269}
270
271pub fn sign_operation(operation: &Value, signing_key: &SigningKey) -> Result<Value, PlcError> {
272 let mut op = operation.clone();
273 if let Some(obj) = op.as_object_mut() {
274 obj.remove("sig");
275 }
276 let cbor_bytes =
277 serde_ipld_dagcbor::to_vec(&op).map_err(|e| PlcError::Serialization(e.to_string()))?;
278 let signature: Signature = signing_key.sign(&cbor_bytes);
279 let sig_bytes = signature.to_bytes();
280 let sig_b64 = URL_SAFE_NO_PAD.encode(sig_bytes);
281 if let Some(obj) = op.as_object_mut() {
282 obj.insert("sig".to_string(), json!(sig_b64));
283 }
284 Ok(op)
285}
286
287pub fn create_update_op(
288 last_op: &PlcOpOrTombstone,
289 rotation_keys: Option<Vec<String>>,
290 verification_methods: Option<HashMap<String, String>>,
291 also_known_as: Option<Vec<String>>,
292 services: Option<HashMap<String, PlcService>>,
293) -> Result<Value, PlcError> {
294 let prev_value = match last_op {
295 PlcOpOrTombstone::Operation(op) => {
296 serde_json::to_value(op).map_err(|e| PlcError::Serialization(e.to_string()))?
297 }
298 PlcOpOrTombstone::Tombstone(t) => {
299 serde_json::to_value(t).map_err(|e| PlcError::Serialization(e.to_string()))?
300 }
301 };
302 let prev_cid = cid_for_cbor(&prev_value)?;
303 let (base_rotation_keys, base_verification_methods, base_also_known_as, base_services) =
304 match last_op {
305 PlcOpOrTombstone::Operation(op) => (
306 op.rotation_keys.clone(),
307 op.verification_methods.clone(),
308 op.also_known_as.clone(),
309 op.services.clone(),
310 ),
311 PlcOpOrTombstone::Tombstone(_) => {
312 return Err(PlcError::Tombstoned);
313 }
314 };
315 let new_op = PlcOperation {
316 op_type: "plc_operation".to_string(),
317 rotation_keys: rotation_keys.unwrap_or(base_rotation_keys),
318 verification_methods: verification_methods.unwrap_or(base_verification_methods),
319 also_known_as: also_known_as.unwrap_or(base_also_known_as),
320 services: services.unwrap_or(base_services),
321 prev: Some(prev_cid),
322 sig: None,
323 };
324 serde_json::to_value(new_op).map_err(|e| PlcError::Serialization(e.to_string()))
325}
326
327pub fn signing_key_to_did_key(signing_key: &SigningKey) -> String {
328 let verifying_key = signing_key.verifying_key();
329 let point = verifying_key.to_encoded_point(true);
330 let compressed_bytes = point.as_bytes();
331 let mut prefixed = vec![0xe7, 0x01];
332 prefixed.extend_from_slice(compressed_bytes);
333 let encoded = multibase::encode(multibase::Base::Base58Btc, &prefixed);
334 format!("did:key:{}", encoded)
335}
336
337pub struct GenesisResult {
338 pub did: String,
339 pub signed_operation: Value,
340}
341
342pub fn create_genesis_operation(
343 signing_key: &SigningKey,
344 rotation_key: &str,
345 handle: &str,
346 pds_endpoint: &str,
347) -> Result<GenesisResult, PlcError> {
348 let signing_did_key = signing_key_to_did_key(signing_key);
349 let mut verification_methods = HashMap::new();
350 verification_methods.insert("atproto".to_string(), signing_did_key.clone());
351 let mut services = HashMap::new();
352 services.insert(
353 "atproto_pds".to_string(),
354 PlcService {
355 service_type: "AtprotoPersonalDataServer".to_string(),
356 endpoint: pds_endpoint.to_string(),
357 },
358 );
359 let genesis_op = PlcOperation {
360 op_type: "plc_operation".to_string(),
361 rotation_keys: vec![rotation_key.to_string()],
362 verification_methods,
363 also_known_as: vec![format!("at://{}", handle)],
364 services,
365 prev: None,
366 sig: None,
367 };
368 let genesis_value =
369 serde_json::to_value(&genesis_op).map_err(|e| PlcError::Serialization(e.to_string()))?;
370 let signed_op = sign_operation(&genesis_value, signing_key)?;
371 let did = did_for_genesis_op(&signed_op)?;
372 Ok(GenesisResult {
373 did,
374 signed_operation: signed_op,
375 })
376}
377
378pub fn did_for_genesis_op(signed_op: &Value) -> Result<String, PlcError> {
379 let cbor_bytes = serde_ipld_dagcbor::to_vec(signed_op)
380 .map_err(|e| PlcError::Serialization(e.to_string()))?;
381 let mut hasher = Sha256::new();
382 hasher.update(&cbor_bytes);
383 let hash = hasher.finalize();
384 let encoded = base32::encode(Alphabet::Rfc4648Lower { padding: false }, &hash);
385 let truncated = &encoded[..24];
386 Ok(format!("did:plc:{}", truncated))
387}
388
389pub fn validate_plc_operation(op: &Value) -> Result<(), PlcError> {
390 let obj = op
391 .as_object()
392 .ok_or_else(|| PlcError::InvalidResponse("Operation must be an object".to_string()))?;
393 let op_type = obj
394 .get("type")
395 .and_then(|v| v.as_str())
396 .ok_or_else(|| PlcError::InvalidResponse("Missing type field".to_string()))?;
397 if op_type != "plc_operation" && op_type != "plc_tombstone" {
398 return Err(PlcError::InvalidResponse(format!(
399 "Invalid type: {}",
400 op_type
401 )));
402 }
403 if op_type == "plc_operation" {
404 if obj.get("rotationKeys").is_none() {
405 return Err(PlcError::InvalidResponse(
406 "Missing rotationKeys".to_string(),
407 ));
408 }
409 if obj.get("verificationMethods").is_none() {
410 return Err(PlcError::InvalidResponse(
411 "Missing verificationMethods".to_string(),
412 ));
413 }
414 if obj.get("alsoKnownAs").is_none() {
415 return Err(PlcError::InvalidResponse("Missing alsoKnownAs".to_string()));
416 }
417 if obj.get("services").is_none() {
418 return Err(PlcError::InvalidResponse("Missing services".to_string()));
419 }
420 }
421 if obj.get("sig").is_none() {
422 return Err(PlcError::InvalidResponse("Missing sig".to_string()));
423 }
424 Ok(())
425}
426
/// Expected values that `validate_plc_operation_for_submission` compares an
/// operation against.
pub struct PlcValidationContext {
    /// Must appear among the operation's `rotationKeys`.
    pub server_rotation_key: String,
    /// Compared with `verificationMethods.atproto` when that entry is present.
    pub expected_signing_key: String,
    /// Handle without scheme; compared as `at://{handle}` against the entries
    /// of `alsoKnownAs`.
    pub expected_handle: String,
    /// Compared with `services.atproto_pds.endpoint` when that service is
    /// present.
    pub expected_pds_endpoint: String,
}
433
434pub fn validate_plc_operation_for_submission(
435 op: &Value,
436 ctx: &PlcValidationContext,
437) -> Result<(), PlcError> {
438 validate_plc_operation(op)?;
439 let obj = op
440 .as_object()
441 .ok_or_else(|| PlcError::InvalidResponse("Operation must be an object".to_string()))?;
442 let op_type = obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
443 if op_type != "plc_operation" {
444 return Ok(());
445 }
446 let rotation_keys = obj
447 .get("rotationKeys")
448 .and_then(|v| v.as_array())
449 .ok_or_else(|| PlcError::InvalidResponse("rotationKeys must be an array".to_string()))?;
450 let rotation_key_strings: Vec<&str> = rotation_keys.iter().filter_map(|v| v.as_str()).collect();
451 if !rotation_key_strings.contains(&ctx.server_rotation_key.as_str()) {
452 return Err(PlcError::InvalidResponse(
453 "Rotation keys do not include server's rotation key".to_string(),
454 ));
455 }
456 let verification_methods = obj
457 .get("verificationMethods")
458 .and_then(|v| v.as_object())
459 .ok_or_else(|| {
460 PlcError::InvalidResponse("verificationMethods must be an object".to_string())
461 })?;
462 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
463 && atproto_key != ctx.expected_signing_key
464 {
465 return Err(PlcError::InvalidResponse(
466 "Incorrect signing key".to_string(),
467 ));
468 }
469 let also_known_as = obj
470 .get("alsoKnownAs")
471 .and_then(|v| v.as_array())
472 .ok_or_else(|| PlcError::InvalidResponse("alsoKnownAs must be an array".to_string()))?;
473 let expected_handle_uri = format!("at://{}", ctx.expected_handle);
474 let has_correct_handle = also_known_as
475 .iter()
476 .filter_map(|v| v.as_str())
477 .any(|s| s == expected_handle_uri);
478 if !has_correct_handle && !also_known_as.is_empty() {
479 return Err(PlcError::InvalidResponse(
480 "Incorrect handle in alsoKnownAs".to_string(),
481 ));
482 }
483 let services = obj
484 .get("services")
485 .and_then(|v| v.as_object())
486 .ok_or_else(|| PlcError::InvalidResponse("services must be an object".to_string()))?;
487 if let Some(pds_service) = services.get("atproto_pds").and_then(|v| v.as_object()) {
488 let service_type = pds_service
489 .get("type")
490 .and_then(|v| v.as_str())
491 .unwrap_or("");
492 if service_type != "AtprotoPersonalDataServer" {
493 return Err(PlcError::InvalidResponse(
494 "Incorrect type on atproto_pds service".to_string(),
495 ));
496 }
497 let endpoint = pds_service
498 .get("endpoint")
499 .and_then(|v| v.as_str())
500 .unwrap_or("");
501 if endpoint != ctx.expected_pds_endpoint {
502 return Err(PlcError::InvalidResponse(
503 "Incorrect endpoint on atproto_pds service".to_string(),
504 ));
505 }
506 }
507 Ok(())
508}
509
510pub fn verify_operation_signature(op: &Value, rotation_keys: &[String]) -> Result<bool, PlcError> {
511 let obj = op
512 .as_object()
513 .ok_or_else(|| PlcError::InvalidResponse("Operation must be an object".to_string()))?;
514 let sig_b64 = obj
515 .get("sig")
516 .and_then(|v| v.as_str())
517 .ok_or_else(|| PlcError::InvalidResponse("Missing sig".to_string()))?;
518 let sig_bytes = URL_SAFE_NO_PAD
519 .decode(sig_b64)
520 .map_err(|e| PlcError::InvalidResponse(format!("Invalid signature encoding: {}", e)))?;
521 let signature = Signature::from_slice(&sig_bytes)
522 .map_err(|e| PlcError::InvalidResponse(format!("Invalid signature format: {}", e)))?;
523 let mut unsigned_op = op.clone();
524 if let Some(unsigned_obj) = unsigned_op.as_object_mut() {
525 unsigned_obj.remove("sig");
526 }
527 let cbor_bytes = serde_ipld_dagcbor::to_vec(&unsigned_op)
528 .map_err(|e| PlcError::Serialization(e.to_string()))?;
529 for key_did in rotation_keys {
530 if let Ok(true) = verify_signature_with_did_key(key_did, &cbor_bytes, &signature) {
531 return Ok(true);
532 }
533 }
534 Ok(false)
535}
536
537fn verify_signature_with_did_key(
538 did_key: &str,
539 message: &[u8],
540 signature: &Signature,
541) -> Result<bool, PlcError> {
542 use k256::ecdsa::{VerifyingKey, signature::Verifier};
543 if !did_key.starts_with("did:key:z") {
544 return Err(PlcError::InvalidResponse(
545 "Invalid did:key format".to_string(),
546 ));
547 }
548 let multibase_part = &did_key[8..];
549 let (_, decoded) = multibase::decode(multibase_part)
550 .map_err(|e| PlcError::InvalidResponse(format!("Failed to decode did:key: {}", e)))?;
551 if decoded.len() < 2 {
552 return Err(PlcError::InvalidResponse(
553 "Invalid did:key data".to_string(),
554 ));
555 }
556 let (codec, key_bytes) = if decoded[0] == 0xe7 && decoded[1] == 0x01 {
557 (0xe701u16, &decoded[2..])
558 } else {
559 return Err(PlcError::InvalidResponse(
560 "Unsupported key type in did:key".to_string(),
561 ));
562 };
563 if codec != 0xe701 {
564 return Err(PlcError::InvalidResponse(
565 "Only secp256k1 keys are supported".to_string(),
566 ));
567 }
568 let verifying_key = VerifyingKey::from_sec1_bytes(key_bytes)
569 .map_err(|e| PlcError::InvalidResponse(format!("Invalid public key: {}", e)))?;
570 Ok(verifying_key.verify(message, signature).is_ok())
571}
572
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly generated key encodes to a base58btc (`z`) `did:key`.
    #[test]
    fn test_signing_key_to_did_key() {
        let signing_key = SigningKey::random(&mut rand::thread_rng());
        assert!(signing_key_to_did_key(&signing_key).starts_with("did:key:z"));
    }

    /// The CID of a small JSON object carries the expected `bafyrei` prefix.
    #[test]
    fn test_cid_for_cbor() {
        let sample = json!({
            "test": "data",
            "number": 42
        });
        assert!(cid_for_cbor(&sample).unwrap().starts_with("bafyrei"));
    }

    /// Signing an operation adds a `sig` field.
    #[test]
    fn test_sign_operation() {
        let signing_key = SigningKey::random(&mut rand::thread_rng());
        let unsigned = json!({
            "type": "plc_operation",
            "rotationKeys": [],
            "verificationMethods": {},
            "alsoKnownAs": [],
            "services": {},
            "prev": null
        });
        let signed = sign_operation(&unsigned, &signing_key).unwrap();
        assert!(signed.get("sig").is_some());
    }
}
608}