tangled
alpha
login
or
join now
tranquil.farm
/
tranquil-pds
157
fork
atom
Our Personal Data Server from scratch!
tranquil.farm
oauth
atproto
pds
rust
postgresql
objectstorage
fun
157
fork
atom
overview
issues
24
pulls
2
pipelines
Better repo action code quality
lewis.moe
2 months ago
cae667d2
fbc24777
+413
-301
15 changed files
expand all
collapse all
unified
split
src
api
admin
account
delete.rs
update.rs
status.rs
delegation.rs
identity
account.rs
did.rs
repo
record
batch.rs
delete.rs
read.rs
utils.rs
validation.rs
write.rs
server
account_status.rs
passkey_account.rs
tests
commit_signing.rs
+1
-1
src/api/admin/account/delete.rs
···
132
}
133
if let Err(e) = crate::api::repo::record::sequence_account_event(
134
&state,
135
-
did.as_str(),
136
false,
137
Some("deleted"),
138
)
···
132
}
133
if let Err(e) = crate::api::repo::record::sequence_account_event(
134
&state,
135
+
did,
136
false,
137
Some("deleted"),
138
)
+4
-3
src/api/admin/account/update.rs
···
2
use crate::api::error::ApiError;
3
use crate::auth::BearerAuthAdmin;
4
use crate::state::AppState;
5
-
use crate::types::{Did, PlainPassword};
6
use axum::{
7
Json,
8
extract::State,
···
103
let _ = state.cache.delete(&format!("handle:{}", old)).await;
104
}
105
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
0
106
if let Err(e) = crate::api::repo::record::sequence_identity_event(
107
&state,
108
-
did.as_str(),
109
-
Some(&handle),
110
)
111
.await
112
{
···
2
use crate::api::error::ApiError;
3
use crate::auth::BearerAuthAdmin;
4
use crate::state::AppState;
5
+
use crate::types::{Did, Handle, PlainPassword};
6
use axum::{
7
Json,
8
extract::State,
···
103
let _ = state.cache.delete(&format!("handle:{}", old)).await;
104
}
105
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
106
+
let handle_typed = Handle::new_unchecked(&handle);
107
if let Err(e) = crate::api::repo::record::sequence_identity_event(
108
&state,
109
+
did,
110
+
Some(&handle_typed),
111
)
112
.await
113
{
+10
-8
src/api/admin/status.rs
···
1
use crate::api::error::ApiError;
2
use crate::auth::BearerAuthAdmin;
3
use crate::state::AppState;
0
4
use axum::{
5
Json,
6
extract::{Query, State},
···
183
let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
184
match subject_type {
185
Some("com.atproto.admin.defs#repoRef") => {
186
-
let did = input.subject.get("did").and_then(|d| d.as_str());
187
-
if let Some(did) = did {
0
188
let mut tx = match state.db.begin().await {
189
Ok(tx) => tx,
190
Err(e) => {
···
201
if let Err(e) = sqlx::query!(
202
"UPDATE users SET takedown_ref = $1 WHERE did = $2",
203
takedown_ref,
204
-
did
205
)
206
.execute(&mut *tx)
207
.await
···
217
let result = if deactivated.applied {
218
sqlx::query!(
219
"UPDATE users SET deactivated_at = NOW() WHERE did = $1",
220
-
did
221
)
222
.execute(&mut *tx)
223
.await
224
} else {
225
-
sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
226
.execute(&mut *tx)
227
.await
228
};
···
249
};
250
if let Err(e) = crate::api::repo::record::sequence_account_event(
251
&state,
252
-
did,
253
!takedown.applied,
254
status,
255
)
···
266
};
267
if let Err(e) = crate::api::repo::record::sequence_account_event(
268
&state,
269
-
did,
270
!deactivated.applied,
271
status,
272
)
···
276
}
277
}
278
if let Ok(Some(handle)) =
279
-
sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
280
.fetch_optional(&state.db)
281
.await
282
{
···
1
use crate::api::error::ApiError;
2
use crate::auth::BearerAuthAdmin;
3
use crate::state::AppState;
4
+
use crate::types::Did;
5
use axum::{
6
Json,
7
extract::{Query, State},
···
184
let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
185
match subject_type {
186
Some("com.atproto.admin.defs#repoRef") => {
187
+
let did_str = input.subject.get("did").and_then(|d| d.as_str());
188
+
if let Some(did_str) = did_str {
189
+
let did = Did::new_unchecked(did_str);
190
let mut tx = match state.db.begin().await {
191
Ok(tx) => tx,
192
Err(e) => {
···
203
if let Err(e) = sqlx::query!(
204
"UPDATE users SET takedown_ref = $1 WHERE did = $2",
205
takedown_ref,
206
+
did.as_str()
207
)
208
.execute(&mut *tx)
209
.await
···
219
let result = if deactivated.applied {
220
sqlx::query!(
221
"UPDATE users SET deactivated_at = NOW() WHERE did = $1",
222
+
did.as_str()
223
)
224
.execute(&mut *tx)
225
.await
226
} else {
227
+
sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did.as_str())
228
.execute(&mut *tx)
229
.await
230
};
···
251
};
252
if let Err(e) = crate::api::repo::record::sequence_account_event(
253
&state,
254
+
&did,
255
!takedown.applied,
256
status,
257
)
···
268
};
269
if let Err(e) = crate::api::repo::record::sequence_account_event(
270
&state,
271
+
&did,
272
!deactivated.applied,
273
status,
274
)
···
278
}
279
}
280
if let Ok(Some(handle)) =
281
+
sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
282
.fetch_optional(&state.db)
283
.await
284
{
+12
-9
src/api/delegation.rs
···
4
use crate::delegation::{self, DelegationActionType};
5
use crate::oauth::db as oauth_db;
6
use crate::state::{AppState, RateLimitKind};
7
-
use crate::types::{Did, Handle};
8
use crate::util::extract_client_ip;
9
use axum::{
10
Json,
···
568
.into_response();
569
}
570
571
-
let did = genesis_result.did;
0
572
info!(did = %did, handle = %handle, controller = %&auth.0.did, "Created DID for delegated account");
573
574
let mut tx = match state.db.begin().await {
···
585
account_type, preferred_comms_channel
586
) VALUES ($1, $2, $3, NULL, FALSE, 'delegated'::account_type, 'email'::comms_channel) RETURNING id"#,
587
)
588
-
.bind(&handle)
589
.bind(&email)
590
-
.bind(&did)
591
.fetch_one(&mut *tx)
592
.await;
593
···
633
if let Err(e) = sqlx::query!(
634
r#"INSERT INTO account_delegations (delegated_did, controller_did, granted_scopes, granted_by)
635
VALUES ($1, $2, $3, $4)"#,
636
-
did,
637
-
&auth.0.did,
638
input.controller_scopes,
639
-
&auth.0.did
640
)
641
.execute(&mut *tx)
642
.await
···
736
"$type": "app.bsky.actor.profile",
737
"displayName": handle
738
});
0
0
739
if let Err(e) = crate::api::repo::record::create_record_internal(
740
&state,
741
&did,
742
-
"app.bsky.actor.profile",
743
-
"self",
744
&profile_record,
745
)
746
.await
···
4
use crate::delegation::{self, DelegationActionType};
5
use crate::oauth::db as oauth_db;
6
use crate::state::{AppState, RateLimitKind};
7
+
use crate::types::{Did, Handle, Nsid, Rkey};
8
use crate::util::extract_client_ip;
9
use axum::{
10
Json,
···
568
.into_response();
569
}
570
571
+
let did = Did::new_unchecked(&genesis_result.did);
572
+
let handle = Handle::new_unchecked(&handle);
573
info!(did = %did, handle = %handle, controller = %&auth.0.did, "Created DID for delegated account");
574
575
let mut tx = match state.db.begin().await {
···
586
account_type, preferred_comms_channel
587
) VALUES ($1, $2, $3, NULL, FALSE, 'delegated'::account_type, 'email'::comms_channel) RETURNING id"#,
588
)
589
+
.bind(handle.as_str())
590
.bind(&email)
591
+
.bind(did.as_str())
592
.fetch_one(&mut *tx)
593
.await;
594
···
634
if let Err(e) = sqlx::query!(
635
r#"INSERT INTO account_delegations (delegated_did, controller_did, granted_scopes, granted_by)
636
VALUES ($1, $2, $3, $4)"#,
637
+
did.as_str(),
638
+
auth.0.did.as_str(),
639
input.controller_scopes,
640
+
auth.0.did.as_str()
641
)
642
.execute(&mut *tx)
643
.await
···
737
"$type": "app.bsky.actor.profile",
738
"displayName": handle
739
});
740
+
let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
741
+
let profile_rkey = Rkey::new_unchecked("self");
742
if let Err(e) = crate::api::repo::record::create_record_internal(
743
&state,
744
&did,
745
+
&profile_collection,
746
+
&profile_rkey,
747
&profile_record,
748
)
749
.await
+14
-9
src/api/identity/account.rs
···
4
use crate::auth::{ServiceTokenVerifier, is_service_token};
5
use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6
use crate::state::{AppState, RateLimitKind};
7
-
use crate::types::{Did, Handle, PlainPassword};
8
use crate::validation::validate_password;
9
use axum::{
10
Json,
···
710
}
711
};
712
let rev = Tid::now(LimitedU32::MIN);
0
713
let (commit_bytes, _sig) =
714
-
match create_signed_commit(&did, mst_root, rev.as_ref(), None, &signing_key) {
715
Ok(result) => result,
716
Err(e) => {
717
error!("Error creating genesis commit: {:?}", e);
···
793
return ApiError::InternalError(None).into_response();
794
}
795
if !is_migration && !is_did_web_byod {
0
0
796
if let Err(e) =
797
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
798
{
799
warn!("Failed to sequence identity event for {}: {}", did, e);
800
}
801
if let Err(e) =
802
-
crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
803
{
804
warn!("Failed to sequence account event for {}: {}", did, e);
805
}
806
if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
807
&state,
808
-
&did,
809
&commit_cid,
810
&mst_root,
811
&rev_str,
···
816
}
817
if let Err(e) = crate::api::repo::record::sequence_sync_event(
818
&state,
819
-
&did,
820
&commit_cid_str,
821
Some(rev.as_ref()),
822
)
···
828
"$type": "app.bsky.actor.profile",
829
"displayName": input.handle
830
});
0
0
831
if let Err(e) = crate::api::repo::record::create_record_internal(
832
&state,
833
-
&did,
834
-
"app.bsky.actor.profile",
835
-
"self",
836
&profile_record,
837
)
838
.await
···
4
use crate::auth::{ServiceTokenVerifier, is_service_token};
5
use crate::plc::{PlcClient, create_genesis_operation, signing_key_to_did_key};
6
use crate::state::{AppState, RateLimitKind};
7
+
use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey};
8
use crate::validation::validate_password;
9
use axum::{
10
Json,
···
710
}
711
};
712
let rev = Tid::now(LimitedU32::MIN);
713
+
let did_for_commit = Did::new_unchecked(&did);
714
let (commit_bytes, _sig) =
715
+
match create_signed_commit(&did_for_commit, mst_root, rev.as_ref(), None, &signing_key) {
716
Ok(result) => result,
717
Err(e) => {
718
error!("Error creating genesis commit: {:?}", e);
···
794
return ApiError::InternalError(None).into_response();
795
}
796
if !is_migration && !is_did_web_byod {
797
+
let did_typed = Did::new_unchecked(&did);
798
+
let handle_typed = Handle::new_unchecked(&handle);
799
if let Err(e) =
800
+
crate::api::repo::record::sequence_identity_event(&state, &did_typed, Some(&handle_typed)).await
801
{
802
warn!("Failed to sequence identity event for {}: {}", did, e);
803
}
804
if let Err(e) =
805
+
crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await
806
{
807
warn!("Failed to sequence account event for {}: {}", did, e);
808
}
809
if let Err(e) = crate::api::repo::record::sequence_genesis_commit(
810
&state,
811
+
&did_typed,
812
&commit_cid,
813
&mst_root,
814
&rev_str,
···
819
}
820
if let Err(e) = crate::api::repo::record::sequence_sync_event(
821
&state,
822
+
&did_typed,
823
&commit_cid_str,
824
Some(rev.as_ref()),
825
)
···
831
"$type": "app.bsky.actor.profile",
832
"displayName": input.handle
833
});
834
+
let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
835
+
let profile_rkey = Rkey::new_unchecked("self");
836
if let Err(e) = crate::api::repo::record::create_record_internal(
837
&state,
838
+
&did_typed,
839
+
&profile_collection,
840
+
&profile_rkey,
841
&profile_record,
842
)
843
.await
+7
-3
src/api/identity/did.rs
···
2
use crate::auth::BearerAuthAllowDeactivated;
3
use crate::plc::signing_key_to_did_key;
4
use crate::state::AppState;
0
5
use axum::{
6
Json,
7
extract::{Path, Query, State},
···
669
format!("{}.{}", new_handle, hostname)
670
};
671
if full_handle == current_handle {
0
672
if let Err(e) =
673
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&full_handle))
674
.await
675
{
676
warn!("Failed to sequence identity event for handle update: {}", e);
···
692
full_handle
693
} else {
694
if new_handle == current_handle {
0
695
if let Err(e) =
696
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&new_handle))
697
.await
698
{
699
warn!("Failed to sequence identity event for handle update: {}", e);
···
749
.await;
750
}
751
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
0
752
if let Err(e) =
753
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
754
{
755
warn!("Failed to sequence identity event for handle update: {}", e);
756
}
···
2
use crate::auth::BearerAuthAllowDeactivated;
3
use crate::plc::signing_key_to_did_key;
4
use crate::state::AppState;
5
+
use crate::types::Handle;
6
use axum::{
7
Json,
8
extract::{Path, Query, State},
···
670
format!("{}.{}", new_handle, hostname)
671
};
672
if full_handle == current_handle {
673
+
let handle_typed = Handle::new_unchecked(&full_handle);
674
if let Err(e) =
675
+
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle_typed))
676
.await
677
{
678
warn!("Failed to sequence identity event for handle update: {}", e);
···
694
full_handle
695
} else {
696
if new_handle == current_handle {
697
+
let handle_typed = Handle::new_unchecked(&new_handle);
698
if let Err(e) =
699
+
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle_typed))
700
.await
701
{
702
warn!("Failed to sequence identity event for handle update: {}", e);
···
752
.await;
753
}
754
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
755
+
let handle_typed = Handle::new_unchecked(&handle);
756
if let Err(e) =
757
+
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle_typed)).await
758
{
759
warn!("Failed to sequence identity event for handle update: {}", e);
760
}
+190
-139
src/api/repo/record/batch.rs
···
6
use crate::delegation::{self, DelegationActionType};
7
use crate::repo::tracking::TrackingBlockStore;
8
use crate::state::AppState;
9
-
use crate::types::{AtIdentifier, AtUri, Nsid, Rkey};
10
use axum::{
11
Json,
12
extract::State,
···
22
use tracing::{error, info};
23
24
const MAX_BATCH_WRITES: usize = 200;
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
25
26
#[derive(Deserialize)]
27
#[serde(tag = "$type")]
···
237
_ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
238
};
239
let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
240
-
let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
241
-
let mut results: Vec<WriteResult> = Vec::new();
242
-
let mut ops: Vec<RecordOp> = Vec::new();
243
-
let mut modified_keys: Vec<String> = Vec::new();
244
-
let mut all_blob_cids: Vec<String> = Vec::new();
245
-
for write in &input.writes {
246
-
match write {
247
-
WriteOp::Create {
248
-
collection,
249
-
rkey,
250
-
value,
251
-
} => {
252
-
let validation_status = if input.validate == Some(false) {
253
-
None
254
-
} else {
255
-
let require_lexicon = input.validate == Some(true);
256
-
match validate_record_with_status(
257
-
value,
258
-
collection,
259
-
rkey.as_ref().map(|r| r.as_str()),
260
-
require_lexicon,
261
-
) {
262
-
Ok(status) => Some(status),
263
-
Err(err_response) => return *err_response,
264
-
}
265
-
};
266
-
all_blob_cids.extend(extract_blob_cids(value));
267
-
let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
268
-
let record_ipld = crate::util::json_to_ipld(value);
269
-
let mut record_bytes = Vec::new();
270
-
if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
271
-
return ApiError::InvalidRecord("Failed to serialize record".into())
272
-
.into_response();
273
-
}
274
-
let record_cid = match tracking_store.put(&record_bytes).await {
275
-
Ok(c) => c,
276
-
Err(_) => {
277
-
return ApiError::InternalError(Some("Failed to store record".into()))
278
-
.into_response();
279
-
}
280
-
};
281
-
let key = format!("{}/{}", collection, rkey);
282
-
modified_keys.push(key.clone());
283
-
mst = match mst.add(&key, record_cid).await {
284
-
Ok(m) => m,
285
-
Err(_) => {
286
-
return ApiError::InternalError(Some("Failed to add to MST".into()))
287
-
.into_response();
288
-
}
289
-
};
290
-
let uri = AtUri::from_parts(&did, collection, &rkey);
291
-
results.push(WriteResult::CreateResult {
292
-
uri,
293
-
cid: record_cid.to_string(),
294
-
validation_status: validation_status.map(|s| s.to_string()),
295
-
});
296
-
ops.push(RecordOp::Create {
297
-
collection: collection.to_string(),
298
-
rkey: rkey.to_string(),
299
-
cid: record_cid,
300
-
});
301
-
}
302
-
WriteOp::Update {
303
-
collection,
304
-
rkey,
305
-
value,
306
-
} => {
307
-
let validation_status = if input.validate == Some(false) {
308
-
None
309
-
} else {
310
-
let require_lexicon = input.validate == Some(true);
311
-
match validate_record_with_status(
312
-
value,
313
-
collection,
314
-
Some(rkey.as_str()),
315
-
require_lexicon,
316
-
) {
317
-
Ok(status) => Some(status),
318
-
Err(err_response) => return *err_response,
319
-
}
320
-
};
321
-
all_blob_cids.extend(extract_blob_cids(value));
322
-
let record_ipld = crate::util::json_to_ipld(value);
323
-
let mut record_bytes = Vec::new();
324
-
if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
325
-
return ApiError::InvalidRecord("Failed to serialize record".into())
326
-
.into_response();
327
-
}
328
-
let record_cid = match tracking_store.put(&record_bytes).await {
329
-
Ok(c) => c,
330
-
Err(_) => {
331
-
return ApiError::InternalError(Some("Failed to store record".into()))
332
-
.into_response();
333
-
}
334
-
};
335
-
let key = format!("{}/{}", collection, rkey);
336
-
modified_keys.push(key.clone());
337
-
let prev_record_cid = mst.get(&key).await.ok().flatten();
338
-
mst = match mst.update(&key, record_cid).await {
339
-
Ok(m) => m,
340
-
Err(_) => {
341
-
return ApiError::InternalError(Some("Failed to update MST".into()))
342
-
.into_response();
343
-
}
344
-
};
345
-
let uri = AtUri::from_parts(&did, collection, rkey);
346
-
results.push(WriteResult::UpdateResult {
347
-
uri,
348
-
cid: record_cid.to_string(),
349
-
validation_status: validation_status.map(|s| s.to_string()),
350
-
});
351
-
ops.push(RecordOp::Update {
352
-
collection: collection.to_string(),
353
-
rkey: rkey.to_string(),
354
-
cid: record_cid,
355
-
prev: prev_record_cid,
356
-
});
357
-
}
358
-
WriteOp::Delete { collection, rkey } => {
359
-
let key = format!("{}/{}", collection, rkey);
360
-
modified_keys.push(key.clone());
361
-
let prev_record_cid = mst.get(&key).await.ok().flatten();
362
-
mst = match mst.delete(&key).await {
363
-
Ok(m) => m,
364
-
Err(_) => {
365
-
return ApiError::InternalError(Some("Failed to delete from MST".into()))
366
-
.into_response();
367
-
}
368
-
};
369
-
results.push(WriteResult::DeleteResult {});
370
-
ops.push(RecordOp::Delete {
371
-
collection: collection.to_string(),
372
-
rkey: rkey.to_string(),
373
-
prev: prev_record_cid,
374
-
});
375
-
}
376
-
}
377
-
}
378
let new_mst_root = match mst.persist().await {
379
Ok(c) => c,
380
Err(_) => {
···
6
use crate::delegation::{self, DelegationActionType};
7
use crate::repo::tracking::TrackingBlockStore;
8
use crate::state::AppState;
9
+
use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
10
use axum::{
11
Json,
12
extract::State,
···
22
use tracing::{error, info};
23
24
const MAX_BATCH_WRITES: usize = 200;
25
+
26
+
struct WriteAccumulator {
27
+
mst: Mst<TrackingBlockStore>,
28
+
results: Vec<WriteResult>,
29
+
ops: Vec<RecordOp>,
30
+
modified_keys: Vec<String>,
31
+
all_blob_cids: Vec<String>,
32
+
}
33
+
34
+
async fn process_single_write(
35
+
write: &WriteOp,
36
+
acc: WriteAccumulator,
37
+
did: &Did,
38
+
validate: Option<bool>,
39
+
tracking_store: &TrackingBlockStore,
40
+
) -> Result<WriteAccumulator, Response> {
41
+
let WriteAccumulator {
42
+
mst,
43
+
mut results,
44
+
mut ops,
45
+
mut modified_keys,
46
+
mut all_blob_cids,
47
+
} = acc;
48
+
49
+
match write {
50
+
WriteOp::Create {
51
+
collection,
52
+
rkey,
53
+
value,
54
+
} => {
55
+
let validation_status = match validate {
56
+
Some(false) => None,
57
+
_ => {
58
+
let require_lexicon = validate == Some(true);
59
+
match validate_record_with_status(
60
+
value,
61
+
collection,
62
+
rkey.as_ref(),
63
+
require_lexicon,
64
+
) {
65
+
Ok(status) => Some(status),
66
+
Err(err_response) => return Err(*err_response),
67
+
}
68
+
}
69
+
};
70
+
all_blob_cids.extend(extract_blob_cids(value));
71
+
let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
72
+
let record_ipld = crate::util::json_to_ipld(value);
73
+
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| {
74
+
ApiError::InvalidRecord("Failed to serialize record".into()).into_response()
75
+
})?;
76
+
let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| {
77
+
ApiError::InternalError(Some("Failed to store record".into())).into_response()
78
+
})?;
79
+
let key = format!("{}/{}", collection, rkey);
80
+
modified_keys.push(key.clone());
81
+
let new_mst = mst.add(&key, record_cid).await.map_err(|_| {
82
+
ApiError::InternalError(Some("Failed to add to MST".into())).into_response()
83
+
})?;
84
+
let uri = AtUri::from_parts(did, collection, &rkey);
85
+
results.push(WriteResult::CreateResult {
86
+
uri,
87
+
cid: record_cid.to_string(),
88
+
validation_status: validation_status.map(|s| s.to_string()),
89
+
});
90
+
ops.push(RecordOp::Create {
91
+
collection: collection.clone(),
92
+
rkey: rkey.clone(),
93
+
cid: record_cid,
94
+
});
95
+
Ok(WriteAccumulator {
96
+
mst: new_mst,
97
+
results,
98
+
ops,
99
+
modified_keys,
100
+
all_blob_cids,
101
+
})
102
+
}
103
+
WriteOp::Update {
104
+
collection,
105
+
rkey,
106
+
value,
107
+
} => {
108
+
let validation_status = match validate {
109
+
Some(false) => None,
110
+
_ => {
111
+
let require_lexicon = validate == Some(true);
112
+
match validate_record_with_status(
113
+
value,
114
+
collection,
115
+
Some(rkey),
116
+
require_lexicon,
117
+
) {
118
+
Ok(status) => Some(status),
119
+
Err(err_response) => return Err(*err_response),
120
+
}
121
+
}
122
+
};
123
+
all_blob_cids.extend(extract_blob_cids(value));
124
+
let record_ipld = crate::util::json_to_ipld(value);
125
+
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| {
126
+
ApiError::InvalidRecord("Failed to serialize record".into()).into_response()
127
+
})?;
128
+
let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| {
129
+
ApiError::InternalError(Some("Failed to store record".into())).into_response()
130
+
})?;
131
+
let key = format!("{}/{}", collection, rkey);
132
+
modified_keys.push(key.clone());
133
+
let prev_record_cid = mst.get(&key).await.ok().flatten();
134
+
let new_mst = mst.update(&key, record_cid).await.map_err(|_| {
135
+
ApiError::InternalError(Some("Failed to update MST".into())).into_response()
136
+
})?;
137
+
let uri = AtUri::from_parts(did, collection, rkey);
138
+
results.push(WriteResult::UpdateResult {
139
+
uri,
140
+
cid: record_cid.to_string(),
141
+
validation_status: validation_status.map(|s| s.to_string()),
142
+
});
143
+
ops.push(RecordOp::Update {
144
+
collection: collection.clone(),
145
+
rkey: rkey.clone(),
146
+
cid: record_cid,
147
+
prev: prev_record_cid,
148
+
});
149
+
Ok(WriteAccumulator {
150
+
mst: new_mst,
151
+
results,
152
+
ops,
153
+
modified_keys,
154
+
all_blob_cids,
155
+
})
156
+
}
157
+
WriteOp::Delete { collection, rkey } => {
158
+
let key = format!("{}/{}", collection, rkey);
159
+
modified_keys.push(key.clone());
160
+
let prev_record_cid = mst.get(&key).await.ok().flatten();
161
+
let new_mst = mst.delete(&key).await.map_err(|_| {
162
+
ApiError::InternalError(Some("Failed to delete from MST".into())).into_response()
163
+
})?;
164
+
results.push(WriteResult::DeleteResult {});
165
+
ops.push(RecordOp::Delete {
166
+
collection: collection.clone(),
167
+
rkey: rkey.clone(),
168
+
prev: prev_record_cid,
169
+
});
170
+
Ok(WriteAccumulator {
171
+
mst: new_mst,
172
+
results,
173
+
ops,
174
+
modified_keys,
175
+
all_blob_cids,
176
+
})
177
+
}
178
+
}
179
+
}
180
+
181
+
async fn process_writes(
182
+
writes: &[WriteOp],
183
+
initial_mst: Mst<TrackingBlockStore>,
184
+
did: &Did,
185
+
validate: Option<bool>,
186
+
tracking_store: &TrackingBlockStore,
187
+
) -> Result<WriteAccumulator, Response> {
188
+
use futures::stream::{self, TryStreamExt};
189
+
let initial_acc = WriteAccumulator {
190
+
mst: initial_mst,
191
+
results: Vec::new(),
192
+
ops: Vec::new(),
193
+
modified_keys: Vec::new(),
194
+
all_blob_cids: Vec::new(),
195
+
};
196
+
stream::iter(writes.iter().map(Ok::<_, Response>))
197
+
.try_fold(initial_acc, |acc, write| async move {
198
+
process_single_write(write, acc, did, validate, tracking_store).await
199
+
})
200
+
.await
201
+
}
202
203
#[derive(Deserialize)]
204
#[serde(tag = "$type")]
···
414
_ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
415
};
416
let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
417
+
let initial_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
418
+
let WriteAccumulator {
419
+
mst,
420
+
results,
421
+
ops,
422
+
modified_keys,
423
+
all_blob_cids,
424
+
} = match process_writes(&input.writes, initial_mst, &did, input.validate, &tracking_store).await
425
+
{
426
+
Ok(acc) => acc,
427
+
Err(response) => return response,
428
+
};
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
429
let new_mst_root = match mst.persist().await {
430
Ok(c) => c,
431
Err(_) => {
+2
-9
src/api/repo/record/delete.rs
···
65
return e;
66
}
67
68
-
if crate::util::is_account_migrated(&state.db, &auth.did)
69
-
.await
70
-
.unwrap_or(false)
71
-
{
72
-
return ApiError::AccountMigrated.into_response();
73
-
}
74
-
75
let did = auth.did;
76
let user_id = auth.user_id;
77
let current_root_cid = auth.current_root_cid;
···
125
let collection_for_audit = input.collection.to_string();
126
let rkey_for_audit = input.rkey.to_string();
127
let op = RecordOp::Delete {
128
-
collection: input.collection.to_string(),
129
-
rkey: rkey_for_audit.clone(),
130
prev: prev_record_cid,
131
};
132
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
65
return e;
66
}
67
0
0
0
0
0
0
0
68
let did = auth.did;
69
let user_id = auth.user_id;
70
let current_root_cid = auth.current_root_cid;
···
118
let collection_for_audit = input.collection.to_string();
119
let rkey_for_audit = input.rkey.to_string();
120
let op = RecordOp::Delete {
121
+
collection: input.collection.clone(),
122
+
rkey: input.rkey.clone(),
123
prev: prev_record_cid,
124
};
125
let mut new_mst_blocks = std::collections::BTreeMap::new();
+24
-23
src/api/repo/record/read.rs
···
13
use jacquard_repo::storage::BlockStore;
14
use serde::{Deserialize, Serialize};
15
use serde_json::{Map, Value, json};
16
-
use std::collections::HashMap;
17
use std::str::FromStr;
18
use tracing::error;
19
···
237
}
238
};
239
let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
240
-
let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
241
-
let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
242
-
for (rkey, cid_str) in &rows {
243
-
if let Ok(cid) = Cid::from_str(cid_str) {
244
-
cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
245
-
cids.push(cid);
246
-
}
247
-
}
0
248
let blocks = match state.block_store.get_many(&cids).await {
249
Ok(b) => b,
250
Err(e) => {
···
252
return ApiError::InternalError(None).into_response();
253
}
254
};
255
-
let mut records = Vec::new();
256
-
for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
257
-
if let Some(block) = block_opt
258
-
&& let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
259
-
&& let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block)
260
-
{
261
-
let value = ipld_to_json(ipld);
262
-
records.push(json!({
263
-
"uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
264
-
"cid": cid_str,
265
-
"value": value
266
-
}));
267
-
}
268
-
}
0
269
Json(ListRecordsOutput {
270
cursor: last_rkey,
271
records,
···
13
use jacquard_repo::storage::BlockStore;
14
use serde::{Deserialize, Serialize};
15
use serde_json::{Map, Value, json};
0
16
use std::str::FromStr;
17
use tracing::error;
18
···
236
}
237
};
238
let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
239
+
let parsed_rows: Vec<(Cid, String, String)> = rows
240
+
.iter()
241
+
.filter_map(|(rkey, cid_str)| {
242
+
Cid::from_str(cid_str)
243
+
.ok()
244
+
.map(|cid| (cid, rkey.clone(), cid_str.clone()))
245
+
})
246
+
.collect();
247
+
let cids: Vec<Cid> = parsed_rows.iter().map(|(cid, _, _)| *cid).collect();
248
let blocks = match state.block_store.get_many(&cids).await {
249
Ok(b) => b,
250
Err(e) => {
···
252
return ApiError::InternalError(None).into_response();
253
}
254
};
255
+
let records: Vec<Value> = parsed_rows
256
+
.iter()
257
+
.zip(blocks.into_iter())
258
+
.filter_map(|((_, rkey, cid_str), block_opt)| {
259
+
block_opt.and_then(|block| {
260
+
serde_ipld_dagcbor::from_slice::<Ipld>(&block).ok().map(|ipld| {
261
+
json!({
262
+
"uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
263
+
"cid": cid_str,
264
+
"value": ipld_to_json(ipld)
265
+
})
266
+
})
267
+
})
268
+
})
269
+
.collect();
270
Json(ListRecordsOutput {
271
cursor: last_rkey,
272
records,
+104
-63
src/api/repo/record/utils.rs
···
1
use crate::state::AppState;
0
2
use bytes::Bytes;
3
use cid::Cid;
4
use jacquard::types::{integer::LimitedU32, string::Tid};
···
38
}
39
40
pub fn create_signed_commit(
41
-
did: &str,
42
data: Cid,
43
rev: &str,
44
prev: Option<Cid>,
45
signing_key: &SigningKey,
46
) -> Result<(Vec<u8>, Bytes), String> {
47
-
let did =
48
-
jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?;
49
let rev =
50
jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
51
let unsigned = Commit::new_unsigned(did, data, rev, prev);
···
61
62
pub enum RecordOp {
63
Create {
64
-
collection: String,
65
-
rkey: String,
66
cid: Cid,
67
},
68
Update {
69
-
collection: String,
70
-
rkey: String,
71
cid: Cid,
72
prev: Option<Cid>,
73
},
74
Delete {
75
-
collection: String,
76
-
rkey: String,
77
prev: Option<Cid>,
78
},
79
}
···
84
}
85
86
pub struct CommitParams<'a> {
87
-
pub did: &'a str,
88
pub user_id: Uuid,
89
pub current_root_cid: Option<Cid>,
90
pub prev_data_cid: Option<Cid>,
···
218
.await
219
.map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
220
}
221
-
let mut upsert_collections: Vec<String> = Vec::new();
222
-
let mut upsert_rkeys: Vec<String> = Vec::new();
223
-
let mut upsert_cids: Vec<String> = Vec::new();
224
-
let mut delete_collections: Vec<String> = Vec::new();
225
-
let mut delete_rkeys: Vec<String> = Vec::new();
226
-
for op in &ops {
227
-
match op {
228
-
RecordOp::Create {
229
-
collection,
230
-
rkey,
231
-
cid,
232
-
}
233
-
| RecordOp::Update {
234
-
collection,
235
-
rkey,
236
-
cid,
237
-
..
238
-
} => {
239
-
upsert_collections.push(collection.clone());
240
-
upsert_rkeys.push(rkey.clone());
241
-
upsert_cids.push(cid.to_string());
242
-
}
0
0
0
0
0
0
0
0
0
0
243
RecordOp::Delete {
244
collection, rkey, ..
245
-
} => {
246
-
delete_collections.push(collection.clone());
247
-
delete_rkeys.push(rkey.clone());
248
-
}
249
-
}
250
-
}
251
if !upsert_collections.is_empty() {
252
sqlx::query!(
253
r#"
···
337
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
338
RETURNING seq
339
"#,
340
-
did,
341
event_type,
342
new_root_cid.to_string(),
343
prev_cid_str,
···
367
}
368
pub async fn create_record_internal(
369
state: &AppState,
370
-
did: &str,
371
-
collection: &str,
372
-
rkey: &str,
373
record: &serde_json::Value,
374
) -> Result<(String, Cid), String> {
375
use crate::repo::tracking::TrackingBlockStore;
376
use jacquard_repo::mst::Mst;
377
use std::sync::Arc;
378
-
let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
379
.fetch_optional(&state.db)
380
.await
381
.map_err(|e| format!("DB error: {}", e))?
···
417
.await
418
.map_err(|e| format!("Failed to persist MST: {:?}", e))?;
419
let op = RecordOp::Create {
420
-
collection: collection.to_string(),
421
-
rkey: rkey.to_string(),
422
cid: record_cid,
423
};
424
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
471
472
pub async fn sequence_identity_event(
473
state: &AppState,
474
-
did: &str,
475
-
handle: Option<&str>,
476
) -> Result<i64, String> {
0
0
0
0
0
477
let seq_row = sqlx::query!(
478
r#"
479
INSERT INTO repo_seq (did, event_type, handle)
480
VALUES ($1, 'identity', $2)
481
RETURNING seq
482
"#,
483
-
did,
484
-
handle,
485
)
486
-
.fetch_one(&state.db)
487
.await
488
.map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
489
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
490
-
.execute(&state.db)
491
.await
492
.map_err(|e| format!("DB Error (notify): {}", e))?;
0
0
0
493
Ok(seq_row.seq)
494
}
495
pub async fn sequence_account_event(
496
state: &AppState,
497
-
did: &str,
498
active: bool,
499
status: Option<&str>,
500
) -> Result<i64, String> {
0
0
0
0
0
501
let seq_row = sqlx::query!(
502
r#"
503
INSERT INTO repo_seq (did, event_type, active, status)
504
VALUES ($1, 'account', $2, $3)
505
RETURNING seq
506
"#,
507
-
did,
508
active,
509
status,
510
)
511
-
.fetch_one(&state.db)
512
.await
513
.map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
514
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
515
-
.execute(&state.db)
516
.await
517
.map_err(|e| format!("DB Error (notify): {}", e))?;
0
0
0
518
Ok(seq_row.seq)
519
}
520
pub async fn sequence_sync_event(
521
state: &AppState,
522
-
did: &str,
523
commit_cid: &str,
524
rev: Option<&str>,
525
) -> Result<i64, String> {
0
0
0
0
0
526
let seq_row = sqlx::query!(
527
r#"
528
INSERT INTO repo_seq (did, event_type, commit_cid, rev)
529
VALUES ($1, 'sync', $2, $3)
530
RETURNING seq
531
"#,
532
-
did,
533
commit_cid,
534
rev,
535
)
536
-
.fetch_one(&state.db)
537
.await
538
.map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
539
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
540
-
.execute(&state.db)
541
.await
542
.map_err(|e| format!("DB Error (notify): {}", e))?;
0
0
0
543
Ok(seq_row.seq)
544
}
545
546
pub async fn sequence_genesis_commit(
547
state: &AppState,
548
-
did: &str,
549
commit_cid: &Cid,
550
mst_root_cid: &Cid,
551
rev: &str,
···
555
let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
556
let prev_cid: Option<&str> = None;
557
let commit_cid_str = commit_cid.to_string();
0
0
0
0
0
558
let seq_row = sqlx::query!(
559
r#"
560
INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
561
VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
562
RETURNING seq
563
"#,
564
-
did,
565
commit_cid_str,
566
prev_cid,
567
ops,
···
569
&blocks_cids,
570
rev
571
)
572
-
.fetch_one(&state.db)
573
.await
574
.map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
575
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
576
-
.execute(&state.db)
577
.await
578
.map_err(|e| format!("DB Error (notify): {}", e))?;
0
0
0
579
Ok(seq_row.seq)
580
}
···
1
use crate::state::AppState;
2
+
use crate::types::{Did, Handle, Nsid, Rkey};
3
use bytes::Bytes;
4
use cid::Cid;
5
use jacquard::types::{integer::LimitedU32, string::Tid};
···
39
}
40
41
pub fn create_signed_commit(
42
+
did: &Did,
43
data: Cid,
44
rev: &str,
45
prev: Option<Cid>,
46
signing_key: &SigningKey,
47
) -> Result<(Vec<u8>, Bytes), String> {
48
+
let did = jacquard::types::string::Did::new(did.as_str())
49
+
.map_err(|e| format!("Invalid DID: {:?}", e))?;
50
let rev =
51
jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
52
let unsigned = Commit::new_unsigned(did, data, rev, prev);
···
62
63
pub enum RecordOp {
64
Create {
65
+
collection: Nsid,
66
+
rkey: Rkey,
67
cid: Cid,
68
},
69
Update {
70
+
collection: Nsid,
71
+
rkey: Rkey,
72
cid: Cid,
73
prev: Option<Cid>,
74
},
75
Delete {
76
+
collection: Nsid,
77
+
rkey: Rkey,
78
prev: Option<Cid>,
79
},
80
}
···
85
}
86
87
pub struct CommitParams<'a> {
88
+
pub did: &'a Did,
89
pub user_id: Uuid,
90
pub current_root_cid: Option<Cid>,
91
pub prev_data_cid: Option<Cid>,
···
219
.await
220
.map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
221
}
222
+
let (upserts, deletes): (Vec<_>, Vec<_>) = ops.iter().partition(|op| {
223
+
matches!(op, RecordOp::Create { .. } | RecordOp::Update { .. })
224
+
});
225
+
let (upsert_collections, upsert_rkeys, upsert_cids): (Vec<String>, Vec<String>, Vec<String>) =
226
+
upserts
227
+
.into_iter()
228
+
.filter_map(|op| match op {
229
+
RecordOp::Create {
230
+
collection,
231
+
rkey,
232
+
cid,
233
+
}
234
+
| RecordOp::Update {
235
+
collection,
236
+
rkey,
237
+
cid,
238
+
..
239
+
} => Some((collection.to_string(), rkey.to_string(), cid.to_string())),
240
+
_ => None,
241
+
})
242
+
.fold(
243
+
(Vec::new(), Vec::new(), Vec::new()),
244
+
|(mut cols, mut rkeys, mut cids), (c, r, ci)| {
245
+
cols.push(c);
246
+
rkeys.push(r);
247
+
cids.push(ci);
248
+
(cols, rkeys, cids)
249
+
},
250
+
);
251
+
let (delete_collections, delete_rkeys): (Vec<String>, Vec<String>) = deletes
252
+
.into_iter()
253
+
.filter_map(|op| match op {
254
RecordOp::Delete {
255
collection, rkey, ..
256
+
} => Some((collection.to_string(), rkey.to_string())),
257
+
_ => None,
258
+
})
259
+
.unzip();
0
0
260
if !upsert_collections.is_empty() {
261
sqlx::query!(
262
r#"
···
346
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
347
RETURNING seq
348
"#,
349
+
did.as_str(),
350
event_type,
351
new_root_cid.to_string(),
352
prev_cid_str,
···
376
}
377
pub async fn create_record_internal(
378
state: &AppState,
379
+
did: &Did,
380
+
collection: &Nsid,
381
+
rkey: &Rkey,
382
record: &serde_json::Value,
383
) -> Result<(String, Cid), String> {
384
use crate::repo::tracking::TrackingBlockStore;
385
use jacquard_repo::mst::Mst;
386
use std::sync::Arc;
387
+
let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
388
.fetch_optional(&state.db)
389
.await
390
.map_err(|e| format!("DB error: {}", e))?
···
426
.await
427
.map_err(|e| format!("Failed to persist MST: {:?}", e))?;
428
let op = RecordOp::Create {
429
+
collection: collection.clone(),
430
+
rkey: rkey.clone(),
431
cid: record_cid,
432
};
433
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
480
481
pub async fn sequence_identity_event(
482
state: &AppState,
483
+
did: &Did,
484
+
handle: Option<&Handle>,
485
) -> Result<i64, String> {
486
+
let mut tx = state
487
+
.db
488
+
.begin()
489
+
.await
490
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
491
let seq_row = sqlx::query!(
492
r#"
493
INSERT INTO repo_seq (did, event_type, handle)
494
VALUES ($1, 'identity', $2)
495
RETURNING seq
496
"#,
497
+
did.as_str(),
498
+
handle.map(|h| h.as_str()),
499
)
500
+
.fetch_one(&mut *tx)
501
.await
502
.map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
503
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
504
+
.execute(&mut *tx)
505
.await
506
.map_err(|e| format!("DB Error (notify): {}", e))?;
507
+
tx.commit()
508
+
.await
509
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
510
Ok(seq_row.seq)
511
}
512
pub async fn sequence_account_event(
513
state: &AppState,
514
+
did: &Did,
515
active: bool,
516
status: Option<&str>,
517
) -> Result<i64, String> {
518
+
let mut tx = state
519
+
.db
520
+
.begin()
521
+
.await
522
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
523
let seq_row = sqlx::query!(
524
r#"
525
INSERT INTO repo_seq (did, event_type, active, status)
526
VALUES ($1, 'account', $2, $3)
527
RETURNING seq
528
"#,
529
+
did.as_str(),
530
active,
531
status,
532
)
533
+
.fetch_one(&mut *tx)
534
.await
535
.map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
536
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
537
+
.execute(&mut *tx)
538
.await
539
.map_err(|e| format!("DB Error (notify): {}", e))?;
540
+
tx.commit()
541
+
.await
542
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
543
Ok(seq_row.seq)
544
}
545
pub async fn sequence_sync_event(
546
state: &AppState,
547
+
did: &Did,
548
commit_cid: &str,
549
rev: Option<&str>,
550
) -> Result<i64, String> {
551
+
let mut tx = state
552
+
.db
553
+
.begin()
554
+
.await
555
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
556
let seq_row = sqlx::query!(
557
r#"
558
INSERT INTO repo_seq (did, event_type, commit_cid, rev)
559
VALUES ($1, 'sync', $2, $3)
560
RETURNING seq
561
"#,
562
+
did.as_str(),
563
commit_cid,
564
rev,
565
)
566
+
.fetch_one(&mut *tx)
567
.await
568
.map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
569
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
570
+
.execute(&mut *tx)
571
.await
572
.map_err(|e| format!("DB Error (notify): {}", e))?;
573
+
tx.commit()
574
+
.await
575
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
576
Ok(seq_row.seq)
577
}
578
579
pub async fn sequence_genesis_commit(
580
state: &AppState,
581
+
did: &Did,
582
commit_cid: &Cid,
583
mst_root_cid: &Cid,
584
rev: &str,
···
588
let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
589
let prev_cid: Option<&str> = None;
590
let commit_cid_str = commit_cid.to_string();
591
+
let mut tx = state
592
+
.db
593
+
.begin()
594
+
.await
595
+
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
596
let seq_row = sqlx::query!(
597
r#"
598
INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
599
VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
600
RETURNING seq
601
"#,
602
+
did.as_str(),
603
commit_cid_str,
604
prev_cid,
605
ops,
···
607
&blocks_cids,
608
rev
609
)
610
+
.fetch_one(&mut *tx)
611
.await
612
.map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
613
sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
614
+
.execute(&mut *tx)
615
.await
616
.map_err(|e| format!("DB Error (notify): {}", e))?;
617
+
tx.commit()
618
+
.await
619
+
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
620
Ok(seq_row.seq)
621
}
+12
-7
src/api/repo/record/validation.rs
···
1
use crate::api::error::ApiError;
0
2
use crate::validation::{RecordValidator, ValidationError, ValidationStatus};
3
use axum::response::Response;
4
5
-
pub fn validate_record(record: &serde_json::Value, collection: &str) -> Result<(), Box<Response>> {
6
validate_record_with_rkey(record, collection, None)
7
}
8
9
pub fn validate_record_with_rkey(
10
record: &serde_json::Value,
11
-
collection: &str,
12
-
rkey: Option<&str>,
13
) -> Result<(), Box<Response>> {
14
let validator = RecordValidator::new();
15
-
validation_error_to_response(validator.validate_with_rkey(record, collection, rkey))
0
0
0
0
16
}
17
18
pub fn validate_record_with_status(
19
record: &serde_json::Value,
20
-
collection: &str,
21
-
rkey: Option<&str>,
22
require_lexicon: bool,
23
) -> Result<ValidationStatus, Box<Response>> {
24
let validator = RecordValidator::new().require_lexicon(require_lexicon);
25
-
match validator.validate_with_rkey(record, collection, rkey) {
26
Ok(status) => Ok(status),
27
Err(e) => Err(validation_error_to_box_response(e)),
28
}
···
1
use crate::api::error::ApiError;
2
+
use crate::types::{Nsid, Rkey};
3
use crate::validation::{RecordValidator, ValidationError, ValidationStatus};
4
use axum::response::Response;
5
6
+
pub fn validate_record(record: &serde_json::Value, collection: &Nsid) -> Result<(), Box<Response>> {
7
validate_record_with_rkey(record, collection, None)
8
}
9
10
pub fn validate_record_with_rkey(
11
record: &serde_json::Value,
12
+
collection: &Nsid,
13
+
rkey: Option<&Rkey>,
14
) -> Result<(), Box<Response>> {
15
let validator = RecordValidator::new();
16
+
validation_error_to_response(validator.validate_with_rkey(
17
+
record,
18
+
collection.as_str(),
19
+
rkey.map(|r| r.as_str()),
20
+
))
21
}
22
23
pub fn validate_record_with_status(
24
record: &serde_json::Value,
25
+
collection: &Nsid,
26
+
rkey: Option<&Rkey>,
27
require_lexicon: bool,
28
) -> Result<ValidationStatus, Box<Response>> {
29
let validator = RecordValidator::new().require_lexicon(require_lexicon);
30
+
match validator.validate_with_rkey(record, collection.as_str(), rkey.map(|r| r.as_str())) {
31
Ok(status) => Ok(status),
32
Err(e) => Err(validation_error_to_box_response(e)),
33
}
+12
-12
src/api/repo/record/write.rs
···
21
use tracing::error;
22
use uuid::Uuid;
23
24
-
pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25
let row = sqlx::query(
26
r#"
27
SELECT
···
33
WHERE did = $1
34
"#,
35
)
36
-
.bind(did)
37
.fetch_optional(db)
38
.await?;
39
match row {
···
60
pub async fn prepare_repo_write(
61
state: &AppState,
62
headers: &HeaderMap,
63
-
repo_did: &str,
64
http_method: &str,
65
http_uri: &str,
66
) -> Result<RepoWriteAuth, Response> {
···
96
}
97
response
98
})?;
99
-
if repo_did != auth_user.did {
100
return Err(
101
ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
102
);
···
229
match validate_record_with_status(
230
&input.record,
231
&input.collection,
232
-
input.rkey.as_ref().map(|r| r.as_str()),
233
require_lexicon,
234
) {
235
Ok(status) => Some(status),
···
259
_ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
260
};
261
let op = RecordOp::Create {
262
-
collection: input.collection.to_string(),
263
-
rkey: rkey.to_string(),
264
cid: record_cid,
265
};
266
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
443
match validate_record_with_status(
444
&input.record,
445
&input.collection,
446
-
Some(input.rkey.as_str()),
447
require_lexicon,
448
) {
449
Ok(status) => Some(status),
···
510
};
511
let op = if existing_cid.is_some() {
512
RecordOp::Update {
513
-
collection: input.collection.to_string(),
514
-
rkey: input.rkey.to_string(),
515
cid: record_cid,
516
prev: existing_cid,
517
}
518
} else {
519
RecordOp::Create {
520
-
collection: input.collection.to_string(),
521
-
rkey: input.rkey.to_string(),
522
cid: record_cid,
523
}
524
};
···
21
use tracing::error;
22
use uuid::Uuid;
23
24
+
pub async fn has_verified_comms_channel(db: &PgPool, did: &Did) -> Result<bool, sqlx::Error> {
25
let row = sqlx::query(
26
r#"
27
SELECT
···
33
WHERE did = $1
34
"#,
35
)
36
+
.bind(did.as_str())
37
.fetch_optional(db)
38
.await?;
39
match row {
···
60
pub async fn prepare_repo_write(
61
state: &AppState,
62
headers: &HeaderMap,
63
+
repo: &AtIdentifier,
64
http_method: &str,
65
http_uri: &str,
66
) -> Result<RepoWriteAuth, Response> {
···
96
}
97
response
98
})?;
99
+
if repo.as_str() != auth_user.did.as_str() {
100
return Err(
101
ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
102
);
···
229
match validate_record_with_status(
230
&input.record,
231
&input.collection,
232
+
input.rkey.as_ref(),
233
require_lexicon,
234
) {
235
Ok(status) => Some(status),
···
259
_ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
260
};
261
let op = RecordOp::Create {
262
+
collection: input.collection.clone(),
263
+
rkey: rkey.clone(),
264
cid: record_cid,
265
};
266
let mut new_mst_blocks = std::collections::BTreeMap::new();
···
443
match validate_record_with_status(
444
&input.record,
445
&input.collection,
446
+
Some(&input.rkey),
447
require_lexicon,
448
) {
449
Ok(status) => Some(status),
···
510
};
511
let op = if existing_cid.is_some() {
512
RecordOp::Update {
513
+
collection: input.collection.clone(),
514
+
rkey: input.rkey.clone(),
515
cid: record_cid,
516
prev: existing_cid,
517
}
518
} else {
519
RecordOp::Create {
520
+
collection: input.collection.clone(),
521
+
rkey: input.rkey.clone(),
522
cid: record_cid,
523
}
524
};
+7
-6
src/api/server/account_status.rs
···
3
use crate::cache::Cache;
4
use crate::plc::PlcClient;
5
use crate::state::AppState;
6
-
use crate::types::PlainPassword;
7
use axum::{
8
Json,
9
extract::State,
···
449
did
450
);
451
if let Err(e) =
452
-
crate::api::repo::record::sequence_account_event(&state, did.as_str(), true, None)
453
.await
454
{
455
warn!(
···
463
"[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
464
did, handle
465
);
0
466
if let Err(e) = crate::api::repo::record::sequence_identity_event(
467
&state,
468
-
did.as_str(),
469
-
handle.as_deref(),
470
)
471
.await
472
{
···
501
};
502
if let Err(e) = crate::api::repo::record::sequence_sync_event(
503
&state,
504
-
did.as_str(),
505
&root_cid,
506
rev.as_deref(),
507
)
···
609
}
610
if let Err(e) = crate::api::repo::record::sequence_account_event(
611
&state,
612
-
did.as_str(),
613
false,
614
Some("deactivated"),
615
)
···
3
use crate::cache::Cache;
4
use crate::plc::PlcClient;
5
use crate::state::AppState;
6
+
use crate::types::{Handle, PlainPassword};
7
use axum::{
8
Json,
9
extract::State,
···
449
did
450
);
451
if let Err(e) =
452
+
crate::api::repo::record::sequence_account_event(&state, &did, true, None)
453
.await
454
{
455
warn!(
···
463
"[MIGRATION] activateAccount: Sequencing identity event for did={} handle={:?}",
464
did, handle
465
);
466
+
let handle_typed = handle.as_ref().map(|h| Handle::new_unchecked(h));
467
if let Err(e) = crate::api::repo::record::sequence_identity_event(
468
&state,
469
+
&did,
470
+
handle_typed.as_ref(),
471
)
472
.await
473
{
···
502
};
503
if let Err(e) = crate::api::repo::record::sequence_sync_event(
504
&state,
505
+
&did,
506
&root_cid,
507
rev.as_deref(),
508
)
···
610
}
611
if let Err(e) = crate::api::repo::record::sequence_account_event(
612
&state,
613
+
&did,
614
false,
615
Some("deactivated"),
616
)
+11
-7
src/api/server/passkey_account.rs
···
20
use crate::api::repo::record::utils::create_signed_commit;
21
use crate::auth::{ServiceTokenVerifier, is_service_token};
22
use crate::state::{AppState, RateLimitKind};
23
-
use crate::types::{Did, Handle, PlainPassword};
24
use crate::validation::validate_password;
25
26
fn extract_client_ip(headers: &HeaderMap) -> String {
···
512
}
513
};
514
let rev = Tid::now(LimitedU32::MIN);
0
515
let (commit_bytes, _sig) =
516
-
match create_signed_commit(&did, mst_root, rev.as_ref(), None, &secret_key) {
517
Ok(result) => result,
518
Err(e) => {
519
error!("Error creating genesis commit: {:?}", e);
···
600
}
601
602
if !is_byod_did_web {
0
603
if let Err(e) =
604
-
crate::api::repo::record::sequence_identity_event(&state, &did, Some(&handle)).await
605
{
606
warn!("Failed to sequence identity event for {}: {}", did, e);
607
}
608
if let Err(e) =
609
-
crate::api::repo::record::sequence_account_event(&state, &did, true, None).await
610
{
611
warn!("Failed to sequence account event for {}: {}", did, e);
612
}
···
614
"$type": "app.bsky.actor.profile",
615
"displayName": handle
616
});
0
0
617
if let Err(e) = crate::api::repo::record::create_record_internal(
618
&state,
619
-
&did,
620
-
"app.bsky.actor.profile",
621
-
"self",
622
&profile_record,
623
)
624
.await
···
20
use crate::api::repo::record::utils::create_signed_commit;
21
use crate::auth::{ServiceTokenVerifier, is_service_token};
22
use crate::state::{AppState, RateLimitKind};
23
+
use crate::types::{Did, Handle, Nsid, PlainPassword, Rkey};
24
use crate::validation::validate_password;
25
26
fn extract_client_ip(headers: &HeaderMap) -> String {
···
512
}
513
};
514
let rev = Tid::now(LimitedU32::MIN);
515
+
let did_typed = Did::new_unchecked(&did);
516
let (commit_bytes, _sig) =
517
+
match create_signed_commit(&did_typed, mst_root, rev.as_ref(), None, &secret_key) {
518
Ok(result) => result,
519
Err(e) => {
520
error!("Error creating genesis commit: {:?}", e);
···
601
}
602
603
if !is_byod_did_web {
604
+
let handle_typed = Handle::new_unchecked(&handle);
605
if let Err(e) =
606
+
crate::api::repo::record::sequence_identity_event(&state, &did_typed, Some(&handle_typed)).await
607
{
608
warn!("Failed to sequence identity event for {}: {}", did, e);
609
}
610
if let Err(e) =
611
+
crate::api::repo::record::sequence_account_event(&state, &did_typed, true, None).await
612
{
613
warn!("Failed to sequence account event for {}: {}", did, e);
614
}
···
616
"$type": "app.bsky.actor.profile",
617
"displayName": handle
618
});
619
+
let profile_collection = Nsid::new_unchecked("app.bsky.actor.profile");
620
+
let profile_rkey = Rkey::new_unchecked("self");
621
if let Err(e) = crate::api::repo::record::create_record_internal(
622
&state,
623
+
&did_typed,
624
+
&profile_collection,
625
+
&profile_rkey,
626
&profile_record,
627
)
628
.await
+3
-2
tests/commit_signing.rs
···
3
use jacquard_repo::commit::Commit;
4
use k256::ecdsa::SigningKey;
5
use std::str::FromStr;
0
6
7
#[test]
8
fn test_commit_signing_produces_valid_signature() {
···
98
use tranquil_pds::api::repo::record::utils::create_signed_commit;
99
100
let signing_key = SigningKey::random(&mut rand::thread_rng());
101
-
let did = "did:plc:testuser123456789abcdef";
102
let data_cid =
103
Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
104
let rev = Tid::now(LimitedU32::MIN).to_string();
105
106
-
let (signed_bytes, sig) = create_signed_commit(did, data_cid, &rev, None, &signing_key)
107
.expect("signing should succeed");
108
109
assert!(!signed_bytes.is_empty());
···
3
use jacquard_repo::commit::Commit;
4
use k256::ecdsa::SigningKey;
5
use std::str::FromStr;
6
+
use tranquil_pds::Did;
7
8
#[test]
9
fn test_commit_signing_produces_valid_signature() {
···
99
use tranquil_pds::api::repo::record::utils::create_signed_commit;
100
101
let signing_key = SigningKey::random(&mut rand::thread_rng());
102
+
let did = Did::new_unchecked("did:plc:testuser123456789abcdef");
103
let data_cid =
104
Cid::from_str("bafyreib2rxk3ryblouj3fxza5jvx6psmwewwessc4m6g6e7pqhhkwqomfi").unwrap();
105
let rev = Tid::now(LimitedU32::MIN).to_string();
106
107
+
let (signed_bytes, sig) = create_signed_commit(&did, data_cid, &rev, None, &signing_key)
108
.expect("signing should succeed");
109
110
assert!(!signed_bytes.is_empty());