this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::{signature::Signer, Signature, SigningKey};
7use serde::Serialize;
8use serde_json::json;
9use uuid::Uuid;
10
11/*
12 * Why custom commit signing instead of jacquard's Commit::sign()?
13 *
14 * Jacquard previously had a bug in how it created unsigned bytes for signing:
15 * it set sig to empty bytes and serialized (6-field CBOR map), while the
16 * ATProto spec creates a struct *without* the sig field (5-field CBOR map).
17 * These produce different CBOR bytes, so signatures didn't verify with relays.
18 *
19 * The bug has been fixed in jacquard, but the fix is untested here.
20 * TODO: Switch back to jacquard's Commit::sign() and verify it works.
21 */
22
23#[derive(Serialize)]
24struct UnsignedCommit<'a> {
25 data: Cid,
26 did: &'a str,
27 prev: Option<Cid>,
28 rev: &'a str,
29 version: i64,
30}
31
32pub fn create_signed_commit(
33 did: &str,
34 data: Cid,
35 rev: &str,
36 prev: Option<Cid>,
37 signing_key: &SigningKey,
38) -> Result<(Vec<u8>, Bytes), String> {
39 let unsigned = UnsignedCommit {
40 data,
41 did,
42 prev,
43 rev,
44 version: 3,
45 };
46 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned)
47 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?;
48 let sig: Signature = signing_key.sign(&unsigned_bytes);
49 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes());
50 #[derive(Serialize)]
51 struct SignedCommit<'a> {
52 data: Cid,
53 did: &'a str,
54 prev: Option<Cid>,
55 rev: &'a str,
56 #[serde(with = "serde_bytes")]
57 sig: &'a [u8],
58 version: i64,
59 }
60 let signed = SignedCommit {
61 data,
62 did,
63 prev,
64 rev,
65 sig: &sig_bytes,
66 version: 3,
67 };
68 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed)
69 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
70 Ok((signed_bytes, sig_bytes))
71}
72
73pub enum RecordOp {
74 Create {
75 collection: String,
76 rkey: String,
77 cid: Cid,
78 },
79 Update {
80 collection: String,
81 rkey: String,
82 cid: Cid,
83 prev: Option<Cid>,
84 },
85 Delete {
86 collection: String,
87 rkey: String,
88 prev: Option<Cid>,
89 },
90}
91
92pub struct CommitResult {
93 pub commit_cid: Cid,
94 pub rev: String,
95}
96
97pub struct CommitParams<'a> {
98 pub did: &'a str,
99 pub user_id: Uuid,
100 pub current_root_cid: Option<Cid>,
101 pub prev_data_cid: Option<Cid>,
102 pub new_mst_root: Cid,
103 pub ops: Vec<RecordOp>,
104 pub blocks_cids: &'a [String],
105}
106
107pub async fn commit_and_log(
108 state: &AppState,
109 params: CommitParams<'_>,
110) -> Result<CommitResult, String> {
111 let CommitParams {
112 did,
113 user_id,
114 current_root_cid,
115 prev_data_cid,
116 new_mst_root,
117 ops,
118 blocks_cids,
119 } = params;
120 let key_row = sqlx::query!(
121 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
122 user_id
123 )
124 .fetch_one(&state.db)
125 .await
126 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
127 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
128 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
129 let signing_key =
130 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
131 let rev = Tid::now(LimitedU32::MIN);
132 let rev_str = rev.to_string();
133 let (new_commit_bytes, _sig) =
134 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
135 let new_root_cid = state
136 .block_store
137 .put(&new_commit_bytes)
138 .await
139 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
140 let mut tx = state
141 .db
142 .begin()
143 .await
144 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
145 let lock_result = sqlx::query!(
146 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
147 user_id
148 )
149 .fetch_optional(&mut *tx)
150 .await;
151 match lock_result {
152 Err(e) => {
153 if let Some(db_err) = e.as_database_error()
154 && db_err.code().as_deref() == Some("55P03")
155 {
156 return Err(
157 "ConcurrentModification: Another request is modifying this repo".to_string(),
158 );
159 }
160 return Err(format!("Failed to acquire repo lock: {}", e));
161 }
162 Ok(Some(row)) => {
163 if let Some(expected_root) = ¤t_root_cid
164 && row.repo_root_cid != expected_root.to_string()
165 {
166 return Err(
167 "ConcurrentModification: Repo has been modified since last read".to_string(),
168 );
169 }
170 }
171 Ok(None) => {
172 return Err("Repo not found".to_string());
173 }
174 }
175 let is_account_active = sqlx::query_scalar!(
176 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
177 user_id
178 )
179 .fetch_optional(&mut *tx)
180 .await
181 .map_err(|e| format!("Failed to check account status: {}", e))?
182 .flatten()
183 .unwrap_or(false);
184 sqlx::query!(
185 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
186 new_root_cid.to_string(),
187 user_id
188 )
189 .execute(&mut *tx)
190 .await
191 .map_err(|e| format!("DB Error (repos): {}", e))?;
192 let mut upsert_collections: Vec<String> = Vec::new();
193 let mut upsert_rkeys: Vec<String> = Vec::new();
194 let mut upsert_cids: Vec<String> = Vec::new();
195 let mut delete_collections: Vec<String> = Vec::new();
196 let mut delete_rkeys: Vec<String> = Vec::new();
197 for op in &ops {
198 match op {
199 RecordOp::Create {
200 collection,
201 rkey,
202 cid,
203 }
204 | RecordOp::Update {
205 collection,
206 rkey,
207 cid,
208 ..
209 } => {
210 upsert_collections.push(collection.clone());
211 upsert_rkeys.push(rkey.clone());
212 upsert_cids.push(cid.to_string());
213 }
214 RecordOp::Delete {
215 collection, rkey, ..
216 } => {
217 delete_collections.push(collection.clone());
218 delete_rkeys.push(rkey.clone());
219 }
220 }
221 }
222 if !upsert_collections.is_empty() {
223 sqlx::query!(
224 r#"
225 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
226 SELECT $1, collection, rkey, record_cid, $5
227 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
228 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
229 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
230 "#,
231 user_id,
232 &upsert_collections,
233 &upsert_rkeys,
234 &upsert_cids,
235 rev_str
236 )
237 .execute(&mut *tx)
238 .await
239 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
240 }
241 if !delete_collections.is_empty() {
242 sqlx::query!(
243 r#"
244 DELETE FROM records
245 WHERE repo_id = $1
246 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
247 "#,
248 user_id,
249 &delete_collections,
250 &delete_rkeys
251 )
252 .execute(&mut *tx)
253 .await
254 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
255 }
256 let ops_json = ops
257 .iter()
258 .map(|op| match op {
259 RecordOp::Create {
260 collection,
261 rkey,
262 cid,
263 } => json!({
264 "action": "create",
265 "path": format!("{}/{}", collection, rkey),
266 "cid": cid.to_string()
267 }),
268 RecordOp::Update {
269 collection,
270 rkey,
271 cid,
272 prev,
273 } => {
274 let mut obj = json!({
275 "action": "update",
276 "path": format!("{}/{}", collection, rkey),
277 "cid": cid.to_string()
278 });
279 if let Some(prev_cid) = prev {
280 obj["prev"] = json!(prev_cid.to_string());
281 }
282 obj
283 }
284 RecordOp::Delete {
285 collection,
286 rkey,
287 prev,
288 } => {
289 let mut obj = json!({
290 "action": "delete",
291 "path": format!("{}/{}", collection, rkey),
292 "cid": null
293 });
294 if let Some(prev_cid) = prev {
295 obj["prev"] = json!(prev_cid.to_string());
296 }
297 obj
298 }
299 })
300 .collect::<Vec<_>>();
301 if is_account_active {
302 let event_type = "commit";
303 let prev_cid_str = current_root_cid.map(|c| c.to_string());
304 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
305 let seq_row = sqlx::query!(
306 r#"
307 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
308 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
309 RETURNING seq
310 "#,
311 did,
312 event_type,
313 new_root_cid.to_string(),
314 prev_cid_str,
315 json!(ops_json),
316 &[] as &[String],
317 blocks_cids,
318 prev_data_cid_str,
319 )
320 .fetch_one(&mut *tx)
321 .await
322 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
323 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
324 .execute(&mut *tx)
325 .await
326 .map_err(|e| format!("DB Error (notify): {}", e))?;
327 }
328 tx.commit()
329 .await
330 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
331 if is_account_active {
332 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await;
333 }
334 Ok(CommitResult {
335 commit_cid: new_root_cid,
336 rev: rev_str,
337 })
338}
339pub async fn create_record_internal(
340 state: &AppState,
341 did: &str,
342 collection: &str,
343 rkey: &str,
344 record: &serde_json::Value,
345) -> Result<(String, Cid), String> {
346 use crate::repo::tracking::TrackingBlockStore;
347 use jacquard_repo::mst::Mst;
348 use std::sync::Arc;
349 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
350 .fetch_optional(&state.db)
351 .await
352 .map_err(|e| format!("DB error: {}", e))?
353 .ok_or_else(|| "User not found".to_string())?;
354 let root_cid_str: String = sqlx::query_scalar!(
355 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
356 user_id
357 )
358 .fetch_optional(&state.db)
359 .await
360 .map_err(|e| format!("DB error: {}", e))?
361 .ok_or_else(|| "Repo not found".to_string())?;
362 let current_root_cid =
363 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
364 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
365 let commit_bytes = tracking_store
366 .get(¤t_root_cid)
367 .await
368 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
369 .ok_or_else(|| "Commit block not found".to_string())?;
370 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
371 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
372 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
373 let mut record_bytes = Vec::new();
374 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
375 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
376 let record_cid = tracking_store
377 .put(&record_bytes)
378 .await
379 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
380 let key = format!("{}/{}", collection, rkey);
381 let new_mst = mst
382 .add(&key, record_cid)
383 .await
384 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
385 let new_mst_root = new_mst
386 .persist()
387 .await
388 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
389 let op = RecordOp::Create {
390 collection: collection.to_string(),
391 rkey: rkey.to_string(),
392 cid: record_cid,
393 };
394 let mut relevant_blocks = std::collections::BTreeMap::new();
395 new_mst
396 .blocks_for_path(&key, &mut relevant_blocks)
397 .await
398 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
399 mst.blocks_for_path(&key, &mut relevant_blocks)
400 .await
401 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
402 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
403 let mut written_cids = tracking_store.get_all_relevant_cids();
404 for cid in relevant_blocks.keys() {
405 if !written_cids.contains(cid) {
406 written_cids.push(*cid);
407 }
408 }
409 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
410 let result = commit_and_log(
411 state,
412 CommitParams {
413 did,
414 user_id,
415 current_root_cid: Some(current_root_cid),
416 prev_data_cid: Some(commit.data),
417 new_mst_root,
418 ops: vec![op],
419 blocks_cids: &written_cids_str,
420 },
421 )
422 .await?;
423 let uri = format!("at://{}/{}/{}", did, collection, rkey);
424 Ok((uri, result.commit_cid))
425}
426use std::str::FromStr;
427pub async fn sequence_identity_event(
428 state: &AppState,
429 did: &str,
430 handle: Option<&str>,
431) -> Result<i64, String> {
432 let seq_row = sqlx::query!(
433 r#"
434 INSERT INTO repo_seq (did, event_type, handle)
435 VALUES ($1, 'identity', $2)
436 RETURNING seq
437 "#,
438 did,
439 handle,
440 )
441 .fetch_one(&state.db)
442 .await
443 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
444 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
445 .execute(&state.db)
446 .await
447 .map_err(|e| format!("DB Error (notify): {}", e))?;
448 Ok(seq_row.seq)
449}
450pub async fn sequence_account_event(
451 state: &AppState,
452 did: &str,
453 active: bool,
454 status: Option<&str>,
455) -> Result<i64, String> {
456 let seq_row = sqlx::query!(
457 r#"
458 INSERT INTO repo_seq (did, event_type, active, status)
459 VALUES ($1, 'account', $2, $3)
460 RETURNING seq
461 "#,
462 did,
463 active,
464 status,
465 )
466 .fetch_one(&state.db)
467 .await
468 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
469 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
470 .execute(&state.db)
471 .await
472 .map_err(|e| format!("DB Error (notify): {}", e))?;
473 Ok(seq_row.seq)
474}
475pub async fn sequence_sync_event(
476 state: &AppState,
477 did: &str,
478 commit_cid: &str,
479) -> Result<i64, String> {
480 let seq_row = sqlx::query!(
481 r#"
482 INSERT INTO repo_seq (did, event_type, commit_cid)
483 VALUES ($1, 'sync', $2)
484 RETURNING seq
485 "#,
486 did,
487 commit_cid,
488 )
489 .fetch_one(&state.db)
490 .await
491 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
492 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
493 .execute(&state.db)
494 .await
495 .map_err(|e| format!("DB Error (notify): {}", e))?;
496 Ok(seq_row.seq)
497}
498
499pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> {
500 let repo_root = sqlx::query_scalar!(
501 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
502 did
503 )
504 .fetch_optional(&state.db)
505 .await
506 .map_err(|e| format!("DB Error fetching repo root: {}", e))?
507 .ok_or_else(|| "Repo not found".to_string())?;
508 let ops = serde_json::json!([]);
509 let blobs: Vec<String> = vec![];
510 let blocks_cids: Vec<String> = vec![];
511 let seq_row = sqlx::query!(
512 r#"
513 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
514 VALUES ($1, 'commit', $2, $2, $3, $4, $5)
515 RETURNING seq
516 "#,
517 did,
518 repo_root,
519 ops,
520 &blobs,
521 &blocks_cids
522 )
523 .fetch_one(&state.db)
524 .await
525 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?;
526 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
527 .execute(&state.db)
528 .await
529 .map_err(|e| format!("DB Error (notify): {}", e))?;
530 Ok(seq_row.seq)
531}