AT-based link agregator. Mirror of https://github.com/likeandscribe/frontpage

Store invalid messages in dead letter queue (#213)

* Store invalid messages in dead letter queue

* Abstract dlq inputs

* Remove unused key fn

* Add command for getting dlq messages

authored by tom.sherman.is and committed by

GitHub 1b108d87 626d80f3

+103 -29
+22 -6
Cargo.lock
··· 412 412 dependencies = [ 413 413 "anyhow", 414 414 "atrium-api", 415 - "bincode", 416 415 "chrono", 417 416 "dotenv-flow", 418 417 "drainpipe-store", 419 418 "env_logger", 420 - "flume", 421 419 "futures-util", 420 + "jetstream", 422 421 "log", 423 422 "reqwest", 424 423 "serde", 425 424 "serde_json", 426 425 "sled", 427 - "thiserror 2.0.17", 428 426 "tokio", 429 427 "tokio-metrics", 430 - "tokio-tungstenite", 431 - "url", 432 - "zstd", 433 428 ] 434 429 435 430 [[package]] ··· 439 434 "chrono", 440 435 "dotenv-flow", 441 436 "drainpipe-store", 437 + "serde_json", 442 438 "structopt", 443 439 ] 444 440 ··· 448 444 dependencies = [ 449 445 "anyhow", 450 446 "bincode", 447 + "jetstream", 451 448 "log", 452 449 "serde", 450 + "serde_json", 453 451 "sled", 454 452 ] 455 453 ··· 1013 1011 version = "1.0.15" 1014 1012 source = "registry+https://github.com/rust-lang/crates.io-index" 1015 1013 checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" 1014 + 1015 + [[package]] 1016 + name = "jetstream" 1017 + version = "0.1.0" 1018 + dependencies = [ 1019 + "atrium-api", 1020 + "chrono", 1021 + "flume", 1022 + "futures-util", 1023 + "log", 1024 + "serde", 1025 + "serde_json", 1026 + "thiserror 2.0.17", 1027 + "tokio", 1028 + "tokio-tungstenite", 1029 + "url", 1030 + "zstd", 1031 + ] 1016 1032 1017 1033 [[package]] 1018 1034 name = "jiff"
+1
Cargo.toml
··· 3 3 "packages-rs/drainpipe", 4 4 "packages-rs/drainpipe-cli", 5 5 "packages-rs/drainpipe-store", 6 + "packages-rs/jetstream", 6 7 ] 7 8 resolver = "2"
+1
packages-rs/drainpipe-cli/Cargo.toml
··· 8 8 drainpipe-store = { path = "../drainpipe-store" } 9 9 dotenv-flow = "0.16.2" 10 10 chrono = "0.4.42" 11 + serde_json = "1.0.145"
+15
packages-rs/drainpipe-cli/src/main.rs
··· 18 18 #[structopt(long, env = "STORE_LOCATION", parse(from_os_str))] 19 19 db: PathBuf, 20 20 }, 21 + 22 + GetDeadLetterMessages { 23 + #[structopt(long, env = "STORE_LOCATION", parse(from_os_str))] 24 + db: PathBuf, 25 + }, 21 26 } 22 27 23 28 fn main() { ··· 45 50 } else { 46 51 println!("Cursor not set"); 47 52 std::process::exit(1); 53 + } 54 + } 55 + 56 + Opt::GetDeadLetterMessages { db } => { 57 + let store = drainpipe_store::Store::open(&db).expect("Could not open store"); 58 + let messages = store 59 + .get_dead_letter_messages() 60 + .expect("Could not get dead letter messages"); 61 + for message in messages { 62 + println!("{}", serde_json::to_string_pretty(&message).unwrap()); 48 63 } 49 64 } 50 65 }
+2
packages-rs/drainpipe-store/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0.93" 8 8 bincode = "1.3.3" 9 + jetstream = { path = "../jetstream" } 9 10 log = "0.4.22" 10 11 serde = { version = "1.0.215", features = ["derive"] } 12 + serde_json = "1.0.133" 11 13 sled = "0.34.7"
+35 -9
packages-rs/drainpipe-store/src/lib.rs
··· 5 5 use sled::Tree; 6 6 7 7 pub struct Store { 8 + db: sled::Db, 8 9 cursor_tree: Tree, 9 10 dead_letter_tree: Tree, 10 11 } ··· 50 51 error_message, 51 52 }) 52 53 } 53 - 54 - pub fn key(&self) -> &String { 55 - match &self.0 { 56 - DeadLetterInner::V1 { key, .. } => key, 57 - } 58 - } 59 54 } 60 55 61 56 impl Store { 62 57 pub fn open(path: &PathBuf) -> anyhow::Result<Store> { 63 58 let db = sled::open(path)?; 64 59 Ok(Self { 60 + db: db.clone(), 65 61 cursor_tree: db.open_tree("cursor")?, 66 62 dead_letter_tree: db.open_tree("dead_letter")?, 67 63 }) ··· 92 88 .transpose() 93 89 } 94 90 95 - pub fn record_dead_letter(&self, dead_letter: &DeadLetter) -> anyhow::Result<()> { 96 - self.dead_letter_tree 97 - .insert(dead_letter.key(), bincode::serialize(&dead_letter)?)?; 91 + fn record_dead_letter(&self, commit_json: String, error_message: String) -> anyhow::Result<()> { 92 + let key = self.db.generate_id()?.to_string(); 93 + self.dead_letter_tree.insert( 94 + key.clone(), 95 + bincode::serialize(&DeadLetter::new(key, commit_json, error_message))?, 96 + )?; 98 97 Ok(()) 98 + } 99 + 100 + pub fn record_dead_letter_commit( 101 + &self, 102 + commit: &jetstream::event::CommitEvent, 103 + error_message: String, 104 + ) -> anyhow::Result<()> { 105 + self.record_dead_letter(serde_json::to_string(commit)?, error_message) 106 + } 107 + 108 + pub fn record_dead_letter_jetstream_error( 109 + &self, 110 + error: &jetstream::error::JetstreamEventError, 111 + ) -> anyhow::Result<()> { 112 + self.record_dead_letter("null".into(), error.to_string()) 113 + } 114 + 115 + pub fn get_dead_letter_messages(&self) -> anyhow::Result<Vec<DeadLetter>> { 116 + let mut messages = Vec::new(); 117 + 118 + for item in self.dead_letter_tree.iter() { 119 + let (_key, value) = item?; 120 + let message: DeadLetter = 121 + bincode::deserialize(&value).context("Failed to deserialize dead letter")?; 122 + messages.push(message); 123 + } 124 + Ok(messages) 99 125 } 100 126 }
+2 -7
packages-rs/drainpipe/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0.93" 8 8 atrium-api = "0.24.7" 9 - bincode = "1.3.3" 10 - chrono = { version = "0.4.38", features = ["serde"] } 9 + chrono = "0.4.38" 11 10 dotenv-flow = "0.16.2" 12 11 drainpipe-store = { path = "../drainpipe-store" } 13 12 env_logger = "0.11.5" 14 - flume = "0.11.1" 15 13 futures-util = "0.3" 14 + jetstream = { path = "../jetstream" } 16 15 log = "0.4.22" 17 16 reqwest = { version = "0.12.4", features = ["json"] } 18 17 serde = { version = "1.0.215", features = ["derive"] } 19 18 serde_json = "1.0.132" 20 19 sled = "0.34.7" 21 - thiserror = "2.0.3" 22 20 tokio = { version = "1.38.0", features = ["full"] } 23 21 tokio-metrics = "0.3.1" 24 - tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] } 25 - url = "2.5.3" 26 - zstd = "0.13.2"
packages-rs/drainpipe/src/jetstream.rs packages-rs/jetstream/src/lib.rs
packages-rs/drainpipe/src/jetstream/error.rs packages-rs/jetstream/src/error.rs
packages-rs/drainpipe/src/jetstream/event.rs packages-rs/jetstream/src/event.rs
+2 -7
packages-rs/drainpipe/src/main.rs
··· 1 1 mod config; 2 - mod jetstream; 3 2 4 3 use chrono::{TimeZone, Utc}; 5 4 use config::Config; ··· 79 78 80 79 send_frontpage_commit(&config, commit).await.or_else(|e| { 81 80 log::error!("Error processing commit: {:?}", e); 82 - store.record_dead_letter(&drainpipe_store::DeadLetter::new( 83 - commit.info().time_us.to_string(), 84 - serde_json::to_string(commit)?, 85 - e.to_string(), 86 - )) 81 + store.record_dead_letter_commit(&commit, e.to_string()) 87 82 })? 88 83 } 89 84 ··· 95 90 } 96 91 97 92 Ok(Err(e)) => { 98 - // TODO: This should add a dead letter 93 + store.record_dead_letter_jetstream_error(&e)?; 99 94 log::error!( 100 95 "Error receiving event (possible junk event structure?): {:?}", 101 96 e
packages-rs/drainpipe/zstd_dictionary packages-rs/jetstream/zstd_dictionary
+18
packages-rs/jetstream/Cargo.toml
··· 1 + [package] 2 + name = "jetstream" 3 + version = "0.1.0" 4 + edition = "2021" 5 + 6 + [dependencies] 7 + atrium-api = "0.24.8" 8 + chrono = { version = "0.4.38", features = ["serde"] } 9 + flume = "0.11.1" 10 + futures-util = "0.3.31" 11 + log = "0.4.22" 12 + serde = { version = "1.0.215", features = ["derive"] } 13 + serde_json = "1.0.132" 14 + thiserror = "2.0.3" 15 + tokio = { version = "1.38.0", features = ["full"] } 16 + tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] } 17 + url = "2.5.4" 18 + zstd = "0.13.2"
+5
packages-rs/jetstream/README.md
··· 1 + # Jetstream Client 2 + 3 + A client for [jetstream](https://github.com/bluesky-social/jetstream). 4 + 5 + This started as a fork of [jetstream-oxide](https://github.com/videah/jetstream-oxide) but has since diverged with a few breaking changes and critical fixes.