+1
-1
src/api/admin/account/delete.rs
+1
-1
src/api/admin/account/delete.rs
+4
-3
src/api/admin/account/update.rs
+4
-3
src/api/admin/account/update.rs
···
2
use crate::api::error::ApiError;
3
use crate::auth::BearerAuthAdmin;
4
use crate::state::AppState;
5
-
use crate::types::{Did, PlainPassword};
6
use axum::{
7
Json,
8
extract::State,
···
103
let _ = state.cache.delete(&format!("handle:{}", old)).await;
104
}
105
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
106
if let Err(e) = crate::api::repo::record::sequence_identity_event(
107
&state,
108
-
did.as_str(),
109
-
Some(&handle),
110
)
111
.await
112
{
···
2
use crate::api::error::ApiError;
3
use crate::auth::BearerAuthAdmin;
4
use crate::state::AppState;
5
+
use crate::types::{Did, Handle, PlainPassword};
6
use axum::{
7
Json,
8
extract::State,
···
103
let _ = state.cache.delete(&format!("handle:{}", old)).await;
104
}
105
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
106
+
let handle_typed = Handle::new_unchecked(&handle);
107
if let Err(e) = crate::api::repo::record::sequence_identity_event(
108
&state,
109
+
did,
110
+
Some(&handle_typed),
111
)
112
.await
113
{
+10
-8
src/api/admin/status.rs
+10
-8
src/api/admin/status.rs
···
1
use crate::api::error::ApiError;
2
use crate::auth::BearerAuthAdmin;
3
use crate::state::AppState;
4
use axum::{
5
Json,
6
extract::{Query, State},
···
183
let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
184
match subject_type {
185
Some("com.atproto.admin.defs#repoRef") => {
186
-
let did = input.subject.get("did").and_then(|d| d.as_str());
187
-
if let Some(did) = did {
188
let mut tx = match state.db.begin().await {
189
Ok(tx) => tx,
190
Err(e) => {
···
201
if let Err(e) = sqlx::query!(
202
"UPDATE users SET takedown_ref = $1 WHERE did = $2",
203
takedown_ref,
204
-
did
205
)
206
.execute(&mut *tx)
207
.await
···
217
let result = if deactivated.applied {
218
sqlx::query!(
219
"UPDATE users SET deactivated_at = NOW() WHERE did = $1",
220
-
did
221
)
222
.execute(&mut *tx)
223
.await
224
} else {
225
-
sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
226
.execute(&mut *tx)
227
.await
228
};
···
249
};
250
if let Err(e) = crate::api::repo::record::sequence_account_event(
251
&state,
252
-
did,
253
!takedown.applied,
254
status,
255
)
···
266
};
267
if let Err(e) = crate::api::repo::record::sequence_account_event(
268
&state,
269
-
did,
270
!deactivated.applied,
271
status,
272
)
···
276
}
277
}
278
if let Ok(Some(handle)) =
279
-
sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
280
.fetch_optional(&state.db)
281
.await
282
{
···
1
use crate::api::error::ApiError;
2
use crate::auth::BearerAuthAdmin;
3
use crate::state::AppState;
4
+
use crate::types::Did;
5
use axum::{
6
Json,
7
extract::{Query, State},
···
184
let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
185
match subject_type {
186
Some("com.atproto.admin.defs#repoRef") => {
187
+
let did_str = input.subject.get("did").and_then(|d| d.as_str());
188
+
if let Some(did_str) = did_str {
189
+
let did = Did::new_unchecked(did_str);
190
let mut tx = match state.db.begin().await {
191
Ok(tx) => tx,
192
Err(e) => {
···
203
if let Err(e) = sqlx::query!(
204
"UPDATE users SET takedown_ref = $1 WHERE did = $2",
205
takedown_ref,
206
+
did.as_str()
207
)
208
.execute(&mut *tx)
209
.await
···
219
let result = if deactivated.applied {
220
sqlx::query!(
221
"UPDATE users SET deactivated_at = NOW() WHERE did = $1",
222
+
did.as_str()
223
)
224
.execute(&mut *tx)
225
.await
226
} else {
227
+
sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did.as_str())
228
.execute(&mut *tx)
229
.await
230
};
···
251
};
252
if let Err(e) = crate::api::repo::record::sequence_account_event(
253
&state,
254
+
&did,
255
!takedown.applied,
256
status,
257
)
···
268
};
269
if let Err(e) = crate::api::repo::record::sequence_account_event(
270
&state,
271
+
&did,
272
!deactivated.applied,
273
status,
274
)
···
278
}
279
}
280
if let Ok(Some(handle)) =
281
+
sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
282
.fetch_optional(&state.db)
283
.await
284
{
+12
-9
src/api/delegation.rs
+12
-9
src/api/delegation.rs
···
4
use crate::delegation::{self, DelegationActionType};
5
use crate::oauth::db as oauth_db;
6
use crate::state::{AppState, RateLimitKind};
7
-
use crate::types::{Did, Handle};
8
use crate::util::extract_client_ip;
9
use axum::{
10
Json,
···
568
.into_response();
569
}
570
571
-
let did = genesis_result.did;
572
info!(did = %did, handle = %handle, controller = %&auth.0.did, "Created DID for delegated account");
573
574
let mut tx = match state.db.begin().await {
···
585
account_type, preferred_comms_channel
586
) VALUES ($1, $2, $3, NULL, FALSE, 'delegated'::account_type, 'email'::comms_channel) RETURNING id"#,
587
)
588
-
.bind(&handle)
589
.bind(&email)
590
-
.bind(&did)
591
.fetch_one(&mut *tx)
592
.await;
593
···
633
if let Err(e) = sqlx::query!(
634
r#"INSERT INTO account_delegations (delegated_did, controller_did, granted_scopes, granted_by)
635
VALUES ($1, $2, $3, $4)"#,
636
-
did,
637
-
&auth.0.did,
638
input.controller_scopes,
639
-
&auth.0.did
640
)
641
.execute(&mut *tx)
642
.await
···
736
"$type": "app.bsky.actor.profile",
737
"displayName": handle
738
});
739
if let Err(e) = crate::api::repo::record::create_record_internal(
740
&state,
741
&did,
742
-
"app.bsky.actor.profile",
743
-
"self",
744
&profile_record,
745
)
746
.await
···
4
use crate::delegation::{self, DelegationActionType};
5
use crate::oauth::db as oauth_db;
6
use crate::state::{AppState, RateLimitKind};
7
+
use crate::types::{Did, Handle, Nsid, Rkey};
8
use crate::util::extract_client_ip;
9
use axum::{
10
Json,
···
568
.into_response();
569
}
570
571
+
let did = Did::new_unchecked(&genesis_result.did);
572
+
let handle = Handle::new_unchecked(&handle);
573
info!(did = %did, handle = %handle, controller = %&auth.0.did, "Created DID for delegated account");
574
575
let mut tx = match state.db.begin().await {
···
586
account_type, preferred_comms_channel
587
) VALUES ($1, $2, $3, NULL, FALSE, 'delegated'::account_type, 'email'::comms_channel) RETURNING id"#,
588
)
589
+
.bind(handle.as_str())
590
.bind(&email)
591
+
.bind(did.as_str())
592
.fetch_one(&mut *tx)
593
.await;
594
···
634
if let Err(e) = sqlx::query!(
635
r#"INSERT INTO account_delegations (delegated_did, controller_did, granted_scopes, granted_by)
636
VALUES ($1, $2, $3, $4)"#,
637
+
did.as_str(),
638
+
auth.0.did.as_str(),
639
input.controller_scopes,
640
+
auth.0.did.as_str()
641
)
642
.execute(&mut *tx)
643
.await
···
737
"$type": "app.bsky.actor.profile",
738
"displayName": handle
739
});
740
+
let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
741
+
let profile_rkey = Rkey::new_unchecked("self");
742
if let Err(e) = crate::api::repo::record::create_record_internal(
743
&state,
744
&did,
745
+
&profile_collection,
746
+
&profile_rkey,
747
&profile_record,
748
)
749
.await
+14
-9
src/api/identity/account.rs
+14
-9
src/api/identity/account.rs
···
4
use crate::auth::{ServiceTokenVerifier, is_service_token};
5
use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6
use crate::state::{AppState, RateLimitKind};
7
-
use crate::types::{Did, Handle, PlainPassword};
8
use crate::validation::validate_password;
9
use axum::{
10
Json,
···
710
}
711
};
712
let rev = Tid::now(LimitedU32::MIN);
713
let (commit_bytes, _sig) =
714
-
match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
715
Ok(result) => result,
716
Err(e) => {
717
error!("Error creating genesis commit: {:?}", e);
···
793
return ApiError::InternalError(None).into_response();
794
}
795
if !is_migration && !is_did_web_byod {
796
if let Err(e) =
797
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
798
{
799
warn!("Failed to sequence identity event for {}: {}", did, e);
800
}
801
if let Err(e) =
802
-
crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
803
{
804
warn!("Failed to sequence account event for {}: {}", did, e);
805
}
806
if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
807
&state,
808
-
&did,
809
&commit_cid,
810
&mst_root,
811
&rev_str,
···
816
}
817
if let Err(e) = crate::api::repo::record::sequence_sync_event(
818
&state,
819
-
&did,
820
&commit_cid_str,
821
Some(rev.as_ref()),
822
)
···
828
"$type": "app.bsky.actor.profile",
829
"displayName": input.handle
830
});
831
if let Err(e) = crate::api::repo::record::create_record_internal(
832
&state,
833
-
&did,
834
-
"app.bsky.actor.profile",
835
-
"self",
836
&profile_record,
837
)
838
.await
···
4
use crate::auth::{ServiceTokenVerifier, is_service_token};
5
use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6
use crate::state::{AppState, RateLimitKind};
7
+
use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey};
8
use crate::validation::validate_password;
9
use axum::{
10
Json,
···
710
}
711
};
712
let rev = Tid::now(LimitedU32::MIN);
713
+
let did_for_commit = Did::new_unchecked(&did);
714
let (commit_bytes, _sig) =
715
+
match create_signed_commit(&did_for_commit, mst_root, rev.as_ref(), None, &signing_key) {
716
Ok(result) => result,
717
Err(e) => {
718
error!("Error creating genesis commit: {:?}", e);
···
794
return ApiError::InternalError(None).into_response();
795
}
796
if !is_migration && !is_did_web_byod {
797
+
let did_typed = Did::new_unchecked(&did);
798
+
let handle_typed = Handle::new_unchecked(&handle);
799
if let Err(e) =
800
+
crate::api::repo::record::sequence_identity_event(&state, &did_typed, Some(&handle_typed)).await
801
{
802
warn!("Failed to sequence identity event for {}: {}", did, e);
803
}
804
if let Err(e) =
805
+
crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await
806
{
807
warn!("Failed to sequence account event for {}: {}", did, e);
808
}
809
if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
810
&state,
811
+
&did_typed,
812
&commit_cid,
813
&mst_root,
814
&rev_str,
···
819
}
820
if let Err(e) = crate::api::repo::record::sequence_sync_event(
821
&state,
822
+
&did_typed,
823
&commit_cid_str,
824
Some(rev.as_ref()),
825
)
···
831
"$type": "app.bsky.actor.profile",
832
"displayName": input.handle
833
});
834
+
let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
835
+
let profile_rkey = Rkey::new_unchecked("self");
836
if let Err(e) = crate::api::repo::record::create_record_internal(
837
&state,
838
+
&did_typed,
839
+
&profile_collection,
840
+
&profile_rkey,
841
&profile_record,
842
)
843
.await
+7
-3
src/api/identity/did.rs
+7
-3
src/api/identity/did.rs
···
2
use crate::auth::BearerAuthAllowDeactivated;
3
use crate::plc::signing_key_to_did_key;
4
use crate::state::AppState;
5
use axum::{
6
Json,
7
extract::{Path, Query, State},
···
669
format!("{}.{}", new_handle, hostname)
670
};
671
if full_handle == current_handle {
672
if let Err(e) =
673
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle))
674
.await
675
{
676
warn!("Failed to sequence identity event for handle update: {}", e);
···
692
full_handle
693
} else {
694
if new_handle == current_handle {
695
if let Err(e) =
696
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&new_handle))
697
.await
698
{
699
warn!("Failed to sequence identity event for handle update: {}", e);
···
749
.await;
750
}
751
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
752
if let Err(e) =
753
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
754
{
755
warn!("Failed to sequence identity event for handle update: {}", e);
756
}
···
2
use crate::auth::BearerAuthAllowDeactivated;
3
use crate::plc::signing_key_to_did_key;
4
use crate::state::AppState;
5
+
use crate::types::Handle;
6
use axum::{
7
Json,
8
extract::{Path, Query, State},
···
670
format!("{}.{}", new_handle, hostname)
671
};
672
if full_handle == current_handle {
673
+
let handle_typed = Handle::new_unchecked(&full_handle);
674
if let Err(e) =
675
+
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle_typed))
676
.await
677
{
678
warn!("Failed to sequence identity event for handle update: {}", e);
···
694
full_handle
695
} else {
696
if new_handle == current_handle {
697
+
let handle_typed = Handle::new_unchecked(&new_handle);
698
if let Err(e) =
699
+
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle_typed))
700
.await
701
{
702
warn!("Failed to sequence identity event for handle update: {}", e);
···
752
.await;
753
}
754
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
755
+
let handle_typed = Handle::new_unchecked(&handle);
756
if let Err(e) =
757
+
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle_typed)).await
758
{
759
warn!("Failed to sequence identity event for handle update: {}", e);
760
}
+190
-139
src/api/repo/record/batch.rs
+190
-139
src/api/repo/record/batch.rs
···
6
use crate::delegation::{self, DelegationActionType};
7
use crate::repo::tracking::TrackingBlockStore;
8
use crate::state::AppState;
9
-
use crate::types::{AtIdentifier, AtUri, Nsid, Rkey};
10
use axum::{
11
Json,
12
extract::State,
···
22
use tracing::{error, info};
23
24
const MAX_BATCH_WRITES: usize = 200;
25
26
#[derive(Deserialize)]
27
#[serde(tag = "$type")]
···
237
_ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
238
};
239
let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
240
-
let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
241
-
let mut results: Vec<WriteResult> = Vec::new();
242
-
let mut ops: Vec<RecordOp> = Vec::new();
243
-
let mut modified_keys: Vec<String> = Vec::new();
244
-
let mut all_blob_cids: Vec<String> = Vec::new();
245
-
for write in &input.writes {
246
-
match write {
247
-
WriteOp::Create {
248
-
collection,
249
-
rkey,
250
-
value,
251
-
} => {
252
-
let validation_status = if input.validate == Some(false) {
253
-
None
254
-
} else {
255
-
let require_lexicon = input.validate == Some(true);
256
-
match validate_record_with_status(
257
-
value,
258
-
collection,
259
-
rkey.as_ref().map(|r| r.as_str()),
260
-
require_lexicon,
261
-
) {
262
-
Ok(status) => Some(status),
263
-
Err(err_response) => return *err_response,
264
-
}
265
-
};
266
-
all_blob_cids.extend(extract_blob_cids(value));
267
-
let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
268
-
let record_ipld = crate::util::json_to_ipld(value);
269
-
let mut record_bytes = Vec::new();
270
-
if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
271
-
return ApiError::InvalidRecord("Failed to serialize record".into())
272
-
.into_response();
273
-
}
274
-
let record_cid = match tracking_store.put(&record_bytes).await {
275
-
Ok(c) => c,
276
-
Err(_) => {
277
-
return ApiError::InternalError(Some("Failed to store record".into()))
278
-
.into_response();
279
-
}
280
-
};
281
-
let key = format!("{}/{}", collection, rkey);
282
-
modified_keys.push(key.clone());
283
-
mst = match mst.add(&key, record_cid).await {
284
-
Ok(m) => m,
285
-
Err(_) => {
286
-
return ApiError::InternalError(Some("Failed to add to MST".into()))
287
-
.into_response();
288
-
}
289
-
};
290
-
let uri = AtUri::from_parts(&did, collection, &rkey);
291
-
results.push(WriteResult::CreateResult {
292
-
uri,
293
-
cid: record_cid.to_string(),
294
-
validation_status: validation_status.map(|s| s.to_string()),
295
-
});
296
-
ops.push(RecordOp::Create {
297
-
collection: collection.to_string(),
298
-
rkey: rkey.to_string(),
299
-
cid: record_cid,
300
-
});
301
-
}
302
-
WriteOp::Update {
303
-
collection,
304
-
rkey,
305
-
value,
306
-
} => {
307
-
let validation_status = if input.validate == Some(false) {
308
-
None
309
-
} else {
310
-
let require_lexicon = input.validate == Some(true);
311
-
match validate_record_with_status(
312
-
value,
313
-
collection,
314
-
Some(rkey.as_str()),
315
-
require_lexicon,
316
-
) {
317
-
Ok(status) => Some(status),
318
-
Err(err_response) => return *err_response,
319
-
}
320
-
};
321
-
all_blob_cids.extend(extract_blob_cids(value));
322
-
let record_ipld = crate::util::json_to_ipld(value);
323
-
let mut record_bytes = Vec::new();
324
-
if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
325
-
return ApiError::InvalidRecord("Failed to serialize record".into())
326
-
.into_response();
327
-
}
328
-
let record_cid = match tracking_store.put(&record_bytes).await {
329
-
Ok(c) => c,
330
-
Err(_) => {
331
-
return ApiError::InternalError(Some("Failed to store record".into()))
332
-
.into_response();
333
-
}
334
-
};
335
-
let key = format!("{}/{}", collection, rkey);
336
-
modified_keys.push(key.clone());
337
-
let prev_record_cid = mst.get(&key).await.ok().flatten();
338
-
mst = match mst.update(&key, record_cid).await {
339
-
Ok(m) => m,
340
-
Err(_) => {
341
-
return ApiError::InternalError(Some("Failed to update MST".into()))
342
-
.into_response();
343
-
}
344
-
};
345
-
let uri = AtUri::from_parts(&did, collection, rkey);
346
-
results.push(WriteResult::UpdateResult {
347
-
uri,
348
-
cid: record_cid.to_string(),
349
-
validation_status: validation_status.map(|s| s.to_string()),
350
-
});
351
-
ops.push(RecordOp::Update {
352
-
collection: collection.to_string(),
353
-
rkey: rkey.to_string(),
354
-
cid: record_cid,
355
-
prev: prev_record_cid,
356
-
});
357
-
}
358
-
WriteOp::Delete { collection, rkey } => {
359
-
let key = format!("{}/{}", collection, rkey);
360
-
modified_keys.push(key.clone());
361
-
let prev_record_cid = mst.get(&key).await.ok().flatten();
362
-
mst = match mst.delete(&key).await {
363
-
Ok(m) => m,
364
-
Err(_) => {
365
-
return ApiError::InternalError(Some("Failed to delete from MST".into()))
366
-
.into_response();
367
-
}
368
-
};
369
-
results.push(WriteResult::DeleteResult {});
370
-
ops.push(RecordOp::Delete {
371
-
collection: collection.to_string(),
372
-
rkey: rkey.to_string(),
373
-
prev: prev_record_cid,
374
-
});
375
-
}
376
-
}
377
-
}
378
let new_mst_root = match mst.persist().await {
379
Ok(c) => c,
380
Err(_) => {
···
6
use crate::delegation::{self, DelegationActionType};
7
use crate::repo::tracking::TrackingBlockStore;
8
use crate::state::AppState;
9
+
use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
10
use axum::{
11
Json,
12
extract::State,
···
22
use tracing::{error, info};
23
24
const MAX_BATCH_WRITES: usize = 200;
25
+
26
+
struct WriteAccumulator {
27
+
mst: Mst<TrackingBlockStore>,
28
+
results: Vec<WriteResult>,
29
+
ops: Vec<RecordOp>,
30
+
modified_keys: Vec<String>,
31
+
all_blob_cids: Vec<String>,
32
+
}
33
+
34
+
async fn process_single_write(
35
+
write: &WriteOp,
36
+
acc: WriteAccumulator,
37
+
did: &Did,
38
+
validate: Option<bool>,
39
+
tracking_store: &TrackingBlockStore,
40
+
) -> Result<WriteAccumulator, Response> {
41
+
let WriteAccumulator {
42
+
mst,
43
+
mut results,
44
+
mut ops,
45
+
mut modified_keys,
46
+
mut all_blob_cids,
47
+
} = acc;
48
+
49
+
match write {
50
+
WriteOp::Create {
51
+
collection,
52
+
rkey,
53
+
value,
54
+
} => {
55
+
let validation_status = match validate {
56
+
Some(false) => None,
57
+
_ => {
58
+
let require_lexicon = validate == Some(true);
59
+
match validate_record_with_status(
60
+
value,
61
+
collection,
62
+
rkey.as_ref(),
63
+
require_lexicon,
64
+
) {
65
+
Ok(status) => Some(status),
66
+
Err(err_response) => return Err(*err_response),
67
+
}
68
+
}
69
+
};
70
+
all_blob_cids.extend(extract_blob_cids(value));
71
+
let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
72
+
let record_ipld = crate::util::json_to_ipld(value);
73
+
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| {
74
+
ApiError::InvalidRecord("Failed to serialize record".into()).into_response()
75
+
})?;
76
+
let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| {
77
+
ApiError::InternalError(Some("Failed to store record".into())).into_response()
78
+
})?;
79
+
let key = format!("{}/{}", collection, rkey);
80
+
modified_keys.push(key.clone());
81
+
let new_mst = mst.add(&key, record_cid).await.map_err(|_| {
82
+
ApiError::InternalError(Some("Failed to add to MST".into())).into_response()
83
+
})?;
84
+
let uri = AtUri::from_parts(did, collection, &rkey);
85
+
results.push(WriteResult::CreateResult {
86
+
uri,
87
+
cid: record_cid.to_string(),
88
+
validation_status: validation_status.map(|s| s.to_string()),
89
+
});
90
+
ops.push(RecordOp::Create {
91
+
collection: collection.clone(),
92
+
rkey: rkey.clone(),
93
+
cid: record_cid,
94
+
});
95
+
Ok(WriteAccumulator {
96
+
mst: new_mst,
97
+
results,
98
+
ops,
99
+
modified_keys,
100
+
all_blob_cids,
101
+
})
102
+
}
103
+
WriteOp::Update {
104
+
collection,
105
+
rkey,
106
+
value,
107
+
} => {
108
+
let validation_status = match validate {
109
+
Some(false) => None,
110
+
_ => {
111
+
let require_lexicon = validate == Some(true);
112
+
match validate_record_with_status(
113
+
value,
114
+
collection,
115
+
Some(rkey),
116
+
require_lexicon,
117
+
) {
118
+
Ok(status) => Some(status),
119
+
Err(err_response) => return Err(*err_response),
120
+
}
121
+
}
122
+
};
123
+
all_blob_cids.extend(extract_blob_cids(value));
124
+
let record_ipld = crate::util::json_to_ipld(value);
125
+
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| {
126
+
ApiError::InvalidRecord("Failed to serialize record".into()).into_response()
127
+
})?;
128
+
let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| {
129
+
ApiError::InternalError(Some("Failed to store record".into())).into_response()
130
+
})?;
131
+
let key = format!("{}/{}", collection, rkey);
132
+
modified_keys.push(key.clone());
133
+
let prev_record_cid = mst.get(&key).await.ok().flatten();
134
+
let new_mst = mst.update(&key, record_cid).await.map_err(|_| {
135
+
ApiError::InternalError(Some("Failed to update MST".into())).into_response()
136
+
})?;
137
+
let uri = AtUri::from_parts(did, collection, rkey);
138
+
results.push(WriteResult::UpdateResult {
139
+
uri,
140
+
cid: record_cid.to_string(),
141
+
validation_status: validation_status.map(|s| s.to_string()),
142
+
});
143
+
ops.push(RecordOp::Update {
144
+
collection: collection.clone(),
145
+
rkey: rkey.clone(),
146
+
cid: record_cid,
147
+
prev: prev_record_cid,
148
+
});
149
+
Ok(WriteAccumulator {
150
+
mst: new_mst,
151
+
results,
152
+
ops,
153
+
modified_keys,
154
+
all_blob_cids,
155
+
})
156
+
}
157
+
WriteOp::Delete { collection, rkey } => {
158
+
let key = format!("{}/{}", collection, rkey);
159
+
modified_keys.push(key.clone());
160
+
let prev_record_cid = mst.get(&key).await.ok().flatten();
161
+
let new_mst = mst.delete(&key).await.map_err(|_| {
162
+
ApiError::InternalError(Some("Failed to delete from MST".into())).into_response()
163
+
})?;
164
+
results.push(WriteResult::DeleteResult {});
165
+
ops.push(RecordOp::Delete {
166
+
collection: collection.clone(),
167
+
rkey: rkey.clone(),
168
+
prev: prev_record_cid,
169
+
});
170
+
Ok(WriteAccumulator {
171
+
mst: new_mst,
172
+
results,
173
+
ops,
174
+
modified_keys,
175
+
all_blob_cids,
176
+
})
177
+
}
178
+
}
179
+
}
180
+
181
+
async fn process_writes(
182
+
writes: &[WriteOp],
183
+
initial_mst: Mst<TrackingBlockStore>,
184
+
did: &Did,
185
+
validate: Option<bool>,
186
+
tracking_store: &TrackingBlockStore,
187
+
) -> Result<WriteAccumulator, Response> {
188
+
use futures::stream::{self, TryStreamExt};
189
+
let initial_acc = WriteAccumulator {
190
+
mst: initial_mst,
191
+
results: Vec::new(),
192
+
ops: Vec::new(),
193
+
modified_keys: Vec::new(),
194
+
all_blob_cids: Vec::new(),
195
+
};
196
+
stream::iter(writes.iter().map(Ok::<_, Response>))
197
+
.try_fold(initial_acc, |acc, write| async move {
198
+
process_single_write(write, acc, did, validate, tracking_store).await
199
+
})
200
+
.await
201
+
}
202
203
#[derive(Deserialize)]
204
#[serde(tag = "$type")]
···
414
_ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
415
};
416
let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
417
+
let initial_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
418
+
let WriteAccumulator {
419
+
mst,
420
+
results,
421
+
ops,
422
+
modified_keys,
423
+
all_blob_cids,
424
+
} = match process_writes(&input.writes, initial_mst, &did, input.validate, &tracking_store).await
425
+
{
426
+
Ok(acc) => acc,
427
+
Err(response) => return response,
428
+
};
429
let new_mst_root = match mst.persist().await {
430
Ok(c) => c,
431
Err(_) => {
+2
-9
src/api/repo/record/delete.rs
+2
-9
src/api/repo/record/delete.rs
···
65
return e;
66
}
67
68
-
if crate::util::is_account_migrated(&state.db, &auth.did)
69
-
.await
70
-
.unwrap_or(false)
71
-
{
72
-
return ApiError::AccountMigrated.into_response();
73
-
}
74
-
75
let did = auth.did;
76
let user_id = auth.user_id;
77
let current_root_cid = auth.current_root_cid;
···
125
let collection_for_audit = input.collection.to_string();
126
let rkey_for_audit = input.rkey.to_string();
127
let op = RecordOp::Delete {
128
-
collection: input.collection.to_string(),
129
-
rkey: rkey_for_audit.clone(),
130
prev: prev_record_cid,
131
};
132
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
65
return e;
66
}
67
68
let did = auth.did;
69
let user_id = auth.user_id;
70
let current_root_cid = auth.current_root_cid;
···
118
let collection_for_audit = input.collection.to_string();
119
let rkey_for_audit = input.rkey.to_string();
120
let op = RecordOp::Delete {
121
+
collection: input.collection.clone(),
122
+
rkey: input.rkey.clone(),
123
prev: prev_record_cid,
124
};
125
let mut new_mst_blocks = std::collections::BTreeMap::new();
+24
-23
src/api/repo/record/read.rs
+24
-23
src/api/repo/record/read.rs
···
13
use jacquard_repo::storage::BlockStore;
14
use serde::{Deserialize, Serialize};
15
use serde_json::{Map, Value, json};
16
-
use std::collections::HashMap;
17
use std::str::FromStr;
18
use tracing::error;
19
···
237
}
238
};
239
let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
240
-
let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
241
-
let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
242
-
for (rkey, cid_str) in &rows {
243
-
if let Ok(cid) = Cid::from_str(cid_str) {
244
-
cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
245
-
cids.push(cid);
246
-
}
247
-
}
248
let blocks = match state.block_store.get_many(&cids).await {
249
Ok(b) => b,
250
Err(e) => {
···
252
return ApiError::InternalError(None).into_response();
253
}
254
};
255
-
let mut records = Vec::new();
256
-
for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
257
-
if let Some(block) = block_opt
258
-
&& let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
259
-
&& let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block)
260
-
{
261
-
let value = ipld_to_json(ipld);
262
-
records.push(json!({
263
-
"uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
264
-
"cid": cid_str,
265
-
"value": value
266
-
}));
267
-
}
268
-
}
269
Json(ListRecordsOutput {
270
cursor: last_rkey,
271
records,
···
13
use jacquard_repo::storage::BlockStore;
14
use serde::{Deserialize, Serialize};
15
use serde_json::{Map, Value, json};
16
use std::str::FromStr;
17
use tracing::error;
18
···
236
}
237
};
238
let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
239
+
let parsed_rows: Vec<(Cid, String, String)> = rows
240
+
.iter()
241
+
.filter_map(|(rkey, cid_str)| {
242
+
Cid::from_str(cid_str)
243
+
.ok()
244
+
.map(|cid| (cid, rkey.clone(), cid_str.clone()))
245
+
})
246
+
.collect();
247
+
let cids: Vec<Cid> = parsed_rows.iter().map(|(cid, _, _)| *cid).collect();
248
let blocks = match state.block_store.get_many(&cids).await {
249
Ok(b) => b,
250
Err(e) => {
···
252
return ApiError::InternalError(None).into_response();
253
}
254
};
255
+
let records: Vec<Value> = parsed_rows
256
+
.iter()
257
+
.zip(blocks.into_iter())
258
+
.filter_map(|((_, rkey, cid_str), block_opt)| {
259
+
block_opt.and_then(|block| {
260
+
serde_ipld_dagcbor::from_slice::<Ipld>(&block).ok().map(|ipld| {
261
+
json!({
262
+
"uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
263
+
"cid": cid_str,
264
+
"value": ipld_to_json(ipld)
265
+
})
266
+
})
267
+
})
268
+
})
269
+
.collect();
270
Json(ListRecordsOutput {
271
cursor: last_rkey,
272
records,
+104
-63
src/api/repo/record/utils.rs
+104
-63
src/api/repo/record/utils.rs
···
1
use crate::state::AppState;
2
use bytes::Bytes;
3
use cid::Cid;
4
use jacquard::types::{integer::LimitedU32, string::Tid};
···
38
}
39
40
pub fn create_signed_commit(
41
-
did: &str,
42
data: Cid,
43
rev: &str,
44
prev: Option<Cid>,
45
signing_key: &SigningKey,
46
) -> Result<(Vec<u8>, Bytes), String> {
47
-
let did =
48
-
jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?;
49
let rev =
50
jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
51
let unsigned = Commit::new_unsigned(did, data, rev, prev);
···
61
62
pub enum RecordOp {
63
Create {
64
-
collection: String,
65
-
rkey: String,
66
cid: Cid,
67
},
68
Update {
69
-
collection: String,
70
-
rkey: String,
71
cid: Cid,
72
prev: Option<Cid>,
73
},
74
Delete {
75
-
collection: String,
76
-
rkey: String,
77
prev: Option<Cid>,
78
},
79
}
···
84
}
85
86
pub struct CommitParams<'a> {
87
-
pub did: &'a str,
88
pub user_id: Uuid,
89
pub current_root_cid: Option<Cid>,
90
pub prev_data_cid: Option<Cid>,
···
218
.await
219
.map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
220
}
221
-
let mut upsert_collections: Vec<String> = Vec::new();
222
-
let mut upsert_rkeys: Vec<String> = Vec::new();
223
-
let mut upsert_cids: Vec<String> = Vec::new();
224
-
let mut delete_collections: Vec<String> = Vec::new();
225
-
let mut delete_rkeys: Vec<String> = Vec::new();
226
-
for op in &ops {
227
-
match op {
228
-
RecordOp::Create {
229
-
collection,
230
-
rkey,
231
-
cid,
232
-
}
233
-
| RecordOp::Update {
234
-
collection,
235
-
rkey,
236
-
cid,
237
-
..
238
-
} => {
239
-
upsert_collections.push(collection.clone());
240
-
upsert_rkeys.push(rkey.clone());
241
-
upsert_cids.push(cid.to_string());
242
-
}
243
RecordOp::Delete {
244
collection, rkey, ..
245
-
} => {
246
-
delete_collections.push(collection.clone());
247
-
delete_rkeys.push(rkey.clone());
248
-
}
249
-
}
250
-
}
251
if !upsert_collections.is_empty() {
252
sqlx::query!(
253
r#"
···
337
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
338
RETURNING seq
339
"#,
340
-
did,
341
event_type,
342
new_root_cid.to_string(),
343
prev_cid_str,
···
367
}
368
pub async fn create_record_internal(
369
state: &AppState,
370
-
did: &str,
371
-
collection: &str,
372
-
rkey: &str,
373
record: &serde_json::Value,
374
) -> Result<(String, Cid), String> {
375
use crate::repo::tracking::TrackingBlockStore;
376
use jacquard_repo::mst::Mst;
377
use std::sync::Arc;
378
-
let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
379
.fetch_optional(&state.db)
380
.await
381
.map_err(|e| format!("DB error: {}", e))?
···
417
.await
418
.map_err(|e| format!("Failed to persist MST: {:?}", e))?;
419
let op = RecordOp::Create {
420
-
collection: collection.to_string(),
421
-
rkey: rkey.to_string(),
422
cid: record_cid,
423
};
424
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
471
472
pub async fn sequence_identity_event(
473
state: &AppState,
474
-
did: &str,
475
-
handle: Option<&str>,
476
) -> Result<i64, String> {
477
let seq_row = sqlx::query!(
478
r#"
479
INSERT INTO repo_seq (did, event_type, handle)
480
VALUES ($1, 'identity', $2)
481
RETURNING seq
482
"#,
483
-
did,
484
-
handle,
485
)
486
-
.fetch_one(&state.db)
487
.await
488
.map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
489
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
490
-
.execute(&state.db)
491
.await
492
.map_err(|e| format!("DB Error (notify): {}", e))?;
493
Ok(seq_row.seq)
494
}
495
pub async fn sequence_account_event(
496
state: &AppState,
497
-
did: &str,
498
active: bool,
499
status: Option<&str>,
500
) -> Result<i64, String> {
501
let seq_row = sqlx::query!(
502
r#"
503
INSERT INTO repo_seq (did, event_type, active, status)
504
VALUES ($1, 'account', $2, $3)
505
RETURNING seq
506
"#,
507
-
did,
508
active,
509
status,
510
)
511
-
.fetch_one(&state.db)
512
.await
513
.map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
514
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
515
-
.execute(&state.db)
516
.await
517
.map_err(|e| format!("DB Error (notify): {}", e))?;
518
Ok(seq_row.seq)
519
}
520
pub async fn sequence_sync_event(
521
state: &AppState,
522
-
did: &str,
523
commit_cid: &str,
524
rev: Option<&str>,
525
) -> Result<i64, String> {
526
let seq_row = sqlx::query!(
527
r#"
528
INSERT INTO repo_seq (did, event_type, commit_cid, rev)
529
VALUES ($1, 'sync', $2, $3)
530
RETURNING seq
531
"#,
532
-
did,
533
commit_cid,
534
rev,
535
)
536
-
.fetch_one(&state.db)
537
.await
538
.map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
539
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
540
-
.execute(&state.db)
541
.await
542
.map_err(|e| format!("DB Error (notify): {}", e))?;
543
Ok(seq_row.seq)
544
}
545
546
pub async fn sequence_genesis_commit(
547
state: &AppState,
548
-
did: &str,
549
commit_cid: &Cid,
550
mst_root_cid: &Cid,
551
rev: &str,
···
555
let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
556
let prev_cid: Option<&str> = None;
557
let commit_cid_str = commit_cid.to_string();
558
let seq_row = sqlx::query!(
559
r#"
560
INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
561
VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
562
RETURNING seq
563
"#,
564
-
did,
565
commit_cid_str,
566
prev_cid,
567
ops,
···
569
&blocks_cids,
570
rev
571
)
572
-
.fetch_one(&state.db)
573
.await
574
.map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
575
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
576
-
.execute(&state.db)
577
.await
578
.map_err(|e| format!("DB Error (notify): {}", e))?;
579
Ok(seq_row.seq)
580
}
···
1
use crate::state::AppState;
2
+
use crate::types::{Did, Handle, Nsid, Rkey};
3
use bytes::Bytes;
4
use cid::Cid;
5
use jacquard::types::{integer::LimitedU32, string::Tid};
···
39
}
40
41
pub fn create_signed_commit(
42
+
did: &Did,
43
data: Cid,
44
rev: &str,
45
prev: Option<Cid>,
46
signing_key: &SigningKey,
47
) -> Result<(Vec<u8>, Bytes), String> {
48
+
let did = jacquard::types::string::Did::new(did.as_str())
49
+
.map_err(|e| format!("Invalid DID: {:?}", e))?;
50
let rev =
51
jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
52
let unsigned = Commit::new_unsigned(did, data, rev, prev);
···
62
63
pub enum RecordOp {
64
Create {
65
+
collection: Nsid,
66
+
rkey: Rkey,
67
cid: Cid,
68
},
69
Update {
70
+
collection: Nsid,
71
+
rkey: Rkey,
72
cid: Cid,
73
prev: Option<Cid>,
74
},
75
Delete {
76
+
collection: Nsid,
77
+
rkey: Rkey,
78
prev: Option<Cid>,
79
},
80
}
···
85
}
86
87
pub struct CommitParams<'a> {
88
+
pub did: &'a Did,
89
pub user_id: Uuid,
90
pub current_root_cid: Option<Cid>,
91
pub prev_data_cid: Option<Cid>,
···
219
.await
220
.map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
221
}
222
+
let (upserts, deletes): (Vec<_>, Vec<_>) = ops.iter().partition(|op| {
223
+
matches!(op, RecordOp::Create { .. } | RecordOp::Update { .. })
224
+
});
225
+
let (upsert_collections, upsert_rkeys, upsert_cids): (Vec<String>, Vec<String>, Vec<String>) =
226
+
upserts
227
+
.into_iter()
228
+
.filter_map(|op| match op {
229
+
RecordOp::Create {
230
+
collection,
231
+
rkey,
232
+
cid,
233
+
}
234
+
| RecordOp::Update {
235
+
collection,
236
+
rkey,
237
+
cid,
238
+
..
239
+
} => Some((collection.to_string(), rkey.to_string(), cid.to_string())),
240
+
_ => None,
241
+
})
242
+
.fold(
243
+
(Vec::new(), Vec::new(), Vec::new()),
244
+
|(mut cols, mut rkeys, mut cids), (c, r, ci)| {
245
+
cols.push(c);
246
+
rkeys.push(r);
247
+
cids.push(ci);
248
+
(cols, rkeys, cids)
249
+
},
250
+
);
251
+
let (delete_collections, delete_rkeys): (Vec<String>, Vec<String>) = deletes
252
+
.into_iter()
253
+
.filter_map(|op| match op {
254
RecordOp::Delete {
255
collection, rkey, ..
256
+
} => Some((collection.to_string(), rkey.to_string())),
257
+
_ => None,
258
+
})
259
+
.unzip();
260
if !upsert_collections.is_empty() {
261
sqlx::query!(
262
r#"
···
346
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
347
RETURNING seq
348
"#,
349
+
did.as_str(),
350
event_type,
351
new_root_cid.to_string(),
352
prev_cid_str,
···
376
}
377
pub async fn create_record_internal(
378
state: &AppState,
379
+
did: &Did,
380
+
collection: &Nsid,
381
+
rkey: &Rkey,
382
record: &serde_json::Value,
383
) -> Result<(String, Cid), String> {
384
use crate::repo::tracking::TrackingBlockStore;
385
use jacquard_repo::mst::Mst;
386
use std::sync::Arc;
387
+
let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
388
.fetch_optional(&state.db)
389
.await
390
.map_err(|e| format!("DB error: {}", e))?
···
426
.await
427
.map_err(|e| format!("Failed to persist MST: {:?}", e))?;
428
let op = RecordOp::Create {
429
+
collection: collection.clone(),
430
+
rkey: rkey.clone(),
431
cid: record_cid,
432
};
433
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
480
481
pub async fn sequence_identity_event(
482
state: &AppState,
483
+
did: &Did,
484
+
handle: Option<&Handle>,
485
) -> Result<i64, String> {
486
+
let mut tx = state
487
+
.db
488
+
.begin()
489
+
.await
490
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
491
let seq_row = sqlx::query!(
492
r#"
493
INSERT INTO repo_seq (did, event_type, handle)
494
VALUES ($1, 'identity', $2)
495
RETURNING seq
496
"#,
497
+
did.as_str(),
498
+
handle.map(|h| h.as_str()),
499
)
500
+
.fetch_one(&mut *tx)
501
.await
502
.map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
503
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
504
+
.execute(&mut *tx)
505
.await
506
.map_err(|e| format!("DB Error (notify): {}", e))?;
507
+
tx.commit()
508
+
.await
509
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
510
Ok(seq_row.seq)
511
}
512
pub async fn sequence_account_event(
513
state: &AppState,
514
+
did: &Did,
515
active: bool,
516
status: Option<&str>,
517
) -> Result<i64, String> {
518
+
let mut tx = state
519
+
.db
520
+
.begin()
521
+
.await
522
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
523
let seq_row = sqlx::query!(
524
r#"
525
INSERT INTO repo_seq (did, event_type, active, status)
526
VALUES ($1, 'account', $2, $3)
527
RETURNING seq
528
"#,
529
+
did.as_str(),
530
active,
531
status,
532
)
533
+
.fetch_one(&mut *tx)
534
.await
535
.map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
536
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
537
+
.execute(&mut *tx)
538
.await
539
.map_err(|e| format!("DB Error (notify): {}", e))?;
540
+
tx.commit()
541
+
.await
542
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
543
Ok(seq_row.seq)
544
}
545
pub async fn sequence_sync_event(
546
state: &AppState,
547
+
did: &Did,
548
commit_cid: &str,
549
rev: Option<&str>,
550
) -> Result<i64, String> {
551
+
let mut tx = state
552
+
.db
553
+
.begin()
554
+
.await
555
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
556
let seq_row = sqlx::query!(
557
r#"
558
INSERT INTO repo_seq (did, event_type, commit_cid, rev)
559
VALUES ($1, 'sync', $2, $3)
560
RETURNING seq
561
"#,
562
+
did.as_str(),
563
commit_cid,
564
rev,
565
)
566
+
.fetch_one(&mut *tx)
567
.await
568
.map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
569
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
570
+
.execute(&mut *tx)
571
.await
572
.map_err(|e| format!("DB Error (notify): {}", e))?;
573
+
tx.commit()
574
+
.await
575
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
576
Ok(seq_row.seq)
577
}
578
579
pub async fn sequence_genesis_commit(
580
state: &AppState,
581
+
did: &Did,
582
commit_cid: &Cid,
583
mst_root_cid: &Cid,
584
rev: &str,
···
588
let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
589
let prev_cid: Option<&str> = None;
590
let commit_cid_str = commit_cid.to_string();
591
+
let mut tx = state
592
+
.db
593
+
.begin()
594
+
.await
595
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
596
let seq_row = sqlx::query!(
597
r#"
598
INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
599
VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
600
RETURNING seq
601
"#,
602
+
did.as_str(),
603
commit_cid_str,
604
prev_cid,
605
ops,
···
607
&blocks_cids,
608
rev
609
)
610
+
.fetch_one(&mut *tx)
611
.await
612
.map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
613
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
614
+
.execute(&mut *tx)
615
.await
616
.map_err(|e| format!("DB Error (notify): {}", e))?;
617
+
tx.commit()
618
+
.await
619
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
620
Ok(seq_row.seq)
621
}
+12
-7
src/api/repo/record/validation.rs
+12
-7
src/api/repo/record/validation.rs
···
1
use crate::api::error::ApiError;
2
use crate::validation::{RecordValidator, ValidationError, ValidationStatus};
3
use axum::response::Response;
4
5
-
pub fn validate_record(record: &serde_json::Value, collection: &str) -> Result<(), Box<Response>> {
6
validate_record_with_rkey(record, collection, None)
7
}
8
9
pub fn validate_record_with_rkey(
10
record: &serde_json::Value,
11
-
collection: &str,
12
-
rkey: Option<&str>,
13
) -> Result<(), Box<Response>> {
14
let validator = RecordValidator::new();
15
-
validation_error_to_response(validator.validate_with_rkey(record, collection, rkey))
16
}
17
18
pub fn validate_record_with_status(
19
record: &serde_json::Value,
20
-
collection: &str,
21
-
rkey: Option<&str>,
22
require_lexicon: bool,
23
) -> Result<ValidationStatus, Box<Response>> {
24
let validator = RecordValidator::new().require_lexicon(require_lexicon);
25
-
match validator.validate_with_rkey(record, collection, rkey) {
26
Ok(status) => Ok(status),
27
Err(e) => Err(validation_error_to_box_response(e)),
28
}
···
1
use crate::api::error::ApiError;
2
+
use crate::types::{Nsid, Rkey};
3
use crate::validation::{RecordValidator, ValidationError, ValidationStatus};
4
use axum::response::Response;
5
6
+
pub fn validate_record(record: &serde_json::Value, collection: &Nsid) -> Result<(), Box<Response>> {
7
validate_record_with_rkey(record, collection, None)
8
}
9
10
pub fn validate_record_with_rkey(
11
record: &serde_json::Value,
12
+
collection: &Nsid,
13
+
rkey: Option<&Rkey>,
14
) -> Result<(), Box<Response>> {
15
let validator = RecordValidator::new();
16
+
validation_error_to_response(validator.validate_with_rkey(
17
+
record,
18
+
collection.as_str(),
19
+
rkey.map(|r| r.as_str()),
20
+
))
21
}
22
23
pub fn validate_record_with_status(
24
record: &serde_json::Value,
25
+
collection: &Nsid,
26
+
rkey: Option<&Rkey>,
27
require_lexicon: bool,
28
) -> Result<ValidationStatus, Box<Response>> {
29
let validator = RecordValidator::new().require_lexicon(require_lexicon);
30
+
match validator.validate_with_rkey(record, collection.as_str(), rkey.map(|r| r.as_str())) {
31
Ok(status) => Ok(status),
32
Err(e) => Err(validation_error_to_box_response(e)),
33
}
+12
-12
src/api/repo/record/write.rs
+12
-12
src/api/repo/record/write.rs
···
21
use tracing::error;
22
use uuid::Uuid;
23
24
-
pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25
let row = sqlx::query(
26
r#"
27
SELECT
···
33
WHERE did = $1
34
"#,
35
)
36
-
.bind(did)
37
.fetch_optional(db)
38
.await?;
39
match row {
···
60
pub async fn prepare_repo_write(
61
state: &AppState,
62
headers: &HeaderMap,
63
-
repo_did: &str,
64
http_method: &str,
65
http_uri: &str,
66
) -> Result<RepoWriteAuth, Response> {
···
96
}
97
response
98
})?;
99
-
if repo_did != auth_user.did {
100
return Err(
101
ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
102
);
···
229
match validate_record_with_status(
230
&input.record,
231
&input.collection,
232
-
input.rkey.as_ref().map(|r| r.as_str()),
233
require_lexicon,
234
) {
235
Ok(status) => Some(status),
···
259
_ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
260
};
261
let op = RecordOp::Create {
262
-
collection: input.collection.to_string(),
263
-
rkey: rkey.to_string(),
264
cid: record_cid,
265
};
266
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
443
match validate_record_with_status(
444
&input.record,
445
&input.collection,
446
-
Some(input.rkey.as_str()),
447
require_lexicon,
448
) {
449
Ok(status) => Some(status),
···
510
};
511
let op = if existing_cid.is_some() {
512
RecordOp::Update {
513
-
collection: input.collection.to_string(),
514
-
rkey: input.rkey.to_string(),
515
cid: record_cid,
516
prev: existing_cid,
517
}
518
} else {
519
RecordOp::Create {
520
-
collection: input.collection.to_string(),
521
-
rkey: input.rkey.to_string(),
522
cid: record_cid,
523
}
524
};
···
21
use tracing::error;
22
use uuid::Uuid;
23
24
+
pub async fn has_verified_comms_channel(db: &PgPool, did: &Did) -> Result<bool, sqlx::Error> {
25
let row = sqlx::query(
26
r#"
27
SELECT
···
33
WHERE did = $1
34
"#,
35
)
36
+
.bind(did.as_str())
37
.fetch_optional(db)
38
.await?;
39
match row {
···
60
pub async fn prepare_repo_write(
61
state: &AppState,
62
headers: &HeaderMap,
63
+
repo: &AtIdentifier,
64
http_method: &str,
65
http_uri: &str,
66
) -> Result<RepoWriteAuth, Response> {
···
96
}
97
response
98
})?;
99
+
if repo.as_str() != auth_user.did.as_str() {
100
return Err(
101
ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
102
);
···
229
match validate_record_with_status(
230
&input.record,
231
&input.collection,
232
+
input.rkey.as_ref(),
233
require_lexicon,
234
) {
235
Ok(status) => Some(status),
···
259
_ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
260
};
261
let op = RecordOp::Create {
262
+
collection: input.collection.clone(),
263
+
rkey: rkey.clone(),
264
cid: record_cid,
265
};
266
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
443
match validate_record_with_status(
444
&input.record,
445
&input.collection,
446
+
Some(&input.rkey),
447
require_lexicon,
448
) {
449
Ok(status) => Some(status),
···
510
};
511
let op = if existing_cid.is_some() {
512
RecordOp::Update {
513
+
collection: input.collection.clone(),
514
+
rkey: input.rkey.clone(),
515
cid: record_cid,
516
prev: existing_cid,
517
}
518
} else {
519
RecordOp::Create {
520
+
collection: input.collection.clone(),
521
+
rkey: input.rkey.clone(),
522
cid: record_cid,
523
}
524
};
+7
-6
src/api/server/account_status.rs
+7
-6
src/api/server/account_status.rs
···
3
use crate::cache::Cache;
4
use crate::plc::PlcClient;
5
use crate::state::AppState;
6
-
use crate::types::PlainPassword;
7
use axum::{
8
Json,
9
extract::State,
···
449
did
450
);
451
if let Err(e) =
452
-
crate::api::repo::record::sequence_account_event(&state, did.as_str(), true, None)
453
.await
454
{
455
warn!(
···
463
"[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
464
did, handle
465
);
466
if let Err(e) = crate::api::repo::record::sequence_identity_event(
467
&state,
468
-
did.as_str(),
469
-
handle.as_deref(),
470
)
471
.await
472
{
···
501
};
502
if let Err(e) = crate::api::repo::record::sequence_sync_event(
503
&state,
504
-
did.as_str(),
505
&root_cid,
506
rev.as_deref(),
507
)
···
609
}
610
if let Err(e) = crate::api::repo::record::sequence_account_event(
611
&state,
612
-
did.as_str(),
613
false,
614
Some("deactivated"),
615
)
···
3
use crate::cache::Cache;
4
use crate::plc::PlcClient;
5
use crate::state::AppState;
6
+
use crate::types::{Handle, PlainPassword};
7
use axum::{
8
Json,
9
extract::State,
···
449
did
450
);
451
if let Err(e) =
452
+
crate::api::repo::record::sequence_account_event(&state, &did, true, None)
453
.await
454
{
455
warn!(
···
463
"[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
464
did, handle
465
);
466
+
let handle_typed = handle.as_ref().map(|h| Handle::new_unchecked(h));
467
if let Err(e) = crate::api::repo::record::sequence_identity_event(
468
&state,
469
+
&did,
470
+
handle_typed.as_ref(),
471
)
472
.await
473
{
···
502
};
503
if let Err(e) = crate::api::repo::record::sequence_sync_event(
504
&state,
505
+
&did,
506
&root_cid,
507
rev.as_deref(),
508
)
···
610
}
611
if let Err(e) = crate::api::repo::record::sequence_account_event(
612
&state,
613
+
&did,
614
false,
615
Some("deactivated"),
616
)
+11
-7
src/api/server/passkey_account.rs
+11
-7
src/api/server/passkey_account.rs
···
20
use crate::api::repo::record::utils::create_signed_commit;
21
use crate::auth::{ServiceTokenVerifier, is_service_token};
22
use crate::state::{AppState, RateLimitKind};
23
-
use crate::types::{Did, Handle, PlainPassword};
24
use crate::validation::validate_password;
25
26
fn extract_client_ip(headers: &HeaderMap) -> String {
···
512
}
513
};
514
let rev = Tid::now(LimitedU32::MIN);
515
let (commit_bytes, _sig) =
516
-
match create_signed_commit(&did, mst_root, rev.as_ref(), None, &secret_key) {
517
Ok(result) => result,
518
Err(e) => {
519
error!("Error creating genesis commit: {:?}", e);
···
600
}
601
602
if !is_byod_did_web {
603
if let Err(e) =
604
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
605
{
606
warn!("Failed to sequence identity event for {}: {}", did, e);
607
}
608
if let Err(e) =
609
-
crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
610
{
611
warn!("Failed to sequence account event for {}: {}", did, e);
612
}
···
614
"$type": "app.bsky.actor.profile",
615
"displayName": handle
616
});
617
if let Err(e) = crate::api::repo::record::create_record_internal(
618
&state,
619
-
&did,
620
-
"app.bsky.actor.profile",
621
-
"self",
622
&profile_record,
623
)
624
.await
···
20
use crate::api::repo::record::utils::create_signed_commit;
21
use crate::auth::{ServiceTokenVerifier, is_service_token};
22
use crate::state::{AppState, RateLimitKind};
23
+
use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey};
24
use crate::validation::validate_password;
25
26
fn extract_client_ip(headers: &HeaderMap) -> String {
···
512
}
513
};
514
let rev = Tid::now(LimitedU32::MIN);
515
+
let did_typed = Did::new_unchecked(&did);
516
let (commit_bytes, _sig) =
517
+
match create_signed_commit(&did_typed, mst_root, rev.as_ref(), None, &secret_key) {
518
Ok(result) => result,
519
Err(e) => {
520
error!("Error creating genesis commit: {:?}", e);
···
601
}
602
603
if !is_byod_did_web {
604
+
let handle_typed = Handle::new_unchecked(&handle);
605
if let Err(e) =
606
+
crate::api::repo::record::sequence_identity_event(&state, &did_typed, Some(&handle_typed)).await
607
{
608
warn!("Failed to sequence identity event for {}: {}", did, e);
609
}
610
if let Err(e) =
611
+
crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await
612
{
613
warn!("Failed to sequence account event for {}: {}", did, e);
614
}
···
616
"$type": "app.bsky.actor.profile",
617
"displayName": handle
618
});
619
+
let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
620
+
let profile_rkey = Rkey::new_unchecked("self");
621
if let Err(e) = crate::api::repo::record::create_record_internal(
622
&state,
623
+
&did_typed,
624
+
&profile_collection,
625
+
&profile_rkey,
626
&profile_record,
627
)
628
.await
+3
-2
tests/commit_signing.rs
+3
-2
tests/commit_signing.rs
···
3
use jacquard_repo::commit::Commit;
4
use k256::ecdsa::SigningKey;
5
use std::str::FromStr;
6
7
#[test]
8
fn test_commit_signing_produces_valid_signature() {
···
98
use tranquil_pds::api::repo::record::utils::create_signed_commit;
99
100
let signing_key = SigningKey::random(&mut rand::thread_rng());
101
-
let did = "did:plc:testuser123456789abcdef";
102
let data_cid =
103
Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
104
let rev = Tid::now(LimitedU32::MIN).to_string();
105
106
-
let (signed_bytes, sig) = create_signed_commit(did, data_cid, &rev, None, &signing_key)
107
.expect("signing should succeed");
108
109
assert!(!signed_bytes.is_empty());
···
3
use jacquard_repo::commit::Commit;
4
use k256::ecdsa::SigningKey;
5
use std::str::FromStr;
6
+
use tranquil_pds::Did;
7
8
#[test]
9
fn test_commit_signing_produces_valid_signature() {
···
99
use tranquil_pds::api::repo::record::utils::create_signed_commit;
100
101
let signing_key = SigningKey::random(&mut rand::thread_rng());
102
+
let did = Did::new_unchecked("did:plc:testuser123456789abcdef");
103
let data_cid =
104
Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
105
let rev = Tid::now(LimitedU32::MIN).to_string();
106
107
+
let (signed_bytes, sig) = create_signed_commit(&did, data_cid, &rev, None, &signing_key)
108
.expect("signing should succeed");
109
110
assert!(!signed_bytes.is_empty());