### Storage backend abstraction
Make storage layers swappable via traits.
-filesystem blob storage
-- [ ] FilesystemBlobStorage implementation
-- [ ] directory structure (content-addressed like blobs/{cid} already used in objsto)
-- [ ] atomic writes (write to temp, rename)
-- [ ] config option to choose backend (env var or config flag)
-- [ ] also traitify BackupStorage (currently hardcoded to objsto)
-
sqlite database backend
- [ ] abstract db layer behind trait (queries, transactions, migrations)
- [ ] sqlite implementation matching postgres behavior
···
- [ ] testing: run full test suite against both backends
- [ ] config option to choose backend (postgres vs sqlite)
- [ ] document tradeoffs (sqlite for single-user/small, postgres for multi-user/scale)
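The atomic-write item above (write to a temp file, then rename into place) is worth pinning down, since the rename is the only step the filesystem makes atomic. A minimal synchronous sketch — `BlobStorage` and `FilesystemBlobStorage` here are illustrative shapes, not the crate's real (presumably async) trait:

```rust
use std::fs;
use std::io::Write;
use std::path::PathBuf;

// Illustrative trait shape only; the real storage trait would be async.
trait BlobStorage {
    fn put(&self, cid: &str, data: &[u8]) -> std::io::Result<()>;
    fn get(&self, cid: &str) -> std::io::Result<Vec<u8>>;
}

// Content-addressed layout: one file per blob under blobs/{cid}.
struct FilesystemBlobStorage {
    root: PathBuf,
}

impl BlobStorage for FilesystemBlobStorage {
    fn put(&self, cid: &str, data: &[u8]) -> std::io::Result<()> {
        let dir = self.root.join("blobs");
        fs::create_dir_all(&dir)?;
        // Write to a temp file first so a crash never leaves a partial blob
        // at the final path, then atomically rename it into place.
        let tmp = dir.join(format!("{cid}.tmp"));
        let mut f = fs::File::create(&tmp)?;
        f.write_all(data)?;
        f.sync_all()?; // flush contents before the rename publishes them
        fs::rename(&tmp, dir.join(cid))
    }

    fn get(&self, cid: &str) -> std::io::Result<Vec<u8>> {
        fs::read(self.root.join("blobs").join(cid))
    }
}
```

Readers then see either no file or the complete blob; a concurrent put of the same CID is harmless because both writers produce identical content-addressed bytes.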

### Plugin system
WASM component model plugins. Compile to wasm32-wasip2, sandboxed via wasmtime, capability-gated. Based on zed's extensions.
···
- [ ] on_access_grant_request for custom authorization
- [ ] on_key_rotation to notify interested parties
----
-
-## Completed
-
-Core ATProto: Health, describeServer, all session endpoints, full repo CRUD, applyWrites, blob upload, importRepo, firehose with cursor replay, CAR export, blob sync, crawler notifications, handle resolution, PLC operations, full admin API, moderation reports.
-
-did:web support: Self-hosted did:web (subdomain format `did:web:handle.pds.com`), external/BYOD did:web, DID document serving via `/.well-known/did.json`, clear registration warnings about did:web trade-offs vs did:plc.
-
-OAuth 2.1: Authorization server metadata, JWKS, PAR, authorize endpoint with login UI, token endpoint (auth code + refresh), revocation, introspection, DPoP, PKCE S256, client metadata validation, private_key_jwt verification.
-
-OAuth Scope Enforcement: Full granular scope system with consent UI, human-readable scope descriptions, per-client scope preferences, scope parsing (repo/blob/rpc/account/identity), endpoint-level scope checks, DPoP token support in auth extractors, token revocation on re-authorization, response_mode support (query/fragment).
-
-App endpoints: getPreferences, putPreferences, getProfile, getProfiles, getTimeline, getAuthorFeed, getActorLikes, getPostThread, getFeed, registerPush (all with local-first + proxy fallback).
-
-Infrastructure: Sequencer with cursor replay, postgres repo storage with atomic transactions, valkey DID cache, debounced crawler notifications with circuit breakers, multi-channel notifications (email/Discord/Telegram/Signal), image processing, distributed rate limiting, security hardening.
-
-Web UI: OAuth login, registration, email verification, password reset, multi-account selector, dashboard, sessions, app passwords, invites, notification preferences, repo browser, CAR export, admin panel, OAuth consent screen with scope selection.
-
-Auth: ES256K + HS256 dual support, JTI-only token storage, refresh token family tracking, encrypted signing keys (AES-256-GCM), DPoP replay protection, constant-time comparisons.
-
-Passkeys and 2FA: WebAuthn/FIDO2 passkey registration and authentication, TOTP with QR setup, backup codes (hashed, one-time use), passkey-only account creation, trusted devices (remember this browser), re-auth for sensitive actions, rate-limited 2FA attempts, settings UI for managing all auth methods.
-
-App password scopes: Granular permissions for app passwords using the same scope system as OAuth. Preset buttons for common use cases (full access, read-only, post-only), scope stored in session and preserved across token refresh, explicit RPC/repo/blob scope enforcement for restricted passwords.
-
-Account Delegation: Delegated accounts controlled by other accounts instead of passwords. OAuth delegation flow (authenticate as controller), scope-based permissions (owner/admin/editor/viewer presets), scope intersection (tokens limited to granted permissions), `act` claim for delegation tracking, creating delegated account flow, controller management UI, "act as" account switcher, comprehensive audit logging with actor/controller tracking, delegation-aware OAuth consent with permission limitation notices.
-
-Migration: OAuth-based inbound migration wizard with PLC token flow, offline restore from CAR file + rotation key for disaster recovery, scheduled automatic backups, standalone repo/blob export, did:web DID document editor for self-service identity management, handle preservation (keep existing external handle via DNS/HTTP verification or create new PDS-subdomain handle).
### Storage backend abstraction
Make storage layers swappable via traits.

sqlite database backend
- [ ] abstract db layer behind trait (queries, transactions, migrations)
- [ ] sqlite implementation matching postgres behavior
···
- [ ] testing: run full test suite against both backends
- [ ] config option to choose backend (postgres vs sqlite)
- [ ] document tradeoffs (sqlite for single-user/small, postgres for multi-user/scale)
+
+- [ ] skip sqlite and just straight-up do our own db?!
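For the "config option to choose backend" item, selection can hang off a single flag. A sketch under stated assumptions: the `Database` trait here is a stand-in (the real abstraction would carry queries, transactions, and migrations), and `TRANQUIL_DB_BACKEND` is a hypothetical env var name:

```rust
// Stand-in trait; the real one would expose queries/transactions/migrations.
trait Database {
    fn backend_name(&self) -> &'static str;
}

struct PostgresDb; // would wrap a postgres pool
struct SqliteDb; // would wrap a sqlite connection

impl Database for PostgresDb {
    fn backend_name(&self) -> &'static str {
        "postgres"
    }
}

impl Database for SqliteDb {
    fn backend_name(&self) -> &'static str {
        "sqlite"
    }
}

// Pick the backend from config, e.g. std::env::var("TRANQUIL_DB_BACKEND").
fn database_from_config(flag: &str) -> Result<Box<dyn Database>, String> {
    match flag {
        "postgres" => Ok(Box::new(PostgresDb)),
        "sqlite" => Ok(Box::new(SqliteDb)),
        other => Err(format!("unknown db backend: {other}")),
    }
}
```

Returning `Box<dyn Database>` keeps the call sites backend-agnostic, which is also what lets the full test suite run against both implementations.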

### Plugin system
WASM component model plugins. Compile to wasm32-wasip2, sandboxed via wasmtime, capability-gated. Based on zed's extensions.
···
- [ ] on_access_grant_request for custom authorization
- [ ] on_key_rotation to notify interested parties
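The "capability-gated" part of the plugin design can be shown independently of WASM: every host call a plugin makes is checked against the capabilities its manifest was granted. A toy model — `Capability`, `PluginHandle`, and `call_host` are all hypothetical names, and the real host side would sit behind wasmtime's component linker rather than plain closures:

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Capability {
    ReadRepo,
    WriteRepo,
    Network,
}

struct PluginHandle {
    name: String,
    // Granted set comes from the plugin's manifest, approved at install time.
    granted: HashSet<Capability>,
}

impl PluginHandle {
    // Host functions refuse to run unless the matching capability was granted.
    fn call_host(&self, needed: Capability, f: impl FnOnce() -> String) -> Result<String, String> {
        if self.granted.contains(&needed) {
            Ok(f())
        } else {
            Err(format!("{}: capability {:?} not granted", self.name, needed))
        }
    }
}
```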
···
     let bday = NaiveDate::parse_from_str(birth_date, "%Y-%m-%d").ok()?;
     let today = Utc::now().date_naive();
     let mut age = today.year() - bday.year();
-    let m = today.month() as i32 - bday.month() as i32;
+    let m = i32::try_from(today.month()).unwrap_or(0) - i32::try_from(bday.month()).unwrap_or(0);
     if m < 0 || (m == 0 && today.day() < bday.day()) {
         age -= 1;
     }
···
     for part in scope_parts {
         if let Some(cid_str) = part.strip_prefix("ref:") {
-            let cache_key = format!("scope_ref:{}", cid_str);
+            let cache_key = crate::cache_keys::scope_ref_key(cid_str);
             if let Some(cached) = state.cache.get(&cache_key).await {
                 for s in cached.split_whitespace() {
                     if !resolved_scopes.contains(&s.to_string()) {
···
         max_size: u32,
     ) -> Result<ProcessedImage, ImageError> {
         let (orig_width, orig_height) = (img.width(), img.height());
+        let safe_f64_to_u32 =
+            |v: f64| -> u32 { u32::try_from(v.round() as u64).unwrap_or(u32::MAX) };
         let (new_width, new_height) = if orig_width > orig_height {
-            let ratio = max_size as f64 / orig_width as f64;
-            (max_size, (orig_height as f64 * ratio) as u32)
+            let ratio = f64::from(max_size) / f64::from(orig_width);
+            let scaled = safe_f64_to_u32((f64::from(orig_height) * ratio).max(1.0));
+            (max_size, scaled.min(max_size))
         } else {
-            let ratio = max_size as f64 / orig_height as f64;
-            ((orig_width as f64 * ratio) as u32, max_size)
+            let ratio = f64::from(max_size) / f64::from(orig_height);
+            let scaled = safe_f64_to_u32((f64::from(orig_width) * ratio).max(1.0));
+            (scaled.min(max_size), max_size)
         };
         let thumb = img.resize(new_width, new_height, FilterType::Lanczos3);
         self.encode_image(&thumb)
···
         if let Ipld::Map(entry_obj) = entry {
             let prefix_len = entry_obj
                 .get("p")
-                .and_then(|p| {
-                    if let Ipld::Integer(n) = p {
-                        Some(*n as usize)
-                    } else {
-                        None
-                    }
+                .and_then(|p| match p {
+                    Ipld::Integer(n) => usize::try_from(*n).ok(),
+                    _ => None,
                 })
                 .unwrap_or(0);

crates/tranquil-pds/src/sync/mod.rs (+2 −1)
···
 pub use subscribe_repos::subscribe_repos;
 pub use tranquil_db_traits::AccountStatus;
 pub use util::{
-    RepoAccount, RepoAvailabilityError, assert_repo_availability, get_account_with_status,
+    RepoAccessLevel, RepoAccount, RepoAvailabilityError, assert_repo_availability,
+    get_account_with_status,
 };
 pub use verify::{CarVerifier, VerifiedCar, VerifyError};
crates/tranquil-pds/src/sync/repo.rs (+57 −86)
···
 use crate::api::error::ApiError;
 use crate::scheduled::generate_repo_car_from_user_blocks;
 use crate::state::AppState;
-use crate::sync::car::encode_car_header;
-use crate::sync::util::assert_repo_availability;
 use axum::{
     extract::{Query, RawQuery, State},
     http::StatusCode,
···
 use cid::Cid;
 use jacquard_repo::storage::BlockStore;
 use serde::Deserialize;
-use std::io::Write;
 use std::str::FromStr;
 use tracing::error;
 use tranquil_types::Did;

-fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> {
-    let did = crate::util::parse_repeated_query_param(Some(query_string), "did")
         .into_iter()
         .next()
-        .ok_or("Missing required parameter: did")?;
     let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids");
-    Ok((did, cids))
 }

 pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response {
···
         return ApiError::InvalidRequest("Missing query parameters".into()).into_response();
     };

-    let (did_str, cid_strings) = match parse_get_blocks_query(&query_string) {
         Ok(parsed) => parsed,
-        Err(msg) => return ApiError::InvalidRequest(msg).into_response(),
-    };
-    let did: Did = match did_str.parse() {
-        Ok(d) => d,
-        Err(_) => return ApiError::InvalidRequest("invalid did".into()).into_response(),
-    };
-
-    let _account = match assert_repo_availability(state.repo_repo.as_ref(), &did, false).await {
-        Ok(a) => a,
         Err(e) => return e.into_response(),
     };

     let cids: Vec<Cid> = match cid_strings
         .iter()
         .map(|s| Cid::from_str(s).map_err(|_| s.clone()))
···
         }
     };
     let mut car_bytes = header;
-    for (i, block_opt) in blocks.into_iter().enumerate() {
-        if let Some(block) = block_opt {
-            let cid = cids[i];
-            let cid_bytes = cid.to_bytes();
-            let total_len = cid_bytes.len() + block.len();
-            let mut writer = Vec::new();
-            crate::sync::car::write_varint(&mut writer, total_len as u64)
-                .expect("Writing to Vec<u8> should never fail");
-            writer
-                .write_all(&cid_bytes)
-                .expect("Writing to Vec<u8> should never fail");
-            writer
-                .write_all(&block)
-                .expect("Writing to Vec<u8> should never fail");
-            car_bytes.extend_from_slice(&writer);
-        }
-    }
     (
         StatusCode::OK,
         [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
···

 #[derive(Deserialize)]
 pub struct GetRepoQuery {
-    pub did: String,
     pub since: Option<String>,
 }

···
     State(state): State<AppState>,
     Query(query): Query<GetRepoQuery>,
 ) -> Response {
-    let did: Did = match query.did.parse() {
-        Ok(d) => d,
-        Err(_) => return ApiError::InvalidRequest("invalid did".into()).into_response(),
-    };
-    let account = match assert_repo_availability(state.repo_repo.as_ref(), &did, false).await {
-        Ok(a) => a,
-        Err(e) => return e.into_response(),
-    };

     let Some(head_str) = account.repo_root_cid else {
         return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
···
         }
     };

-    for (i, block_opt) in blocks.into_iter().enumerate() {
-        if let Some(block) = block_opt {
-            let cid = block_cids[i];
-            let cid_bytes = cid.to_bytes();
-            let total_len = cid_bytes.len() + block.len();
-            let mut writer = Vec::new();
-            crate::sync::car::write_varint(&mut writer, total_len as u64)
-                .expect("Writing to Vec<u8> should never fail");
-            writer
-                .write_all(&cid_bytes)
-                .expect("Writing to Vec<u8> should never fail");
-            writer
-                .write_all(&block)
-                .expect("Writing to Vec<u8> should never fail");
-            car_bytes.extend_from_slice(&writer);
-        }
-    }

     (
         StatusCode::OK,
···

 #[derive(Deserialize)]
 pub struct GetRecordQuery {
-    pub did: String,
     pub collection: String,
     pub rkey: String,
 }
···
     use std::collections::BTreeMap;
     use std::sync::Arc;

-    let did: Did = match query.did.parse() {
-        Ok(d) => d,
-        Err(_) => return ApiError::InvalidRequest("invalid did".into()).into_response(),
-    };
-    let account = match assert_repo_availability(state.repo_repo.as_ref(), &did, false).await {
-        Ok(a) => a,
-        Err(e) => return e.into_response(),
-    };

     let commit_cid_str = match account.repo_root_cid {
         Some(cid) => cid,
···
         }
     };
     let mut car_bytes = header;
-    let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
-        let cid_bytes = cid.to_bytes();
-        let total_len = cid_bytes.len() + data.len();
-        let mut writer = Vec::new();
-        crate::sync::car::write_varint(&mut writer, total_len as u64)
-            .expect("Writing to Vec<u8> should never fail");
-        writer
-            .write_all(&cid_bytes)
-            .expect("Writing to Vec<u8> should never fail");
-        writer
-            .write_all(data)
-            .expect("Writing to Vec<u8> should never fail");
-        car.extend_from_slice(&writer);
-    };
-    write_block(&mut car_bytes, &commit_cid, &commit_bytes);
     proof_blocks
         .iter()
-        .for_each(|(cid, data)| write_block(&mut car_bytes, cid, data));
-    write_block(&mut car_bytes, &record_cid, &record_block);
     (
         StatusCode::OK,
         [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
···
     use tranquil_pds::api::repo::record::utils::create_signed_commit;

     let signing_key = SigningKey::random(&mut rand::thread_rng());
-    let did = unsafe { Did::new_unchecked("did:plc:testuser123456789abcdef") };
+    let did: Did = "did:plc:testuser123456789abcdef"
+        .parse()
+        .expect("valid test DID");
     let data_cid =
         Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
     let rev = Tid::now(LimitedU32::MIN).to_string();
···
+mod common;
+mod firehose;
+
+use cid::Cid;
+use common::*;
+use firehose::{FirehoseConsumer, ParsedCommitFrame};
+use iroh_car::CarReader;
+use jacquard_repo::commit::Commit;
+use reqwest::StatusCode;
+use serde_json::{Value, json};
+use std::io::Cursor;
+use std::str::FromStr;
+use tranquil_scopes::RepoAction;
+
+mod helpers;
18+ let payload = json!({
19+ "repo": did,
20+ "collection": "app.bsky.feed.post",
21+ "record": {
22+ "$type": "app.bsky.feed.post",
23+ "text": text,
24+ "createdAt": chrono::Utc::now().to_rfc3339(),
25+ }
26+ });
27+ let res = client
28+ .post(format!(
29+ "{}/xrpc/com.atproto.repo.createRecord",
30+ base_url().await
31+ ))
32+ .bearer_auth(token)
33+ .json(&payload)
34+ .send()
35+ .await
36+ .expect("Failed to create post");
37+ assert_eq!(res.status(), StatusCode::OK);
38+ res.json().await.expect("Invalid JSON from createRecord")
39+}
+
+async fn get_latest_commit(client: &reqwest::Client, token: &str, did: &str) -> Value {
+    let res = client
+        .get(format!(
+            "{}/xrpc/com.atproto.sync.getLatestCommit?did={}",
+            base_url().await,
+            did
+        ))
+        .bearer_auth(token)
+        .send()
+        .await
+        .expect("Failed to get latest commit");
+    assert_eq!(res.status(), StatusCode::OK);
+    res.json().await.expect("Invalid JSON from getLatestCommit")
+}
+
+#[tokio::test]
+async fn test_create_record_cid_matches_firehose() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let api_response = create_post_record(&client, &token, &did, "CID match test").await;
+    let api_commit_cid = api_response["commit"]["cid"].as_str().unwrap();
+    let api_commit_rev = api_response["commit"]["rev"].as_str().unwrap();
+    let api_record_cid = api_response["cid"].as_str().unwrap();
+
+    let frames = consumer
+        .wait_for_commits(&did, 1, std::time::Duration::from_secs(10))
+        .await;
+    let frame = &frames[0];
+
+    assert_eq!(
+        api_commit_cid,
+        frame.commit.to_string(),
+        "API commit CID must match firehose commit CID"
+    );
+    assert_eq!(
+        api_commit_rev, frame.rev,
+        "API commit rev must match firehose rev"
+    );
+    assert_eq!(frame.ops.len(), 1, "Expected exactly 1 op");
+    assert_eq!(
+        api_record_cid,
+        frame.ops[0].cid.unwrap().to_string(),
+        "API record CID must match firehose op CID"
+    );
+    assert_eq!(frame.ops[0].action, RepoAction::Create);
+    assert!(frame.ops[0].prev.is_none(), "Create op must have no prev");
+
+    let latest = get_latest_commit(&client, &token, &did).await;
+    assert_eq!(
+        latest["cid"].as_str().unwrap(),
+        api_commit_cid,
+        "getLatestCommit CID must match"
+    );
+    assert_eq!(
+        latest["rev"].as_str().unwrap(),
+        api_commit_rev,
+        "getLatestCommit rev must match"
+    );
+}
+
+#[tokio::test]
+async fn test_update_record_prev_matches_old_cid() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let v1_payload = json!({
+        "repo": did,
+        "collection": "app.bsky.actor.profile",
+        "rkey": "self",
+        "record": {
+            "$type": "app.bsky.actor.profile",
+            "displayName": "Profile v1",
+        }
+    });
+    let v1_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.repo.putRecord",
+            base_url().await
+        ))
+        .bearer_auth(&token)
+        .json(&v1_payload)
+        .send()
+        .await
+        .expect("Failed to create profile v1");
+    assert_eq!(v1_res.status(), StatusCode::OK);
+    let v1_body: Value = v1_res.json().await.unwrap();
+    let v1_cid_str = v1_body["cid"].as_str().unwrap();
+    let v1_cid = Cid::from_str(v1_cid_str).unwrap();
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let v2_payload = json!({
+        "repo": did,
+        "collection": "app.bsky.actor.profile",
+        "rkey": "self",
+        "record": {
+            "$type": "app.bsky.actor.profile",
+            "displayName": "Profile v2",
+        }
+    });
+    let v2_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.repo.putRecord",
+            base_url().await
+        ))
+        .bearer_auth(&token)
+        .json(&v2_payload)
+        .send()
+        .await
+        .expect("Failed to update profile v2");
+    assert_eq!(v2_res.status(), StatusCode::OK);
+    let v2_body: Value = v2_res.json().await.unwrap();
+    let v2_cid_str = v2_body["cid"].as_str().unwrap();
+    let v2_cid = Cid::from_str(v2_cid_str).unwrap();
+
+    let frames = consumer
+        .wait_for_commits(&did, 1, std::time::Duration::from_secs(10))
+        .await;
+    let frame = &frames[0];
+
+    let profile_op = frame
+        .ops
+        .iter()
+        .find(|op| op.path.contains("app.bsky.actor.profile"))
+        .expect("No profile op found");
+
+    assert_eq!(profile_op.action, RepoAction::Update);
+    assert_eq!(
+        profile_op.prev,
+        Some(v1_cid),
+        "Update op.prev must be the old CID"
+    );
+    assert_eq!(
+        profile_op.cid,
+        Some(v2_cid),
+        "Update op.cid must be the new CID"
+    );
+    assert!(
+        frame.prev_data.is_some(),
+        "Update commit must have prevData"
+    );
+}
+
+#[tokio::test]
+async fn test_delete_record_prev_set_cid_none() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let create_body = create_post_record(&client, &token, &did, "To be deleted").await;
+    let record_cid = Cid::from_str(create_body["cid"].as_str().unwrap()).unwrap();
+    let uri = create_body["uri"].as_str().unwrap();
+    let parts: Vec<&str> = uri.split('/').collect();
+    let collection = parts[parts.len() - 2];
+    let rkey = parts[parts.len() - 1];
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let delete_payload = json!({
+        "repo": did,
+        "collection": collection,
+        "rkey": rkey,
+    });
+    let del_res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.repo.deleteRecord",
+            base_url().await
+        ))
+        .bearer_auth(&token)
+        .json(&delete_payload)
+        .send()
+        .await
+        .expect("Failed to delete record");
+    assert_eq!(del_res.status(), StatusCode::OK);
+
+    let frames = consumer
+        .wait_for_commits(&did, 1, std::time::Duration::from_secs(10))
+        .await;
+    let frame = &frames[0];
+
+    assert_eq!(frame.ops.len(), 1, "Expected exactly 1 delete op");
+    let op = &frame.ops[0];
+    assert_eq!(op.action, RepoAction::Delete);
+    assert!(op.cid.is_none(), "Delete op.cid must be None");
+    assert_eq!(
+        op.prev,
+        Some(record_cid),
+        "Delete op.prev must be the original CID"
+    );
+}
+
+#[tokio::test]
+async fn test_five_record_commit_chain_integrity() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let texts = [
+        "Chain post 0",
+        "Chain post 1",
+        "Chain post 2",
+        "Chain post 3",
+        "Chain post 4",
+    ];
+    for text in &texts {
+        create_post_record(&client, &token, &did, text).await;
+    }
+
+    let mut frames = consumer
+        .wait_for_commits(&did, 5, std::time::Duration::from_secs(15))
+        .await;
+    frames.sort_by_key(|f| f.seq);
+
+    let revs: Vec<&str> = frames.iter().map(|f| f.rev.as_str()).collect();
+    let unique_revs: std::collections::HashSet<&&str> = revs.iter().collect();
+    assert_eq!(
+        unique_revs.len(),
+        5,
+        "All rev values must be distinct, got: {:?}",
+        revs
+    );
+
+    let seqs: Vec<i64> = frames.iter().map(|f| f.seq).collect();
+    seqs.windows(2).for_each(|pair| {
+        assert!(
+            pair[1] > pair[0],
+            "Seq values must be strictly monotonically increasing: {} <= {}",
+            pair[1],
+            pair[0]
+        );
+    });
+
+    frames.iter().enumerate().skip(1).for_each(|(i, frame)| {
+        assert_eq!(
+            frame.since.as_deref(),
+            Some(frames[i - 1].rev.as_str()),
+            "Frame {} since must equal frame {} rev",
+            i,
+            i - 1
+        );
+    });
+
+    let latest = get_latest_commit(&client, &token, &did).await;
+    let final_frame = frames.last().unwrap();
+    assert_eq!(
+        latest["cid"].as_str().unwrap(),
+        final_frame.commit.to_string(),
+        "getLatestCommit CID must match final frame"
+    );
+    assert_eq!(
+        latest["rev"].as_str().unwrap(),
+        final_frame.rev,
+        "getLatestCommit rev must match final frame"
+    );
+}
+
+#[tokio::test]
+async fn test_apply_writes_single_commit_multiple_ops() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let now = chrono::Utc::now().to_rfc3339();
+    let writes: Vec<Value> = (0..3)
+        .map(|i| {
+            json!({
+                "$type": "com.atproto.repo.applyWrites#create",
+                "collection": "app.bsky.feed.post",
+                "value": {
+                    "$type": "app.bsky.feed.post",
+                    "text": format!("Batch post {}", i),
+                    "createdAt": now,
+                }
+            })
+        })
+        .collect();
+
+    let payload = json!({
+        "repo": did,
+        "writes": writes,
+    });
+    let res = client
+        .post(format!(
+            "{}/xrpc/com.atproto.repo.applyWrites",
+            base_url().await
+        ))
+        .bearer_auth(&token)
+        .json(&payload)
+        .send()
+        .await
+        .expect("Failed to applyWrites");
+    assert_eq!(res.status(), StatusCode::OK);
+    let api_body: Value = res.json().await.unwrap();
+    let api_results = api_body["results"].as_array().expect("No results array");
+    assert_eq!(api_results.len(), 3, "Expected 3 results from applyWrites");
+
+    let frames = consumer
+        .wait_for_commits(&did, 1, std::time::Duration::from_secs(10))
+        .await;
+    assert_eq!(
+        frames.len(),
+        1,
+        "applyWrites should produce exactly 1 commit"
+    );
+    let frame = &frames[0];
+    assert_eq!(frame.ops.len(), 3, "Commit should contain 3 ops");
+
+    frame.ops.iter().for_each(|op| {
+        assert_eq!(op.action, RepoAction::Create, "All ops should be Create");
+    });
+
+    api_results.iter().enumerate().for_each(|(i, result)| {
+        let api_cid = result["cid"].as_str().expect("No cid in result");
+        let frame_cid = frame.ops[i].cid.expect("No cid in op").to_string();
+        assert_eq!(
+            api_cid, frame_cid,
+            "API result[{}] CID must match firehose op[{}] CID",
+            i, i
+        );
+    });
+}
+
+#[tokio::test]
+async fn test_firehose_commit_signature_verification() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let key_bytes = helpers::get_user_signing_key(&did)
+        .await
+        .expect("Failed to get signing key");
+    let signing_key =
+        k256::ecdsa::SigningKey::from_slice(&key_bytes).expect("Invalid signing key bytes");
+    let pubkey_bytes = signing_key.verifying_key().to_encoded_point(true);
+    let pubkey = jacquard_common::types::crypto::PublicKey {
+        codec: jacquard_common::types::crypto::KeyCodec::Secp256k1,
+        bytes: std::borrow::Cow::Owned(pubkey_bytes.as_bytes().to_vec()),
+    };
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let _api_response =
+        create_post_record(&client, &token, &did, "Signature verification test").await;
+
+    let frames = consumer
+        .wait_for_commits(&did, 1, std::time::Duration::from_secs(10))
+        .await;
+    let frame = &frames[0];
+
+    let mut car_reader = CarReader::new(Cursor::new(&frame.blocks))
+        .await
+        .expect("Failed to parse CAR");
+    let mut blocks = std::collections::HashMap::new();
+    while let Ok(Some((cid, data))) = car_reader.next_block().await {
+        blocks.insert(cid, data);
+    }
+
+    let commit_block = blocks
+        .get(&frame.commit)
+        .expect("Commit block not found in CAR");
+
+    let commit = Commit::from_cbor(commit_block).expect("Failed to parse commit from CBOR");
+
+    commit
+        .verify(&pubkey)
+        .expect("Commit signature verification failed");
+
+    assert_eq!(
+        commit.rev().to_string(),
+        frame.rev,
+        "Commit rev must match frame rev"
+    );
+    assert_eq!(
+        commit.did().as_str(),
+        did,
+        "Commit DID must match account DID"
+    );
+}
+
+#[tokio::test]
+async fn test_cursor_backfill_completeness() {
+    let client = client();
+    let (token, did) = create_account_and_login(&client).await;
+
+    let pool = get_test_db_pool().await;
+    let baseline_seq: i64 =
+        sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+            .fetch_one(pool)
+            .await
+            .unwrap();
+
+    let mut expected_cids: Vec<String> = Vec::with_capacity(5);
+    let texts = [
+        "Backfill 0",
+        "Backfill 1",
+        "Backfill 2",
+        "Backfill 3",
+        "Backfill 4",
+    ];
+    for text in &texts {
+        let body = create_post_record(&client, &token, &did, text).await;
+        expected_cids.push(body["commit"]["cid"].as_str().unwrap().to_string());
+    }
+
+    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), baseline_seq).await;
+
+    let frames = consumer
+        .wait_for_commits(&did, 5, std::time::Duration::from_secs(15))
+        .await;
+
+    let mut sorted_frames: Vec<ParsedCommitFrame> = frames;
+    sorted_frames.sort_by_key(|f| f.seq);
+
+    let received_cids: Vec<String> = sorted_frames.iter().map(|f| f.commit.to_string()).collect();
+
+    expected_cids.iter().for_each(|expected| {
+        assert!(
+            received_cids.contains(expected),
+            "Missing commit {} in backfill",
+            expected
+        );
+    });
+
+    let seqs: Vec<i64> = sorted_frames.iter().map(|f| f.seq).collect();
+    let unique_seqs: std::collections::HashSet<&i64> = seqs.iter().collect();
+    assert_eq!(
+        unique_seqs.len(),
+        seqs.len(),
+        "No duplicate seq values allowed in backfill"
+    );
+}
+
+#[tokio::test]
+async fn test_multi_account_seq_interleaving() {
+    let client = client();
+    let (alice_token, alice_did) = create_account_and_login(&client).await;
+    let (bob_token, bob_did) = create_account_and_login(&client).await;
+
+    let pool = get_test_db_pool().await;
+    let cursor: i64 = sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(seq), 0) FROM repo_seq")
+        .fetch_one(pool)
+        .await
+        .unwrap();
+    let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let _a1 = create_post_record(&client, &alice_token, &alice_did, "Alice post 1").await;
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    let _b1 = create_post_record(&client, &bob_token, &bob_did, "Bob post 1").await;
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    let _a2 = create_post_record(&client, &alice_token, &alice_did, "Alice post 2").await;
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    let _b2 = create_post_record(&client, &bob_token, &bob_did, "Bob post 2").await;
+
+    let alice_frames = consumer
+        .wait_for_commits(&alice_did, 2, std::time::Duration::from_secs(10))
+        .await;
+    let bob_frames = consumer
+        .wait_for_commits(&bob_did, 2, std::time::Duration::from_secs(10))
+        .await;
+
+    let mut all_commits = consumer.all_commits();
+    all_commits.sort_by_key(|f| f.seq);
+
+    let global_seqs: Vec<i64> = all_commits.iter().map(|f| f.seq).collect();
+    global_seqs.windows(2).for_each(|pair| {
+        assert!(
+            pair[1] > pair[0],
+            "Global seq must be strictly monotonically increasing: {} <= {}",
+            pair[1],
+            pair[0]
+        );
+    });
+
+    let mut alice_sorted: Vec<ParsedCommitFrame> = alice_frames;
+    alice_sorted.sort_by_key(|f| f.seq);
+    assert_eq!(alice_sorted.len(), 2);
+    assert!(
+        alice_sorted[1].since.is_some(),
+        "Alice's second commit must have since"
+    );
+    assert_eq!(
+        alice_sorted[1].since.as_deref(),
+        Some(alice_sorted[0].rev.as_str()),
+        "Alice's since chain must be self-consistent"
+    );
+
+    let mut bob_sorted: Vec<ParsedCommitFrame> = bob_frames;
+    bob_sorted.sort_by_key(|f| f.seq);
+    assert_eq!(bob_sorted.len(), 2);
+    assert!(
+        bob_sorted[1].since.is_some(),
+        "Bob's second commit must have since"
+    );
+    assert_eq!(
+        bob_sorted[1].since.as_deref(),
+        Some(bob_sorted[0].rev.as_str()),
+        "Bob's since chain must be self-consistent"
+    );
+}
crates/tranquil-pds/tests/ripple_cluster.rs (+13)
···
     let nodes = common::cluster().await;
     let client = common::client();

+    let now_ms = u64::try_from(
+        std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_millis(),
+    )
+    .unwrap_or(u64::MAX);
+    let login_window_ms: u64 = 60_000;
+    let remaining = login_window_ms - (now_ms % login_window_ms);
+    if remaining < 35_000 {
+        tokio::time::sleep(Duration::from_millis(remaining + 100)).await;
+    }
+
     let uuid_bytes = uuid::Uuid::new_v4();
     let b = uuid_bytes.as_bytes();
     let unique_ip = format!("10.{}.{}.{}", b[0], b[1], b[2]);