# tapped

[![Crates.io Version](https://img.shields.io/crates/v/tapped)](https://crates.io/crates/tapped)
[![docs.rs](https://img.shields.io/docsrs/tapped)](https://docs.rs/tapped)

A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.

`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess, making it easy to build applications that sync data from the ATProto network.

## Features

- Spawn and manage `tap` subprocesses with graceful shutdown
- Strongly-typed configuration for all tap envvars
- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with automatic acknowledgment

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
tapped = "0.3"
```

You'll also need the `tap` binary. Build it from the [indigo repository](https://github.com/bluesky-social/indigo):

```bash
cd cmd/tap && go build
```

`tapped` has been most recently tested against:

```
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
```

## Quick Start

```rust
use tapped::{TapProcess, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap - looks in current directory first, then PATH
    let process = TapProcess::spawn_default(config).await?;
    let client = process.client()?;

    // Subscribe to events
    let mut channel = client.channel().await?;

    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}
```

## Usage Patterns

### Connect to Existing Instance

If you have a tap instance already running:

```rust
use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
```

### Spawn an Instance

```rust
use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

// If you need a custom binary path, use spawn() instead:
// let process = TapProcess::spawn("/path/to/tap", config).await?;
let process = TapProcess::spawn_default(config).await?;
let client = process.client()?;

// Use the client
client.health().await?;
let count = client.repo_count().await?;
println!("Tracking {} repos", count);
```

### Configuration Options

```rust
use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    // Logging
    .log_level(LogLevel::Info)
    .build();
```

### Working with Events

Events are automatically acknowledged when dropped:

```rust
use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;

while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}
```

### Managing Repositories

```rust
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
```

### Checking Stats

```rust
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
```

## Example: Syncing standard.site Records with Schema Generation and Validation

The repository includes a complete example demonstrating how to sync and validate ATProto records using `tapped` together with the [jacquard](https://crates.io/crates/jacquard) crates. The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.

```
tapped/
├── tapped/                  # The main tapped library
├── lexicons-example/        # Generated types from lexicon schemas
│   ├── lexicons/            # Source lexicon JSON files
│   │   ├── site.standard.publication.json
│   │   ├── site.standard.document.json
│   │   └── ...
│   └── src/                 # Generated Rust code
└── standard-site-sync/      # Example binary using both packages
```

These files were generated like so:

```bash
# Install the code generator
cargo install jacquard-lexgen

jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src
```

This produces strongly-typed structs with built-in validation. For example, the `site.standard.publication` lexicon becomes:

```rust
use lexicons_example::site_standard::publication::Publication;

// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;

// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;

// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
```

For more detail see `process_record_event` [in `main.rs`](https://tangled.org/octet-stream.net/tapped/blob/main/standard-site-sync/src/main.rs#L140-201).

## License

MIT