Server tools to backfill, tail, mirror, and verify PLC logs

fjall: only use 6 bytes of CID in keys

ptr.pet 10393452 8f532f83

verified
+40 -19
+40 -19
src/plc_fjall.rs
··· 33 } 34 35 // 59 bytes -> 36 bytes 36 - fn encode_cid(buf: &mut Vec<u8>, s: &str) -> anyhow::Result<usize> { 37 - IpldCid::try_from(s)? 38 - .write_bytes(buf) 39 - .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}")) 40 } 41 42 fn decode_cid(bytes: &[u8]) -> anyhow::Result<String> { ··· 50 format!("did:plc:{decoded}") 51 } 52 53 - fn op_key(created_at: &Dt, cid: &str) -> anyhow::Result<Vec<u8>> { 54 let micros = created_at.timestamp_micros() as u64; 55 - let mut key = Vec::with_capacity(8 + 1 + cid.len()); 56 key.extend_from_slice(&micros.to_be_bytes()); 57 key.push(SEP); 58 - encode_cid(&mut key, cid)?; 59 - Ok(key) 60 } 61 62 fn by_did_prefix(did: &str) -> anyhow::Result<Vec<u8>> { ··· 66 Ok(p) 67 } 68 69 - fn by_did_key(did: &str, created_at: &Dt, cid: &str) -> anyhow::Result<Vec<u8>> { 70 let mut key = by_did_prefix(did)?; 71 let micros = created_at.timestamp_micros() as u64; 72 key.extend_from_slice(&micros.to_be_bytes()); 73 key.push(SEP); 74 - encode_cid(&mut key, cid)?; 75 Ok(key) 76 } 77 ··· 785 struct DbOp { 786 #[serde(with = "serde_bytes")] 787 pub did: Vec<u8>, 788 pub nullified: bool, 789 pub operation: StoredOp, 790 } ··· 879 } 880 881 pub fn insert_op(&self, batch: &mut OwnedWriteBatch, op: &CommonOp) -> anyhow::Result<usize> { 882 - let pk = by_did_key(&op.did, &op.created_at, &op.cid)?; 883 if self.inner.by_did.get(&pk)?.is_some() { 884 return Ok(0); 885 } 886 - let ts_key = op_key(&op.created_at, &op.cid)?; 887 888 let mut encoded_did = Vec::with_capacity(15); 889 encode_did(&mut encoded_did, &op.did)?; ··· 905 906 let db_op = DbOp { 907 did: encoded_did, 908 nullified: op.nullified, 909 operation, 910 }; ··· 932 let ts_bytes = key_rest 933 .get(..8) 934 .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 935 - let cid_bytes = key_rest 936 .get(9..) 937 .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 938 939 - let op_key = [ts_bytes, &[SEP][..], cid_bytes].concat(); 940 let ts = decode_timestamp(ts_bytes)?; 941 942 let value = self ··· 946 .ok_or_else(|| anyhow::anyhow!("op not found: {op_key:?}"))?; 947 948 let op: DbOp = rmp_serde::from_slice(&value)?; 949 - let cid = decode_cid(cid_bytes)?; 950 let did = decode_did(&op.did); 951 952 Ok(Op { ··· 980 key.get(..8) 981 .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?, 982 )?; 983 - let cid = decode_cid( 984 - key.get(9..) 985 - .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?, 986 - )?; 987 let did = decode_did(&db_op.did); 988 989 Ok(Op {
··· 33 } 34 35 // 59 bytes -> 36 bytes 36 + fn decode_cid_str(s: &str) -> anyhow::Result<Vec<u8>> { 37 + let cid = IpldCid::try_from(s)?; 38 + let mut buf = Vec::new(); 39 + cid.write_bytes(&mut buf) 40 + .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}"))?; 41 + Ok(buf) 42 } 43 44 fn decode_cid(bytes: &[u8]) -> anyhow::Result<String> { ··· 52 format!("did:plc:{decoded}") 53 } 54 55 + fn op_key(created_at: &Dt, cid_suffix: &[u8]) -> Vec<u8> { 56 let micros = created_at.timestamp_micros() as u64; 57 + let mut key = Vec::with_capacity(8 + 1 + cid_suffix.len()); 58 key.extend_from_slice(&micros.to_be_bytes()); 59 key.push(SEP); 60 + key.extend_from_slice(cid_suffix); 61 + key 62 } 63 64 fn by_did_prefix(did: &str) -> anyhow::Result<Vec<u8>> { ··· 68 Ok(p) 69 } 70 71 + fn by_did_key(did: &str, created_at: &Dt, cid_suffix: &[u8]) -> anyhow::Result<Vec<u8>> { 72 let mut key = by_did_prefix(did)?; 73 let micros = created_at.timestamp_micros() as u64; 74 key.extend_from_slice(&micros.to_be_bytes()); 75 key.push(SEP); 76 + key.extend_from_slice(cid_suffix); 77 Ok(key) 78 } 79 ··· 787 struct DbOp { 788 #[serde(with = "serde_bytes")] 789 pub did: Vec<u8>, 790 + #[serde(with = "serde_bytes")] 791 + pub cid_prefix: Vec<u8>, 792 pub nullified: bool, 793 pub operation: StoredOp, 794 } ··· 883 } 884 885 pub fn insert_op(&self, batch: &mut OwnedWriteBatch, op: &CommonOp) -> anyhow::Result<usize> { 886 + let cid_bytes = decode_cid_str(&op.cid)?; 887 + let cid_prefix = cid_bytes 888 + .get(..30) 889 + .ok_or_else(|| anyhow::anyhow!("invalid cid length (prefix): {}", op.cid))? 890 + .to_vec(); 891 + let cid_suffix = cid_bytes 892 + .get(30..) 893 + .ok_or_else(|| anyhow::anyhow!("invalid cid length (suffix): {}", op.cid))?; 894 + 895 + let pk = by_did_key(&op.did, &op.created_at, cid_suffix)?; 896 if self.inner.by_did.get(&pk)?.is_some() { 897 return Ok(0); 898 } 899 + let ts_key = op_key(&op.created_at, cid_suffix); 900 901 let mut encoded_did = Vec::with_capacity(15); 902 encode_did(&mut encoded_did, &op.did)?; ··· 918 919 let db_op = DbOp { 920 did: encoded_did, 921 + cid_prefix, 922 nullified: op.nullified, 923 operation, 924 }; ··· 946 let ts_bytes = key_rest 947 .get(..8) 948 .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 949 + let cid_suffix = key_rest 950 .get(9..) 951 .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 952 953 + let op_key = [ts_bytes, &[SEP][..], cid_suffix].concat(); 954 let ts = decode_timestamp(ts_bytes)?; 955 956 let value = self ··· 960 .ok_or_else(|| anyhow::anyhow!("op not found: {op_key:?}"))?; 961 962 let op: DbOp = rmp_serde::from_slice(&value)?; 963 + let mut full_cid_bytes = op.cid_prefix.clone(); 964 + full_cid_bytes.extend_from_slice(cid_suffix); 965 + 966 + let cid = decode_cid(&full_cid_bytes)?; 967 let did = decode_did(&op.did); 968 969 Ok(Op { ··· 997 key.get(..8) 998 .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?, 999 )?; 1000 + let cid_suffix = key 1001 + .get(9..) 1002 + .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?; 1003 + 1004 + let mut full_cid_bytes = db_op.cid_prefix.clone(); 1005 + full_cid_bytes.extend_from_slice(cid_suffix); 1006 + 1007 + let cid = decode_cid(&full_cid_bytes)?; 1008 let did = decode_did(&db_op.did); 1009 1010 Ok(Op {