Server tools to backfill, tail, mirror, and verify PLC logs

fjall: parse operation to store types more efficiently

ptr.pet 6ac66700 b4923abf

verified
+437 -5
+1
Cargo.lock
··· 35 35 "governor", 36 36 "http-body-util", 37 37 "log", 38 + "multibase", 38 39 "native-tls", 39 40 "opentelemetry", 40 41 "opentelemetry-otlp",
+1
Cargo.toml
··· 48 48 rmp-serde = "1.3.1" 49 49 bincode = "1.3.3" 50 50 serde_bytes = "0.11.19" 51 + multibase = "0.9.2" 51 52
+435 -5
src/plc_fjall.rs
··· 1 1 use crate::{Dt, ExportPage, Op as CommonOp, PageBoundaryState}; 2 - use data_encoding::BASE32_NOPAD; 2 + use data_encoding::{BASE32_NOPAD, BASE64URL}; 3 3 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode}; 4 4 use serde::{Deserialize, Serialize}; 5 + use std::collections::BTreeMap; 6 + use std::fmt; 5 7 use std::path::Path; 6 8 use std::sync::Arc; 7 9 use std::time::Instant; ··· 78 80 .ok_or_else(|| anyhow::anyhow!("invalid timestamp {micros}")) 79 81 } 80 82 83 + /// base64url-encoded ECDSA signature → raw bytes 84 + #[derive(Debug, Clone, Serialize, Deserialize)] 85 + struct Signature(#[serde(with = "serde_bytes")] Vec<u8>); 86 + 87 + impl Signature { 88 + fn from_base64url(s: &str) -> anyhow::Result<Self> { 89 + BASE64URL 90 + .decode(s.as_bytes()) 91 + .map(Self) 92 + .map_err(|e| anyhow::anyhow!("invalid base64url sig: {e}")) 93 + } 94 + } 95 + 96 + impl fmt::Display for Signature { 97 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 98 + f.write_str(&BASE64URL.encode(&self.0)) 99 + } 100 + } 101 + 102 + /// did:key:z... 
→ raw multicodec public key bytes 103 + #[derive(Debug, Clone, Serialize, Deserialize)] 104 + struct DidKey(#[serde(with = "serde_bytes")] Vec<u8>); 105 + 106 + impl DidKey { 107 + fn from_did_key(s: &str) -> anyhow::Result<Self> { 108 + let multibase_str = s 109 + .strip_prefix("did:key:") 110 + .ok_or_else(|| anyhow::anyhow!("missing did:key: prefix in {s}"))?; 111 + let (_base, bytes) = multibase::decode(multibase_str) 112 + .map_err(|e| anyhow::anyhow!("invalid multibase in did:key {s}: {e}"))?; 113 + Ok(Self(bytes)) 114 + } 115 + } 116 + 117 + impl fmt::Display for DidKey { 118 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 119 + write!( 120 + f, 121 + "did:key:{}", 122 + multibase::encode(multibase::Base::Base58Btc, &self.0) 123 + ) 124 + } 125 + } 126 + 127 + /// CID string → binary CID bytes 128 + #[derive(Debug, Clone, Serialize, Deserialize)] 129 + struct PlcCid(#[serde(with = "serde_bytes")] Vec<u8>); 130 + 131 + impl PlcCid { 132 + fn from_cid_str(s: &str) -> anyhow::Result<Self> { 133 + let cid = IpldCid::try_from(s)?; 134 + let mut buf = Vec::new(); 135 + cid.write_bytes(&mut buf) 136 + .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}"))?; 137 + Ok(Self(buf)) 138 + } 139 + } 140 + 141 + impl fmt::Display for PlcCid { 142 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 143 + let cid = IpldCid::try_from(self.0.as_slice()).map_err(|_| fmt::Error)?; 144 + write!(f, "{cid}") 145 + } 146 + } 147 + 148 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 149 + enum Aka { 150 + Atproto(String), 151 + Other(String), 152 + } 153 + 154 + impl Aka { 155 + fn from_str(s: &str) -> Self { 156 + if let Some(stripped) = s.strip_prefix("at://") { 157 + Self::Atproto(stripped.to_string()) 158 + } else { 159 + Self::Other(s.to_string()) 160 + } 161 + } 162 + } 163 + 164 + impl fmt::Display for Aka { 165 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 166 + match self { 167 + Self::Atproto(h) => write!(f, 
"at://{h}"), 168 + Self::Other(s) => f.write_str(s), 169 + } 170 + } 171 + } 172 + 173 + #[derive(Debug, Clone, Serialize, Deserialize)] 174 + #[serde(rename_all = "snake_case")] 175 + enum OpType { 176 + PlcOperation, 177 + Create, 178 + PlcTombstone, 179 + Other(String), 180 + } 181 + 182 + impl OpType { 183 + fn from_str(s: &str) -> Self { 184 + match s { 185 + "plc_operation" => Self::PlcOperation, 186 + "create" => Self::Create, 187 + "plc_tombstone" => Self::PlcTombstone, 188 + other => Self::Other(other.to_string()), 189 + } 190 + } 191 + 192 + fn as_str(&self) -> &str { 193 + match self { 194 + Self::PlcOperation => "plc_operation", 195 + Self::Create => "create", 196 + Self::PlcTombstone => "plc_tombstone", 197 + Self::Other(s) => s, 198 + } 199 + } 200 + } 201 + 202 + #[derive(Debug, Clone, Serialize, Deserialize)] 203 + struct StoredService { 204 + r#type: String, 205 + endpoint: String, 206 + } 207 + 208 + #[derive(Debug, Clone, Serialize, Deserialize)] 209 + struct StoredOp { 210 + op_type: OpType, 211 + sig: Signature, 212 + prev: Option<PlcCid>, 213 + 214 + rotation_keys: Option<Vec<DidKey>>, 215 + verification_methods: Option<BTreeMap<String, DidKey>>, 216 + also_known_as: Option<Vec<Aka>>, 217 + services: Option<BTreeMap<String, StoredService>>, 218 + 219 + // legacy create fields 220 + signing_key: Option<DidKey>, 221 + recovery_key: Option<DidKey>, 222 + handle: Option<String>, 223 + service: Option<String>, 224 + 225 + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] 226 + unknown: BTreeMap<String, serde_json::Value>, 227 + } 228 + 229 + impl StoredOp { 230 + fn from_json_value(v: &serde_json::Value) -> anyhow::Result<Self> { 231 + let obj = v 232 + .as_object() 233 + .ok_or_else(|| anyhow::anyhow!("operation is not an object"))?; 234 + 235 + let mut known_keys = Vec::new(); 236 + let mut get = |key: &'static str| { 237 + known_keys.push(key); 238 + obj.get(key) 239 + }; 240 + 241 + let op_type = get("type") 242 + .and_then(|t| 
t.as_str()) 243 + .map(OpType::from_str) 244 + .ok_or_else(|| anyhow::anyhow!("missing type field"))?; 245 + 246 + let sig = get("sig") 247 + .and_then(|s| s.as_str()) 248 + .ok_or_else(|| anyhow::anyhow!("missing sig field")) 249 + .and_then(Signature::from_base64url)?; 250 + 251 + let prev = match get("prev").and_then(|p| p.as_str()) { 252 + Some(s) => Some(PlcCid::from_cid_str(s)?), 253 + None => None, 254 + }; 255 + 256 + let rotation_keys = get("rotationKeys") 257 + .and_then(|v| v.as_array()) 258 + .map(|arr| { 259 + arr.iter() 260 + .filter_map(|v| v.as_str()) 261 + .map(DidKey::from_did_key) 262 + .collect::<anyhow::Result<Vec<_>>>() 263 + }) 264 + .transpose()?; 265 + 266 + let verification_methods = get("verificationMethods") 267 + .and_then(|v| v.as_object()) 268 + .map(|map| { 269 + map.iter() 270 + .map(|(k, v)| { 271 + let key = DidKey::from_did_key(v.as_str().unwrap_or_default())?; 272 + Ok((k.clone(), key)) 273 + }) 274 + .collect::<anyhow::Result<BTreeMap<_, _>>>() 275 + }) 276 + .transpose()?; 277 + 278 + let also_known_as = get("alsoKnownAs").and_then(|v| v.as_array()).map(|arr| { 279 + arr.iter() 280 + .filter_map(|v| v.as_str()) 281 + .map(Aka::from_str) 282 + .collect() 283 + }); 284 + 285 + let services = get("services").and_then(|v| v.as_object()).map(|map| { 286 + map.iter() 287 + .filter_map(|(k, v)| { 288 + let svc = StoredService { 289 + r#type: v.get("type")?.as_str()?.to_string(), 290 + endpoint: v.get("endpoint")?.as_str()?.to_string(), 291 + }; 292 + Some((k.clone(), svc)) 293 + }) 294 + .collect() 295 + }); 296 + 297 + let signing_key = get("signingKey") 298 + .and_then(|v| v.as_str()) 299 + .map(DidKey::from_did_key) 300 + .transpose()?; 301 + 302 + let recovery_key = get("recoveryKey") 303 + .and_then(|v| v.as_str()) 304 + .map(DidKey::from_did_key) 305 + .transpose()?; 306 + 307 + let handle = get("handle") 308 + .and_then(|v| v.as_str()) 309 + .map(|s| s.to_string()); 310 + 311 + let service = get("service") 312 + .and_then(|v| 
v.as_str()) 313 + .map(|s| s.to_string()); 314 + 315 + let mut unknown = BTreeMap::new(); 316 + for (k, v) in obj { 317 + if !known_keys.contains(&k.as_str()) { 318 + unknown.insert(k.clone(), v.clone()); 319 + } 320 + } 321 + 322 + Ok(Self { 323 + op_type, 324 + sig, 325 + prev, 326 + rotation_keys, 327 + verification_methods, 328 + also_known_as, 329 + services, 330 + signing_key, 331 + recovery_key, 332 + handle, 333 + service, 334 + unknown, 335 + }) 336 + } 337 + 338 + fn to_json_value(&self) -> serde_json::Value { 339 + let mut map = serde_json::Map::new(); 340 + 341 + map.insert("type".into(), self.op_type.as_str().into()); 342 + map.insert("sig".into(), self.sig.to_string().into()); 343 + map.insert( 344 + "prev".into(), 345 + self.prev 346 + .as_ref() 347 + .map(|c| serde_json::Value::String(c.to_string())) 348 + .unwrap_or(serde_json::Value::Null), 349 + ); 350 + 351 + if let Some(keys) = &self.rotation_keys { 352 + map.insert( 353 + "rotationKeys".into(), 354 + keys.iter() 355 + .map(|k| serde_json::Value::String(k.to_string())) 356 + .collect::<Vec<_>>() 357 + .into(), 358 + ); 359 + } 360 + 361 + if let Some(methods) = &self.verification_methods { 362 + let obj: serde_json::Map<String, serde_json::Value> = methods 363 + .iter() 364 + .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))) 365 + .collect(); 366 + map.insert("verificationMethods".into(), obj.into()); 367 + } 368 + 369 + if let Some(aka) = &self.also_known_as { 370 + map.insert( 371 + "alsoKnownAs".into(), 372 + aka.iter() 373 + .map(|h| serde_json::Value::String(h.to_string())) 374 + .collect::<Vec<_>>() 375 + .into(), 376 + ); 377 + } 378 + 379 + if let Some(services) = &self.services { 380 + let obj: serde_json::Map<String, serde_json::Value> = services 381 + .iter() 382 + .map(|(k, svc)| { 383 + ( 384 + k.clone(), 385 + serde_json::json!({ 386 + "type": svc.r#type, 387 + "endpoint": svc.endpoint, 388 + }), 389 + ) 390 + }) 391 + .collect(); 392 + 
map.insert("services".into(), obj.into()); 393 + } 394 + 395 + // legacy create fields 396 + if let Some(key) = &self.signing_key { 397 + map.insert("signingKey".into(), key.to_string().into()); 398 + } 399 + if let Some(key) = &self.recovery_key { 400 + map.insert("recoveryKey".into(), key.to_string().into()); 401 + } 402 + if let Some(handle) = &self.handle { 403 + map.insert("handle".into(), handle.clone().into()); 404 + } 405 + if let Some(service) = &self.service { 406 + map.insert("service".into(), service.clone().into()); 407 + } 408 + 409 + for (k, v) in &self.unknown { 410 + map.insert(k.clone(), v.clone()); 411 + } 412 + 413 + serde_json::Value::Object(map) 414 + } 415 + } 416 + 81 417 // we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue 82 418 #[derive(Debug, Serialize)] 83 419 pub struct Op { ··· 96 432 #[serde(with = "serde_bytes")] 97 433 pub did: Vec<u8>, 98 434 pub nullified: bool, 99 - pub operation: serde_json::Value, 435 + pub operation: StoredOp, 100 436 } 101 437 102 438 #[derive(Clone)] ··· 168 504 let mut encoded_did = Vec::with_capacity(15); 169 505 encode_did(&mut encoded_did, &op.did)?; 170 506 507 + let json_val: serde_json::Value = serde_json::from_str(op.operation.get())?; 508 + let stored = StoredOp::from_json_value(&json_val)?; 171 509 let db_op = DbOp { 172 510 did: encoded_did, 173 511 nullified: op.nullified, 174 - operation: serde_json::to_value(&op.operation)?, 512 + operation: stored, 175 513 }; 176 514 let value = rmp_serde::to_vec(&db_op)?; 177 515 batch.insert(&self.inner.ops, &ts_key, &value); ··· 219 557 cid, 220 558 created_at: ts, 221 559 nullified: op.nullified, 222 - operation: op.operation, 560 + operation: op.operation.to_json_value(), 223 561 }) 224 562 })) 225 563 } ··· 256 594 cid, 257 595 created_at, 258 596 nullified: db_op.nullified, 259 - operation: db_op.operation, 597 + operation: db_op.operation.to_json_value(), 260 598 }) 261 599 })) 262 600 } ··· 350 688 ); 
// ··· tail of the preceding hunk, reproduced verbatim (its enclosing function
// starts outside this excerpt):
//     Ok("pages_to_fjall")
// }

#[cfg(test)]
mod tests {
    use super::*;

    /// A 64-byte signature decodes and renders back to the same text.
    #[test]
    fn signature_roundtrip() {
        let text = "9NuYV7AqwHVTc0YuWzNV3CJafsSZWH7qCxHRUIP2xWlB-YexXC1OaYAnUayiCXLVzRQ8WBXIqF-SvZdNalwcjA";
        let decoded = Signature::from_base64url(text).unwrap();
        assert_eq!(decoded.0.len(), 64);
        assert_eq!(decoded.to_string(), text);
    }

    /// A did:key string survives decode + re-encode unchanged.
    #[test]
    fn did_key_roundtrip() {
        let text = "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg";
        let decoded = DidKey::from_did_key(text).unwrap();
        assert_eq!(decoded.to_string(), text);
    }

    /// A CID string survives conversion to bytes and back.
    #[test]
    fn plc_cid_roundtrip() {
        let text = "bafyreigp6shzy6dlcxuowwoxz7u5nemdrkad2my5zwzpwilcnhih7bw6zm";
        let decoded = PlcCid::from_cid_str(text).unwrap();
        assert_eq!(decoded.to_string(), text);
    }

    /// `at://` entries are stored without the scheme and rendered with it.
    #[test]
    fn handle_roundtrip() {
        let aka = Aka::from_str("at://alice.bsky.social");
        assert_eq!(aka, Aka::Atproto("alice.bsky.social".to_string()));
        assert_eq!(aka.to_string(), "at://alice.bsky.social");
    }

    /// According to the DID spec, alsoKnownAs entries should be URIs; an entry
    /// using a scheme other than `at://` is preserved as-is.
    #[test]
    fn handle_without_prefix() {
        let aka = Aka::from_str("https://something.else");
        assert_eq!(aka, Aka::Other("https://something.else".to_string()));
        assert_eq!(aka.to_string(), "https://something.else");
    }

    /// Known and unknown type strings both map back to themselves.
    #[test]
    fn op_type_roundtrip() {
        for name in ["plc_operation", "create", "plc_tombstone", "weird_thing"] {
            assert_eq!(OpType::from_str(name).as_str(), name);
        }
    }

    /// Every fixture operation must survive JSON → StoredOp → msgpack →
    /// StoredOp → JSON unchanged; the size totals are printed for reference.
    #[test]
    fn stored_op_fixture_roundtrip() {
        let fixtures = [
            "tests/fixtures/log_bskyapp.json",
            "tests/fixtures/log_legacy_dholms.json",
            "tests/fixtures/log_nullification.json",
            "tests/fixtures/log_tombstone.json",
        ];

        let mut total_json_size = 0;
        let mut total_packed_size = 0;

        for path in fixtures {
            let raw = std::fs::read_to_string(path).unwrap();
            let entries: Vec<serde_json::Value> = serde_json::from_str(&raw).unwrap();

            for entry in entries.iter() {
                let op = &entry["operation"];
                let stored = match StoredOp::from_json_value(op) {
                    Ok(parsed) => parsed,
                    Err(e) => panic!("failed to parse op in {path}: {e}\n{op}"),
                };

                // msgpack verification
                let packed = rmp_serde::to_vec(&stored).unwrap();
                let unpacked: StoredOp = rmp_serde::from_slice(&packed).unwrap();

                let reconstructed = unpacked.to_json_value();
                assert_eq!(*op, reconstructed, "roundtrip mismatch in {path}");

                total_json_size += serde_json::to_vec(op).unwrap().len();
                total_packed_size += packed.len();
            }
        }

        let saved = total_json_size as isize - total_packed_size as isize;
        println!(
            "json size: {} bytes, msgpack size: {} bytes, saved: {} bytes",
            total_json_size, total_packed_size, saved
        );
    }
}