+20
-21
crates/tranquil-oauth/src/client.rs
+20
-21
crates/tranquil-oauth/src/client.rs
···
529
529
let signature_bytes = URL_SAFE_NO_PAD
530
530
.decode(parts[2])
531
531
.map_err(|_| OAuthError::InvalidClient("Invalid signature encoding".to_string()))?;
532
-
for key in matching_keys {
533
-
let key_alg = key.get("alg").and_then(|a| a.as_str());
534
-
if key_alg.is_some() && key_alg != Some(alg) {
535
-
continue;
536
-
}
537
-
let kty = key.get("kty").and_then(|k| k.as_str()).unwrap_or("");
538
-
let verified = match (alg, kty) {
539
-
("ES256", "EC") => verify_es256(key, &signing_input, &signature_bytes),
540
-
("ES384", "EC") => verify_es384(key, &signing_input, &signature_bytes),
541
-
("RS256" | "RS384" | "RS512", "RSA") => {
542
-
verify_rsa(alg, key, &signing_input, &signature_bytes)
532
+
matching_keys
533
+
.into_iter()
534
+
.filter(|key| {
535
+
let key_alg = key.get("alg").and_then(|a| a.as_str());
536
+
key_alg.is_none() || key_alg == Some(alg)
537
+
})
538
+
.find_map(|key| {
539
+
let kty = key.get("kty").and_then(|k| k.as_str()).unwrap_or("");
540
+
match (alg, kty) {
541
+
("ES256", "EC") => verify_es256(key, &signing_input, &signature_bytes).ok(),
542
+
("ES384", "EC") => verify_es384(key, &signing_input, &signature_bytes).ok(),
543
+
("RS256" | "RS384" | "RS512", "RSA") => {
544
+
verify_rsa(alg, key, &signing_input, &signature_bytes).ok()
545
+
}
546
+
("EdDSA", "OKP") => verify_eddsa(key, &signing_input, &signature_bytes).ok(),
547
+
_ => None,
543
548
}
544
-
("EdDSA", "OKP") => verify_eddsa(key, &signing_input, &signature_bytes),
545
-
_ => continue,
546
-
};
547
-
if verified.is_ok() {
548
-
return Ok(());
549
-
}
550
-
}
551
-
Err(OAuthError::InvalidClient(
552
-
"client_assertion signature verification failed".to_string(),
553
-
))
549
+
})
550
+
.ok_or_else(|| {
551
+
OAuthError::InvalidClient("client_assertion signature verification failed".to_string())
552
+
})
554
553
}
555
554
556
555
fn verify_es256(
+6
-7
crates/tranquil-pds/src/api/identity/account.rs
+6
-7
crates/tranquil-pds/src/api/identity/account.rs
···
189
189
if input.handle.contains(' ') || input.handle.contains('\t') {
190
190
return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response();
191
191
}
192
-
for c in input.handle.chars() {
193
-
if !c.is_ascii_alphanumeric() && c != '.' && c != '-' {
194
-
return ApiError::InvalidRequest(format!(
195
-
"Handle contains invalid character: {}",
196
-
c
197
-
))
192
+
if let Some(c) = input
193
+
.handle
194
+
.chars()
195
+
.find(|c| !c.is_ascii_alphanumeric() && *c != '.' && *c != '-')
196
+
{
197
+
return ApiError::InvalidRequest(format!("Handle contains invalid character: {}", c))
198
198
.into_response();
199
-
}
200
199
}
201
200
let handle_lower = input.handle.to_lowercase();
202
201
if crate::moderation::has_explicit_slur(&handle_lower) {
+11
-10
crates/tranquil-pds/src/api/identity/did.rs
+11
-10
crates/tranquil-pds/src/api/identity/did.rs
···
639
639
return ApiError::InvalidHandle(Some("Handle contains invalid characters".into()))
640
640
.into_response();
641
641
}
642
-
for segment in new_handle.split('.') {
643
-
if segment.is_empty() {
644
-
return ApiError::InvalidHandle(Some("Handle contains empty segment".into()))
645
-
.into_response();
646
-
}
647
-
if segment.starts_with('-') || segment.ends_with('-') {
648
-
return ApiError::InvalidHandle(Some(
649
-
"Handle segment cannot start or end with hyphen".into(),
650
-
))
642
+
if new_handle.split('.').any(|segment| segment.is_empty()) {
643
+
return ApiError::InvalidHandle(Some("Handle contains empty segment".into()))
651
644
.into_response();
652
-
}
645
+
}
646
+
if new_handle
647
+
.split('.')
648
+
.any(|segment| segment.starts_with('-') || segment.ends_with('-'))
649
+
{
650
+
return ApiError::InvalidHandle(Some(
651
+
"Handle segment cannot start or end with hyphen".into(),
652
+
))
653
+
.into_response();
653
654
}
654
655
if crate::moderation::has_explicit_slur(&new_handle) {
655
656
return ApiError::InvalidHandle(Some("Inappropriate language in handle".into()))
+3
-1
crates/tranquil-pds/src/api/server/account_status.rs
+3
-1
crates/tranquil-pds/src/api/server/account_status.rs
···
203
203
Ok(data) => {
204
204
let pds_endpoint = data
205
205
.get("services")
206
-
.and_then(|s: &serde_json::Value| s.get("atproto_pds").or_else(|| s.get("atprotoPds")))
206
+
.and_then(|s: &serde_json::Value| {
207
+
s.get("atproto_pds").or_else(|| s.get("atprotoPds"))
208
+
})
207
209
.and_then(|p: &serde_json::Value| p.get("endpoint"))
208
210
.and_then(|e: &serde_json::Value| e.as_str());
209
211
+5
-5
crates/tranquil-pds/src/api/server/migration.rs
+5
-5
crates/tranquil-pds/src/api/server/migration.rs
···
115
115
}
116
116
}
117
117
118
-
if let Some(ref handles) = input.also_known_as {
119
-
if handles.iter().any(|h| !h.starts_with("at://")) {
120
-
return ApiError::InvalidRequest("alsoKnownAs entries must be at:// URIs".into())
121
-
.into_response();
122
-
}
118
+
if let Some(ref handles) = input.also_known_as
119
+
&& handles.iter().any(|h| !h.starts_with("at://"))
120
+
{
121
+
return ApiError::InvalidRequest("alsoKnownAs entries must be at:// URIs".into())
122
+
.into_response();
123
123
}
124
124
125
125
if let Some(ref endpoint) = input.service_endpoint {
+4
-4
crates/tranquil-pds/src/api/server/session.rs
+4
-4
crates/tranquil-pds/src/api/server/session.rs
···
949
949
}
950
950
};
951
951
952
-
let jwt_sessions = jwt_rows.into_iter().map(|(id, access_jti, created_at, expires_at)| {
953
-
SessionInfo {
952
+
let jwt_sessions = jwt_rows
953
+
.into_iter()
954
+
.map(|(id, access_jti, created_at, expires_at)| SessionInfo {
954
955
id: format!("jwt:{}", id),
955
956
session_type: "legacy".to_string(),
956
957
client_name: None,
957
958
created_at: created_at.to_rfc3339(),
958
959
expires_at: expires_at.to_rfc3339(),
959
960
is_current: current_jti.as_ref() == Some(&access_jti),
960
-
}
961
-
});
961
+
});
962
962
963
963
let is_oauth = auth.0.is_oauth;
964
964
let oauth_sessions =
+4
-2
crates/tranquil-pds/src/api/server/totp.rs
+4
-2
crates/tranquil-pds/src/api/server/totp.rs
···
195
195
return ApiError::InternalError(None).into_response();
196
196
}
197
197
198
-
let backup_hashes: Result<Vec<_>, _> = backup_codes.iter().map(|c| hash_backup_code(c)).collect();
198
+
let backup_hashes: Result<Vec<_>, _> =
199
+
backup_codes.iter().map(|c| hash_backup_code(c)).collect();
199
200
let backup_hashes = match backup_hashes {
200
201
Ok(hashes) => hashes,
201
202
Err(e) => {
···
484
485
return ApiError::InternalError(None).into_response();
485
486
}
486
487
487
-
let backup_hashes: Result<Vec<_>, _> = backup_codes.iter().map(|c| hash_backup_code(c)).collect();
488
+
let backup_hashes: Result<Vec<_>, _> =
489
+
backup_codes.iter().map(|c| hash_backup_code(c)).collect();
488
490
let backup_hashes = match backup_hashes {
489
491
Ok(hashes) => hashes,
490
492
Err(e) => {
+8
-9
crates/tranquil-pds/src/auth/verification_token.rs
+8
-9
crates/tranquil-pds/src/auth/verification_token.rs
···
296
296
}
297
297
298
298
pub fn format_token_for_display(token: &str) -> String {
299
-
let clean = token.replace(['-', ' '], "");
300
-
let mut result = String::new();
301
-
for (i, c) in clean.chars().enumerate() {
302
-
if i > 0 && i % 4 == 0 {
303
-
result.push('-');
304
-
}
305
-
result.push(c);
306
-
}
307
-
result
299
+
token
300
+
.replace(['-', ' '], "")
301
+
.chars()
302
+
.collect::<Vec<_>>()
303
+
.chunks(4)
304
+
.map(|chunk| chunk.iter().collect::<String>())
305
+
.collect::<Vec<_>>()
306
+
.join("-")
308
307
}
309
308
310
309
pub fn normalize_token_input(input: &str) -> String {
+3
-7
crates/tranquil-pds/src/oauth/db/scope_preference.rs
+3
-7
crates/tranquil-pds/src/oauth/db/scope_preference.rs
···
75
75
let stored_scopes: std::collections::HashSet<&str> =
76
76
stored_prefs.iter().map(|p| p.scope.as_str()).collect();
77
77
78
-
for scope in requested_scopes {
79
-
if !stored_scopes.contains(scope.as_str()) {
80
-
return Ok(true);
81
-
}
82
-
}
83
-
84
-
Ok(false)
78
+
Ok(requested_scopes
79
+
.iter()
80
+
.any(|scope| !stored_scopes.contains(scope.as_str())))
85
81
}
86
82
87
83
pub async fn delete_scope_preferences(
+20
-20
crates/tranquil-pds/src/oauth/db/token.rs
+20
-20
crates/tranquil-pds/src/oauth/db/token.rs
···
315
315
)
316
316
.fetch_all(pool)
317
317
.await?;
318
-
let mut tokens = Vec::with_capacity(rows.len());
319
-
for r in rows {
320
-
tokens.push(TokenData {
321
-
did: r.did,
322
-
token_id: r.token_id,
323
-
created_at: r.created_at,
324
-
updated_at: r.updated_at,
325
-
expires_at: r.expires_at,
326
-
client_id: r.client_id,
327
-
client_auth: from_json(r.client_auth)?,
328
-
device_id: r.device_id,
329
-
parameters: from_json(r.parameters)?,
330
-
details: r.details,
331
-
code: r.code,
332
-
current_refresh_token: r.current_refresh_token,
333
-
scope: r.scope,
334
-
controller_did: r.controller_did,
335
-
});
336
-
}
337
-
Ok(tokens)
318
+
rows.into_iter()
319
+
.map(|r| {
320
+
Ok(TokenData {
321
+
did: r.did,
322
+
token_id: r.token_id,
323
+
created_at: r.created_at,
324
+
updated_at: r.updated_at,
325
+
expires_at: r.expires_at,
326
+
client_id: r.client_id,
327
+
client_auth: from_json(r.client_auth)?,
328
+
device_id: r.device_id,
329
+
parameters: from_json(r.parameters)?,
330
+
details: r.details,
331
+
code: r.code,
332
+
current_refresh_token: r.current_refresh_token,
333
+
scope: r.scope,
334
+
controller_did: r.controller_did,
335
+
})
336
+
})
337
+
.collect()
338
338
}
339
339
340
340
pub async fn count_tokens_for_user(pool: &PgPool, did: &str) -> Result<i64, OAuthError> {
+34
-32
crates/tranquil-pds/src/oauth/endpoints/par.rs
+34
-32
crates/tranquil-pds/src/oauth/endpoints/par.rs
···
182
182
if requested_scopes.is_empty() {
183
183
return Ok(Some("atproto".to_string()));
184
184
}
185
-
let mut has_transition = false;
186
-
let mut has_granular = false;
185
+
if let Some(unknown) = requested_scopes
186
+
.iter()
187
+
.find(|s| matches!(parse_scope(s), ParsedScope::Unknown(_)))
188
+
{
189
+
return Err(OAuthError::InvalidScope(format!(
190
+
"Unsupported scope: {}",
191
+
unknown
192
+
)));
193
+
}
187
194
188
-
for scope in &requested_scopes {
189
-
let parsed = parse_scope(scope);
190
-
match &parsed {
191
-
ParsedScope::Unknown(_) => {
192
-
return Err(OAuthError::InvalidScope(format!(
193
-
"Unsupported scope: {}",
194
-
scope
195
-
)));
196
-
}
195
+
let has_transition = requested_scopes.iter().any(|s| {
196
+
matches!(
197
+
parse_scope(s),
197
198
ParsedScope::TransitionGeneric
198
-
| ParsedScope::TransitionChat
199
-
| ParsedScope::TransitionEmail => {
200
-
has_transition = true;
201
-
}
199
+
| ParsedScope::TransitionChat
200
+
| ParsedScope::TransitionEmail
201
+
)
202
+
});
203
+
let has_granular = requested_scopes.iter().any(|s| {
204
+
matches!(
205
+
parse_scope(s),
202
206
ParsedScope::Repo(_)
203
-
| ParsedScope::Blob(_)
204
-
| ParsedScope::Rpc(_)
205
-
| ParsedScope::Account(_)
206
-
| ParsedScope::Identity(_)
207
-
| ParsedScope::Include(_) => {
208
-
has_granular = true;
209
-
}
210
-
ParsedScope::Atproto => {}
211
-
}
212
-
}
207
+
| ParsedScope::Blob(_)
208
+
| ParsedScope::Rpc(_)
209
+
| ParsedScope::Account(_)
210
+
| ParsedScope::Identity(_)
211
+
| ParsedScope::Include(_)
212
+
)
213
+
});
213
214
214
215
if has_transition && has_granular {
215
216
return Err(OAuthError::InvalidScope(
···
219
220
220
221
if let Some(client_scope) = &client_metadata.scope {
221
222
let client_scopes: Vec<&str> = client_scope.split_whitespace().collect();
222
-
for scope in &requested_scopes {
223
-
if !client_scopes.iter().any(|cs| scope_matches(cs, scope)) {
224
-
return Err(OAuthError::InvalidScope(format!(
225
-
"Scope '{}' not registered for this client",
226
-
scope
227
-
)));
228
-
}
223
+
if let Some(unregistered) = requested_scopes
224
+
.iter()
225
+
.find(|scope| !client_scopes.iter().any(|cs| scope_matches(cs, scope)))
226
+
{
227
+
return Err(OAuthError::InvalidScope(format!(
228
+
"Scope '{}' not registered for this client",
229
+
unregistered
230
+
)));
229
231
}
230
232
}
231
233
Ok(Some(requested_scopes.join(" ")))
+1
-7
crates/tranquil-pds/src/oauth/endpoints/token/grants.rs
+1
-7
crates/tranquil-pds/src/oauth/endpoints/token/grants.rs
···
334
334
REFRESH_TOKEN_EXPIRY_DAYS_CONFIDENTIAL
335
335
};
336
336
let new_expires_at = Utc::now() + Duration::days(refresh_expiry_days);
337
-
db::rotate_token(
338
-
&state.db,
339
-
db_id,
340
-
&new_refresh_token.0,
341
-
new_expires_at,
342
-
)
343
-
.await?;
337
+
db::rotate_token(&state.db, db_id, &new_refresh_token.0, new_expires_at).await?;
344
338
tracing::info!(
345
339
did = %token_data.did,
346
340
new_expires_at = %new_expires_at,
+13
-5
crates/tranquil-pds/src/oauth/endpoints/token/helpers.rs
+13
-5
crates/tranquil-pds/src/oauth/endpoints/token/helpers.rs
···
11
11
12
12
pub struct TokenClaims {
13
13
pub jti: String,
14
+
pub sid: String,
14
15
pub exp: i64,
15
16
pub iat: i64,
16
17
}
···
33
34
}
34
35
35
36
pub fn create_access_token(
36
-
token_id: &str,
37
+
session_id: &str,
37
38
sub: &str,
38
39
dpop_jkt: Option<&str>,
39
40
scope: Option<&str>,
40
41
) -> Result<String, OAuthError> {
41
-
create_access_token_with_delegation(token_id, sub, dpop_jkt, scope, None)
42
+
create_access_token_with_delegation(session_id, sub, dpop_jkt, scope, None)
42
43
}
43
44
44
45
pub fn create_access_token_with_delegation(
45
-
token_id: &str,
46
+
session_id: &str,
46
47
sub: &str,
47
48
dpop_jkt: Option<&str>,
48
49
scope: Option<&str>,
49
50
controller_did: Option<&str>,
50
51
) -> Result<String, OAuthError> {
51
52
use serde_json::json;
53
+
let jti = uuid::Uuid::new_v4().to_string();
52
54
let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
53
55
let issuer = format!("https://{}", pds_hostname);
54
56
let now = Utc::now().timestamp();
···
60
62
"aud": issuer,
61
63
"iat": now,
62
64
"exp": exp,
63
-
"jti": token_id,
65
+
"jti": jti,
66
+
"sid": session_id,
64
67
"scope": actual_scope
65
68
});
66
69
if let Some(jkt) = dpop_jkt {
···
132
135
.and_then(|j| j.as_str())
133
136
.ok_or_else(|| OAuthError::InvalidToken("Missing jti claim".to_string()))?
134
137
.to_string();
138
+
let sid = payload
139
+
.get("sid")
140
+
.and_then(|s| s.as_str())
141
+
.ok_or_else(|| OAuthError::InvalidToken("Missing sid claim".to_string()))?
142
+
.to_string();
135
143
let exp = payload
136
144
.get("exp")
137
145
.and_then(|e| e.as_i64())
···
140
148
.get("iat")
141
149
.and_then(|i| i.as_i64())
142
150
.ok_or_else(|| OAuthError::InvalidToken("Missing iat claim".to_string()))?;
143
-
Ok(TokenClaims { jti, exp, iat })
151
+
Ok(TokenClaims { jti, sid, exp, iat })
144
152
}
+1
-1
crates/tranquil-pds/src/oauth/endpoints/token/introspect.rs
+1
-1
crates/tranquil-pds/src/oauth/endpoints/token/introspect.rs
···
102
102
Ok(info) => info,
103
103
Err(_) => return Ok(Json(inactive_response)),
104
104
};
105
-
let token_data = match db::get_token_by_id(&state.db, &token_info.jti).await {
105
+
let token_data = match db::get_token_by_id(&state.db, &token_info.sid).await {
106
106
Ok(Some(data)) => data,
107
107
_ => return Ok(Json(inactive_response)),
108
108
};
+2
-2
crates/tranquil-pds/src/oauth/verify.rs
+2
-2
crates/tranquil-pds/src/oauth/verify.rs
···
142
142
return Err(OAuthError::ExpiredToken("Token has expired".to_string()));
143
143
}
144
144
let token_id = payload
145
-
.get("jti")
145
+
.get("sid")
146
146
.and_then(|j| j.as_str())
147
-
.ok_or_else(|| OAuthError::InvalidToken("Missing jti claim".to_string()))?
147
+
.ok_or_else(|| OAuthError::InvalidToken("Missing sid claim".to_string()))?
148
148
.to_string();
149
149
let did = payload
150
150
.get("sub")
+2
-6
crates/tranquil-pds/src/sync/deprecated.rs
+2
-6
crates/tranquil-pds/src/sync/deprecated.rs
···
146
146
stack.push(*cid);
147
147
}
148
148
Ipld::Map(map) => {
149
-
for v in map.values() {
150
-
extract_links_ipld(v, stack);
151
-
}
149
+
map.values().for_each(|v| extract_links_ipld(v, stack));
152
150
}
153
151
Ipld::List(arr) => {
154
-
for v in arr {
155
-
extract_links_ipld(v, stack);
156
-
}
152
+
arr.iter().for_each(|v| extract_links_ipld(v, stack));
157
153
}
158
154
_ => {}
159
155
}
+2
-6
crates/tranquil-pds/src/sync/import.rs
+2
-6
crates/tranquil-pds/src/sync/import.rs
···
148
148
links.push(*cid);
149
149
}
150
150
Ipld::Map(map) => {
151
-
for v in map.values() {
152
-
extract_links(v, links);
153
-
}
151
+
map.values().for_each(|v| extract_links(v, links));
154
152
}
155
153
Ipld::List(arr) => {
156
-
for v in arr {
157
-
extract_links(v, links);
158
-
}
154
+
arr.iter().for_each(|v| extract_links(v, links));
159
155
}
160
156
_ => {}
161
157
}
+22
-20
crates/tranquil-pds/src/sync/repo.rs
+22
-20
crates/tranquil-pds/src/sync/repo.rs
···
181
181
}
182
182
};
183
183
184
-
let mut block_cids: Vec<Cid> = Vec::new();
185
-
for event in &events {
186
-
if let Some(cids) = &event.blocks_cids {
187
-
for cid_str in cids {
188
-
if let Ok(cid) = Cid::from_str(cid_str)
189
-
&& !block_cids.contains(&cid)
190
-
{
191
-
block_cids.push(cid);
192
-
}
184
+
let block_cids: Vec<Cid> = events
185
+
.iter()
186
+
.flat_map(|event| {
187
+
let block_cids = event
188
+
.blocks_cids
189
+
.as_ref()
190
+
.map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect())
191
+
.unwrap_or_else(Vec::new);
192
+
let commit_cid = event
193
+
.commit_cid
194
+
.as_ref()
195
+
.and_then(|s| Cid::from_str(s).ok());
196
+
block_cids.into_iter().chain(commit_cid)
197
+
})
198
+
.fold(Vec::new(), |mut acc, cid| {
199
+
if !acc.contains(&cid) {
200
+
acc.push(cid);
193
201
}
194
-
}
195
-
if let Some(commit_cid_str) = &event.commit_cid
196
-
&& let Ok(cid) = Cid::from_str(commit_cid_str)
197
-
&& !block_cids.contains(&cid)
198
-
{
199
-
block_cids.push(cid);
200
-
}
201
-
}
202
+
acc
203
+
});
202
204
203
205
let mut car_bytes = match encode_car_header(head_cid) {
204
206
Ok(h) => h,
···
334
336
car.extend_from_slice(&writer);
335
337
};
336
338
write_block(&mut car_bytes, &commit_cid, &commit_bytes);
337
-
for (cid, data) in &proof_blocks {
338
-
write_block(&mut car_bytes, cid, data);
339
-
}
339
+
proof_blocks
340
+
.iter()
341
+
.for_each(|(cid, data)| write_block(&mut car_bytes, cid, data));
340
342
write_block(&mut car_bytes, &record_cid, &record_block);
341
343
(
342
344
StatusCode::OK,
+43
-57
crates/tranquil-pds/src/sync/util.rs
+43
-57
crates/tranquil-pds/src/sync/util.rs
···
210
210
let mut buffer = Cursor::new(Vec::new());
211
211
let header = CarHeader::new_v1(vec![commit_cid]);
212
212
let mut writer = CarWriter::new(header, &mut buffer);
213
-
for (cid, data) in other_blocks {
214
-
if cid != commit_cid {
215
-
writer
216
-
.write(cid, data.as_ref())
217
-
.await
218
-
.map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
219
-
}
213
+
for (cid, data) in other_blocks.iter().filter(|(c, _)| **c != commit_cid) {
214
+
writer
215
+
.write(*cid, data.as_ref())
216
+
.await
217
+
.map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?;
220
218
}
221
219
if let Some(data) = commit_bytes {
222
220
writer
···
360
358
}
361
359
let car_bytes = if !all_cids.is_empty() {
362
360
let fetched = state.block_store.get_many(&all_cids).await?;
363
-
let mut blocks = std::collections::BTreeMap::new();
364
-
let mut commit_bytes: Option<Bytes> = None;
365
-
for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) {
366
-
if let Some(data) = data_opt {
367
-
if *cid == commit_cid {
368
-
commit_bytes = Some(data.clone());
369
-
if let Some(rev) = extract_rev_from_commit_bytes(data) {
370
-
frame.rev = rev;
371
-
}
372
-
} else {
373
-
blocks.insert(*cid, data.clone());
374
-
}
375
-
}
361
+
let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids
362
+
.iter()
363
+
.zip(fetched.iter())
364
+
.filter_map(|(cid, data_opt)| data_opt.as_ref().map(|data| (*cid, data.clone())))
365
+
.partition(|(cid, _)| *cid == commit_cid);
366
+
let commit_bytes = commit_data.into_iter().next().map(|(_, data)| data);
367
+
if let Some(ref cb) = commit_bytes
368
+
&& let Some(rev) = extract_rev_from_commit_bytes(cb)
369
+
{
370
+
frame.rev = rev;
376
371
}
372
+
let blocks: std::collections::BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect();
377
373
write_car_blocks(commit_cid, commit_bytes, blocks).await?
378
374
} else {
379
375
Vec::new()
···
393
389
state: &AppState,
394
390
events: &[SequencedEvent],
395
391
) -> Result<HashMap<Cid, Bytes>, anyhow::Error> {
396
-
let mut all_cids: Vec<Cid> = Vec::new();
397
-
for event in events {
398
-
if let Some(ref commit_cid_str) = event.commit_cid
399
-
&& let Ok(cid) = Cid::from_str(commit_cid_str)
400
-
{
401
-
all_cids.push(cid);
402
-
}
403
-
if let Some(ref prev_cid_str) = event.prev_cid
404
-
&& let Ok(cid) = Cid::from_str(prev_cid_str)
405
-
{
406
-
all_cids.push(cid);
407
-
}
408
-
if let Some(ref block_cids_str) = event.blocks_cids {
409
-
for s in block_cids_str {
410
-
if let Ok(cid) = Cid::from_str(s) {
411
-
all_cids.push(cid);
412
-
}
413
-
}
414
-
}
415
-
}
392
+
let mut all_cids: Vec<Cid> = events
393
+
.iter()
394
+
.flat_map(|event| {
395
+
let commit_cid = event
396
+
.commit_cid
397
+
.as_ref()
398
+
.and_then(|s| Cid::from_str(s).ok());
399
+
let prev_cid = event.prev_cid.as_ref().and_then(|s| Cid::from_str(s).ok());
400
+
let block_cids = event
401
+
.blocks_cids
402
+
.as_ref()
403
+
.map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect())
404
+
.unwrap_or_else(Vec::new);
405
+
commit_cid.into_iter().chain(prev_cid).chain(block_cids)
406
+
})
407
+
.collect();
416
408
all_cids.sort();
417
409
all_cids.dedup();
418
410
if all_cids.is_empty() {
419
411
return Ok(HashMap::new());
420
412
}
421
413
let fetched = state.block_store.get_many(&all_cids).await?;
422
-
let mut blocks_map = HashMap::with_capacity(all_cids.len());
423
-
for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) {
424
-
if let Some(data) = data_opt {
425
-
blocks_map.insert(cid, data);
426
-
}
427
-
}
414
+
let blocks_map: HashMap<Cid, Bytes> = all_cids
415
+
.into_iter()
416
+
.zip(fetched)
417
+
.filter_map(|(cid, data_opt)| data_opt.map(|data| (cid, data)))
418
+
.collect();
428
419
Ok(blocks_map)
429
420
}
430
421
···
511
502
frame.since = Some(rev);
512
503
}
513
504
let car_bytes = if !all_cids.is_empty() {
514
-
let mut blocks = BTreeMap::new();
515
-
let mut commit_bytes_for_car: Option<Bytes> = None;
516
-
for cid in all_cids {
517
-
if let Some(data) = prefetched.get(&cid) {
518
-
if cid == commit_cid {
519
-
commit_bytes_for_car = Some(data.clone());
520
-
} else {
521
-
blocks.insert(cid, data.clone());
522
-
}
523
-
}
524
-
}
505
+
let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids
506
+
.into_iter()
507
+
.filter_map(|cid| prefetched.get(&cid).map(|data| (cid, data.clone())))
508
+
.partition(|(cid, _)| *cid == commit_cid);
509
+
let commit_bytes_for_car = commit_data.into_iter().next().map(|(_, data)| data);
510
+
let blocks: BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect();
525
511
write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await?
526
512
} else {
527
513
Vec::new()
+14
-16
crates/tranquil-pds/tests/common/mod.rs
+14
-16
crates/tranquil-pds/tests/common/mod.rs
···
256
256
.unwrap_or_default()
257
257
.to_string();
258
258
259
-
if let Ok(body) = serde_json::from_slice::<Value>(request.body.as_slice()) {
260
-
if let Ok(mut store) = self.store.write() {
261
-
store.insert(did, body);
262
-
}
259
+
if let Ok(body) = serde_json::from_slice::<Value>(request.body.as_slice())
260
+
&& let Ok(mut store) = self.store.write()
261
+
{
262
+
store.insert(did, body);
263
263
}
264
264
ResponseTemplate::new(200)
265
265
}
···
298
298
299
299
match endpoint {
300
300
"/log/last" => {
301
-
let response = operation
302
-
.cloned()
303
-
.unwrap_or_else(|| {
304
-
json!({
305
-
"type": "plc_operation",
306
-
"rotationKeys": [],
307
-
"verificationMethods": {},
308
-
"alsoKnownAs": [],
309
-
"services": {},
310
-
"prev": null
311
-
})
312
-
});
301
+
let response = operation.cloned().unwrap_or_else(|| {
302
+
json!({
303
+
"type": "plc_operation",
304
+
"rotationKeys": [],
305
+
"verificationMethods": {},
306
+
"alsoKnownAs": [],
307
+
"services": {},
308
+
"prev": null
309
+
})
310
+
});
313
311
ResponseTemplate::new(200).set_body_json(response)
314
312
}
315
313
"/log/audit" => ResponseTemplate::new(200).set_body_json(json!([])),
+1
-4
crates/tranquil-pds/tests/import_verification.rs
+1
-4
crates/tranquil-pds/tests/import_verification.rs
···
159
159
let status = import_res.status();
160
160
if status != StatusCode::OK {
161
161
let body = import_res.text().await.unwrap_or_default();
162
-
panic!(
163
-
"Import failed with status {}: {}",
164
-
status, body
165
-
);
162
+
panic!("Import failed with status {}: {}", status, body);
166
163
}
167
164
}
168
165