Server tools to backfill, tail, mirror, and verify PLC logs

fjall: implement signature verification

ptr.pet bab84164 97a6ed79

verified
+80 -21
+80 -21
src/plc_fjall.rs
··· 1 1 use crate::{ 2 2 BundleSource, Dt, ExportPage, Op as CommonOp, PageBoundaryState, Week, 3 - crypto::{DidKey, Signature}, 3 + crypto::{DidKey, Signature, assure_valid_sig}, 4 4 }; 5 5 use anyhow::Context; 6 6 use data_encoding::BASE32_NOPAD; ··· 368 368 } 369 369 370 370 impl StoredOp { 371 + fn get_keys(&self) -> Vec<&DidKey> { 372 + let mut keys = Vec::with_capacity(self.rotation_keys.as_ref().map_or(2, |keys| keys.len())); 373 + if let Some(rot_keys) = self.rotation_keys.as_ref() { 374 + keys.extend(rot_keys.iter()); 375 + } else { 376 + // legacy genesis op 377 + if let Some(recovery_key) = self.recovery_key.as_ref() { 378 + keys.push(recovery_key); 379 + } 380 + if let Some(signing_key) = self.signing_key.as_ref() { 381 + keys.push(signing_key); 382 + } 383 + } 384 + keys 385 + } 371 386 fn from_json_value(v: serde_json::Value) -> (Option<Self>, Vec<StoredOpError>) { 372 387 let serde_json::Value::Object(mut obj) = v else { 373 388 return (None, vec![StoredOpError::NotAnObject]); ··· 897 912 .get(30..) 898 913 .ok_or_else(|| anyhow::anyhow!("invalid cid length (suffix): {}", op.cid))?; 899 914 900 - let pk = by_did_key(&op.did, &op.created_at, cid_suffix)?; 901 - if self.inner.by_did.get(&pk)?.is_some() { 902 - return Ok(0); 903 - } 904 - let ts_key = op_key(&op.created_at, cid_suffix); 905 - 906 - let mut encoded_did = Vec::with_capacity(15); 907 - encode_did(&mut encoded_did, &op.did)?; 908 - 909 - let json_val: serde_json::Value = serde_json::from_str(op.operation.get())?; 910 - let (stored, mut errors) = StoredOp::from_json_value(json_val); 915 + let op_json: serde_json::Value = serde_json::from_str(op.operation.get())?; 916 + let (stored, mut errors) = StoredOp::from_json_value(op_json); 911 917 912 918 let Some(operation) = stored else { 913 919 return Err(errors.remove(0)).context("fatal operation parse error"); 914 920 }; 915 921 916 - for e in &errors { 917 - log::warn!("failed to parse operation {} {}: {}", op.did, op.cid, e); 922 + for err in &errors { 923 + log::warn!("failed to parse operation {} {}: {err}", op.did, op.cid); 918 924 } 919 925 if !errors.is_empty() { 920 926 // if parse failed but not fatal, we just dont store it 921 927 return Ok(0); 922 928 } 923 929 930 + // get keys from either prev or genesis op 931 + let prev_op = self._ops_for_did(&op.did)?.rev().next().transpose()?; 932 + let keys = prev_op.as_ref().map_or_else( 933 + || operation.get_keys(), 934 + |(_, _, op)| op.operation.get_keys(), 935 + ); 936 + 937 + if keys.is_empty() { 938 + log::warn!("no keys for op {} {}", op.did, op.cid); 939 + return Ok(0); 940 + } 941 + 942 + let data = { 943 + let serde_json::Value::Object(mut data) = operation.to_json_value() else { 944 + unreachable!("we checked if operation is valid already") 945 + }; 946 + data.remove("sig"); 947 + serde_json::Value::Object(data) 948 + }; 949 + let results = assure_valid_sig(keys, &operation.sig, &data)?; 950 + if !results.valid { 951 + for err in results.errors { 952 + log::warn!("invalid signature for op {} {}: {err}", op.did, op.cid); 953 + } 954 + // don't accept invalid ops 955 + return Ok(0); 956 + } 957 + 924 958 let db_op = DbOp { 925 - did: encoded_did, 959 + did: { 960 + let mut encoded_did = Vec::with_capacity(15); 961 + encode_did(&mut encoded_did, &op.did)?; 962 + encoded_did 963 + }, 926 964 cid_prefix, 927 965 nullified: op.nullified, 928 966 operation, 929 967 }; 930 - let value = rmp_serde::to_vec(&db_op)?; 931 - batch.insert(&self.inner.ops, &ts_key, &value); 932 - batch.insert(&self.inner.by_did, &pk, &[]); 968 + 969 + batch.insert( 970 + &self.inner.ops, 971 + op_key(&op.created_at, cid_suffix), 972 + rmp_serde::to_vec(&db_op)?, 973 + ); 974 + batch.insert( 975 + &self.inner.by_did, 976 + by_did_key(&op.did, &op.created_at, cid_suffix)?, 977 + &[], 978 + ); 979 + 933 980 Ok(1) 934 981 } 935 982 936 - pub fn ops_for_did( 983 + fn _ops_for_did( 937 984 &self, 938 985 did: &str, 939 - ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> { 986 + ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<(Dt, String, DbOp)>> + '_> 987 + { 940 988 let prefix = by_did_prefix(did)?; 941 989 942 990 Ok(self.inner.by_did.prefix(&prefix).map(move |guard| { ··· 969 1017 full_cid_bytes.extend_from_slice(cid_suffix); 970 1018 971 1019 let cid = decode_cid(&full_cid_bytes)?; 1020 + 1021 + Ok((ts, cid, op)) 1022 + })) 1023 + } 1024 + 1025 + pub fn ops_for_did( 1026 + &self, 1027 + did: &str, 1028 + ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<Op>> + '_> { 1029 + Ok(self._ops_for_did(did)?.map(|res| { 1030 + let (ts, cid, op) = res?; 1031 + 972 1032 let did = decode_did(&op.did); 973 1033 974 1034 Ok(Op { ··· 1283 1343 panic!("{msg}"); 1284 1344 } 1285 1345 1286 - // msgpack verification 1287 1346 let packed = rmp_serde::to_vec(&stored).unwrap(); 1288 1347 let unpacked: StoredOp = rmp_serde::from_slice(&packed).unwrap(); 1289 1348