···
 PDS_HOSTNAME=localhost:3000
 PLC_URL=plc.directory
 
-# A comma-separated list of WebSocket URLs for firehose relays to push updates to.
-# e.g., RELAYS=wss://relay.bsky.social,wss://another-relay.com
-RELAYS=
+# A comma-separated list of relay URLs to notify via requestCrawl when we have updates.
+# e.g., CRAWLERS=https://bsky.network
+CRAWLERS=
 
 # Notification Service Configuration
 # At least one notification channel should be configured for user notifications to work.
+
 # Email notifications (via sendmail/msmtp)
 # MAIL_FROM_ADDRESS=noreply@example.com
 # MAIL_FROM_NAME=My PDS
 # SENDMAIL_PATH=/usr/sbin/sendmail
 
-# Discord notifications (not yet implemented)
-# DISCORD_BOT_TOKEN=your-bot-token
+# Discord notifications (via webhook)
+# DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/...
 
-# Telegram notifications (not yet implemented)
+# Telegram notifications (via bot)
 # TELEGRAM_BOT_TOKEN=your-bot-token
 
-# Signal notifications (not yet implemented)
+# Signal notifications (via signal-cli)
 # SIGNAL_CLI_PATH=/usr/local/bin/signal-cli
-# SIGNAL_PHONE_NUMBER=+1234567890
+# SIGNAL_SENDER_NUMBER=+1234567890
 
 CARGO_MOMMYS_LITTLE=mister
 CARGO_MOMMYS_PRONOUNS=his
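For context on the new variable: `CRAWLERS` is a comma-separated list of relay base URLs. A minimal sketch of how such a value can be split, ignoring stray whitespace and empty segments (`parse_crawlers` is an illustrative name, not a function from this codebase):

```rust
/// Split a comma-separated CRAWLERS value into clean URLs,
/// trimming whitespace and dropping empty segments.
fn parse_crawlers(raw: &str) -> Vec<String> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_owned)
        .collect()
}

fn main() {
    let urls = parse_crawlers("https://bsky.network, https://relay.example ,");
    assert_eq!(urls, ["https://bsky.network", "https://relay.example"]);
    println!("parsed {} crawler URLs", urls.len());
}
```

Filtering empty segments means a trailing comma (as in the `.env` placeholder) is harmless.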
···
-# Lewis' BS PDS Sandbox
+# BSPDS, a Personal Data Server
 
-When I'm actually done then yeah let's make this into a proper official-looking repo perhaps under an official-looking account or something.
+A production-grade Personal Data Server (PDS) implementation for the AT Protocol.
 
-This project implements a Personal Data Server (PDS) implementation for the AT Protocol.
+Uses PostgreSQL instead of SQLite and S3-compatible blob storage, and is designed to be a complete drop-in replacement for Bluesky's reference PDS implementation.
 
-Uses PostgreSQL instead of SQLite, S3-compatible blob storage, and aims to be a complete drop-in replacement for Bluesky's reference PDS implementation.
+## Features
 
-In fact I aim to also implement a plugin system soon, so that we can add things onto our own PDSes on top of the default BS.
+- Full AT Protocol support: all `com.atproto.*` endpoints implemented
+- OAuth 2.1 provider: PKCE, DPoP, Pushed Authorization Requests
+- PostgreSQL as a production-ready database backend
+- S3-compatible object storage for blobs; works with AWS S3, UpCloud object storage, self-hosted MinIO, etc.
+- WebSocket `subscribeRepos` endpoint for real-time sync
+- Crawler notifications via `requestCrawl`
+- Multi-channel notifications: email, Discord, Telegram, Signal
+- Per-IP rate limiting on sensitive endpoints
 
-I'm also taking ideas on what other PDSes lack, such as an on-PDS webpage that users can access to manage their records and preferences.
+## Running Locally
 
-:3
+Requires Rust installed locally.
 
-# Running locally
+Run PostgreSQL and an S3-compatible object store (e.g., with podman/docker):
 
-The reader will need rust installed locally.
+```bash
+podman compose up db objsto -d
+```
 
-I personally run the postgres db, and an S3-compatible object store with podman compose up db objsto -d.
+Run the PDS:
 
-Run the PDS directly:
+```bash
+just run
+```
 
-    just run
-
-Configuration is via environment variables:
+## Configuration
 
-    DATABASE_URL           postgres connection string
-    S3_BUCKET              blob storage bucket name
-    S3_ENDPOINT            S3 endpoint URL (for MinIO etc)
-    AWS_ACCESS_KEY_ID      S3 credentials
-    AWS_SECRET_ACCESS_KEY
-    AWS_REGION
-    PDS_HOSTNAME           public hostname of this PDS
-    APPVIEW_URL            appview to proxy unimplemented endpoints to
-    RELAYS                 comma-separated list of relay WebSocket URLs
+### Required
 
-Optional email stuff:
+| Variable | Description |
+|----------|-------------|
+| `DATABASE_URL` | PostgreSQL connection string |
+| `S3_BUCKET` | Blob storage bucket name |
+| `S3_ENDPOINT` | S3 endpoint URL (for MinIO, etc.) |
+| `AWS_ACCESS_KEY_ID` | S3 credentials |
+| `AWS_SECRET_ACCESS_KEY` | S3 credentials |
+| `AWS_REGION` | S3 region |
+| `PDS_HOSTNAME` | Public hostname of this PDS |
+| `JWT_SECRET` | Secret for OAuth token signing (HS256) |
+| `KEY_ENCRYPTION_KEY` | Key for encrypting user signing keys (AES-256-GCM) |
 
-    MAIL_FROM_ADDRESS      sender address (enables email notifications)
-    MAIL_FROM_NAME         sender name (default: BSPDS)
-    SENDMAIL_PATH          path to sendmail binary
+### Optional
 
-Development
+| Variable | Description |
+|----------|-------------|
+| `APPVIEW_URL` | Appview URL to proxy unimplemented endpoints to |
+| `CRAWLERS` | Comma-separated list of relay URLs to notify via `requestCrawl` |
 
-    just            shows available commands
-    just test       run tests (spins up postgres and minio via testcontainers)
-    just lint       clippy + fmt check
-    just db-reset   drop and recreate local database
+### Notifications
 
-The test suite uses testcontainers so you don't need to set up anything manually for running tests.
+At least one channel should be configured for user notifications (password reset, email verification, etc.):
 
-## What's implemented
+| Variable | Description |
+|----------|-------------|
+| `MAIL_FROM_ADDRESS` | Email sender address (enables email via sendmail) |
+| `MAIL_FROM_NAME` | Email sender name (default: "BSPDS") |
+| `SENDMAIL_PATH` | Path to sendmail binary (default: /usr/sbin/sendmail) |
+| `DISCORD_WEBHOOK_URL` | Discord webhook URL for notifications |
+| `TELEGRAM_BOT_TOKEN` | Telegram bot token for notifications |
+| `SIGNAL_CLI_PATH` | Path to signal-cli binary |
+| `SIGNAL_SENDER_NUMBER` | Signal sender phone number (+1234567890 format) |
 
-Most of the com.atproto.* namespace is done. Server endpoints, repo operations, sync, identity, admin, moderation. The firehose websocket works. OAuth is not done yet.
+## Development
 
-See TODO.md for the full breakdown of what's done and what's left.
+```bash
+just          # Show available commands
+just test     # Run tests (auto-starts postgres/minio, runs nextest)
+just lint     # Clippy + fmt check
+just db-reset # Drop and recreate local database
+```
 
-Structure
+## Project Structure
 
-    src/
-      main.rs          server entrypoint
-      lib.rs           router setup
-      state.rs         app state (db pool, stores)
-      api/             XRPC handlers organized by namespace
-      auth/            JWT handling
-      repo/            postgres block store
-      storage/         S3 blob storage
-      sync/            firehose, relay clients
-      notifications/   email service
-    tests/             integration tests
-    migrations/        sqlx migrations
+```
+src/
+  main.rs          Server entrypoint
+  lib.rs           Router setup
+  state.rs         AppState (db pool, stores, rate limiters, circuit breakers)
+  api/             XRPC handlers organized by namespace
+  auth/            JWT authentication (ES256K per-user keys)
+  oauth/           OAuth 2.1 provider (HS256 server-wide)
+  repo/            PostgreSQL block store
+  storage/         S3 blob storage
+  sync/            Firehose, CAR export, crawler notifications
+  notifications/   Multi-channel notification service
+  plc/             PLC directory client
+  circuit_breaker/ Circuit breaker for external services
+  rate_limit/      Per-IP rate limiting
+tests/             Integration tests
+migrations/        SQLx migrations
+```
 
-License
+## License
 
-idk
+TBD
TODO.md (+49 −37)
···
   - [x] Implement `com.atproto.sync.listBlobs`.
 - [x] Crawler Interaction
   - [x] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us).
+- [x] Deprecated Sync Endpoints (for compatibility)
+  - [x] Implement `com.atproto.sync.getCheckout` (deprecated).
+  - [x] Implement `com.atproto.sync.getHead` (deprecated).
 
 ## Identity (`com.atproto.identity`)
 - [x] Resolution
···
 - [x] Implement `com.atproto.moderation.createReport`.
 
 ## Temp Namespace (`com.atproto.temp`)
-- [ ] Implement `com.atproto.temp.checkSignupQueue` (signup queue status for gated signups).
+- [x] Implement `com.atproto.temp.checkSignupQueue` (signup queue status for gated signups).
+
+## Misc HTTP Endpoints
+- [x] Implement `/robots.txt` endpoint.
 
 ## OAuth 2.1 Support
 Full OAuth 2.1 provider for ATProto native app authentication.
 - [x] OAuth Provider Core
   - [x] Implement `/.well-known/oauth-protected-resource` metadata endpoint.
   - [x] Implement `/.well-known/oauth-authorization-server` metadata endpoint.
-  - [x] Implement `/oauth/authorize` authorization endpoint (headless JSON mode).
+  - [x] Implement `/oauth/authorize` authorization endpoint (with login UI).
   - [x] Implement `/oauth/par` Pushed Authorization Request endpoint.
   - [x] Implement `/oauth/token` token endpoint (authorization_code + refresh_token grants).
   - [x] Implement `/oauth/jwks` JSON Web Key Set endpoint.
···
 - [x] Client metadata fetching and validation.
 - [x] PKCE (S256) enforcement.
 - [x] OAuth token verification extractor for protected resources.
-- [ ] Authorization UI templates (currently headless-only, returns JSON for programmatic flows).
-- [ ] Implement `private_key_jwt` signature verification (currently rejects with clear error).
+- [x] Authorization UI templates (HTML login form).
+- [x] Implement `private_key_jwt` signature verification with async JWKS fetching.
+- [x] HS256 JWT support (matches reference PDS).
 
 ## OAuth Security Notes
 
-I've tried to ensure that this codebase is not vulnerable to the following:
+Security measures implemented:
 
 - Constant-time comparison for signature verification (prevents timing attacks)
 - HMAC-SHA256 for access token signing with configurable secret
···
 - All database queries use parameterized statements (no SQL injection)
 - Deactivated/taken-down accounts blocked from OAuth authorization
 - Client ID validation on token exchange (defense-in-depth against cross-client attacks)
+- HTML escaping in OAuth templates (XSS prevention)
 
 ### Auth Notes
-- Algorithm choice: Using ES256K (secp256k1 ECDSA) with per-user keys. Ref PDS uses HS256 (HMAC) with single server key. Our approach provides better key isolation but differs from reference implementation.
-  - [ ] Support the ref PDS HS256 system too.
-- Token storage: Now storing only token JTIs in session_tokens table (defense in depth against DB breaches). Refresh token family tracking enables detection of token reuse attacks.
-- Key encryption: User signing keys encrypted at rest using AES-256-GCM with keys derived via HKDF from MASTER_KEY environment variable. Migration-safe: supports both encrypted (version 1) and plaintext (version 0) keys.
+- Dual algorithm support: ES256K (secp256k1 ECDSA) with per-user keys AND HS256 (HMAC) for compatibility with the reference PDS.
+- Token storage: Storing only token JTIs in the session_tokens table (defense in depth against DB breaches). Refresh token family tracking enables detection of token reuse attacks.
+- Key encryption: User signing keys encrypted at rest using AES-256-GCM with keys derived via HKDF from the KEY_ENCRYPTION_KEY environment variable.
 
 ## PDS-Level App Endpoints
 These endpoints need to be implemented at the PDS level (not just proxied to appview).
···
 ### Notification (`app.bsky.notification`)
 - [x] Implement `app.bsky.notification.registerPush` (push notification registration, proxied).
 
-## Deprecated Sync Endpoints (for compatibility)
-- [ ] Implement `com.atproto.sync.getCheckout` (deprecated, still needed for compatibility).
-- [ ] Implement `com.atproto.sync.getHead` (deprecated, still needed for compatibility).
-
-## Misc HTTP Endpoints
-- [ ] Implement `/robots.txt` endpoint.
-
-## Record Schema Validation
-- [ ] Handle this generically.
-
-## Preference Storage
-User preferences (for app.bsky.actor.getPreferences/putPreferences):
-- [x] Create preferences table for storing user app preferences.
-- [x] Implement `app.bsky.actor.getPreferences` handler (read from postgres, proxy fallback).
-- [x] Implement `app.bsky.actor.putPreferences` handler (write to postgres).
-
 ## Infrastructure & Core Components
 - [x] Sequencer (Event Log)
   - [x] Implement a `Sequencer` (backed by `repo_seq` table).
···
   - [x] Manage Repo Root in `repos` table.
   - [x] Implement Atomic Repo Transactions.
     - [x] Ensure `blocks` write, `repo_root` update, `records` index update, and `sequencer` event are committed in a single transaction.
-    - [ ] Implement concurrency control (row-level locking on `repos` table) to prevent concurrent writes to the same repo.
+    - [x] Implement concurrency control (row-level locking via FOR UPDATE).
 - [ ] DID Cache
   - [ ] Implement caching layer for DID resolution (Redis or in-memory).
   - [ ] Handle cache invalidation/expiry.
-- [ ] Background Jobs
-  - [ ] Implement `Crawlers` service (debounce notifications to relays).
+- [x] Crawlers Service
+  - [x] Implement `Crawlers` service (debounce notifications to relays).
+  - [x] 20-minute notification debounce.
+  - [x] Circuit breaker for relay failures.
 - [x] Notification Service
   - [x] Queue-based notification system with database table
   - [x] Background worker polling for pending notifications
   - [x] Extensible sender trait for multiple channels
   - [x] Email sender via OS sendmail/msmtp
-  - [ ] Discord bot sender
-  - [ ] Telegram bot sender
-  - [ ] Signal bot sender
+  - [x] Discord webhook sender
+  - [x] Telegram bot sender
+  - [x] Signal CLI sender
   - [x] Helper functions for common notification types (welcome, password reset, email verification, etc.)
   - [x] Respect user's `preferred_notification_channel` setting for non-email-specific notifications
-- [ ] Image Processing
-  - [ ] Implement image resize/formatting pipeline (for blob uploads).
+- [x] Image Processing
+  - [x] Implement image resize/formatting pipeline (for blob uploads).
+  - [x] WebP conversion for thumbnails.
+  - [x] EXIF stripping.
+  - [x] File size limits (10MB default).
 - [x] IPLD & MST
   - [x] Implement Merkle Search Tree logic for repo signing.
   - [x] Implement CAR (Content Addressable Archive) encoding/decoding.
-- [ ] Validation
-  - [ ] DID PLC Operations (Sign rotation keys).
-- [ ] Fix any remaining TODOs in the code, everywhere, full stop.
+  - [x] Cycle detection in CAR export.
+- [x] Rate Limiting
+  - [x] Per-IP rate limiting on login (10/min).
+  - [x] Per-IP rate limiting on OAuth token endpoint (30/min).
+  - [x] Per-IP rate limiting on password reset (5/hour).
+  - [x] Per-IP rate limiting on account creation (10/hour).
+- [x] Circuit Breakers
+  - [x] PLC directory circuit breaker (5 failures → open, 60s timeout).
+  - [x] Relay notification circuit breaker (10 failures → open, 30s timeout).
+- [x] Security Hardening
+  - [x] Email header injection prevention (CRLF sanitization).
+  - [x] Signal command injection prevention (phone number validation).
+  - [x] Constant-time signature comparison.
+  - [x] SSRF protection for outbound requests.
 
-## Web Management UI
+## Lewis' fabulous mini-list of remaining TODOs
+- [ ] DID resolution caching (valkey).
+- [ ] Record schema validation (generic validation framework).
+- [ ] Fix any remaining TODOs in the code.
+
+## Future: Web Management UI
 A single-page web app for account management. The frontend (JS framework) calls existing ATProto XRPC endpoints - no server-side rendering or bespoke HTML form handlers.
 
 ### Architecture
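The "constant-time signature comparison" item above refers to the XOR-accumulate idiom: compare every byte regardless of where the first mismatch occurs, so timing does not leak the mismatch position. A minimal illustration of the technique (the codebase may instead use a vetted crate such as `subtle`; this sketch is not its actual implementation):

```rust
/// Compare two byte slices without early exit: OR together the XOR of
/// every byte pair, so the loop always runs to the end. Note the length
/// difference is still observable; only content timing is masked.
fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b) {
        diff |= x ^ y;
    }
    diff == 0
}

fn main() {
    assert!(ct_eq(b"secret-token", b"secret-token"));
    assert!(!ct_eq(b"secret-token", b"secret-tokex"));
    assert!(!ct_eq(b"short", b"longer-value"));
    println!("ct_eq checks passed");
}
```

A naive `a == b` can return early at the first differing byte, which is exactly the timing side channel this avoids.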
migrations/202512211700_add_2fa.sql (+16)
···
+ALTER TABLE users ADD COLUMN two_factor_enabled BOOLEAN NOT NULL DEFAULT FALSE;
+
+ALTER TYPE notification_type ADD VALUE 'two_factor_code';
+
+CREATE TABLE oauth_2fa_challenge (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    did TEXT NOT NULL REFERENCES users(did) ON DELETE CASCADE,
+    request_uri TEXT NOT NULL,
+    code TEXT NOT NULL,
+    attempts INTEGER NOT NULL DEFAULT 0,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + INTERVAL '10 minutes'
+);
+
+CREATE INDEX idx_oauth_2fa_challenge_request_uri ON oauth_2fa_challenge(request_uri);
+CREATE INDEX idx_oauth_2fa_challenge_expires ON oauth_2fa_challenge(expires_at);
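The `oauth_2fa_challenge` row above carries a code, an attempt counter, and an expiry, which implies validation logic roughly like the following. This is a sketch of what the schema suggests, not the project's handler: the names and the 5-attempt cap are assumptions (the migration only stores a counter), and a real implementation should also compare the code in constant time.

```rust
use std::time::{Duration, Instant};

/// Assumed attempt cap; the migration itself only stores the counter.
const MAX_ATTEMPTS: i32 = 5;

struct Challenge {
    code: String,
    attempts: i32,
    expires_at: Instant,
}

#[derive(Debug, PartialEq)]
enum CheckResult {
    Ok,
    Expired,
    TooManyAttempts,
    WrongCode,
}

/// Validate a submitted 2FA code against a stored challenge.
/// The attempt is counted before the code comparison, so guessing
/// always consumes an attempt.
fn check_code(ch: &mut Challenge, submitted: &str) -> CheckResult {
    if Instant::now() >= ch.expires_at {
        return CheckResult::Expired;
    }
    if ch.attempts >= MAX_ATTEMPTS {
        return CheckResult::TooManyAttempts;
    }
    ch.attempts += 1;
    if ch.code == submitted {
        CheckResult::Ok
    } else {
        CheckResult::WrongCode
    }
}

fn main() {
    let mut ch = Challenge {
        code: "123456".into(),
        attempts: 0,
        expires_at: Instant::now() + Duration::from_secs(600),
    };
    assert_eq!(check_code(&mut ch, "000000"), CheckResult::WrongCode);
    assert_eq!(check_code(&mut ch, "123456"), CheckResult::Ok);
    println!("2fa challenge sketch passed");
}
```

The `idx_oauth_2fa_challenge_expires` index supports a background sweep that deletes rows past `expires_at`.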
···
 pub mod read_after_write;
 pub mod repo;
 pub mod server;
+pub mod temp;
 pub mod validation;
 
 pub use error::ApiError;
src/api/repo/record/read.rs (+46 −46)
···
 
     let limit = input.limit.unwrap_or(50).clamp(1, 100);
     let reverse = input.reverse.unwrap_or(false);
-
-    // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination
-    // TODO: Implement rkeyStart/End and correct cursor logic
-
     let limit_i64 = limit as i64;
-    let rows_res = if let Some(cursor) = &input.cursor {
-        if reverse {
-            sqlx::query!(
-                "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey < $3 ORDER BY rkey DESC LIMIT $4",
-                user_id,
-                input.collection,
-                cursor,
-                limit_i64
-            )
-            .fetch_all(&state.db)
-            .await
-            .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
-        } else {
-            sqlx::query!(
-                "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey > $3 ORDER BY rkey ASC LIMIT $4",
-                user_id,
-                input.collection,
-                cursor,
-                limit_i64
-            )
+    let order = if reverse { "ASC" } else { "DESC" };
+
+    let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
+        let comparator = if reverse { ">" } else { "<" };
+        let query = format!(
+            "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
+            comparator, order
+        );
+        sqlx::query_as(&query)
+            .bind(user_id)
+            .bind(&input.collection)
+            .bind(cursor)
+            .bind(limit_i64)
             .fetch_all(&state.db)
             .await
-            .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
-        }
     } else {
-        if reverse {
-            sqlx::query!(
-                "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 ORDER BY rkey DESC LIMIT $3",
-                user_id,
-                input.collection,
-                limit_i64
-            )
-            .fetch_all(&state.db)
-            .await
-            .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
-        } else {
-            sqlx::query!(
-                "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 ORDER BY rkey ASC LIMIT $3",
-                user_id,
-                input.collection,
-                limit_i64
-            )
-            .fetch_all(&state.db)
-            .await
-            .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
+        let mut conditions = vec!["repo_id = $1", "collection = $2"];
+        let mut param_idx = 3;
+
+        if input.rkey_start.is_some() {
+            conditions.push("rkey > $3");
+            param_idx += 1;
         }
+
+        if input.rkey_end.is_some() {
+            conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" });
+            param_idx += 1;
+        }
+
+        let limit_idx = param_idx;
+
+        let query = format!(
+            "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
+            conditions.join(" AND "),
+            order,
+            limit_idx
+        );
+
+        let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
+            .bind(user_id)
+            .bind(&input.collection);
+
+        if let Some(start) = &input.rkey_start {
+            query_builder = query_builder.bind(start);
+        }
+        if let Some(end) = &input.rkey_end {
+            query_builder = query_builder.bind(end);
+        }
+
+        query_builder.bind(limit_i64).fetch_all(&state.db).await
     };
 
     let rows = match rows_res {
src/api/repo/record/utils.rs (+28)
···
     let mut tx = state.db.begin().await
         .map_err(|e| format!("Failed to begin transaction: {}", e))?;
 
+    let lock_result = sqlx::query!(
+        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
+        user_id
+    )
+    .fetch_optional(&mut *tx)
+    .await;
+
+    match lock_result {
+        Err(e) => {
+            if let Some(db_err) = e.as_database_error() {
+                if db_err.code().as_deref() == Some("55P03") {
+                    return Err("ConcurrentModification: Another request is modifying this repo".to_string());
+                }
+            }
+            return Err(format!("Failed to acquire repo lock: {}", e));
+        }
+        Ok(Some(row)) => {
+            if let Some(expected_root) = &current_root_cid {
+                if row.repo_root_cid != expected_root.to_string() {
+                    return Err("ConcurrentModification: Repo has been modified since last read".to_string());
+                }
+            }
+        }
+        Ok(None) => {
+            return Err("Repo not found".to_string());
+        }
+    }
+
     sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
         .execute(&mut *tx)
         .await
src/api/server/meta.rs (+8)
···
 
 use tracing::error;
 
+pub async fn robots_txt() -> impl IntoResponse {
+    (
+        StatusCode::OK,
+        [("content-type", "text/plain")],
+        "# Hello!\n\n# Crawling the public API is allowed\nUser-agent: *\nAllow: /\n",
+    )
+}
+
 pub async fn describe_server() -> impl IntoResponse {
     let domains_str =
         std::env::var("AVAILABLE_USER_DOMAINS").unwrap_or_else(|_| "example.com".to_string());
src/api/server/mod.rs (+1 −1)
···
 pub use app_password::{create_app_password, list_app_passwords, revoke_app_password};
 pub use email::{confirm_email, request_email_update, update_email};
 pub use invite::{create_invite_code, create_invite_codes, get_account_invite_codes};
-pub use meta::{describe_server, health};
+pub use meta::{describe_server, health, robots_txt};
 pub use password::{request_password_reset, reset_password};
 pub use service_auth::get_service_auth;
 pub use session::{create_session, delete_session, get_session, refresh_session};
···
 };
 pub use types::{TokenRequest, TokenResponse};
 
+fn extract_client_ip(headers: &HeaderMap) -> String {
+    if let Some(forwarded) = headers.get("x-forwarded-for") {
+        if let Ok(value) = forwarded.to_str() {
+            if let Some(first_ip) = value.split(',').next() {
+                return first_ip.trim().to_string();
+            }
+        }
+    }
+    if let Some(real_ip) = headers.get("x-real-ip") {
+        if let Ok(value) = real_ip.to_str() {
+            return value.trim().to_string();
+        }
+    }
+    "unknown".to_string()
+}
+
 pub async fn token_endpoint(
     State(state): State<AppState>,
     headers: HeaderMap,
     Form(request): Form<TokenRequest>,
 ) -> Result<(HeaderMap, Json<TokenResponse>), OAuthError> {
+    let client_ip = extract_client_ip(&headers);
+    if state.rate_limiters.oauth_token.check_key(&client_ip).is_err() {
+        tracing::warn!(ip = %client_ip, "OAuth token rate limit exceeded");
+        return Err(OAuthError::InvalidRequest(
+            "Too many requests. Please try again later.".to_string(),
+        ));
+    }
+
     let dpop_proof = headers
         .get("DPoP")
         .and_then(|v| v.to_str().ok())
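The `extract_client_ip` helper above takes the first `X-Forwarded-For` entry and falls back to `X-Real-IP`, which is only trustworthy when a proxy the server controls sets those headers (clients can forge them otherwise). The same parsing rule restated as a plain, testable function, with headers reduced to `Option`s and a small extra guard against an empty first entry (illustrative only, not the project's signature):

```rust
/// First X-Forwarded-For entry wins; fall back to X-Real-IP, then "unknown".
/// Only meaningful behind a trusted reverse proxy that sets these headers.
fn client_ip(forwarded_for: Option<&str>, real_ip: Option<&str>) -> String {
    if let Some(v) = forwarded_for {
        // The left-most entry is the original client when the proxy appends.
        if let Some(first) = v.split(',').next() {
            let first = first.trim();
            if !first.is_empty() {
                return first.to_owned();
            }
        }
    }
    if let Some(v) = real_ip {
        return v.trim().to_owned();
    }
    "unknown".to_owned()
}

fn main() {
    assert_eq!(client_ip(Some("203.0.113.7, 10.0.0.1"), None), "203.0.113.7");
    assert_eq!(client_ip(None, Some(" 198.51.100.2 ")), "198.51.100.2");
    assert_eq!(client_ip(None, None), "unknown");
    println!("client_ip checks passed");
}
```

Keying the rate limiter on this value means all direct (non-proxied) clients share the fallback `"unknown"` bucket, which is worth keeping in mind when tuning limits.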
src/oauth/mod.rs (+2)
···
 pub mod client;
 pub mod endpoints;
 pub mod error;
+pub mod templates;
 pub mod verify;
 
 pub use types::*;
 pub use error::OAuthError;
 pub use verify::{verify_oauth_access_token, generate_dpop_nonce, VerifyResult, OAuthUser, OAuthAuthError};
+pub use templates::{DeviceAccount, mask_email};
···
+use crate::state::AppState;
+use crate::sync::car::encode_car_header;
+use axum::{
+    Json,
+    extract::{Query, State},
+    http::StatusCode,
+    response::{IntoResponse, Response},
+};
+use cid::Cid;
+use ipld_core::ipld::Ipld;
+use jacquard_repo::storage::BlockStore;
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use std::io::Write;
+use std::str::FromStr;
+use tracing::error;
+
+const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
+
+#[derive(Deserialize)]
+pub struct GetHeadParams {
+    pub did: String,
+}
+
+#[derive(Serialize)]
+pub struct GetHeadOutput {
+    pub root: String,
+}
+
+pub async fn get_head(
+    State(state): State<AppState>,
+    Query(params): Query<GetHeadParams>,
+) -> Response {
+    let did = params.did.trim();
+
+    if did.is_empty() {
+        return (
+            StatusCode::BAD_REQUEST,
+            Json(json!({"error": "InvalidRequest", "message": "did is required"})),
+        )
+            .into_response();
+    }
+
+    let result = sqlx::query!(
+        r#"
+        SELECT r.repo_root_cid
+        FROM repos r
+        JOIN users u ON r.user_id = u.id
+        WHERE u.did = $1
+        "#,
+        did
+    )
+    .fetch_optional(&state.db)
+    .await;
+
+    match result {
+        Ok(Some(row)) => (StatusCode::OK, Json(GetHeadOutput { root: row.repo_root_cid })).into_response(),
+        Ok(None) => (
+            StatusCode::BAD_REQUEST,
+            Json(json!({"error": "HeadNotFound", "message": "Could not find root for DID"})),
+        )
+            .into_response(),
+        Err(e) => {
+            error!("DB error in get_head: {:?}", e);
+            (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                Json(json!({"error": "InternalError"})),
+            )
+                .into_response()
+        }
+    }
+}
+
+#[derive(Deserialize)]
+pub struct GetCheckoutParams {
+    pub did: String,
+}
+
+pub async fn get_checkout(
+    State(state): State<AppState>,
+    Query(params): Query<GetCheckoutParams>,
+) -> Response {
+    let did = params.did.trim();
+
+    if did.is_empty() {
+        return (
+            StatusCode::BAD_REQUEST,
+            Json(json!({"error": "InvalidRequest", "message": "did is required"})),
+        )
+            .into_response();
+    }
+
+    let repo_row = sqlx::query!(
+        r#"
+        SELECT r.repo_root_cid
+        FROM repos r
+        JOIN users u ON u.id = r.user_id
+        WHERE u.did = $1
+        "#,
+        did
+    )
+    .fetch_optional(&state.db)
+    .await
+    .unwrap_or(None);
+
+    let head_str = match repo_row {
+        Some(r) => r.repo_root_cid,
+        None => {
+            let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
+                .fetch_optional(&state.db)
+                .await
+                .unwrap_or(None);
+
+            if user_exists.is_none() {
+                return (
+                    StatusCode::NOT_FOUND,
+                    Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
+                )
+                    .into_response();
+            } else {
+                return (
+                    StatusCode::NOT_FOUND,
+                    Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
+                )
+                    .into_response();
+            }
+        }
+    };
+
+    let head_cid = match Cid::from_str(&head_str) {
+        Ok(c) => c,
+        Err(_) => {
+            return (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
+            )
+                .into_response();
+        }
+    };
+
+    let mut car_bytes = match encode_car_header(&head_cid) {
+        Ok(h) => h,
+        Err(e) => {
+            return (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
+            )
+                .into_response();
+        }
+    };
+
+    let mut stack = vec![head_cid];
+    let mut visited = std::collections::HashSet::new();
+    let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
+
+    while let Some(cid) = stack.pop() {
+        if visited.contains(&cid) {
+            continue;
+        }
+        visited.insert(cid);
+        if remaining == 0 {
+            break;
+        }
+        remaining -= 1;
+
+        if let Ok(Some(block)) = state.block_store.get(&cid).await {
+            let cid_bytes = cid.to_bytes();
+            let total_len = cid_bytes.len() + block.len();
+            let mut writer = Vec::new();
+            crate::sync::car::write_varint(&mut writer, total_len as u64)
+                .expect("Writing to Vec<u8> should never fail");
+            writer.write_all(&cid_bytes)
+                .expect("Writing to Vec<u8> should never fail");
+            writer.write_all(&block)
+                .expect("Writing to Vec<u8> should never fail");
+            car_bytes.extend_from_slice(&writer);
+
+            if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
+                extract_links_ipld(&value, &mut stack);
+            }
+        }
+    }
+
+    (
+        StatusCode::OK,
+        [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
+        car_bytes,
+    )
+        .into_response()
+}
+
+fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
+    match value {
+        Ipld::Link(cid) => {
+            stack.push(*cid);
+        }
+        Ipld::Map(map) => {
+            for v in map.values() {
+                extract_links_ipld(v, stack);
+            }
+        }
+        Ipld::List(arr) => {
+            for v in arr {
+                extract_links_ipld(v, stack);
+            }
+        }
+        _ => {}
+    }
+}
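In the `get_checkout` export above, each block is framed as `varint(length) || CID bytes || block bytes`, where the length prefix is an unsigned LEB128 varint. A minimal sketch of that encoding for reference (the project's `crate::sync::car::write_varint` is assumed to be equivalent, modulo its `io::Write`-based signature):

```rust
/// Unsigned LEB128: emit 7 payload bits per byte, with the high bit
/// set on every byte except the last to signal continuation.
fn write_varint(out: &mut Vec<u8>, mut value: u64) {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
}

fn main() {
    let mut buf = Vec::new();
    write_varint(&mut buf, 300); // 300 = 0b10_0101100 -> two bytes
    assert_eq!(buf, [0xac, 0x02]);
    buf.clear();
    write_varint(&mut buf, 0);
    assert_eq!(buf, [0x00]);
    println!("varint checks passed");
}
```

This is the same framing CARv1 uses for its header, which is why `encode_car_header` output and the per-block frames can simply be concatenated.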
src/sync/mod.rs (+3 −2)
···
 pub mod car;
 pub mod commit;
 pub mod crawl;
+pub mod deprecated;
 pub mod firehose;
 pub mod frame;
 pub mod import;
 pub mod listener;
-pub mod relay_client;
 pub mod repo;
 pub mod subscribe_repos;
 pub mod util;
···
 pub use blob::{get_blob, list_blobs};
 pub use commit::{get_latest_commit, get_repo_status, list_repos};
 pub use crawl::{notify_of_update, request_crawl};
-pub use repo::{get_blocks, get_repo, get_record};
+pub use deprecated::{get_checkout, get_head};
+pub use repo::{get_blocks, get_record, get_repo};
 pub use subscribe_repos::subscribe_repos;
 pub use verify::{CarVerifier, VerifiedCar, VerifyError};