···11+[package]
22+name = "tapped"
33+version = "0.1.0"
44+edition = "2021"
55+description = "Rust wrapper for the tap ATProto utility"
66+license = "MIT"
77+readme = "README.md"
88+authors = ["Thomas Karpiniec <tom.karpiniec@outlook.com>"]
99+keywords = ["atproto", "tap"]
1010+repository = "https://github.com/thombles/tapped"
1111+1212+[dependencies]
1313+tokio = { version = "1", features = ["net", "process", "rt", "sync", "time"] }
1414+reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
1515+tokio-tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] }
1616+serde = { version = "1", features = ["derive"] }
1717+serde_json = "1"
1818+thiserror = "2"
1919+url = { version = "2", features = ["serde"] }
2020+futures-util = "0.3"
2121+tracing = "0.1"
2222+base64 = "0.22"
2323+libc = "0.2"
2424+2525+[dev-dependencies]
2626+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
2727+which = "7"
2828+env_logger = "0.11"
2929+log = "0.4"
+254
README.md
···11+# tapped
22+33+A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.
44+55+`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess, making it easy to build applications that sync data from the ATProto network.
66+77+## Features
88+99+- Spawn and manage `tap` subprocesses with graceful shutdown
1010+- Strongly-typed configuration for all tap envvars
1111+- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
1212+- WebSocket-based event channel with automatic acknowledgment
1313+1414+## Installation
1515+1616+Add to your `Cargo.toml`:
1717+1818+```toml
1919+[dependencies]
2020+tapped = "0.1"
2121+```
2222+2323+You'll also need the `tap` binary. Build it from the [indigo repository](https://github.com/bluesky-social/indigo):
2424+2525+```bash
2626+cd cmd/tap && go build
2727+```
2828+2929+`tapped` has been most recently tested against:
3030+3131+```
3232+tap version v0.0.0-20260114211028-207c9d49d0de-rev-207c9d4
3333+```
3434+3535+## Quick Start
3636+3737+```rust
3838+use tapped::{TapHandle, TapConfig, Event};
3939+4040+#[tokio::main]
4141+async fn main() -> tapped::Result<()> {
4242+ let config = TapConfig::builder()
4343+ .database_url("sqlite://tap.db")
4444+ .collection_filter("app.bsky.feed.post")
4545+ .build();
4646+4747+ // Spawn tap and connect
4848+ let handle = TapHandle::spawn_default(config).await?;
4949+5050+ // Subscribe to events
5151+ let mut channel = handle.channel().await?;
5252+5353+ while let Ok(received) = channel.recv().await {
5454+ match &received.event {
5555+ Event::Record(record) => {
5656+ println!("[{}] {}/{}",
5757+ record.action,
5858+ record.collection,
5959+ record.rkey
6060+ );
6161+ }
6262+ Event::Identity(identity) => {
6363+ println!("Identity: {} -> {}", identity.did, identity.handle);
6464+ }
6565+ }
6666+ // Event is auto-acknowledged when `received` is dropped
6767+ }
6868+6969+ Ok(())
7070+}
7171+```
7272+7373+## Usage Patterns
7474+7575+### Connect to Existing Instance
7676+7777+If you have a tap instance already running:
7878+7979+```rust
8080+use tapped::TapClient;
8181+8282+let client = TapClient::new("http://localhost:2480")?;
8383+client.health().await?;
8484+```
8585+8686+### Spawn with Custom Binary Path
8787+8888+```rust
8989+use tapped::{TapProcess, TapConfig};
9090+9191+let config = TapConfig::builder()
9292+ .database_url("sqlite://my-app.db")
9393+ .build();
9494+9595+let mut process = TapProcess::spawn("/path/to/tap", config).await?;
9696+let client = process.client();
9797+9898+// Use the client...
9999+100100+process.shutdown().await?;
101101+```
102102+103103+### Using TapHandle (Recommended)
104104+105105+`TapHandle` combines process management and client access:
106106+107107+```rust
108108+use tapped::{TapHandle, TapConfig};
109109+110110+let config = TapConfig::builder()
111111+ .database_url("sqlite://app.db")
112112+ .full_network(false)
113113+ .build();
114114+115115+let handle = TapHandle::spawn_default(config).await?;
116116+117117+// TapHandle derefs to TapClient, so you can call client methods directly
118118+handle.health().await?;
119119+let count = handle.repo_count().await?;
120120+println!("Tracking {} repos", count);
121121+```
122122+123123+### Configuration Options
124124+125125+```rust
126126+use tapped::{TapConfig, LogLevel};
127127+use std::time::Duration;
128128+129129+let config = TapConfig::builder()
130130+ // Database
131131+ .database_url("sqlite://tap.db")
132132+ .max_db_conns(10)
133133+134134+ // Network
135135+ .bind("127.0.0.1:2480")
136136+ .relay_url("wss://bsky.network")
137137+ .plc_url("https://plc.directory")
138138+139139+ // Filtering
140140+ .signal_collection("app.bsky.feed.post")
141141+ .collection_filter("app.bsky.feed.post")
142142+ .collection_filter("app.bsky.feed.like")
143143+ .full_network(false)
144144+145145+ // Performance
146146+ .firehose_parallelism(10)
147147+ .resync_parallelism(5)
148148+ .outbox_parallelism(10)
149149+ .outbox_capacity(10000)
150150+151151+ // Timeouts
152152+ .repo_fetch_timeout(Duration::from_secs(30))
153153+ .startup_timeout(Duration::from_secs(60))
154154+ .shutdown_timeout(Duration::from_secs(10))
155155+156156+ // Logging
157157+ .log_level(LogLevel::Info)
158158+159159+ .build();
160160+```
161161+162162+### Working with Events
163163+164164+Events are automatically acknowledged when dropped:
165165+166166+```rust
167167+use tapped::{Event, RecordAction};
168168+169169+let mut channel = client.channel().await?;
170170+171171+while let Ok(received) = channel.recv().await {
172172+ match &received.event {
173173+ Event::Record(record) => {
174174+ match record.action {
175175+ RecordAction::Create => {
176176+ if let Some(ref rec) = record.record {
177177+ // Access the raw JSON
178178+ println!("Type: {:?}", rec.record_type());
179179+180180+ // Or deserialize to a specific type
181181+ // let post: MyPostType = rec.deserialize_as()?;
182182+ }
183183+ }
184184+ RecordAction::Update => { /* ... */ }
185185+ RecordAction::Delete => { /* ... */ }
186186+ _ => {}
187187+ }
188188+ }
189189+ Event::Identity(identity) => {
190190+ println!("{} is now @{}", identity.did, identity.handle);
191191+ }
192192+ }
193193+ // Ack sent automatically here when `received` goes out of scope
194194+}
195195+```
196196+197197+### Managing Repositories
198198+199199+```rust
200200+// Add repos to track
201201+client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;
202202+203203+// Remove repos
204204+client.remove_repos(&["did:plc:abc123"]).await?;
205205+206206+// Get info about a specific repo
207207+let info = client.repo_info("did:plc:def456").await?;
208208+println!("State: {:?}, Records: {}", info.state, info.records);
209209+210210+// Resolve a DID to its document
211211+let doc = client.resolve_did("did:plc:def456").await?;
212212+println!("Handles: {:?}", doc.also_known_as);
213213+```
214214+215215+### Checking Stats
216216+217217+```rust
218218+let repos = client.repo_count().await?;
219219+let records = client.record_count().await?;
220220+let outbox = client.outbox_buffer().await?;
221221+let resync = client.resync_buffer().await?;
222222+let cursors = client.cursors().await?;
223223+224224+println!("Tracking {} repos with {} records", repos, records);
225225+println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
226226+println!("Firehose cursor: {:?}", cursors.firehose);
227227+```
228228+229229+## Error Handling
230230+231231+All operations return `tapped::Result<T>`, which uses the `tapped::Error` enum:
232232+233233+```rust
234234+use tapped::Error;
235235+236236+match client.health().await {
237237+ Ok(()) => println!("Healthy!"),
238238+ Err(Error::Http(e)) => println!("HTTP error: {}", e),
239239+ Err(Error::Timeout) => println!("Request timed out"),
240240+ Err(e) => println!("Other error: {}", e),
241241+}
242242+```
243243+244244+## Example: Syncing Standard Site Records
245245+246246+See [examples/standard_site_sync.rs](examples/standard_site_sync.rs) for a complete example that syncs `site.standard.publication` and `site.standard.document` records to local files.
247247+248248+```bash
249249+cargo run --example standard_site_sync
250250+```
251251+252252+## License
253253+254254+MIT
+295
examples/standard_site_sync.rs
···11+//! Example: Sync site.standard.publication and site.standard.document records.
22+//!
33+//! This example demonstrates using tapped to track publication and document
44+//! records from the ATProto network. It maintains an on-disk cache and
55+//! writes URL lists to disk on each change for inspection.
66+//!
77+//! Run with: `RUST_LOG=info cargo run --example standard_site_sync`
88+99+use std::collections::HashMap;
1010+use std::fs;
1111+1212+use log::info;
1313+use serde::{Deserialize, Serialize};
1414+use tapped::{Event, RecordAction, RecordEvent, TapConfig, TapProcess};
1515+1616+/// A site.standard.publication record (subset of fields).
1717+#[derive(Debug, Deserialize)]
1818+struct Publication {
1919+ url: String,
2020+ name: String,
2121+}
2222+2323+/// A site.standard.document record (subset of fields).
2424+#[derive(Debug, Deserialize)]
2525+struct Document {
2626+ site: String,
2727+ title: String,
2828+ #[serde(default)]
2929+ path: Option<String>,
3030+}
3131+3232+/// Key for storing records in cache.
3333+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
3434+struct RecordKey {
3535+ did: String,
3636+ collection: String,
3737+ rkey: String,
3838+}
3939+4040+impl RecordKey {
4141+ fn new(event: &RecordEvent) -> Self {
4242+ Self {
4343+ did: event.did.clone(),
4444+ collection: event.collection.clone(),
4545+ rkey: event.rkey.clone(),
4646+ }
4747+ }
4848+}
4949+5050+/// Cached record data.
5151+#[derive(Debug, Clone, Serialize, Deserialize)]
5252+#[serde(tag = "type")]
5353+enum CachedRecord {
5454+ Publication { url: String },
5555+ Document { site: String, path: Option<String> },
5656+}
5757+5858+fn main() -> Result<(), Box<dyn std::error::Error>> {
5959+ tokio::runtime::Builder::new_multi_thread()
6060+ .enable_all()
6161+ .build()?
6262+ .block_on(async_main())
6363+}
6464+6565+async fn async_main() -> Result<(), Box<dyn std::error::Error>> {
6666+ env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
6767+ .format_timestamp_secs()
6868+ .init();
6969+7070+ info!("Starting standard.site sync example");
7171+7272+ let config = TapConfig::builder()
7373+ .database_url("sqlite://./tap-example.db")
7474+ .bind(":2480")
7575+ .signal_collection("site.standard.publication")
7676+ .collection_filters(vec![
7777+ "site.standard.publication".to_string(),
7878+ "site.standard.document".to_string(),
7979+ ])
8080+ .disable_acks(false)
8181+ .inherit_stdio(true)
8282+ .build();
8383+8484+ info!("Spawning tap process...");
8585+8686+ // Spawn tap (looks for ./tap first, then tap on PATH)
8787+ let process = TapProcess::spawn_default(config).await?;
8888+ info!("Tap running at {}", process.url());
8989+9090+ let client = process.client()?;
9191+ client.health().await?;
9292+ info!("Tap is healthy!");
9393+9494+ let mut receiver = client.channel().await?;
9595+ info!("Connected! Waiting for events...");
9696+9797+ // In-memory cache - load from disk if available
9898+ let mut cache: HashMap<RecordKey, CachedRecord> = load_cache_from_disk();
9999+ let (pub_count, doc_count) = count_cached(&cache);
100100+ info!(
101101+ "Loaded cache from disk: {} publications, {} documents",
102102+ pub_count, doc_count
103103+ );
104104+105105+ let mut live_count = 0u64;
106106+ let mut backfill_count = 0u64;
107107+108108+ loop {
109109+ match receiver.recv().await {
110110+ Ok(received) => {
111111+ if let Event::Record(ref record_event) = *received {
112112+ // Track live vs backfill
113113+ if record_event.live {
114114+ live_count += 1;
115115+ } else {
116116+ backfill_count += 1;
117117+ }
118118+119119+ process_record_event(record_event, &mut cache);
120120+ write_output_files(&cache)?;
121121+122122+ // Periodically show event source breakdown
123123+ if (live_count + backfill_count).is_multiple_of(100) {
124124+ info!(
125125+ "[Stats] Live events: {}, Backfill events: {}",
126126+ live_count, backfill_count
127127+ );
128128+ }
129129+ } else if let Event::Identity(ref identity_event) = *received {
130130+ info!(
131131+ "[IDENTITY] {} -> {} (active: {})",
132132+ identity_event.did, identity_event.handle, identity_event.is_active
133133+ );
134134+ }
135135+ // Event is automatically acked when `received` is dropped here
136136+ }
137137+ Err(e) => {
138138+ eprintln!("Error receiving event: {}", e);
139139+ break;
140140+ }
141141+ }
142142+ }
143143+144144+ Ok(())
145145+}
146146+147147+fn process_record_event(event: &RecordEvent, cache: &mut HashMap<RecordKey, CachedRecord>) {
148148+ let key = RecordKey::new(event);
149149+ let action_str = match event.action {
150150+ RecordAction::Create => "CREATE",
151151+ RecordAction::Update => "UPDATE",
152152+ RecordAction::Delete => "DELETE",
153153+ _ => "UNKNOWN",
154154+ };
155155+156156+ // Show whether this is a live or backfill event
157157+ let source = if event.live { "LIVE" } else { "BACKFILL" };
158158+159159+ info!(
160160+ "[{} {}] {} {}/{}",
161161+ action_str, source, event.did, event.collection, event.rkey
162162+ );
163163+164164+ match event.action {
165165+ RecordAction::Create | RecordAction::Update => {
166166+ if let Some(ref record) = event.record {
167167+ match event.collection.as_str() {
168168+ "site.standard.publication" => match record.deserialize_as::<Publication>() {
169169+ Ok(pub_record) => {
170170+ info!("Publication: {} ({})", pub_record.name, pub_record.url);
171171+ cache.insert(
172172+ key,
173173+ CachedRecord::Publication {
174174+ url: pub_record.url,
175175+ },
176176+ );
177177+ }
178178+ Err(e) => {
179179+ log::error!("Failed to parse publication: {}", e);
180180+ }
181181+ },
182182+ "site.standard.document" => match record.deserialize_as::<Document>() {
183183+ Ok(doc_record) => {
184184+ let full_url = match &doc_record.path {
185185+ Some(path) if !path.is_empty() => {
186186+ format!(
187187+ "{}/{}",
188188+ doc_record.site.trim_end_matches('/'),
189189+ path.trim_start_matches('/')
190190+ )
191191+ }
192192+ _ => doc_record.site.clone(),
193193+ };
194194+ info!("Document: {} ({})", doc_record.title, full_url);
195195+ cache.insert(
196196+ key,
197197+ CachedRecord::Document {
198198+ site: doc_record.site,
199199+ path: doc_record.path,
200200+ },
201201+ );
202202+ }
203203+ Err(e) => {
204204+ log::error!("Failed to parse document: {}", e);
205205+ }
206206+ },
207207+ _ => {}
208208+ }
209209+ }
210210+ }
211211+ RecordAction::Delete => {
212212+ cache.remove(&key);
213213+ }
214214+ _ => {}
215215+ }
216216+217217+ // Log cache stats
218218+ let (pub_count, doc_count) = count_cached(cache);
219219+ info!(
220220+ "Cached: {} publications, {} documents",
221221+ pub_count, doc_count
222222+ );
223223+}
224224+225225+fn count_cached(cache: &HashMap<RecordKey, CachedRecord>) -> (usize, usize) {
226226+ let mut publications = 0;
227227+ let mut documents = 0;
228228+229229+ for record in cache.values() {
230230+ match record {
231231+ CachedRecord::Publication { .. } => publications += 1,
232232+ CachedRecord::Document { .. } => documents += 1,
233233+ }
234234+ }
235235+236236+ (publications, documents)
237237+}
238238+239239+fn write_output_files(cache: &HashMap<RecordKey, CachedRecord>) -> Result<(), std::io::Error> {
240240+ let mut publication_urls: Vec<&str> = Vec::new();
241241+ let mut document_urls: Vec<String> = Vec::new();
242242+243243+ for record in cache.values() {
244244+ match record {
245245+ CachedRecord::Publication { url } => {
246246+ publication_urls.push(url);
247247+ }
248248+ CachedRecord::Document { site, path } => {
249249+ let full_url = match path {
250250+ Some(p) if !p.is_empty() => {
251251+ format!(
252252+ "{}/{}",
253253+ site.trim_end_matches('/'),
254254+ p.trim_start_matches('/')
255255+ )
256256+ }
257257+ _ => site.clone(),
258258+ };
259259+ document_urls.push(full_url);
260260+ }
261261+ }
262262+ }
263263+264264+ publication_urls.sort();
265265+ document_urls.sort();
266266+267267+ fs::write("publications.txt", publication_urls.join("\n"))?;
268268+269269+ // Write documents.txt (publication + document URLs combined)
270270+ let mut all_urls: Vec<String> = publication_urls.iter().map(|s| s.to_string()).collect();
271271+ all_urls.extend(document_urls);
272272+ all_urls.sort();
273273+ all_urls.dedup();
274274+ fs::write("documents.txt", all_urls.join("\n"))?;
275275+276276+ // Write cache.json for persistence
277277+ let cache_entries: Vec<(&RecordKey, &CachedRecord)> = cache.iter().collect();
278278+ let cache_json = serde_json::to_string_pretty(&cache_entries).map_err(std::io::Error::other)?;
279279+ fs::write("cache.json", cache_json)?;
280280+281281+ Ok(())
282282+}
283283+284284+fn load_cache_from_disk() -> HashMap<RecordKey, CachedRecord> {
285285+ match fs::read_to_string("cache.json") {
286286+ Ok(content) => match serde_json::from_str::<Vec<(RecordKey, CachedRecord)>>(&content) {
287287+ Ok(entries) => entries.into_iter().collect(),
288288+ Err(e) => {
289289+ log::warn!("Failed to parse cache.json: {}", e);
290290+ HashMap::new()
291291+ }
292292+ },
293293+ Err(_) => HashMap::new(),
294294+ }
295295+}
+209
src/channel.rs
···11+//! WebSocket event channel and receiver.
22+33+use futures_util::{SinkExt, StreamExt};
44+use serde::Serialize;
55+use tokio::sync::mpsc;
66+use tokio_tungstenite::{connect_async, tungstenite::Message};
77+use url::Url;
88+99+use crate::types::RawEvent;
1010+use crate::{Error, Event, Result};
1111+1212+type WsStream =
1313+ tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
1414+type WsSink = futures_util::stream::SplitSink<WsStream, Message>;
1515+type WsSource = futures_util::stream::SplitStream<WsStream>;
1616+1717+/// Receiver for events from a tap WebSocket channel.
1818+///
1919+/// Events are received via the [`recv`](EventReceiver::recv) method.
2020+/// Acknowledgments are sent automatically when events are dropped.
2121+///
2222+/// This type does not implement auto-reconnection. If the connection
2323+/// closes, `recv()` will return an error and you must create a new
2424+/// `EventReceiver` via [`TapClient::channel()`](crate::TapClient::channel).
2525+pub struct EventReceiver {
2626+ event_rx: mpsc::Receiver<Result<EventWithAck>>,
2727+ _ack_tx: mpsc::Sender<u64>,
2828+}
2929+3030+struct EventWithAck {
3131+ event: Event,
3232+ ack_tx: mpsc::Sender<u64>,
3333+}
3434+3535+struct AckGuard {
3636+ id: u64,
3737+ ack_tx: Option<mpsc::Sender<u64>>,
3838+}
3939+4040+impl Drop for AckGuard {
4141+ fn drop(&mut self) {
4242+ if let Some(tx) = self.ack_tx.take() {
4343+ // Fire and forget - if the channel is closed, we can't ack anyway
4444+ let id = self.id;
4545+ tokio::spawn(async move {
4646+ let _ = tx.send(id).await;
4747+ });
4848+ }
4949+ }
5050+}
5151+5252+/// Wrapper around Event that includes the ack trigger.
5353+pub struct ReceivedEvent {
5454+ pub event: Event,
5555+ _ack_guard: AckGuard,
5656+}
5757+5858+impl std::ops::Deref for ReceivedEvent {
5959+ type Target = Event;
6060+6161+ fn deref(&self) -> &Self::Target {
6262+ &self.event
6363+ }
6464+}
6565+6666+impl std::fmt::Debug for ReceivedEvent {
6767+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6868+ self.event.fmt(f)
6969+ }
7070+}
7171+7272+impl EventReceiver {
7373+ /// Connect to a tap WebSocket channel.
7474+ pub(crate) async fn connect(base_url: &Url, admin_password: Option<&str>) -> Result<Self> {
7575+ let mut ws_url = base_url.clone();
7676+ match ws_url.scheme() {
7777+ "http" => ws_url.set_scheme("ws").unwrap(),
7878+ "https" => ws_url.set_scheme("wss").unwrap(),
7979+ _ => {}
8080+ }
8181+ ws_url.set_path("/channel");
8282+8383+ if let Some(password) = admin_password {
8484+ ws_url
8585+ .set_username("admin")
8686+ .map_err(|_| Error::InvalidUrl("cannot set username".into()))?;
8787+ ws_url
8888+ .set_password(Some(password))
8989+ .map_err(|_| Error::InvalidUrl("cannot set password".into()))?;
9090+ }
9191+9292+ let (ws_stream, response) = connect_async(ws_url.as_str())
9393+ .await
9494+ .map_err(|e| Error::WebSocket(Box::new(e)))?;
9595+9696+ if response.status().as_u16() == 400 {
9797+ return Err(Error::WebhookModeActive);
9898+ }
9999+100100+ let (write, read) = ws_stream.split();
101101+102102+ let (event_tx, event_rx) = mpsc::channel::<Result<EventWithAck>>(100);
103103+ let (ack_tx, ack_rx) = mpsc::channel::<u64>(1000);
104104+105105+ let ack_tx_clone = ack_tx.clone();
106106+ tokio::spawn(async move {
107107+ Self::writer_task(write, ack_rx).await;
108108+ });
109109+110110+ tokio::spawn(async move {
111111+ Self::reader_task(read, event_tx, ack_tx_clone).await;
112112+ });
113113+114114+ Ok(Self {
115115+ event_rx,
116116+ _ack_tx: ack_tx,
117117+ })
118118+ }
119119+120120+ /// Receive the next event.
121121+ ///
122122+ /// Returns the event wrapped in a [`ReceivedEvent`] that automatically
123123+ /// sends an acknowledgment when dropped.
124124+ ///
125125+ /// # Errors
126126+ ///
127127+ /// Returns [`Error::ChannelClosed`] if the WebSocket connection closes.
128128+ pub async fn recv(&mut self) -> Result<ReceivedEvent> {
129129+ match self.event_rx.recv().await {
130130+ Some(Ok(event_with_ack)) => {
131131+ let id = event_with_ack.event.id();
132132+ Ok(ReceivedEvent {
133133+ event: event_with_ack.event,
134134+ _ack_guard: AckGuard {
135135+ id,
136136+ ack_tx: Some(event_with_ack.ack_tx),
137137+ },
138138+ })
139139+ }
140140+ Some(Err(e)) => Err(e),
141141+ None => Err(Error::ChannelClosed),
142142+ }
143143+ }
144144+145145+ /// Writer task: sends ack messages to the WebSocket.
146146+ async fn writer_task(mut write: WsSink, mut ack_rx: mpsc::Receiver<u64>) {
147147+ #[derive(Serialize)]
148148+ struct AckMessage {
149149+ #[serde(rename = "type")]
150150+ type_: &'static str,
151151+ id: u64,
152152+ }
153153+154154+ while let Some(id) = ack_rx.recv().await {
155155+ let msg = AckMessage { type_: "ack", id };
156156+ let json = match serde_json::to_string(&msg) {
157157+ Ok(j) => j,
158158+ Err(e) => {
159159+ tracing::warn!("Failed to serialize ack: {}", e);
160160+ continue;
161161+ }
162162+ };
163163+164164+ if let Err(e) = write.send(Message::Text(json.into())).await {
165165+ tracing::warn!("Failed to send ack: {}", e);
166166+ break;
167167+ }
168168+ }
169169+ }
170170+171171+ /// Reader task: reads events from WebSocket and sends to channel.
172172+ async fn reader_task(
173173+ mut read: WsSource,
174174+ event_tx: mpsc::Sender<Result<EventWithAck>>,
175175+ ack_tx: mpsc::Sender<u64>,
176176+ ) {
177177+ while let Some(msg_result) = read.next().await {
178178+ match msg_result {
179179+ Ok(Message::Text(text)) => match serde_json::from_str::<RawEvent>(&text) {
180180+ Ok(raw) => {
181181+ if let Some(event) = raw.into_event() {
182182+ let event_with_ack = EventWithAck {
183183+ event,
184184+ ack_tx: ack_tx.clone(),
185185+ };
186186+ if event_tx.send(Ok(event_with_ack)).await.is_err() {
187187+ break;
188188+ }
189189+ }
190190+ }
191191+ Err(e) => {
192192+ tracing::warn!("Failed to parse event: {}", e);
193193+ }
194194+ },
195195+ Ok(Message::Close(_)) => {
196196+ let _ = event_tx.send(Err(Error::ChannelClosed)).await;
197197+ break;
198198+ }
199199+ Ok(_) => {
200200+ // Ignore ping/pong/binary
201201+ }
202202+ Err(e) => {
203203+ let _ = event_tx.send(Err(Error::WebSocket(Box::new(e)))).await;
204204+ break;
205205+ }
206206+ }
207207+ }
208208+ }
209209+}
+362
src/client.rs
···11+//! HTTP client for tap API.
22+33+use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
44+use reqwest::Response;
55+use serde::de::DeserializeOwned;
66+use serde::Serialize;
77+use url::Url;
88+99+use crate::channel::EventReceiver;
1010+use crate::types::{
1111+ ApiError, Cursors, DidDocument, OutboxBufferResponse, RecordCountResponse, RepoCountResponse,
1212+ RepoInfo, ResyncBufferResponse,
1313+};
1414+use crate::{Error, Result, TapConfig};
1515+1616+/// HTTP client for interacting with a tap instance.
1717+///
1818+/// Provides methods for all tap HTTP endpoints. The client is cheap to clone
1919+/// and can be shared across tasks.
2020+#[derive(Debug, Clone)]
2121+pub struct TapClient {
2222+ client: reqwest::Client,
2323+ base_url: Url,
2424+ admin_password: Option<String>,
2525+}
2626+2727+impl TapClient {
2828+ /// Create a new client connecting to the given URL.
2929+ ///
3030+ /// # Example
3131+ ///
3232+ /// ```no_run
3333+ /// use tapped::TapClient;
3434+ ///
3535+ /// let client = TapClient::new("http://localhost:2480")?;
3636+ /// # Ok::<(), tapped::Error>(())
3737+ /// ```
3838+ pub fn new(url: impl AsRef<str>) -> Result<Self> {
3939+ Self::with_config(url, &TapConfig::default())
4040+ }
4141+4242+ /// Create a new client with Basic auth.
4343+ ///
4444+ /// # Example
4545+ ///
4646+ /// ```no_run
4747+ /// use tapped::TapClient;
4848+ ///
4949+ /// let client = TapClient::with_auth("http://localhost:2480", "secret")?;
5050+ /// # Ok::<(), tapped::Error>(())
5151+ /// ```
5252+ pub fn with_auth(url: impl AsRef<str>, password: impl Into<String>) -> Result<Self> {
5353+ let config = TapConfig::builder().admin_password(password.into()).build();
5454+ Self::with_config(url, &config)
5555+ }
5656+5757+ /// Create a new client with the given configuration.
5858+ pub fn with_config(url: impl AsRef<str>, config: &TapConfig) -> Result<Self> {
5959+ let base_url: Url = url
6060+ .as_ref()
6161+ .parse()
6262+ .map_err(|_| Error::InvalidUrl(url.as_ref().to_string()))?;
6363+6464+ let timeout = config.request_timeout();
6565+ let client = reqwest::Client::builder()
6666+ .timeout(timeout)
6767+ .build()
6868+ .map_err(Error::Http)?;
6969+7070+ Ok(Self {
7171+ client,
7272+ base_url,
7373+ admin_password: config.admin_password.clone(),
7474+ })
7575+ }
7676+7777+ /// Get the base URL of the tap instance.
7878+ pub fn url(&self) -> &Url {
7979+ &self.base_url
8080+ }
8181+8282+ /// Build authorization headers if password is set.
8383+ fn auth_headers(&self) -> HeaderMap {
8484+ let mut headers = HeaderMap::new();
8585+ if let Some(ref password) = self.admin_password {
8686+ use base64::Engine;
8787+ let credentials = format!("admin:{}", password);
8888+ let encoded = base64::engine::general_purpose::STANDARD.encode(credentials);
8989+ if let Ok(value) = HeaderValue::from_str(&format!("Basic {}", encoded)) {
9090+ headers.insert(AUTHORIZATION, value);
9191+ }
9292+ }
9393+ headers
9494+ }
9595+9696+ /// Handle a response, returning an error for non-success status codes.
9797+ async fn handle_response<T: DeserializeOwned>(resp: Response) -> Result<T> {
9898+ if resp.status().is_success() {
9999+ Ok(resp.json().await?)
100100+ } else {
101101+ Err(Self::error_from_response(resp).await)
102102+ }
103103+ }
104104+105105+ /// Handle a response that returns no body on success.
106106+ async fn handle_empty_response(resp: Response) -> Result<()> {
107107+ if resp.status().is_success() {
108108+ let _ = resp.bytes().await;
109109+ Ok(())
110110+ } else {
111111+ Err(Self::error_from_response(resp).await)
112112+ }
113113+ }
114114+115115+ /// Extract an error from a failed response.
116116+ async fn error_from_response(resp: Response) -> Error {
117117+ let status = resp.status().as_u16();
118118+ let message = resp
119119+ .json::<ApiError>()
120120+ .await
121121+ .map(|e| e.message)
122122+ .unwrap_or_else(|_| "Unknown error".into());
123123+ Error::Api { status, message }
124124+ }
125125+126126+ /// Check if the tap instance is healthy.
127127+ ///
128128+ /// # Example
129129+ ///
130130+ /// ```no_run
131131+ /// # async fn example() -> tapped::Result<()> {
132132+ /// use tapped::TapClient;
133133+ ///
134134+ /// let client = TapClient::new("http://localhost:2480")?;
135135+ /// client.health().await?;
136136+ /// println!("Tap is healthy!");
137137+ /// # Ok(())
138138+ /// # }
139139+ /// ```
140140+ pub async fn health(&self) -> Result<()> {
141141+ let url = self.base_url.join("/health")?;
142142+ let resp = self
143143+ .client
144144+ .get(url)
145145+ .headers(self.auth_headers())
146146+ .send()
147147+ .await?;
148148+ Self::handle_empty_response(resp).await
149149+ }
150150+151151+ /// Add DIDs to track.
152152+ ///
153153+ /// Triggers backfill for newly added repos.
154154+ ///
155155+ /// # Example
156156+ ///
157157+ /// ```no_run
158158+ /// # async fn example() -> tapped::Result<()> {
159159+ /// use tapped::TapClient;
160160+ ///
161161+ /// let client = TapClient::new("http://localhost:2480")?;
162162+ /// client.add_repos(&["did:plc:example1234567890abc"]).await?;
163163+ /// # Ok(())
164164+ /// # }
165165+ /// ```
166166+ pub async fn add_repos(&self, dids: &[impl AsRef<str>]) -> Result<()> {
167167+ #[derive(Serialize)]
168168+ struct Payload {
169169+ dids: Vec<String>,
170170+ }
171171+172172+ let payload = Payload {
173173+ dids: dids.iter().map(|d| d.as_ref().to_string()).collect(),
174174+ };
175175+176176+ let url = self.base_url.join("/repos/add")?;
177177+ let resp = self
178178+ .client
179179+ .post(url)
180180+ .headers(self.auth_headers())
181181+ .json(&payload)
182182+ .send()
183183+ .await?;
184184+ Self::handle_empty_response(resp).await
185185+ }
186186+187187+ /// Remove DIDs from tracking.
188188+ ///
189189+ /// Stops sync and deletes tracked repo metadata. Does not delete buffered
190190+ /// events in the outbox.
191191+ ///
192192+ /// # Example
193193+ ///
194194+ /// ```no_run
195195+ /// # async fn example() -> tapped::Result<()> {
196196+ /// use tapped::TapClient;
197197+ ///
198198+ /// let client = TapClient::new("http://localhost:2480")?;
199199+ /// client.remove_repos(&["did:plc:example1234567890abc"]).await?;
200200+ /// # Ok(())
201201+ /// # }
202202+ /// ```
203203+ pub async fn remove_repos(&self, dids: &[impl AsRef<str>]) -> Result<()> {
204204+ #[derive(Serialize)]
205205+ struct Payload {
206206+ dids: Vec<String>,
207207+ }
208208+209209+ let payload = Payload {
210210+ dids: dids.iter().map(|d| d.as_ref().to_string()).collect(),
211211+ };
212212+213213+ let url = self.base_url.join("/repos/remove")?;
214214+ let resp = self
215215+ .client
216216+ .post(url)
217217+ .headers(self.auth_headers())
218218+ .json(&payload)
219219+ .send()
220220+ .await?;
221221+ Self::handle_empty_response(resp).await
222222+ }
223223+224224+ /// Resolve a DID to its DID document.
225225+ ///
226226+ /// # Example
227227+ ///
228228+ /// ```no_run
229229+ /// # async fn example() -> tapped::Result<()> {
230230+ /// use tapped::TapClient;
231231+ ///
232232+ /// let client = TapClient::new("http://localhost:2480")?;
233233+ /// let doc = client.resolve_did("did:plc:example1234567890abc").await?;
234234+ /// println!("Handle: {:?}", doc.also_known_as);
235235+ /// # Ok(())
236236+ /// # }
237237+ /// ```
238238+ pub async fn resolve_did(&self, did: &str) -> Result<DidDocument> {
239239+ let url = self.base_url.join(&format!("/resolve/{}", did))?;
240240+ let resp = self
241241+ .client
242242+ .get(url)
243243+ .headers(self.auth_headers())
244244+ .send()
245245+ .await?;
246246+ Self::handle_response(resp).await
247247+ }
248248+249249+ /// Get information about a tracked repository.
250250+ ///
251251+ /// # Example
252252+ ///
253253+ /// ```no_run
254254+ /// # async fn example() -> tapped::Result<()> {
255255+ /// use tapped::TapClient;
256256+ ///
257257+ /// let client = TapClient::new("http://localhost:2480")?;
258258+ /// let info = client.repo_info("did:plc:example1234567890abc").await?;
259259+ /// println!("State: {:?}, Records: {}", info.state, info.records);
260260+ /// # Ok(())
261261+ /// # }
262262+ /// ```
263263+ pub async fn repo_info(&self, did: &str) -> Result<RepoInfo> {
264264+ let url = self.base_url.join(&format!("/info/{}", did))?;
265265+ let resp = self
266266+ .client
267267+ .get(url)
268268+ .headers(self.auth_headers())
269269+ .send()
270270+ .await?;
271271+ Self::handle_response(resp).await
272272+ }
273273+274274+ /// Get the total number of tracked repositories.
275275+ pub async fn repo_count(&self) -> Result<u64> {
276276+ let url = self.base_url.join("/stats/repo-count")?;
277277+ let resp = self
278278+ .client
279279+ .get(url)
280280+ .headers(self.auth_headers())
281281+ .send()
282282+ .await?;
283283+ let data: RepoCountResponse = Self::handle_response(resp).await?;
284284+ Ok(data.repo_count)
285285+ }
286286+287287+ /// Get the total number of tracked records.
288288+ pub async fn record_count(&self) -> Result<u64> {
289289+ let url = self.base_url.join("/stats/record-count")?;
290290+ let resp = self
291291+ .client
292292+ .get(url)
293293+ .headers(self.auth_headers())
294294+ .send()
295295+ .await?;
296296+ let data: RecordCountResponse = Self::handle_response(resp).await?;
297297+ Ok(data.record_count)
298298+ }
299299+300300+ /// Get the number of events in the outbox buffer.
301301+ pub async fn outbox_buffer(&self) -> Result<u64> {
302302+ let url = self.base_url.join("/stats/outbox-buffer")?;
303303+ let resp = self
304304+ .client
305305+ .get(url)
306306+ .headers(self.auth_headers())
307307+ .send()
308308+ .await?;
309309+ let data: OutboxBufferResponse = Self::handle_response(resp).await?;
310310+ Ok(data.outbox_buffer)
311311+ }
312312+313313+ /// Get the number of events in the resync buffer.
314314+ pub async fn resync_buffer(&self) -> Result<u64> {
315315+ let url = self.base_url.join("/stats/resync-buffer")?;
316316+ let resp = self
317317+ .client
318318+ .get(url)
319319+ .headers(self.auth_headers())
320320+ .send()
321321+ .await?;
322322+ let data: ResyncBufferResponse = Self::handle_response(resp).await?;
323323+ Ok(data.resync_buffer)
324324+ }
325325+326326+ /// Get the current cursor positions.
327327+ pub async fn cursors(&self) -> Result<Cursors> {
328328+ let url = self.base_url.join("/stats/cursors")?;
329329+ let resp = self
330330+ .client
331331+ .get(url)
332332+ .headers(self.auth_headers())
333333+ .send()
334334+ .await?;
335335+ Self::handle_response(resp).await
336336+ }
337337+338338+ /// Connect to the WebSocket event channel.
339339+ ///
340340+ /// Returns an [`EventReceiver`] for receiving events. Events are
341341+ /// automatically acknowledged when dropped.
342342+ ///
343343+ /// # Example
344344+ ///
345345+ /// ```no_run
346346+ /// # async fn example() -> tapped::Result<()> {
347347+ /// use tapped::TapClient;
348348+ ///
349349+ /// let client = TapClient::new("http://localhost:2480")?;
350350+ /// let mut receiver = client.channel().await?;
351351+ ///
352352+ /// while let Ok(event) = receiver.recv().await {
353353+ /// println!("Event: {:?}", event);
354354+ /// // Event is automatically acknowledged when dropped
355355+ /// }
356356+ /// # Ok(())
357357+ /// # }
358358+ /// ```
359359+ pub async fn channel(&self) -> Result<EventReceiver> {
360360+ EventReceiver::connect(&self.base_url, self.admin_password.as_deref()).await
361361+ }
362362+}
+444
src/config.rs
···11+//! Configuration types for tap process and client.
22+33+use std::time::Duration;
44+use url::Url;
55+66+/// Log level for tap process.
77+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
88+#[non_exhaustive]
99+pub enum LogLevel {
1010+ Debug,
1111+ #[default]
1212+ Info,
1313+ Warn,
1414+ Error,
1515+}
1616+1717+impl LogLevel {
1818+ /// Convert to the string value expected by tap.
1919+ pub fn as_str(&self) -> &'static str {
2020+ match self {
2121+ LogLevel::Debug => "debug",
2222+ LogLevel::Info => "info",
2323+ LogLevel::Warn => "warn",
2424+ LogLevel::Error => "error",
2525+ }
2626+ }
2727+}
2828+2929+/// Configuration for a tap instance.
3030+///
3131+/// All fields are optional. When spawning a tap process, unset fields will use
3232+/// tap's built-in defaults. When connecting to an existing instance, only
3333+/// client-side options (timeouts, auth) are relevant.
3434+///
3535+/// Use [`TapConfig::builder()`] for ergonomic construction.
3636+#[derive(Debug, Clone, Default)]
3737+pub struct TapConfig {
3838+ // Database
3939+ /// Database connection string (sqlite://path or postgres://...)
4040+ pub database_url: Option<String>,
4141+ /// Maximum number of database connections
4242+ pub max_db_conns: Option<u32>,
4343+4444+ // Server
4545+ /// HTTP server bind address (e.g., ":2480", "127.0.0.1:2480", or "[::1]:2480")
4646+ pub bind: Option<String>,
4747+ /// Basic auth admin password for all requests
4848+ pub admin_password: Option<String>,
4949+ /// Address for metrics/pprof server
5050+ pub metrics_listen: Option<String>,
5151+ /// Log verbosity level
5252+ pub log_level: Option<LogLevel>,
5353+5454+ // AT Protocol
5555+ /// PLC directory URL
5656+ pub plc_url: Option<Url>,
5757+ /// AT Protocol relay URL
5858+ pub relay_url: Option<Url>,
5959+6060+ // Processing
6161+ /// Number of parallel firehose event processors
6262+ pub firehose_parallelism: Option<u32>,
6363+ /// Number of parallel resync workers
6464+ pub resync_parallelism: Option<u32>,
6565+ /// Number of parallel outbox workers
6666+ pub outbox_parallelism: Option<u32>,
6767+ /// How often to save firehose cursor
6868+ pub cursor_save_interval: Option<Duration>,
6969+ /// Timeout for fetching repo CARs from PDS
7070+ pub repo_fetch_timeout: Option<Duration>,
7171+ /// Size of in-process identity cache
7272+ pub ident_cache_size: Option<u32>,
7373+ /// Size of outbox before back pressure
7474+ pub outbox_capacity: Option<u32>,
7575+ /// Timeout before retrying unacked events
7676+ pub retry_timeout: Option<Duration>,
7777+7878+ // Network boundary
7979+ /// Track all repos on the network
8080+ pub full_network: Option<bool>,
8181+ /// Track repos with records in this collection
8282+ pub signal_collection: Option<String>,
8383+8484+ // Filtering
8585+ /// Filter output records by collection (supports wildcards)
8686+ pub collection_filters: Option<Vec<String>>,
8787+8888+ // Delivery mode
8989+ /// Enable fire-and-forget mode (no client acks)
9090+ pub disable_acks: Option<bool>,
9191+ /// Webhook URL for event delivery
9292+ pub webhook_url: Option<Url>,
9393+ /// Run in outbox-only mode
9494+ pub outbox_only: Option<bool>,
9595+9696+ // Client-side options (not sent to tap process)
9797+ /// Forward tap's stdout/stderr to this process (default: false)
9898+ pub inherit_stdio: Option<bool>,
9999+ /// Graceful shutdown timeout (default: 5s)
100100+ pub shutdown_timeout: Option<Duration>,
101101+ /// HTTP request timeout (default: 30s)
102102+ pub request_timeout: Option<Duration>,
103103+ /// Max wait for tap to become healthy (default: 30s)
104104+ pub startup_timeout: Option<Duration>,
105105+}
106106+107107+impl TapConfig {
108108+ /// Create a new empty configuration.
109109+ pub fn new() -> Self {
110110+ Self::default()
111111+ }
112112+113113+ /// Create a builder for ergonomic configuration.
114114+ pub fn builder() -> TapConfigBuilder {
115115+ TapConfigBuilder::default()
116116+ }
117117+118118+ /// Get the shutdown timeout, or the default (5 seconds).
119119+ pub fn shutdown_timeout(&self) -> Duration {
120120+ self.shutdown_timeout.unwrap_or(Duration::from_secs(5))
121121+ }
122122+123123+ /// Get the request timeout, or the default (30 seconds).
124124+ pub fn request_timeout(&self) -> Duration {
125125+ self.request_timeout.unwrap_or(Duration::from_secs(30))
126126+ }
127127+128128+ /// Get the startup timeout, or the default (30 seconds).
129129+ pub fn startup_timeout(&self) -> Duration {
130130+ self.startup_timeout.unwrap_or(Duration::from_secs(30))
131131+ }
132132+133133+ /// Whether to inherit stdio from the parent process (default: false).
134134+ pub fn inherit_stdio(&self) -> bool {
135135+ self.inherit_stdio.unwrap_or(false)
136136+ }
137137+138138+ /// Convert configuration to environment variables for subprocess.
139139+ pub fn to_env_vars(&self) -> Vec<(String, String)> {
140140+ let mut vars = Vec::new();
141141+142142+ /// Helper macro to push an env var if the field is Some.
143143+ macro_rules! push_env {
144144+ ($field:expr, $name:literal, clone) => {
145145+ if let Some(ref v) = $field {
146146+ vars.push(($name.into(), v.clone()));
147147+ }
148148+ };
149149+ ($field:expr, $name:literal, string) => {
150150+ if let Some(v) = $field {
151151+ vars.push(($name.into(), v.to_string()));
152152+ }
153153+ };
154154+ ($field:expr, $name:literal, ref_string) => {
155155+ if let Some(ref v) = $field {
156156+ vars.push(($name.into(), v.to_string()));
157157+ }
158158+ };
159159+ ($field:expr, $name:literal, as_str) => {
160160+ if let Some(ref v) = $field {
161161+ vars.push(($name.into(), v.as_str().into()));
162162+ }
163163+ };
164164+ ($field:expr, $name:literal, duration) => {
165165+ if let Some(v) = $field {
166166+ vars.push(($name.into(), format_duration(v)));
167167+ }
168168+ };
169169+ }
170170+171171+ push_env!(self.database_url, "TAP_DATABASE_URL", clone);
172172+ push_env!(self.max_db_conns, "TAP_MAX_DB_CONNS", string);
173173+ push_env!(self.bind, "TAP_BIND", clone);
174174+ push_env!(self.admin_password, "TAP_ADMIN_PASSWORD", clone);
175175+ push_env!(self.metrics_listen, "TAP_METRICS_LISTEN", clone);
176176+ push_env!(self.log_level, "TAP_LOG_LEVEL", as_str);
177177+ push_env!(self.plc_url, "TAP_PLC_URL", ref_string);
178178+ push_env!(self.relay_url, "TAP_RELAY_URL", ref_string);
179179+ push_env!(
180180+ self.firehose_parallelism,
181181+ "TAP_FIREHOSE_PARALLELISM",
182182+ string
183183+ );
184184+ push_env!(self.resync_parallelism, "TAP_RESYNC_PARALLELISM", string);
185185+ push_env!(self.outbox_parallelism, "TAP_OUTBOX_PARALLELISM", string);
186186+ push_env!(
187187+ self.cursor_save_interval,
188188+ "TAP_CURSOR_SAVE_INTERVAL",
189189+ duration
190190+ );
191191+ push_env!(self.repo_fetch_timeout, "TAP_REPO_FETCH_TIMEOUT", duration);
192192+ push_env!(self.ident_cache_size, "RELAY_IDENT_CACHE_SIZE", string);
193193+ push_env!(self.outbox_capacity, "TAP_OUTBOX_CAPACITY", string);
194194+ push_env!(self.retry_timeout, "TAP_RETRY_TIMEOUT", duration);
195195+ push_env!(self.full_network, "TAP_FULL_NETWORK", string);
196196+ push_env!(self.signal_collection, "TAP_SIGNAL_COLLECTION", clone);
197197+ push_env!(self.disable_acks, "TAP_DISABLE_ACKS", string);
198198+ push_env!(self.webhook_url, "TAP_WEBHOOK_URL", ref_string);
199199+ push_env!(self.outbox_only, "TAP_OUTBOX_ONLY", string);
200200+201201+ if let Some(ref v) = self.collection_filters {
202202+ vars.push(("TAP_COLLECTION_FILTERS".into(), v.join(",")));
203203+ }
204204+205205+ vars
206206+ }
207207+}
208208+209209+/// Format a Duration as a Go-style duration string (e.g., "30s", "5m").
210210+fn format_duration(d: Duration) -> String {
211211+ let secs = d.as_secs();
212212+ let millis = d.subsec_millis();
213213+214214+ if millis == 0 {
215215+ if secs > 0 && secs.is_multiple_of(3600) {
216216+ format!("{}h", secs / 3600)
217217+ } else if secs > 0 && secs.is_multiple_of(60) {
218218+ format!("{}m", secs / 60)
219219+ } else {
220220+ format!("{}s", secs)
221221+ }
222222+ } else {
223223+ format!("{}ms", d.as_millis())
224224+ }
225225+}
226226+227227+/// Builder for [`TapConfig`].
228228+#[derive(Debug, Clone, Default)]
229229+pub struct TapConfigBuilder {
230230+ config: TapConfig,
231231+}
232232+233233+impl TapConfigBuilder {
234234+ /// Set the database URL.
235235+ pub fn database_url(mut self, url: impl Into<String>) -> Self {
236236+ self.config.database_url = Some(url.into());
237237+ self
238238+ }
239239+240240+ /// Set the maximum number of database connections.
241241+ pub fn max_db_conns(mut self, n: u32) -> Self {
242242+ self.config.max_db_conns = Some(n);
243243+ self
244244+ }
245245+246246+ /// Set the HTTP server bind address.
247247+ pub fn bind(mut self, addr: impl Into<String>) -> Self {
248248+ self.config.bind = Some(addr.into());
249249+ self
250250+ }
251251+252252+ /// Set the admin password for Basic auth.
253253+ pub fn admin_password(mut self, password: impl Into<String>) -> Self {
254254+ self.config.admin_password = Some(password.into());
255255+ self
256256+ }
257257+258258+ /// Set the metrics server listen address.
259259+ pub fn metrics_listen(mut self, addr: impl Into<String>) -> Self {
260260+ self.config.metrics_listen = Some(addr.into());
261261+ self
262262+ }
263263+264264+ /// Set the log level.
265265+ pub fn log_level(mut self, level: LogLevel) -> Self {
266266+ self.config.log_level = Some(level);
267267+ self
268268+ }
269269+270270+ /// Set the PLC directory URL.
271271+ pub fn plc_url(mut self, url: Url) -> Self {
272272+ self.config.plc_url = Some(url);
273273+ self
274274+ }
275275+276276+ /// Set the relay URL.
277277+ pub fn relay_url(mut self, url: Url) -> Self {
278278+ self.config.relay_url = Some(url);
279279+ self
280280+ }
281281+282282+ /// Set the firehose parallelism.
283283+ pub fn firehose_parallelism(mut self, n: u32) -> Self {
284284+ self.config.firehose_parallelism = Some(n);
285285+ self
286286+ }
287287+288288+ /// Set the resync parallelism.
289289+ pub fn resync_parallelism(mut self, n: u32) -> Self {
290290+ self.config.resync_parallelism = Some(n);
291291+ self
292292+ }
293293+294294+ /// Set the outbox parallelism.
295295+ pub fn outbox_parallelism(mut self, n: u32) -> Self {
296296+ self.config.outbox_parallelism = Some(n);
297297+ self
298298+ }
299299+300300+ /// Set how often to save the firehose cursor.
301301+ pub fn cursor_save_interval(mut self, d: Duration) -> Self {
302302+ self.config.cursor_save_interval = Some(d);
303303+ self
304304+ }
305305+306306+ /// Set the repo fetch timeout.
307307+ pub fn repo_fetch_timeout(mut self, d: Duration) -> Self {
308308+ self.config.repo_fetch_timeout = Some(d);
309309+ self
310310+ }
311311+312312+ /// Set the identity cache size.
313313+ pub fn ident_cache_size(mut self, n: u32) -> Self {
314314+ self.config.ident_cache_size = Some(n);
315315+ self
316316+ }
317317+318318+ /// Set the outbox capacity.
319319+ pub fn outbox_capacity(mut self, n: u32) -> Self {
320320+ self.config.outbox_capacity = Some(n);
321321+ self
322322+ }
323323+324324+ /// Set the retry timeout for unacked events.
325325+ pub fn retry_timeout(mut self, d: Duration) -> Self {
326326+ self.config.retry_timeout = Some(d);
327327+ self
328328+ }
329329+330330+ /// Enable full network mode.
331331+ pub fn full_network(mut self, enabled: bool) -> Self {
332332+ self.config.full_network = Some(enabled);
333333+ self
334334+ }
335335+336336+ /// Set the signal collection for repo discovery.
337337+ pub fn signal_collection(mut self, collection: impl Into<String>) -> Self {
338338+ self.config.signal_collection = Some(collection.into());
339339+ self
340340+ }
341341+342342+ /// Add a collection filter.
343343+ ///
344344+ /// This can be called multiple times to add multiple filters.
345345+ /// Supports wildcards (e.g., "app.bsky.feed.*").
346346+ pub fn collection_filter(mut self, filter: impl Into<String>) -> Self {
347347+ self.config
348348+ .collection_filters
349349+ .get_or_insert_with(Vec::new)
350350+ .push(filter.into());
351351+ self
352352+ }
353353+354354+ /// Set collection filters.
355355+ pub fn collection_filters(mut self, filters: Vec<String>) -> Self {
356356+ self.config.collection_filters = Some(filters);
357357+ self
358358+ }
359359+360360+ /// Disable acknowledgments (fire-and-forget mode).
361361+ pub fn disable_acks(mut self, disabled: bool) -> Self {
362362+ self.config.disable_acks = Some(disabled);
363363+ self
364364+ }
365365+366366+ /// Set the webhook URL for event delivery.
367367+ pub fn webhook_url(mut self, url: Url) -> Self {
368368+ self.config.webhook_url = Some(url);
369369+ self
370370+ }
371371+372372+ /// Enable outbox-only mode.
373373+ pub fn outbox_only(mut self, enabled: bool) -> Self {
374374+ self.config.outbox_only = Some(enabled);
375375+ self
376376+ }
377377+378378+ /// Set the graceful shutdown timeout.
379379+ pub fn shutdown_timeout(mut self, d: Duration) -> Self {
380380+ self.config.shutdown_timeout = Some(d);
381381+ self
382382+ }
383383+384384+ /// Set the HTTP request timeout.
385385+ pub fn request_timeout(mut self, d: Duration) -> Self {
386386+ self.config.request_timeout = Some(d);
387387+ self
388388+ }
389389+390390+ /// Set the startup health check timeout.
391391+ pub fn startup_timeout(mut self, d: Duration) -> Self {
392392+ self.config.startup_timeout = Some(d);
393393+ self
394394+ }
395395+396396+ /// Forward tap's stdout/stderr to this process.
397397+ ///
398398+ /// When enabled, tap's output will be visible in the terminal.
399399+ /// When disabled (default), tap's output is discarded.
400400+ pub fn inherit_stdio(mut self, inherit: bool) -> Self {
401401+ self.config.inherit_stdio = Some(inherit);
402402+ self
403403+ }
404404+405405+ /// Build the configuration.
406406+ pub fn build(self) -> TapConfig {
407407+ self.config
408408+ }
409409+}
410410+411411+#[cfg(test)]
412412+mod tests {
413413+ use super::*;
414414+415415+ #[test]
416416+ fn test_format_duration() {
417417+ assert_eq!(format_duration(Duration::from_secs(30)), "30s");
418418+ assert_eq!(format_duration(Duration::from_secs(60)), "1m");
419419+ assert_eq!(format_duration(Duration::from_secs(3600)), "1h");
420420+ assert_eq!(format_duration(Duration::from_secs(90)), "90s");
421421+ assert_eq!(format_duration(Duration::from_millis(500)), "500ms");
422422+ }
423423+424424+ #[test]
425425+ fn test_config_to_env_vars() {
426426+ let config = TapConfig::builder()
427427+ .database_url("sqlite://./test.db")
428428+ .bind(":3000")
429429+ .signal_collection("app.bsky.feed.post")
430430+ .collection_filters(vec!["app.bsky.feed.post".into(), "app.bsky.graph.*".into()])
431431+ .disable_acks(true)
432432+ .build();
433433+434434+ let vars = config.to_env_vars();
435435+ assert!(vars.contains(&("TAP_DATABASE_URL".into(), "sqlite://./test.db".into())));
436436+ assert!(vars.contains(&("TAP_BIND".into(), ":3000".into())));
437437+ assert!(vars.contains(&("TAP_SIGNAL_COLLECTION".into(), "app.bsky.feed.post".into())));
438438+ assert!(vars.contains(&(
439439+ "TAP_COLLECTION_FILTERS".into(),
440440+ "app.bsky.feed.post,app.bsky.graph.*".into()
441441+ )));
442442+ assert!(vars.contains(&("TAP_DISABLE_ACKS".into(), "true".into())));
443443+ }
444444+}
+67
src/error.rs
···11+//! Error types for the tapped crate.
22+33+use std::io;
44+55+/// The error type for tapped operations.
66+#[derive(Debug, thiserror::Error)]
77+#[non_exhaustive]
88+pub enum Error {
99+ /// I/O error (process/file operations)
1010+ #[error("I/O error: {0}")]
1111+ Io(#[from] io::Error),
1212+1313+ /// HTTP client error
1414+ #[error("HTTP error: {0}")]
1515+ Http(#[from] reqwest::Error),
1616+1717+ /// WebSocket error
1818+ #[error("WebSocket error: {0}")]
1919+ WebSocket(#[from] Box<tokio_tungstenite::tungstenite::Error>),
2020+2121+ /// JSON serialisation/deserialisation error
2222+ #[error("JSON error: {0}")]
2323+ Json(#[from] serde_json::Error),
2424+2525+ /// API error returned by tap server
2626+ #[error("API error (status {status}): {message}")]
2727+ Api {
2828+ /// HTTP status code
2929+ status: u16,
3030+ /// Error message from server
3131+ message: String,
3232+ },
3333+3434+ /// Failed to start the tap process
3535+ #[error("Failed to start tap process: {message}")]
3636+ ProcessStart {
3737+ /// Description of the failure
3838+ message: String,
3939+ },
4040+4141+ /// The tap process exited unexpectedly
4242+ #[error("Tap process exited with code: {code:?}")]
4343+ ProcessExited {
4444+ /// Exit code, if available
4545+ code: Option<i32>,
4646+ },
4747+4848+ /// The event channel was closed
4949+ #[error("Event channel closed")]
5050+ ChannelClosed,
5151+5252+ /// WebSocket is not available because tap is in webhook mode
5353+ #[error("WebSocket not available: tap is in webhook mode")]
5454+ WebhookModeActive,
5555+5656+ /// Invalid URL provided
5757+ #[error("Invalid URL: {0}")]
5858+ InvalidUrl(String),
5959+6060+ /// Operation timed out
6161+ #[error("Operation timed out")]
6262+ Timeout,
6363+6464+ /// URL parse error
6565+ #[error("URL parse error: {0}")]
6666+ UrlParse(#[from] url::ParseError),
6767+}
+120
src/handle.rs
···11+//! Convenience type combining process and client.
22+33+use std::ops::Deref;
44+use std::path::Path;
55+66+use crate::client::TapClient;
77+use crate::config::TapConfig;
88+use crate::process::TapProcess;
99+use crate::Result;
1010+1111+/// A convenience type that owns both a tap process and its client.
1212+///
1313+/// `TapHandle` manages the lifecycle of a tap subprocess and provides
1414+/// access to a [`TapClient`] for interacting with it. When dropped,
1515+/// the process is gracefully shut down.
1616+///
1717+/// # Example
1818+///
1919+/// ```no_run
2020+/// use tapped::{TapHandle, TapConfig};
2121+///
2222+/// #[tokio::main]
2323+/// async fn main() -> tapped::Result<()> {
2424+/// let config = TapConfig::builder()
2525+/// .database_url("sqlite://tap.db")
2626+/// .build();
2727+///
2828+/// // Spawn tap and get a handle
2929+/// let handle = TapHandle::spawn_default(config).await?;
3030+///
3131+/// // Use the client methods directly on the handle
3232+/// handle.health().await?;
3333+///
3434+/// let mut channel = handle.channel().await?;
3535+/// while let Ok(event) = channel.recv().await {
3636+/// println!("Event: {:?}", event.event);
3737+/// }
3838+///
3939+/// Ok(())
4040+/// }
4141+/// ```
4242+pub struct TapHandle {
4343+ process: TapProcess,
4444+ client: TapClient,
4545+}
4646+4747+impl TapHandle {
4848+ /// Spawn a tap process at the given path and create a client for it.
4949+ ///
5050+ /// This combines [`TapProcess::spawn`] and [`TapClient`] creation into
5151+ /// a single convenient call.
5252+ ///
5353+ /// # Arguments
5454+ ///
5555+ /// * `path` - Path to the tap binary
5656+ /// * `config` - Configuration for the tap instance
5757+ ///
5858+ /// # Errors
5959+ ///
6060+ /// Returns an error if the process fails to start or become healthy.
6161+ pub async fn spawn(path: impl AsRef<Path>, config: TapConfig) -> Result<Self> {
6262+ let process = TapProcess::spawn(path, config).await?;
6363+ let client = process.client()?;
6464+ Ok(Self { process, client })
6565+ }
6666+6767+ /// Spawn a tap process using the default binary location.
6868+ ///
6969+ /// This looks for `tap` in the current directory first, then on the PATH.
7070+ /// Combines [`TapProcess::spawn_default`] and [`TapClient`] creation.
7171+ ///
7272+ /// # Arguments
7373+ ///
7474+ /// * `config` - Configuration for the tap instance
7575+ ///
7676+ /// # Errors
7777+ ///
7878+ /// Returns an error if no tap binary is found or it fails to start.
7979+ pub async fn spawn_default(config: TapConfig) -> Result<Self> {
8080+ let process = TapProcess::spawn_default(config).await?;
8181+ let client = process.client()?;
8282+ Ok(Self { process, client })
8383+ }
8484+8585+ /// Get a reference to the underlying process.
8686+ pub fn process(&self) -> &TapProcess {
8787+ &self.process
8888+ }
8989+9090+ /// Get a mutable reference to the underlying process.
9191+ pub fn process_mut(&mut self) -> &mut TapProcess {
9292+ &mut self.process
9393+ }
9494+9595+ /// Get a reference to the client.
9696+ pub fn client(&self) -> &TapClient {
9797+ &self.client
9898+ }
9999+100100+ /// Check if the tap process is still running.
101101+ pub fn is_running(&mut self) -> bool {
102102+ self.process.is_running()
103103+ }
104104+105105+ /// Gracefully shut down the tap process.
106106+ ///
107107+ /// This sends SIGTERM and waits for the process to exit, then
108108+ /// sends SIGKILL if it doesn't exit within the shutdown timeout.
109109+ pub async fn shutdown(&mut self) -> Result<()> {
110110+ self.process.shutdown().await
111111+ }
112112+}
113113+114114+impl Deref for TapHandle {
115115+ type Target = TapClient;
116116+117117+ fn deref(&self) -> &Self::Target {
118118+ &self.client
119119+ }
120120+}
+63
src/lib.rs
···11+//! # tapped
22+//!
33+//! A Rust wrapper for the `tap` ATProto sync utility.
44+//!
55+//! Tap simplifies ATProto sync by handling the firehose connection, verification,
66+//! backfill, and filtering. This crate provides an idiomatic async Rust interface
77+//! to tap's HTTP API and WebSocket event stream.
88+//!
99+//! ## Features
1010+//!
1111+//! - Connect to an existing tap instance or spawn one as a subprocess
1212+//! - Strongly-typed configuration with builder pattern
1313+//! - Async event streaming with automatic acknowledgment
1414+//! - Full HTTP API coverage for repo management and statistics
1515+//!
1616+//! ## Example
1717+//!
1818+//! ```no_run
1919+//! use tapped::{TapClient, TapConfig, Result};
2020+//!
2121+//! #[tokio::main]
2222+//! async fn main() -> Result<()> {
2323+//! // Connect to an existing tap instance
2424+//! let client = TapClient::new("http://localhost:2480")?;
2525+//!
2626+//! // Check health
2727+//! client.health().await?;
2828+//!
2929+//! // Add repos to track
3030+//! client.add_repos(&["did:plc:example1234567890abc"]).await?;
3131+//!
3232+//! // Stream events
3333+//! let mut receiver = client.channel().await?;
3434+//! while let Ok(event) = receiver.recv().await {
3535+//! println!("Received event: {:?}", event);
3636+//! // Event is automatically acknowledged when dropped
3737+//! }
3838+//!
3939+//! Ok(())
4040+//! }
4141+//! ```
4242+4343+mod channel;
4444+mod client;
4545+mod config;
4646+mod error;
4747+mod handle;
4848+mod process;
4949+mod types;
5050+5151+pub use channel::{EventReceiver, ReceivedEvent};
5252+pub use client::TapClient;
5353+pub use config::{LogLevel, TapConfig, TapConfigBuilder};
5454+pub use error::Error;
5555+pub use handle::TapHandle;
5656+pub use process::TapProcess;
5757+pub use types::{
5858+ AccountStatus, Cursors, DidDocument, Event, IdentityEvent, Record, RecordAction, RecordEvent,
5959+ RepoInfo, RepoState, Service, VerificationMethod,
6060+};
6161+6262+/// A specialised Result type for tapped operations.
6363+pub type Result<T> = std::result::Result<T, Error>;
+236
src/process.rs
···11+//! Subprocess management for spawning and managing a tap process.
22+33+use std::path::{Path, PathBuf};
44+use std::process::Stdio;
55+use std::time::Duration;
66+77+use tokio::process::{Child, Command};
88+use tokio::time::{sleep, timeout};
99+use url::Url;
1010+1111+use crate::{Error, Result, TapClient, TapConfig};
1212+1313+/// A running tap process.
1414+///
1515+/// The process is gracefully shut down when this struct is dropped.
1616+pub struct TapProcess {
1717+ child: Child,
1818+ url: Url,
1919+ config: TapConfig,
2020+}
2121+2222+impl TapProcess {
2323+ /// Spawn a tap process at the given path with the given configuration.
2424+ ///
2525+ /// The path should point to the `tap` binary. The process will be started
2626+ /// with the `run` subcommand and configuration passed as environment
2727+ /// variables.
2828+ ///
2929+ /// # Example
3030+ ///
3131+ /// ```no_run
3232+ /// # async fn example() -> tapped::Result<()> {
3333+ /// use tapped::{TapProcess, TapConfig};
3434+ ///
3535+ /// let config = TapConfig::builder()
3636+ /// .database_url("sqlite://./my-tap.db")
3737+ /// .build();
3838+ ///
3939+ /// let process = TapProcess::spawn("./tap", config).await?;
4040+ /// let client = process.client()?;
4141+ /// # Ok(())
4242+ /// # }
4343+ /// ```
4444+ pub async fn spawn(path: impl AsRef<Path>, config: TapConfig) -> Result<Self> {
4545+ let path = path.as_ref();
4646+4747+ if !path.exists() {
4848+ return Err(Error::ProcessStart {
4949+ message: format!("tap binary not found at: {}", path.display()),
5050+ });
5151+ }
5252+5353+ Self::spawn_inner(path.to_path_buf(), config).await
5454+ }
5555+5656+ /// Spawn a tap process using the default path discovery.
5757+ ///
5858+ /// Checks for `./tap` first, then falls back to `tap` on PATH.
5959+ ///
6060+ /// # Example
6161+ ///
6262+ /// ```no_run
6363+ /// # async fn example() -> tapped::Result<()> {
6464+ /// use tapped::{TapProcess, TapConfig};
6565+ ///
6666+ /// let config = TapConfig::builder()
6767+ /// .database_url("sqlite://./my-tap.db")
6868+ /// .build();
6969+ ///
7070+ /// let process = TapProcess::spawn_default(config).await?;
7171+ /// # Ok(())
7272+ /// # }
7373+ /// ```
7474+ pub async fn spawn_default(config: TapConfig) -> Result<Self> {
7575+ let local_tap = PathBuf::from("./tap");
7676+ if local_tap.exists() {
7777+ return Self::spawn_inner(local_tap, config).await;
7878+ }
7979+8080+ Self::spawn_inner(PathBuf::from("tap"), config).await
8181+ }
8282+8383+ async fn spawn_inner(path: PathBuf, config: TapConfig) -> Result<Self> {
8484+ let bind = config.bind.clone().unwrap_or_else(|| ":2480".to_string());
8585+ let port = parse_port(&bind).unwrap_or(2480);
8686+8787+ // Spawned processes are always local - connect to localhost regardless
8888+ // of what interface tap binds to.
8989+ let url: Url = format!("http://127.0.0.1:{}", port)
9090+ .parse()
9191+ .map_err(|_| Error::InvalidUrl(format!("http://127.0.0.1:{}", port)))?;
9292+9393+ let mut cmd = Command::new(&path);
9494+ cmd.arg("run").stdin(Stdio::null()).kill_on_drop(true);
9595+9696+ if config.inherit_stdio() {
9797+ cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
9898+ } else {
9999+ cmd.stdout(Stdio::null()).stderr(Stdio::null());
100100+ }
101101+102102+ for (key, value) in config.to_env_vars() {
103103+ cmd.env(key, value);
104104+ }
105105+106106+ let child = cmd.spawn().map_err(|e| Error::ProcessStart {
107107+ message: format!("Failed to spawn {}: {}", path.display(), e),
108108+ })?;
109109+110110+ let mut process = Self {
111111+ child,
112112+ url: url.clone(),
113113+ config,
114114+ };
115115+116116+ if let Err(e) = process.wait_for_healthy().await {
117117+ // Kill the process if health check fails
118118+ let _ = process.child.kill().await;
119119+ return Err(e);
120120+ }
121121+122122+ Ok(process)
123123+ }
124124+125125+ /// Wait for the tap process to become healthy.
126126+ async fn wait_for_healthy(&self) -> Result<()> {
127127+ let startup_timeout = self.config.startup_timeout();
128128+ let client = reqwest::Client::builder()
129129+ .timeout(Duration::from_secs(2))
130130+ .build()
131131+ .map_err(Error::Http)?;
132132+133133+ let health_url = self.url.join("/health")?;
134134+135135+ let result = timeout(startup_timeout, async {
136136+ loop {
137137+ match client.get(health_url.clone()).send().await {
138138+ Ok(resp) if resp.status().is_success() => return Ok(()),
139139+ _ => sleep(Duration::from_millis(100)).await,
140140+ }
141141+ }
142142+ })
143143+ .await;
144144+145145+ match result {
146146+ Ok(Ok(())) => Ok(()),
147147+ Ok(Err(e)) => Err(e),
148148+ Err(_) => Err(Error::Timeout),
149149+ }
150150+ }
151151+152152+ /// Get the URL of the running tap instance.
153153+ pub fn url(&self) -> &Url {
154154+ &self.url
155155+ }
156156+157157+ /// Create a client connected to this tap process.
158158+ pub fn client(&self) -> Result<TapClient> {
159159+ TapClient::with_config(self.url.as_str(), &self.config)
160160+ }
161161+162162+ /// Check if the process is still running.
163163+ pub fn is_running(&mut self) -> bool {
164164+ matches!(self.child.try_wait(), Ok(None))
165165+ }
166166+167167+ /// Gracefully shut down the tap process.
168168+ ///
169169+ /// Sends SIGTERM and waits up to `shutdown_timeout` before sending SIGKILL.
170170+ pub async fn shutdown(&mut self) -> Result<()> {
171171+ #[cfg(unix)]
172172+ {
173173+ // Send SIGTERM
174174+ if let Some(pid) = self.child.id() {
175175+ unsafe {
176176+ libc::kill(pid as i32, libc::SIGTERM);
177177+ }
178178+ }
179179+180180+ // Wait for graceful shutdown
181181+ let shutdown_timeout = self.config.shutdown_timeout();
182182+ match timeout(shutdown_timeout, self.child.wait()).await {
183183+ Ok(Ok(_)) => return Ok(()),
184184+ Ok(Err(e)) => return Err(Error::Io(e)),
185185+ Err(_) => {
186186+ // Timeout, send SIGKILL
187187+ let _ = self.child.kill().await;
188188+ }
189189+ }
190190+ }
191191+192192+ #[cfg(not(unix))]
193193+ {
194194+ let _ = self.child.kill().await;
195195+ }
196196+197197+ Ok(())
198198+ }
199199+}
200200+201201+impl Drop for TapProcess {
202202+ fn drop(&mut self) {
203203+ #[cfg(unix)]
204204+ {
205205+ if let Some(pid) = self.child.id() {
206206+ unsafe {
207207+ libc::kill(pid as i32, libc::SIGTERM);
208208+ }
209209+ }
210210+ }
211211+ }
212212+}
213213+214214+/// Extract the port from a bind address.
215215+///
216216+/// The bind format is passed directly to tap (Go's net.Listen format).
217217+/// We just need the port to construct a localhost URL for health checks.
218218+fn parse_port(bind: &str) -> Option<u16> {
219219+ // The port is always after the last colon, even for IPv6 like [::1]:2480
220220+ bind.rsplit(':').next()?.parse().ok()
221221+}
222222+223223+#[cfg(test)]
224224+mod tests {
225225+ use super::*;
226226+227227+ #[test]
228228+ fn test_parse_port() {
229229+ assert_eq!(parse_port(":2480"), Some(2480));
230230+ assert_eq!(parse_port("127.0.0.1:3000"), Some(3000));
231231+ assert_eq!(parse_port("0.0.0.0:8080"), Some(8080));
232232+ assert_eq!(parse_port("[::1]:2480"), Some(2480));
233233+ assert_eq!(parse_port("[2001:db8::1]:8080"), Some(8080));
234234+ assert_eq!(parse_port("invalid"), None);
235235+ }
236236+}