Server tools to backfill, tail, mirror, and verify PLC logs

fjall: use bitcode in db instead of rmp_serde

ptr.pet 9d82e094 74af90cd

verified
+132 -41
+43
Cargo.lock
··· 25 "async-compression", 26 "async-trait", 27 "bincode", 28 "chrono", 29 "cid", 30 "clap", ··· 169 checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" 170 171 [[package]] 172 name = "asn1-rs" 173 version = "0.7.1" 174 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 309 ] 310 311 [[package]] 312 name = "bitflags" 313 version = "1.3.2" 314 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 355 version = "3.20.2" 356 source = "registry+https://github.com/rust-lang/crates.io-index" 357 checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" 358 359 [[package]] 360 name = "byteorder" ··· 1167 "wasip2", 1168 "wasip3", 1169 ] 1170 1171 [[package]] 1172 name = "governor"
··· 25 "async-compression", 26 "async-trait", 27 "bincode", 28 + "bitcode", 29 "chrono", 30 "cid", 31 "clap", ··· 170 checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" 171 172 [[package]] 173 + name = "arrayvec" 174 + version = "0.7.6" 175 + source = "registry+https://github.com/rust-lang/crates.io-index" 176 + checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" 177 + 178 + [[package]] 179 name = "asn1-rs" 180 version = "0.7.1" 181 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 316 ] 317 318 [[package]] 319 + name = "bitcode" 320 + version = "0.6.9" 321 + source = "registry+https://github.com/rust-lang/crates.io-index" 322 + checksum = "0a6ed1b54d8dc333e7be604d00fa9262f4635485ffea923647b6521a5fff045d" 323 + dependencies = [ 324 + "arrayvec", 325 + "bitcode_derive", 326 + "bytemuck", 327 + "glam", 328 + "serde", 329 + ] 330 + 331 + [[package]] 332 + name = "bitcode_derive" 333 + version = "0.6.9" 334 + source = "registry+https://github.com/rust-lang/crates.io-index" 335 + checksum = "238b90427dfad9da4a9abd60f3ec1cdee6b80454bde49ed37f1781dd8e9dc7f9" 336 + dependencies = [ 337 + "proc-macro2", 338 + "quote", 339 + "syn", 340 + ] 341 + 342 + [[package]] 343 name = "bitflags" 344 version = "1.3.2" 345 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 386 version = "3.20.2" 387 source = "registry+https://github.com/rust-lang/crates.io-index" 388 checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" 389 + 390 + [[package]] 391 + name = "bytemuck" 392 + version = "1.25.0" 393 + source = "registry+https://github.com/rust-lang/crates.io-index" 394 + checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" 395 396 [[package]] 397 name = "byteorder" ··· 1204 "wasip2", 1205 "wasip3", 1206 ] 1207 + 1208 + [[package]] 1209 + name = "glam" 1210 + version = "0.32.1" 1211 + source = "registry+https://github.com/rust-lang/crates.io-index" 1212 + checksum = "f70749695b063ecbf6b62949ccccde2e733ec3ecbbd71d467dca4e5c6c97cca0" 1213 1214 [[package]] 1215 name = "governor"
+1
Cargo.toml
··· 55 k256 = "0.13.4" 56 serde_ipld_dagcbor = "0.6.4" 57 ordered-varint = "2.0.0" 58
··· 55 k256 = "0.13.4" 56 serde_ipld_dagcbor = "0.6.4" 57 ordered-varint = "2.0.0" 58 + bitcode = { version = "0.6.9", features = ["serde"] } 59
+2 -2
src/crypto.rs
··· 3 use std::fmt; 4 5 /// base64url-encoded ECDSA signature → raw bytes 6 - #[derive(Debug, Clone, Serialize, Deserialize)] 7 pub struct Signature(#[serde(with = "serde_bytes")] pub Vec<u8>); 8 9 impl Signature { ··· 22 } 23 24 /// did:key:z... → raw multicodec public key bytes 25 - #[derive(Debug, Clone, Serialize, Deserialize)] 26 pub struct DidKey(#[serde(with = "serde_bytes")] pub Vec<u8>); 27 28 impl DidKey {
··· 3 use std::fmt; 4 5 /// base64url-encoded ECDSA signature → raw bytes 6 + #[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 7 pub struct Signature(#[serde(with = "serde_bytes")] pub Vec<u8>); 8 9 impl Signature { ··· 22 } 23 24 /// did:key:z... → raw multicodec public key bytes 25 + #[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 26 pub struct DidKey(#[serde(with = "serde_bytes")] pub Vec<u8>); 27 28 impl DidKey {
+86 -39
src/plc_fjall.rs
··· 76 } 77 78 /// CID string → binary CID bytes 79 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 80 struct PlcCid(#[serde(with = "serde_bytes")] Vec<u8>); 81 82 impl PlcCid { ··· 96 } 97 } 98 99 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 100 enum Aka { 101 - Bluesky(String), 102 - Atproto(String), 103 - Other(String), 104 } 105 106 impl Aka { ··· 127 } 128 } 129 130 - #[derive(Debug, Clone, Serialize, Deserialize)] 131 #[serde(rename_all = "snake_case")] 132 enum OpType { 133 - PlcOperation, 134 - Create, 135 - PlcTombstone, 136 - Other(String), 137 } 138 139 impl OpType { ··· 220 TypeMismatch(StoredOpField, &'static str), 221 } 222 223 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 224 enum VerificationMethodKey { 225 - Atproto, 226 - Other(String), 227 } 228 229 impl VerificationMethodKey { ··· 248 } 249 } 250 251 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] 252 enum ServiceKey { 253 - AtprotoPds, 254 - Other(String), 255 } 256 257 impl ServiceKey { ··· 276 } 277 } 278 279 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 280 enum ServiceType { 281 - AtprotoPersonalDataServer, 282 - Other(String), 283 } 284 285 impl ServiceType { ··· 298 } 299 } 300 301 - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 302 enum ServiceEndpoint { 303 - BlueskyPds(String), 304 - Other(String), 305 - BlueskySocial, 306 } 307 308 impl ServiceEndpoint { ··· 328 } 329 } 330 331 - #[derive(Debug, Clone, Serialize, Deserialize)] 332 struct StoredService { 333 r#type: ServiceType, 334 endpoint: ServiceEndpoint, 335 } 336 337 - #[derive(Debug, Clone, Serialize, Deserialize)] 338 struct StoredOp { 339 op_type: OpType, 340 sig: Signature, ··· 351 handle: Option<String>, 352 service: Option<String>, 353 354 - #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] 355 - unknown: BTreeMap<String, serde_json::Value>, 356 } 357 358 impl StoredOp { ··· 371 } 372 keys 373 } 374 fn from_json_value(v: serde_json::Value) -> (Option<Self>, Vec<StoredOpError>) { 375 let serde_json::Value::Object(mut obj) = v else { 376 return (None, vec![StoredOpError::NotAnObject]); ··· 698 recovery_key, 699 handle, 700 service, 701 - unknown, 702 }), 703 errors, 704 ) ··· 780 map.insert((*StoredOpField::Service).into(), service.clone().into()); 781 } 782 783 - for (k, v) in &self.unknown { 784 - map.insert(k.clone(), v.clone()); 785 } 786 787 serde_json::Value::Object(map) ··· 816 817 // stored alongside the seq key in the ops keyspace 818 // cid and created_at are in the value (not the key) in the new layout 819 - #[derive(Debug, Deserialize, Serialize)] 820 #[serde(rename_all = "camelCase")] 821 struct DbOp { 822 #[serde(with = "serde_bytes")] ··· 938 .into_inner() 939 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 940 let seq = decode_seq_key(&key)?; 941 - let db_op: DbOp = rmp_serde::from_slice(&value)?; 942 let dt = Dt::from_timestamp_micros(db_op.created_at as i64) 943 .ok_or_else(|| anyhow::anyhow!("invalid created_at in last op"))?; 944 Ok(Some((seq, dt))) ··· 1017 operation, 1018 }; 1019 1020 - let seq_val = rmp_serde::to_vec(&db_op)?; 1021 let seq_key_bytes = seq_key(seq); 1022 let by_did_key_bytes = by_did_key(&op.did, seq)?; 1023 ··· 1037 .range(seq_key(seq)..) 1038 .next() 1039 .map(|v| { 1040 - rmp_serde::from_slice::<DbOp>(&v.value()?) 1041 .context("failed to decode op") 1042 .map(|op| { 1043 Ok(Op { ··· 1075 .get(seq_key(seq))? 1076 .ok_or_else(|| anyhow::anyhow!("op not found for seq {seq}"))?; 1077 1078 - let op: DbOp = rmp_serde::from_slice(&value)?; 1079 let ts = Dt::from_timestamp_micros(op.created_at as i64) 1080 .ok_or_else(|| anyhow::anyhow!("invalid created_at_micros {}", op.created_at))?; 1081 let cid = PlcCid(op.cid.clone()); ··· 1140 .into_inner() 1141 .map_err(|e: fjall::Error| anyhow::anyhow!("fjall read error: {e}"))?; 1142 let seq = decode_seq_key(&key)?; 1143 - let db_op: DbOp = rmp_serde::from_slice(&value)?; 1144 let created_at = 1145 Dt::from_timestamp_micros(db_op.created_at as i64).ok_or_else(|| { 1146 anyhow::anyhow!("invalid created_at_micros {}", db_op.created_at) ··· 1696 msg.push_str(&format!("op: {op}\n")); 1697 panic!("{msg}"); 1698 } 1699 1700 - let packed = rmp_serde::to_vec(&stored).unwrap(); 1701 - let unpacked: StoredOp = rmp_serde::from_slice(&packed).unwrap(); 1702 1703 let reconstructed = unpacked.to_json_value(); 1704 assert_eq!(*op, reconstructed, "roundtrip mismatch in {path}"); ··· 1709 } 1710 1711 println!( 1712 - "json size: {} bytes, msgpack size: {} bytes, saved: {} bytes", 1713 total_json_size, 1714 total_packed_size, 1715 total_json_size as isize - total_packed_size as isize
··· 76 } 77 78 /// CID string → binary CID bytes 79 + // STABILITY: never reorder variants, only append. 80 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 81 struct PlcCid(#[serde(with = "serde_bytes")] Vec<u8>); 82 83 impl PlcCid { ··· 97 } 98 } 99 100 + // STABILITY: never reorder variants, only append. 101 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 102 enum Aka { 103 + Other(String), // 0 104 + Bluesky(String), // 1 105 + Atproto(String), // 2 106 } 107 108 impl Aka { ··· 129 } 130 } 131 132 + // STABILITY: never reorder variants, only append. 133 + #[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 134 #[serde(rename_all = "snake_case")] 135 enum OpType { 136 + Other(String), // 0 137 + PlcOperation, // 1 138 + Create, // 2 139 + PlcTombstone, // 3 140 } 141 142 impl OpType { ··· 223 TypeMismatch(StoredOpField, &'static str), 224 } 225 226 + // STABILITY: never reorder variants, only append. 227 + #[derive( 228 + Debug, 229 + Clone, 230 + Serialize, 231 + Deserialize, 232 + PartialEq, 233 + Eq, 234 + PartialOrd, 235 + Ord, 236 + bitcode::Encode, 237 + bitcode::Decode, 238 + )] 239 enum VerificationMethodKey { 240 + Other(String), // 0 241 + Atproto, // 1 242 } 243 244 impl VerificationMethodKey { ··· 263 } 264 } 265 266 + // STABILITY: never reorder variants, only append. 267 + #[derive( 268 + Debug, 269 + Clone, 270 + Serialize, 271 + Deserialize, 272 + PartialEq, 273 + Eq, 274 + PartialOrd, 275 + Ord, 276 + bitcode::Encode, 277 + bitcode::Decode, 278 + )] 279 enum ServiceKey { 280 + Other(String), // 0 281 + AtprotoPds, // 1 282 } 283 284 impl ServiceKey { ··· 303 } 304 } 305 306 + // STABILITY: never reorder variants, only append. 307 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 308 enum ServiceType { 309 + Other(String), // 0 310 + AtprotoPersonalDataServer, // 1 311 } 312 313 impl ServiceType { ··· 326 } 327 } 328 329 + // STABILITY: never reorder variants, only append. 330 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bitcode::Encode, bitcode::Decode)] 331 enum ServiceEndpoint { 332 + Other(String), // 0 333 + BlueskyPds(String), // 1 334 + BlueskySocial, // 2 335 } 336 337 impl ServiceEndpoint { ··· 357 } 358 } 359 360 + #[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 361 struct StoredService { 362 r#type: ServiceType, 363 endpoint: ServiceEndpoint, 364 } 365 366 + #[derive(Debug, Clone, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)] 367 struct StoredOp { 368 op_type: OpType, 369 sig: Signature, ··· 380 handle: Option<String>, 381 service: Option<String>, 382 383 + // msgpack-encoded BTreeMap<String, serde_json::Value>. 384 + // Vec<u8> is used because bitcode cannot handle serde_json::Value directly. 385 + // empty vec when there are no unknown fields (the common case). 386 + #[serde(skip)] 387 + unknown_packed: Vec<u8>, 388 } 389 390 impl StoredOp { ··· 403 } 404 keys 405 } 406 + 407 + fn unknown(&self) -> BTreeMap<String, serde_json::Value> { 408 + if self.unknown_packed.is_empty() { 409 + return BTreeMap::new(); 410 + } 411 + rmp_serde::from_slice(&self.unknown_packed).unwrap_or_default() 412 + } 413 + 414 + fn pack_unknown(unknown: BTreeMap<String, serde_json::Value>) -> Vec<u8> { 415 + if unknown.is_empty() { 416 + return Vec::new(); 417 + } 418 + rmp_serde::to_vec(&unknown).expect("unknown fields are serializable") 419 + } 420 fn from_json_value(v: serde_json::Value) -> (Option<Self>, Vec<StoredOpError>) { 421 let serde_json::Value::Object(mut obj) = v else { 422 return (None, vec![StoredOpError::NotAnObject]); ··· 744 recovery_key, 745 handle, 746 service, 747 + unknown_packed: Self::pack_unknown(unknown), 748 }), 749 errors, 750 ) ··· 826 map.insert((*StoredOpField::Service).into(), service.clone().into()); 827 } 828 829 + for (k, v) in self.unknown() { 830 + map.insert(k, v); 831 } 832 833 serde_json::Value::Object(map) ··· 862 863 // stored alongside the seq key in the ops keyspace 864 // cid and created_at are in the value (not the key) in the new layout 865 + #[derive(Debug, Deserialize, Serialize, bitcode::Encode, bitcode::Decode)] 866 #[serde(rename_all = "camelCase")] 867 struct DbOp { 868 #[serde(with = "serde_bytes")] ··· 984 .into_inner() 985 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 986 let seq = decode_seq_key(&key)?; 987 + let db_op: DbOp = bitcode::decode::<DbOp>(&value)?; 988 let dt = Dt::from_timestamp_micros(db_op.created_at as i64) 989 .ok_or_else(|| anyhow::anyhow!("invalid created_at in last op"))?; 990 Ok(Some((seq, dt))) ··· 1063 operation, 1064 }; 1065 1066 + let seq_val = bitcode::encode(&db_op); 1067 let seq_key_bytes = seq_key(seq); 1068 let by_did_key_bytes = by_did_key(&op.did, seq)?; 1069 ··· 1083 .range(seq_key(seq)..) 1084 .next() 1085 .map(|v| { 1086 + bitcode::decode::<DbOp>(&v.value()?) 1087 .context("failed to decode op") 1088 .map(|op| { 1089 Ok(Op { ··· 1121 .get(seq_key(seq))? 1122 .ok_or_else(|| anyhow::anyhow!("op not found for seq {seq}"))?; 1123 1124 + let op: DbOp = bitcode::decode::<DbOp>(&value)?; 1125 let ts = Dt::from_timestamp_micros(op.created_at as i64) 1126 .ok_or_else(|| anyhow::anyhow!("invalid created_at_micros {}", op.created_at))?; 1127 let cid = PlcCid(op.cid.clone()); ··· 1186 .into_inner() 1187 .map_err(|e: fjall::Error| anyhow::anyhow!("fjall read error: {e}"))?; 1188 let seq = decode_seq_key(&key)?; 1189 + let db_op: DbOp = bitcode::decode::<DbOp>(&value)?; 1190 let created_at = 1191 Dt::from_timestamp_micros(db_op.created_at as i64).ok_or_else(|| { 1192 anyhow::anyhow!("invalid created_at_micros {}", db_op.created_at) ··· 1742 msg.push_str(&format!("op: {op}\n")); 1743 panic!("{msg}"); 1744 } 1745 + let stored = stored.unwrap(); 1746 1747 + let packed = bitcode::encode(&stored); 1748 + let unpacked: StoredOp = bitcode::decode::<StoredOp>(&packed).unwrap(); 1749 1750 let reconstructed = unpacked.to_json_value(); 1751 assert_eq!(*op, reconstructed, "roundtrip mismatch in {path}"); ··· 1756 } 1757 1758 println!( 1759 + "json size: {} bytes, bitcode size: {} bytes, saved: {} bytes", 1760 total_json_size, 1761 total_packed_size, 1762 total_json_size as isize - total_packed_size as isize