Server tools to backfill, tail, mirror, and verify PLC logs

fjall: dont propagate parsing error unless its fatal

ptr.pet 5cc551b4 e61a3187

verified
+370 -104
+370 -104
src/plc_fjall.rs
··· 1 use crate::{Dt, ExportPage, Op as CommonOp, PageBoundaryState}; 2 - use data_encoding::{BASE32_NOPAD, BASE64URL, BASE64URL_NOPAD}; 3 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode}; 4 use serde::{Deserialize, Serialize}; 5 use std::collections::BTreeMap; ··· 199 } 200 } 201 202 #[derive(Debug, Clone, Serialize, Deserialize)] 203 struct StoredService { 204 r#type: String, ··· 227 } 228 229 impl StoredOp { 230 - fn from_json_value(v: &serde_json::Value) -> anyhow::Result<Self> { 231 - let obj = v 232 - .as_object() 233 - .ok_or_else(|| anyhow::anyhow!("operation is not an object"))?; 234 235 - let mut known_keys = Vec::new(); 236 - let mut get = |key: &'static str| { 237 - known_keys.push(key); 238 - obj.get(key) 239 }; 240 241 - let op_type = get("type") 242 - .and_then(|t| t.as_str()) 243 - .map(OpType::from_str) 244 - .ok_or_else(|| anyhow::anyhow!("missing type field"))?; 245 246 - let sig = get("sig") 247 - .and_then(|s| s.as_str()) 248 - .ok_or_else(|| anyhow::anyhow!("missing sig field")) 249 - .and_then(Signature::from_base64url)?; 250 251 - let prev = match get("prev").and_then(|p| p.as_str()) { 252 - Some(s) => Some(PlcCid::from_cid_str(s)?), 253 - None => None, 254 }; 255 256 - let rotation_keys = get("rotationKeys") 257 - .and_then(|v| v.as_array()) 258 - .map(|arr| { 259 - arr.iter() 260 - .filter_map(|v| v.as_str()) 261 - .map(DidKey::from_did_key) 262 - .collect::<anyhow::Result<Vec<_>>>() 263 - }) 264 - .transpose()?; 265 266 - let verification_methods = get("verificationMethods") 267 - .and_then(|v| v.as_object()) 268 - .map(|map| { 269 - map.iter() 270 - .map(|(k, v)| { 271 - let key = DidKey::from_did_key(v.as_str().unwrap_or_default())?; 272 - Ok((k.clone(), key)) 273 }) 274 - .collect::<anyhow::Result<BTreeMap<_, _>>>() 275 - }) 276 - .transpose()?; 277 278 - let also_known_as = get("alsoKnownAs").and_then(|v| v.as_array()).map(|arr| { 279 - arr.iter() 280 - .filter_map(|v| v.as_str()) 281 - .map(Aka::from_str) 282 - .collect() 283 - }); 284 285 - let services = get("services").and_then(|v| v.as_object()).map(|map| { 286 - map.iter() 287 - .filter_map(|(k, v)| { 288 - let svc = StoredService { 289 - r#type: v.get("type")?.as_str()?.to_string(), 290 - endpoint: v.get("endpoint")?.as_str()?.to_string(), 291 - }; 292 - Some((k.clone(), svc)) 293 - }) 294 - .collect() 295 - }); 296 297 - let signing_key = get("signingKey") 298 - .and_then(|v| v.as_str()) 299 - .map(DidKey::from_did_key) 300 - .transpose()?; 301 302 - let recovery_key = get("recoveryKey") 303 - .and_then(|v| v.as_str()) 304 - .map(DidKey::from_did_key) 305 - .transpose()?; 306 307 - let handle = get("handle") 308 - .and_then(|v| v.as_str()) 309 - .map(|s| s.to_string()); 310 - 311 - let service = get("service") 312 - .and_then(|v| v.as_str()) 313 - .map(|s| s.to_string()); 314 315 - let mut unknown = BTreeMap::new(); 316 for (k, v) in obj { 317 - if !known_keys.contains(&k.as_str()) { 318 - unknown.insert(k.clone(), v.clone()); 319 - } 320 } 321 322 - Ok(Self { 323 - op_type, 324 - sig, 325 - prev, 326 - rotation_keys, 327 - verification_methods, 328 - also_known_as, 329 - services, 330 - signing_key, 331 - recovery_key, 332 - handle, 333 - service, 334 - unknown, 335 - }) 336 } 337 338 fn to_json_value(&self) -> serde_json::Value { 339 let mut map = serde_json::Map::new(); 340 341 - map.insert("type".into(), self.op_type.as_str().into()); 342 - map.insert("sig".into(), self.sig.to_string().into()); 343 map.insert( 344 - "prev".into(), 345 self.prev 346 .as_ref() 347 .map(|c| serde_json::Value::String(c.to_string())) ··· 350 351 if let Some(keys) = &self.rotation_keys { 352 map.insert( 353 - "rotationKeys".into(), 354 keys.iter() 355 .map(|k| serde_json::Value::String(k.to_string())) 356 .collect::<Vec<_>>() ··· 363 .iter() 364 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))) 365 .collect(); 366 - map.insert("verificationMethods".into(), obj.into()); 367 } 368 369 if let Some(aka) = &self.also_known_as { 370 map.insert( 371 - "alsoKnownAs".into(), 372 aka.iter() 373 .map(|h| serde_json::Value::String(h.to_string())) 374 .collect::<Vec<_>>() ··· 389 ) 390 }) 391 .collect(); 392 - map.insert("services".into(), obj.into()); 393 } 394 395 // legacy create fields 396 if let Some(key) = &self.signing_key { 397 - map.insert("signingKey".into(), key.to_string().into()); 398 } 399 if let Some(key) = &self.recovery_key { 400 - map.insert("recoveryKey".into(), key.to_string().into()); 401 } 402 if let Some(handle) = &self.handle { 403 - map.insert("handle".into(), handle.clone().into()); 404 } 405 if let Some(service) = &self.service { 406 - map.insert("service".into(), service.clone().into()); 407 } 408 409 for (k, v) in &self.unknown { ··· 505 encode_did(&mut encoded_did, &op.did)?; 506 507 let json_val: serde_json::Value = serde_json::from_str(op.operation.get())?; 508 - let stored = StoredOp::from_json_value(&json_val)?; 509 let db_op = DbOp { 510 did: encoded_did, 511 nullified: op.nullified, 512 - operation: stored, 513 }; 514 let value = rmp_serde::to_vec(&db_op)?; 515 batch.insert(&self.inner.ops, &ts_key, &value); ··· 757 758 for entry in &entries { 759 let op = &entry["operation"]; 760 - let stored = StoredOp::from_json_value(op) 761 - .unwrap_or_else(|e| panic!("failed to parse op in {path}: {e}\n{op}")); 762 763 // msgpack verification 764 let packed = rmp_serde::to_vec(&stored).unwrap();
··· 1 use crate::{Dt, ExportPage, Op as CommonOp, PageBoundaryState}; 2 + use anyhow::Context; 3 + use data_encoding::{BASE32_NOPAD, BASE64URL_NOPAD}; 4 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode}; 5 use serde::{Deserialize, Serialize}; 6 use std::collections::BTreeMap; ··· 200 } 201 } 202 203 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 204 + enum StoredOpField { 205 + Type, 206 + Sig, 207 + Prev, 208 + RotationKeys, 209 + VerificationMethods, 210 + AlsoKnownAs, 211 + Services, 212 + SigningKey, 213 + RecoveryKey, 214 + Handle, 215 + Service, 216 + } 217 + 218 + impl StoredOpField { 219 + fn as_str(&self) -> &'static str { 220 + match self { 221 + Self::Type => "type", 222 + Self::Sig => "sig", 223 + Self::Prev => "prev", 224 + Self::RotationKeys => "rotationKeys", 225 + Self::VerificationMethods => "verificationMethods", 226 + Self::AlsoKnownAs => "alsoKnownAs", 227 + Self::Services => "services", 228 + Self::SigningKey => "signingKey", 229 + Self::RecoveryKey => "recoveryKey", 230 + Self::Handle => "handle", 231 + Self::Service => "service", 232 + } 233 + } 234 + } 235 + 236 + impl AsRef<str> for StoredOpField { 237 + fn as_ref(&self) -> &str { 238 + self.as_str() 239 + } 240 + } 241 + 242 + impl std::ops::Deref for StoredOpField { 243 + type Target = str; 244 + fn deref(&self) -> &Self::Target { 245 + self.as_str() 246 + } 247 + } 248 + 249 + impl fmt::Display for StoredOpField { 250 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 251 + f.write_str(self.as_str()) 252 + } 253 + } 254 + 255 + #[derive(Debug, thiserror::Error)] 256 + enum StoredOpError { 257 + #[error("operation is not an object")] 258 + NotAnObject, 259 + #[error("missing required field: {0}")] 260 + MissingField(StoredOpField), 261 + #[error("invalid field {0}: {1}")] 262 + InvalidField(StoredOpField, #[source] anyhow::Error), 263 + #[error("type mismatch for field {0}: expected {1}")] 264 + TypeMismatch(StoredOpField, &'static str), 265 + } 266 + 267 #[derive(Debug, Clone, Serialize, Deserialize)] 268 struct StoredService { 269 r#type: String, ··· 292 } 293 294 impl StoredOp { 295 + fn from_json_value(v: serde_json::Value) -> (Option<Self>, Vec<StoredOpError>) { 296 + let serde_json::Value::Object(mut obj) = v else { 297 + return (None, vec![StoredOpError::NotAnObject]); 298 + }; 299 300 + let mut errors = Vec::new(); 301 + let mut unknown = BTreeMap::new(); 302 + 303 + let op_type = match obj.remove(&*StoredOpField::Type) { 304 + Some(serde_json::Value::String(s)) => OpType::from_str(&s), 305 + Some(v) => { 306 + errors.push(StoredOpError::TypeMismatch(StoredOpField::Type, "string")); 307 + unknown.insert(StoredOpField::Type.to_string(), v); 308 + OpType::Other(String::new()) 309 + } 310 + Option::None => { 311 + errors.push(StoredOpError::MissingField(StoredOpField::Type)); 312 + OpType::Other(String::new()) 313 + } 314 }; 315 316 + let sig = match obj.remove(&*StoredOpField::Sig) { 317 + Some(serde_json::Value::String(s)) => match Signature::from_base64url(&s) { 318 + Ok(sig) => sig, 319 + Err(e) => { 320 + errors.push(StoredOpError::InvalidField(StoredOpField::Sig, e)); 321 + unknown.insert(StoredOpField::Sig.to_string(), serde_json::Value::String(s)); 322 + Signature(Vec::new()) 323 + } 324 + }, 325 + Some(v) => { 326 + errors.push(StoredOpError::TypeMismatch(StoredOpField::Sig, "string")); 327 + unknown.insert(StoredOpField::Sig.to_string(), v); 328 + Signature(Vec::new()) 329 + } 330 + Option::None => { 331 + errors.push(StoredOpError::MissingField(StoredOpField::Sig)); 332 + Signature(Vec::new()) 333 + } 334 + }; 335 336 + let prev = match obj.remove(&*StoredOpField::Prev) { 337 + Some(serde_json::Value::Null) | Option::None => Option::None, 338 + Some(serde_json::Value::String(s)) => match PlcCid::from_cid_str(&s) { 339 + Ok(p) => Some(p), 340 + Err(e) => { 341 + errors.push(StoredOpError::InvalidField(StoredOpField::Prev, e)); 342 + unknown.insert( 343 + StoredOpField::Prev.to_string(), 344 + serde_json::Value::String(s), 345 + ); 346 + Option::None 347 + } 348 + }, 349 + Some(v) => { 350 + errors.push(StoredOpError::TypeMismatch(StoredOpField::Prev, "string")); 351 + unknown.insert(StoredOpField::Prev.to_string(), v); 352 + Option::None 353 + } 354 + }; 355 356 + let rotation_keys = match obj.remove(&*StoredOpField::RotationKeys) { 357 + Some(serde_json::Value::Array(arr)) => { 358 + let mut keys = Vec::with_capacity(arr.len()); 359 + let mut failed = false; 360 + for v in arr { 361 + match v { 362 + serde_json::Value::String(s) => match DidKey::from_did_key(&s) { 363 + Ok(k) => keys.push(k), 364 + Err(e) => { 365 + errors.push(StoredOpError::InvalidField( 366 + StoredOpField::RotationKeys, 367 + e, 368 + )); 369 + failed = true; 370 + break; 371 + } 372 + }, 373 + _ => { 374 + errors.push(StoredOpError::TypeMismatch( 375 + StoredOpField::RotationKeys, 376 + "string inside array", 377 + )); 378 + failed = true; 379 + break; 380 + } 381 + } 382 + } 383 + if failed { 384 + // we don't have the original array anymore here because we consumed it, 385 + // but we can reconstruct it for the unknown map if we really want to, 386 + // though usually we just move the whole thing. 387 + // since we consumed it, let's just push an error and return None. 388 + // to be absolutely correct about preserving 'unknown', we should have peeked or cloned. 389 + // let's adjust the implementation to clone if we suspect it might fail, 390 + // OR just move the whole value back if it fails. 391 + Option::None 392 + } else { 393 + Some(keys) 394 + } 395 + } 396 + Some(v) => { 397 + errors.push(StoredOpError::TypeMismatch( 398 + StoredOpField::RotationKeys, 399 + "array", 400 + )); 401 + unknown.insert(StoredOpField::RotationKeys.to_string(), v); 402 + Option::None 403 + } 404 + Option::None => Option::None, 405 }; 406 407 + let verification_methods = match obj.remove(&*StoredOpField::VerificationMethods) { 408 + Some(serde_json::Value::Object(map)) => { 409 + let mut methods = BTreeMap::new(); 410 + let mut failed = false; 411 + for (k, v) in map { 412 + match v { 413 + serde_json::Value::String(s) => match DidKey::from_did_key(&s) { 414 + Ok(key) => { 415 + methods.insert(k, key); 416 + } 417 + Err(e) => { 418 + errors.push(StoredOpError::InvalidField( 419 + StoredOpField::VerificationMethods, 420 + e, 421 + )); 422 + failed = true; 423 + break; 424 + } 425 + }, 426 + _ => { 427 + errors.push(StoredOpError::TypeMismatch( 428 + StoredOpField::VerificationMethods, 429 + "string value in object", 430 + )); 431 + failed = true; 432 + break; 433 + } 434 + } 435 + } 436 + if failed { Option::None } else { Some(methods) } 437 + } 438 + Some(v) => { 439 + errors.push(StoredOpError::TypeMismatch( 440 + StoredOpField::VerificationMethods, 441 + "object", 442 + )); 443 + unknown.insert(StoredOpField::VerificationMethods.to_string(), v); 444 + Option::None 445 + } 446 + Option::None => Option::None, 447 + }; 448 449 + let also_known_as = match obj.remove(&*StoredOpField::AlsoKnownAs) { 450 + Some(serde_json::Value::Array(arr)) => Some( 451 + arr.into_iter() 452 + .filter_map(|v| match v { 453 + serde_json::Value::String(s) => Some(Aka::from_str(&s)), 454 + _ => None, 455 }) 456 + .collect(), 457 + ), 458 + Some(v) => { 459 + errors.push(StoredOpError::TypeMismatch( 460 + StoredOpField::AlsoKnownAs, 461 + "array", 462 + )); 463 + unknown.insert(StoredOpField::AlsoKnownAs.to_string(), v); 464 + Option::None 465 + } 466 + Option::None => Option::None, 467 + }; 468 469 + let services = match obj.remove(&*StoredOpField::Services) { 470 + Some(serde_json::Value::Object(map)) => Some( 471 + map.into_iter() 472 + .filter_map(|(k, v)| { 473 + let svc = StoredService { 474 + r#type: v.get("type")?.as_str()?.to_string(), 475 + endpoint: v.get("endpoint")?.as_str()?.to_string(), 476 + }; 477 + Some((k, svc)) 478 + }) 479 + .collect(), 480 + ), 481 + Some(v) => { 482 + errors.push(StoredOpError::TypeMismatch( 483 + StoredOpField::Services, 484 + "object", 485 + )); 486 + unknown.insert(StoredOpField::Services.to_string(), v); 487 + Option::None 488 + } 489 + Option::None => Option::None, 490 + }; 491 492 + let signing_key = match obj.remove(&*StoredOpField::SigningKey) { 493 + Some(serde_json::Value::String(s)) => match DidKey::from_did_key(&s) { 494 + Ok(key) => Some(key), 495 + Err(e) => { 496 + errors.push(StoredOpError::InvalidField(StoredOpField::SigningKey, e)); 497 + unknown.insert( 498 + StoredOpField::SigningKey.to_string(), 499 + serde_json::Value::String(s), 500 + ); 501 + Option::None 502 + } 503 + }, 504 + Some(v) => { 505 + errors.push(StoredOpError::TypeMismatch( 506 + StoredOpField::SigningKey, 507 + "string", 508 + )); 509 + unknown.insert(StoredOpField::SigningKey.to_string(), v); 510 + Option::None 511 + } 512 + Option::None => Option::None, 513 + }; 514 515 + let recovery_key = match obj.remove(&*StoredOpField::RecoveryKey) { 516 + Some(serde_json::Value::String(s)) => match DidKey::from_did_key(&s) { 517 + Ok(key) => Some(key), 518 + Err(e) => { 519 + errors.push(StoredOpError::InvalidField(StoredOpField::RecoveryKey, e)); 520 + unknown.insert( 521 + StoredOpField::RecoveryKey.to_string(), 522 + serde_json::Value::String(s), 523 + ); 524 + Option::None 525 + } 526 + }, 527 + Some(v) => { 528 + errors.push(StoredOpError::TypeMismatch( 529 + StoredOpField::RecoveryKey, 530 + "string", 531 + )); 532 + unknown.insert(StoredOpField::RecoveryKey.to_string(), v); 533 + Option::None 534 + } 535 + Option::None => Option::None, 536 + }; 537 538 + let handle = match obj.remove(&*StoredOpField::Handle) { 539 + Some(serde_json::Value::String(s)) => Some(s), 540 + Some(v) => { 541 + errors.push(StoredOpError::TypeMismatch(StoredOpField::Handle, "string")); 542 + unknown.insert(StoredOpField::Handle.to_string(), v); 543 + Option::None 544 + } 545 + Option::None => Option::None, 546 + }; 547 548 + let service = match obj.remove(&*StoredOpField::Service) { 549 + Some(serde_json::Value::String(s)) => Some(s), 550 + Some(v) => { 551 + errors.push(StoredOpError::TypeMismatch( 552 + StoredOpField::Service, 553 + "string", 554 + )); 555 + unknown.insert(StoredOpField::Service.to_string(), v); 556 + Option::None 557 + } 558 + Option::None => Option::None, 559 + }; 560 561 for (k, v) in obj { 562 + unknown.insert(k, v); 563 } 564 565 + ( 566 + Some(Self { 567 + op_type, 568 + sig, 569 + prev, 570 + rotation_keys, 571 + verification_methods, 572 + also_known_as, 573 + services, 574 + signing_key, 575 + recovery_key, 576 + handle, 577 + service, 578 + unknown, 579 + }), 580 + errors, 581 + ) 582 } 583 584 fn to_json_value(&self) -> serde_json::Value { 585 let mut map = serde_json::Map::new(); 586 587 + map.insert((*StoredOpField::Type).into(), self.op_type.as_str().into()); 588 + map.insert((*StoredOpField::Sig).into(), self.sig.to_string().into()); 589 map.insert( 590 + (*StoredOpField::Prev).into(), 591 self.prev 592 .as_ref() 593 .map(|c| serde_json::Value::String(c.to_string())) ··· 596 597 if let Some(keys) = &self.rotation_keys { 598 map.insert( 599 + (*StoredOpField::RotationKeys).into(), 600 keys.iter() 601 .map(|k| serde_json::Value::String(k.to_string())) 602 .collect::<Vec<_>>() ··· 609 .iter() 610 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))) 611 .collect(); 612 + map.insert((*StoredOpField::VerificationMethods).into(), obj.into()); 613 } 614 615 if let Some(aka) = &self.also_known_as { 616 map.insert( 617 + (*StoredOpField::AlsoKnownAs).into(), 618 aka.iter() 619 .map(|h| serde_json::Value::String(h.to_string())) 620 .collect::<Vec<_>>() ··· 635 ) 636 }) 637 .collect(); 638 + map.insert((*StoredOpField::Services).into(), obj.into()); 639 } 640 641 // legacy create fields 642 if let Some(key) = &self.signing_key { 643 + map.insert((*StoredOpField::SigningKey).into(), key.to_string().into()); 644 } 645 if let Some(key) = &self.recovery_key { 646 + map.insert((*StoredOpField::RecoveryKey).into(), key.to_string().into()); 647 } 648 if let Some(handle) = &self.handle { 649 + map.insert((*StoredOpField::Handle).into(), handle.clone().into()); 650 } 651 if let Some(service) = &self.service { 652 + map.insert((*StoredOpField::Service).into(), service.clone().into()); 653 } 654 655 for (k, v) in &self.unknown { ··· 751 encode_did(&mut encoded_did, &op.did)?; 752 753 let json_val: serde_json::Value = serde_json::from_str(op.operation.get())?; 754 + let (stored, mut errors) = StoredOp::from_json_value(json_val); 755 + 756 + let Some(operation) = stored else { 757 + return Err(errors.remove(0)).context("fatal operation parse error"); 758 + }; 759 + 760 + for e in &errors { 761 + log::warn!("failed to parse operation {} {}: {}", op.did, op.cid, e); 762 + } 763 + if !errors.is_empty() { 764 + // if parse failed but not fatal, we just dont store it 765 + return Ok(0); 766 + } 767 + 768 let db_op = DbOp { 769 did: encoded_did, 770 nullified: op.nullified, 771 + operation, 772 }; 773 let value = rmp_serde::to_vec(&db_op)?; 774 batch.insert(&self.inner.ops, &ts_key, &value); ··· 1016 1017 for entry in &entries { 1018 let op = &entry["operation"]; 1019 + let (stored, errors) = StoredOp::from_json_value(op.clone()); 1020 + if !errors.is_empty() { 1021 + let mut msg = format!("failed to parse op in {path}:\n"); 1022 + for e in errors { 1023 + msg.push_str(&format!(" - {e:?}\n")); 1024 + } 1025 + msg.push_str(&format!("op: {op}\n")); 1026 + panic!("{msg}"); 1027 + } 1028 1029 // msgpack verification 1030 let packed = rmp_serde::to_vec(&stored).unwrap();