···1use crate::api::error::ApiError;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
3use crate::api::repo::record::write::{CommitInfo, prepare_repo_write};
4-use crate::auth::{Active, Auth};
05use crate::delegation::DelegationActionType;
6use crate::repo::tracking::TrackingBlockStore;
7use crate::state::AppState;
···43 auth: Auth<Active>,
44 Json(input): Json<DeleteRecordInput>,
45) -> Result<Response, crate::api::error::ApiError> {
46- let repo_auth = match prepare_repo_write(&state, &auth, &input.repo).await {
0000047 Ok(res) => res,
48 Err(err_res) => return Ok(err_res),
49 };
5051- if let Err(e) = crate::auth::scope_check::check_repo_scope(
52- repo_auth.is_oauth,
53- repo_auth.scope.as_deref(),
54- crate::oauth::RepoAction::Delete,
55- &input.collection,
56- ) {
57- return Ok(e);
58- }
59-60 let did = repo_auth.did;
61 let user_id = repo_auth.user_id;
62 let current_root_cid = repo_auth.current_root_cid;
63 let controller_did = repo_auth.controller_did;
6465 if let Some(swap_commit) = &input.swap_commit
66- && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
67 {
68 return Ok(ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response());
69 }
70 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
71- let commit_bytes = match tracking_store.get(¤t_root_cid).await {
72 Ok(Some(b)) => b,
73 _ => {
74 return Ok(
···159 .into_iter()
160 .collect();
161 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
162- let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
163 .chain(
164 old_mst_blocks
165 .keys()
···173 CommitParams {
174 did: &did,
175 user_id,
176- current_root_cid: Some(current_root_cid),
177 prev_data_cid: Some(commit.data),
178 new_mst_root,
179 ops: vec![op],
···1use crate::api::error::ApiError;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
3use crate::api::repo::record::write::{CommitInfo, prepare_repo_write};
4+use crate::auth::{Active, Auth, VerifyScope};
5+use crate::cid_types::CommitCid;
6use crate::delegation::DelegationActionType;
7use crate::repo::tracking::TrackingBlockStore;
8use crate::state::AppState;
···44 auth: Auth<Active>,
45 Json(input): Json<DeleteRecordInput>,
46) -> Result<Response, crate::api::error::ApiError> {
47+ let scope_proof = match auth.verify_repo_delete(&input.collection) {
48+ Ok(proof) => proof,
49+ Err(e) => return Ok(e.into_response()),
50+ };
51+52+ let repo_auth = match prepare_repo_write(&state, &scope_proof, &input.repo).await {
53 Ok(res) => res,
54 Err(err_res) => return Ok(err_res),
55 };
5600000000057 let did = repo_auth.did;
58 let user_id = repo_auth.user_id;
59 let current_root_cid = repo_auth.current_root_cid;
60 let controller_did = repo_auth.controller_did;
6162 if let Some(swap_commit) = &input.swap_commit
63+ && CommitCid::from_str(swap_commit).ok().as_ref() != Some(¤t_root_cid)
64 {
65 return Ok(ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response());
66 }
67 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
68+ let commit_bytes = match tracking_store.get(current_root_cid.as_cid()).await {
69 Ok(Some(b)) => b,
70 _ => {
71 return Ok(
···156 .into_iter()
157 .collect();
158 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
159+ let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid.into_cid())
160 .chain(
161 old_mst_blocks
162 .keys()
···170 CommitParams {
171 did: &did,
172 user_id,
173+ current_root_cid: Some(current_root_cid.into_cid()),
174 prev_data_cid: Some(commit.data),
175 new_mst_root,
176 ops: vec![op],
+5
crates/tranquil-pds/src/api/repo/record/mod.rs
···1pub mod batch;
2pub mod delete;
03pub mod read;
4pub mod utils;
5pub mod validation;
06pub mod write;
00078pub use batch::apply_writes;
9pub use delete::{DeleteRecordInput, delete_record, delete_record_internal};
···1pub mod batch;
2pub mod delete;
3+pub mod pagination;
4pub mod read;
5pub mod utils;
6pub mod validation;
7+pub mod validation_mode;
8pub mod write;
9+10+pub use pagination::PaginationDirection;
11+pub use validation_mode::ValidationMode;
1213pub use batch::apply_writes;
14pub use delete::{DeleteRecordInput, delete_record, delete_record_internal};
···01use std::sync::OnceLock;
2use tranquil_db_traits::SsoProviderType;
3···50 };
5152 if config.is_any_enabled() {
53- let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_default();
54 if hostname.is_empty() || hostname == "localhost" {
55 panic!(
56 "PDS_HOSTNAME must be set to a valid hostname when SSO is enabled. \
···1+use crate::util::pds_hostname;
2use std::sync::OnceLock;
3use tranquil_db_traits::SsoProviderType;
4···51 };
5253 if config.is_any_enabled() {
54+ let hostname = pds_hostname();
55 if hostname.is_empty() || hostname == "localhost" {
56 panic!(
57 "PDS_HOSTNAME must be set to a valid hostname when SSO is enabled. \
···2use crate::sync::firehose::SequencedEvent;
3use std::sync::atomic::{AtomicI64, Ordering};
4use tracing::{debug, error, info, warn};
056static LAST_BROADCAST_SEQ: AtomicI64 = AtomicI64::new(0);
78pub async fn start_sequencer_listener(state: AppState) {
9- let initial_seq = state.repo_repo.get_max_seq().await.unwrap_or(0);
10- LAST_BROADCAST_SEQ.store(initial_seq, Ordering::SeqCst);
11- info!(initial_seq = initial_seq, "Initialized sequencer listener");
000000012 tokio::spawn(async move {
13 info!("Starting sequencer listener background task");
14 loop {
···27 .await
28 .map_err(|e| anyhow::anyhow!("Failed to subscribe to events: {:?}", e))?;
29 info!("Connected to database and listening for repo updates");
30- let catchup_start = LAST_BROADCAST_SEQ.load(Ordering::SeqCst);
31 let events = state
32 .repo_repo
33 .get_events_since_seq(catchup_start, None)
···36 if !events.is_empty() {
37 info!(
38 count = events.len(),
39- from_seq = catchup_start,
40 "Broadcasting catch-up events"
41 );
42 events.into_iter().for_each(|event| {
43 let seq = event.seq;
44 let firehose_event = to_firehose_event(event);
45 let _ = state.firehose_tx.send(firehose_event);
46- LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
47 });
48 }
49 loop {
···63 if seq_id > last_seq + 1 {
64 let gap_events = state
65 .repo_repo
66- .get_events_in_seq_range(last_seq, seq_id)
00067 .await
68 .unwrap_or_default();
69 if !gap_events.is_empty() {
···72 let seq = event.seq;
73 let firehose_event = to_firehose_event(event);
74 let _ = state.firehose_tx.send(firehose_event);
75- LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
76 });
77 }
78 }
79 let event = state
80 .repo_repo
81- .get_event_by_seq(seq_id)
82 .await
83 .ok()
84 .flatten();
···97 warn!(seq = seq_id, error = %e, "Failed to broadcast event (no receivers?)");
98 }
99 }
100- LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
101 } else {
102 warn!(
103 seq = seq_id,
···2use crate::sync::firehose::SequencedEvent;
3use std::sync::atomic::{AtomicI64, Ordering};
4use tracing::{debug, error, info, warn};
5+use tranquil_db_traits::SequenceNumber;
67static LAST_BROADCAST_SEQ: AtomicI64 = AtomicI64::new(0);
89pub async fn start_sequencer_listener(state: AppState) {
10+ let initial_seq = state
11+ .repo_repo
12+ .get_max_seq()
13+ .await
14+ .unwrap_or(SequenceNumber::ZERO);
15+ LAST_BROADCAST_SEQ.store(initial_seq.as_i64(), Ordering::SeqCst);
16+ info!(
17+ initial_seq = initial_seq.as_i64(),
18+ "Initialized sequencer listener"
19+ );
20 tokio::spawn(async move {
21 info!("Starting sequencer listener background task");
22 loop {
···35 .await
36 .map_err(|e| anyhow::anyhow!("Failed to subscribe to events: {:?}", e))?;
37 info!("Connected to database and listening for repo updates");
38+ let catchup_start = SequenceNumber::from_raw(LAST_BROADCAST_SEQ.load(Ordering::SeqCst));
39 let events = state
40 .repo_repo
41 .get_events_since_seq(catchup_start, None)
···44 if !events.is_empty() {
45 info!(
46 count = events.len(),
47+ from_seq = catchup_start.as_i64(),
48 "Broadcasting catch-up events"
49 );
50 events.into_iter().for_each(|event| {
51 let seq = event.seq;
52 let firehose_event = to_firehose_event(event);
53 let _ = state.firehose_tx.send(firehose_event);
54+ LAST_BROADCAST_SEQ.store(seq.as_i64(), Ordering::SeqCst);
55 });
56 }
57 loop {
···71 if seq_id > last_seq + 1 {
72 let gap_events = state
73 .repo_repo
74+ .get_events_in_seq_range(
75+ SequenceNumber::from_raw(last_seq),
76+ SequenceNumber::from_raw(seq_id),
77+ )
78 .await
79 .unwrap_or_default();
80 if !gap_events.is_empty() {
···83 let seq = event.seq;
84 let firehose_event = to_firehose_event(event);
85 let _ = state.firehose_tx.send(firehose_event);
86+ LAST_BROADCAST_SEQ.store(seq.as_i64(), Ordering::SeqCst);
87 });
88 }
89 }
90 let event = state
91 .repo_repo
92+ .get_event_by_seq(SequenceNumber::from_raw(seq_id))
93 .await
94 .ok()
95 .flatten();
···108 warn!(seq = seq_id, error = %e, "Failed to broadcast event (no receivers?)");
109 }
110 }
111+ LAST_BROADCAST_SEQ.store(seq.as_i64(), Ordering::SeqCst);
112 } else {
113 warn!(
114 seq = seq_id,
+2-2
crates/tranquil-pds/src/sync/mod.rs
···18pub use deprecated::{get_checkout, get_head};
19pub use repo::{get_blocks, get_record, get_repo};
20pub use subscribe_repos::subscribe_repos;
021pub use util::{
22- AccountStatus, RepoAccount, RepoAvailabilityError, assert_repo_availability,
23- get_account_with_status,
24};
25pub use verify::{CarVerifier, VerifiedCar, VerifyError};
···18pub use deprecated::{get_checkout, get_head};
19pub use repo::{get_blocks, get_record, get_repo};
20pub use subscribe_repos::subscribe_repos;
21+pub use tranquil_db_traits::AccountStatus;
22pub use util::{
23+ RepoAccount, RepoAvailabilityError, assert_repo_availability, get_account_with_status,
024};
25pub use verify::{CarVerifier, VerifiedCar, VerifyError};
+12-6
crates/tranquil-pds/src/sync/subscribe_repos.rs
···13use std::sync::atomic::{AtomicUsize, Ordering};
14use tokio::sync::broadcast::error::RecvError;
15use tracing::{error, info, warn};
01617const BACKFILL_BATCH_SIZE: i64 = 1000;
18···69 params: SubscribeReposParams,
70) -> Result<(), ()> {
71 let mut rx = state.firehose_tx.subscribe();
72- let mut last_seen: i64 = -1;
7374 if let Some(cursor) = params.cursor {
75- let current_seq = state.repo_repo.get_max_seq().await.unwrap_or(0);
000007677- if cursor > current_seq {
78 if let Ok(error_bytes) =
79 format_error_frame("FutureCursor", Some("Cursor in the future."))
80 {
···8889 let first_event = state
90 .repo_repo
91- .get_events_since_cursor(cursor, 1)
92 .await
93 .ok()
94 .and_then(|events| events.into_iter().next());
9596- let mut current_cursor = cursor;
9798 if let Some(ref event) = first_event
99 && event.created_at < backfill_time
···113 .flatten();
114115 if let Some(earliest_seq) = earliest {
116- current_cursor = earliest_seq - 1;
117 }
118 }
119
···13use std::sync::atomic::{AtomicUsize, Ordering};
14use tokio::sync::broadcast::error::RecvError;
15use tracing::{error, info, warn};
16+use tranquil_db_traits::SequenceNumber;
1718const BACKFILL_BATCH_SIZE: i64 = 1000;
19···70 params: SubscribeReposParams,
71) -> Result<(), ()> {
72 let mut rx = state.firehose_tx.subscribe();
73+ let mut last_seen = SequenceNumber::UNSET;
7475 if let Some(cursor) = params.cursor {
76+ let cursor_seq = SequenceNumber::from_raw(cursor);
77+ let current_seq = state
78+ .repo_repo
79+ .get_max_seq()
80+ .await
81+ .unwrap_or(SequenceNumber::ZERO);
8283+ if cursor_seq > current_seq {
84 if let Ok(error_bytes) =
85 format_error_frame("FutureCursor", Some("Cursor in the future."))
86 {
···9495 let first_event = state
96 .repo_repo
97+ .get_events_since_cursor(cursor_seq, 1)
98 .await
99 .ok()
100 .and_then(|events| events.into_iter().next());
101102+ let mut current_cursor = cursor_seq;
103104 if let Some(ref event) = first_event
105 && event.created_at < backfill_time
···119 .flatten();
120121 if let Some(earliest_seq) = earliest {
122+ current_cursor = SequenceNumber::from_raw(earliest_seq.as_i64() - 1);
123 }
124 }
125
···99 use tranquil_pds::api::repo::record::utils::create_signed_commit;
100101 let signing_key = SigningKey::random(&mut rand::thread_rng());
102- let did = Did::new_unchecked("did:plc:testuser123456789abcdef");
103 let data_cid =
104 Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
105 let rev = Tid::now(LimitedU32::MIN).to_string();
···99 use tranquil_pds::api::repo::record::utils::create_signed_commit;
100101 let signing_key = SigningKey::random(&mut rand::thread_rng());
102+ let did = unsafe { Did::new_unchecked("did:plc:testuser123456789abcdef") };
103 let data_cid =
104 Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
105 let rev = Tid::now(LimitedU32::MIN).to_string();