Server tools to backfill, tail, mirror, and verify PLC logs

fjall: store more types as bsky-optimized variants if possible

ptr.pet 8f532f83 03108187

verified
+195 -28
+195 -28
src/plc_fjall.rs
··· 151 151 152 152 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 153 153 enum Aka { 154 + Bluesky(String), 154 155 Atproto(String), 155 156 Other(String), 156 157 } ··· 158 159 impl Aka { 159 160 fn from_str(s: &str) -> Self { 160 161 if let Some(stripped) = s.strip_prefix("at://") { 161 - Self::Atproto(stripped.to_string()) 162 + if let Some(handle) = stripped.strip_suffix(".bsky.social") { 163 + Self::Bluesky(handle.to_string()) 164 + } else { 165 + Self::Atproto(stripped.to_string()) 166 + } 162 167 } else { 163 168 Self::Other(s.to_string()) 164 169 } ··· 168 173 impl fmt::Display for Aka { 169 174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 170 175 match self { 176 + Self::Bluesky(h) => write!(f, "at://{h}.bsky.social"), 171 177 Self::Atproto(h) => write!(f, "at://{h}"), 172 178 Self::Other(s) => f.write_str(s), 173 179 } ··· 265 271 InvalidField(StoredOpField, #[source] anyhow::Error), 266 272 #[error("type mismatch for field {0}: expected {1}")] 267 273 TypeMismatch(StoredOpField, &'static str), 274 + } 275 + 276 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 277 + enum VerificationMethodKey { 278 + Atproto, 279 + Other(String), 280 + } 281 + 282 + impl VerificationMethodKey { 283 + fn from_str(s: &str) -> Self { 284 + match s { 285 + "atproto" => Self::Atproto, 286 + _ => Self::Other(s.to_string()), 287 + } 288 + } 289 + 290 + fn as_str(&self) -> &str { 291 + match self { 292 + Self::Atproto => "atproto", 293 + Self::Other(s) => s, 294 + } 295 + } 296 + } 297 + 298 + impl fmt::Display for VerificationMethodKey { 299 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 300 + f.write_str(self.as_str()) 301 + } 302 + } 303 + 304 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 305 + enum ServiceKey { 306 + AtprotoPds, 307 + Other(String), 308 + } 309 + 310 + impl ServiceKey { 311 + fn from_str(s: &str) -> Self { 312 + match s { 313 + "atproto_pds" => Self::AtprotoPds, 314 + _ => Self::Other(s.to_string()), 315 + } 316 + } 317 + 318 + fn as_str(&self) -> &str { 319 + match self { 320 + Self::AtprotoPds => "atproto_pds", 321 + Self::Other(s) => s, 322 + } 323 + } 324 + } 325 + 326 + impl fmt::Display for ServiceKey { 327 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 328 + f.write_str(self.as_str()) 329 + } 330 + } 331 + 332 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 333 + enum ServiceType { 334 + AtprotoPersonalDataServer, 335 + Other(String), 336 + } 337 + 338 + impl ServiceType { 339 + fn from_str(s: &str) -> Self { 340 + match s { 341 + "AtprotoPersonalDataServer" => Self::AtprotoPersonalDataServer, 342 + _ => Self::Other(s.to_string()), 343 + } 344 + } 345 + 346 + fn as_str(&self) -> &str { 347 + match self { 348 + Self::AtprotoPersonalDataServer => "AtprotoPersonalDataServer", 349 + Self::Other(s) => s, 350 + } 351 + } 352 + } 353 + 354 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 355 + enum ServiceEndpoint { 356 + BlueskyPds(String), 357 + Other(String), 358 + } 359 + 360 + impl ServiceEndpoint { 361 + fn from_str(s: &str) -> Self { 362 + if let Some(host) = s 363 + .strip_prefix("https://") 364 + .and_then(|h| h.strip_suffix(".host.bsky.network")) 365 + { 366 + Self::BlueskyPds(host.to_string()) 367 + } else { 368 + Self::Other(s.to_string()) 369 + } 370 + } 371 + 372 + fn as_string(&self) -> String { 373 + match self { 374 + Self::BlueskyPds(h) => format!("https://{h}.host.bsky.network"), 375 + Self::Other(s) => s.clone(), 376 + } 377 + } 268 378 } 269 379 270 380 #[derive(Debug, Clone, Serialize, Deserialize)] 271 381 struct StoredService { 272 - r#type: String, 273 - endpoint: String, 382 + r#type: ServiceType, 383 + endpoint: ServiceEndpoint, 274 384 } 275 385 276 386 #[derive(Debug, Clone, Serialize, Deserialize)] ··· 280 390 prev: Option<PlcCid>, 281 391 282 392 rotation_keys: Option<Vec<DidKey>>, 283 - verification_methods: Option<BTreeMap<String, DidKey>>, 393 + verification_methods: Option<BTreeMap<VerificationMethodKey, DidKey>>, 284 394 also_known_as: Option<Vec<Aka>>, 285 - services: Option<BTreeMap<String, StoredService>>, 395 + services: Option<BTreeMap<ServiceKey, StoredService>>, 286 396 287 397 // legacy create fields 288 398 signing_key: Option<DidKey>, ··· 415 525 match v { 416 526 serde_json::Value::String(s) => match DidKey::from_did_key(&s) { 417 527 Ok(key) => { 418 - methods.insert(k, key); 528 + methods.insert(VerificationMethodKey::from_str(&k), key); 419 529 } 420 530 Err(e) => { 421 531 errors.push(StoredOpError::InvalidField( ··· 474 584 map.into_iter() 475 585 .filter_map(|(k, v)| { 476 586 let svc = StoredService { 477 - r#type: v.get("type")?.as_str()?.to_string(), 478 - endpoint: v.get("endpoint")?.as_str()?.to_string(), 587 + r#type: ServiceType::from_str(v.get("type")?.as_str()?), 588 + endpoint: ServiceEndpoint::from_str(v.get("endpoint")?.as_str()?), 479 589 }; 480 - Some((k, svc)) 590 + Some((ServiceKey::from_str(&k), svc)) 481 591 }) 482 592 .collect(), 483 593 ), ··· 610 720 if let Some(methods) = &self.verification_methods { 611 721 let obj: serde_json::Map<String, serde_json::Value> = methods 612 722 .iter() 613 - .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))) 723 + .map(|(k, v)| { 724 + ( 725 + k.as_str().to_string(), 726 + serde_json::Value::String(v.to_string()), 727 + ) 728 + }) 614 729 .collect(); 615 730 map.insert((*StoredOpField::VerificationMethods).into(), obj.into()); 616 731 } ··· 630 745 .iter() 631 746 .map(|(k, svc)| { 632 747 ( 633 - k.clone(), 748 + k.as_str().to_string(), 634 749 serde_json::json!({ 635 - "type": svc.r#type, 636 - "endpoint": svc.endpoint, 750 + "type": svc.r#type.as_str(), 751 + "endpoint": svc.endpoint.as_string(), 637 752 }), 638 753 ) 639 754 }) ··· 663 778 } 664 779 } 665 780 666 - // we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue 667 - #[derive(Debug, Serialize)] 668 - pub struct Op { 669 - pub did: String, 670 - pub cid: String, 671 - pub created_at: Dt, 672 - pub nullified: bool, 673 - pub operation: serde_json::Value, 674 - } 675 - 676 781 // this is basically Op, but without the cid and created_at fields 677 782 // since we have them in the key already 678 783 #[derive(Debug, Deserialize, Serialize)] ··· 682 787 pub did: Vec<u8>, 683 788 pub nullified: bool, 684 789 pub operation: StoredOp, 790 + } 791 + 792 + // we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue 793 + #[derive(Debug, Serialize)] 794 + pub struct Op { 795 + pub did: String, 796 + pub cid: String, 797 + pub created_at: Dt, 798 + pub nullified: bool, 799 + pub operation: serde_json::Value, 685 800 } 686 801 687 802 #[derive(Clone)] ··· 1060 1175 } 1061 1176 1062 1177 #[test] 1063 - fn handle_roundtrip() { 1178 + fn bsky_handle_roundtrip() { 1064 1179 let h = Aka::from_str("at://alice.bsky.social"); 1065 - assert_eq!(h, Aka::Atproto("alice.bsky.social".to_string())); 1180 + assert_eq!(h, Aka::Bluesky("alice".to_string())); 1066 1181 assert_eq!(h.to_string(), "at://alice.bsky.social"); 1067 1182 } 1068 1183 1069 1184 #[test] 1070 - fn handle_without_prefix() { 1071 - // According to DID spec, alsoKnownAs should be URIs. 1072 - // If an alternative URI scheme is used, it will preserve it. 1185 + fn atproto_handle_roundtrip() { 1186 + let h = Aka::from_str("at://alice.example.com"); 1187 + assert_eq!(h, Aka::Atproto("alice.example.com".to_string())); 1188 + assert_eq!(h.to_string(), "at://alice.example.com"); 1189 + } 1190 + 1191 + #[test] 1192 + fn other_handle_roundtrip() { 1073 1193 let h = Aka::from_str("https://something.else"); 1074 1194 assert_eq!(h, Aka::Other("https://something.else".to_string())); 1075 1195 assert_eq!(h.to_string(), "https://something.else"); 1196 + } 1197 + 1198 + #[test] 1199 + fn verification_method_key_roundtrip() { 1200 + let k1 = VerificationMethodKey::from_str("atproto"); 1201 + assert_eq!(k1, VerificationMethodKey::Atproto); 1202 + assert_eq!(k1.to_string(), "atproto"); 1203 + 1204 + let k2 = VerificationMethodKey::from_str("other_key"); 1205 + assert_eq!(k2, VerificationMethodKey::Other("other_key".to_string())); 1206 + assert_eq!(k2.to_string(), "other_key"); 1207 + } 1208 + 1209 + #[test] 1210 + fn service_key_roundtrip() { 1211 + let k1 = ServiceKey::from_str("atproto_pds"); 1212 + assert_eq!(k1, ServiceKey::AtprotoPds); 1213 + assert_eq!(k1.to_string(), "atproto_pds"); 1214 + 1215 + let k2 = ServiceKey::from_str("other_svc"); 1216 + assert_eq!(k2, ServiceKey::Other("other_svc".to_string())); 1217 + assert_eq!(k2.to_string(), "other_svc"); 1218 + } 1219 + 1220 + #[test] 1221 + fn service_type_roundtrip() { 1222 + let t1 = ServiceType::from_str("AtprotoPersonalDataServer"); 1223 + assert_eq!(t1, ServiceType::AtprotoPersonalDataServer); 1224 + assert_eq!(t1.as_str(), "AtprotoPersonalDataServer"); 1225 + 1226 + let t2 = ServiceType::from_str("OtherType"); 1227 + assert_eq!(t2, ServiceType::Other("OtherType".to_string())); 1228 + assert_eq!(t2.as_str(), "OtherType"); 1229 + } 1230 + 1231 + #[test] 1232 + fn service_endpoint_roundtrip() { 1233 + let e1 = ServiceEndpoint::from_str("https://example.host.bsky.network"); 1234 + assert_eq!(e1, ServiceEndpoint::BlueskyPds("example".to_string())); 1235 + assert_eq!(e1.as_string(), "https://example.host.bsky.network"); 1236 + 1237 + let e2 = ServiceEndpoint::from_str("https://other.endpoint.com"); 1238 + assert_eq!( 1239 + e2, 1240 + ServiceEndpoint::Other("https://other.endpoint.com".to_string()) 1241 + ); 1242 + assert_eq!(e2.as_string(), "https://other.endpoint.com"); 1076 1243 } 1077 1244 1078 1245 #[test]