+16
.sqlx/query-7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197.json
+16
.sqlx/query-7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197.json
···
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n SELECT $1, record_uri, blob_cid\n FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ",
4
+
"describe": {
5
+
"columns": [],
6
+
"parameters": {
7
+
"Left": [
8
+
"Uuid",
9
+
"TextArray",
10
+
"TextArray"
11
+
]
12
+
},
13
+
"nullable": []
14
+
},
15
+
"hash": "7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197"
16
+
}
+19
.sqlx/query-a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c.json
+19
.sqlx/query-a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c.json
···
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "\n INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)\n VALUES ($1, $2, $3, $4, $5, $6)\n ",
4
+
"describe": {
5
+
"columns": [],
6
+
"parameters": {
7
+
"Left": [
8
+
"Uuid",
9
+
"Text",
10
+
"Text",
11
+
"Text",
12
+
"Int4",
13
+
"Int8"
14
+
]
15
+
},
16
+
"nullable": []
17
+
},
18
+
"hash": "a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c"
19
+
}
+55
-23
crates/tranquil-pds/src/api/actor/preferences.rs
+55
-23
crates/tranquil-pds/src/api/actor/preferences.rs
···
127
))
128
.into_response();
129
}
130
-
let mut forbidden_prefs: Vec<String> = Vec::new();
131
-
for pref in &input.preferences {
132
-
let pref_str = serde_json::to_string(pref).unwrap_or_default();
133
-
if pref_str.len() > MAX_PREFERENCE_SIZE {
134
-
return ApiError::InvalidRequest(format!(
135
"Preference too large: {} bytes exceeds limit of {}",
136
-
pref_str.len(),
137
-
MAX_PREFERENCE_SIZE
138
))
139
-
.into_response();
140
-
}
141
-
let pref_type = match pref.get("$type").and_then(|t| t.as_str()) {
142
-
Some(t) => t,
143
-
None => {
144
-
return ApiError::InvalidRequest("Preference is missing a $type".into())
145
-
.into_response();
146
-
}
147
-
};
148
-
if !pref_type.starts_with(APP_BSKY_NAMESPACE) {
149
-
return ApiError::InvalidRequest(format!(
150
"Some preferences are not in the {} namespace",
151
APP_BSKY_NAMESPACE
152
))
153
-
.into_response();
154
-
}
155
-
if pref_type == PERSONAL_DETAILS_PREF && !has_full_access {
156
-
forbidden_prefs.push(pref_type.to_string());
157
-
}
158
}
159
if !forbidden_prefs.is_empty() {
160
return ApiError::InvalidRequest(format!(
161
"Do not have authorization to set preferences: {}",
···
127
))
128
.into_response();
129
}
130
+
enum PrefValidation {
131
+
Ok(Option<String>),
132
+
TooLarge(usize),
133
+
MissingType,
134
+
WrongNamespace,
135
+
}
136
+
137
+
let validation_results: Vec<PrefValidation> = input
138
+
.preferences
139
+
.iter()
140
+
.map(|pref| {
141
+
let pref_str = serde_json::to_string(pref).unwrap_or_default();
142
+
if pref_str.len() > MAX_PREFERENCE_SIZE {
143
+
return PrefValidation::TooLarge(pref_str.len());
144
+
}
145
+
let pref_type = match pref.get("$type").and_then(|t| t.as_str()) {
146
+
Some(t) => t,
147
+
None => return PrefValidation::MissingType,
148
+
};
149
+
if !pref_type.starts_with(APP_BSKY_NAMESPACE) {
150
+
return PrefValidation::WrongNamespace;
151
+
}
152
+
if pref_type == PERSONAL_DETAILS_PREF && !has_full_access {
153
+
PrefValidation::Ok(Some(pref_type.to_string()))
154
+
} else {
155
+
PrefValidation::Ok(None)
156
+
}
157
+
})
158
+
.collect();
159
+
160
+
if let Some(err) = validation_results.iter().find_map(|v| match v {
161
+
PrefValidation::TooLarge(size) => Some(
162
+
ApiError::InvalidRequest(format!(
163
"Preference too large: {} bytes exceeds limit of {}",
164
+
size, MAX_PREFERENCE_SIZE
165
))
166
+
.into_response(),
167
+
),
168
+
PrefValidation::MissingType => Some(
169
+
ApiError::InvalidRequest("Preference is missing a $type".into()).into_response(),
170
+
),
171
+
PrefValidation::WrongNamespace => Some(
172
+
ApiError::InvalidRequest(format!(
173
"Some preferences are not in the {} namespace",
174
APP_BSKY_NAMESPACE
175
))
176
+
.into_response(),
177
+
),
178
+
PrefValidation::Ok(_) => None,
179
+
}) {
180
+
return err;
181
}
182
+
183
+
let forbidden_prefs: Vec<String> = validation_results
184
+
.into_iter()
185
+
.filter_map(|v| match v {
186
+
PrefValidation::Ok(Some(s)) => Some(s),
187
+
_ => None,
188
+
})
189
+
.collect();
190
+
191
if !forbidden_prefs.is_empty() {
192
return ApiError::InvalidRequest(format!(
193
"Do not have authorization to set preferences: {}",
+47
-42
crates/tranquil-pds/src/api/repo/record/batch.rs
+47
-42
crates/tranquil-pds/src/api/repo/record/batch.rs
···
343
})
344
.collect();
345
346
-
for collection in create_collections {
347
-
if let Err(e) = crate::auth::scope_check::check_repo_scope(
348
-
is_oauth,
349
-
scope.as_deref(),
350
-
crate::oauth::RepoAction::Create,
351
-
collection,
352
-
) {
353
-
return e;
354
-
}
355
-
}
356
-
for collection in update_collections {
357
-
if let Err(e) = crate::auth::scope_check::check_repo_scope(
358
-
is_oauth,
359
-
scope.as_deref(),
360
-
crate::oauth::RepoAction::Update,
361
-
collection,
362
-
) {
363
-
return e;
364
-
}
365
-
}
366
-
for collection in delete_collections {
367
-
if let Err(e) = crate::auth::scope_check::check_repo_scope(
368
-
is_oauth,
369
-
scope.as_deref(),
370
-
crate::oauth::RepoAction::Delete,
371
-
collection,
372
-
) {
373
-
return e;
374
-
}
375
}
376
}
377
···
439
return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
440
}
441
};
442
-
let mut new_mst_blocks = std::collections::BTreeMap::new();
443
-
let mut old_mst_blocks = std::collections::BTreeMap::new();
444
-
for key in &modified_keys {
445
-
if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() {
446
-
return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
447
.into_response();
448
-
}
449
-
if original_mst
450
-
.blocks_for_path(key, &mut old_mst_blocks)
451
-
.await
452
-
.is_err()
453
-
{
454
-
return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
455
.into_response();
456
}
457
-
}
458
let mut relevant_blocks = new_mst_blocks.clone();
459
relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
460
let written_cids: Vec<Cid> = tracking_store
···
343
})
344
.collect();
345
346
+
let scope_checks = create_collections
347
+
.iter()
348
+
.map(|c| (crate::oauth::RepoAction::Create, c))
349
+
.chain(
350
+
update_collections
351
+
.iter()
352
+
.map(|c| (crate::oauth::RepoAction::Update, c)),
353
+
)
354
+
.chain(
355
+
delete_collections
356
+
.iter()
357
+
.map(|c| (crate::oauth::RepoAction::Delete, c)),
358
+
);
359
+
360
+
if let Some(err) = scope_checks
361
+
.filter_map(|(action, collection)| {
362
+
crate::auth::scope_check::check_repo_scope(
363
+
is_oauth,
364
+
scope.as_deref(),
365
+
action,
366
+
collection,
367
+
)
368
+
.err()
369
+
})
370
+
.next()
371
+
{
372
+
return err;
373
}
374
}
375
···
437
return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
438
}
439
};
440
+
let (new_mst_blocks, old_mst_blocks) = {
441
+
let mut new_blocks = std::collections::BTreeMap::new();
442
+
let mut old_blocks = std::collections::BTreeMap::new();
443
+
for key in &modified_keys {
444
+
if mst.blocks_for_path(key, &mut new_blocks).await.is_err() {
445
+
return ApiError::InternalError(Some(
446
+
"Failed to get new MST blocks for path".into(),
447
+
))
448
.into_response();
449
+
}
450
+
if original_mst
451
+
.blocks_for_path(key, &mut old_blocks)
452
+
.await
453
+
.is_err()
454
+
{
455
+
return ApiError::InternalError(Some(
456
+
"Failed to get old MST blocks for path".into(),
457
+
))
458
.into_response();
459
+
}
460
}
461
+
(new_blocks, old_blocks)
462
+
};
463
let mut relevant_blocks = new_mst_blocks.clone();
464
relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
465
let written_cids: Vec<Cid> = tracking_store
+4
-6
crates/tranquil-pds/src/api/repo/record/utils.rs
+4
-6
crates/tranquil-pds/src/api/repo/record/utils.rs
+18
-17
crates/tranquil-pds/src/appview/mod.rs
+18
-17
crates/tranquil-pds/src/appview/mod.rs
···
231
}
232
233
fn extract_service_endpoint(&self, doc: &DidDocument) -> Option<ResolvedService> {
234
-
for service in &doc.service {
235
-
if service.service_type == "AtprotoAppView"
236
-
|| service.id.contains("atproto_appview")
237
-
|| service.id.ends_with("#bsky_appview")
238
-
{
239
-
return Some(ResolvedService {
240
-
url: service.service_endpoint.clone(),
241
-
did: doc.id.clone(),
242
-
});
243
-
}
244
}
245
246
-
for service in &doc.service {
247
-
if service.service_type.contains("AppView") || service.id.contains("appview") {
248
-
return Some(ResolvedService {
249
-
url: service.service_endpoint.clone(),
250
-
did: doc.id.clone(),
251
-
});
252
-
}
253
}
254
255
if let Some(service) = doc.service.first()
···
231
}
232
233
fn extract_service_endpoint(&self, doc: &DidDocument) -> Option<ResolvedService> {
234
+
if let Some(service) = doc.service.iter().find(|s| {
235
+
s.service_type == "AtprotoAppView"
236
+
|| s.id.contains("atproto_appview")
237
+
|| s.id.ends_with("#bsky_appview")
238
+
}) {
239
+
return Some(ResolvedService {
240
+
url: service.service_endpoint.clone(),
241
+
did: doc.id.clone(),
242
+
});
243
}
244
245
+
if let Some(service) = doc
246
+
.service
247
+
.iter()
248
+
.find(|s| s.service_type.contains("AppView") || s.id.contains("appview"))
249
+
{
250
+
return Some(ResolvedService {
251
+
url: service.service_endpoint.clone(),
252
+
did: doc.id.clone(),
253
+
});
254
}
255
256
if let Some(service) = doc.service.first()
+2
-6
crates/tranquil-pds/src/circuit_breaker.rs
+2
-6
crates/tranquil-pds/src/circuit_breaker.rs
···
265
async fn test_circuit_breaker_half_open_closes_after_successes() {
266
let cb = CircuitBreaker::new("test", 3, 2, 0);
267
268
-
for _ in 0..3 {
269
-
cb.record_failure().await;
270
-
}
271
assert_eq!(cb.state().await, CircuitState::Open);
272
273
tokio::time::sleep(Duration::from_millis(100)).await;
···
285
async fn test_circuit_breaker_half_open_reopens_on_failure() {
286
let cb = CircuitBreaker::new("test", 3, 2, 0);
287
288
-
for _ in 0..3 {
289
-
cb.record_failure().await;
290
-
}
291
292
tokio::time::sleep(Duration::from_millis(100)).await;
293
cb.can_execute().await;
···
265
async fn test_circuit_breaker_half_open_closes_after_successes() {
266
let cb = CircuitBreaker::new("test", 3, 2, 0);
267
268
+
futures::future::join_all((0..3).map(|_| cb.record_failure())).await;
269
assert_eq!(cb.state().await, CircuitState::Open);
270
271
tokio::time::sleep(Duration::from_millis(100)).await;
···
283
async fn test_circuit_breaker_half_open_reopens_on_failure() {
284
let cb = CircuitBreaker::new("test", 3, 2, 0);
285
286
+
futures::future::join_all((0..3).map(|_| cb.record_failure())).await;
287
288
tokio::time::sleep(Duration::from_millis(100)).await;
289
cb.can_execute().await;
+1
-3
crates/tranquil-pds/src/comms/service.rs
+1
-3
crates/tranquil-pds/src/comms/service.rs
+2
-2
crates/tranquil-pds/src/crawlers.rs
+2
-2
crates/tranquil-pds/src/crawlers.rs
+24
-29
crates/tranquil-pds/src/delegation/scopes.rs
+24
-29
crates/tranquil-pds/src/delegation/scopes.rs
···
57
return granted_set.into_iter().collect::<Vec<_>>().join(" ");
58
}
59
60
-
let mut result: Vec<&str> = Vec::new();
61
-
62
-
for requested_scope in &requested_set {
63
-
if granted_set.contains(requested_scope) {
64
-
result.push(requested_scope);
65
-
continue;
66
-
}
67
-
68
-
if let Some(match_result) = find_matching_scope(requested_scope, &granted_set) {
69
-
result.push(match_result);
70
-
}
71
-
}
72
73
result.sort();
74
result.join(" ")
···
118
return Ok(());
119
}
120
121
-
for scope in scopes.split_whitespace() {
122
-
let (base, _) = split_scope(scope);
123
-
124
-
if !is_valid_scope_prefix(base) {
125
-
return Err(format!("Invalid scope: {}", scope));
126
-
}
127
-
}
128
-
129
-
Ok(())
130
}
131
132
fn is_valid_scope_prefix(base: &str) -> bool {
133
-
let valid_prefixes = [
134
"atproto",
135
"repo:",
136
"blob:",
···
140
"transition:",
141
];
142
143
-
for prefix in valid_prefixes {
144
-
if base == prefix.trim_end_matches(':') || base.starts_with(prefix) {
145
-
return true;
146
-
}
147
-
}
148
-
149
-
false
150
}
151
152
#[cfg(test)]
···
57
return granted_set.into_iter().collect::<Vec<_>>().join(" ");
58
}
59
60
+
let mut result: Vec<&str> = requested_set
61
+
.iter()
62
+
.filter_map(|requested_scope| {
63
+
if granted_set.contains(requested_scope) {
64
+
Some(*requested_scope)
65
+
} else {
66
+
find_matching_scope(requested_scope, &granted_set)
67
+
}
68
+
})
69
+
.collect();
70
71
result.sort();
72
result.join(" ")
···
116
return Ok(());
117
}
118
119
+
scopes
120
+
.split_whitespace()
121
+
.try_for_each(|scope| {
122
+
let (base, _) = split_scope(scope);
123
+
if is_valid_scope_prefix(base) {
124
+
Ok(())
125
+
} else {
126
+
Err(format!("Invalid scope: {}", scope))
127
+
}
128
+
})
129
}
130
131
fn is_valid_scope_prefix(base: &str) -> bool {
132
+
const VALID_PREFIXES: [&str; 7] = [
133
"atproto",
134
"repo:",
135
"blob:",
···
139
"transition:",
140
];
141
142
+
VALID_PREFIXES
143
+
.iter()
144
+
.any(|prefix| base == prefix.trim_end_matches(':') || base.starts_with(prefix))
145
}
146
147
#[cfg(test)]
+12
-19
crates/tranquil-pds/src/handle/mod.rs
+12
-19
crates/tranquil-pds/src/handle/mod.rs
···
25
.txt_lookup(&query_name)
26
.await
27
.map_err(|e| HandleResolutionError::DnsError(e.to_string()))?;
28
-
for record in txt_lookup.iter() {
29
-
for txt in record.txt_data() {
30
let txt_str = String::from_utf8_lossy(txt);
31
-
if let Some(did) = txt_str.strip_prefix("did=") {
32
let did = did.trim();
33
-
if did.starts_with("did:") {
34
-
return Ok(did.to_string());
35
-
}
36
-
}
37
-
}
38
-
}
39
-
Err(HandleResolutionError::NotFound)
40
}
41
42
pub async fn resolve_handle_http(handle: &str) -> Result<String, HandleResolutionError> {
···
95
let service_domains: Vec<String> = std::env::var("PDS_SERVICE_HANDLE_DOMAINS")
96
.map(|s| s.split(',').map(|d| d.trim().to_string()).collect())
97
.unwrap_or_else(|_| vec![hostname.to_string()]);
98
-
for domain in service_domains {
99
-
if handle.ends_with(&format!(".{}", domain)) {
100
-
return true;
101
-
}
102
-
if handle == domain {
103
-
return true;
104
-
}
105
-
}
106
-
false
107
}
108
109
#[cfg(test)]
···
25
.txt_lookup(&query_name)
26
.await
27
.map_err(|e| HandleResolutionError::DnsError(e.to_string()))?;
28
+
txt_lookup
29
+
.iter()
30
+
.flat_map(|record| record.txt_data())
31
+
.find_map(|txt| {
32
let txt_str = String::from_utf8_lossy(txt);
33
+
txt_str.strip_prefix("did=").and_then(|did| {
34
let did = did.trim();
35
+
did.starts_with("did:").then(|| did.to_string())
36
+
})
37
+
})
38
+
.ok_or(HandleResolutionError::NotFound)
39
}
40
41
pub async fn resolve_handle_http(handle: &str) -> Result<String, HandleResolutionError> {
···
94
let service_domains: Vec<String> = std::env::var("PDS_SERVICE_HANDLE_DOMAINS")
95
.map(|s| s.split(',').map(|d| d.trim().to_string()).collect())
96
.unwrap_or_else(|_| vec![hostname.to_string()]);
97
+
service_domains
98
+
.iter()
99
+
.any(|domain| handle.ends_with(&format!(".{}", domain)) || handle == domain)
100
}
101
102
#[cfg(test)]
+6
-13
crates/tranquil-pds/src/handle/reserved.rs
+6
-13
crates/tranquil-pds/src/handle/reserved.rs
···
1029
];
1030
1031
pub static RESERVED_SUBDOMAINS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
1032
-
let mut set = HashSet::with_capacity(
1033
-
ATP_SPECIFIC.len() + COMMONLY_RESERVED.len() + FAMOUS_ACCOUNTS.len(),
1034
-
);
1035
-
for s in ATP_SPECIFIC {
1036
-
set.insert(*s);
1037
-
}
1038
-
for s in COMMONLY_RESERVED {
1039
-
set.insert(*s);
1040
-
}
1041
-
for s in FAMOUS_ACCOUNTS {
1042
-
set.insert(*s);
1043
-
}
1044
-
set
1045
});
1046
1047
pub fn is_reserved_subdomain(subdomain: &str) -> bool {
···
1029
];
1030
1031
pub static RESERVED_SUBDOMAINS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
1032
+
ATP_SPECIFIC
1033
+
.iter()
1034
+
.chain(COMMONLY_RESERVED.iter())
1035
+
.chain(FAMOUS_ACCOUNTS.iter())
1036
+
.copied()
1037
+
.collect()
1038
});
1039
1040
pub fn is_reserved_subdomain(subdomain: &str) -> bool {
+6
-6
crates/tranquil-pds/src/plc/mod.rs
+6
-6
crates/tranquil-pds/src/plc/mod.rs
···
526
}
527
let cbor_bytes = serde_ipld_dagcbor::to_vec(&unsigned_op)
528
.map_err(|e| PlcError::Serialization(e.to_string()))?;
529
-
for key_did in rotation_keys {
530
-
if let Ok(true) = verify_signature_with_did_key(key_did, &cbor_bytes, &signature) {
531
-
return Ok(true);
532
-
}
533
-
}
534
-
Ok(false)
535
}
536
537
fn verify_signature_with_did_key(
···
526
}
527
let cbor_bytes = serde_ipld_dagcbor::to_vec(&unsigned_op)
528
.map_err(|e| PlcError::Serialization(e.to_string()))?;
529
+
let verified = rotation_keys
530
+
.iter()
531
+
.any(|key_did| {
532
+
verify_signature_with_did_key(key_did, &cbor_bytes, &signature).unwrap_or(false)
533
+
});
534
+
Ok(verified)
535
}
536
537
fn verify_signature_with_did_key(
+443
-344
crates/tranquil-pds/src/scheduled.rs
+443
-344
crates/tranquil-pds/src/scheduled.rs
···
14
use crate::storage::{BackupStorage, BlobStorage};
15
use crate::sync::car::encode_car_header;
16
17
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
18
let broken_genesis_commits = match sqlx::query!(
19
r#"
···
44
"Backfilling blocks_cids for genesis commits"
45
);
46
47
-
let mut success = 0;
48
-
let mut failed = 0;
49
-
50
-
for commit_row in broken_genesis_commits {
51
-
let commit_cid_str = match &commit_row.commit_cid {
52
-
Some(c) => c.clone(),
53
-
None => {
54
-
warn!(seq = commit_row.seq, "Genesis commit missing commit_cid");
55
-
failed += 1;
56
-
continue;
57
-
}
58
-
};
59
-
60
-
let commit_cid = match Cid::from_str(&commit_cid_str) {
61
-
Ok(c) => c,
62
-
Err(_) => {
63
-
warn!(seq = commit_row.seq, "Invalid commit CID");
64
-
failed += 1;
65
-
continue;
66
-
}
67
-
};
68
-
69
-
let block = match block_store.get(&commit_cid).await {
70
-
Ok(Some(b)) => b,
71
-
Ok(None) => {
72
-
warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store");
73
-
failed += 1;
74
-
continue;
75
-
}
76
-
Err(e) => {
77
-
warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block");
78
-
failed += 1;
79
-
continue;
80
-
}
81
-
};
82
-
83
-
let commit = match Commit::from_cbor(&block) {
84
-
Ok(c) => c,
85
-
Err(e) => {
86
-
warn!(seq = commit_row.seq, error = %e, "Failed to parse commit");
87
-
failed += 1;
88
-
continue;
89
-
}
90
-
};
91
-
92
-
let mst_root_cid = commit.data;
93
-
let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
94
-
95
-
if let Err(e) = sqlx::query!(
96
-
"UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
97
-
&blocks_cids,
98
-
commit_row.seq
99
)
100
-
.execute(db)
101
-
.await
102
-
{
103
-
warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids");
104
-
failed += 1;
105
-
} else {
106
-
info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids");
107
-
success += 1;
108
}
109
-
}
110
111
info!(
112
success,
113
failed, "Completed genesis commit blocks_cids backfill"
114
);
115
}
116
117
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
···
137
"Backfilling repo_rev for existing repos"
138
);
139
140
-
let mut success = 0;
141
-
let mut failed = 0;
142
143
-
for repo in repos_missing_rev {
144
-
let cid = match Cid::from_str(&repo.repo_root_cid) {
145
-
Ok(c) => c,
146
-
Err(_) => {
147
-
failed += 1;
148
-
continue;
149
}
150
-
};
151
152
-
let block = match block_store.get(&cid).await {
153
-
Ok(Some(b)) => b,
154
-
_ => {
155
-
failed += 1;
156
-
continue;
157
-
}
158
-
};
159
160
-
let commit = match Commit::from_cbor(&block) {
161
-
Ok(c) => c,
162
-
Err(_) => {
163
-
failed += 1;
164
-
continue;
165
-
}
166
-
};
167
-
168
-
let rev = commit.rev().to_string();
169
-
170
-
if let Err(e) = sqlx::query!(
171
-
"UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
172
-
rev,
173
-
repo.user_id
174
-
)
175
-
.execute(db)
176
.await
177
-
{
178
-
warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
179
-
failed += 1;
180
-
} else {
181
-
success += 1;
182
-
}
183
}
184
-
185
-
info!(success, failed, "Completed repo_rev backfill");
186
}
187
188
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
···
214
"Backfilling user_blocks for existing repos"
215
);
216
217
-
let mut success = 0;
218
-
let mut failed = 0;
219
220
-
for user in users_without_blocks {
221
-
let root_cid = match Cid::from_str(&user.repo_root_cid) {
222
-
Ok(c) => c,
223
-
Err(_) => {
224
-
failed += 1;
225
-
continue;
226
-
}
227
-
};
228
-
229
-
match collect_current_repo_blocks(&block_store, &root_cid).await {
230
-
Ok(block_cids) => {
231
-
if block_cids.is_empty() {
232
-
failed += 1;
233
-
continue;
234
-
}
235
-
236
-
if let Err(e) = sqlx::query!(
237
-
r#"
238
-
INSERT INTO user_blocks (user_id, block_cid)
239
-
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
240
-
ON CONFLICT (user_id, block_cid) DO NOTHING
241
-
"#,
242
-
user.user_id,
243
-
&block_cids
244
-
)
245
-
.execute(db)
246
-
.await
247
-
{
248
-
warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
249
-
failed += 1;
250
-
} else {
251
-
info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
252
-
success += 1;
253
-
}
254
-
}
255
-
Err(e) => {
256
-
warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill");
257
-
failed += 1;
258
-
}
259
}
260
-
}
261
262
info!(success, failed, "Completed user_blocks backfill");
263
}
···
314
Ok(block_cids)
315
}
316
317
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
318
let users_needing_backfill = match sqlx::query!(
319
r#"
···
344
"Backfilling record_blobs for existing repos"
345
);
346
347
-
let mut success = 0;
348
-
let mut failed = 0;
349
350
-
for user in users_needing_backfill {
351
-
let records = match sqlx::query!(
352
-
"SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
353
-
user.user_id
354
-
)
355
-
.fetch_all(db)
356
-
.await
357
-
{
358
-
Ok(r) => r,
359
-
Err(e) => {
360
-
warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
361
-
failed += 1;
362
-
continue;
363
}
364
-
};
365
-
366
-
let mut batch_record_uris: Vec<String> = Vec::new();
367
-
let mut batch_blob_cids: Vec<String> = Vec::new();
368
-
369
-
for record in records {
370
-
let record_cid = match Cid::from_str(&record.record_cid) {
371
-
Ok(c) => c,
372
-
Err(_) => continue,
373
-
};
374
-
375
-
let block_bytes = match block_store.get(&record_cid).await {
376
-
Ok(Some(b)) => b,
377
-
_ => continue,
378
-
};
379
-
380
-
let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
381
-
Ok(v) => v,
382
-
Err(_) => continue,
383
-
};
384
-
385
-
let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
386
-
for blob_ref in blob_refs {
387
-
let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey);
388
-
batch_record_uris.push(record_uri);
389
-
batch_blob_cids.push(blob_ref.cid);
390
-
}
391
}
392
-
393
-
let blob_refs_found = batch_record_uris.len();
394
-
if !batch_record_uris.is_empty() {
395
-
if let Err(e) = sqlx::query!(
396
-
r#"
397
-
INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
398
-
SELECT $1, record_uri, blob_cid
399
-
FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
400
-
ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
401
-
"#,
402
-
user.user_id,
403
-
&batch_record_uris,
404
-
&batch_blob_cids
405
-
)
406
-
.execute(db)
407
-
.await
408
-
{
409
-
warn!(error = %e, "Failed to batch insert record_blobs during backfill");
410
-
} else {
411
-
info!(
412
-
user_id = %user.user_id,
413
-
did = %user.did,
414
-
blob_refs = blob_refs_found,
415
-
"Backfilled record_blobs"
416
-
);
417
-
}
418
}
419
-
success += 1;
420
-
}
421
422
info!(success, failed, "Completed record_blobs backfill");
423
}
···
487
"Processing scheduled account deletions"
488
);
489
490
-
for account in accounts_to_delete {
491
-
if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
492
-
warn!(
493
-
did = %account.did,
494
-
handle = %account.handle,
495
-
error = %e,
496
-
"Failed to delete scheduled account"
497
-
);
498
-
} else {
499
-
info!(
500
-
did = %account.did,
501
-
handle = %account.handle,
502
-
"Successfully deleted scheduled account"
503
-
);
504
-
}
505
-
}
506
507
Ok(())
508
}
···
526
.await
527
.map_err(|e| format!("DB error fetching blob keys: {}", e))?;
528
529
-
for storage_key in &blob_storage_keys {
530
-
if let Err(e) = blob_store.delete(storage_key).await {
531
-
warn!(
532
-
storage_key = %storage_key,
533
-
error = %e,
534
-
"Failed to delete blob from storage (continuing anyway)"
535
-
);
536
-
}
537
-
}
538
539
let mut tx = db
540
.begin()
···
624
}
625
}
626
627
async fn process_scheduled_backups(
628
db: &PgPool,
629
block_store: &PostgresBlockStore,
···
665
"Processing scheduled backups"
666
);
667
668
-
for user in users_needing_backup {
669
-
let repo_root_cid = user.repo_root_cid.clone();
670
-
671
-
let repo_rev = match &user.repo_rev {
672
-
Some(rev) => rev.clone(),
673
-
None => {
674
-
warn!(did = %user.did, "User has no repo_rev, skipping backup");
675
-
continue;
676
-
}
677
-
};
678
-
679
-
let head_cid = match Cid::from_str(&repo_root_cid) {
680
-
Ok(c) => c,
681
-
Err(e) => {
682
-
warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup");
683
-
continue;
684
-
}
685
-
};
686
-
687
-
let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await;
688
-
let car_bytes = match car_result {
689
-
Ok(bytes) => bytes,
690
-
Err(e) => {
691
-
warn!(did = %user.did, error = %e, "Failed to generate CAR for backup");
692
-
continue;
693
-
}
694
-
};
695
-
696
-
let block_count = count_car_blocks(&car_bytes);
697
-
let size_bytes = car_bytes.len() as i64;
698
-
699
-
let storage_key = match backup_storage
700
-
.put_backup(&user.did, &repo_rev, &car_bytes)
701
-
.await
702
-
{
703
-
Ok(key) => key,
704
-
Err(e) => {
705
-
warn!(did = %user.did, error = %e, "Failed to upload backup to storage");
706
-
continue;
707
-
}
708
-
};
709
-
710
-
if let Err(e) = sqlx::query!(
711
-
r#"
712
-
INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
713
-
VALUES ($1, $2, $3, $4, $5, $6)
714
-
"#,
715
user.user_id,
716
-
storage_key,
717
-
repo_root_cid,
718
-
repo_rev,
719
-
block_count,
720
-
size_bytes
721
)
722
-
.execute(db)
723
-
.await
724
-
{
725
-
warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload");
726
-
if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
727
-
error!(
728
-
did = %user.did,
729
-
storage_key = %storage_key,
730
-
error = %rollback_err,
731
-
"Failed to rollback orphaned backup from S3"
732
);
733
}
734
-
continue;
735
}
736
-
737
-
info!(
738
-
did = %user.did,
739
-
rev = %repo_rev,
740
-
size_bytes,
741
-
block_count,
742
-
"Created backup"
743
-
);
744
-
745
-
if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await
746
-
{
747
-
warn!(did = %user.did, error = %e, "Failed to cleanup old backups");
748
-
}
749
-
}
750
751
Ok(())
752
}
···
877
user_id: uuid::Uuid,
878
retention_count: u32,
879
) -> Result<(), String> {
880
-
let old_backups = sqlx::query!(
881
-
r#"
882
-
SELECT id, storage_key
883
-
FROM account_backups
884
-
WHERE user_id = $1
885
-
ORDER BY created_at DESC
886
-
OFFSET $2
887
-
"#,
888
-
user_id,
889
-
retention_count as i64
890
-
)
891
-
.fetch_all(db)
892
-
.await
893
-
.map_err(|e| format!("DB error fetching old backups: {}", e))?;
894
895
-
for backup in old_backups {
896
-
if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await {
897
-
warn!(
898
-
storage_key = %backup.storage_key,
899
-
error = %e,
900
-
"Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
901
-
);
902
-
continue;
903
}
904
905
-
sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id)
906
-
.execute(db)
907
-
.await
908
-
.map_err(|e| format!("Failed to delete old backup record: {}", e))?;
909
-
}
910
-
911
-
Ok(())
912
}
···
14
use crate::storage::{BackupStorage, BlobStorage};
15
use crate::sync::car::encode_car_header;
16
17
+
async fn update_genesis_blocks_cids(db: &PgPool, blocks_cids: &[String], seq: i64) -> Result<(), sqlx::Error> {
18
+
sqlx::query!(
19
+
"UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2",
20
+
blocks_cids,
21
+
seq
22
+
)
23
+
.execute(db)
24
+
.await?;
25
+
Ok(())
26
+
}
27
+
28
+
async fn update_repo_rev(db: &PgPool, rev: &str, user_id: uuid::Uuid) -> Result<(), sqlx::Error> {
29
+
sqlx::query!(
30
+
"UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
31
+
rev,
32
+
user_id
33
+
)
34
+
.execute(db)
35
+
.await?;
36
+
Ok(())
37
+
}
38
+
39
+
async fn insert_user_blocks(db: &PgPool, user_id: uuid::Uuid, block_cids: &[Vec<u8>]) -> Result<(), sqlx::Error> {
40
+
sqlx::query!(
41
+
r#"
42
+
INSERT INTO user_blocks (user_id, block_cid)
43
+
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
44
+
ON CONFLICT (user_id, block_cid) DO NOTHING
45
+
"#,
46
+
user_id,
47
+
block_cids
48
+
)
49
+
.execute(db)
50
+
.await?;
51
+
Ok(())
52
+
}
53
+
54
+
async fn fetch_user_records(db: &PgPool, user_id: uuid::Uuid) -> Result<Vec<(String, String, String)>, sqlx::Error> {
55
+
let rows = sqlx::query!(
56
+
"SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
57
+
user_id
58
+
)
59
+
.fetch_all(db)
60
+
.await?;
61
+
Ok(rows.into_iter().map(|r| (r.collection, r.rkey, r.record_cid)).collect())
62
+
}
63
+
64
+
async fn insert_record_blobs(db: &PgPool, user_id: uuid::Uuid, record_uris: &[String], blob_cids: &[String]) -> Result<(), sqlx::Error> {
65
+
sqlx::query!(
66
+
r#"
67
+
INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
68
+
SELECT $1, record_uri, blob_cid
69
+
FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)
70
+
ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
71
+
"#,
72
+
user_id,
73
+
record_uris,
74
+
blob_cids
75
+
)
76
+
.execute(db)
77
+
.await?;
78
+
Ok(())
79
+
}
80
+
81
+
async fn delete_backup_record(db: &PgPool, id: uuid::Uuid) -> Result<(), sqlx::Error> {
82
+
sqlx::query!("DELETE FROM account_backups WHERE id = $1", id)
83
+
.execute(db)
84
+
.await?;
85
+
Ok(())
86
+
}
87
+
88
+
async fn fetch_old_backups(
89
+
db: &PgPool,
90
+
user_id: uuid::Uuid,
91
+
retention_count: i64,
92
+
) -> Result<Vec<(uuid::Uuid, String)>, sqlx::Error> {
93
+
let rows = sqlx::query!(
94
+
r#"
95
+
SELECT id, storage_key
96
+
FROM account_backups
97
+
WHERE user_id = $1
98
+
ORDER BY created_at DESC
99
+
OFFSET $2
100
+
"#,
101
+
user_id,
102
+
retention_count
103
+
)
104
+
.fetch_all(db)
105
+
.await?;
106
+
Ok(rows.into_iter().map(|r| (r.id, r.storage_key)).collect())
107
+
}
108
+
109
+
async fn insert_backup_record(
110
+
db: &PgPool,
111
+
user_id: uuid::Uuid,
112
+
storage_key: &str,
113
+
repo_root_cid: &str,
114
+
repo_rev: &str,
115
+
block_count: i32,
116
+
size_bytes: i64,
117
+
) -> Result<(), sqlx::Error> {
118
+
sqlx::query!(
119
+
r#"
120
+
INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)
121
+
VALUES ($1, $2, $3, $4, $5, $6)
122
+
"#,
123
+
user_id,
124
+
storage_key,
125
+
repo_root_cid,
126
+
repo_rev,
127
+
block_count,
128
+
size_bytes
129
+
)
130
+
.execute(db)
131
+
.await?;
132
+
Ok(())
133
+
}
134
+
135
+
struct GenesisCommitRow {
136
+
seq: i64,
137
+
did: String,
138
+
commit_cid: Option<String>,
139
+
}
140
+
141
+
async fn process_genesis_commit(
142
+
db: &PgPool,
143
+
block_store: &PostgresBlockStore,
144
+
row: GenesisCommitRow,
145
+
) -> Result<(String, i64), (i64, &'static str)> {
146
+
let commit_cid_str = row.commit_cid.ok_or((row.seq, "missing commit_cid"))?;
147
+
let commit_cid = Cid::from_str(&commit_cid_str).map_err(|_| (row.seq, "invalid CID"))?;
148
+
let block = block_store
149
+
.get(&commit_cid)
150
+
.await
151
+
.map_err(|_| (row.seq, "failed to fetch block"))?
152
+
.ok_or((row.seq, "block not found"))?;
153
+
let commit = Commit::from_cbor(&block).map_err(|_| (row.seq, "failed to parse commit"))?;
154
+
let blocks_cids = vec![commit.data.to_string(), commit_cid.to_string()];
155
+
update_genesis_blocks_cids(db, &blocks_cids, row.seq)
156
+
.await
157
+
.map_err(|_| (row.seq, "failed to update"))?;
158
+
Ok((row.did, row.seq))
159
+
}
160
+
161
pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) {
162
let broken_genesis_commits = match sqlx::query!(
163
r#"
···
188
"Backfilling blocks_cids for genesis commits"
189
);
190
191
+
let results = futures::future::join_all(broken_genesis_commits.into_iter().map(|row| {
192
+
process_genesis_commit(
193
+
db,
194
+
&block_store,
195
+
GenesisCommitRow {
196
+
seq: row.seq,
197
+
did: row.did,
198
+
commit_cid: row.commit_cid,
199
+
},
200
)
201
+
}))
202
+
.await;
203
+
204
+
let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
205
+
Ok((did, seq)) => {
206
+
info!(seq = seq, did = %did, "Fixed genesis commit blocks_cids");
207
+
(s + 1, f)
208
}
209
+
Err((seq, reason)) => {
210
+
warn!(seq = seq, reason = reason, "Failed to process genesis commit");
211
+
(s, f + 1)
212
+
}
213
+
});
214
215
info!(
216
success,
217
failed, "Completed genesis commit blocks_cids backfill"
218
);
219
+
}
220
+
221
+
async fn process_repo_rev(
222
+
db: &PgPool,
223
+
block_store: &PostgresBlockStore,
224
+
user_id: uuid::Uuid,
225
+
repo_root_cid: String,
226
+
) -> Result<uuid::Uuid, uuid::Uuid> {
227
+
let cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?;
228
+
let block = block_store
229
+
.get(&cid)
230
+
.await
231
+
.ok()
232
+
.flatten()
233
+
.ok_or(user_id)?;
234
+
let commit = Commit::from_cbor(&block).map_err(|_| user_id)?;
235
+
let rev = commit.rev().to_string();
236
+
update_repo_rev(db, &rev, user_id)
237
+
.await
238
+
.map_err(|_| user_id)?;
239
+
Ok(user_id)
240
}
241
242
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
···
262
"Backfilling repo_rev for existing repos"
263
);
264
265
+
let results = futures::future::join_all(repos_missing_rev.into_iter().map(|repo| {
266
+
process_repo_rev(db, &block_store, repo.user_id, repo.repo_root_cid)
267
+
}))
268
+
.await;
269
270
+
let (success, failed) = results
271
+
.iter()
272
+
.fold((0, 0), |(s, f), r| match r {
273
+
Ok(_) => (s + 1, f),
274
+
Err(user_id) => {
275
+
warn!(user_id = %user_id, "Failed to update repo_rev");
276
+
(s, f + 1)
277
}
278
+
});
279
280
+
info!(success, failed, "Completed repo_rev backfill");
281
+
}
282
283
+
async fn process_user_blocks(
284
+
db: &PgPool,
285
+
block_store: &PostgresBlockStore,
286
+
user_id: uuid::Uuid,
287
+
repo_root_cid: String,
288
+
) -> Result<(uuid::Uuid, usize), uuid::Uuid> {
289
+
let root_cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?;
290
+
let block_cids = collect_current_repo_blocks(block_store, &root_cid)
291
.await
292
+
.map_err(|_| user_id)?;
293
+
if block_cids.is_empty() {
294
+
return Err(user_id);
295
}
296
+
let count = block_cids.len();
297
+
insert_user_blocks(db, user_id, &block_cids)
298
+
.await
299
+
.map_err(|_| user_id)?;
300
+
Ok((user_id, count))
301
}
302
303
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
···
329
"Backfilling user_blocks for existing repos"
330
);
331
332
+
let results = futures::future::join_all(users_without_blocks.into_iter().map(|user| {
333
+
process_user_blocks(db, &block_store, user.user_id, user.repo_root_cid)
334
+
}))
335
+
.await;
336
337
+
let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
338
+
Ok((user_id, count)) => {
339
+
info!(user_id = %user_id, block_count = count, "Backfilled user_blocks");
340
+
(s + 1, f)
341
+
}
342
+
Err(user_id) => {
343
+
warn!(user_id = %user_id, "Failed to backfill user_blocks");
344
+
(s, f + 1)
345
}
346
+
});
347
348
info!(success, failed, "Completed user_blocks backfill");
349
}
···
400
Ok(block_cids)
401
}
402
403
+
async fn process_record_blobs(
404
+
db: &PgPool,
405
+
block_store: &PostgresBlockStore,
406
+
user_id: uuid::Uuid,
407
+
did: String,
408
+
) -> Result<(uuid::Uuid, String, usize), (uuid::Uuid, &'static str)> {
409
+
let records = fetch_user_records(db, user_id)
410
+
.await
411
+
.map_err(|_| (user_id, "failed to fetch records"))?;
412
+
413
+
let mut batch_record_uris: Vec<String> = Vec::new();
414
+
let mut batch_blob_cids: Vec<String> = Vec::new();
415
+
416
+
futures::future::join_all(records.into_iter().map(|(collection, rkey, record_cid)| {
417
+
let did = did.clone();
418
+
async move {
419
+
let cid = Cid::from_str(&record_cid).ok()?;
420
+
let block_bytes = block_store.get(&cid).await.ok()??;
421
+
let record_ipld: Ipld = serde_ipld_dagcbor::from_slice(&block_bytes).ok()?;
422
+
let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
423
+
Some(
424
+
blob_refs
425
+
.into_iter()
426
+
.map(|blob_ref| {
427
+
let record_uri = format!("at://{}/{}/{}", did, collection, rkey);
428
+
(record_uri, blob_ref.cid)
429
+
})
430
+
.collect::<Vec<_>>(),
431
+
)
432
+
}
433
+
}))
434
+
.await
435
+
.into_iter()
436
+
.flatten()
437
+
.flatten()
438
+
.for_each(|(uri, cid)| {
439
+
batch_record_uris.push(uri);
440
+
batch_blob_cids.push(cid);
441
+
});
442
+
443
+
let blob_refs_found = batch_record_uris.len();
444
+
if !batch_record_uris.is_empty() {
445
+
insert_record_blobs(db, user_id, &batch_record_uris, &batch_blob_cids)
446
+
.await
447
+
.map_err(|_| (user_id, "failed to insert"))?;
448
+
}
449
+
Ok((user_id, did, blob_refs_found))
450
+
}
451
+
452
pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
453
let users_needing_backfill = match sqlx::query!(
454
r#"
···
479
"Backfilling record_blobs for existing repos"
480
);
481
482
+
let results = futures::future::join_all(users_needing_backfill.into_iter().map(|user| {
483
+
process_record_blobs(db, &block_store, user.user_id, user.did)
484
+
}))
485
+
.await;
486
487
+
let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r {
488
+
Ok((user_id, did, blob_refs)) => {
489
+
if *blob_refs > 0 {
490
+
info!(user_id = %user_id, did = %did, blob_refs = blob_refs, "Backfilled record_blobs");
491
}
492
+
(s + 1, f)
493
}
494
+
Err((user_id, reason)) => {
495
+
warn!(user_id = %user_id, reason = reason, "Failed to backfill record_blobs");
496
+
(s, f + 1)
497
}
498
+
});
499
500
info!(success, failed, "Completed record_blobs backfill");
501
}
···
565
"Processing scheduled account deletions"
566
);
567
568
+
futures::future::join_all(accounts_to_delete.into_iter().map(|account| async move {
569
+
let result = delete_account_data(db, blob_store, &account.did, &account.handle).await;
570
+
(account.did, account.handle, result)
571
+
}))
572
+
.await
573
+
.into_iter()
574
+
.for_each(|(did, handle, result)| match result {
575
+
Ok(()) => info!(did = %did, handle = %handle, "Successfully deleted scheduled account"),
576
+
Err(e) => warn!(did = %did, handle = %handle, error = %e, "Failed to delete scheduled account"),
577
+
});
578
579
Ok(())
580
}
···
598
.await
599
.map_err(|e| format!("DB error fetching blob keys: {}", e))?;
600
601
+
futures::future::join_all(blob_storage_keys.iter().map(|storage_key| async move {
602
+
(storage_key, blob_store.delete(storage_key).await)
603
+
}))
604
+
.await
605
+
.into_iter()
606
+
.filter_map(|(key, result)| result.err().map(|e| (key, e)))
607
+
.for_each(|(key, e)| {
608
+
warn!(storage_key = %key, error = %e, "Failed to delete blob from storage (continuing anyway)");
609
+
});
610
611
let mut tx = db
612
.begin()
···
696
}
697
}
698
699
+
struct BackupResult {
700
+
did: String,
701
+
repo_rev: String,
702
+
size_bytes: i64,
703
+
block_count: i32,
704
+
user_id: uuid::Uuid,
705
+
}
706
+
707
+
enum BackupOutcome {
708
+
Success(BackupResult),
709
+
Skipped(String, &'static str),
710
+
Failed(String, String),
711
+
}
712
+
713
+
async fn process_single_backup(
714
+
db: &PgPool,
715
+
block_store: &PostgresBlockStore,
716
+
backup_storage: &BackupStorage,
717
+
user_id: uuid::Uuid,
718
+
did: String,
719
+
repo_root_cid: String,
720
+
repo_rev: Option<String>,
721
+
) -> BackupOutcome {
722
+
let repo_rev = match repo_rev {
723
+
Some(rev) => rev,
724
+
None => return BackupOutcome::Skipped(did, "no repo_rev"),
725
+
};
726
+
727
+
let head_cid = match Cid::from_str(&repo_root_cid) {
728
+
Ok(c) => c,
729
+
Err(_) => return BackupOutcome::Skipped(did, "invalid repo_root_cid"),
730
+
};
731
+
732
+
let car_bytes = match generate_full_backup(db, block_store, user_id, &head_cid).await {
733
+
Ok(bytes) => bytes,
734
+
Err(e) => return BackupOutcome::Failed(did, format!("CAR generation: {}", e)),
735
+
};
736
+
737
+
let block_count = count_car_blocks(&car_bytes);
738
+
let size_bytes = car_bytes.len() as i64;
739
+
740
+
let storage_key = match backup_storage.put_backup(&did, &repo_rev, &car_bytes).await {
741
+
Ok(key) => key,
742
+
Err(e) => return BackupOutcome::Failed(did, format!("S3 upload: {}", e)),
743
+
};
744
+
745
+
if let Err(e) = insert_backup_record(
746
+
db,
747
+
user_id,
748
+
&storage_key,
749
+
&repo_root_cid,
750
+
&repo_rev,
751
+
block_count,
752
+
size_bytes,
753
+
)
754
+
.await
755
+
{
756
+
if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await {
757
+
error!(
758
+
did = %did,
759
+
storage_key = %storage_key,
760
+
error = %rollback_err,
761
+
"Failed to rollback orphaned backup from S3"
762
+
);
763
+
}
764
+
return BackupOutcome::Failed(did, format!("DB insert: {}", e));
765
+
}
766
+
767
+
BackupOutcome::Success(BackupResult {
768
+
did,
769
+
repo_rev,
770
+
size_bytes,
771
+
block_count,
772
+
user_id,
773
+
})
774
+
}
775
+
776
async fn process_scheduled_backups(
777
db: &PgPool,
778
block_store: &PostgresBlockStore,
···
814
"Processing scheduled backups"
815
);
816
817
+
let results = futures::future::join_all(users_needing_backup.into_iter().map(|user| {
818
+
process_single_backup(
819
+
db,
820
+
block_store,
821
+
backup_storage,
822
user.user_id,
823
+
user.did,
824
+
user.repo_root_cid,
825
+
user.repo_rev,
826
)
827
+
}))
828
+
.await;
829
+
830
+
futures::future::join_all(results.into_iter().map(|outcome| async move {
831
+
match outcome {
832
+
BackupOutcome::Success(result) => {
833
+
info!(
834
+
did = %result.did,
835
+
rev = %result.repo_rev,
836
+
size_bytes = result.size_bytes,
837
+
block_count = result.block_count,
838
+
"Created backup"
839
);
840
+
if let Err(e) =
841
+
cleanup_old_backups(db, backup_storage, result.user_id, retention_count).await
842
+
{
843
+
warn!(did = %result.did, error = %e, "Failed to cleanup old backups");
844
+
}
845
}
846
+
BackupOutcome::Skipped(did, reason) => {
847
+
warn!(did = %did, reason = reason, "Skipped backup");
848
+
}
849
+
BackupOutcome::Failed(did, error) => {
850
+
warn!(did = %did, error = %error, "Failed backup");
851
+
}
852
}
853
+
}))
854
+
.await;
855
856
Ok(())
857
}
···
982
user_id: uuid::Uuid,
983
retention_count: u32,
984
) -> Result<(), String> {
985
+
let old_backups = fetch_old_backups(db, user_id, retention_count as i64)
986
+
.await
987
+
.map_err(|e| format!("DB error fetching old backups: {}", e))?;
988
989
+
let results = futures::future::join_all(old_backups.into_iter().map(|(id, storage_key)| async move {
990
+
match backup_storage.delete_backup(&storage_key).await {
991
+
Ok(()) => match delete_backup_record(db, id).await {
992
+
Ok(()) => Ok(()),
993
+
Err(e) => Err(format!("DB delete failed for {}: {}", storage_key, e)),
994
+
},
995
+
Err(e) => {
996
+
warn!(
997
+
storage_key = %storage_key,
998
+
error = %e,
999
+
"Failed to delete old backup from storage, skipping DB cleanup to avoid orphan"
1000
+
);
1001
+
Ok(())
1002
+
}
1003
}
1004
+
}))
1005
+
.await;
1006
1007
+
results
1008
+
.into_iter()
1009
+
.find_map(|r| r.err())
1010
+
.map_or(Ok(()), Err)
1011
}
+4
-4
crates/tranquil-pds/src/sync/listener.rs
+4
-4
crates/tranquil-pds/src/sync/listener.rs
···
48
from_seq = catchup_start,
49
"Broadcasting catch-up events"
50
);
51
-
for event in events {
52
let seq = event.seq;
53
let _ = state.firehose_tx.send(event);
54
LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
55
-
}
56
}
57
loop {
58
let notification = listener.recv().await?;
···
93
.await?;
94
if !gap_events.is_empty() {
95
debug!(count = gap_events.len(), "Filling sequence gap");
96
-
for event in gap_events {
97
let seq = event.seq;
98
let _ = state.firehose_tx.send(event);
99
LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
100
-
}
101
}
102
}
103
let event = sqlx::query_as!(
···
48
from_seq = catchup_start,
49
"Broadcasting catch-up events"
50
);
51
+
events.into_iter().for_each(|event| {
52
let seq = event.seq;
53
let _ = state.firehose_tx.send(event);
54
LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
55
+
});
56
}
57
loop {
58
let notification = listener.recv().await?;
···
93
.await?;
94
if !gap_events.is_empty() {
95
debug!(count = gap_events.len(), "Filling sequence gap");
96
+
gap_events.into_iter().for_each(|event| {
97
let seq = event.seq;
98
let _ = state.firehose_tx.send(event);
99
LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst);
100
+
});
101
}
102
}
103
let event = sqlx::query_as!(
+5
-8
crates/tranquil-pds/src/util.rs
+5
-8
crates/tranquil-pds/src/util.rs
···
257
assert_eq!(parts[0].len(), 5);
258
assert_eq!(parts[1].len(), 5);
259
260
-
for c in code.chars() {
261
-
if c != '-' {
262
-
assert!(BASE32_ALPHABET.contains(c));
263
-
}
264
-
}
265
}
266
267
#[test]
···
270
let parts: Vec<&str> = code.split('-').collect();
271
assert_eq!(parts.len(), 3);
272
273
-
for part in parts {
274
-
assert_eq!(part.len(), 4);
275
-
}
276
}
277
278
#[test]
···
257
assert_eq!(parts[0].len(), 5);
258
assert_eq!(parts[1].len(), 5);
259
260
+
assert!(code
261
+
.chars()
262
+
.filter(|&c| c != '-')
263
+
.all(|c| BASE32_ALPHABET.contains(c)));
264
}
265
266
#[test]
···
269
let parts: Vec<&str> = code.split('-').collect();
270
assert_eq!(parts.len(), 3);
271
272
+
assert!(parts.iter().all(|part| part.len() == 4));
273
}
274
275
#[test]
+3
-2
crates/tranquil-pds/src/validation/mod.rs
+3
-2
crates/tranquil-pds/src/validation/mod.rs
···
534
"Collection NSID must have at least 3 segments".to_string(),
535
));
536
}
537
-
for part in &parts {
538
if part.is_empty() {
539
return Err(ValidationError::InvalidRecord(
540
"Collection NSID segments cannot be empty".to_string(),
···
545
"Collection NSID segments must be alphanumeric or hyphens".to_string(),
546
));
547
}
548
-
}
549
Ok(())
550
}
551
···
534
"Collection NSID must have at least 3 segments".to_string(),
535
));
536
}
537
+
parts.iter().try_for_each(|part| {
538
if part.is_empty() {
539
return Err(ValidationError::InvalidRecord(
540
"Collection NSID segments cannot be empty".to_string(),
···
545
"Collection NSID segments must be alphanumeric or hyphens".to_string(),
546
));
547
}
548
+
Ok(())
549
+
})?;
550
Ok(())
551
}
552
+45
-2
frontend/src/App.svelte
+45
-2
frontend/src/App.svelte
···
35
import ActAs from './routes/ActAs.svelte'
36
import Migration from './routes/Migration.svelte'
37
import DidDocumentEditor from './routes/DidDocumentEditor.svelte'
38
initI18n()
39
40
const auth = $derived(getAuthState())
41
42
let oauthCallbackPending = $state(hasOAuthCallback())
43
44
function hasOAuthCallback(): boolean {
45
if (window.location.pathname === '/app/migrate') {
···
50
}
51
52
$effect(() => {
53
initServerConfig()
54
initAuth().then(({ oauthLoginCompleted }) => {
55
if (oauthLoginCompleted) {
56
navigate('/dashboard', { replace: true })
57
}
58
oauthCallbackPending = false
59
})
60
})
61
62
$effect(() => {
63
if (auth.kind === 'loading') return
···
143
</script>
144
145
<main>
146
-
{#if auth.kind === 'loading' || $i18nLoading || oauthCallbackPending}
147
-
<div class="loading"></div>
148
{:else}
149
<CurrentComponent />
150
{/if}
···
158
159
.loading {
160
min-height: 100vh;
161
}
162
</style>
···
35
import ActAs from './routes/ActAs.svelte'
36
import Migration from './routes/Migration.svelte'
37
import DidDocumentEditor from './routes/DidDocumentEditor.svelte'
38
+
import { _ } from './lib/i18n'
39
initI18n()
40
41
const auth = $derived(getAuthState())
42
43
let oauthCallbackPending = $state(hasOAuthCallback())
44
+
let showSpinner = $state(false)
45
+
let loadingTimer: ReturnType<typeof setTimeout> | null = null
46
47
function hasOAuthCallback(): boolean {
48
if (window.location.pathname === '/app/migrate') {
···
53
}
54
55
$effect(() => {
56
+
loadingTimer = setTimeout(() => {
57
+
showSpinner = true
58
+
}, 5000)
59
+
60
initServerConfig()
61
initAuth().then(({ oauthLoginCompleted }) => {
62
if (oauthLoginCompleted) {
63
navigate('/dashboard', { replace: true })
64
}
65
oauthCallbackPending = false
66
+
if (loadingTimer) {
67
+
clearTimeout(loadingTimer)
68
+
loadingTimer = null
69
+
}
70
})
71
+
72
+
return () => {
73
+
if (loadingTimer) {
74
+
clearTimeout(loadingTimer)
75
+
}
76
+
}
77
})
78
+
79
+
const isLoading = $derived(
80
+
auth.kind === 'loading' || $i18nLoading || oauthCallbackPending
81
+
)
82
83
$effect(() => {
84
if (auth.kind === 'loading') return
···
164
</script>
165
166
<main>
167
+
{#if isLoading}
168
+
<div class="loading">
169
+
{#if showSpinner}
170
+
<div class="loading-content">
171
+
<div class="spinner"></div>
172
+
<p>{$_('common.loading')}</p>
173
+
</div>
174
+
{/if}
175
+
</div>
176
{:else}
177
<CurrentComponent />
178
{/if}
···
186
187
.loading {
188
min-height: 100vh;
189
+
display: flex;
190
+
align-items: center;
191
+
justify-content: center;
192
+
}
193
+
194
+
.loading-content {
195
+
display: flex;
196
+
flex-direction: column;
197
+
align-items: center;
198
+
gap: var(--space-4);
199
+
}
200
+
201
+
.loading-content p {
202
+
margin: 0;
203
+
color: var(--text-secondary);
204
}
205
</style>
+37
-1
frontend/src/routes/OAuthConsent.svelte
+37
-1
frontend/src/routes/OAuthConsent.svelte
···
27
}
28
29
let loading = $state(true)
30
let error = $state<string | null>(null)
31
let submitting = $state(false)
32
let consentData = $state<ConsentData | null>(null)
···
71
error = $_('oauth.error.genericError')
72
} finally {
73
loading = false
74
}
75
}
76
···
151
}
152
153
$effect(() => {
154
fetchConsentData()
155
})
156
157
let scopeGroups = $derived(consentData ? groupScopesByCategory(consentData.scopes) : {})
···
159
160
<div class="consent-container">
161
{#if loading}
162
-
<div class="loading"></div>
163
{:else if error}
164
<div class="error-container">
165
<h1>{$_('oauth.error.title')}</h1>
···
293
align-items: center;
294
justify-content: center;
295
min-height: 200px;
296
color: var(--text-secondary);
297
}
298
···
27
}
28
29
let loading = $state(true)
30
+
let showSpinner = $state(false)
31
+
let loadingTimer: ReturnType<typeof setTimeout> | null = null
32
let error = $state<string | null>(null)
33
let submitting = $state(false)
34
let consentData = $state<ConsentData | null>(null)
···
73
error = $_('oauth.error.genericError')
74
} finally {
75
loading = false
76
+
showSpinner = false
77
+
if (loadingTimer) {
78
+
clearTimeout(loadingTimer)
79
+
loadingTimer = null
80
+
}
81
}
82
}
83
···
158
}
159
160
$effect(() => {
161
+
loadingTimer = setTimeout(() => {
162
+
if (loading) {
163
+
showSpinner = true
164
+
}
165
+
}, 5000)
166
fetchConsentData()
167
+
return () => {
168
+
if (loadingTimer) {
169
+
clearTimeout(loadingTimer)
170
+
}
171
+
}
172
})
173
174
let scopeGroups = $derived(consentData ? groupScopesByCategory(consentData.scopes) : {})
···
176
177
<div class="consent-container">
178
{#if loading}
179
+
<div class="loading">
180
+
{#if showSpinner}
181
+
<div class="loading-content">
182
+
<div class="spinner"></div>
183
+
<p>{$_('common.loading')}</p>
184
+
</div>
185
+
{/if}
186
+
</div>
187
{:else if error}
188
<div class="error-container">
189
<h1>{$_('oauth.error.title')}</h1>
···
317
align-items: center;
318
justify-content: center;
319
min-height: 200px;
320
+
color: var(--text-secondary);
321
+
}
322
+
323
+
.loading-content {
324
+
display: flex;
325
+
flex-direction: column;
326
+
align-items: center;
327
+
gap: var(--space-4);
328
+
}
329
+
330
+
.loading-content p {
331
+
margin: 0;
332
color: var(--text-secondary);
333
}
334
+27
frontend/src/styles/base.css
+27
frontend/src/styles/base.css
···
494
.info-panel p:last-child {
495
margin-bottom: 0;
496
}
497
+
498
+
.spinner {
499
+
width: 40px;
500
+
height: 40px;
501
+
border: 3px solid var(--border-color);
502
+
border-top-color: var(--accent);
503
+
border-radius: 50%;
504
+
animation: spin 1s linear infinite;
505
+
}
506
+
507
+
.spinner.sm {
508
+
width: 20px;
509
+
height: 20px;
510
+
border-width: 2px;
511
+
}
512
+
513
+
.spinner.lg {
514
+
width: 60px;
515
+
height: 60px;
516
+
border-width: 4px;
517
+
}
518
+
519
+
@keyframes spin {
520
+
to {
521
+
transform: rotate(360deg);
522
+
}
523
+
}