this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::{Signature, SigningKey, signature::Signer};
7use serde::Serialize;
8use serde_json::json;
9use uuid::Uuid;
/*
 * Why build custom commit objects instead of using jacquard's Commit::sign(), you ask?
 *
 * At the time of writing, jacquard has a bug in how it creates the unsigned bytes for signing:
 * - Jacquard sets `sig` to empty bytes and serializes that (a 6-field CBOR map).
 * - Indigo/ATProto creates a struct *without* the `sig` field (a 5-field CBOR map).
 *
 * These produce different CBOR bytes, so signatures created with jacquard
 * don't verify with the relay's algorithm. The relay silently rejects commits
 * with invalid signatures.
 *
 * If you have it downloaded, see: reference-relay-indigo/atproto/repo/commit.go UnsignedBytes()
 */
23#[derive(Serialize)]
24struct UnsignedCommit<'a> {
25 data: Cid,
26 did: &'a str,
27 prev: Option<Cid>,
28 rev: &'a str,
29 version: i64,
30}
31
32fn create_signed_commit(
33 did: &str,
34 data: Cid,
35 rev: &str,
36 prev: Option<Cid>,
37 signing_key: &SigningKey,
38) -> Result<(Vec<u8>, Bytes), String> {
39 let unsigned = UnsignedCommit {
40 data,
41 did,
42 prev,
43 rev,
44 version: 3,
45 };
46 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned)
47 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?;
48 let sig: Signature = signing_key.sign(&unsigned_bytes);
49 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes());
50 #[derive(Serialize)]
51 struct SignedCommit<'a> {
52 data: Cid,
53 did: &'a str,
54 prev: Option<Cid>,
55 rev: &'a str,
56 #[serde(with = "serde_bytes")]
57 sig: &'a [u8],
58 version: i64,
59 }
60 let signed = SignedCommit {
61 data,
62 did,
63 prev,
64 rev,
65 sig: &sig_bytes,
66 version: 3,
67 };
68 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed)
69 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
70 Ok((signed_bytes, sig_bytes))
71}
72
73pub enum RecordOp {
74 Create {
75 collection: String,
76 rkey: String,
77 cid: Cid,
78 },
79 Update {
80 collection: String,
81 rkey: String,
82 cid: Cid,
83 prev: Option<Cid>,
84 },
85 Delete {
86 collection: String,
87 rkey: String,
88 prev: Option<Cid>,
89 },
90}
91
92pub struct CommitResult {
93 pub commit_cid: Cid,
94 pub rev: String,
95}
96
97pub struct CommitParams<'a> {
98 pub did: &'a str,
99 pub user_id: Uuid,
100 pub current_root_cid: Option<Cid>,
101 pub prev_data_cid: Option<Cid>,
102 pub new_mst_root: Cid,
103 pub ops: Vec<RecordOp>,
104 pub blocks_cids: &'a [String],
105}
106
107pub async fn commit_and_log(
108 state: &AppState,
109 params: CommitParams<'_>,
110) -> Result<CommitResult, String> {
111 let CommitParams {
112 did,
113 user_id,
114 current_root_cid,
115 prev_data_cid,
116 new_mst_root,
117 ops,
118 blocks_cids,
119 } = params;
120 let key_row = sqlx::query!(
121 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
122 user_id
123 )
124 .fetch_one(&state.db)
125 .await
126 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
127 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
128 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
129 let signing_key =
130 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
131 let rev = Tid::now(LimitedU32::MIN);
132 let rev_str = rev.to_string();
133 let (new_commit_bytes, _sig) =
134 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
135 let new_root_cid = state
136 .block_store
137 .put(&new_commit_bytes)
138 .await
139 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
140 let mut tx = state
141 .db
142 .begin()
143 .await
144 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
145 let lock_result = sqlx::query!(
146 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
147 user_id
148 )
149 .fetch_optional(&mut *tx)
150 .await;
151 match lock_result {
152 Err(e) => {
153 if let Some(db_err) = e.as_database_error()
154 && db_err.code().as_deref() == Some("55P03") {
155 return Err(
156 "ConcurrentModification: Another request is modifying this repo"
157 .to_string(),
158 );
159 }
160 return Err(format!("Failed to acquire repo lock: {}", e));
161 }
162 Ok(Some(row)) => {
163 if let Some(expected_root) = ¤t_root_cid
164 && row.repo_root_cid != expected_root.to_string() {
165 return Err(
166 "ConcurrentModification: Repo has been modified since last read"
167 .to_string(),
168 );
169 }
170 }
171 Ok(None) => {
172 return Err("Repo not found".to_string());
173 }
174 }
175 sqlx::query!(
176 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
177 new_root_cid.to_string(),
178 user_id
179 )
180 .execute(&mut *tx)
181 .await
182 .map_err(|e| format!("DB Error (repos): {}", e))?;
183 let mut upsert_collections: Vec<String> = Vec::new();
184 let mut upsert_rkeys: Vec<String> = Vec::new();
185 let mut upsert_cids: Vec<String> = Vec::new();
186 let mut delete_collections: Vec<String> = Vec::new();
187 let mut delete_rkeys: Vec<String> = Vec::new();
188 for op in &ops {
189 match op {
190 RecordOp::Create {
191 collection,
192 rkey,
193 cid,
194 }
195 | RecordOp::Update {
196 collection,
197 rkey,
198 cid,
199 ..
200 } => {
201 upsert_collections.push(collection.clone());
202 upsert_rkeys.push(rkey.clone());
203 upsert_cids.push(cid.to_string());
204 }
205 RecordOp::Delete {
206 collection, rkey, ..
207 } => {
208 delete_collections.push(collection.clone());
209 delete_rkeys.push(rkey.clone());
210 }
211 }
212 }
213 if !upsert_collections.is_empty() {
214 sqlx::query!(
215 r#"
216 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
217 SELECT $1, collection, rkey, record_cid, $5
218 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
219 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
220 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
221 "#,
222 user_id,
223 &upsert_collections,
224 &upsert_rkeys,
225 &upsert_cids,
226 rev_str
227 )
228 .execute(&mut *tx)
229 .await
230 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
231 }
232 if !delete_collections.is_empty() {
233 sqlx::query!(
234 r#"
235 DELETE FROM records
236 WHERE repo_id = $1
237 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
238 "#,
239 user_id,
240 &delete_collections,
241 &delete_rkeys
242 )
243 .execute(&mut *tx)
244 .await
245 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
246 }
247 let ops_json = ops
248 .iter()
249 .map(|op| match op {
250 RecordOp::Create {
251 collection,
252 rkey,
253 cid,
254 } => json!({
255 "action": "create",
256 "path": format!("{}/{}", collection, rkey),
257 "cid": cid.to_string()
258 }),
259 RecordOp::Update {
260 collection,
261 rkey,
262 cid,
263 prev,
264 } => {
265 let mut obj = json!({
266 "action": "update",
267 "path": format!("{}/{}", collection, rkey),
268 "cid": cid.to_string()
269 });
270 if let Some(prev_cid) = prev {
271 obj["prev"] = json!(prev_cid.to_string());
272 }
273 obj
274 }
275 RecordOp::Delete {
276 collection,
277 rkey,
278 prev,
279 } => {
280 let mut obj = json!({
281 "action": "delete",
282 "path": format!("{}/{}", collection, rkey),
283 "cid": null
284 });
285 if let Some(prev_cid) = prev {
286 obj["prev"] = json!(prev_cid.to_string());
287 }
288 obj
289 }
290 })
291 .collect::<Vec<_>>();
292 let event_type = "commit";
293 let prev_cid_str = current_root_cid.map(|c| c.to_string());
294 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
295 let seq_row = sqlx::query!(
296 r#"
297 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
298 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
299 RETURNING seq
300 "#,
301 did,
302 event_type,
303 new_root_cid.to_string(),
304 prev_cid_str,
305 json!(ops_json),
306 &[] as &[String],
307 blocks_cids,
308 prev_data_cid_str,
309 )
310 .fetch_one(&mut *tx)
311 .await
312 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
313 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
314 .execute(&mut *tx)
315 .await
316 .map_err(|e| format!("DB Error (notify): {}", e))?;
317 tx.commit()
318 .await
319 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
320 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await;
321 Ok(CommitResult {
322 commit_cid: new_root_cid,
323 rev: rev_str,
324 })
325}
326pub async fn create_record_internal(
327 state: &AppState,
328 did: &str,
329 collection: &str,
330 rkey: &str,
331 record: &serde_json::Value,
332) -> Result<(String, Cid), String> {
333 use crate::repo::tracking::TrackingBlockStore;
334 use jacquard_repo::mst::Mst;
335 use std::sync::Arc;
336 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
337 .fetch_optional(&state.db)
338 .await
339 .map_err(|e| format!("DB error: {}", e))?
340 .ok_or_else(|| "User not found".to_string())?;
341 let root_cid_str: String = sqlx::query_scalar!(
342 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
343 user_id
344 )
345 .fetch_optional(&state.db)
346 .await
347 .map_err(|e| format!("DB error: {}", e))?
348 .ok_or_else(|| "Repo not found".to_string())?;
349 let current_root_cid =
350 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
351 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
352 let commit_bytes = tracking_store
353 .get(¤t_root_cid)
354 .await
355 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
356 .ok_or_else(|| "Commit block not found".to_string())?;
357 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
358 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
359 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
360 let mut record_bytes = Vec::new();
361 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
362 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
363 let record_cid = tracking_store
364 .put(&record_bytes)
365 .await
366 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
367 let key = format!("{}/{}", collection, rkey);
368 let new_mst = mst
369 .add(&key, record_cid)
370 .await
371 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
372 let new_mst_root = new_mst
373 .persist()
374 .await
375 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
376 let op = RecordOp::Create {
377 collection: collection.to_string(),
378 rkey: rkey.to_string(),
379 cid: record_cid,
380 };
381 let mut relevant_blocks = std::collections::BTreeMap::new();
382 new_mst
383 .blocks_for_path(&key, &mut relevant_blocks)
384 .await
385 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
386 mst.blocks_for_path(&key, &mut relevant_blocks)
387 .await
388 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
389 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
390 let mut written_cids = tracking_store.get_all_relevant_cids();
391 for cid in relevant_blocks.keys() {
392 if !written_cids.contains(cid) {
393 written_cids.push(*cid);
394 }
395 }
396 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
397 let result = commit_and_log(
398 state,
399 CommitParams {
400 did,
401 user_id,
402 current_root_cid: Some(current_root_cid),
403 prev_data_cid: Some(commit.data),
404 new_mst_root,
405 ops: vec![op],
406 blocks_cids: &written_cids_str,
407 },
408 )
409 .await?;
410 let uri = format!("at://{}/{}/{}", did, collection, rkey);
411 Ok((uri, result.commit_cid))
412}
413use std::str::FromStr;
414pub async fn sequence_identity_event(
415 state: &AppState,
416 did: &str,
417 handle: Option<&str>,
418) -> Result<i64, String> {
419 let seq_row = sqlx::query!(
420 r#"
421 INSERT INTO repo_seq (did, event_type, handle)
422 VALUES ($1, 'identity', $2)
423 RETURNING seq
424 "#,
425 did,
426 handle,
427 )
428 .fetch_one(&state.db)
429 .await
430 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
431 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
432 .execute(&state.db)
433 .await
434 .map_err(|e| format!("DB Error (notify): {}", e))?;
435 Ok(seq_row.seq)
436}
437pub async fn sequence_account_event(
438 state: &AppState,
439 did: &str,
440 active: bool,
441 status: Option<&str>,
442) -> Result<i64, String> {
443 let seq_row = sqlx::query!(
444 r#"
445 INSERT INTO repo_seq (did, event_type, active, status)
446 VALUES ($1, 'account', $2, $3)
447 RETURNING seq
448 "#,
449 did,
450 active,
451 status,
452 )
453 .fetch_one(&state.db)
454 .await
455 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
456 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
457 .execute(&state.db)
458 .await
459 .map_err(|e| format!("DB Error (notify): {}", e))?;
460 Ok(seq_row.seq)
461}
462pub async fn sequence_sync_event(
463 state: &AppState,
464 did: &str,
465 commit_cid: &str,
466) -> Result<i64, String> {
467 let seq_row = sqlx::query!(
468 r#"
469 INSERT INTO repo_seq (did, event_type, commit_cid)
470 VALUES ($1, 'sync', $2)
471 RETURNING seq
472 "#,
473 did,
474 commit_cid,
475 )
476 .fetch_one(&state.db)
477 .await
478 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
479 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
480 .execute(&state.db)
481 .await
482 .map_err(|e| format!("DB Error (notify): {}", e))?;
483 Ok(seq_row.seq)
484}