Server tools to backfill, tail, mirror, and verify PLC logs

plc_rocksdb: straight json, try helping zstd

+5 -45
+5 -45
src/plc_rocksdb.rs
··· 105 105 async fn insert_op(&self, db: &rocksdb::DB, op: &Op) -> Result<(), rocksdb::Error> { 106 106 let cf = db.cf_handle("ops").unwrap(); 107 107 let id = self.store_did(db, op.did.clone()).await?; 108 - db.put_cf( 109 - &cf, 110 - op_key(&op, id), 111 - bincode::encode_to_vec(&RocksOp::from_op(&op).await, bincode::config::standard()) 112 - .unwrap(), 113 - )?; 108 + db.put_cf(&cf, op_key(&op, id), serde_json::to_vec(&op).unwrap())?; 114 109 self.set_latest_timestamp(&db, op.created_at).await?; 115 110 116 111 Ok(()) ··· 167 162 } 168 163 } 169 164 170 - #[derive(bincode::Encode, bincode::Decode)] 171 - struct RocksOp { 172 - pub did: String, 173 - pub cid: String, 174 - pub created_at: i64, 175 - pub nullified: bool, 176 - pub operation: String, 177 - } 178 - 179 - impl RocksOp { 180 - async fn from_op(op: &Op) -> Self { 181 - Self { 182 - operation: op.operation.to_string(), 183 - did: op.did.clone(), 184 - cid: op.cid.clone(), 185 - created_at: op.created_at.timestamp_millis(), 186 - nullified: op.nullified, 187 - } 188 - } 189 - 190 - async fn to_op(&self) -> Op { 191 - Op { 192 - did: self.did.clone(), 193 - cid: self.cid.clone(), 194 - created_at: Dt::from_timestamp_millis(self.created_at).unwrap(), 195 - nullified: self.nullified, 196 - operation: serde_json::value::RawValue::from_string(self.operation.clone()).unwrap(), 197 - } 198 - } 199 - } 200 - 201 165 fn op_key(op: &Op, id: u64) -> String { 202 166 format!( 203 167 "{:0width$}_{}", ··· 303 267 304 268 let key_prefix = format!("{}_", id); 305 269 306 - let mut va: Vec<RocksOp> = db 270 + let mut va: Vec<Op> = db 307 271 .prefix_iterator_cf(&cf, key_prefix.clone()) 308 272 .into_iter() 309 273 .map_while(|e| { ··· 313 277 return None; 314 278 } 315 279 316 - let asd = bincode::decode_from_slice(&v, bincode::config::standard()).unwrap(); 317 - 318 - Some(asd.0) 280 + Some(serde_json::from_slice(&v).unwrap()) 319 281 }) 320 282 .collect(); 321 283 322 284 va.sort_by(|a, b| a.created_at.cmp(&b.created_at)); 323 285 324 - Ok(stream::iter(va) 325 - .then(async |e| e.to_op().await) 326 - .collect() 327 - .await) 286 + Ok(stream::iter(va).collect().await) 328 287 } 329 288 330 289 async fn seen_dids_amount(&self) -> Result<u64, Error> { ··· 345 304 ) -> anyhow::Result<&'static str> { 346 305 let mut last_at = None; 347 306 while let Some(page) = pages.recv().await { 307 + log::info!("new page!"); 348 308 for op in &page.ops { 349 309 let rdb = db.db.lock().await; 350 310 db.insert_op(&rdb, op).await?;