this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::{signature::Signer, Signature, SigningKey};
7use serde::Serialize;
8use serde_json::json;
9use uuid::Uuid;
/*
 * Why build custom commit objects here instead of calling jacquard's Commit::sign()?
 *
 * At the time of writing, jacquard has a bug in how it produces the unsigned bytes for signing:
 *   - Jacquard sets `sig` to empty bytes and serializes the result (a 6-field CBOR map).
 *   - Indigo/ATProto serializes a struct *without* the `sig` field (a 5-field CBOR map).
 *
 * These produce different CBOR bytes, so signatures created with jacquard
 * don't verify with the relay's algorithm. The relay silently rejects commits
 * with invalid signatures.
 *
 * If you have it downloaded, see: reference-relay-indigo/atproto/repo/commit.go UnsignedBytes()
 */
23#[derive(Serialize)]
24struct UnsignedCommit<'a> {
25 data: Cid,
26 did: &'a str,
27 prev: Option<Cid>,
28 rev: &'a str,
29 version: i64,
30}
31
32fn create_signed_commit(
33 did: &str,
34 data: Cid,
35 rev: &str,
36 prev: Option<Cid>,
37 signing_key: &SigningKey,
38) -> Result<(Vec<u8>, Bytes), String> {
39 let unsigned = UnsignedCommit {
40 data,
41 did,
42 prev,
43 rev,
44 version: 3,
45 };
46 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned)
47 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?;
48 let sig: Signature = signing_key.sign(&unsigned_bytes);
49 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes());
50 #[derive(Serialize)]
51 struct SignedCommit<'a> {
52 data: Cid,
53 did: &'a str,
54 prev: Option<Cid>,
55 rev: &'a str,
56 #[serde(with = "serde_bytes")]
57 sig: &'a [u8],
58 version: i64,
59 }
60 let signed = SignedCommit {
61 data,
62 did,
63 prev,
64 rev,
65 sig: &sig_bytes,
66 version: 3,
67 };
68 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed)
69 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
70 Ok((signed_bytes, sig_bytes))
71}
72
73pub enum RecordOp {
74 Create { collection: String, rkey: String, cid: Cid },
75 Update { collection: String, rkey: String, cid: Cid, prev: Option<Cid> },
76 Delete { collection: String, rkey: String, prev: Option<Cid> },
77}
78
79pub struct CommitResult {
80 pub commit_cid: Cid,
81 pub rev: String,
82}
83
84pub async fn commit_and_log(
85 state: &AppState,
86 did: &str,
87 user_id: Uuid,
88 current_root_cid: Option<Cid>,
89 prev_data_cid: Option<Cid>,
90 new_mst_root: Cid,
91 ops: Vec<RecordOp>,
92 blocks_cids: &[String],
93) -> Result<CommitResult, String> {
94 let key_row = sqlx::query!(
95 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
96 user_id
97 )
98 .fetch_one(&state.db)
99 .await
100 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
101 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
102 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
103 let signing_key = SigningKey::from_slice(&key_bytes)
104 .map_err(|e| format!("Invalid signing key: {}", e))?;
105 let rev = Tid::now(LimitedU32::MIN);
106 let rev_str = rev.to_string();
107 let (new_commit_bytes, _sig) = create_signed_commit(
108 did,
109 new_mst_root,
110 &rev_str,
111 current_root_cid,
112 &signing_key,
113 )?;
114 let new_root_cid = state.block_store.put(&new_commit_bytes).await
115 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
116 let mut tx = state.db.begin().await
117 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
118 let lock_result = sqlx::query!(
119 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
120 user_id
121 )
122 .fetch_optional(&mut *tx)
123 .await;
124 match lock_result {
125 Err(e) => {
126 if let Some(db_err) = e.as_database_error() {
127 if db_err.code().as_deref() == Some("55P03") {
128 return Err("ConcurrentModification: Another request is modifying this repo".to_string());
129 }
130 }
131 return Err(format!("Failed to acquire repo lock: {}", e));
132 }
133 Ok(Some(row)) => {
134 if let Some(expected_root) = ¤t_root_cid {
135 if row.repo_root_cid != expected_root.to_string() {
136 return Err("ConcurrentModification: Repo has been modified since last read".to_string());
137 }
138 }
139 }
140 Ok(None) => {
141 return Err("Repo not found".to_string());
142 }
143 }
144 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
145 .execute(&mut *tx)
146 .await
147 .map_err(|e| format!("DB Error (repos): {}", e))?;
148 let mut upsert_collections: Vec<String> = Vec::new();
149 let mut upsert_rkeys: Vec<String> = Vec::new();
150 let mut upsert_cids: Vec<String> = Vec::new();
151 let mut delete_collections: Vec<String> = Vec::new();
152 let mut delete_rkeys: Vec<String> = Vec::new();
153 for op in &ops {
154 match op {
155 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid, .. } => {
156 upsert_collections.push(collection.clone());
157 upsert_rkeys.push(rkey.clone());
158 upsert_cids.push(cid.to_string());
159 }
160 RecordOp::Delete { collection, rkey, .. } => {
161 delete_collections.push(collection.clone());
162 delete_rkeys.push(rkey.clone());
163 }
164 }
165 }
166 if !upsert_collections.is_empty() {
167 sqlx::query!(
168 r#"
169 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
170 SELECT $1, collection, rkey, record_cid, $5
171 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
172 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
173 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
174 "#,
175 user_id,
176 &upsert_collections,
177 &upsert_rkeys,
178 &upsert_cids,
179 rev_str
180 )
181 .execute(&mut *tx)
182 .await
183 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
184 }
185 if !delete_collections.is_empty() {
186 sqlx::query!(
187 r#"
188 DELETE FROM records
189 WHERE repo_id = $1
190 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
191 "#,
192 user_id,
193 &delete_collections,
194 &delete_rkeys
195 )
196 .execute(&mut *tx)
197 .await
198 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
199 }
200 let ops_json = ops.iter().map(|op| {
201 match op {
202 RecordOp::Create { collection, rkey, cid } => json!({
203 "action": "create",
204 "path": format!("{}/{}", collection, rkey),
205 "cid": cid.to_string()
206 }),
207 RecordOp::Update { collection, rkey, cid, prev } => {
208 let mut obj = json!({
209 "action": "update",
210 "path": format!("{}/{}", collection, rkey),
211 "cid": cid.to_string()
212 });
213 if let Some(prev_cid) = prev {
214 obj["prev"] = json!(prev_cid.to_string());
215 }
216 obj
217 },
218 RecordOp::Delete { collection, rkey, prev } => {
219 let mut obj = json!({
220 "action": "delete",
221 "path": format!("{}/{}", collection, rkey),
222 "cid": null
223 });
224 if let Some(prev_cid) = prev {
225 obj["prev"] = json!(prev_cid.to_string());
226 }
227 obj
228 },
229 }
230 }).collect::<Vec<_>>();
231 let event_type = "commit";
232 let prev_cid_str = current_root_cid.map(|c| c.to_string());
233 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
234 let seq_row = sqlx::query!(
235 r#"
236 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
237 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
238 RETURNING seq
239 "#,
240 did,
241 event_type,
242 new_root_cid.to_string(),
243 prev_cid_str,
244 json!(ops_json),
245 &[] as &[String],
246 blocks_cids,
247 prev_data_cid_str,
248 )
249 .fetch_one(&mut *tx)
250 .await
251 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
252 sqlx::query(
253 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
254 )
255 .execute(&mut *tx)
256 .await
257 .map_err(|e| format!("DB Error (notify): {}", e))?;
258 tx.commit().await
259 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
260 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await;
261 Ok(CommitResult {
262 commit_cid: new_root_cid,
263 rev: rev_str,
264 })
265}
266pub async fn create_record_internal(
267 state: &AppState,
268 did: &str,
269 collection: &str,
270 rkey: &str,
271 record: &serde_json::Value,
272) -> Result<(String, Cid), String> {
273 use crate::repo::tracking::TrackingBlockStore;
274 use jacquard_repo::mst::Mst;
275 use std::sync::Arc;
276 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
277 .fetch_optional(&state.db)
278 .await
279 .map_err(|e| format!("DB error: {}", e))?
280 .ok_or_else(|| "User not found".to_string())?;
281 let root_cid_str: String =
282 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
283 .fetch_optional(&state.db)
284 .await
285 .map_err(|e| format!("DB error: {}", e))?
286 .ok_or_else(|| "Repo not found".to_string())?;
287 let current_root_cid = Cid::from_str(&root_cid_str)
288 .map_err(|_| "Invalid repo root CID".to_string())?;
289 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
290 let commit_bytes = tracking_store.get(¤t_root_cid).await
291 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
292 .ok_or_else(|| "Commit block not found".to_string())?;
293 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
294 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
295 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
296 let mut record_bytes = Vec::new();
297 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
298 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
299 let record_cid = tracking_store.put(&record_bytes).await
300 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
301 let key = format!("{}/{}", collection, rkey);
302 let new_mst = mst.add(&key, record_cid).await
303 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
304 let new_mst_root = new_mst.persist().await
305 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
306 let op = RecordOp::Create {
307 collection: collection.to_string(),
308 rkey: rkey.to_string(),
309 cid: record_cid,
310 };
311 let mut relevant_blocks = std::collections::BTreeMap::new();
312 new_mst.blocks_for_path(&key, &mut relevant_blocks).await
313 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
314 mst.blocks_for_path(&key, &mut relevant_blocks).await
315 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
316 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
317 let mut written_cids = tracking_store.get_all_relevant_cids();
318 for cid in relevant_blocks.keys() {
319 if !written_cids.contains(cid) {
320 written_cids.push(*cid);
321 }
322 }
323 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
324 let result = commit_and_log(
325 state,
326 did,
327 user_id,
328 Some(current_root_cid),
329 Some(commit.data),
330 new_mst_root,
331 vec![op],
332 &written_cids_str,
333 ).await?;
334 let uri = format!("at://{}/{}/{}", did, collection, rkey);
335 Ok((uri, result.commit_cid))
336}
337use std::str::FromStr;
338pub async fn sequence_identity_event(
339 state: &AppState,
340 did: &str,
341 handle: Option<&str>,
342) -> Result<i64, String> {
343 let seq_row = sqlx::query!(
344 r#"
345 INSERT INTO repo_seq (did, event_type, handle)
346 VALUES ($1, 'identity', $2)
347 RETURNING seq
348 "#,
349 did,
350 handle,
351 )
352 .fetch_one(&state.db)
353 .await
354 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
355 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
356 .execute(&state.db)
357 .await
358 .map_err(|e| format!("DB Error (notify): {}", e))?;
359 Ok(seq_row.seq)
360}
361pub async fn sequence_account_event(
362 state: &AppState,
363 did: &str,
364 active: bool,
365 status: Option<&str>,
366) -> Result<i64, String> {
367 let seq_row = sqlx::query!(
368 r#"
369 INSERT INTO repo_seq (did, event_type, active, status)
370 VALUES ($1, 'account', $2, $3)
371 RETURNING seq
372 "#,
373 did,
374 active,
375 status,
376 )
377 .fetch_one(&state.db)
378 .await
379 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
380 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
381 .execute(&state.db)
382 .await
383 .map_err(|e| format!("DB Error (notify): {}", e))?;
384 Ok(seq_row.seq)
385}
386pub async fn sequence_sync_event(
387 state: &AppState,
388 did: &str,
389 commit_cid: &str,
390) -> Result<i64, String> {
391 let seq_row = sqlx::query!(
392 r#"
393 INSERT INTO repo_seq (did, event_type, commit_cid)
394 VALUES ($1, 'sync', $2)
395 RETURNING seq
396 "#,
397 did,
398 commit_cid,
399 )
400 .fetch_one(&state.db)
401 .await
402 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
403 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
404 .execute(&state.db)
405 .await
406 .map_err(|e| format!("DB Error (notify): {}", e))?;
407 Ok(seq_row.seq)
408}