···
 It is a superset of the reference PDS, including: passkeys and 2FA (WebAuthn/FIDO2, TOTP, backup codes, trusted devices), SSO login and signup, did:web support (PDS-hosted subdomains or bring-your-own), multi-channel communication (email, discord, telegram, signal) for verification and alerts, granular OAuth scopes with a consent UI showing human-readable descriptions, app passwords with granular permissions (read-only, post-only, or custom scopes), account delegation (letting others manage an account with configurable permission levels), automatic backups (configurable retention and frequency, one-click restore), and a built-in web UI for account management, OAuth consent, repo browsing, and admin.

-The PDS itself is a single small binary with no node/npm runtime. It requires postgres and stores blobs on the local filesystem. Valkey is optional (enables distributed rate limiting for multi-node setups).
+The PDS itself is a single small binary with no node/npm runtime. It requires postgres. Blobs are stored on the local filesystem by default (S3 optional). Valkey is optional (supported as an alternative to the built-in cache).
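For a single-node install that matches those defaults, the environment looks roughly like this. This is a sketch: the variable names are the ones exercised elsewhere in this changeset, and every value is a placeholder.

```sh
export DATABASE_URL=postgres://pds:pds@127.0.0.1:5432/pds   # postgres is required
export PDS_HOSTNAME=pds.example.com
export BLOB_STORAGE_BACKEND=filesystem                      # default; "s3" also supported
export BLOB_STORAGE_PATH=/srv/tranquil-pds/blobs
export BACKUP_STORAGE_BACKEND=filesystem
export BACKUP_STORAGE_PATH=/srv/tranquil-pds/backups
# S3 variant: BLOB_STORAGE_BACKEND=s3 plus S3_ENDPOINT, S3_BUCKET,
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION.
```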
 ## Quick Start

···
     let (user_did, _) = setup_new_user("search-target").await;
     let mut found = false;
     let mut cursor: Option<String> = None;
-    for _ in 0..10 {
+    for _ in 0..100 {
         let url = match &cursor {
             Some(c) => format!(
                 "{}/xrpc/com.atproto.admin.searchAccounts?limit=100&cursor={}",
crates/tranquil-pds/tests/common/mod.rs (+270 -72)
···
 use serde_json::{Value, json};
 use sqlx::postgres::PgPoolOptions;
 use std::collections::HashMap;
+use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::sync::{Arc, OnceLock, RwLock};
 #[allow(unused_imports)]
 use std::time::Duration;
 use tokio::net::TcpListener;
 use tokio_util::sync::CancellationToken;
+use tranquil_pds::cache::{Cache, DistributedRateLimiter};
 use tranquil_pds::state::AppState;
 use wiremock::matchers::{method, path};
 use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};
···
 static MOCK_PLC: OnceLock<MockServer> = OnceLock::new();
 static TEST_DB_POOL: OnceLock<sqlx::PgPool> = OnceLock::new();
 static TEST_TEMP_DIR: OnceLock<PathBuf> = OnceLock::new();
+static CLUSTER: OnceLock<Vec<ServerInstance>> = OnceLock::new();
+
+#[allow(dead_code)]
+pub struct ServerConfig {
+    pub pool: sqlx::PgPool,
+    pub cache: Option<(Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>)>,
+}
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct ServerInstance {
+    pub url: String,
+    pub port: u16,
+    pub cache: Option<Arc<dyn Cache>>,
+    pub distributed_rate_limiter: Option<Arc<dyn DistributedRateLimiter>>,
+}
+
 #[cfg(all(not(feature = "external-infra"), feature = "s3-storage"))]
 use testcontainers::GenericImage;
···
         std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra");
     let plc_url = setup_mock_plc_directory().await;
     unsafe {
-        if std::env::var("S3_ENDPOINT").is_ok() {
-            let s3_endpoint = std::env::var("S3_ENDPOINT").unwrap();
-            std::env::set_var("BLOB_STORAGE_BACKEND", "s3");
-            std::env::set_var("BACKUP_STORAGE_BACKEND", "s3");
-            std::env::set_var("BACKUP_S3_BUCKET", "test-backups");
-            std::env::set_var(
-                "S3_BUCKET",
-                std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()),
-            );
-            std::env::set_var(
-                "AWS_ACCESS_KEY_ID",
-                std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()),
-            );
-            std::env::set_var(
-                "AWS_SECRET_ACCESS_KEY",
-                std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()),
-            );
-            std::env::set_var(
-                "AWS_REGION",
-                std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
-            );
-            std::env::set_var("S3_ENDPOINT", &s3_endpoint);
-        } else if std::env::var("BLOB_STORAGE_PATH").is_ok() {
-            std::env::set_var("BLOB_STORAGE_BACKEND", "filesystem");
-            std::env::set_var("BACKUP_STORAGE_BACKEND", "filesystem");
-        } else {
-            panic!("Either S3_ENDPOINT or BLOB_STORAGE_PATH must be set for external-infra");
-        }
-        std::env::set_var("MAX_IMPORT_SIZE", "100000000");
-        std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
+        configure_external_storage_env();
         std::env::set_var("PLC_DIRECTORY_URL", &plc_url);
     }
-    let mock_server = MockServer::start().await;
-    setup_mock_appview(&mock_server).await;
-    let mock_uri = mock_server.uri();
-    let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
-    let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
-    setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
-    MOCK_APPVIEW.set(mock_server).ok();
+    register_mock_appview().await;
     spawn_app(database_url).await
 }
···
         std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
         std::env::set_var("PLC_DIRECTORY_URL", &plc_url);
     }
-    let mock_server = MockServer::start().await;
-    setup_mock_appview(&mock_server).await;
-    let mock_uri = mock_server.uri();
-    let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
-    let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
-    setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
-    MOCK_APPVIEW.set(mock_server).ok();
+    register_mock_appview().await;
     let container = Postgres::default()
         .with_tag("18-alpine")
         .with_label("tranquil_pds_test", "true")
···
         .bucket("test-backups")
         .send()
         .await;
-    let mock_server = MockServer::start().await;
-    setup_mock_appview(&mock_server).await;
-    let mock_uri = mock_server.uri();
-    let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
-    let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
-    setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
-    MOCK_APPVIEW.set(mock_server).ok();
+    register_mock_appview().await;
     S3_CONTAINER.set(s3_container).ok();
     let container = Postgres::default()
         .with_tag("18-alpine")
···
 async fn setup_mock_appview(_mock_server: &MockServer) {}

+async fn register_mock_appview() {
+    let mock_server = MockServer::start().await;
+    setup_mock_appview(&mock_server).await;
+    let mock_uri = mock_server.uri();
+    let mock_host = mock_uri.strip_prefix("http://").unwrap_or(&mock_uri);
+    let mock_did = format!("did:web:{}", mock_host.replace(':', "%3A"));
+    setup_mock_did_document(&mock_server, &mock_did, &mock_uri).await;
+    MOCK_APPVIEW.set(mock_server).ok();
+}
+
+unsafe fn configure_external_storage_env() {
+    unsafe {
+        if std::env::var("S3_ENDPOINT").is_ok() {
+            let s3_endpoint = std::env::var("S3_ENDPOINT").unwrap();
+            std::env::set_var("BLOB_STORAGE_BACKEND", "s3");
+            std::env::set_var("BACKUP_STORAGE_BACKEND", "s3");
+            std::env::set_var("BACKUP_S3_BUCKET", "test-backups");
+            std::env::set_var(
+                "S3_BUCKET",
+                std::env::var("S3_BUCKET").unwrap_or_else(|_| "test-bucket".to_string()),
+            );
+            std::env::set_var(
+                "AWS_ACCESS_KEY_ID",
+                std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "minioadmin".to_string()),
+            );
+            std::env::set_var(
+                "AWS_SECRET_ACCESS_KEY",
+                std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_else(|_| "minioadmin".to_string()),
+            );
+            std::env::set_var(
+                "AWS_REGION",
+                std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string()),
+            );
+            std::env::set_var("S3_ENDPOINT", &s3_endpoint);
+        } else {
+            let process_dir = std::env::temp_dir().join(format!(
+                "tranquil-pds-test-{}",
+                std::process::id()
+            ));
+            let blob_path = process_dir.join("blobs");
+            let backup_path = process_dir.join("backups");
+            std::fs::create_dir_all(&blob_path).expect("Failed to create blob directory");
+            std::fs::create_dir_all(&backup_path).expect("Failed to create backup directory");
+            TEST_TEMP_DIR.set(process_dir).ok();
+            std::env::set_var("BLOB_STORAGE_BACKEND", "filesystem");
+            std::env::set_var("BLOB_STORAGE_PATH", blob_path.to_str().unwrap());
+            std::env::set_var("BACKUP_STORAGE_BACKEND", "filesystem");
+            std::env::set_var("BACKUP_STORAGE_PATH", backup_path.to_str().unwrap());
+        }
+        std::env::set_var("MAX_IMPORT_SIZE", "100000000");
+        std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
+    }
+}
+
 type PlcOperationStore = Arc<RwLock<HashMap<String, Value>>>;

 struct PlcPostResponder {
···
     plc_url
 }

-async fn spawn_app(database_url: String) -> String {
+async fn spawn_server(config: ServerConfig) -> ServerInstance {
     use tranquil_pds::rate_limit::RateLimiters;
-    let pool = PgPoolOptions::new()
-        .max_connections(10)
-        .acquire_timeout(std::time::Duration::from_secs(30))
-        .connect(&database_url)
-        .await
-        .expect("Failed to connect to Postgres. Make sure the database is running.");
-    sqlx::migrate!("./migrations")
-        .run(&pool)
-        .await
-        .expect("Failed to run migrations");
-    let test_pool = PgPoolOptions::new()
-        .max_connections(5)
-        .acquire_timeout(std::time::Duration::from_secs(30))
-        .connect(&database_url)
-        .await
-        .expect("Failed to create test pool");
-    TEST_DB_POOL.set(test_pool).ok();
     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
     let addr = listener.local_addr().unwrap();
-    APP_PORT.set(addr.port()).ok();
     unsafe {
         std::env::set_var("PDS_HOSTNAME", format!("pds.test:{}", addr.port()));
     }
···
         .with_email_update_limit(10000)
         .with_oauth_authorize_limit(10000)
         .with_oauth_token_limit(10000);
-    let state = AppState::from_db(pool, CancellationToken::new())
+    let cache_refs = config.cache.as_ref().map(|(c, r)| (c.clone(), r.clone()));
+    let mut state = AppState::from_db(config.pool, CancellationToken::new())
         .await
         .with_rate_limiters(rate_limiters);
+    if let Some((cache, distributed_rate_limiter)) = config.cache {
+        state = state.with_cache(cache, distributed_rate_limiter);
+    }
     tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
     let app = tranquil_pds::app(state);
     tokio::spawn(async move {
         axum::serve(listener, app).await.unwrap();
     });
-    format!("http://localhost:{}", addr.port())
+    let (cache, distributed_rate_limiter) = cache_refs
+        .map(|(c, r)| (Some(c), Some(r)))
+        .unwrap_or((None, None));
+    ServerInstance {
+        url: format!("http://localhost:{}", addr.port()),
+        port: addr.port(),
+        cache,
+        distributed_rate_limiter,
+    }
+}
+
+async fn spawn_app(database_url: String) -> String {
+    let pool = PgPoolOptions::new()
+        .max_connections(10)
+        .acquire_timeout(std::time::Duration::from_secs(30))
+        .connect(&database_url)
+        .await
+        .expect("Failed to connect to Postgres. Make sure the database is running.");
+    sqlx::migrate!("./migrations")
+        .run(&pool)
+        .await
+        .expect("Failed to run migrations");
+    let test_pool = PgPoolOptions::new()
+        .max_connections(2)
+        .acquire_timeout(std::time::Duration::from_secs(30))
+        .connect(&database_url)
+        .await
+        .expect("Failed to create test pool");
+    TEST_DB_POOL.set(test_pool).ok();
+    let instance = spawn_server(ServerConfig { pool, cache: None }).await;
+    APP_PORT.set(instance.port).ok();
+    instance.url
+}
+
+#[allow(dead_code)]
+pub async fn spawn_cluster(database_url: String, node_count: usize) -> Vec<ServerInstance> {
+    use tranquil_ripple::{RippleConfig, RippleEngine};
+
+    let pool = PgPoolOptions::new()
+        .max_connections(10)
+        .acquire_timeout(std::time::Duration::from_secs(30))
+        .connect(&database_url)
+        .await
+        .expect("Failed to connect to Postgres for cluster");
+    sqlx::migrate!("./migrations")
+        .run(&pool)
+        .await
+        .expect("Failed to run migrations for cluster");
+    let test_pool = PgPoolOptions::new()
+        .max_connections(2)
+        .acquire_timeout(std::time::Duration::from_secs(30))
+        .connect(&database_url)
+        .await
+        .expect("Failed to create test pool for cluster");
+    TEST_DB_POOL.set(test_pool).ok();
+
+    let shutdown = CancellationToken::new();
+
+    let mut ripple_nodes: Vec<(Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>)> =
+        Vec::with_capacity(node_count);
+    let mut bound_addrs: Vec<SocketAddr> = Vec::with_capacity(node_count);
+
+    for i in 0..node_count {
+        let config = RippleConfig {
+            bind_addr: "127.0.0.1:0".parse().unwrap(),
+            seed_peers: bound_addrs.clone(),
+            machine_id: i as u64 + 1,
+            gossip_interval_ms: 100,
+            cache_max_bytes: 64 * 1024 * 1024,
+        };
+        let (cache, rate_limiter, addr) = RippleEngine::start(config, shutdown.clone())
+            .await
+            .expect("failed to start ripple node");
+        bound_addrs.push(addr);
+        ripple_nodes.push((cache, rate_limiter));
+    }
+
+    let mut instances: Vec<ServerInstance> = Vec::with_capacity(node_count);
+    for (cache, rate_limiter) in ripple_nodes {
+        let server_config = ServerConfig {
+            pool: pool.clone(),
+            cache: Some((cache, rate_limiter)),
+        };
+        let instance = spawn_server(server_config).await;
+        instances.push(instance);
+    }
+
+    let first = &instances[0];
+    APP_PORT.set(first.port).ok();
+
+    tokio::time::sleep(Duration::from_millis(2000)).await;
+
+    instances
+}
+
+/// Lazily boots one shared three-node cluster on a dedicated runtime thread;
+/// the thread parks forever so the nodes stay alive for the whole test process.
+#[allow(dead_code)]
+pub async fn cluster() -> &'static [ServerInstance] {
+    CLUSTER.get_or_init(|| {
+        let (tx, rx) = std::sync::mpsc::channel();
+        std::thread::spawn(move || {
+            unsafe {
+                std::env::set_var("TRANQUIL_PDS_ALLOW_INSECURE_SECRETS", "1");
+            }
+            if std::env::var("DOCKER_HOST").is_err()
+                && let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR")
+            {
+                let podman_sock = std::path::Path::new(&runtime_dir).join("podman/podman.sock");
+                if podman_sock.exists() {
+                    unsafe {
+                        std::env::set_var(
+                            "DOCKER_HOST",
+                            format!("unix://{}", podman_sock.display()),
+                        );
+                    }
+                }
+            }
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            rt.block_on(async move {
+                unsafe {
+                    std::env::remove_var("DISABLE_RATE_LIMITING");
+                }
+                let database_url = if has_external_infra() {
+                    setup_cluster_external_infra().await
+                } else {
+                    setup_cluster_testcontainers().await
+                };
+                let nodes = spawn_cluster(database_url, 3).await;
+                tx.send(nodes).unwrap();
+                std::future::pending::<()>().await;
+            });
+        });
+        rx.recv().expect("Failed to start test cluster")
+    })
+}
+
+async fn setup_cluster_external_infra() -> String {
+    let database_url =
+        std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra");
+    let plc_url = setup_mock_plc_directory().await;
+    unsafe {
+        configure_external_storage_env();
+        std::env::set_var("PLC_DIRECTORY_URL", &plc_url);
+    }
+    register_mock_appview().await;
+    database_url
+}
+
+#[cfg(not(feature = "external-infra"))]
+async fn setup_cluster_testcontainers() -> String {
+    let temp_dir =
+        std::env::temp_dir().join(format!("tranquil-pds-cluster-{}", uuid::Uuid::new_v4()));
+    let blob_path = temp_dir.join("blobs");
+    let backup_path = temp_dir.join("backups");
+    std::fs::create_dir_all(&blob_path).expect("Failed to create blob temp directory");
+    std::fs::create_dir_all(&backup_path).expect("Failed to create backup temp directory");
+    TEST_TEMP_DIR.set(temp_dir).ok();
+    let plc_url = setup_mock_plc_directory().await;
+    unsafe {
+        std::env::set_var("BLOB_STORAGE_BACKEND", "filesystem");
+        std::env::set_var("BLOB_STORAGE_PATH", blob_path.to_str().unwrap());
+        std::env::set_var("BACKUP_STORAGE_BACKEND", "filesystem");
+        std::env::set_var("BACKUP_STORAGE_PATH", backup_path.to_str().unwrap());
+        std::env::set_var("MAX_IMPORT_SIZE", "100000000");
+        std::env::set_var("SKIP_IMPORT_VERIFICATION", "true");
+        std::env::set_var("PLC_DIRECTORY_URL", &plc_url);
+    }
+    register_mock_appview().await;
+    let container = Postgres::default()
+        .with_tag("18-alpine")
+        .with_label("tranquil_pds_test", "true")
+        .start()
+        .await
+        .expect("Failed to start Postgres for cluster");
+    let connection_string = format!(
+        "postgres://postgres:postgres@127.0.0.1:{}",
+        container
+            .get_host_port_ipv4(5432)
+            .await
+            .expect("Failed to get port")
+    );
+    DB_CONTAINER.set(container).ok();
+    connection_string
+}
+
+#[cfg(feature = "external-infra")]
+async fn setup_cluster_testcontainers() -> String {
+    panic!(
+        "Testcontainers disabled with external-infra feature. Set DATABASE_URL and BLOB_STORAGE_PATH (or S3_ENDPOINT)."
+    );
 }

 #[allow(dead_code)]
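Together, `cluster()` and `ServerInstance` give every test in a binary one shared three-node deployment: the first caller pays the startup cost, later callers reuse it, and each instance exposes its HTTP URL plus direct handles to that node's cache and rate limiter. A minimal consumer, following the same pattern as the `ripple_cluster.rs` tests below (sketch only; `common::client()` and the endpoint are reused from this changeset):

```rust
mod common;

#[tokio::test]
async fn cluster_smoke() {
    // Shared, lazily-started 3-node cluster (see common::cluster()).
    let nodes = common::cluster().await;
    let client = common::client();
    for (i, node) in nodes.iter().enumerate() {
        let res = client
            .get(format!(
                "{}/xrpc/com.atproto.server.describeServer",
                node.url
            ))
            .send()
            .await
            .unwrap_or_else(|e| panic!("node {i} unreachable: {e}"));
        assert!(res.status().is_success(), "node {i} not healthy");
    }
}
```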
crates/tranquil-pds/tests/firehose_validation.rs (+6 -1)
···
     tokio::time::sleep(std::time::Duration::from_millis(100)).await;

-    let outdated_cursor = 1i64;
+    let pool = get_test_db_pool().await;
+    let max_seq: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let outdated_cursor = (max_seq - 100).max(1);
     let url = format!(
         "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}",
         app_port(),
crates/tranquil-pds/tests/repo_blob.rs (+4 -2)
···
 async fn test_upload_blob_success() {
     let client = client();
     let (token, _) = create_account_and_login(&client).await;
+    let blob_data = format!("blob-{}", uuid::Uuid::new_v4());
     let res = client
         .post(format!(
             "{}/xrpc/com.atproto.repo.uploadBlob",
···
         ))
         .header(header::CONTENT_TYPE, "text/plain")
         .bearer_auth(token)
-        .body("This is our blob data")
+        .body(blob_data)
         .send()
         .await
         .expect("Failed to send request");
-    assert_eq!(res.status(), StatusCode::OK);
+    let status = res.status();
     let body: Value = res.json().await.expect("Response was not valid JSON");
+    assert_eq!(status, StatusCode::OK, "uploadBlob failed: {body}");
     assert!(body["blob"]["ref"]["$link"].as_str().is_some());
 }

crates/tranquil-pds/tests/ripple_cluster.rs (+1001)
+mod common;
+
+use reqwest::StatusCode;
+use serde_json::json;
+use std::sync::Arc;
+use std::time::Duration;
+use tranquil_pds::cache::{Cache, DistributedRateLimiter};
+
+async fn poll_until<F, Fut>(max_ms: u64, interval_ms: u64, check_fn: F)
+where
+    F: Fn() -> Fut,
+    Fut: std::future::Future<Output = bool>,
+{
+    let deadline = tokio::time::Instant::now() + Duration::from_millis(max_ms);
+    let interval = Duration::from_millis(interval_ms);
+
+    loop {
+        if check_fn().await {
+            return;
+        }
+        if tokio::time::Instant::now() + interval > deadline {
+            panic!("poll_until timed out after {max_ms}ms");
+        }
+        tokio::time::sleep(interval).await;
+    }
+}
+
+fn cache_for(nodes: &[common::ServerInstance], idx: usize) -> Arc<dyn Cache> {
+    nodes[idx]
+        .cache
+        .clone()
+        .unwrap_or_else(|| panic!("node {idx} should have a cache"))
+}
+
+fn rl_for(nodes: &[common::ServerInstance], idx: usize) -> Arc<dyn DistributedRateLimiter> {
+    nodes[idx]
+        .distributed_rate_limiter
+        .clone()
+        .unwrap_or_else(|| panic!("node {idx} should have a rate limiter"))
+}
+
+#[tokio::test]
+async fn cluster_formation() {
+    let nodes = common::cluster().await;
+    assert!(nodes.len() >= 3, "expected at least 3 cluster nodes");
+
+    let client = common::client();
+    let results: Vec<_> = futures::future::join_all(
+        nodes.iter().map(|node| {
+            let client = client.clone();
+            let url = node.url.clone();
+            async move {
+                client
+                    .get(format!("{url}/xrpc/com.atproto.server.describeServer"))
+                    .send()
+                    .await
+            }
+        })
+    ).await;
+
+    results.iter().enumerate().for_each(|(i, result)| {
+        let resp = result
+            .as_ref()
+            .unwrap_or_else(|e| panic!("node {i} unreachable: {e}"));
+        assert_eq!(
+            resp.status(),
+            StatusCode::OK,
+            "node {i} returned non-200 status"
+        );
+    });
+}
+
+#[tokio::test]
+async fn cluster_any_node_access() {
+    let nodes = common::cluster().await;
+    let client = common::client();
+
+    let handle = format!("u{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
+    let payload = serde_json::json!({
+        "handle": handle,
+        "email": format!("{handle}@example.com"),
+        "password": "Testpass123!"
+    });
+    let create_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.server.createAccount",
+            nodes[0].url
+        ))
+        .json(&payload)
+        .send()
+        .await
+        .expect("createAccount on node 0 failed");
+    assert_eq!(create_res.status(), StatusCode::OK);
+    let body: serde_json::Value = create_res.json().await.expect("invalid json");
+    let did = body["did"].as_str().expect("no did").to_string();
+    let access_jwt = body["accessJwt"].as_str().expect("no accessJwt").to_string();
+
+    let pool = common::get_test_db_pool().await;
+    let body_text: String = sqlx::query_scalar!(
+        "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",
+        &did
+    )
+    .fetch_one(pool)
+    .await
+    .expect("verification code not found");
+
+    let lines: Vec<&str> = body_text.lines().collect();
+    let verification_code = lines
+        .iter()
+        .enumerate()
+        .find(|(_, line)| line.contains("verification code is:") || line.contains("code is:"))
+        .and_then(|(i, _)| lines.get(i + 1).map(|s| s.trim().to_string()))
+        .or_else(|| {
+            body_text
+                .lines()
+                .find(|line| line.trim().starts_with("MX"))
+                .map(|s| s.trim().to_string())
+        })
+        .unwrap_or_else(|| body_text.clone());
+
+    let confirm_payload = serde_json::json!({
+        "did": did,
+        "verificationCode": verification_code
+    });
+    let confirm_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.server.confirmSignup",
+            nodes[0].url
+        ))
+        .json(&confirm_payload)
+        .send()
+        .await
+        .expect("confirmSignup failed");
+
+    let token = match confirm_res.status() {
+        StatusCode::OK => {
+            let confirm_body: serde_json::Value =
+                confirm_res.json().await.expect("invalid json from confirmSignup");
+            confirm_body["accessJwt"]
+                .as_str()
+                .unwrap_or(&access_jwt)
+                .to_string()
+        }
+        _ => access_jwt,
+    };
+
+    let describe_res = client
+        .get(format!(
+            "{}/xrpc/com.atproto.server.getSession",
+            nodes[1].url
+        ))
+        .bearer_auth(&token)
+        .send()
+        .await
+        .expect("getSession on node 1 failed");
+    assert_eq!(
+        describe_res.status(),
+        StatusCode::OK,
+        "session created on node 0 should be valid on node 1 (shared postgres)"
+    );
+    let session: serde_json::Value = describe_res.json().await.expect("invalid json");
+    assert_eq!(session["did"].as_str().unwrap(), did);
+}
+
+#[tokio::test]
+async fn cache_convergence() {
+    let nodes = common::cluster().await;
+
+    let cache_a = nodes[0]
+        .cache
+        .as_ref()
+        .expect("node 0 should have a cache");
+    let cache_b = nodes[1]
+        .cache
+        .as_ref()
+        .expect("node 1 should have a cache");
+
+    let test_key = format!("ripple-test-{}", uuid::Uuid::new_v4());
+    let test_value = "converged-value";
+
+    cache_a
+        .set(&test_key, test_value, Duration::from_secs(300))
+        .await
+        .expect("cache set on node A failed");
+
+    let found_on_a = cache_a.get(&test_key).await;
+    assert_eq!(
+        found_on_a.as_deref(),
+        Some(test_value),
+        "value should be immediately readable on the originating node"
+    );
+
+    let mut converged = false;
+    let mut attempts = 0;
+    let max_attempts = 50;
+    while attempts < max_attempts {
+        tokio::time::sleep(Duration::from_millis(200)).await;
+        if let Some(val) = cache_b.get(&test_key).await {
+            assert_eq!(val, test_value, "converged value should match");
+            converged = true;
+            break;
+        }
+        attempts += 1;
+    }
+
+    assert!(
+        converged,
+        "cache value did not converge to node B within {}ms",
+        max_attempts * 200
+    );
+}
+
+#[tokio::test]
+async fn rate_limit_convergence() {
+    let nodes = common::cluster().await;
+
+    let rl_a = nodes[0]
+        .distributed_rate_limiter
+        .as_ref()
+        .expect("node 0 should have a rate limiter");
+    let rl_b = nodes[1]
+        .distributed_rate_limiter
+        .as_ref()
+        .expect("node 1 should have a rate limiter");
+
+    let test_key = format!("rl-test-{}", uuid::Uuid::new_v4());
+    let limit: u32 = 100;
+    let window_ms: u64 = 600_000;
+    let hits_on_a: u32 = 80;
+
+    let mut count = 0u32;
+    while count < hits_on_a {
+        let allowed = rl_a.check_rate_limit(&test_key, limit, window_ms).await;
+        assert!(
+            allowed,
+            "request {count} should be allowed (limit is {limit})"
+        );
+        count += 1;
+    }
+
+    let rl_b2 = rl_b.clone();
+    let k = test_key.clone();
+    poll_until(15_000, 200, move || {
+        let rl = rl_b2.clone();
+        let k = k.clone();
+        async move { rl.peek_rate_limit_count(&k, window_ms).await >= hits_on_a as u64 }
+    })
+    .await;
+
+    let mut allowed_on_b = 0u32;
+    while allowed_on_b < limit {
+        if !rl_b.check_rate_limit(&test_key, limit, window_ms).await {
+            break;
+        }
+        allowed_on_b += 1;
+    }
+
+    assert!(
+        allowed_on_b < limit,
+        "node B should have been rate limited after convergence, but {allowed_on_b} requests were allowed (limit={limit})"
+    );
+    assert!(
+        allowed_on_b <= limit - hits_on_a + 10,
+        "node B allowed {allowed_on_b} requests but expected at most {} (convergence margin)",
+        limit - hits_on_a + 10
+    );
+}
+
+#[tokio::test]
+async fn delete_convergence() {
+    let nodes = common::cluster().await;
+    let cache_0 = cache_for(nodes, 0);
+    let cache_1 = cache_for(nodes, 1);
+
+    let key = format!("del-cluster-{}", uuid::Uuid::new_v4());
+
+    cache_0
+        .set(&key, "to-delete", Duration::from_secs(300))
+        .await
+        .expect("set on node 0 failed");
+
+    let c1 = cache_1.clone();
+    let k = key.clone();
+    poll_until(10_000, 200, move || {
+        let c = c1.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.is_some() }
+    })
+    .await;
+
+    cache_0.delete(&key).await.expect("delete on node 0 failed");
+
+    let c1 = cache_1.clone();
+    let k = key.clone();
+    poll_until(10_000, 200, move || {
+        let c = c1.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.is_none() }
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn three_node_transitive_convergence() {
+    let nodes = common::cluster().await;
+    let cache_0 = cache_for(nodes, 0);
+    let cache_2 = cache_for(nodes, 2);
+
+    let key = format!("trans-{}", uuid::Uuid::new_v4());
+
+    cache_0
+        .set(&key, "reaches-all", Duration::from_secs(300))
+        .await
+        .expect("set on node 0 failed");
+
+    let c2 = cache_2.clone();
+    let k = key.clone();
+    poll_until(15_000, 200, move || {
+        let c = c2.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.as_deref() == Some("reaches-all") }
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn cluster_overwrite_conflict_resolution() {
+    let nodes = common::cluster().await;
+    let cache_0 = cache_for(nodes, 0);
+    let cache_1 = cache_for(nodes, 1);
+    let cache_2 = cache_for(nodes, 2);
+
+    let key = format!("conflict-{}", uuid::Uuid::new_v4());
+
+    cache_0
+        .set(&key, "from-node-0", Duration::from_secs(300))
+        .await
+        .expect("set on node 0 failed");
+
+    cache_1
+        .set(&key, "from-node-1", Duration::from_secs(300))
+        .await
+        .expect("set on node 1 failed");
+
+    let c0 = cache_0.clone();
+    let c1 = cache_1.clone();
+    let c2 = cache_2.clone();
+    let k = key.clone();
+    poll_until(15_000, 200, move || {
+        let c0 = c0.clone();
+        let c1 = c1.clone();
+        let c2 = c2.clone();
+        let k = k.clone();
+        async move {
+            let (v0, v1, v2) = tokio::join!(c0.get(&k), c1.get(&k), c2.get(&k));
+            matches!((v0, v1, v2), (Some(a), Some(b), Some(c)) if a == b && b == c)
+        }
+    })
+    .await;
+
+    let v0 = cache_0.get(&key).await.expect("node 0 should have key");
+    let v1 = cache_1.get(&key).await.expect("node 1 should have key");
+    let v2 = cache_2.get(&key).await.expect("node 2 should have key");
+
+    assert_eq!(v0, v1, "node 0 and 1 must agree");
+    assert_eq!(v1, v2, "node 1 and 2 must agree");
+}
+
+#[tokio::test]
+async fn cluster_bulk_key_convergence() {
+    let nodes = common::cluster().await;
+    let cache_0 = cache_for(nodes, 0);
+    let cache_1 = cache_for(nodes, 1);
+    let cache_2 = cache_for(nodes, 2);
+
+    let prefix = format!("bulk-{}", uuid::Uuid::new_v4());
+
+    futures::future::join_all((0..500).map(|i| {
+        let cache = cache_0.clone();
+        let p = prefix.clone();
+        async move {
+            cache
+                .set(
+                    &format!("{p}-{i}"),
+                    &format!("v-{i}"),
+                    Duration::from_secs(300),
+                )
+                .await
+                .expect("set failed");
+        }
+    }))
+    .await;
+
+    let c1 = cache_1.clone();
+    let p = prefix.clone();
+    poll_until(60_000, 500, move || {
+        let c = c1.clone();
+        let p = p.clone();
+        async move {
+            futures::future::join_all((0..500).map(|i| {
+                let c = c.clone();
+                let p = p.clone();
+                async move { c.get(&format!("{p}-{i}")).await.is_some() }
+            }))
+            .await
+            .into_iter()
+            .all(|v| v)
+        }
+    })
+    .await;
+
+    let spot_checks: Vec<Option<String>> = futures::future::join_all(
+        [0, 99, 250, 499].iter().map(|&i| {
+            let c = cache_2.clone();
+            let p = prefix.clone();
+            async move { c.get(&format!("{p}-{i}")).await }
+        }),
+    )
+    .await;
+
+    spot_checks.iter().enumerate().for_each(|(idx, val)| {
+        assert!(
+            val.is_some(),
+            "node 2 missing spot-check key at index {idx}"
+        );
+    });
+}
+
+#[tokio::test]
+async fn cluster_concurrent_multi_node_writes() {
+    let nodes = common::cluster().await;
+    let cache_0 = cache_for(nodes, 0);
+    let cache_1 = cache_for(nodes, 1);
+    let cache_2 = cache_for(nodes, 2);
+
+    let prefix = format!("multi-{}", uuid::Uuid::new_v4());
+
+    let write_0 = {
+        let cache = cache_0.clone();
+        let p = prefix.clone();
+        async move {
+            futures::future::join_all((0..100).map(|i| {
+                let cache = cache.clone();
+                let p = p.clone();
+                async move {
+                    cache
+                        .set(
+                            &format!("{p}-0-{i}"),
+                            &format!("n0-{i}"),
+                            Duration::from_secs(300),
+                        )
+                        .await
+                        .expect("set failed");
+                }
+            }))
+            .await;
+        }
+    };
+
+    let write_1 = {
+        let cache = cache_1.clone();
+        let p = prefix.clone();
+        async move {
+            futures::future::join_all((0..100).map(|i| {
+                let cache = cache.clone();
+                let p = p.clone();
+                async move {
+                    cache
+                        .set(
+                            &format!("{p}-1-{i}"),
+                            &format!("n1-{i}"),
+                            Duration::from_secs(300),
+                        )
+                        .await
+                        .expect("set failed");
+                }
+            }))
+            .await;
+        }
+    };
+
+    let write_2 = {
+        let cache = cache_2.clone();
+        let p = prefix.clone();
+        async move {
+            futures::future::join_all((0..100).map(|i| {
+                let cache = cache.clone();
+                let p = p.clone();
+                async move {
+                    cache
+                        .set(
+                            &format!("{p}-2-{i}"),
+                            &format!("n2-{i}"),
+                            Duration::from_secs(300),
+                        )
+                        .await
+                        .expect("set failed");
+                }
+            }))
+            .await;
+        }
+    };
+
+    tokio::join!(write_0, write_1, write_2);
+
+    let caches: Vec<Arc<dyn Cache>> = vec![cache_0.clone(), cache_1.clone(), cache_2.clone()];
+
+    futures::future::join_all(caches.iter().enumerate().map(|(ci, cache)| {
+        let cache = cache.clone();
+        let p = prefix.clone();
+        async move {
+            let c = cache.clone();
+            let p2 = p.clone();
+            poll_until(60_000, 500, move || {
+                let c = c.clone();
+                let p = p2.clone();
+                async move {
+                    let checks = futures::future::join_all((0..3u8).flat_map(|node| {
+                        let c = c.clone();
+                        let p = p.clone();
+                        (0..100).map(move |i| {
+                            let c = c.clone();
+                            let p = p.clone();
+                            async move { c.get(&format!("{p}-{node}-{i}")).await.is_some() }
+                        })
+                    }))
+                    .await;
+                    checks.into_iter().all(|v| v)
+                }
+            })
+            .await;
+            eprintln!("node {ci} has all 300 keys");
+        }
+    }))
+    .await;
+}
+
+#[tokio::test]
+async fn cluster_rate_limit_multi_node_convergence() {
+    let nodes = common::cluster().await;
+    let rl_0 = rl_for(nodes, 0);
+    let rl_1 = rl_for(nodes, 1);
+    let rl_2 = rl_for(nodes, 2);
+
+    let key = format!("rl-multi-{}", uuid::Uuid::new_v4());
+    let limit: u32 = 300;
+    let window_ms: u64 = 600_000;
+
+    futures::future::join_all((0..50).map(|_| {
+        let rl = rl_0.clone();
+        let k = key.clone();
+        async move {
+            assert!(rl.check_rate_limit(&k, limit, window_ms).await);
+        }
+    }))
+    .await;
+
+    futures::future::join_all((0..40).map(|_| {
+        let rl = rl_1.clone();
+        let k = key.clone();
+        async move {
+            assert!(rl.check_rate_limit(&k, limit, window_ms).await);
+        }
+    }))
+    .await;
+
+    futures::future::join_all((0..30).map(|_| {
+        let rl = rl_2.clone();
+        let k = key.clone();
+        async move {
+            assert!(rl.check_rate_limit(&k, limit, window_ms).await);
+        }
+    }))
+    .await;
+
+    let rl_peek = rl_0.clone();
+    let k = key.clone();
+    poll_until(15_000, 200, move || {
+        let rl = rl_peek.clone();
+        let k = k.clone();
+        async move { rl.peek_rate_limit_count(&k, window_ms).await >= 120 }
+    })
+    .await;
+
+    let mut remaining = 0u32;
+    loop {
+        if !rl_0.check_rate_limit(&key, limit, window_ms).await {
+            break;
+        }
+        remaining += 1;
+        if remaining > limit {
+            panic!("rate limiter never denied - convergence failed");
+        }
+    }
+
+    let expected_remaining = limit - 120;
+    let margin = 20;
+    assert!(
+        remaining.abs_diff(expected_remaining) <= margin,
+        "expected ~{expected_remaining} remaining hits, got {remaining} (margin={margin})"
+    );
+}
+
+fn create_account_on_node<'a>(
+    client: &'a reqwest::Client,
+    node_url: &'a str,
+) -> std::pin::Pin<Box<dyn std::future::Future<Output = (String, String)> + Send + 'a>> {
+    let url = node_url.to_string();
+    Box::pin(async move {
+        let handle = format!("u{}", &uuid::Uuid::new_v4().simple().to_string()[..12]);
+        let payload = json!({
+            "handle": handle,
+            "email": format!("{handle}@example.com"),
+            "password": "Testpass123!"
+        });
+        let create_res = client
+            .post(format!("{url}/xrpc/com.atproto.server.createAccount"))
+            .json(&payload)
+            .send()
+            .await
+            .expect("createAccount failed");
+        assert_eq!(create_res.status(), StatusCode::OK, "createAccount non-200");
+        let body: serde_json::Value = create_res.json().await.expect("invalid json");
+        let did = body["did"].as_str().expect("no did").to_string();
+        let access_jwt = body["accessJwt"].as_str().expect("no accessJwt").to_string();
+
+        let pool = common::get_test_db_pool().await;
+        let body_text: String = sqlx::query_scalar!(
+            "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",
+            &did
+        )
+        .fetch_one(pool)
+        .await
+        .expect("verification code not found");
+
+        let lines: Vec<&str> = body_text.lines().collect();
+        let verification_code = lines
+            .iter()
+            .enumerate()
+            .find(|(_, line)| line.contains("verification code is:") || line.contains("code is:"))
+            .and_then(|(i, _)| lines.get(i + 1).map(|s| s.trim().to_string()))
+            .or_else(|| {
+                body_text
+                    .lines()
+                    .find(|line| line.trim().starts_with("MX"))
+                    .map(|s| s.trim().to_string())
+            })
+            .unwrap_or_else(|| body_text.clone());
+
+        let confirm_res = client
+            .post(format!("{url}/xrpc/com.atproto.server.confirmSignup"))
+            .json(&json!({ "did": did, "verificationCode": verification_code }))
+            .send()
+            .await
+            .expect("confirmSignup failed");
+
+        let token = match confirm_res.status() {
+            StatusCode::OK => {
+                let confirm_body: serde_json::Value =
+                    confirm_res.json().await.expect("invalid json from confirmSignup");
+                confirm_body["accessJwt"]
+                    .as_str()
+                    .unwrap_or(&access_jwt)
+                    .to_string()
+            }
+            _ => access_jwt,
+        };
+
+        (token, did)
+    })
+}
+
+#[tokio::test]
+async fn cross_node_rate_limit_via_login() {
+    let nodes = common::cluster().await;
+    let client = common::client();
+
+    let uuid_bytes = uuid::Uuid::new_v4();
+    let b = uuid_bytes.as_bytes();
+    let unique_ip = format!("10.{}.{}.{}", b[0], b[1], b[2]);
+
+    let statuses: Vec<StatusCode> = futures::future::join_all((0..10).map(|_| {
+        let client = client.clone();
+        let url = nodes[0].url.clone();
+        let ip = unique_ip.clone();
+        async move {
+            client
+                .post(format!("{url}/xrpc/com.atproto.server.createSession"))
+                .header("X-Forwarded-For", &ip)
+                .json(&json!({
+                    "identifier": "nonexistent@example.com",
+                    "password": "wrongpass"
+                }))
+                .send()
+                .await
+                .expect("request failed")
+                .status()
+        }
+    }))
+    .await;
+
+    statuses.iter().enumerate().for_each(|(i, status)| {
+        assert_ne!(
+            *status,
+            StatusCode::TOO_MANY_REQUESTS,
+            "request {i} should not be rate limited within first 10 attempts"
+        );
+    });
+
+    let rl_1 = rl_for(nodes, 1);
+    let rl_key = format!("login:{unique_ip}");
+    let rl_1c = rl_1.clone();
+    let k = rl_key.clone();
+    poll_until(30_000, 200, move || {
+        let rl = rl_1c.clone();
+        let k = k.clone();
+        async move { rl.peek_rate_limit_count(&k, 60_000).await >= 10 }
+    })
+    .await;
+
+    let cross_node_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.server.createSession",
+            nodes[1].url
+        ))
+        .header("X-Forwarded-For", &unique_ip)
+        .json(&json!({
+            "identifier": "nonexistent@example.com",
+            "password": "wrongpass"
+        }))
+        .send()
+        .await
+        .expect("cross-node request failed");
+
+    assert_eq!(
+        cross_node_res.status(),
+        StatusCode::TOO_MANY_REQUESTS,
+        "node 1 should rate limit after cross-node convergence of login attempts"
+    );
+}
+
+#[tokio::test]
+async fn cross_node_handle_resolution_from_cache() {
+    let nodes = common::cluster().await;
+    let client = common::client();
+    let cache_0 = cache_for(nodes, 0);
+
+    let fake_handle = format!("cached-{}.test", uuid::Uuid::new_v4().simple());
+    let fake_did = format!(
+        "did:plc:cached{}",
+        &uuid::Uuid::new_v4().simple().to_string()[..16]
+    );
+
+    cache_0
+        .set(
+            &format!("handle:{fake_handle}"),
+            &fake_did,
+            Duration::from_secs(300),
+        )
+        .await
+        .expect("cache set failed");
+
+    let cache_1 = cache_for(nodes, 1);
+    let c1 = cache_1.clone();
+    let k = format!("handle:{fake_handle}");
+    poll_until(10_000, 200, move || {
+        let c = c1.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.is_some() }
+    })
+    .await;
+
+    let res = client
+        .get(format!(
+            "{}/xrpc/com.atproto.identity.resolveHandle?handle={}",
+            nodes[1].url, fake_handle
+        ))
+        .send()
+        .await
+        .expect("resolveHandle request failed");
+
+    assert_eq!(
+        res.status(),
+        StatusCode::OK,
+        "resolveHandle should succeed from propagated cache"
+    );
+    let body: serde_json::Value = res.json().await.expect("invalid json");
+    assert_eq!(
+        body["did"].as_str().unwrap(),
+        fake_did,
+        "resolved DID should match the cache-propagated value"
+    );
+}
+
+#[tokio::test]
+async fn cross_node_cache_delete_observable_via_http() {
+    let nodes = common::cluster().await;
+    let client = common::client();
+    let cache_0 = cache_for(nodes, 0);
+    let cache_1 = cache_for(nodes, 1);
+
+    let fake_handle = format!("deltest-{}.test", uuid::Uuid::new_v4().simple());
+    let fake_did = format!(
+        "did:plc:del{}",
+        &uuid::Uuid::new_v4().simple().to_string()[..16]
+    );
+    let cache_key = format!("handle:{fake_handle}");
+
+    cache_0
+        .set(&cache_key, &fake_did, Duration::from_secs(300))
+        .await
+        .expect("cache set failed");
+
+    let c1 = cache_1.clone();
+    let k = cache_key.clone();
+    poll_until(10_000, 200, move || {
+        let c = c1.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.is_some() }
+    })
+    .await;
+
+    let res = client
+        .get(format!(
+            "{}/xrpc/com.atproto.identity.resolveHandle?handle={}",
+            nodes[1].url, fake_handle
+        ))
+        .send()
+        .await
+        .expect("resolveHandle request failed");
+    assert_eq!(res.status(), StatusCode::OK, "should resolve before delete");
+
+    cache_0
+        .delete(&cache_key)
+        .await
+        .expect("cache delete failed");
+
+    let c1 = cache_1.clone();
+    let k = cache_key.clone();
+    poll_until(10_000, 200, move || {
+        let c = c1.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.is_none() }
+    })
+    .await;
+
+    let res = client
+        .get(format!(
+            "{}/xrpc/com.atproto.identity.resolveHandle?handle={}",
+            nodes[1].url, fake_handle
+        ))
+        .send()
+        .await
+        .expect("resolveHandle request failed after delete");
+    assert_ne!(
+        res.status(),
+        StatusCode::OK,
+        "resolveHandle should fail after cache delete propagation (handle not in DB)"
+    );
+}
+
+#[tokio::test]
+async fn cross_node_email_update_status() {
+    let nodes = common::cluster().await;
+    let client = common::client();
+    let cache_0 = cache_for(nodes, 0);
+    let cache_1 = cache_for(nodes, 1);
+
+    let (token, did) = create_account_on_node(&client, &nodes[0].url).await;
+
+    let new_email = format!("updated-{}@example.com", uuid::Uuid::new_v4().simple());
+    let update_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.server.requestEmailUpdate",
+            nodes[0].url
+        ))
+        .bearer_auth(&token)
+        .json(&json!({ "newEmail": new_email }))
+        .send()
+        .await
+        .expect("requestEmailUpdate failed");
+    assert_eq!(
+        update_res.status(),
+        StatusCode::OK,
+        "requestEmailUpdate should succeed"
+    );
+    let update_body: serde_json::Value = update_res.json().await.expect("invalid json");
+    assert_eq!(
+        update_body["tokenRequired"].as_bool(),
+        Some(true),
+        "tokenRequired should be true (email is verified after confirmSignup)"
+    );
+
+    let cache_key = format!("email_update:{did}");
+    let val_on_0 = cache_0.get(&cache_key).await;
+    assert!(
+        val_on_0.is_some(),
+        "email_update entry should exist on node 0 immediately after requestEmailUpdate"
+    );
+
+    let c1 = cache_1.clone();
+    let k = cache_key.clone();
+    poll_until(10_000, 200, move || {
+        let c = c1.clone();
+        let k = k.clone();
+        async move { c.get(&k).await.is_some() }
+    })
+    .await;
+
+    let status_res = client
+        .get(format!(
+            "{}/xrpc/_account.checkEmailUpdateStatus",
+            nodes[1].url
+        ))
+        .bearer_auth(&token)
+        .send()
+        .await
+        .expect("checkEmailUpdateStatus on node 1 failed");
+    assert_eq!(
+        status_res.status(),
+        StatusCode::OK,
+        "checkEmailUpdateStatus should succeed on node 1"
+    );
+    let status_body: serde_json::Value = status_res.json().await.expect("invalid json");
+    assert_eq!(
+        status_body["pending"].as_bool(),
+        Some(true),
+        "email update should be pending on node 1 via cache propagation"
+    );
+    assert_eq!(
+        status_body["newEmail"].as_str().unwrap(),
+        new_email,
+        "new email should match on node 1"
+    );
+}
+
+#[tokio::test]
+async fn cross_node_session_revocation() {
+    let nodes = common::cluster().await;
+    let client = common::client();
+
+    let (token, _did) = create_account_on_node(&client, &nodes[0].url).await;
+
+    let session_res = client
+        .get(format!(
+            "{}/xrpc/com.atproto.server.getSession",
+            nodes[0].url
+        ))
+        .bearer_auth(&token)
+        .send()
+        .await
+        .expect("getSession on node 0 failed");
+    assert_eq!(
+        session_res.status(),
+        StatusCode::OK,
+        "session should be valid on node 0"
+    );
+
+    let client2 = client.clone();
+    let url1 = nodes[1].url.clone();
+    let t = token.clone();
+    poll_until(15_000, 200, move || {
+        let c = client2.clone();
+        let u = url1.clone();
+        let t = t.clone();
+        async move {
+            c.get(format!("{u}/xrpc/com.atproto.server.getSession"))
+                .bearer_auth(&t)
+                .send()
+                .await
+                .map(|r| r.status() == StatusCode::OK)
+                .unwrap_or(false)
+        }
+    })
+    .await;
+
+    let delete_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.server.deleteSession",
+            nodes[0].url
+        ))
+        .bearer_auth(&token)
+        .send()
+        .await
+        .expect("deleteSession failed");
+    assert_eq!(
+        delete_res.status(),
+        StatusCode::OK,
+        "deleteSession should succeed"
+    );
+
+    let client3 = client.clone();
+    let url1 = nodes[1].url.clone();
+    let t = token.clone();
+    poll_until(15_000, 200, move || {
+        let c = client3.clone();
+        let u = url1.clone();
+        let t = t.clone();
+        async move {
+            c.get(format!("{u}/xrpc/com.atproto.server.getSession"))
+                .bearer_auth(&t)
+                .send()
+                .await
+                .map(|r| r.status() != StatusCode::OK)
+                .unwrap_or(false)
+        }
+    })
+    .await;
+}
crates/tranquil-pds/tests/sync_conformance.rs (+27 -14)
···
     set_account_takedown(&did, Some("test-takedown-ref")).await;

-    let res = client
-        .get(format!(
+    let mut cursor: Option<String> = None;
+    let mut takendown_repo: Option<Value> = None;
+    loop {
+        let mut url = format!(
             "{}/xrpc/com.atproto.sync.listRepos?limit=1000",
             base_url().await
-        ))
-        .send()
-        .await
-        .expect("Failed to send request");
-
-    assert_eq!(res.status(), StatusCode::OK);
-    let body: Value = res.json().await.expect("Response was not valid JSON");
-    let repos = body["repos"].as_array().unwrap();
-
-    let takendown_repo = repos.iter().find(|r| r["did"] == did);
+        );
+        if let Some(ref c) = cursor {
+            url.push_str(&format!("&cursor={}", c));
+        }
+        let res = client
+            .get(&url)
+            .send()
+            .await
+            .expect("Failed to send request");
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await.expect("Response was not valid JSON");
+        let repos = body["repos"].as_array().unwrap();
+        if let Some(found) = repos.iter().find(|r| r["did"] == did) {
+            takendown_repo = Some(found.clone());
+            break;
+        }
+        match body["cursor"].as_str() {
+            Some(c) => cursor = Some(c.to_string()),
+            None => break,
+        }
+    }
     assert!(takendown_repo.is_some(), "Takendown repo should be in list");
     let repo = takendown_repo.unwrap();
-    assert_eq!(repo["active"], false);
-    assert_eq!(repo["status"], "takendown");
+    assert_eq!(repo["active"], false, "repo should be inactive: {:?}", repo);
+    assert_eq!(
+        repo["status"],
+        "takendown",
+        "repo status should be takendown: {:?}",
+        repo
+    );
 }

 #[tokio::test]
+63-63
crates/tranquil-pds/tests/whole_story.rs
···486486 let base = base_url().await;
487487 let (did, jwt) = setup_new_user("blob-lifecycle").await;
488488489489- let blob1_data = b"First blob for testing lifecycle";
489489+ let blob1_data = format!("First blob for testing lifecycle {}", uuid::Uuid::new_v4());
490490+ let blob1_data = blob1_data.as_bytes();
490491 let upload1_res = client
491492 .post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base))
492493 .header(header::CONTENT_TYPE, "text/plain")
···500501 let blob1 = upload1_body["blob"].clone();
501502 let blob1_cid = blob1["ref"]["$link"].as_str().unwrap();
502503503503- let blob2_data = b"Second blob for testing lifecycle";
504504+ let blob2_data = format!("Second blob for testing lifecycle {}", uuid::Uuid::new_v4());
505505+ let blob2_data = blob2_data.as_bytes();
504506 let upload2_res = client
505507 .post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base))
506508 .header(header::CONTENT_TYPE, "text/plain")
···12781280 let (did, jwt) = setup_new_user("scale-posts").await;
1279128112801282 let post_count = 1000;
12811281- let post_futures: Vec<_> = (0..post_count)
12831283+ futures::stream::iter(0..post_count)
12821284 .map(|i| {
12831285 let client = client.clone();
12841286 let base = base.to_string();
···13111313 );
13121314 }
13131315 })
13141314- .collect();
13151315-13161316- join_all(post_futures).await;
13161316+ .buffer_unordered(50)
13171317+ .collect::<Vec<()>>()
13181318+ .await;
1317131913181320 let count_res = client
13191321 .get(format!("{}/xrpc/com.atproto.repo.listRecords", base))
···13491351 "All posts should have unique URIs"
13501352 );
1351135313521352- let delete_futures: Vec<_> = all_uris
13531353- .iter()
13541354- .take(500)
13541354+ futures::stream::iter(all_uris.iter().take(500))
13551355 .map(|uri| {
13561356 let client = client.clone();
13571357 let base = base.to_string();
···13731373 assert_eq!(res.status(), StatusCode::OK);
13741374 }
13751375 })
13761376- .collect();
13771377-13781378- join_all(delete_futures).await;
13761376+ .buffer_unordered(50)
13771377+ .collect::<Vec<()>>()
13781378+ .await;
1379137913801380 let final_count = count_records(&client, base, &jwt, &did, "app.bsky.feed.post").await;
13811381 assert_eq!(
···1396139613971397 let users: Vec<(String, String)> = join_all(user_futures).await;
1398139813991399- let follow_futures: Vec<_> = users
13991399+ let follow_pairs: Vec<(String, String, String)> = users
14001400 .iter()
14011401 .enumerate()
14021402 .flat_map(|(i, (follower_did, follower_jwt))| {
14031403+ users.iter().enumerate()
14041404+ .filter(move |(j, _)| *j != i)
14051405+ .map(|(_, (followee_did, _))| {
14061406+ (follower_did.clone(), follower_jwt.clone(), followee_did.clone())
14071407+ })
14081408+ .collect::<Vec<_>>()
14091409+ })
14101410+ .collect();
14111411+ futures::stream::iter(follow_pairs)
14121412+ .map(|(follower_did, follower_jwt, followee_did)| {
14031413 let client = client.clone();
14041414 let base = base.to_string();
14051405- users.iter().enumerate().filter(move |(j, _)| *j != i).map({
14061406- let client = client.clone();
14071407- let base = base.clone();
14081408- let follower_did = follower_did.clone();
14091409- let follower_jwt = follower_jwt.clone();
14101410- move |(_, (followee_did, _))| {
14111411- let client = client.clone();
14121412- let base = base.clone();
14131413- let follower_did = follower_did.clone();
14141414- let follower_jwt = follower_jwt.clone();
14151415- let followee_did = followee_did.clone();
14161416- async move {
14171417- let rkey = format!(
14181418- "follow_{}",
14191419- &uuid::Uuid::new_v4().simple().to_string()[..12]
14201420- );
14211421- let res = client
14221422- .post(format!("{}/xrpc/com.atproto.repo.putRecord", base))
14231423- .bearer_auth(&follower_jwt)
14241424- .json(&json!({
14251425- "repo": follower_did,
14261426- "collection": "app.bsky.graph.follow",
14271427- "rkey": rkey,
14281428- "record": {
14291429- "$type": "app.bsky.graph.follow",
14301430- "subject": followee_did,
14311431- "createdAt": Utc::now().to_rfc3339()
14321432- }
14331433- }))
14341434- .send()
14351435- .await
14361436- .expect("Follow failed");
14371437- let status = res.status();
14381438- let body: Value = res.json().await.unwrap_or_default();
14391439- assert_eq!(status, StatusCode::OK, "Follow failed: {:?}", body);
14401440- }
14411441- }
14421442- })
14151415+ async move {
14161416+ let rkey = format!(
14171417+ "follow_{}",
14181418+ &uuid::Uuid::new_v4().simple().to_string()[..12]
14191419+ );
14201420+ let res = client
14211421+ .post(format!("{}/xrpc/com.atproto.repo.putRecord", base))
14221422+ .bearer_auth(&follower_jwt)
14231423+ .json(&json!({
14241424+ "repo": follower_did,
14251425+ "collection": "app.bsky.graph.follow",
14261426+ "rkey": rkey,
14271427+ "record": {
14281428+ "$type": "app.bsky.graph.follow",
14291429+ "subject": followee_did,
14301430+ "createdAt": Utc::now().to_rfc3339()
14311431+ }
14321432+ }))
14331433+ .send()
14341434+ .await
14351435+ .expect("Follow failed");
14361436+ let status = res.status();
14371437+ let body: Value = res.json().await.unwrap_or_default();
14381438+ assert_eq!(status, StatusCode::OK, "Follow failed: {:?}", body);
14391439+ }
14431440 })
14441444- .collect();
14451445-
14461446- join_all(follow_futures).await;
14411441+ .buffer_unordered(50)
14421442+ .collect::<Vec<()>>()
14431443+ .await;
14471444
14481445 let expected_follows_per_user = user_count - 1;
14491446 let verify_futures: Vec<_> = users
···15291526 let (did, jwt) = setup_new_user("scale-blobs").await;
15301527
15311528 let blob_count = 300;
15321532- let blob_futures: Vec<_> = (0..blob_count)
15291529+ let blobs: Vec<Value> = futures::stream::iter(0..blob_count)
15331530 .map(|i| {
15341531 let client = client.clone();
15351532 let base = base.to_string();
15361533 let jwt = jwt.clone();
15371534 async move {
15381538- let blob_data = format!("Blob data number {} with some padding to make it realistic size for testing purposes", i);
15351535+ let blob_data = format!("Blob data number {} {} with some padding to make it realistic size for testing purposes", i, uuid::Uuid::new_v4());
15391536 let res = client
15401537 .post(format!("{}/xrpc/com.atproto.repo.uploadBlob", base))
15411538 .header(header::CONTENT_TYPE, "text/plain")
···15491546 body["blob"].clone()
15501547 }
15511548 })
15521552- .collect();
15491549+ .buffer_unordered(50)
15501550+ .collect::<Vec<Value>>()
15511551+ .await;
15531552
15541554- let blobs: Vec<Value> = join_all(blob_futures).await;
15551555-
15561556- let post_futures: Vec<_> = blobs
15531553+ let blob_chunks: Vec<(usize, Vec<Value>)> = blobs
15571554 .chunks(3)
15581555 .enumerate()
15561556+ .map(|(i, chunk)| (i, chunk.to_vec()))
15571557+ .collect();
15581558+ futures::stream::iter(blob_chunks)
15591559 .map(|(i, blob_chunk)| {
15601560 let client = client.clone();
15611561 let base = base.to_string();
···15961596 assert_eq!(status, StatusCode::OK, "Post with blobs failed: {:?}", body);
15971597 }
15981598 })
15991599- .collect();
16001600-
16011601- join_all(post_futures).await;
15991599+ .buffer_unordered(50)
16001600+ .collect::<Vec<()>>()
16011601+ .await;
16021602
16031603 let list_blobs_res = client
16041604 .get(format!("{}/xrpc/com.atproto.sync.listBlobs", base))
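The refactors in this file all follow one shape: replace `join_all` over an eagerly built `Vec` of futures with `futures::stream::iter(...)`, mapping each item to a future and capping concurrency with `buffer_unordered(50)`. A minimal standalone sketch of the pattern (the item type, work, and limit are illustrative, not taken from the codebase):

```rust
use futures::StreamExt;

// Runs at most `limit` futures concurrently; `join_all` would instead
// start every future at once, which is what these tests move away from.
async fn run_bounded(items: Vec<u32>, limit: usize) -> Vec<u32> {
    futures::stream::iter(items)
        .map(|i| async move { i + 1 }) // stand-in for an HTTP request
        .buffer_unordered(limit)
        .collect::<Vec<u32>>()
        .await
}
```

Note that `buffer_unordered` yields results in completion order, which is fine here because the tests assert on server-side effects rather than on result ordering.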
···11+pub mod cache;
22+pub mod config;
33+pub mod crdt;
44+pub mod engine;
55+pub mod eviction;
66+pub mod gossip;
77+pub mod rate_limiter;
88+pub mod transport;
99+
1010+pub use config::RippleConfig;
1111+pub use engine::{RippleEngine, RippleStartError};
···4343
4444## Standalone Containers (No Compose)
4545
4646-If you already have postgres and valkey running on the host (eg., from the [Debian install guide](install-debian.md)), you can run just the app containers.
4646+If you already have postgres running on the host (e.g., from the [Debian install guide](install-debian.md)), you can run just the app containers.
4747
4848Build the images:
4949```sh
···5151podman build -t tranquil-pds-frontend:latest ./frontend
5252```
5353
5454-Run the backend with host networking (so it can access postgres/valkey on localhost) and mount the blob storage:
5454+Run the backend with host networking (so it can access postgres on localhost) and mount the blob storage:
5555```sh
5656podman run -d --name tranquil-pds \
5757 --network=host \
···106106
107107```bash
108108mkdir -p /etc/containers/systemd
109109-mkdir -p /srv/tranquil-pds/{postgres,valkey,blobs,backups,certs,acme,config}
109109+mkdir -p /srv/tranquil-pds/{postgres,blobs,backups,certs,acme,config}
110110```
111111
112112## Create Environment File
···127127
128128Copy the quadlet files from the repository:
129129```bash
130130-cp /opt/tranquil-pds/deploy/quadlets/*.pod /etc/containers/systemd/
131131-cp /opt/tranquil-pds/deploy/quadlets/*.container /etc/containers/systemd/
130130+cp /opt/tranquil-pds/deploy/quadlets/tranquil-pds.pod /etc/containers/systemd/
131131+cp /opt/tranquil-pds/deploy/quadlets/tranquil-pds-db.container /etc/containers/systemd/
132132+cp /opt/tranquil-pds/deploy/quadlets/tranquil-pds-app.container /etc/containers/systemd/
133133+cp /opt/tranquil-pds/deploy/quadlets/tranquil-pds-frontend.container /etc/containers/systemd/
134134+cp /opt/tranquil-pds/deploy/quadlets/tranquil-pds-nginx.container /etc/containers/systemd/
132135```
133136
137137+Optional quadlets for valkey and minio are also available in `deploy/quadlets/` if you need them.
138138+
134139Note: Systemd doesn't support shell-style variable expansion in `Environment=` lines. The quadlet files expect `DATABASE_URL` to be set in the environment file.
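In practice that means the connection string must be written out in full rather than composed from other variables. An illustrative sketch (the file path and credentials are assumptions; use whatever path your quadlets reference):

```bash
# /srv/tranquil-pds/config/pds.env — illustrative path
# No shell expansion happens here or in Environment= lines, so spell the value out:
DATABASE_URL=postgres://pds:your-db-password@127.0.0.1:5432/pds
```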
135140
136141## Create nginx Configuration
···160165
161166```bash
162167systemctl daemon-reload
163163-systemctl start tranquil-pds-db tranquil-pds-valkey
168168+systemctl start tranquil-pds-db
164169sleep 10
165170```
166171
···172177
173178## Obtain Wildcard SSL Certificate
174179
175175-User handles are served as subdomains (eg., `alice.pds.example.com`), so you need a wildcard certificate. Wildcard certs require DNS-01 validation.
180180+User handles are served as subdomains (e.g., `alice.pds.example.com`), so you need a wildcard certificate. Wildcard certs require DNS-01 validation.
176181
177182Create temporary self-signed cert to start services:
178183```bash
···195200
196201Follow the prompts to add TXT records to your DNS. Note: manual mode doesn't auto-renew.
197202
198198-For automated renewal, use a DNS provider plugin (eg., cloudflare, route53).
203203+For automated renewal, use a DNS provider plugin (e.g., cloudflare, route53).
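For example, with the cloudflare plugin — the Debian package name and credentials path below are illustrative; see the plugin docs for the ini file format:

```bash
apt install -y python3-certbot-dns-cloudflare
certbot certonly --dns-cloudflare \
  --dns-cloudflare-credentials /root/.secrets/cloudflare.ini \
  -d pds.example.com -d '*.pds.example.com'
```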
199204
200205Link certificates and restart:
201206```bash
···207212## Enable All Services
208213
209214```bash
210210-systemctl enable tranquil-pds-db tranquil-pds-valkey tranquil-pds-app tranquil-pds-frontend tranquil-pds-nginx
215215+systemctl enable tranquil-pds-db tranquil-pds-app tranquil-pds-frontend tranquil-pds-nginx
211216```
212217
213218## Configure Firewall
···252257
253258```sh
254259mkdir -p /srv/tranquil-pds/{data,config}
255255-mkdir -p /srv/tranquil-pds/data/{postgres,valkey,blobs,backups,certs,acme}
260260+mkdir -p /srv/tranquil-pds/data/{postgres,blobs,backups,certs,acme}
256261```
257262
258263## Clone Repository and Build Images
···346351
347352## Obtain Wildcard SSL Certificate
348353
349349-User handles are served as subdomains (eg., `alice.pds.example.com`), so you need a wildcard certificate. Wildcard certs require DNS-01 validation.
354354+User handles are served as subdomains (e.g., `alice.pds.example.com`), so you need a wildcard certificate. Wildcard certs require DNS-01 validation.
350355
351356Create temporary self-signed cert to start services:
352357```sh
-8
docs/install-debian.md
···4646
4747We'll set ownership after creating the service user.
4848
4949-## Install valkey
5050-5151-```bash
5252-apt install -y valkey
5353-systemctl enable valkey-server
5454-systemctl start valkey-server
5555-```
5656-5749## Install deno (for frontend build)
58505951```bash
-3
docs/install-kubernetes.md
···33If you're reaching for kubernetes for this app, you're experienced enough to know how to spin up:
4455- cloudnativepg (or your preferred postgres operator)
66-- valkey
76- a PersistentVolume for blob storage
87- the app itself (it's just a container with some env vars)
98···1312- `DATABASE_URL` - postgres connection string
1413- `BLOB_STORAGE_PATH` - path to blob storage (mount a PV here)
1514- `BACKUP_STORAGE_PATH` - path for repo backups (optional but recommended)
1616-- `VALKEY_URL` - redis:// connection string
1715- `PDS_HOSTNAME` - your PDS hostname (without protocol)
1816- `JWT_SECRET`, `DPOP_SECRET`, `MASTER_KEY` - generate with `openssl rand -base64 48`
1917- `CRAWLERS` - typically `https://bsky.network`
···4139 </body>
4240 </html>
4341```
4444-