+1
-1
.gitignore
+1
-1
.gitignore
+4
-1
Cargo.lock
+4
-1
Cargo.lock
···
450
dependencies = [
451
"anyhow",
452
"axum",
453
"bcrypt",
454
"bytes",
455
"chrono",
···
459
"jacquard-axum",
460
"jacquard-repo",
461
"jsonwebtoken",
462
"multihash",
463
"reqwest",
464
"serde",
465
"serde_ipld_dagcbor",
···
973
checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
974
dependencies = [
975
"data-encoding",
976
-
"syn 2.0.111",
977
]
978
979
[[package]]
···
450
dependencies = [
451
"anyhow",
452
"axum",
453
+
"base64 0.22.1",
454
"bcrypt",
455
"bytes",
456
"chrono",
···
460
"jacquard-axum",
461
"jacquard-repo",
462
"jsonwebtoken",
463
+
"k256",
464
"multihash",
465
+
"rand 0.8.5",
466
"reqwest",
467
"serde",
468
"serde_ipld_dagcbor",
···
976
checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
977
dependencies = [
978
"data-encoding",
979
+
"syn 1.0.109",
980
]
981
982
[[package]]
+3
Cargo.toml
+3
Cargo.toml
···
6
[dependencies]
7
anyhow = "1.0.100"
8
axum = "0.8.7"
9
bcrypt = "0.17.1"
10
bytes = "1.11.0"
11
chrono = { version = "0.4.42", features = ["serde"] }
···
15
jacquard-axum = "0.9.2"
16
jacquard-repo = "0.9.2"
17
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
18
multihash = "0.19.3"
19
reqwest = { version = "0.12.24", features = ["json"] }
20
serde = { version = "1.0.228", features = ["derive"] }
21
serde_ipld_dagcbor = "0.6.4"
···
6
[dependencies]
7
anyhow = "1.0.100"
8
axum = "0.8.7"
9
+
base64 = "0.22.1"
10
bcrypt = "0.17.1"
11
bytes = "1.11.0"
12
chrono = { version = "0.4.42", features = ["serde"] }
···
16
jacquard-axum = "0.9.2"
17
jacquard-repo = "0.9.2"
18
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
19
+
k256 = { version = "0.13.3", features = ["ecdsa", "pem", "pkcs8"] }
20
multihash = "0.19.3"
21
+
rand = "0.8.5"
22
reqwest = { version = "0.12.24", features = ["json"] }
23
serde = { version = "1.0.228", features = ["derive"] }
24
serde_ipld_dagcbor = "0.6.4"
+75
-9
TODO.md
+75
-9
TODO.md
···
2
3
Lewis' corrected big boy todofile
4
5
-
## 1. Server Infrastructure & Proxying
6
- [x] Health Check
7
- [x] Implement `GET /health` endpoint (returns "OK").
8
- [x] Server Description
···
11
- [x] Implement strict forwarding for all `app.bsky.*` and `chat.bsky.*` requests to an appview.
12
- [x] Forward Auth headers correctly.
13
- [x] Handle AppView errors/timeouts gracefully.
14
15
-
## 2. Authentication & Account Management (`com.atproto.server`)
16
- [x] Account Creation
17
- [x] Implement `com.atproto.server.createAccount`.
18
- [x] Validate handle format (reject invalid characters).
···
25
- [x] Implement `com.atproto.server.getSession`.
26
- [x] Implement `com.atproto.server.refreshSession`.
27
- [x] Implement `com.atproto.server.deleteSession` (Logout).
28
29
-
## 3. Repository Operations (`com.atproto.repo`)
30
- [ ] Record CRUD
31
- [ ] Implement `com.atproto.repo.createRecord`.
32
- [ ] Validate schema against Lexicon (just structure, not complex logic).
···
38
- [ ] Implement `com.atproto.repo.deleteRecord`.
39
- [ ] Implement `com.atproto.repo.listRecords`.
40
- [ ] Implement `com.atproto.repo.describeRepo`.
41
- [ ] Blob Management
42
- [ ] Implement `com.atproto.repo.uploadBlob`.
43
- [ ] Store blob (S3).
44
- [ ] return `blob` ref (CID + MimeType).
45
46
-
## 4. Sync & Federation (`com.atproto.sync`)
47
- [ ] The Firehose (WebSocket)
48
- [ ] Implement `com.atproto.sync.subscribeRepos`.
49
- [ ] Broadcast real-time commit events.
···
53
- [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
54
- [ ] Implement `com.atproto.sync.getLatestCommit`.
55
- [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
56
- [ ] Blob Sync
57
- [ ] Implement `com.atproto.sync.getBlob`.
58
- [ ] Implement `com.atproto.sync.listBlobs`.
59
- [ ] Crawler Interaction
60
- [ ] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us).
61
62
-
## 5. Identity (`com.atproto.identity`)
63
- [ ] Resolution
64
- [ ] Implement `com.atproto.identity.resolveHandle` (Can be internal or proxy to PLC).
65
- [ ] Implement `/.well-known/did.json` (Depends on supporting did:web).
66
67
-
## 6. Record Schema Validation
68
- [ ] `app.bsky.feed.post`
69
- [ ] `app.bsky.feed.like`
70
- [ ] `app.bsky.feed.repost`
···
73
- [ ] `app.bsky.actor.profile`
74
- [ ] Other app(view) validation too!!!
75
76
-
## 7. General Requirements
77
- [ ] IPLD & MST
78
-
- [ ] Implement Merkle Search Tree (MST) logic for repo signing.
79
-
- [ ] Implement CAR (Content Addressable Archives) encoding/decoding.
80
- [ ] Validation
81
- [ ] DID PLC Operations (Sign rotation keys).
···
2
3
Lewis' corrected big boy todofile
4
5
+
## Server Infrastructure & Proxying
6
- [x] Health Check
7
- [x] Implement `GET /health` endpoint (returns "OK").
8
- [x] Server Description
···
11
- [x] Implement strict forwarding for all `app.bsky.*` and `chat.bsky.*` requests to an appview.
12
- [x] Forward Auth headers correctly.
13
- [x] Handle AppView errors/timeouts gracefully.
14
+
- [ ] Implement Read-After-Write (RAW) consistency (Local Overlay) for proxied requests (merge local unindexed records).
15
16
+
## Authentication & Account Management (`com.atproto.server`)
17
- [x] Account Creation
18
- [x] Implement `com.atproto.server.createAccount`.
19
- [x] Validate handle format (reject invalid characters).
···
26
- [x] Implement `com.atproto.server.getSession`.
27
- [x] Implement `com.atproto.server.refreshSession`.
28
- [x] Implement `com.atproto.server.deleteSession` (Logout).
29
+
- [ ] Implement `com.atproto.server.activateAccount`.
30
+
- [ ] Implement `com.atproto.server.checkAccountStatus`.
31
+
- [ ] Implement `com.atproto.server.confirmEmail`.
32
+
- [ ] Implement `com.atproto.server.createAppPassword`.
33
+
- [ ] Implement `com.atproto.server.createInviteCode`.
34
+
- [ ] Implement `com.atproto.server.createInviteCodes`.
35
+
- [ ] Implement `com.atproto.server.deactivateAccount` / `deleteAccount`.
36
+
- [ ] Implement `com.atproto.server.getAccountInviteCodes`.
37
+
- [ ] Implement `com.atproto.server.getServiceAuth` (Cross-service auth).
38
+
- [ ] Implement `com.atproto.server.listAppPasswords`.
39
+
- [ ] Implement `com.atproto.server.requestAccountDelete`.
40
+
- [ ] Implement `com.atproto.server.requestEmailConfirmation` / `requestEmailUpdate`.
41
+
- [ ] Implement `com.atproto.server.requestPasswordReset` / `resetPassword`.
42
+
- [ ] Implement `com.atproto.server.reserveSigningKey`.
43
+
- [ ] Implement `com.atproto.server.revokeAppPassword`.
44
+
- [ ] Implement `com.atproto.server.updateEmail`.
45
46
+
## Repository Operations (`com.atproto.repo`)
47
- [ ] Record CRUD
48
- [ ] Implement `com.atproto.repo.createRecord`.
49
- [ ] Validate schema against Lexicon (just structure, not complex logic).
···
55
- [ ] Implement `com.atproto.repo.deleteRecord`.
56
- [ ] Implement `com.atproto.repo.listRecords`.
57
- [ ] Implement `com.atproto.repo.describeRepo`.
58
+
- [ ] Implement `com.atproto.repo.applyWrites` (Batch writes).
59
+
- [ ] Implement `com.atproto.repo.importRepo` (Migration).
60
+
- [ ] Implement `com.atproto.repo.listMissingBlobs`.
61
- [ ] Blob Management
62
- [ ] Implement `com.atproto.repo.uploadBlob`.
63
- [ ] Store blob (S3).
64
- [ ] return `blob` ref (CID + MimeType).
65
66
+
## Sync & Federation (`com.atproto.sync`)
67
- [ ] The Firehose (WebSocket)
68
- [ ] Implement `com.atproto.sync.subscribeRepos`.
69
- [ ] Broadcast real-time commit events.
···
73
- [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
74
- [ ] Implement `com.atproto.sync.getLatestCommit`.
75
- [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
76
+
- [ ] Implement `com.atproto.sync.getRepoStatus`.
77
+
- [ ] Implement `com.atproto.sync.listRepos`.
78
+
- [ ] Implement `com.atproto.sync.notifyOfUpdate`.
79
- [ ] Blob Sync
80
- [ ] Implement `com.atproto.sync.getBlob`.
81
- [ ] Implement `com.atproto.sync.listBlobs`.
82
- [ ] Crawler Interaction
83
- [ ] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us).
84
85
+
## Identity (`com.atproto.identity`)
86
- [ ] Resolution
87
- [ ] Implement `com.atproto.identity.resolveHandle` (Can be internal or proxy to PLC).
88
+
- [ ] Implement `com.atproto.identity.updateHandle`.
89
+
- [ ] Implement `com.atproto.identity.submitPlcOperation` / `signPlcOperation` / `requestPlcOperationSignature`.
90
+
- [ ] Implement `com.atproto.identity.getRecommendedDidCredentials`.
91
- [ ] Implement `/.well-known/did.json` (Depends on supporting did:web).
92
93
+
## Admin Management (`com.atproto.admin`)
94
+
- [ ] Implement `com.atproto.admin.deleteAccount`.
95
+
- [ ] Implement `com.atproto.admin.disableAccountInvites`.
96
+
- [ ] Implement `com.atproto.admin.disableInviteCodes`.
97
+
- [ ] Implement `com.atproto.admin.enableAccountInvites`.
98
+
- [ ] Implement `com.atproto.admin.getAccountInfo` / `getAccountInfos`.
99
+
- [ ] Implement `com.atproto.admin.getInviteCodes`.
100
+
- [ ] Implement `com.atproto.admin.getSubjectStatus`.
101
+
- [ ] Implement `com.atproto.admin.sendEmail`.
102
+
- [ ] Implement `com.atproto.admin.updateAccountEmail`.
103
+
- [ ] Implement `com.atproto.admin.updateAccountHandle`.
104
+
- [ ] Implement `com.atproto.admin.updateAccountPassword`.
105
+
- [ ] Implement `com.atproto.admin.updateSubjectStatus`.
106
+
107
+
## Moderation (`com.atproto.moderation`)
108
+
- [ ] Implement `com.atproto.moderation.createReport`.
109
+
110
+
## Record Schema Validation
111
- [ ] `app.bsky.feed.post`
112
- [ ] `app.bsky.feed.like`
113
- [ ] `app.bsky.feed.repost`
···
116
- [ ] `app.bsky.actor.profile`
117
- [ ] Other app(view) validation too!!!
118
119
+
## Infrastructure & Core Components
120
+
- [ ] Sequencer (Event Log)
121
+
- [ ] Implement a `Sequencer` (backed by `repo_seq` table? Like in ref impl).
122
+
- [ ] Implement event formatting (`commit`, `handle`, `identity`, `account`).
123
+
- [ ] Implement database polling / event emission mechanism.
124
+
- [ ] Implement cursor-based event replay (`requestSeqRange`).
125
+
- [ ] Repo Storage & Consistency (in postgres)
126
+
- [ ] Implement `RepoStorage` for postgres (replaces per-user SQLite).
127
+
- [ ] Read/Write IPLD blocks to `blocks` table (global deduplication).
128
+
- [ ] Manage Repo Root in `repos` table.
129
+
- [ ] Implement Atomic Repo Transactions.
130
+
- [ ] Ensure `blocks` write, `repo_root` update, `records` index update, and `sequencer` event are committed in a single transaction.
131
+
- [ ] Implement concurrency control (row-level locking on `repos` table) to prevent concurrent writes to the same repo.
132
+
- [ ] DID Cache
133
+
- [ ] Implement caching layer for DID resolution (Redis or in-memory).
134
+
- [ ] Handle cache invalidation/expiry.
135
+
- [ ] Background Jobs
136
+
- [ ] Implement background queue for async tasks (crawler notifications, discord/telegram 2FA sending instead of email).
137
+
- [ ] Implement `Crawlers` service (debounce notifications to relays).
138
+
- [ ] Mailer equivalent
139
+
- [ ] Implement code/notification sending service as a replacement for the mailer because there's no way I'm starting with email. :D
140
+
- [ ] Image Processing
141
+
- [ ] Implement image resize/formatting pipeline (for blob uploads).
142
- [ ] IPLD & MST
143
+
- [ ] Implement Merkle Search Tree logic for repo signing.
144
+
- [ ] Implement CAR (Content Addressable Archive) encoding/decoding.
145
- [ ] Validation
146
- [ ] DID PLC Operations (Sign rotation keys).
147
+
+1
justfile
+1
justfile
+11
ref_pds_downloader.sh
+11
ref_pds_downloader.sh
···
+36
-4
src/api/proxy.rs
+36
-4
src/api/proxy.rs
···
1
use axum::{
2
-
extract::{Path, Query},
3
http::{HeaderMap, Method, StatusCode},
4
response::{IntoResponse, Response},
5
body::Bytes,
···
7
use reqwest::Client;
8
use tracing::{info, error};
9
use std::collections::HashMap;
10
11
pub async fn proxy_handler(
12
Path(method): Path<String>,
13
method_verb: Method,
14
headers: HeaderMap,
···
20
.and_then(|h| h.to_str().ok())
21
.map(|s| s.to_string());
22
23
-
let appview_url = match proxy_header {
24
-
Some(url) => url,
25
None => match std::env::var("APPVIEW_URL") {
26
Ok(url) => url,
27
Err(_) => return (StatusCode::BAD_GATEWAY, "No upstream AppView configured").into_response(),
···
38
.request(method_verb, &target_url)
39
.query(¶ms);
40
41
for (key, value) in headers.iter() {
42
-
if key != "host" && key != "content-length" {
43
request_builder = request_builder.header(key, value);
44
}
45
}
···
1
use axum::{
2
+
extract::{Path, Query, State},
3
http::{HeaderMap, Method, StatusCode},
4
response::{IntoResponse, Response},
5
body::Bytes,
···
7
use reqwest::Client;
8
use tracing::{info, error};
9
use std::collections::HashMap;
10
+
use crate::state::AppState;
11
+
use sqlx::Row;
12
13
pub async fn proxy_handler(
14
+
State(state): State<AppState>,
15
Path(method): Path<String>,
16
method_verb: Method,
17
headers: HeaderMap,
···
23
.and_then(|h| h.to_str().ok())
24
.map(|s| s.to_string());
25
26
+
let appview_url = match &proxy_header {
27
+
Some(url) => url.clone(),
28
None => match std::env::var("APPVIEW_URL") {
29
Ok(url) => url,
30
Err(_) => return (StatusCode::BAD_GATEWAY, "No upstream AppView configured").into_response(),
···
41
.request(method_verb, &target_url)
42
.query(¶ms);
43
44
+
let mut auth_header_val = headers.get("Authorization").map(|h| h.clone());
45
+
46
+
if let Some(aud) = &proxy_header {
47
+
if let Some(auth_val) = &auth_header_val {
48
+
if let Ok(token) = auth_val.to_str() {
49
+
let token = token.replace("Bearer ", "");
50
+
if let Ok(did) = crate::auth::get_did_from_token(&token) {
51
+
let key_row = sqlx::query("SELECT k.key_bytes FROM user_keys k JOIN users u ON k.user_id = u.id WHERE u.did = $1")
52
+
.bind(&did)
53
+
.fetch_optional(&state.db)
54
+
.await;
55
+
56
+
if let Ok(Some(row)) = key_row {
57
+
let key_bytes: Vec<u8> = row.get("key_bytes");
58
+
if let Ok(new_token) = crate::auth::create_service_token(&did, aud, &method, &key_bytes) {
59
+
if let Ok(val) = axum::http::HeaderValue::from_str(&format!("Bearer {}", new_token)) {
60
+
auth_header_val = Some(val);
61
+
}
62
+
}
63
+
}
64
+
}
65
+
}
66
+
}
67
+
}
68
+
69
+
if let Some(val) = auth_header_val {
70
+
request_builder = request_builder.header("Authorization", val);
71
+
}
72
+
73
for (key, value) in headers.iter() {
74
+
if key != "host" && key != "content-length" && key != "authorization" {
75
request_builder = request_builder.header(key, value);
76
}
77
}
+9
-7
src/api/repo.rs
+9
-7
src/api/repo.rs
···
46
}
47
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
48
49
-
if let Err(_) = crate::auth::verify_token(&token) {
50
-
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"}))).into_response();
51
-
}
52
-
53
-
let session = sqlx::query("SELECT did FROM sessions WHERE access_jwt = $1")
54
.bind(&token)
55
.fetch_optional(&state.db)
56
.await
57
.unwrap_or(None);
58
59
-
let did = match session {
60
-
Some(row) => row.get::<String, _>("did"),
61
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
62
};
63
64
if input.repo != did {
65
return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
···
46
}
47
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
48
49
+
let session = sqlx::query(
50
+
"SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
51
+
)
52
.bind(&token)
53
.fetch_optional(&state.db)
54
.await
55
.unwrap_or(None);
56
57
+
let (did, key_bytes) = match session {
58
+
Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
59
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
60
};
61
+
62
+
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
63
+
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
64
+
}
65
66
if input.repo != did {
67
return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
+41
-18
src/api/server.rs
+41
-18
src/api/server.rs
···
13
use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
14
use jacquard::types::{string::Tid, did::Did, integer::LimitedU32};
15
use std::sync::Arc;
16
17
pub async fn describe_server() -> impl IntoResponse {
18
let domains_str = std::env::var("AVAILABLE_USER_DOMAINS").unwrap_or_else(|_| "example.com".to_string());
···
139
}
140
};
141
142
let store = Arc::new(state.block_store.clone());
143
let mst = Mst::new(store.clone());
144
let mst_root = match mst.root().await {
···
203
}
204
}
205
206
-
let access_jwt = crate::auth::create_access_token(&did).map_err(|e| {
207
error!("Error creating access token: {:?}", e);
208
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
209
});
···
212
Err(r) => return r,
213
};
214
215
-
let refresh_jwt = crate::auth::create_refresh_token(&did).map_err(|e| {
216
error!("Error creating refresh token: {:?}", e);
217
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
218
});
···
267
) -> Response {
268
info!("create_session: identifier='{}'", input.identifier);
269
270
-
let user_row = sqlx::query("SELECT did, handle, password_hash FROM users WHERE handle = $1 OR email = $1")
271
.bind(&input.identifier)
272
.fetch_optional(&state.db)
273
.await;
···
279
if verify(&input.password, &stored_hash).unwrap_or(false) {
280
let did: String = row.get("did");
281
let handle: String = row.get("handle");
282
283
-
let access_jwt = match crate::auth::create_access_token(&did) {
284
Ok(t) => t,
285
Err(e) => {
286
error!("Failed to create access token: {:?}", e);
···
288
}
289
};
290
291
-
let refresh_jwt = match crate::auth::create_refresh_token(&did) {
292
Ok(t) => t,
293
Err(e) => {
294
error!("Failed to create refresh token: {:?}", e);
···
344
345
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
346
347
-
if let Err(_) = crate::auth::verify_token(&token) {
348
-
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"}))).into_response();
349
-
}
350
-
351
let result = sqlx::query(
352
r#"
353
-
SELECT u.handle, u.did, u.email
354
FROM sessions s
355
JOIN users u ON s.did = u.did
356
WHERE s.access_jwt = $1
357
"#
358
)
359
-
.bind(token)
360
.fetch_optional(&state.db)
361
.await;
362
···
365
let handle: String = row.get("handle");
366
let did: String = row.get("did");
367
let email: String = row.get("email");
368
369
return (StatusCode::OK, Json(json!({
370
"handle": handle,
···
424
425
let refresh_token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
426
427
-
if let Err(_) = crate::auth::verify_token(&refresh_token) {
428
-
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid refresh token"}))).into_response();
429
-
}
430
-
431
-
let session = sqlx::query("SELECT did FROM sessions WHERE refresh_jwt = $1")
432
.bind(&refresh_token)
433
.fetch_optional(&state.db)
434
.await;
···
436
match session {
437
Ok(Some(session_row)) => {
438
let did: String = session_row.get("did");
439
-
let new_access_jwt = match crate::auth::create_access_token(&did) {
440
Ok(t) => t,
441
Err(e) => {
442
error!("Failed to create access token: {:?}", e);
443
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
444
}
445
};
446
-
let new_refresh_jwt = match crate::auth::create_refresh_token(&did) {
447
Ok(t) => t,
448
Err(e) => {
449
error!("Failed to create refresh token: {:?}", e);
···
13
use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
14
use jacquard::types::{string::Tid, did::Did, integer::LimitedU32};
15
use std::sync::Arc;
16
+
use k256::SecretKey;
17
+
use rand::rngs::OsRng;
18
19
pub async fn describe_server() -> impl IntoResponse {
20
let domains_str = std::env::var("AVAILABLE_USER_DOMAINS").unwrap_or_else(|_| "example.com".to_string());
···
141
}
142
};
143
144
+
let secret_key = SecretKey::random(&mut OsRng);
145
+
let secret_key_bytes = secret_key.to_bytes();
146
+
147
+
let key_insert = sqlx::query("INSERT INTO user_keys (user_id, key_bytes) VALUES ($1, $2)")
148
+
.bind(user_id)
149
+
.bind(&secret_key_bytes[..])
150
+
.execute(&mut *tx)
151
+
.await;
152
+
153
+
if let Err(e) = key_insert {
154
+
error!("Error inserting user key: {:?}", e);
155
+
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
156
+
}
157
+
158
let store = Arc::new(state.block_store.clone());
159
let mst = Mst::new(store.clone());
160
let mst_root = match mst.root().await {
···
219
}
220
}
221
222
+
let access_jwt = crate::auth::create_access_token(&did, &secret_key_bytes[..]).map_err(|e| {
223
error!("Error creating access token: {:?}", e);
224
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
225
});
···
228
Err(r) => return r,
229
};
230
231
+
let refresh_jwt = crate::auth::create_refresh_token(&did, &secret_key_bytes[..]).map_err(|e| {
232
error!("Error creating refresh token: {:?}", e);
233
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
234
});
···
283
) -> Response {
284
info!("create_session: identifier='{}'", input.identifier);
285
286
+
let user_row = sqlx::query("SELECT u.did, u.handle, u.password_hash, k.key_bytes FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.handle = $1 OR u.email = $1")
287
.bind(&input.identifier)
288
.fetch_optional(&state.db)
289
.await;
···
295
if verify(&input.password, &stored_hash).unwrap_or(false) {
296
let did: String = row.get("did");
297
let handle: String = row.get("handle");
298
+
let key_bytes: Vec<u8> = row.get("key_bytes");
299
300
+
let access_jwt = match crate::auth::create_access_token(&did, &key_bytes) {
301
Ok(t) => t,
302
Err(e) => {
303
error!("Failed to create access token: {:?}", e);
···
305
}
306
};
307
308
+
let refresh_jwt = match crate::auth::create_refresh_token(&did, &key_bytes) {
309
Ok(t) => t,
310
Err(e) => {
311
error!("Failed to create refresh token: {:?}", e);
···
361
362
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
363
364
let result = sqlx::query(
365
r#"
366
+
SELECT u.handle, u.did, u.email, k.key_bytes
367
FROM sessions s
368
JOIN users u ON s.did = u.did
369
+
JOIN user_keys k ON u.id = k.user_id
370
WHERE s.access_jwt = $1
371
"#
372
)
373
+
.bind(&token)
374
.fetch_optional(&state.db)
375
.await;
376
···
379
let handle: String = row.get("handle");
380
let did: String = row.get("did");
381
let email: String = row.get("email");
382
+
let key_bytes: Vec<u8> = row.get("key_bytes");
383
+
384
+
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
385
+
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
386
+
}
387
388
return (StatusCode::OK, Json(json!({
389
"handle": handle,
···
443
444
let refresh_token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
445
446
+
let session = sqlx::query(
447
+
"SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.refresh_jwt = $1"
448
+
)
449
.bind(&refresh_token)
450
.fetch_optional(&state.db)
451
.await;
···
453
match session {
454
Ok(Some(session_row)) => {
455
let did: String = session_row.get("did");
456
+
let key_bytes: Vec<u8> = session_row.get("key_bytes");
457
+
458
+
if let Err(_) = crate::auth::verify_token(&refresh_token, &key_bytes) {
459
+
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid refresh token signature"}))).into_response();
460
+
}
461
+
462
+
let new_access_jwt = match crate::auth::create_access_token(&did, &key_bytes) {
463
Ok(t) => t,
464
Err(e) => {
465
error!("Failed to create access token: {:?}", e);
466
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
467
}
468
};
469
+
let new_refresh_jwt = match crate::auth::create_refresh_token(&did, &key_bytes) {
470
Ok(t) => t,
471
Err(e) => {
472
error!("Failed to create refresh token: {:?}", e);
+119
-21
src/auth.rs
+119
-21
src/auth.rs
···
1
-
use jsonwebtoken::{encode, decode, Header, Validation, EncodingKey, DecodingKey, TokenData};
2
use serde::{Deserialize, Serialize};
3
use chrono::{Utc, Duration};
4
-
use std::env;
5
6
#[derive(Debug, Serialize, Deserialize)]
7
pub struct Claims {
8
-
// DID type shit
9
pub sub: String,
10
pub exp: usize,
11
pub iat: usize,
12
-
pub scope: String,
13
pub jti: String,
14
}
15
16
-
pub fn create_access_token(did: &str) -> Result<String, jsonwebtoken::errors::Error> {
17
-
let secret = env::var("JWT_SECRET").unwrap_or_else(|_| "secret".to_string());
18
let expiration = Utc::now()
19
-
.checked_add_signed(Duration::minutes(15))
20
.expect("valid timestamp")
21
.timestamp();
22
23
let claims = Claims {
24
sub: did.to_owned(),
25
exp: expiration as usize,
26
iat: Utc::now().timestamp() as usize,
27
-
scope: "access".to_string(),
28
jti: uuid::Uuid::new_v4().to_string(),
29
};
30
31
-
encode(&Header::default(), &claims, &EncodingKey::from_secret(secret.as_ref()))
32
}
33
34
-
pub fn create_refresh_token(did: &str) -> Result<String, jsonwebtoken::errors::Error> {
35
-
let secret = env::var("JWT_SECRET").unwrap_or_else(|_| "secret".to_string());
36
let expiration = Utc::now()
37
-
.checked_add_signed(Duration::days(7))
38
.expect("valid timestamp")
39
.timestamp();
40
41
let claims = Claims {
42
sub: did.to_owned(),
43
exp: expiration as usize,
44
iat: Utc::now().timestamp() as usize,
45
-
scope: "refresh".to_string(),
46
jti: uuid::Uuid::new_v4().to_string(),
47
};
48
49
-
encode(&Header::default(), &claims, &EncodingKey::from_secret(secret.as_ref()))
50
}
51
52
-
pub fn verify_token(token: &str) -> Result<TokenData<Claims>, jsonwebtoken::errors::Error> {
53
-
let secret = env::var("JWT_SECRET").unwrap_or_else(|_| "secret".to_string());
54
-
decode::<Claims>(
55
-
token,
56
-
&DecodingKey::from_secret(secret.as_ref()),
57
-
&Validation::default(),
58
-
)
59
}
···
1
use serde::{Deserialize, Serialize};
2
use chrono::{Utc, Duration};
3
+
use k256::ecdsa::{SigningKey, VerifyingKey, signature::Signer, signature::Verifier, Signature};
4
+
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
5
+
use anyhow::{Context, Result, anyhow};
6
7
#[derive(Debug, Serialize, Deserialize)]
8
pub struct Claims {
9
+
pub iss: String,
10
pub sub: String,
11
+
pub aud: String,
12
pub exp: usize,
13
pub iat: usize,
14
+
#[serde(skip_serializing_if = "Option::is_none")]
15
+
pub scope: Option<String>,
16
+
#[serde(skip_serializing_if = "Option::is_none")]
17
+
pub lxm: Option<String>,
18
pub jti: String,
19
}
20
21
+
#[derive(Debug, Serialize, Deserialize)]
22
+
struct Header {
23
+
alg: String,
24
+
typ: String,
25
+
}
26
+
27
+
#[derive(Debug, Serialize, Deserialize)]
28
+
struct UnsafeClaims {
29
+
iss: String,
30
+
sub: Option<String>,
31
+
}
32
+
33
+
// fancy boy TokenData equivalent for compatibility/structure
34
+
pub struct TokenData<T> {
35
+
pub claims: T,
36
+
}
37
+
38
+
pub fn get_did_from_token(token: &str) -> Result<String, String> {
39
+
let parts: Vec<&str> = token.split('.').collect();
40
+
if parts.len() != 3 {
41
+
return Err("Invalid token format".to_string());
42
+
}
43
+
44
+
let payload_bytes = URL_SAFE_NO_PAD.decode(parts[1])
45
+
.map_err(|e| format!("Base64 decode failed: {}", e))?;
46
+
47
+
let claims: UnsafeClaims = serde_json::from_slice(&payload_bytes)
48
+
.map_err(|e| format!("JSON decode failed: {}", e))?;
49
+
50
+
Ok(claims.sub.unwrap_or(claims.iss))
51
+
}
52
+
53
+
pub fn create_access_token(did: &str, key_bytes: &[u8]) -> Result<String, anyhow::Error> {
54
+
create_signed_token(did, "access", key_bytes, Duration::minutes(15))
55
+
}
56
+
57
+
pub fn create_refresh_token(did: &str, key_bytes: &[u8]) -> Result<String, anyhow::Error> {
58
+
create_signed_token(did, "refresh", key_bytes, Duration::days(7))
59
+
}
60
+
61
+
pub fn create_service_token(did: &str, aud: &str, lxm: &str, key_bytes: &[u8]) -> Result<String, anyhow::Error> {
62
+
let signing_key = SigningKey::from_slice(key_bytes)?;
63
+
64
let expiration = Utc::now()
65
+
.checked_add_signed(Duration::seconds(60))
66
.expect("valid timestamp")
67
.timestamp();
68
69
let claims = Claims {
70
+
iss: did.to_owned(),
71
sub: did.to_owned(),
72
+
aud: aud.to_owned(),
73
exp: expiration as usize,
74
iat: Utc::now().timestamp() as usize,
75
+
scope: None,
76
+
lxm: Some(lxm.to_string()),
77
jti: uuid::Uuid::new_v4().to_string(),
78
};
79
80
+
sign_claims(claims, &signing_key)
81
}
82
83
+
fn create_signed_token(did: &str, scope: &str, key_bytes: &[u8], duration: Duration) -> Result<String, anyhow::Error> {
84
+
let signing_key = SigningKey::from_slice(key_bytes)?;
85
+
86
let expiration = Utc::now()
87
+
.checked_add_signed(duration)
88
.expect("valid timestamp")
89
.timestamp();
90
91
let claims = Claims {
92
+
iss: did.to_owned(),
93
sub: did.to_owned(),
94
+
aud: format!("did:web:{}", std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string())),
95
exp: expiration as usize,
96
iat: Utc::now().timestamp() as usize,
97
+
scope: Some(scope.to_string()),
98
+
lxm: None,
99
jti: uuid::Uuid::new_v4().to_string(),
100
};
101
102
+
sign_claims(claims, &signing_key)
103
}
104
105
+
fn sign_claims(claims: Claims, key: &SigningKey) -> Result<String, anyhow::Error> {
106
+
let header = Header {
107
+
alg: "ES256K".to_string(),
108
+
typ: "JWT".to_string(),
109
+
};
110
+
111
+
let header_json = serde_json::to_string(&header)?;
112
+
let claims_json = serde_json::to_string(&claims)?;
113
+
114
+
let header_b64 = URL_SAFE_NO_PAD.encode(header_json);
115
+
let claims_b64 = URL_SAFE_NO_PAD.encode(claims_json);
116
+
117
+
let message = format!("{}.{}", header_b64, claims_b64);
118
+
let signature: Signature = key.sign(message.as_bytes());
119
+
let signature_b64 = URL_SAFE_NO_PAD.encode(signature.to_bytes());
120
+
121
+
Ok(format!("{}.{}", message, signature_b64))
122
+
}
123
+
124
+
pub fn verify_token(token: &str, key_bytes: &[u8]) -> Result<TokenData<Claims>, anyhow::Error> {
125
+
let parts: Vec<&str> = token.split('.').collect();
126
+
if parts.len() != 3 {
127
+
return Err(anyhow!("Invalid token format"));
128
+
}
129
+
130
+
let header_b64 = parts[0];
131
+
let claims_b64 = parts[1];
132
+
let signature_b64 = parts[2];
133
+
134
+
let signature_bytes = URL_SAFE_NO_PAD.decode(signature_b64)
135
+
.context("Base64 decode of signature failed")?;
136
+
let signature = Signature::from_slice(&signature_bytes)
137
+
.map_err(|e| anyhow!("Invalid signature format: {}", e))?;
138
+
139
+
let signing_key = SigningKey::from_slice(key_bytes)?;
140
+
let verifying_key = VerifyingKey::from(&signing_key);
141
+
142
+
let message = format!("{}.{}", header_b64, claims_b64);
143
+
verifying_key.verify(message.as_bytes(), &signature)
144
+
.map_err(|e| anyhow!("Signature verification failed: {}", e))?;
145
+
146
+
let claims_bytes = URL_SAFE_NO_PAD.decode(claims_b64)
147
+
.context("Base64 decode of claims failed")?;
148
+
let claims: Claims = serde_json::from_slice(&claims_bytes)
149
+
.context("JSON decode of claims failed")?;
150
+
151
+
let now = Utc::now().timestamp() as usize;
152
+
if claims.exp < now {
153
+
return Err(anyhow!("Token expired"));
154
+
}
155
+
156
+
Ok(TokenData { claims })
157
}
+122
tests/auth.rs
+122
tests/auth.rs
···
···
1
+
use bspds::auth;
2
+
use k256::SecretKey;
3
+
use rand::rngs::OsRng;
4
+
use chrono::{Utc, Duration};
5
+
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
6
+
use serde_json::json;
7
+
use k256::ecdsa::{SigningKey, signature::Signer};
8
+
9
+
#[test]
10
+
fn test_jwt_flow() {
11
+
let secret_key = SecretKey::random(&mut OsRng);
12
+
let key_bytes = secret_key.to_bytes();
13
+
let did = "did:plc:test";
14
+
15
+
let token = auth::create_access_token(did, &key_bytes).expect("create token");
16
+
let data = auth::verify_token(&token, &key_bytes).expect("verify token");
17
+
assert_eq!(data.claims.sub, did);
18
+
assert_eq!(data.claims.iss, did);
19
+
assert_eq!(data.claims.scope, Some("access".to_string()));
20
+
21
+
let r_token = auth::create_refresh_token(did, &key_bytes).expect("create refresh token");
22
+
let r_data = auth::verify_token(&r_token, &key_bytes).expect("verify refresh token");
23
+
assert_eq!(r_data.claims.scope, Some("refresh".to_string()));
24
+
25
+
let aud = "did:web:service";
26
+
let lxm = "com.example.test";
27
+
let s_token = auth::create_service_token(did, aud, lxm, &key_bytes).expect("create service token");
28
+
let s_data = auth::verify_token(&s_token, &key_bytes).expect("verify service token");
29
+
assert_eq!(s_data.claims.aud, aud);
30
+
assert_eq!(s_data.claims.lxm, Some(lxm.to_string()));
31
+
}
32
+
33
+
#[test]
34
+
fn test_verify_fails_with_wrong_key() {
35
+
let secret_key1 = SecretKey::random(&mut OsRng);
36
+
let key_bytes1 = secret_key1.to_bytes();
37
+
38
+
let secret_key2 = SecretKey::random(&mut OsRng);
39
+
let key_bytes2 = secret_key2.to_bytes();
40
+
41
+
let did = "did:plc:test";
42
+
let token = auth::create_access_token(did, &key_bytes1).expect("create token");
43
+
44
+
let result = auth::verify_token(&token, &key_bytes2);
45
+
assert!(result.is_err());
46
+
}
47
+
48
+
#[test]
49
+
fn test_token_expiration() {
50
+
let secret_key = SecretKey::random(&mut OsRng);
51
+
let key_bytes = secret_key.to_bytes();
52
+
let signing_key = SigningKey::from_slice(&key_bytes).expect("key");
53
+
54
+
let header = json!({
55
+
"alg": "ES256K",
56
+
"typ": "JWT"
57
+
});
58
+
let claims = json!({
59
+
"iss": "did:plc:test",
60
+
"sub": "did:plc:test",
61
+
"aud": "did:web:test",
62
+
"exp": (Utc::now() - Duration::seconds(10)).timestamp(),
63
+
"iat": (Utc::now() - Duration::minutes(1)).timestamp(),
64
+
"jti": "unique",
65
+
});
66
+
67
+
let header_b64 = URL_SAFE_NO_PAD.encode(serde_json::to_string(&header).unwrap());
68
+
let claims_b64 = URL_SAFE_NO_PAD.encode(serde_json::to_string(&claims).unwrap());
69
+
let message = format!("{}.{}", header_b64, claims_b64);
70
+
let signature: k256::ecdsa::Signature = signing_key.sign(message.as_bytes());
71
+
let signature_b64 = URL_SAFE_NO_PAD.encode(signature.to_bytes());
72
+
let token = format!("{}.{}", message, signature_b64);
73
+
74
+
let result = auth::verify_token(&token, &key_bytes);
75
+
match result {
76
+
Ok(_) => panic!("Token should be expired"),
77
+
Err(e) => assert_eq!(e.to_string(), "Token expired"),
78
+
}
79
+
}
80
+
81
+
#[test]
82
+
fn test_invalid_token_format() {
83
+
let secret_key = SecretKey::random(&mut OsRng);
84
+
let key_bytes = secret_key.to_bytes();
85
+
86
+
assert!(auth::verify_token("invalid.token", &key_bytes).is_err());
87
+
assert!(auth::verify_token("too.many.parts.here", &key_bytes).is_err());
88
+
assert!(auth::verify_token("bad_base64.payload.sig", &key_bytes).is_err());
89
+
}
90
+
91
+
#[test]
92
+
fn test_tampered_token() {
93
+
let secret_key = SecretKey::random(&mut OsRng);
94
+
let key_bytes = secret_key.to_bytes();
95
+
let did = "did:plc:test";
96
+
97
+
let token = auth::create_access_token(did, &key_bytes).expect("create token");
98
+
let parts: Vec<&str> = token.split('.').collect();
99
+
100
+
let claims_json = String::from_utf8(URL_SAFE_NO_PAD.decode(parts[1]).unwrap()).unwrap();
101
+
let mut claims: serde_json::Value = serde_json::from_str(&claims_json).unwrap();
102
+
claims["sub"] = json!("did:plc:hacker");
103
+
let tampered_claims_b64 = URL_SAFE_NO_PAD.encode(serde_json::to_string(&claims).unwrap());
104
+
105
+
let tampered_token = format!("{}.{}.{}", parts[0], tampered_claims_b64, parts[2]);
106
+
107
+
let result = auth::verify_token(&tampered_token, &key_bytes);
108
+
assert!(result.is_err());
109
+
}
110
+
111
+
#[test]
112
+
fn test_get_did_from_token() {
113
+
let secret_key = SecretKey::random(&mut OsRng);
114
+
let key_bytes = secret_key.to_bytes();
115
+
let did = "did:plc:test";
116
+
117
+
let token = auth::create_access_token(did, &key_bytes).expect("create token");
118
+
let extracted_did = auth::get_did_from_token(&token).expect("get did");
119
+
assert_eq!(extracted_did, did);
120
+
121
+
assert!(auth::get_did_from_token("bad.token").is_err());
122
+
}
+2
tests/common/mod.rs
+2
tests/common/mod.rs
···
24
#[allow(dead_code)]
25
pub const TARGET_DID: &str = "did:plc:target";
26
27
pub fn client() -> Client {
28
Client::new()
29
}
···
142
(uri, cid, rkey)
143
}
144
145
pub async fn create_account_and_login(client: &Client) -> (String, String) {
146
let handle = format!("user_{}", uuid::Uuid::new_v4());
147
let payload = json!({
···
24
#[allow(dead_code)]
25
pub const TARGET_DID: &str = "did:plc:target";
26
27
+
#[allow(dead_code)]
28
pub fn client() -> Client {
29
Client::new()
30
}
···
143
(uri, cid, rkey)
144
}
145
146
+
#[allow(dead_code)]
147
pub async fn create_account_and_login(client: &Client) -> (String, String) {
148
let handle = format!("user_{}", uuid::Uuid::new_v4());
149
let payload = json!({
+37
tests/proxy.rs
+37
tests/proxy.rs
···
9
use tokio::net::TcpListener;
10
use reqwest::Client;
11
use std::sync::Arc;
12
13
async fn spawn_mock_upstream() -> (String, tokio::sync::mpsc::Receiver<(String, String, Option<String>)>) {
14
let (tx, rx) = tokio::sync::mpsc::channel(10);
···
94
95
assert_eq!(res.status(), StatusCode::BAD_GATEWAY);
96
}
···
9
use tokio::net::TcpListener;
10
use reqwest::Client;
11
use std::sync::Arc;
12
+
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
13
14
async fn spawn_mock_upstream() -> (String, tokio::sync::mpsc::Receiver<(String, String, Option<String>)>) {
15
let (tx, rx) = tokio::sync::mpsc::channel(10);
···
95
96
assert_eq!(res.status(), StatusCode::BAD_GATEWAY);
97
}
98
+
99
+
#[tokio::test]
100
+
async fn test_proxy_auth_signing() {
101
+
let app_url = common::base_url().await;
102
+
let (upstream_url, mut rx) = spawn_mock_upstream().await;
103
+
let client = Client::new();
104
+
105
+
let (access_jwt, did) = common::create_account_and_login(&client).await;
106
+
107
+
let res = client.get(format!("{}/xrpc/com.example.signed", app_url))
108
+
.header("atproto-proxy", &upstream_url)
109
+
.header("Authorization", format!("Bearer {}", access_jwt))
110
+
.send()
111
+
.await
112
+
.unwrap();
113
+
114
+
assert_eq!(res.status(), StatusCode::OK);
115
+
116
+
let (method, uri, auth) = rx.recv().await.expect("Upstream receive");
117
+
assert_eq!(method, "GET");
118
+
assert_eq!(uri, "/xrpc/com.example.signed");
119
+
120
+
let received_token = auth.expect("No auth header").replace("Bearer ", "");
121
+
assert_ne!(received_token, access_jwt, "Token should be replaced");
122
+
123
+
let parts: Vec<&str> = received_token.split('.').collect();
124
+
assert_eq!(parts.len(), 3);
125
+
126
+
let payload_bytes = URL_SAFE_NO_PAD.decode(parts[1]).expect("payload b64");
127
+
let claims: serde_json::Value = serde_json::from_slice(&payload_bytes).expect("payload json");
128
+
129
+
assert_eq!(claims["iss"], did);
130
+
assert_eq!(claims["sub"], did);
131
+
assert_eq!(claims["aud"], upstream_url);
132
+
assert_eq!(claims["lxm"], "com.example.signed");
133
+
}