//! PDS repository endpoints (/xrpc/com.atproto.repo.*)
22+mod apply_writes;
33+pub(crate) use apply_writes::apply_writes;
44+25use std::{collections::HashSet, str::FromStr};
3647use anyhow::{Context as _, anyhow};
55-use atrium_api::com::atproto::repo::apply_writes::{self, InputWritesItem, OutputResultsItem};
88+use atrium_api::com::atproto::repo::apply_writes::{
99+ self as atrium_apply_writes, InputWritesItem, OutputResultsItem,
1010+};
611use atrium_api::{
712 com::atproto::repo::{self, defs::CommitMetaData},
813 types::{
···2530use serde::Deserialize;
2631use tokio::io::AsyncWriteExt as _;
27323333+use crate::repo::block_map::cid_for_cbor;
3434+use crate::repo::types::PreparedCreateOrUpdate;
2835use crate::{
2936 AppState, Db, Error, Result, SigningKey,
3037 actor_store::{ActorStore, ActorStoreReader, ActorStoreTransactor, ActorStoreWriter},
···111118 Ok((did.to_owned(), handle.to_owned()))
112119}
113120114114-/// Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS.
115115-/// - POST /xrpc/com.atproto.repo.applyWrites
116116-/// ### Request Body
117117-/// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account).
118118-/// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons.
119119-/// - `writes`: `object[]` // One of:
120120-/// - - com.atproto.repo.applyWrites.create
121121-/// - - com.atproto.repo.applyWrites.update
122122-/// - - com.atproto.repo.applyWrites.delete
123123-/// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations.
124124-async fn apply_writes(
125125- user: AuthenticatedUser,
126126- State(actor_store): State<ActorStore>,
127127- State(skey): State<SigningKey>,
128128- State(config): State<AppConfig>,
129129- State(db): State<Db>,
130130- State(fhp): State<FirehoseProducer>,
131131- Json(input): Json<repo::apply_writes::Input>,
132132-) -> Result<Json<repo::apply_writes::Output>> {
133133- // TODO: Implement validation when `input.validate` is set
134134-135135- // Ensure that we are updating the correct repository.
136136- todo!();
137137- // Convert ATProto writes to our internal format
138138- todo!();
139139- // Process the writes using the actor store
140140- todo!();
141141-142142- // Update metrics
143143- counter!(REPO_COMMITS).increment(1);
144144- todo!();
145145-146146- // Send commit to firehose
147147- todo!();
148148-149149- // Convert to API response format
150150- todo!();
151151-}
152152-153121/// Create a single new repository record. Requires auth, implemented by PDS.
154122/// - POST /xrpc/com.atproto.repo.createRecord
155123/// ### Request Body
···172140 State(fhp): State<FirehoseProducer>,
173141 Json(input): Json<repo::create_record::Input>,
174142) -> Result<Json<repo::create_record::Output>> {
175175- let write_result = apply_writes(
143143+ let write_result = apply_writes::apply_writes(
176144 user,
177145 State(actor_store),
178146 State(skey),
···265233 }
266234 .into();
267235268268- let write_result = apply_writes(
236236+ let write_result = apply_writes::apply_writes(
269237 user,
270238 State(actor_store),
271239 State(skey),
···329297330298 Ok(Json(
331299 repo::delete_record::OutputData {
332332- commit: apply_writes(
300300+ commit: apply_writes::apply_writes(
333301 user,
334302 State(actor_store),
335303 State(skey),
+483
src/endpoints/repo/apply_writes.rs
···11+//! Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS.
22+use std::{collections::HashSet, str::FromStr};
33+44+use anyhow::{Context as _, anyhow};
55+use atrium_api::com::atproto::repo::apply_writes::{self, InputWritesItem, OutputResultsItem};
66+use atrium_api::{
77+ com::atproto::repo::{self, defs::CommitMetaData},
88+ types::{
99+ LimitedU32, Object, TryFromUnknown as _, TryIntoUnknown as _, Unknown,
1010+ string::{AtIdentifier, Nsid, Tid},
1111+ },
1212+};
1313+use atrium_repo::{Cid, blockstore::CarStore};
1414+use axum::{
1515+ Json, Router,
1616+ body::Body,
1717+ extract::{Query, Request, State},
1818+ http::{self, StatusCode},
1919+ routing::{get, post},
2020+};
2121+use constcat::concat;
2222+use futures::TryStreamExt as _;
2323+use metrics::counter;
2424+use rsky_syntax::aturi::AtUri;
2525+use serde::Deserialize;
2626+use tokio::io::AsyncWriteExt as _;
2727+2828+use crate::repo::block_map::cid_for_cbor;
2929+use crate::repo::types::PreparedCreateOrUpdate;
3030+use crate::{
3131+ AppState, Db, Error, Result, SigningKey,
3232+ actor_store::{ActorStore, ActorStoreReader, ActorStoreTransactor, ActorStoreWriter},
3333+ auth::AuthenticatedUser,
3434+ config::AppConfig,
3535+ error::ErrorMessage,
3636+ firehose::{self, FirehoseProducer, RepoOp},
3737+ metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE},
3838+ repo::types::{PreparedWrite, WriteOpAction},
3939+ storage,
4040+};
4141+4242+use super::resolve_did;
4343+4444+/// Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS.
4545+/// - POST /xrpc/com.atproto.repo.applyWrites
4646+/// ### Request Body
4747+/// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account).
4848+/// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons.
4949+/// - `writes`: `object[]` // One of:
5050+/// - - com.atproto.repo.applyWrites.create
5151+/// - - com.atproto.repo.applyWrites.update
5252+/// - - com.atproto.repo.applyWrites.delete
5353+/// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations.
5454+pub(crate) async fn apply_writes(
5555+ user: AuthenticatedUser,
5656+ State(skey): State<SigningKey>,
5757+ State(config): State<AppConfig>,
5858+ State(db): State<Db>,
5959+ State(fhp): State<FirehoseProducer>,
6060+ Json(input): Json<repo::apply_writes::Input>,
6161+) -> Result<Json<repo::apply_writes::Output>> {
6262+ // TODO: `input.validate`
6363+6464+ // Resolve DID from identifier
6565+ let (target_did, _) = resolve_did(&db, &input.repo)
6666+ .await
6767+ .context("failed to resolve did")?;
6868+6969+ // Ensure that we are updating the correct repository
7070+ if target_did.as_str() != user.did() {
7171+ return Err(Error::with_status(
7272+ StatusCode::BAD_REQUEST,
7373+ anyhow!("repo did not match the authenticated user"),
7474+ ));
7575+ }
7676+7777+ // Validate writes count
7878+ if input.writes.len() > 200 {
7979+ return Err(Error::with_status(
8080+ StatusCode::BAD_REQUEST,
8181+ anyhow!("Too many writes. Max: 200"),
8282+ ));
8383+ }
8484+8585+ // Convert input writes to prepared format
8686+ let mut prepared_writes = Vec::with_capacity(input.writes.len());
8787+ for write in input.writes.iter() {
8888+ match write {
8989+ InputWritesItem::Create(create) => {
9090+ let uri = AtUri::make(
9191+ user.did(),
9292+ &create.collection.as_str(),
9393+ create
9494+ .rkey
9595+ .as_deref()
9696+ .unwrap_or(&Tid::now(LimitedU32::MIN).to_string()),
9797+ );
9898+9999+ let cid = match cid_for_cbor(&create.value) {
100100+ Ok(cid) => cid,
101101+ Err(e) => {
102102+ return Err(Error::with_status(
103103+ StatusCode::BAD_REQUEST,
104104+ anyhow!("Failed to encode record: {}", e),
105105+ ));
106106+ }
107107+ };
108108+109109+ let blobs = scan_blobs(&create.value)
110110+ .unwrap_or_default()
111111+ .into_iter()
112112+ .map(|cid| {
113113+ // TODO: Create BlobRef from cid with proper metadata
114114+ BlobRef {
115115+ cid,
116116+ mime_type: "application/octet-stream".to_string(), // Default
117117+ size: 0, // Unknown at this point
118118+ }
119119+ })
120120+ .collect();
121121+122122+ prepared_writes.push(PreparedCreateOrUpdate {
123123+ action: WriteOpAction::Create,
124124+ uri: uri?.to_string(),
125125+ cid,
126126+ record: create.value.clone(),
127127+ blobs,
128128+ swap_cid: None,
129129+ });
130130+ }
131131+ InputWritesItem::Update(update) => {
132132+ let uri = AtUri::make(
133133+ user.did(),
134134+ Some(update.collection.to_string()),
135135+ Some(update.rkey.to_string()),
136136+ );
137137+138138+ let cid = match cid_for_cbor(&update.value) {
139139+ Ok(cid) => cid,
140140+ Err(e) => {
141141+ return Err(Error::with_status(
142142+ StatusCode::BAD_REQUEST,
143143+ anyhow!("Failed to encode record: {}", e),
144144+ ));
145145+ }
146146+ };
147147+148148+ let blobs = scan_blobs(&update.value)
149149+ .unwrap_or_default()
150150+ .into_iter()
151151+ .map(|cid| {
152152+ // TODO: Create BlobRef from cid with proper metadata
153153+ BlobRef {
154154+ cid,
155155+ mime_type: "application/octet-stream".to_string(),
156156+ size: 0,
157157+ }
158158+ })
159159+ .collect();
160160+161161+ prepared_writes.push(PreparedCreateOrUpdate {
162162+ action: WriteOpAction::Update,
163163+ uri: uri?.to_string(),
164164+ cid,
165165+ record: update.value.clone(),
166166+ blobs,
167167+ swap_cid: None,
168168+ });
169169+ }
170170+ InputWritesItem::Delete(delete) => {
171171+ let uri = AtUri::make(user.did(), &delete.collection.as_str(), &delete.rkey);
172172+173173+ prepared_writes.push(PreparedCreateOrUpdate {
174174+ action: WriteOpAction::Delete,
175175+ uri: uri?.to_string(),
176176+ cid: Cid::default(), // Not needed for delete
177177+ record: serde_json::Value::Null,
178178+ blobs: vec![],
179179+ swap_cid: None,
180180+ });
181181+ }
182182+ }
183183+ }
184184+185185+ // Get swap commit CID if provided
186186+ let swap_commit_cid = input.swap_commit.as_ref().map(|cid| *cid.as_ref());
187187+188188+ let did_str = user.did();
189189+ let mut repo = storage::open_repo_db(&config.repo, &db, did_str)
190190+ .await
191191+ .context("failed to open user repo")?;
192192+ let orig_cid = repo.root();
193193+ let orig_rev = repo.commit().rev();
194194+195195+ let mut blobs = vec![];
196196+ let mut res = vec![];
197197+ let mut ops = vec![];
198198+199199+ for write in &prepared_writes {
200200+ let (builder, key) = match write.action {
201201+ WriteOpAction::Create => {
202202+ let key = format!("{}/{}", write.uri.collection, write.uri.rkey);
203203+ let uri = format!("at://{}/{}", user.did(), key);
204204+205205+ let (builder, cid) = repo
206206+ .add_raw(&key, &write.record)
207207+ .await
208208+ .context("failed to add record")?;
209209+210210+ // Extract and track blobs
211211+ if let Ok(new_blobs) = scan_blobs(&write.record) {
212212+ blobs.extend(
213213+ new_blobs
214214+ .into_iter()
215215+ .map(|blob_cid| (key.clone(), blob_cid)),
216216+ );
217217+ }
218218+219219+ ops.push(RepoOp::Create {
220220+ cid,
221221+ path: key.clone(),
222222+ });
223223+224224+ res.push(OutputResultsItem::CreateResult(Box::new(
225225+ apply_writes::CreateResultData {
226226+ cid: atrium_api::types::string::Cid::new(cid),
227227+ uri,
228228+ validation_status: None,
229229+ }
230230+ .into(),
231231+ )));
232232+233233+ (builder, key)
234234+ }
235235+ WriteOpAction::Update => {
236236+ let key = format!("{}/{}", write.uri.collection, write.uri.rkey);
237237+ let uri = format!("at://{}/{}", user.did(), key);
238238+239239+ let prev = repo
240240+ .tree()
241241+ .get(&key)
242242+ .await
243243+ .context("failed to search MST")?;
244244+245245+ if prev.is_none() {
246246+ // No existing record, treat as create
247247+ let (create_builder, cid) = repo
248248+ .add_raw(&key, &write.record)
249249+ .await
250250+ .context("failed to add record")?;
251251+252252+ if let Ok(new_blobs) = scan_blobs(&write.record) {
253253+ blobs.extend(
254254+ new_blobs
255255+ .into_iter()
256256+ .map(|blob_cid| (key.clone(), blob_cid)),
257257+ );
258258+ }
259259+260260+ ops.push(RepoOp::Create {
261261+ cid,
262262+ path: key.clone(),
263263+ });
264264+265265+ res.push(OutputResultsItem::CreateResult(Box::new(
266266+ apply_writes::CreateResultData {
267267+ cid: atrium_api::types::string::Cid::new(cid),
268268+ uri,
269269+ validation_status: None,
270270+ }
271271+ .into(),
272272+ )));
273273+274274+ (create_builder, key)
275275+ } else {
276276+ // Update existing record
277277+ let prev = prev.context("should be able to find previous record")?;
278278+ let (update_builder, cid) = repo
279279+ .update_raw(&key, &write.record)
280280+ .await
281281+ .context("failed to add record")?;
282282+283283+ if let Ok(new_blobs) = scan_blobs(&write.record) {
284284+ blobs.extend(
285285+ new_blobs
286286+ .into_iter()
287287+ .map(|blob_cid| (key.clone(), blob_cid)),
288288+ );
289289+ }
290290+291291+ ops.push(RepoOp::Update {
292292+ cid,
293293+ path: key.clone(),
294294+ prev,
295295+ });
296296+297297+ res.push(OutputResultsItem::UpdateResult(Box::new(
298298+ apply_writes::UpdateResultData {
299299+ cid: atrium_api::types::string::Cid::new(cid),
300300+ uri,
301301+ validation_status: None,
302302+ }
303303+ .into(),
304304+ )));
305305+306306+ (update_builder, key)
307307+ }
308308+ }
309309+ WriteOpAction::Delete => {
310310+ let key = format!("{}/{}", write.uri.collection, write.uri.rkey);
311311+312312+ let prev = repo
313313+ .tree()
314314+ .get(&key)
315315+ .await
316316+ .context("failed to search MST")?
317317+ .context("previous record does not exist")?;
318318+319319+ ops.push(RepoOp::Delete {
320320+ path: key.clone(),
321321+ prev,
322322+ });
323323+324324+ res.push(OutputResultsItem::DeleteResult(Box::new(
325325+ apply_writes::DeleteResultData {}.into(),
326326+ )));
327327+328328+ let builder = repo
329329+ .delete_raw(&key)
330330+ .await
331331+ .context("failed to add record")?;
332332+333333+ (builder, key)
334334+ }
335335+ };
336336+337337+ let sig = skey
338338+ .sign(&builder.bytes())
339339+ .context("failed to sign commit")?;
340340+341341+ _ = builder
342342+ .finalize(sig)
343343+ .await
344344+ .context("failed to write signed commit")?;
345345+ }
346346+347347+ // Construct a firehose record
348348+ let mut mem = Vec::new();
349349+ let mut store = CarStore::create_with_roots(std::io::Cursor::new(&mut mem), [repo.root()])
350350+ .await
351351+ .context("failed to create temp store")?;
352352+353353+ // Extract the records out of the user's repository
354354+ for write in &prepared_writes {
355355+ let key = format!("{}/{}", write.uri.collection, write.uri.rkey);
356356+ repo.extract_raw_into(&key, &mut store)
357357+ .await
358358+ .context("failed to extract key")?;
359359+ }
360360+361361+ let mut tx = db.begin().await.context("failed to begin transaction")?;
362362+363363+ if !swap_commit(
364364+ &mut *tx,
365365+ repo.root(),
366366+ repo.commit().rev(),
367367+ input.swap_commit.as_ref().map(|cid| *cid.as_ref()),
368368+ &user.did(),
369369+ )
370370+ .await
371371+ .context("failed to swap commit")?
372372+ {
373373+ // This should always succeed.
374374+ let old = input
375375+ .swap_commit
376376+ .clone()
377377+ .context("swap_commit should always be Some")?;
378378+379379+ // The swap failed. Return the old commit and do not update the repository.
380380+ return Ok(Json(
381381+ apply_writes::OutputData {
382382+ results: None,
383383+ commit: Some(
384384+ CommitMetaData {
385385+ cid: old,
386386+ rev: orig_rev,
387387+ }
388388+ .into(),
389389+ ),
390390+ }
391391+ .into(),
392392+ ));
393393+ }
394394+395395+ // For updates and removals, unlink the old/deleted record from the blob_ref table
396396+ for op in &ops {
397397+ match op {
398398+ &RepoOp::Update { ref path, .. } | &RepoOp::Delete { ref path, .. } => {
399399+ // FIXME: This may cause issues if a user deletes more than one record referencing the same blob.
400400+ _ = &sqlx::query!(
401401+ r#"UPDATE blob_ref SET record = NULL WHERE did = ? AND record = ?"#,
402402+ did_str,
403403+ path
404404+ )
405405+ .execute(&mut *tx)
406406+ .await
407407+ .context("failed to remove blob_ref")?;
408408+ }
409409+ &RepoOp::Create { .. } => {}
410410+ }
411411+ }
412412+413413+ // Process blobs
414414+ for (key, cid) in &blobs {
415415+ let cid_str = cid.to_string();
416416+417417+ // Handle the case where a new record references an existing blob
418418+ if sqlx::query!(
419419+ r#"UPDATE blob_ref SET record = ? WHERE cid = ? AND did = ? AND record IS NULL"#,
420420+ key,
421421+ cid_str,
422422+ did_str,
423423+ )
424424+ .execute(&mut *tx)
425425+ .await
426426+ .context("failed to update blob_ref")?
427427+ .rows_affected()
428428+ == 0
429429+ {
430430+ _ = sqlx::query!(
431431+ r#"INSERT INTO blob_ref (record, cid, did) VALUES (?, ?, ?)"#,
432432+ key,
433433+ cid_str,
434434+ did_str,
435435+ )
436436+ .execute(&mut *tx)
437437+ .await
438438+ .context("failed to update blob_ref")?;
439439+ }
440440+ }
441441+442442+ tx.commit()
443443+ .await
444444+ .context("failed to commit blob ref to database")?;
445445+446446+ // Update counters
447447+ counter!(REPO_COMMITS).increment(1);
448448+ for op in &ops {
449449+ match *op {
450450+ RepoOp::Create { .. } => counter!(REPO_OP_CREATE).increment(1),
451451+ RepoOp::Update { .. } => counter!(REPO_OP_UPDATE).increment(1),
452452+ RepoOp::Delete { .. } => counter!(REPO_OP_DELETE).increment(1),
453453+ }
454454+ }
455455+456456+ // We've committed the transaction to the database, and the commit is now stored in the user's
457457+ // canonical repository.
458458+ // We can now broadcast this on the firehose.
459459+ fhp.commit(firehose::Commit {
460460+ car: mem,
461461+ ops,
462462+ cid: repo.root(),
463463+ rev: repo.commit().rev().to_string(),
464464+ did: atrium_api::types::string::Did::new(user.did()).expect("should be valid DID"),
465465+ pcid: Some(orig_cid),
466466+ blobs: blobs.into_iter().map(|(_, cid)| cid).collect::<Vec<_>>(),
467467+ })
468468+ .await;
469469+470470+ Ok(Json(
471471+ apply_writes::OutputData {
472472+ results: Some(res),
473473+ commit: Some(
474474+ CommitMetaData {
475475+ cid: atrium_api::types::string::Cid::new(repo.root()),
476476+ rev: repo.commit().rev(),
477477+ }
478478+ .into(),
479479+ ),
480480+ }
481481+ .into(),
482482+ ))
483483+}