···15/// If specified in an API endpoint, this will guarantee that the API can only be called
16/// by an authenticated user.
17pub(crate) struct AuthenticatedUser {
018 did: String,
19}
20···40 auth.strip_prefix("Bearer ")
41 });
4243- let token = match token {
44- Some(tok) => tok,
45- None => {
46- return Err(Error::with_status(
47- StatusCode::UNAUTHORIZED,
48- anyhow!("no bearer token"),
49- ));
50- }
51 };
5253 // N.B: We ignore all fields inside of the token up until this point because they can be
···113pub(crate) fn sign(
114 key: &Secp256k1Keypair,
115 typ: &str,
116- claims: serde_json::Value,
117) -> anyhow::Result<String> {
118 // RFC 9068
119 let hdr = serde_json::json!({
···15/// If specified in an API endpoint, this will guarantee that the API can only be called
16/// by an authenticated user.
17pub(crate) struct AuthenticatedUser {
18+ /// The DID of the authenticated user.
19 did: String,
20}
21···41 auth.strip_prefix("Bearer ")
42 });
4344+ let Some(token) = token else {
45+ return Err(Error::with_status(
46+ StatusCode::UNAUTHORIZED,
47+ anyhow!("no bearer token"),
48+ ));
00049 };
5051 // N.B: We ignore all fields inside of the token up until this point because they can be
···111pub(crate) fn sign(
112 key: &Secp256k1Keypair,
113 typ: &str,
114+ claims: &serde_json::Value,
115) -> anyhow::Result<String> {
116 // RFC 9068
117 let hdr = serde_json::json!({
+1-1
src/config.rs
···1//! Configuration structures for the PDS.
2/// The metrics configuration.
3pub(crate) mod metrics {
4- use super::*;
56 #[derive(Deserialize, Debug, Clone)]
7 /// The Prometheus configuration.
···1//! Configuration structures for the PDS.
2/// The metrics configuration.
3pub(crate) mod metrics {
4+ use super::{Deserialize, Url};
56 #[derive(Deserialize, Debug, Clone)]
7 /// The Prometheus configuration.
···1+//! PDS repository endpoints /xrpc/com.atproto.repo.*)
2use std::{collections::HashSet, str::FromStr as _};
34use anyhow::{Context as _, anyhow};
···39/// SHA2-256 mulithash
40const IPLD_MH_SHA2_256: u64 = 0x12;
4142+/// Used in [`scan_blobs`] to identify a blob.
43#[derive(Deserialize, Debug, Clone)]
44struct BlobRef {
45+ /// `BlobRef` link. Include `$` when serializing to JSON, since `$` isn't allowed in struct names.
46 #[serde(rename = "$link")]
47 link: String,
48}
4950#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
51#[serde(rename_all = "camelCase")]
52+/// Parameters for [`list_records`].
53pub(super) struct ListRecordsParameters {
54 ///The NSID of the record type.
55 pub collection: Nsid,
56+ /// The cursor to start from.
57 #[serde(skip_serializing_if = "core::option::Option::is_none")]
58 pub cursor: Option<String>,
59 ///The number of records to return.
···113 }
114}
115116+/// Resolves DID to DID document. Does not bi-directionally verify handle.
117+/// - GET /xrpc/com.atproto.repo.resolveDid
118+/// ### Query Parameters
119+/// - `did`: DID to resolve.
120+/// ### Responses
121+/// - 200 OK: {`did_doc`: `did_doc`}
122+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `DidNotFound`, `DidDeactivated`]}
123async fn resolve_did(
124 db: &Db,
125 identifier: &AtIdentifier,
···162 Ok((did.to_owned(), handle.to_owned()))
163}
164165+/// Used in [`apply_writes`] to scan for blobs in the JSON object and return their CIDs.
166fn scan_blobs(unknown: &Unknown) -> anyhow::Result<Vec<Cid>> {
167 // { "$type": "blob", "ref": { "$link": "bafyrei..." } }
168···173 ];
174 while let Some(value) = stack.pop() {
175 match value {
176+ serde_json::Value::Bool(_)
177+ | serde_json::Value::Null
178+ | serde_json::Value::Number(_)
179+ | serde_json::Value::String(_) => (),
180 serde_json::Value::Array(values) => stack.extend(values.into_iter()),
181 serde_json::Value::Object(map) => {
182 if let (Some(blob_type), Some(blob_ref)) = (map.get("$type"), map.get("ref")) {
···209 }
210 });
211212+ let blob = scan_blobs(&json.try_into_unknown().expect("should be valid JSON"))
213+ .expect("should be able to scan blobs");
214 assert_eq!(
215 blob,
216+ vec![
217+ Cid::from_str("bafkreifzxf2wa6dyakzbdaxkz2wkvfrv3hiuafhxewbn5wahcw6eh3hzji")
218+ .expect("should be valid CID")
219+ ]
220 );
221}
222223+#[expect(clippy::too_many_lines)]
224+/// Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS.
225+/// - POST /xrpc/com.atproto.repo.applyWrites
226+/// ### Request Body
227+/// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account).
228+/// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons.
229+/// - `writes`: `object[]` // One of:
230+/// - - com.atproto.repo.applyWrites.create
231+/// - - com.atproto.repo.applyWrites.update
232+/// - - com.atproto.repo.applyWrites.delete
233+/// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations.
234async fn apply_writes(
235 user: AuthenticatedUser,
236 State(skey): State<SigningKey>,
···284 blobs.extend(
285 new_blobs
286 .into_iter()
287+ .map(|blob_cid| (key.clone(), blob_cid)),
288 );
289 }
290···323 blobs.extend(
324 new_blobs
325 .into_iter()
326+ .map(|blod_cid| (key.clone(), blod_cid)),
327 );
328 }
329 ops.push(RepoOp::Create {
···349 blobs.extend(
350 new_blobs
351 .into_iter()
352+ .map(|blod_cid| (key.clone(), blod_cid)),
353 );
354 }
355 ops.push(RepoOp::Update {
···472 .await
473 .context("failed to remove blob_ref")?;
474 }
475+ &RepoOp::Create { .. } => {}
476 }
477 }
4780479 for &mut (ref key, cid) in &mut blobs {
480 let cid_str = cid.to_string();
481···546 ))
547}
548549+/// Create a single new repository record. Requires auth, implemented by PDS.
550+/// - POST /xrpc/com.atproto.repo.createRecord
551+/// ### Request Body
552+/// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account).
553+/// - `collection`: `nsid` // The NSID of the record collection.
554+/// - `rkey`: `string` // The record key. <= 512 characters.
555+/// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons.
556+/// - `record`
557+/// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID.
558+/// ### Responses
559+/// - 200 OK: {`cid`: `cid`, `uri`: `at-uri`, `commit`: {`cid`: `cid`, `rev`: `tid`}, `validation_status`: [`valid`, `unknown`]}
560+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]}
561+/// - 401 Unauthorized
562async fn create_record(
563 user: AuthenticatedUser,
564 State(skey): State<SigningKey>,
···575 State(fhp),
576 Json(
577 repo::apply_writes::InputData {
578+ repo: input.repo.clone(),
579+ validate: input.validate,
580+ swap_commit: input.swap_commit.clone(),
581 writes: vec![repo::apply_writes::InputWritesItem::Create(Box::new(
582 repo::apply_writes::CreateData {
583+ collection: input.collection.clone(),
584+ rkey: input.rkey.clone(),
585+ value: input.record.clone(),
586 }
587 .into(),
588 ))],
···596 let create_result = if let repo::apply_writes::OutputResultsItem::CreateResult(create_result) =
597 write_result
598 .results
599+ .clone()
600 .and_then(|result| result.first().cloned())
601 .context("unexpected output from apply_writes")?
602 {
···608609 Ok(Json(
610 repo::create_record::OutputData {
611+ cid: create_result.cid.clone(),
612+ commit: write_result.commit.clone(),
613+ uri: create_result.uri.clone(),
614 validation_status: Some("unknown".to_owned()),
615 }
616 .into(),
617 ))
618}
619620+/// Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS.
621+/// - POST /xrpc/com.atproto.repo.putRecord
622+/// ### Request Body
623+/// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account).
624+/// - `collection`: `nsid` // The NSID of the record collection.
625+/// - `rkey`: `string` // The record key. <= 512 characters.
626+/// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons.
627+/// - `record`
628+/// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. WARNING: nullable and optional field; may cause problems with golang implementation
629+/// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID.
630+/// ### Responses
631+/// - 200 OK: {"uri": "string","cid": "string","commit": {"cid": "string","rev": "string"},"validationStatus": "valid | unknown"}
632+/// - 400 Bad Request: {error:"`InvalidRequest` | `ExpiredToken` | `InvalidToken` | `InvalidSwap`"}
633+/// - 401 Unauthorized
634async fn put_record(
635 user: AuthenticatedUser,
636 State(skey): State<SigningKey>,
···649 State(fhp),
650 Json(
651 repo::apply_writes::InputData {
652+ repo: input.repo.clone(),
653 validate: input.validate,
654+ swap_commit: input.swap_commit.clone(),
655 writes: vec![repo::apply_writes::InputWritesItem::Update(Box::new(
656 repo::apply_writes::UpdateData {
657+ collection: input.collection.clone(),
658+ rkey: input.rkey.clone(),
659+ value: input.record.clone(),
660 }
661 .into(),
662 ))],
···669670 let update_result = write_result
671 .results
672+ .clone()
673 .and_then(|result| result.first().cloned())
674 .context("unexpected output from apply_writes")?;
675 let (cid, uri) = match update_result {
676 repo::apply_writes::OutputResultsItem::CreateResult(create_result) => (
677+ Some(create_result.cid.clone()),
678+ Some(create_result.uri.clone()),
679 ),
680 repo::apply_writes::OutputResultsItem::UpdateResult(update_result) => (
681+ Some(update_result.cid.clone()),
682+ Some(update_result.uri.clone()),
683 ),
684+ repo::apply_writes::OutputResultsItem::DeleteResult(_) => (None, None),
685 };
686 Ok(Json(
687 repo::put_record::OutputData {
688 cid: cid.context("missing cid")?,
689+ commit: write_result.commit.clone(),
690 uri: uri.context("missing uri")?,
691 validation_status: Some("unknown".to_owned()),
692 }
···694 ))
695}
696697+/// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS.
698+/// - POST /xrpc/com.atproto.repo.deleteRecord
699+/// ### Request Body
700+/// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account).
701+/// - `collection`: `nsid` // The NSID of the record collection.
702+/// - `rkey`: `string` // The record key. <= 512 characters.
703+/// - `swap_record`: `boolean` // Compare and swap with the previous record by CID.
704+/// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID.
705+/// ### Responses
706+/// - 200 OK: {"commit": {"cid": "string","rev": "string"}}
707+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]}
708+/// - 401 Unauthorized
709async fn delete_record(
710 user: AuthenticatedUser,
711 State(skey): State<SigningKey>,
···726 State(fhp),
727 Json(
728 repo::apply_writes::InputData {
729+ repo: input.repo.clone(),
730+ swap_commit: input.swap_commit.clone(),
731 validate: None,
732 writes: vec![repo::apply_writes::InputWritesItem::Delete(Box::new(
733 repo::apply_writes::DeleteData {
734+ collection: input.collection.clone(),
735+ rkey: input.rkey.clone(),
736 }
737 .into(),
738 ))],
···743 .await
744 .context("failed to apply writes")?
745 .commit
746+ .clone(),
747 }
748 .into(),
749 ))
750}
751752+/// Get information about an account and repository, including the list of collections. Does not require auth.
753+/// - GET /xrpc/com.atproto.repo.describeRepo
754+/// ### Query Parameters
755+/// - `repo`: `at-identifier` // The handle or DID of the repo.
756+/// ### Responses
757+/// - 200 OK: {"handle": "string","did": "string","didDoc": {},"collections": [string],"handleIsCorrect": true} \
758+/// handeIsCorrect - boolean - Indicates if handle is currently valid (resolves bi-directionally)
759+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
760+/// - 401 Unauthorized
761async fn describe_repo(
762 State(config): State<AppConfig>,
763 State(db): State<Db>,
···797 ))
798}
799800+/// Get a single record from a repository. Does not require auth.
801+/// - GET /xrpc/com.atproto.repo.getRecord
802+/// ### Query Parameters
803+/// - `repo`: `at-identifier` // The handle or DID of the repo.
804+/// - `collection`: `nsid` // The NSID of the record collection.
805+/// - `rkey`: `string` // The record key. <= 512 characters.
806+/// - `cid`: `cid` // The CID of the version of the record. If not specified, then return the most recent version.
807+/// ### Responses
808+/// - 200 OK: {"uri": "string","cid": "string","value": {}}
809+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`]}
810+/// - 401 Unauthorized
811async fn get_record(
812 State(config): State<AppConfig>,
813 State(db): State<Db>,
···845 Err(Error::with_message(
846 StatusCode::BAD_REQUEST,
847 anyhow!("could not find the requested record at {}", uri),
848+ ErrorMessage::new("RecordNotFound", format!("Could not locate record: {uri}")),
000849 ))
850 },
851 |record_value| {
852 Ok(Json(
853 repo::get_record::OutputData {
854 cid: cid.map(atrium_api::types::string::Cid::new),
855+ uri: uri.clone(),
856 value: record_value
857 .try_into_unknown()
858 .context("should be valid JSON")?,
···863 )
864}
865866+/// List a range of records in a repository, matching a specific collection. Does not require auth.
867+/// - GET /xrpc/com.atproto.repo.listRecords
868+/// ### Query Parameters
869+/// - `repo`: `at-identifier` // The handle or DID of the repo.
870+/// - `collection`: `nsid` // The NSID of the record type.
871+/// - `limit`: `integer` // The maximum number of records to return. Default 50, >=1 and <=100.
872+/// - `cursor`: `string`
873+/// - `reverse`: `boolean` // Flag to reverse the order of the returned records.
874+/// ### Responses
875+/// - 200 OK: {"cursor": "string","records": [{"uri": "string","cid": "string","value": {}}]}
876+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
877+/// - 401 Unauthorized
878async fn list_records(
879 State(config): State<AppConfig>,
880 State(db): State<Db>,
···924 value: value.try_into_unknown().context("should be valid JSON")?,
925 }
926 .into(),
927+ );
928 }
929930 #[expect(clippy::pattern_type_mismatch)]
···937 ))
938}
939940+/// Upload a new blob, to be referenced from a repository record. \
941+/// The blob will be deleted if it is not referenced within a time window (eg, minutes). \
942+/// Blob restrictions (mimetype, size, etc) are enforced when the reference is created. \
943+/// Requires auth, implemented by PDS.
944+/// - POST /xrpc/com.atproto.repo.uploadBlob
945+/// ### Request Body
946+/// ### Responses
947+/// - 200 OK: {"blob": "binary"}
948+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
949+/// - 401 Unauthorized
950async fn upload_blob(
951 user: AuthenticatedUser,
952 State(config): State<AppConfig>,
···10231024 let cid_str = cid.to_string();
10251026+ tokio::fs::rename(&filename, config.blob.path.join(format!("{cid_str}.blob")))
1027+ .await
1028+ .context("failed to finalize blob")?;
00010291030 let did_str = user.did();
1031···1052 ))
1053}
10541055+/// These endpoints are part of the atproto PDS repository management APIs. \
1056+/// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance.
1057+/// ### Routes
1058+/// - AP /xrpc/com.atproto.repo.applyWrites -> [`apply_writes`]
1059+/// - AP /xrpc/com.atproto.repo.createRecord -> [`create_record`]
1060+/// - AP /xrpc/com.atproto.repo.putRecord -> [`put_record`]
1061+/// - AP /xrpc/com.atproto.repo.deleteRecord -> [`delete_record`]
1062+/// - AP /xrpc/com.atproto.repo.uploadBlob -> [`upload_blob`]
1063+/// - UG /xrpc/com.atproto.repo.describeRepo -> [`describe_repo`]
1064+/// - UG /xrpc/com.atproto.repo.getRecord -> [`get_record`]
1065+/// - UG /xrpc/com.atproto.repo.listRecords -> [`list_records`]
1066pub(super) fn routes() -> Router<AppState> {
000000001067 Router::new()
1068+ .route(concat!("/", repo::apply_writes::NSID), post(apply_writes))
1069 .route(concat!("/", repo::create_record::NSID), post(create_record))
1070+ .route(concat!("/", repo::put_record::NSID), post(put_record))
1071 .route(concat!("/", repo::delete_record::NSID), post(delete_record))
1072+ .route(concat!("/", repo::upload_blob::NSID), post(upload_blob))
1073 .route(concat!("/", repo::describe_repo::NSID), get(describe_repo))
1074+ .route(concat!("/", repo::get_record::NSID), get(get_record))
1075+ .route(concat!("/", repo::list_records::NSID), get(list_records))
1076}
+105-38
src/endpoints/server.rs
···01use std::{collections::HashMap, str::FromStr as _};
23use anyhow::{Context as _, anyhow};
···37/// This is a dummy password that can be used in absence of a real password.
38const DUMMY_PASSWORD: &str = "$argon2id$v=19$m=19456,t=2,p=1$En2LAfHjeO0SZD5IUU1Abg$RpS8nHhhqY4qco2uyd41p9Y/1C+Lvi214MAWukzKQMI";
3900000000040async fn create_invite_code(
41 _user: AuthenticatedUser,
42 State(db): State<Db>,
···70 ))
71}
7200000000000000000073async fn create_account(
74 State(db): State<Db>,
75 State(skey): State<SigningKey>,
···164 prev: None,
165 },
166 )
167- .await
168 .context("failed to sign genesis op")?;
169 let op_bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode genesis op")?;
170···179 #[expect(clippy::string_slice, reason = "digest length confirmed")]
180 digest[..24].to_owned()
181 };
182- let did = format!("did:plc:{}", did_hash);
183184- let doc = tokio::fs::File::create(config.plc.path.join(format!("{}.car", did_hash)))
185 .await
186 .context("failed to create did doc")?;
187···205 // Write out an initial commit for the user.
206 // https://atproto.com/guides/account-lifecycle
207 let (cid, rev, store) = async {
208- let file = tokio::fs::File::create_new(config.repo.path.join(format!("{}.car", did_hash)))
209 .await
210 .context("failed to create repo file")?;
211 let mut store = CarStore::create(file)
···316 let token = auth::sign(
317 &skey,
318 "at+jwt",
319- serde_json::json!({
320 "scope": "com.atproto.access",
321 "sub": did,
322 "iat": chrono::Utc::now().timestamp(),
···329 let refresh_token = auth::sign(
330 &skey,
331 "refresh+jwt",
332- serde_json::json!({
333 "scope": "com.atproto.refresh",
334 "sub": did,
335 "iat": chrono::Utc::now().timestamp(),
···351 ))
352}
35300000000000354async fn create_session(
355 State(db): State<Db>,
356 State(skey): State<SigningKey>,
···363 // TODO: `input.allow_takedown`
364 // TODO: `input.auth_factor_token`
365366- let account = if let Some(account) = sqlx::query!(
367 r#"
368- WITH LatestHandles AS (
369- SELECT did, handle
370- FROM handles
371- WHERE (did, created_at) IN (
372- SELECT did, MAX(created_at) AS max_created_at
373- FROM handles
374- GROUP BY did
375- )
376- )
377- SELECT a.did, a.password, h.handle
378- FROM accounts a
379- LEFT JOIN LatestHandles h ON a.did = h.did
380- WHERE h.handle = ?
381- "#,
382 handle
383 )
384 .fetch_optional(&db)
385 .await
386 .context("failed to authenticate")?
387- {
388- account
389- } else {
390 counter!(AUTH_FAILED).increment(1);
391392 // SEC: Call argon2's `verify_password` to simulate password verification and discard the result.
···407 password.as_bytes(),
408 &PasswordHash::new(account.password.as_str()).context("invalid password hash in db")?,
409 ) {
410- Ok(_) => {}
411 Err(_e) => {
412 counter!(AUTH_FAILED).increment(1);
413···423 let token = auth::sign(
424 &skey,
425 "at+jwt",
426- serde_json::json!({
427 "scope": "com.atproto.access",
428 "sub": did,
429 "iat": chrono::Utc::now().timestamp(),
···436 let refresh_token = auth::sign(
437 &skey,
438 "refresh+jwt",
439- serde_json::json!({
440 "scope": "com.atproto.refresh",
441 "sub": did,
442 "iat": chrono::Utc::now().timestamp(),
···464 ))
465}
466000000467async fn refresh_session(
468 State(db): State<Db>,
469 State(skey): State<SigningKey>,
···490 }
491 if claims
492 .get("exp")
493- .and_then(|exp| exp.as_i64())
494 .context("failed to get `exp`")?
495 < chrono::Utc::now().timestamp()
496 {
···534 let token = auth::sign(
535 &skey,
536 "at+jwt",
537- serde_json::json!({
538 "scope": "com.atproto.access",
539 "sub": did,
540 "iat": chrono::Utc::now().timestamp(),
···547 let refresh_token = auth::sign(
548 &skey,
549 "refresh+jwt",
550- serde_json::json!({
551 "scope": "com.atproto.refresh",
552 "sub": did,
553 "iat": chrono::Utc::now().timestamp(),
···575 ))
576}
5770000000000578async fn get_service_auth(
579 user: AuthenticatedUser,
580 State(skey): State<SigningKey>,
···608 }
609610 // Mint a bearer token by signing a JSON web token.
611- let token = auth::sign(&skey, "JWT", claims).context("failed to sign jwt")?;
612613 Ok(Json(server::get_service_auth::OutputData { token }.into()))
614}
615000000616async fn get_session(
617 user: AuthenticatedUser,
618 State(db): State<Db>,
···661 }
662}
663000000664async fn describe_server(
665 State(config): State<AppConfig>,
666) -> Result<Json<server::describe_server::Output>> {
···679}
680681#[rustfmt::skip]
0000000000682pub(super) fn routes() -> Router<AppState> {
683- // UG /xrpc/com.atproto.server.describeServer
684- // UP /xrpc/com.atproto.server.createAccount
685- // UP /xrpc/com.atproto.server.createSession
686- // AP /xrpc/com.atproto.server.refreshSession
687- // AG /xrpc/com.atproto.server.getServiceAuth
688- // AG /xrpc/com.atproto.server.getSession
689- // AP /xrpc/com.atproto.server.createInviteCode
690 Router::new()
691 .route(concat!("/", server::describe_server::NSID), get(describe_server))
692 .route(concat!("/", server::create_account::NSID), post(create_account))
···1+//! Server endpoints. (/xrpc/com.atproto.server.*)
2use std::{collections::HashMap, str::FromStr as _};
34use anyhow::{Context as _, anyhow};
···38/// This is a dummy password that can be used in absence of a real password.
39const DUMMY_PASSWORD: &str = "$argon2id$v=19$m=19456,t=2,p=1$En2LAfHjeO0SZD5IUU1Abg$RpS8nHhhqY4qco2uyd41p9Y/1C+Lvi214MAWukzKQMI";
4041+/// Create an invite code.
42+/// - POST /xrpc/com.atproto.server.createInviteCode
43+/// ### Request Body
44+/// - `useCount`: integer
45+/// - `forAccount`: string (optional)
46+/// ### Responses
47+/// - 200 OK: {code: string}
48+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
49+/// - 401 Unauthorized
50async fn create_invite_code(
51 _user: AuthenticatedUser,
52 State(db): State<Db>,
···80 ))
81}
8283+#[expect(clippy::too_many_lines, reason = "TODO: refactor")]
84+/// Create an account. Implemented by PDS.
85+/// - POST /xrpc/com.atproto.server.createAccount
86+/// ### Request Body
87+/// - `email`: string
88+/// - `handle`: string (required)
89+/// - `did`: string - Pre-existing atproto DID, being imported to a new account.
90+/// - `inviteCode`: string
91+/// - `verificationCode`: string
92+/// - `verificationPhone`: string
93+/// - `password`: string - Initial account password. May need to meet instance-specific password strength requirements.
94+/// - `recoveryKey`: string - DID PLC rotation key (aka, recovery key) to be included in PLC creation operation.
95+/// - `plcOp`: object
96+/// ## Responses
97+/// - 200 OK: {"accessJwt": "string","refreshJwt": "string","handle": "string","did": "string","didDoc": {}}
98+/// - 400 Bad Request: {error: [`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidHandle`, `InvalidPassword`, \
99+/// `InvalidInviteCode`, `HandleNotAvailable`, `UnsupportedDomain`, `UnresolvableDid`, `IncompatibleDidDoc`)}
100+/// - 401 Unauthorized
101async fn create_account(
102 State(db): State<Db>,
103 State(skey): State<SigningKey>,
···192 prev: None,
193 },
194 )
0195 .context("failed to sign genesis op")?;
196 let op_bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode genesis op")?;
197···206 #[expect(clippy::string_slice, reason = "digest length confirmed")]
207 digest[..24].to_owned()
208 };
209+ let did = format!("did:plc:{did_hash}");
210211+ let doc = tokio::fs::File::create(config.plc.path.join(format!("{did_hash}.car")))
212 .await
213 .context("failed to create did doc")?;
214···232 // Write out an initial commit for the user.
233 // https://atproto.com/guides/account-lifecycle
234 let (cid, rev, store) = async {
235+ let file = tokio::fs::File::create_new(config.repo.path.join(format!("{did_hash}.car")))
236 .await
237 .context("failed to create repo file")?;
238 let mut store = CarStore::create(file)
···343 let token = auth::sign(
344 &skey,
345 "at+jwt",
346+ &serde_json::json!({
347 "scope": "com.atproto.access",
348 "sub": did,
349 "iat": chrono::Utc::now().timestamp(),
···356 let refresh_token = auth::sign(
357 &skey,
358 "refresh+jwt",
359+ &serde_json::json!({
360 "scope": "com.atproto.refresh",
361 "sub": did,
362 "iat": chrono::Utc::now().timestamp(),
···378 ))
379}
380381+/// Create an authentication session.
382+/// - POST /xrpc/com.atproto.server.createSession
383+/// ### Request Body
384+/// - `identifier`: string - Handle or other identifier supported by the server for the authenticating user.
385+/// - `password`: string - Password for the authenticating user.
386+/// - `authFactorToken` - string (optional)
387+/// - `allowTakedown` - boolean (optional) - When true, instead of throwing error for takendown accounts, a valid response with a narrow scoped token will be returned
388+/// ### Responses
389+/// - 200 OK: {"accessJwt": "string","refreshJwt": "string","handle": "string","did": "string","didDoc": {},"email": "string","emailConfirmed": true,"emailAuthFactor": true,"active": true,"status": "takendown"}
390+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `AccountTakedown`, `AuthFactorTokenRequired`]}
391+/// - 401 Unauthorized
392async fn create_session(
393 State(db): State<Db>,
394 State(skey): State<SigningKey>,
···401 // TODO: `input.allow_takedown`
402 // TODO: `input.auth_factor_token`
403404+ let Some(account) = sqlx::query!(
405 r#"
406+ WITH LatestHandles AS (
407+ SELECT did, handle
408+ FROM handles
409+ WHERE (did, created_at) IN (
410+ SELECT did, MAX(created_at) AS max_created_at
411+ FROM handles
412+ GROUP BY did
413+ )
414+ )
415+ SELECT a.did, a.password, h.handle
416+ FROM accounts a
417+ LEFT JOIN LatestHandles h ON a.did = h.did
418+ WHERE h.handle = ?
419+ "#,
420 handle
421 )
422 .fetch_optional(&db)
423 .await
424 .context("failed to authenticate")?
425+ else {
00426 counter!(AUTH_FAILED).increment(1);
427428 // SEC: Call argon2's `verify_password` to simulate password verification and discard the result.
···443 password.as_bytes(),
444 &PasswordHash::new(account.password.as_str()).context("invalid password hash in db")?,
445 ) {
446+ Ok(()) => {}
447 Err(_e) => {
448 counter!(AUTH_FAILED).increment(1);
449···459 let token = auth::sign(
460 &skey,
461 "at+jwt",
462+ &serde_json::json!({
463 "scope": "com.atproto.access",
464 "sub": did,
465 "iat": chrono::Utc::now().timestamp(),
···472 let refresh_token = auth::sign(
473 &skey,
474 "refresh+jwt",
475+ &serde_json::json!({
476 "scope": "com.atproto.refresh",
477 "sub": did,
478 "iat": chrono::Utc::now().timestamp(),
···500 ))
501}
502503+/// Refresh an authentication session. Requires auth using the 'refreshJwt' (not the 'accessJwt').
504+/// - POST /xrpc/com.atproto.server.refreshSession
505+/// ### Responses
506+/// - 200 OK: {"accessJwt": "string","refreshJwt": "string","handle": "string","did": "string","didDoc": {},"active": true,"status": "takendown"}
507+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `AccountTakedown`]}
508+/// - 401 Unauthorized
509async fn refresh_session(
510 State(db): State<Db>,
511 State(skey): State<SigningKey>,
···532 }
533 if claims
534 .get("exp")
535+ .and_then(serde_json::Value::as_i64)
536 .context("failed to get `exp`")?
537 < chrono::Utc::now().timestamp()
538 {
···576 let token = auth::sign(
577 &skey,
578 "at+jwt",
579+ &serde_json::json!({
580 "scope": "com.atproto.access",
581 "sub": did,
582 "iat": chrono::Utc::now().timestamp(),
···589 let refresh_token = auth::sign(
590 &skey,
591 "refresh+jwt",
592+ &serde_json::json!({
593 "scope": "com.atproto.refresh",
594 "sub": did,
595 "iat": chrono::Utc::now().timestamp(),
···617 ))
618}
619620+/// Get a signed token on behalf of the requesting DID for the requested service.
621+/// - GET /xrpc/com.atproto.server.getServiceAuth
622+/// ### Request Query Parameters
623+/// - `aud`: string - The DID of the service that the token will be used to authenticate with
624+/// - `exp`: integer (optional) - The time in Unix Epoch seconds that the JWT expires. Defaults to 60 seconds in the future. The service may enforce certain time bounds on tokens depending on the requested scope.
625+/// - `lxm`: string (optional) - Lexicon (XRPC) method to bind the requested token to
626+/// ### Responses
627+/// - 200 OK: {token: string}
628+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `BadExpiration`]}
629+/// - 401 Unauthorized
630async fn get_service_auth(
631 user: AuthenticatedUser,
632 State(skey): State<SigningKey>,
···660 }
661662 // Mint a bearer token by signing a JSON web token.
663+ let token = auth::sign(&skey, "JWT", &claims).context("failed to sign jwt")?;
664665 Ok(Json(server::get_service_auth::OutputData { token }.into()))
666}
667668+/// Get information about the current auth session. Requires auth.
669+/// - GET /xrpc/com.atproto.server.getSession
670+/// ### Responses
671+/// - 200 OK: {"handle": "string","did": "string","email": "string","emailConfirmed": true,"emailAuthFactor": true,"didDoc": {},"active": true,"status": "takendown"}
672+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
673+/// - 401 Unauthorized
674async fn get_session(
675 user: AuthenticatedUser,
676 State(db): State<Db>,
···719 }
720}
721722+/// Describes the server's account creation requirements and capabilities. Implemented by PDS.
723+/// - GET /xrpc/com.atproto.server.describeServer
724+/// ### Responses
725+/// - 200 OK: {"inviteCodeRequired": true,"phoneVerificationRequired": true,"availableUserDomains": [`string`],"links": {"privacyPolicy": "string","termsOfService": "string"},"contact": {"email": "string"},"did": "string"}
726+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
727+/// - 401 Unauthorized
728async fn describe_server(
729 State(config): State<AppConfig>,
730) -> Result<Json<server::describe_server::Output>> {
···743}
744745#[rustfmt::skip]
746+/// These endpoints are part of the atproto PDS server and account management APIs. \
747+/// Requests often require authentication and are made directly to the user's own PDS instance.
748+/// ### Routes
749+/// - `GET /xrpc/com.atproto.server.describeServer` -> [`describe_server`]
750+/// - `POST /xrpc/com.atproto.server.createAccount` -> [`create_account`]
751+/// - `POST /xrpc/com.atproto.server.createSession` -> [`create_session`]
752+/// - `POST /xrpc/com.atproto.server.refreshSession` -> [`refresh_session`]
753+/// - `GET /xrpc/com.atproto.server.getServiceAuth` -> [`get_service_auth`]
754+/// - `GET /xrpc/com.atproto.server.getSession` -> [`get_session`]
755+/// - `POST /xrpc/com.atproto.server.createInviteCode` -> [`create_invite_code`]
756pub(super) fn routes() -> Router<AppState> {
0000000757 Router::new()
758 .route(concat!("/", server::describe_server::NSID), get(describe_server))
759 .route(concat!("/", server::create_account::NSID), post(create_account))
+92-16
src/endpoints/sync.rs
···01use std::str::FromStr as _;
23use anyhow::{Context as _, anyhow};
···27 storage::{open_repo_db, open_store},
28};
2930-// HACK: `limit` may be passed as a string, so we must treat it as one.
31#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
32#[serde(rename_all = "camelCase")]
0033pub(super) struct ListBlobsParameters {
34 #[serde(skip_serializing_if = "core::option::Option::is_none")]
035 pub cursor: Option<String>,
36 ///The DID of the repo.
37 pub did: Did,
38 #[serde(skip_serializing_if = "core::option::Option::is_none")]
039 pub limit: Option<String>,
40 ///Optional revision of the repo to list blobs since.
41 #[serde(skip_serializing_if = "core::option::Option::is_none")]
42 pub since: Option<String>,
43}
44-// HACK: `limit` may be passed as a string, so we must treat it as one.
45#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
46#[serde(rename_all = "camelCase")]
0047pub(super) struct ListReposParameters {
48 #[serde(skip_serializing_if = "core::option::Option::is_none")]
049 pub cursor: Option<String>,
50 #[serde(skip_serializing_if = "core::option::Option::is_none")]
051 pub limit: Option<String>,
52}
53-// HACK: `cursor` may be passed as a string, so we must treat it as one.
54#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
55#[serde(rename_all = "camelCase")]
0056pub(super) struct SubscribeReposParametersData {
57 ///The last known event seq number to backfill from.
58 #[serde(skip_serializing_if = "core::option::Option::is_none")]
···80 let s = ReaderStream::new(f);
8182 Ok(Response::builder()
83- .header(http::header::CONTENT_LENGTH, format!("{}", len))
84 .body(Body::from_stream(s))
85 .context("failed to construct response")?)
86}
8700000000088async fn get_blocks(
89 State(config): State<AppConfig>,
90 Query(input): Query<sync::get_blocks::Parameters>,
···120 .context("failed to construct response")?)
121}
122000000123async fn get_latest_commit(
124 State(config): State<AppConfig>,
125 State(db): State<Db>,
···141 ))
142}
143000000000144async fn get_record(
145 State(config): State<AppConfig>,
146 State(db): State<Db>,
···168 .context("failed to construct response")?)
169}
170000000171async fn get_repo_status(
172 State(db): State<Db>,
173 Query(input): Query<sync::get_repo::Parameters>,
···178 .await
179 .context("failed to execute query")?;
180181- let r = if let Some(r) = r {
182- r
183- } else {
184 return Err(Error::with_status(
185 StatusCode::NOT_FOUND,
186 anyhow!("account not found"),
···201 ))
202}
203000000000204async fn get_repo(
205 State(config): State<AppConfig>,
206 State(db): State<Db>,
···225 .context("failed to construct response")?)
226}
2270000000000228async fn list_blobs(
229 State(db): State<Db>,
230 Query(input): Query<ListBlobsParameters>,
···255 ))
256}
25700000000258async fn list_repos(
259 State(db): State<Db>,
260 Query(input): Query<ListReposParameters>,
261) -> Result<Json<sync::list_repos::Output>> {
262 struct Record {
0263 did: String,
0264 rev: String,
0265 root: String,
266 }
267···317 Ok(Json(sync::list_repos::OutputData { cursor, repos }.into()))
318}
3190000000320async fn subscribe_repos(
321 ws_up: WebSocketUpgrade,
322 State(fh): State<FirehoseProducer>,
···337}
338339#[rustfmt::skip]
000000000000340pub(super) fn routes() -> Router<AppState> {
341- // UG /xrpc/com.atproto.sync.getBlob
342- // UG /xrpc/com.atproto.sync.getBlocks
343- // UG /xrpc/com.atproto.sync.getLatestCommit
344- // UG /xrpc/com.atproto.sync.getRecord
345- // UG /xrpc/com.atproto.sync.getRepoStatus
346- // UG /xrpc/com.atproto.sync.getRepo
347- // UG /xrpc/com.atproto.sync.listBlobs
348- // UG /xrpc/com.atproto.sync.listRepos
349- // UG /xrpc/com.atproto.sync.subscribeRepos
350 Router::new()
351 .route(concat!("/", sync::get_blob::NSID), get(get_blob))
352 .route(concat!("/", sync::get_blocks::NSID), get(get_blocks))
···1+//! Endpoints for the `ATProto` sync API. (/xrpc/com.atproto.sync.*)
2use std::str::FromStr as _;
34use anyhow::{Context as _, anyhow};
···28 storage::{open_repo_db, open_store},
29};
30031#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
32#[serde(rename_all = "camelCase")]
33+/// Parameters for `/xrpc/com.atproto.sync.listBlobs` \
34+/// HACK: `limit` may be passed as a string, so we must treat it as one.
35pub(super) struct ListBlobsParameters {
36 #[serde(skip_serializing_if = "core::option::Option::is_none")]
37+ /// Optional cursor to paginate through blobs.
38 pub cursor: Option<String>,
39 ///The DID of the repo.
40 pub did: Did,
41 #[serde(skip_serializing_if = "core::option::Option::is_none")]
42+ /// Optional limit of blobs to return.
43 pub limit: Option<String>,
44 ///Optional revision of the repo to list blobs since.
45 #[serde(skip_serializing_if = "core::option::Option::is_none")]
46 pub since: Option<String>,
47}
048#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
49#[serde(rename_all = "camelCase")]
50+/// Parameters for `/xrpc/com.atproto.sync.listRepos` \
51+/// HACK: `limit` may be passed as a string, so we must treat it as one.
52pub(super) struct ListReposParameters {
53 #[serde(skip_serializing_if = "core::option::Option::is_none")]
54+ /// Optional cursor to paginate through repos.
55 pub cursor: Option<String>,
56 #[serde(skip_serializing_if = "core::option::Option::is_none")]
57+ /// Optional limit of repos to return.
58 pub limit: Option<String>,
59}
060#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
61#[serde(rename_all = "camelCase")]
62+/// Parameters for `/xrpc/com.atproto.sync.subscribeRepos` \
63+/// HACK: `cursor` may be passed as a string, so we must treat it as one.
64pub(super) struct SubscribeReposParametersData {
65 ///The last known event seq number to backfill from.
66 #[serde(skip_serializing_if = "core::option::Option::is_none")]
···88 let s = ReaderStream::new(f);
8990 Ok(Response::builder()
91+ .header(http::header::CONTENT_LENGTH, format!("{len}"))
92 .body(Body::from_stream(s))
93 .context("failed to construct response")?)
94}
9596+/// Enumerates which accounts the requesting account is currently blocking. Requires auth.
97+/// - GET /xrpc/com.atproto.sync.getBlocks
98+/// ### Query Parameters
99+/// - `limit`: integer, optional, default: 50, >=1 and <=100
100+/// - `cursor`: string, optional
101+/// ### Responses
102+/// - 200 OK: ...
103+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
104+/// - 401 Unauthorized
105async fn get_blocks(
106 State(config): State<AppConfig>,
107 Query(input): Query<sync::get_blocks::Parameters>,
···137 .context("failed to construct response")?)
138}
139140+/// Get the current commit CID & revision of the specified repo. Does not require auth.
141+/// ### Query Parameters
142+/// - `did`: The DID of the repo.
143+/// ### Responses
144+/// - 200 OK: {"cid": "string","rev": "string"}
145+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RepoTakendown`, `RepoSuspended`, `RepoDeactivated`]}
146async fn get_latest_commit(
147 State(config): State<AppConfig>,
148 State(db): State<Db>,
···164 ))
165}
166167+/// Get data blocks needed to prove the existence or non-existence of record in the current version of repo. Does not require auth.
168+/// ### Query Parameters
169+/// - `did`: The DID of the repo.
170+/// - `collection`: nsid
171+/// - `rkey`: record-key
172+/// ### Responses
173+/// - 200 OK: ...
174+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`, `RepoNotFound`, `RepoTakendown`,
175+/// `RepoSuspended`, `RepoDeactivated`]}
176async fn get_record(
177 State(config): State<AppConfig>,
178 State(db): State<Db>,
···200 .context("failed to construct response")?)
201}
202203+/// Get the hosting status for a repository, on this server. Expected to be implemented by PDS and Relay.
204+/// ### Query Parameters
205+/// - `did`: The DID of the repo.
206+/// ### Responses
207+/// - 200 OK: {"did": "string","active": true,"status": "takendown","rev": "string"}
208+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RepoNotFound`]}
209async fn get_repo_status(
210 State(db): State<Db>,
211 Query(input): Query<sync::get_repo::Parameters>,
···216 .await
217 .context("failed to execute query")?;
218219+ let Some(r) = r else {
00220 return Err(Error::with_status(
221 StatusCode::NOT_FOUND,
222 anyhow!("account not found"),
···237 ))
238}
239240+/// Download a repository export as CAR file. Optionally only a 'diff' since a previous revision.
241+/// Does not require auth; implemented by PDS.
242+/// ### Query Parameters
243+/// - `did`: The DID of the repo.
244+/// - `since`: The revision ('rev') of the repo to create a diff from.
245+/// ### Responses
246+/// - 200 OK: ...
247+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RepoNotFound`,
248+/// `RepoTakendown`, `RepoSuspended`, `RepoDeactivated`]}
249async fn get_repo(
250 State(config): State<AppConfig>,
251 State(db): State<Db>,
···270 .context("failed to construct response")?)
271}
272273+/// List blob CIDs for an account, since some repo revision. Does not require auth; implemented by PDS.
274+/// ### Query Parameters
275+/// - `did`: The DID of the repo. Required.
276+/// - `since`: Optional revision of the repo to list blobs since.
277+/// - `limit`: >= 1 and <= 1000, default 500
278+/// - `cursor`: string
279+/// ### Responses
280+/// - 200 OK: {"cursor": "string","cids": [string]}
281+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RepoNotFound`, `RepoTakendown`,
282+/// `RepoSuspended`, `RepoDeactivated`]}
283async fn list_blobs(
284 State(db): State<Db>,
285 Query(input): Query<ListBlobsParameters>,
···310 ))
311}
312313+/// Enumerates all the DID, rev, and commit CID for all repos hosted by this service.
314+/// Does not require auth; implemented by PDS and Relay.
315+/// ### Query Parameters
316+/// - `limit`: >= 1 and <= 1000, default 500
317+/// - `cursor`: string
318+/// ### Responses
319+/// - 200 OK: {"cursor": "string","repos": [{"did": "string","head": "string","rev": "string","active": true,"status": "takendown"}]}
320+/// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]}
321async fn list_repos(
322 State(db): State<Db>,
323 Query(input): Query<ListReposParameters>,
324) -> Result<Json<sync::list_repos::Output>> {
325 struct Record {
326+ /// The DID of the repo.
327 did: String,
328+ /// The commit CID of the repo.
329 rev: String,
330+ /// The root CID of the repo.
331 root: String,
332 }
333···383 Ok(Json(sync::list_repos::OutputData { cursor, repos }.into()))
384}
385386+/// Repository event stream, aka Firehose endpoint. Outputs repo commits with diff data, and identity update events,
387+/// for all repositories on the current server. See the atproto specifications for details around stream sequencing,
388+/// repo versioning, CAR diff format, and more. Public and does not require auth; implemented by PDS and Relay.
389+/// ### Query Parameters
390+/// - `cursor`: The last known event seq number to backfill from.
391+/// ### Responses
392+/// - 200 OK: ...
393async fn subscribe_repos(
394 ws_up: WebSocketUpgrade,
395 State(fh): State<FirehoseProducer>,
···410}
411412#[rustfmt::skip]
413+/// These endpoints are part of the atproto repository synchronization APIs. Requests usually do not require authentication,
414+/// and can be made to PDS intances or Relay instances.
415+/// ### Routes
416+/// - `GET /xrpc/com.atproto.sync.getBlob` -> [`get_blob`]
417+/// - `GET /xrpc/com.atproto.sync.getBlocks` -> [`get_blocks`]
418+/// - `GET /xrpc/com.atproto.sync.getLatestCommit` -> [`get_latest_commit`]
419+/// - `GET /xrpc/com.atproto.sync.getRecord` -> [`get_record`]
420+/// - `GET /xrpc/com.atproto.sync.getRepoStatus` -> [`get_repo_status`]
421+/// - `GET /xrpc/com.atproto.sync.getRepo` -> [`get_repo`]
422+/// - `GET /xrpc/com.atproto.sync.listBlobs` -> [`list_blobs`]
423+/// - `GET /xrpc/com.atproto.sync.listRepos` -> [`list_repos`]
424+/// - `GET /xrpc/com.atproto.sync.subscribeRepos` -> [`subscribe_repos`]
425pub(super) fn routes() -> Router<AppState> {
000000000426 Router::new()
427 .route(concat!("/", sync::get_blob::NSID), get(get_blob))
428 .route(concat!("/", sync::get_blocks::NSID), get(get_blocks))
···11#[derive(Error)]
12#[expect(clippy::error_impl_error, reason = "just one")]
13pub struct Error {
14+ /// The actual error that occurred.
15 err: anyhow::Error,
16+ /// The error message to be returned as JSON body.
17 message: Option<ErrorMessage>,
18+ /// The HTTP status code to be returned.
19 status: StatusCode,
20}
2122#[derive(Default, serde::Serialize)]
23/// A JSON error message.
24pub(crate) struct ErrorMessage {
25+ /// The error type.
26+ /// This is used to identify the error in the client.
27+ /// E.g. `InvalidRequest`, `ExpiredToken`, `InvalidToken`, `HandleNotFound`.
28 error: String,
29+ /// The error message.
30 message: String,
31}
32impl std::fmt::Display for ErrorMessage {
···98}
99100impl IntoResponse for Error {
0101 fn into_response(self) -> Response {
102 error!("{:?}", self.err);
103
+42-54
src/firehose.rs
···145/// A firehose producer. This is used to transmit messages to the firehose for broadcast.
146#[derive(Clone, Debug)]
147pub(crate) struct FirehoseProducer {
0148 tx: tokio::sync::mpsc::Sender<FirehoseMessage>,
149}
150···189 }
190}
191192-#[expect(clippy::as_conversions)]
0000000193const fn convert_usize_f64(x: usize) -> Result<f64, &'static str> {
194 let result = x as f64;
195- if result as usize != x {
196 return Err("cannot convert");
197 }
198 Ok(result)
199}
200201/// Serialize a message.
202-async fn serialize_message(
203- seq: u64,
204- mut msg: sync::subscribe_repos::Message,
205-) -> (&'static str, Vec<u8>) {
206 let mut dummy_seq = 0_i64;
207 #[expect(clippy::pattern_type_mismatch)]
208 let (ty, nseq) = match &mut msg {
···214 sync::subscribe_repos::Message::Migrate(m) => ("#migrate", &mut m.seq),
215 sync::subscribe_repos::Message::Tombstone(m) => ("#tombstone", &mut m.seq),
216 };
217-218- #[expect(clippy::as_conversions)]
219- const fn convert_u64_i64(x: u64) -> Result<i64, &'static str> {
220- let result = x as i64;
221- if result as u64 != x {
222- return Err("cannot convert");
223- }
224- Ok(result)
225- }
226 // Set the sequence number.
227- *nseq = convert_u64_i64(seq).expect("should find seq");
228229 let hdr = FrameHeader::Message(ty.to_owned());
230···261) -> Result<WebSocket> {
262 if let Some(cursor) = cursor {
263 let mut frame = Vec::new();
264- #[expect(clippy::as_conversions)]
265- const fn convert_i64_u64(x: i64) -> Result<u64, &'static str> {
266- let result = x as u64;
267- if result as i64 != x {
268- return Err("cannot convert");
269- }
270- Ok(result)
271 }
272- let cursor = convert_i64_u64(cursor).expect("should find cursor");
273-274 // Cursor specified; attempt to backfill the consumer.
275 if cursor > seq {
276 let hdr = FrameHeader::Error;
···286 );
287 }
288289- for &(historical_seq, ty, ref msg) in history.iter() {
290 if cursor > historical_seq {
291 continue;
292 }
···314315 info!("attempting to reconnect to upstream relays");
316 for relay in &config.firehose.relays {
317- let host = match relay.host_str() {
318- Some(host) => host,
319- None => {
320- warn!("relay {} has no host specified", relay);
321- continue;
322- }
323 };
324325 let r = client
···356///
357/// This will broadcast all updates in this PDS out to anyone who is listening.
358///
359-/// Reference: https://atproto.com/specs/sync
360-pub(crate) async fn spawn(
361 client: Client,
362 config: AppConfig,
363) -> (tokio::task::JoinHandle<()>, FirehoseProducer) {
364 let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
365 let handle = tokio::spawn(async move {
366- let mut clients: Vec<WebSocket> = Vec::new();
367- let mut history = VecDeque::with_capacity(1000);
368 fn time_since_inception() -> u64 {
369 chrono::Utc::now()
370 .timestamp_micros()
···372 .expect("should not wrap")
373 .unsigned_abs()
374 }
00375 let mut seq = time_since_inception();
376377 // TODO: We should use `com.atproto.sync.notifyOfUpdate` to reach out to relays
378 // that may have disconnected from us due to timeout.
379380 loop {
381- match tokio::time::timeout(Duration::from_secs(30), rx.recv()).await {
382- Ok(msg) => match msg {
383 Some(FirehoseMessage::Broadcast(msg)) => {
384- let (ty, by) = serialize_message(seq, msg.clone()).await;
385386 history.push_back((seq, ty, msg));
387 gauge!(FIREHOSE_HISTORY).set(
···419 }
420 // All producers have been destroyed.
421 None => break,
422- },
423- Err(_) => {
424- if clients.is_empty() {
425- reconnect_relays(&client, &config).await;
426- }
427428- let contents = rand::thread_rng()
429- .sample_iter(rand::distributions::Alphanumeric)
430- .take(15)
431- .map(char::from)
432- .collect::<String>();
433434- // Send a websocket ping message.
435- // Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#pings_and_pongs_the_heartbeat_of_websockets
436- let message = Message::Ping(axum::body::Bytes::from_owner(contents));
437- drop(broadcast_message(&mut clients, message).await);
438- }
439 }
440 }
441 });
···145/// A firehose producer. This is used to transmit messages to the firehose for broadcast.
146#[derive(Clone, Debug)]
147pub(crate) struct FirehoseProducer {
148+ /// The channel to send messages to the firehose.
149 tx: tokio::sync::mpsc::Sender<FirehoseMessage>,
150}
151···190 }
191}
192193+#[expect(
194+ clippy::as_conversions,
195+ clippy::cast_possible_truncation,
196+ clippy::cast_sign_loss,
197+ clippy::cast_precision_loss,
198+ clippy::arithmetic_side_effects
199+)]
200+/// Convert a `usize` to a `f64`.
201const fn convert_usize_f64(x: usize) -> Result<f64, &'static str> {
202 let result = x as f64;
203+ if result as usize - x > 0 {
204 return Err("cannot convert");
205 }
206 Ok(result)
207}
208209/// Serialize a message.
210+fn serialize_message(seq: u64, mut msg: sync::subscribe_repos::Message) -> (&'static str, Vec<u8>) {
000211 let mut dummy_seq = 0_i64;
212 #[expect(clippy::pattern_type_mismatch)]
213 let (ty, nseq) = match &mut msg {
···219 sync::subscribe_repos::Message::Migrate(m) => ("#migrate", &mut m.seq),
220 sync::subscribe_repos::Message::Tombstone(m) => ("#tombstone", &mut m.seq),
221 };
000000000222 // Set the sequence number.
223+ *nseq = i64::try_from(seq).expect("should find seq");
224225 let hdr = FrameHeader::Message(ty.to_owned());
226···257) -> Result<WebSocket> {
258 if let Some(cursor) = cursor {
259 let mut frame = Vec::new();
260+ let cursor = u64::try_from(cursor);
261+ if cursor.is_err() {
262+ tracing::warn!("cursor is not a valid u64");
263+ return Ok(ws);
000264 }
265+ let cursor = cursor.expect("should be valid u64");
0266 // Cursor specified; attempt to backfill the consumer.
267 if cursor > seq {
268 let hdr = FrameHeader::Error;
···278 );
279 }
280281+ for &(historical_seq, ty, ref msg) in history {
282 if cursor > historical_seq {
283 continue;
284 }
···306307 info!("attempting to reconnect to upstream relays");
308 for relay in &config.firehose.relays {
309+ let Some(host) = relay.host_str() else {
310+ warn!("relay {} has no host specified", relay);
311+ continue;
000312 };
313314 let r = client
···345///
346/// This will broadcast all updates in this PDS out to anyone who is listening.
347///
348+/// Reference: <https://atproto.com/specs/sync>
349+pub(crate) fn spawn(
350 client: Client,
351 config: AppConfig,
352) -> (tokio::task::JoinHandle<()>, FirehoseProducer) {
353 let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
354 let handle = tokio::spawn(async move {
00355 fn time_since_inception() -> u64 {
356 chrono::Utc::now()
357 .timestamp_micros()
···359 .expect("should not wrap")
360 .unsigned_abs()
361 }
362+ let mut clients: Vec<WebSocket> = Vec::new();
363+ let mut history = VecDeque::with_capacity(1000);
364 let mut seq = time_since_inception();
365366 // TODO: We should use `com.atproto.sync.notifyOfUpdate` to reach out to relays
367 // that may have disconnected from us due to timeout.
368369 loop {
370+ if let Ok(msg) = tokio::time::timeout(Duration::from_secs(30), rx.recv()).await {
371+ match msg {
372 Some(FirehoseMessage::Broadcast(msg)) => {
373+ let (ty, by) = serialize_message(seq, msg.clone());
374375 history.push_back((seq, ty, msg));
376 gauge!(FIREHOSE_HISTORY).set(
···408 }
409 // All producers have been destroyed.
410 None => break,
411+ }
412+ } else {
413+ if clients.is_empty() {
414+ reconnect_relays(&client, &config).await;
415+ }
416417+ let contents = rand::thread_rng()
418+ .sample_iter(rand::distributions::Alphanumeric)
419+ .take(15)
420+ .map(char::from)
421+ .collect::<String>();
422423+ // Send a websocket ping message.
424+ // Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#pings_and_pongs_the_heartbeat_of_websockets
425+ let message = Message::Ping(axum::body::Bytes::from_owner(contents));
426+ drop(broadcast_message(&mut clients, message).await);
0427 }
428 }
429 });
+42-46
src/main.rs
···68 }
69 }
7071- #[rustfmt::skip]
72 /// Register all actor endpoints.
73 pub(crate) fn routes() -> Router<AppState> {
74 // AP /xrpc/app.bsky.actor.putPreferences
75 // AG /xrpc/app.bsky.actor.getPreferences
76 Router::new()
77- .route(concat!("/", actor::put_preferences::NSID), post(put_preferences))
78- .route(concat!("/", actor::get_preferences::NSID), get(get_preferences))
00000079 }
80}
81···202203/// The index (/) route.
204async fn index() -> impl IntoResponse {
205- r#"
206 __ __
207 /\ \__ /\ \__
208 __ \ \ ,_\ _____ _ __ ___\ \ ,_\ ___
···220221 Code: https://github.com/DrChat/bluepds
222 Protocol: https://atproto.com
223- "#
224}
225226/// Service proxy.
227///
228-/// Reference: https://atproto.com/specs/xrpc#service-proxying
229async fn service_proxy(
230 uri: Uri,
231 user: AuthenticatedUser,
···265 .await
266 .with_context(|| format!("failed to resolve did document {}", did.as_str()))?;
267268- let service = match did_doc.service.iter().find(|s| s.id == id) {
269- Some(service) => service,
270- None => {
271- return Err(Error::with_status(
272- StatusCode::BAD_REQUEST,
273- anyhow!("could not find resolve service #{id}"),
274- ));
275- }
276 };
277278- let url = service
279 .service_endpoint
280- .join(&format!("/xrpc{}", url_path))
281 .context("failed to construct target url")?;
282283 let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1)))
···294 let token = auth::sign(
295 &skey,
296 "JWT",
297- serde_json::json!({
298 "iss": user_did.as_str(),
299 "aud": did.as_str(),
300 "lxm": lxm,
···313 }
314315 let r = client
316- .request(request.method().clone(), url)
317 .headers(h)
318 .header(http::header::AUTHORIZATION, format!("Bearer {token}"))
319 .body(reqwest::Body::wrap_stream(
···338/// The main application entry point.
339#[expect(
340 clippy::cognitive_complexity,
0341 reason = "main function has high complexity"
342)]
343async fn run() -> anyhow::Result<()> {
···346 // Set up trace logging to console and account for the user-provided verbosity flag.
347 if args.verbosity.log_level_filter() != LevelFilter::Off {
348 let lvl = match args.verbosity.log_level_filter() {
349- LevelFilter::Off => tracing::Level::INFO,
350 LevelFilter::Error => tracing::Level::ERROR,
351 LevelFilter::Warn => tracing::Level::WARN,
352- LevelFilter::Info => tracing::Level::INFO,
353 LevelFilter::Debug => tracing::Level::DEBUG,
354 LevelFilter::Trace => tracing::Level::TRACE,
355 };
···384 }
385386 // Initialize metrics reporting.
387- metrics::setup(&config.metrics).context("failed to set up metrics exporter")?;
388389 // Create a reqwest client that will be used for all outbound requests.
390 let simple_client = reqwest::Client::builder()
···404 .context("failed to create key directory")?;
405406 // Check if crypto keys exist. If not, create new ones.
407- let (skey, rkey) = match std::fs::File::open(&config.key) {
408- Ok(f) => {
409- let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f))
410- .context("failed to deserialize crypto keys")?;
411412- let skey =
413- Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?;
414- let rkey =
415- Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?;
416417- (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
418- }
419- _ => {
420- info!("signing keys not found, generating new ones");
421422- let skey = Secp256k1Keypair::create(&mut rand::thread_rng());
423- let rkey = Secp256k1Keypair::create(&mut rand::thread_rng());
424425- let keys = KeyData {
426- skey: skey.export(),
427- rkey: rkey.export(),
428- };
429430- let mut f = std::fs::File::create(&config.key).context("failed to create key file")?;
431- serde_ipld_dagcbor::to_writer(&mut f, &keys)
432- .context("failed to serialize crypto keys")?;
433434- (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
435- }
436 };
437438 tokio::fs::create_dir_all(&config.repo.path).await?;
···451 .await
452 .context("failed to apply migrations")?;
453454- let (_fh, fhp) = firehose::spawn(client.clone(), config.clone()).await;
455456 let addr = config
457 .listen_address
···536537 serve
538 .await
539- .map_err(|e| e.into())
540 .and_then(|r| r)
541 .context("failed to serve app")
542}
···68 }
69 }
70071 /// Register all actor endpoints.
72 pub(crate) fn routes() -> Router<AppState> {
73 // AP /xrpc/app.bsky.actor.putPreferences
74 // AG /xrpc/app.bsky.actor.getPreferences
75 Router::new()
76+ .route(
77+ concat!("/", actor::put_preferences::NSID),
78+ post(put_preferences),
79+ )
80+ .route(
81+ concat!("/", actor::get_preferences::NSID),
82+ get(get_preferences),
83+ )
84 }
85}
86···207208/// The index (/) route.
209async fn index() -> impl IntoResponse {
210+ r"
211 __ __
212 /\ \__ /\ \__
213 __ \ \ ,_\ _____ _ __ ___\ \ ,_\ ___
···225226 Code: https://github.com/DrChat/bluepds
227 Protocol: https://atproto.com
228+ "
229}
230231/// Service proxy.
232///
233+/// Reference: <https://atproto.com/specs/xrpc#service-proxying>
234async fn service_proxy(
235 uri: Uri,
236 user: AuthenticatedUser,
···270 .await
271 .with_context(|| format!("failed to resolve did document {}", did.as_str()))?;
272273+ let Some(service) = did_doc.service.iter().find(|s| s.id == id) else {
274+ return Err(Error::with_status(
275+ StatusCode::BAD_REQUEST,
276+ anyhow!("could not find resolve service #{id}"),
277+ ));
000278 };
279280+ let target_url: url::Url = service
281 .service_endpoint
282+ .join(&format!("/xrpc{url_path}"))
283 .context("failed to construct target url")?;
284285 let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1)))
···296 let token = auth::sign(
297 &skey,
298 "JWT",
299+ &serde_json::json!({
300 "iss": user_did.as_str(),
301 "aud": did.as_str(),
302 "lxm": lxm,
···315 }
316317 let r = client
318+ .request(request.method().clone(), target_url)
319 .headers(h)
320 .header(http::header::AUTHORIZATION, format!("Bearer {token}"))
321 .body(reqwest::Body::wrap_stream(
···340/// The main application entry point.
341#[expect(
342 clippy::cognitive_complexity,
343+ clippy::too_many_lines,
344 reason = "main function has high complexity"
345)]
346async fn run() -> anyhow::Result<()> {
···349 // Set up trace logging to console and account for the user-provided verbosity flag.
350 if args.verbosity.log_level_filter() != LevelFilter::Off {
351 let lvl = match args.verbosity.log_level_filter() {
0352 LevelFilter::Error => tracing::Level::ERROR,
353 LevelFilter::Warn => tracing::Level::WARN,
354+ LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO,
355 LevelFilter::Debug => tracing::Level::DEBUG,
356 LevelFilter::Trace => tracing::Level::TRACE,
357 };
···386 }
387388 // Initialize metrics reporting.
389+ metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?;
390391 // Create a reqwest client that will be used for all outbound requests.
392 let simple_client = reqwest::Client::builder()
···406 .context("failed to create key directory")?;
407408 // Check if crypto keys exist. If not, create new ones.
409+ let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) {
410+ let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f))
411+ .context("failed to deserialize crypto keys")?;
0412413+ let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?;
414+ let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?;
00415416+ (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
417+ } else {
418+ info!("signing keys not found, generating new ones");
0419420+ let skey = Secp256k1Keypair::create(&mut rand::thread_rng());
421+ let rkey = Secp256k1Keypair::create(&mut rand::thread_rng());
422423+ let keys = KeyData {
424+ skey: skey.export(),
425+ rkey: rkey.export(),
426+ };
427428+ let mut f = std::fs::File::create(&config.key).context("failed to create key file")?;
429+ serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?;
0430431+ (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
0432 };
433434 tokio::fs::create_dir_all(&config.repo.path).await?;
···447 .await
448 .context("failed to apply migrations")?;
449450+ let (_fh, fhp) = firehose::spawn(client.clone(), config.clone());
451452 let addr = config
453 .listen_address
···532533 serve
534 .await
535+ .map_err(Into::into)
536 .and_then(|r| r)
537 .context("failed to serve app")
538}
+2-2
src/metrics.rs
···28pub(crate) const REPO_OP_DELETE: &str = "bluepds.repo.op.delete";
2930/// Must be ran exactly once on startup. This will declare all of the instruments for `metrics`.
31-pub(crate) fn setup(config: &Option<config::MetricConfig>) -> anyhow::Result<()> {
32 describe_counter!(AUTH_FAILED, "The number of failed authentication attempts.");
3334 describe_gauge!(FIREHOSE_HISTORY, "The size of the firehose history buffer.");
···53 describe_counter!(REPO_OP_UPDATE, "The count of updated records.");
54 describe_counter!(REPO_OP_DELETE, "The count of deleted records.");
5556- if let Some(ref config) = *config {
57 match *config {
58 config::MetricConfig::PrometheusPush(ref prometheus_config) => {
59 PrometheusBuilder::new()
···28pub(crate) const REPO_OP_DELETE: &str = "bluepds.repo.op.delete";
2930/// Must be ran exactly once on startup. This will declare all of the instruments for `metrics`.
31+pub(crate) fn setup(config: Option<&config::MetricConfig>) -> anyhow::Result<()> {
32 describe_counter!(AUTH_FAILED, "The number of failed authentication attempts.");
3334 describe_gauge!(FIREHOSE_HISTORY, "The size of the firehose history buffer.");
···53 describe_counter!(REPO_OP_UPDATE, "The count of updated records.");
54 describe_counter!(REPO_OP_DELETE, "The count of deleted records.");
5556+ if let Some(config) = config {
57 match *config {
58 config::MetricConfig::PrometheusPush(ref prometheus_config) => {
59 PrometheusBuilder::new()
+1-4
src/plc.rs
···72 pub sig: String,
73}
7475-pub(crate) async fn sign_op(
76- rkey: &RotationKey,
77- op: PlcOperation,
78-) -> anyhow::Result<SignedPlcOperation> {
79 let bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode op")?;
80 let bytes = rkey.sign(&bytes).context("failed to sign op")?;
81
···72 pub sig: String,
73}
7475+pub(crate) fn sign_op(rkey: &RotationKey, op: PlcOperation) -> anyhow::Result<SignedPlcOperation> {
00076 let bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode op")?;
77 let bytes = rkey.sign(&bytes).context("failed to sign op")?;
78
+1-1
src/storage.rs
···1-//! ATProto user repository datastore functionality.
23use std::str::FromStr as _;
4
···1+//! `ATProto` user repository datastore functionality.
23use std::str::FromStr as _;
4