···88use axum::{extract::FromRequestParts, http::StatusCode};
99use base64::Engine;
10101111-use crate::{AppState, Error, auth, error::ErrorMessage};
1111+use crate::{AppState, Error, error::ErrorMessage};
12121313/// This is an axum request extractor that represents an authenticated user.
1414///
1515/// If specified in an API endpoint, this will guarantee that the API can only be called
1616/// by an authenticated user.
1717-pub struct AuthenticatedUser {
1717+pub(crate) struct AuthenticatedUser {
1818 did: String,
1919}
20202121impl AuthenticatedUser {
2222- pub fn did(&self) -> String {
2222+ pub(crate) fn did(&self) -> String {
2323 self.did.clone()
2424 }
2525}
26262727impl FromRequestParts<AppState> for AuthenticatedUser {
2828- type Rejection = crate::Error;
2828+ type Rejection = Error;
29293030 async fn from_request_parts(
3131 parts: &mut axum::http::request::Parts,
3232 state: &AppState,
3333- ) -> std::result::Result<Self, Self::Rejection> {
3333+ ) -> Result<Self, Self::Rejection> {
3434 let token = parts
3535 .headers
3636 .get(axum::http::header::AUTHORIZATION)
···52525353 // N.B: We ignore all fields inside of the token up until this point because they can be
5454 // attacker-controlled.
5555- let (typ, claims) = auth::verify(&state.signing_key.did(), token).map_err(|e| {
5555+ let (typ, claims) = verify(&state.signing_key.did(), token).map_err(|e| {
5656 Error::with_status(
5757 StatusCode::UNAUTHORIZED,
5858 e.context("failed to verify auth token"),
···9797}
98989999/// Cryptographically sign a JSON web token with the specified key.
100100-pub fn sign(
100100+pub(crate) fn sign(
101101 key: &Secp256k1Keypair,
102102 typ: &str,
103103 claims: serde_json::Value,
···120120}
121121122122/// Cryptographically verify a JSON web token's validity using the specified public key.
123123-pub fn verify(key: &str, token: &str) -> anyhow::Result<(String, serde_json::Value)> {
123123+pub(crate) fn verify(key: &str, token: &str) -> anyhow::Result<(String, serde_json::Value)> {
124124 let mut parts = token.splitn(3, '.');
125125 let hdr = parts.next().context("no header")?;
126126 let claims = parts.next().context("no claims")?;
+8-8
src/config.rs
···33use serde::Deserialize;
44use url::Url;
5566-pub mod metrics {
66+pub(crate) mod metrics {
77 use super::*;
8899 #[derive(Deserialize, Debug, Clone)]
1010- pub struct PrometheusConfig {
1010+ pub(crate) struct PrometheusConfig {
1111 /// The URL of the Prometheus server's exporter endpoint.
1212 pub url: Url,
1313 }
···15151616#[derive(Deserialize, Debug, Clone)]
1717#[serde(tag = "type")]
1818-pub enum MetricConfig {
1818+pub(crate) enum MetricConfig {
1919 PrometheusPush(metrics::PrometheusConfig),
2020}
21212222#[derive(Deserialize, Debug, Clone)]
2323-pub struct FirehoseConfig {
2323+pub(crate) struct FirehoseConfig {
2424 /// A list of upstream relays that this PDS will try to reach out to.
2525 pub relays: Vec<Url>,
2626}
27272828#[derive(Deserialize, Debug, Clone)]
2929-pub struct RepoConfig {
2929+pub(crate) struct RepoConfig {
3030 /// The path to the repository storage.
3131 pub path: PathBuf,
3232}
33333434#[derive(Deserialize, Debug, Clone)]
3535-pub struct PlcConfig {
3535+pub(crate) struct PlcConfig {
3636 /// The path to the local PLC cache.
3737 pub path: PathBuf,
3838}
39394040#[derive(Deserialize, Debug, Clone)]
4141-pub struct BlobConfig {
4141+pub(crate) struct BlobConfig {
4242 /// The path to store blobs into.
4343 pub path: PathBuf,
4444 /// The maximum size limit of blobs.
···4646}
47474848#[derive(Deserialize, Debug, Clone)]
4949-pub struct AppConfig {
4949+pub(crate) struct AppConfig {
5050 /// The primary signing keys for all PLC/DID operations.
5151 pub key: PathBuf,
5252 /// The hostname of the PDS. Typically a domain name.
···350350351351 // The swap failed. Return the old commit and do not update the repository.
352352 return Ok(Json(
353353- repo::apply_writes::OutputData {
353353+ apply_writes::OutputData {
354354 results: None,
355355 commit: Some(
356356 CommitMetaData {
···438438 .await;
439439440440 Ok(Json(
441441- repo::apply_writes::OutputData {
441441+ apply_writes::OutputData {
442442 results: Some(res),
443443 commit: Some(
444444 CommitMetaData {
···702702703703#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
704704#[serde(rename_all = "camelCase")]
705705-pub struct ListRecordsParameters {
705705+pub(super) struct ListRecordsParameters {
706706 ///The NSID of the record type.
707707- pub collection: atrium_api::types::string::Nsid,
707707+ pub collection: Nsid,
708708 #[serde(skip_serializing_if = "core::option::Option::is_none")]
709709- pub cursor: core::option::Option<String>,
709709+ pub cursor: Option<String>,
710710 ///The number of records to return.
711711 #[serde(skip_serializing_if = "core::option::Option::is_none")]
712712- pub limit: core::option::Option<String>,
712712+ pub limit: Option<String>,
713713 ///The handle or DID of the repo.
714714- pub repo: atrium_api::types::string::AtIdentifier,
714714+ pub repo: AtIdentifier,
715715 ///Flag to reverse the order of the returned records.
716716 #[serde(skip_serializing_if = "core::option::Option::is_none")]
717717- pub reverse: core::option::Option<bool>,
717717+ pub reverse: Option<bool>,
718718 ///DEPRECATED: The highest sort-ordered rkey to stop at (exclusive)
719719 #[serde(skip_serializing_if = "core::option::Option::is_none")]
720720- pub rkey_end: core::option::Option<String>,
720720+ pub rkey_end: Option<String>,
721721 ///DEPRECATED: The lowest sort-ordered rkey to start from (exclusive)
722722 #[serde(skip_serializing_if = "core::option::Option::is_none")]
723723- pub rkey_start: core::option::Option<String>,
723723+ pub rkey_start: Option<String>,
724724}
725725726726async fn list_records(
···845845 drop(file);
846846 let hash = sha.finalize();
847847848848- let cid = atrium_repo::Cid::new_v1(
848848+ let cid = Cid::new_v1(
849849 IPLD_RAW,
850850 atrium_repo::Multihash::wrap(IPLD_MH_SHA2_256, hash.as_slice()).unwrap(),
851851 );
···885885}
886886887887#[rustfmt::skip]
888888-pub fn routes() -> Router<AppState> {
888888+pub(super) fn routes() -> Router<AppState> {
889889 // AP /xrpc/com.atproto.repo.applyWrites
890890 // AP /xrpc/com.atproto.repo.createRecord
891891 // AP /xrpc/com.atproto.repo.putRecord
+5-5
src/endpoints/server.rs
···373373 // SEC: Call argon2's `verify_password` to simulate password verification and discard the result.
374374 // We do this to avoid exposing a timing attack where attackers can measure the response time to
375375 // determine whether or not an account exists.
376376- let _ = argon2::Argon2::default().verify_password(
376376+ let _ = Argon2::default().verify_password(
377377 password.as_bytes(),
378378 &PasswordHash::new(DUMMY_PASSWORD).unwrap(),
379379 );
···384384 ));
385385 };
386386387387- match argon2::Argon2::default().verify_password(
387387+ match Argon2::default().verify_password(
388388 password.as_bytes(),
389389 &PasswordHash::new(account.password.as_str()).context("invalid password hash in db")?,
390390 ) {
···517517 refresh_jwt: refresh_token,
518518519519 active: Some(active), // TODO?
520520- did: atrium_api::types::string::Did::new(did.to_string()).unwrap(),
520520+ did: Did::new(did.to_string()).unwrap(),
521521 did_doc: None,
522522- handle: atrium_api::types::string::Handle::new(user.handle).unwrap(),
522522+ handle: Handle::new(user.handle).unwrap(),
523523 status,
524524 }
525525 .into(),
···624624}
625625626626#[rustfmt::skip]
627627-pub fn routes() -> Router<AppState> {
627627+pub(super) fn routes() -> Router<AppState> {
628628 // UG /xrpc/com.atproto.server.describeServer
629629 // UP /xrpc/com.atproto.server.createAccount
630630 // UP /xrpc/com.atproto.server.createSession
+12-12
src/endpoints/sync.rs
···194194// HACK: `limit` may be passed as a string, so we must treat it as one.
195195#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
196196#[serde(rename_all = "camelCase")]
197197-pub struct ListBlobsParameters {
197197+pub(super) struct ListBlobsParameters {
198198 #[serde(skip_serializing_if = "core::option::Option::is_none")]
199199- pub cursor: core::option::Option<String>,
199199+ pub cursor: Option<String>,
200200 ///The DID of the repo.
201201- pub did: atrium_api::types::string::Did,
201201+ pub did: Did,
202202 #[serde(skip_serializing_if = "core::option::Option::is_none")]
203203- pub limit: core::option::Option<String>,
203203+ pub limit: Option<String>,
204204 ///Optional revision of the repo to list blobs since.
205205 #[serde(skip_serializing_if = "core::option::Option::is_none")]
206206- pub since: core::option::Option<String>,
206206+ pub since: Option<String>,
207207}
208208209209async fn list_blobs(
···224224 let cids = cids
225225 .into_iter()
226226 .map(|c| {
227227- atrium_repo::Cid::from_str(&c)
227227+ Cid::from_str(&c)
228228 .map(atrium_api::types::string::Cid::new)
229229 .map_err(anyhow::Error::new)
230230 })
···239239// HACK: `limit` may be passed as a string, so we must treat it as one.
240240#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
241241#[serde(rename_all = "camelCase")]
242242-pub struct ListReposParameters {
242242+pub(super) struct ListReposParameters {
243243 #[serde(skip_serializing_if = "core::option::Option::is_none")]
244244- pub cursor: core::option::Option<String>,
244244+ pub cursor: Option<String>,
245245 #[serde(skip_serializing_if = "core::option::Option::is_none")]
246246- pub limit: core::option::Option<String>,
246246+ pub limit: Option<String>,
247247}
248248249249async fn list_repos(
···309309// HACK: `cursor` may be passed as a string, so we must treat it as one.
310310#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
311311#[serde(rename_all = "camelCase")]
312312-pub struct SubscribeReposParametersData {
312312+pub(super) struct SubscribeReposParametersData {
313313 ///The last known event seq number to backfill from.
314314 #[serde(skip_serializing_if = "core::option::Option::is_none")]
315315- pub cursor: core::option::Option<String>,
315315+ pub cursor: Option<String>,
316316}
317317318318async fn subscribe_repos(
···336336}
337337338338#[rustfmt::skip]
339339-pub fn routes() -> axum::Router<AppState> {
339339+pub(super) fn routes() -> Router<AppState> {
340340 // UG /xrpc/com.atproto.sync.getBlob
341341 // UG /xrpc/com.atproto.sync.getBlocks
342342 // UG /xrpc/com.atproto.sync.getLatestCommit
+1-1
src/error.rs
···8686}
87878888impl IntoResponse for Error {
8989- fn into_response(self) -> axum::response::Response {
8989+ fn into_response(self) -> Response {
9090 error!("{:?}", self.err);
91919292 // N.B: Forward out the error message to the requester if this is a debug build.
+9-9
src/firehose.rs
···20202121enum FirehoseMessage {
2222 Broadcast(sync::subscribe_repos::Message),
2323- Connect(Box<(axum::extract::ws::WebSocket, Option<i64>)>),
2323+ Connect(Box<(WebSocket, Option<i64>)>),
2424}
25252626enum FrameHeader {
···120120121121/// A firehose producer. This is used to transmit messages to the firehose for broadcast.
122122#[derive(Clone, Debug)]
123123-pub struct FirehoseProducer {
123123+pub(crate) struct FirehoseProducer {
124124 tx: tokio::sync::mpsc::Sender<FirehoseMessage>,
125125}
126126127127impl FirehoseProducer {
128128 /// Broadcast an `#account` event.
129129- pub async fn account(&self, account: impl Into<sync::subscribe_repos::Account>) {
129129+ pub(crate) async fn account(&self, account: impl Into<sync::subscribe_repos::Account>) {
130130 let _ = self
131131 .tx
132132 .send(FirehoseMessage::Broadcast(
···136136 }
137137138138 /// Broadcast an `#identity` event.
139139- pub async fn identity(&self, identity: impl Into<sync::subscribe_repos::Identity>) {
139139+ pub(crate) async fn identity(&self, identity: impl Into<sync::subscribe_repos::Identity>) {
140140 let _ = self
141141 .tx
142142 .send(FirehoseMessage::Broadcast(
···146146 }
147147148148 /// Broadcast a `#commit` event.
149149- pub async fn commit(&self, commit: impl Into<sync::subscribe_repos::Commit>) {
149149+ pub(crate) async fn commit(&self, commit: impl Into<sync::subscribe_repos::Commit>) {
150150 let _ = self
151151 .tx
152152 .send(FirehoseMessage::Broadcast(
···155155 .await;
156156 }
157157158158- pub async fn client_connection(&self, ws: WebSocket, cursor: Option<i64>) {
158158+ pub(crate) async fn client_connection(&self, ws: WebSocket, cursor: Option<i64>) {
159159 let _ = self
160160 .tx
161161 .send(FirehoseMessage::Connect(Box::new((ws, cursor))))
···213213 seq: u64,
214214 history: &VecDeque<(u64, &str, sync::subscribe_repos::Message)>,
215215 cursor: Option<i64>,
216216-) -> anyhow::Result<WebSocket> {
216216+) -> Result<WebSocket> {
217217 if let Some(cursor) = cursor {
218218 let mut frame = Vec::new();
219219 let cursor = cursor as u64;
···257257 Ok(ws)
258258}
259259260260-pub async fn reconnect_relays(client: &Client, config: &AppConfig) {
260260+pub(crate) async fn reconnect_relays(client: &Client, config: &AppConfig) {
261261 // Avoid connecting to upstream relays in test mode.
262262 if config.test {
263263 return;
···308308/// This will broadcast all updates in this PDS out to anyone who is listening.
309309///
310310/// Reference: https://atproto.com/specs/sync
311311-pub async fn spawn(
311311+pub(crate) async fn spawn(
312312 client: Client,
313313 config: AppConfig,
314314) -> (tokio::task::JoinHandle<()>, FirehoseProducer) {
+4-4
src/main.rs
···4242mod plc;
4343mod storage;
44444545-pub type Result<T> = std::result::Result<T, error::Error>;
4545+pub type Result<T> = std::result::Result<T, Error>;
4646pub use error::Error;
4747use uuid::Uuid;
48484949pub type Client = reqwest_middleware::ClientWithMiddleware;
5050-pub type Db = sqlx::SqlitePool;
5050+pub type Db = SqlitePool;
5151pub type Cred = Arc<dyn TokenCredential>;
52525353pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
···191191 }
192192193193 #[rustfmt::skip]
194194- pub fn routes() -> Router<AppState> {
194194+ pub(crate) fn routes() -> Router<AppState> {
195195 // AP /xrpc/app.bsky.actor.putPreferences
196196 // AG /xrpc/app.bsky.actor.getPreferences
197197 Router::new()
···289289 let r = client
290290 .request(request.method().clone(), url)
291291 .headers(h)
292292- .header(axum::http::header::AUTHORIZATION, format!("Bearer {token}"))
292292+ .header(http::header::AUTHORIZATION, format!("Bearer {token}"))
293293 .body(reqwest::Body::wrap_stream(
294294 request.into_body().into_data_stream(),
295295 ))