Server tools to backfill, tail, mirror, and verify PLC logs

fjall: store serde_json::Value in db instead of storing RawValue

ptr.pet d9a146b6 17e48d10

verified
+37 -26
+20 -19
src/mirror/fjall.rs
··· 167 168 match sub_path { 169 "" => { 170 - let parsed: Vec<serde_json::Value> = ops 171 - .iter() 172 - .filter(|op| !op.nullified) 173 - .filter_map(|op| serde_json::from_str(op.operation.get()).ok()) 174 - .collect(); 175 - let data = doc::apply_op_log(did_str, &parsed); 176 let Some(data) = data else { 177 return Err(Error::from_string( 178 format!("DID not available: {did_str}"), ··· 185 .body(serde_json::to_string(&doc).unwrap())) 186 } 187 "/log" => { 188 - let log: Vec<serde_json::Value> = ops 189 .iter() 190 .filter(|op| !op.nullified) 191 - .filter_map(|op| serde_json::from_str(op.operation.get()).ok()) 192 .collect(); 193 Ok(Response::builder() 194 .content_type("application/json") ··· 200 .map(|op| { 201 serde_json::json!({ 202 "did": op.did, 203 - "operation": serde_json::from_str::<serde_json::Value>(op.operation.get()).unwrap_or_default(), 204 "cid": op.cid, 205 "nullified": op.nullified, 206 "createdAt": op.created_at.to_rfc3339(), ··· 212 .body(serde_json::to_string(&audit).unwrap())) 213 } 214 "/log/last" => { 215 - let last = 216 - ops.iter().filter(|op| !op.nullified).last().and_then(|op| { 217 - serde_json::from_str::<serde_json::Value>(op.operation.get()).ok() 218 - }); 219 let Some(last) = last else { 220 return Err(Error::from_string( 221 format!("DID not available: {did_str}"), ··· 227 .body(serde_json::to_string(&last).unwrap())) 228 } 229 "/data" => { 230 - let parsed: Vec<serde_json::Value> = ops 231 - .iter() 232 - .filter(|op| !op.nullified) 233 - .filter_map(|op| serde_json::from_str(op.operation.get()).ok()) 234 - .collect(); 235 - let data = doc::apply_op_log(did_str, &parsed); 236 let Some(data) = data else { 237 return Err(Error::from_string( 238 format!("DID not available: {did_str}"),
··· 167 168 match sub_path { 169 "" => { 170 + let data = doc::apply_op_log( 171 + did_str, 172 + ops.iter() 173 + .filter(|op| !op.nullified) 174 + .map(|op| &op.operation), 175 + ); 176 let Some(data) = data else { 177 return Err(Error::from_string( 178 format!("DID not available: {did_str}"), ··· 185 .body(serde_json::to_string(&doc).unwrap())) 186 } 187 "/log" => { 188 + let log: Vec<&serde_json::Value> = ops 189 .iter() 190 .filter(|op| !op.nullified) 191 + .map(|op| &op.operation) 192 .collect(); 193 Ok(Response::builder() 194 .content_type("application/json") ··· 200 .map(|op| { 201 serde_json::json!({ 202 "did": op.did, 203 + "operation": op.operation, 204 "cid": op.cid, 205 "nullified": op.nullified, 206 "createdAt": op.created_at.to_rfc3339(), ··· 212 .body(serde_json::to_string(&audit).unwrap())) 213 } 214 "/log/last" => { 215 + let last = ops 216 + .iter() 217 + .filter(|op| !op.nullified) 218 + .last() 219 + .map(|op| &op.operation); 220 let Some(last) = last else { 221 return Err(Error::from_string( 222 format!("DID not available: {did_str}"), ··· 228 .body(serde_json::to_string(&last).unwrap())) 229 } 230 "/data" => { 231 + let data = doc::apply_op_log( 232 + did_str, 233 + ops.iter() 234 + .filter(|op| !op.nullified) 235 + .map(|op| &op.operation), 236 + ); 237 let Some(data) = data else { 238 return Err(Error::from_string( 239 format!("DID not available: {did_str}"),
+17 -7
src/plc_fjall.rs
··· 1 - use crate::{Dt, ExportPage, Op, PageBoundaryState}; 2 use data_encoding::BASE32_NOPAD; 3 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode}; 4 use serde::{Deserialize, Serialize}; ··· 76 ); 77 Dt::from_timestamp_micros(micros as i64) 78 .ok_or_else(|| anyhow::anyhow!("invalid timestamp {micros}")) 79 } 80 81 // this is basically Op, but without the cid and created_at fields ··· 86 #[serde(with = "serde_bytes")] 87 pub did: Vec<u8>, 88 pub nullified: bool, 89 - pub operation: Box<serde_json::value::RawValue>, 90 } 91 92 #[derive(Clone)] ··· 142 .map(Some) 143 } 144 145 - pub fn insert_op(&self, batch: &mut OwnedWriteBatch, op: &Op) -> anyhow::Result<usize> { 146 let pk = by_did_key(&op.did, &op.created_at, &op.cid)?; 147 if self.inner.by_did.get(&pk)?.is_some() { 148 return Ok(0); ··· 155 let db_op = DbOp { 156 did: encoded_did, 157 nullified: op.nullified, 158 - operation: op.operation.clone(), 159 }; 160 - let value = rmp_serde::to_vec_named(&db_op)?; 161 batch.insert(&self.inner.ops, &ts_key, &value); 162 batch.insert(&self.inner.by_did, &pk, &[]); 163 Ok(1) ··· 180 181 let ts_bytes = key_rest 182 .get(..8) 183 - .ok_or_else(|| anyhow::anyhow!("invalid length"))?; 184 let cid_bytes = key_rest 185 .get(9..) 186 - .ok_or_else(|| anyhow::anyhow!("invalid length"))?; 187 188 let op_key = [ts_bytes, &[SEP][..], cid_bytes].concat(); 189 let ts = decode_timestamp(ts_bytes)?;
··· 1 + use crate::{Dt, ExportPage, Op as CommonOp, PageBoundaryState}; 2 use data_encoding::BASE32_NOPAD; 3 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode}; 4 use serde::{Deserialize, Serialize}; ··· 76 ); 77 Dt::from_timestamp_micros(micros as i64) 78 .ok_or_else(|| anyhow::anyhow!("invalid timestamp {micros}")) 79 + } 80 + 81 + // we have our own Op struct for fjall since we dont want to have to convert Value back to RawValue 82 + #[derive(Debug, Serialize)] 83 + pub struct Op { 84 + pub did: String, 85 + pub cid: String, 86 + pub created_at: Dt, 87 + pub nullified: bool, 88 + pub operation: serde_json::Value, 89 } 90 91 // this is basically Op, but without the cid and created_at fields ··· 96 #[serde(with = "serde_bytes")] 97 pub did: Vec<u8>, 98 pub nullified: bool, 99 + pub operation: serde_json::Value, 100 } 101 102 #[derive(Clone)] ··· 152 .map(Some) 153 } 154 155 + pub fn insert_op(&self, batch: &mut OwnedWriteBatch, op: &CommonOp) -> anyhow::Result<usize> { 156 let pk = by_did_key(&op.did, &op.created_at, &op.cid)?; 157 if self.inner.by_did.get(&pk)?.is_some() { 158 return Ok(0); ··· 165 let db_op = DbOp { 166 did: encoded_did, 167 nullified: op.nullified, 168 + operation: serde_json::to_value(&op.operation)?, 169 }; 170 + let value = rmp_serde::to_vec(&db_op)?; 171 batch.insert(&self.inner.ops, &ts_key, &value); 172 batch.insert(&self.inner.by_did, &pk, &[]); 173 Ok(1) ··· 190 191 let ts_bytes = key_rest 192 .get(..8) 193 + .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 194 let cid_bytes = key_rest 195 .get(9..) 196 + .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 197 198 let op_key = [ts_bytes, &[SEP][..], cid_bytes].concat(); 199 let ts = decode_timestamp(ts_bytes)?;