this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::{signature::Signer, Signature, SigningKey};
7use serde::Serialize;
8use serde_json::json;
9use uuid::Uuid;
10/*
11 * Why am I making custom commit objects instead of jacquard's Commit::sign(), you ask?
12 *
13 * At time of writing, jacquard has a bug in how it creates unsigned bytes for signing.
14 * Jacquard sets sig to empty bytes and serializes (6-field CBOR map)
15 * Indigo/ATProto creates a struct *without* the sig field (5-field CBOR map)
16 *
17 * These produce different CBOR bytes, so signatures created with jacquard
18 * don't verify with the relay's algorithm. The relay silently rejects commits
19 * with invalid signatures.
20 *
21 * If you have it downloaded, see: reference-relay-indigo/atproto/repo/commit.go UnsignedBytes()
22 */
23#[derive(Serialize)]
24struct UnsignedCommit<'a> {
25 data: Cid,
26 did: &'a str,
27 prev: Option<Cid>,
28 rev: &'a str,
29 version: i64,
30}
31fn create_signed_commit(
32 did: &str,
33 data: Cid,
34 rev: &str,
35 prev: Option<Cid>,
36 signing_key: &SigningKey,
37) -> Result<(Vec<u8>, Bytes), String> {
38 let unsigned = UnsignedCommit {
39 data,
40 did,
41 prev,
42 rev,
43 version: 3,
44 };
45 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned)
46 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?;
47 let sig: Signature = signing_key.sign(&unsigned_bytes);
48 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes());
49 #[derive(Serialize)]
50 struct SignedCommit<'a> {
51 data: Cid,
52 did: &'a str,
53 prev: Option<Cid>,
54 rev: &'a str,
55 #[serde(with = "serde_bytes")]
56 sig: &'a [u8],
57 version: i64,
58 }
59 let signed = SignedCommit {
60 data,
61 did,
62 prev,
63 rev,
64 sig: &sig_bytes,
65 version: 3,
66 };
67 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed)
68 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
69 Ok((signed_bytes, sig_bytes))
70}
71pub enum RecordOp {
72 Create { collection: String, rkey: String, cid: Cid },
73 Update { collection: String, rkey: String, cid: Cid, prev: Option<Cid> },
74 Delete { collection: String, rkey: String, prev: Option<Cid> },
75}
76pub struct CommitResult {
77 pub commit_cid: Cid,
78 pub rev: String,
79}
80pub async fn commit_and_log(
81 state: &AppState,
82 did: &str,
83 user_id: Uuid,
84 current_root_cid: Option<Cid>,
85 prev_data_cid: Option<Cid>,
86 new_mst_root: Cid,
87 ops: Vec<RecordOp>,
88 blocks_cids: &[String],
89) -> Result<CommitResult, String> {
90 let key_row = sqlx::query!(
91 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
92 user_id
93 )
94 .fetch_one(&state.db)
95 .await
96 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
97 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
98 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
99 let signing_key = SigningKey::from_slice(&key_bytes)
100 .map_err(|e| format!("Invalid signing key: {}", e))?;
101 let rev = Tid::now(LimitedU32::MIN);
102 let rev_str = rev.to_string();
103 let (new_commit_bytes, _sig) = create_signed_commit(
104 did,
105 new_mst_root,
106 &rev_str,
107 current_root_cid,
108 &signing_key,
109 )?;
110 let new_root_cid = state.block_store.put(&new_commit_bytes).await
111 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
112 let mut tx = state.db.begin().await
113 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
114 let lock_result = sqlx::query!(
115 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
116 user_id
117 )
118 .fetch_optional(&mut *tx)
119 .await;
120 match lock_result {
121 Err(e) => {
122 if let Some(db_err) = e.as_database_error() {
123 if db_err.code().as_deref() == Some("55P03") {
124 return Err("ConcurrentModification: Another request is modifying this repo".to_string());
125 }
126 }
127 return Err(format!("Failed to acquire repo lock: {}", e));
128 }
129 Ok(Some(row)) => {
130 if let Some(expected_root) = ¤t_root_cid {
131 if row.repo_root_cid != expected_root.to_string() {
132 return Err("ConcurrentModification: Repo has been modified since last read".to_string());
133 }
134 }
135 }
136 Ok(None) => {
137 return Err("Repo not found".to_string());
138 }
139 }
140 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
141 .execute(&mut *tx)
142 .await
143 .map_err(|e| format!("DB Error (repos): {}", e))?;
144 let mut upsert_collections: Vec<String> = Vec::new();
145 let mut upsert_rkeys: Vec<String> = Vec::new();
146 let mut upsert_cids: Vec<String> = Vec::new();
147 let mut delete_collections: Vec<String> = Vec::new();
148 let mut delete_rkeys: Vec<String> = Vec::new();
149 for op in &ops {
150 match op {
151 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid, .. } => {
152 upsert_collections.push(collection.clone());
153 upsert_rkeys.push(rkey.clone());
154 upsert_cids.push(cid.to_string());
155 }
156 RecordOp::Delete { collection, rkey, .. } => {
157 delete_collections.push(collection.clone());
158 delete_rkeys.push(rkey.clone());
159 }
160 }
161 }
162 if !upsert_collections.is_empty() {
163 sqlx::query!(
164 r#"
165 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
166 SELECT $1, collection, rkey, record_cid, $5
167 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
168 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
169 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
170 "#,
171 user_id,
172 &upsert_collections,
173 &upsert_rkeys,
174 &upsert_cids,
175 rev_str
176 )
177 .execute(&mut *tx)
178 .await
179 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
180 }
181 if !delete_collections.is_empty() {
182 sqlx::query!(
183 r#"
184 DELETE FROM records
185 WHERE repo_id = $1
186 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
187 "#,
188 user_id,
189 &delete_collections,
190 &delete_rkeys
191 )
192 .execute(&mut *tx)
193 .await
194 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
195 }
196 let ops_json = ops.iter().map(|op| {
197 match op {
198 RecordOp::Create { collection, rkey, cid } => json!({
199 "action": "create",
200 "path": format!("{}/{}", collection, rkey),
201 "cid": cid.to_string()
202 }),
203 RecordOp::Update { collection, rkey, cid, prev } => {
204 let mut obj = json!({
205 "action": "update",
206 "path": format!("{}/{}", collection, rkey),
207 "cid": cid.to_string()
208 });
209 if let Some(prev_cid) = prev {
210 obj["prev"] = json!(prev_cid.to_string());
211 }
212 obj
213 },
214 RecordOp::Delete { collection, rkey, prev } => {
215 let mut obj = json!({
216 "action": "delete",
217 "path": format!("{}/{}", collection, rkey),
218 "cid": null
219 });
220 if let Some(prev_cid) = prev {
221 obj["prev"] = json!(prev_cid.to_string());
222 }
223 obj
224 },
225 }
226 }).collect::<Vec<_>>();
227 let event_type = "commit";
228 let prev_cid_str = current_root_cid.map(|c| c.to_string());
229 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
230 let seq_row = sqlx::query!(
231 r#"
232 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
233 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
234 RETURNING seq
235 "#,
236 did,
237 event_type,
238 new_root_cid.to_string(),
239 prev_cid_str,
240 json!(ops_json),
241 &[] as &[String],
242 blocks_cids,
243 prev_data_cid_str,
244 )
245 .fetch_one(&mut *tx)
246 .await
247 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
248 sqlx::query(
249 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
250 )
251 .execute(&mut *tx)
252 .await
253 .map_err(|e| format!("DB Error (notify): {}", e))?;
254 tx.commit().await
255 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
256 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await;
257 Ok(CommitResult {
258 commit_cid: new_root_cid,
259 rev: rev_str,
260 })
261}
262pub async fn create_record_internal(
263 state: &AppState,
264 did: &str,
265 collection: &str,
266 rkey: &str,
267 record: &serde_json::Value,
268) -> Result<(String, Cid), String> {
269 use crate::repo::tracking::TrackingBlockStore;
270 use jacquard_repo::mst::Mst;
271 use std::sync::Arc;
272 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
273 .fetch_optional(&state.db)
274 .await
275 .map_err(|e| format!("DB error: {}", e))?
276 .ok_or_else(|| "User not found".to_string())?;
277 let root_cid_str: String =
278 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
279 .fetch_optional(&state.db)
280 .await
281 .map_err(|e| format!("DB error: {}", e))?
282 .ok_or_else(|| "Repo not found".to_string())?;
283 let current_root_cid = Cid::from_str(&root_cid_str)
284 .map_err(|_| "Invalid repo root CID".to_string())?;
285 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
286 let commit_bytes = tracking_store.get(¤t_root_cid).await
287 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
288 .ok_or_else(|| "Commit block not found".to_string())?;
289 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
290 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
291 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
292 let mut record_bytes = Vec::new();
293 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
294 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
295 let record_cid = tracking_store.put(&record_bytes).await
296 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
297 let key = format!("{}/{}", collection, rkey);
298 let new_mst = mst.add(&key, record_cid).await
299 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
300 let new_mst_root = new_mst.persist().await
301 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
302 let op = RecordOp::Create {
303 collection: collection.to_string(),
304 rkey: rkey.to_string(),
305 cid: record_cid,
306 };
307 let mut relevant_blocks = std::collections::BTreeMap::new();
308 new_mst.blocks_for_path(&key, &mut relevant_blocks).await
309 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
310 mst.blocks_for_path(&key, &mut relevant_blocks).await
311 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
312 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
313 let mut written_cids = tracking_store.get_all_relevant_cids();
314 for cid in relevant_blocks.keys() {
315 if !written_cids.contains(cid) {
316 written_cids.push(*cid);
317 }
318 }
319 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
320 let result = commit_and_log(
321 state,
322 did,
323 user_id,
324 Some(current_root_cid),
325 Some(commit.data),
326 new_mst_root,
327 vec![op],
328 &written_cids_str,
329 ).await?;
330 let uri = format!("at://{}/{}/{}", did, collection, rkey);
331 Ok((uri, result.commit_cid))
332}
333use std::str::FromStr;
334pub async fn sequence_identity_event(
335 state: &AppState,
336 did: &str,
337 handle: Option<&str>,
338) -> Result<i64, String> {
339 let seq_row = sqlx::query!(
340 r#"
341 INSERT INTO repo_seq (did, event_type, handle)
342 VALUES ($1, 'identity', $2)
343 RETURNING seq
344 "#,
345 did,
346 handle,
347 )
348 .fetch_one(&state.db)
349 .await
350 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
351 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
352 .execute(&state.db)
353 .await
354 .map_err(|e| format!("DB Error (notify): {}", e))?;
355 Ok(seq_row.seq)
356}
357pub async fn sequence_account_event(
358 state: &AppState,
359 did: &str,
360 active: bool,
361 status: Option<&str>,
362) -> Result<i64, String> {
363 let seq_row = sqlx::query!(
364 r#"
365 INSERT INTO repo_seq (did, event_type, active, status)
366 VALUES ($1, 'account', $2, $3)
367 RETURNING seq
368 "#,
369 did,
370 active,
371 status,
372 )
373 .fetch_one(&state.db)
374 .await
375 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
376 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
377 .execute(&state.db)
378 .await
379 .map_err(|e| format!("DB Error (notify): {}", e))?;
380 Ok(seq_row.seq)
381}
382pub async fn sequence_sync_event(
383 state: &AppState,
384 did: &str,
385 commit_cid: &str,
386) -> Result<i64, String> {
387 let seq_row = sqlx::query!(
388 r#"
389 INSERT INTO repo_seq (did, event_type, commit_cid)
390 VALUES ($1, 'sync', $2)
391 RETURNING seq
392 "#,
393 did,
394 commit_cid,
395 )
396 .fetch_one(&state.db)
397 .await
398 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
399 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
400 .execute(&state.db)
401 .await
402 .map_err(|e| format!("DB Error (notify): {}", e))?;
403 Ok(seq_row.seq)
404}