+22
.sqlx/query-1add22e111d5eff8beadbd832b4b8146d95da0a0ce8ce31dc9a2f930a26cc9ce.json
+22
.sqlx/query-1add22e111d5eff8beadbd832b4b8146d95da0a0ce8ce31dc9a2f930a26cc9ce.json
···
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "SELECT takedown_ref FROM users WHERE did = $1",
4
+
"describe": {
5
+
"columns": [
6
+
{
7
+
"ordinal": 0,
8
+
"name": "takedown_ref",
9
+
"type_info": "Text"
10
+
}
11
+
],
12
+
"parameters": {
13
+
"Left": [
14
+
"Text"
15
+
]
16
+
},
17
+
"nullable": [
18
+
true
19
+
]
20
+
},
21
+
"hash": "1add22e111d5eff8beadbd832b4b8146d95da0a0ce8ce31dc9a2f930a26cc9ce"
22
+
}
+28
.sqlx/query-90bcc8fb97f73a0b5f427971aca891936b3f906c2d4cdb4bf203dd6a4c9aa060.json
+28
.sqlx/query-90bcc8fb97f73a0b5f427971aca891936b3f906c2d4cdb4bf203dd6a4c9aa060.json
···
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "SELECT k.key_bytes, k.encryption_version FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.did = $1",
4
+
"describe": {
5
+
"columns": [
6
+
{
7
+
"ordinal": 0,
8
+
"name": "key_bytes",
9
+
"type_info": "Bytea"
10
+
},
11
+
{
12
+
"ordinal": 1,
13
+
"name": "encryption_version",
14
+
"type_info": "Int4"
15
+
}
16
+
],
17
+
"parameters": {
18
+
"Left": [
19
+
"Text"
20
+
]
21
+
},
22
+
"nullable": [
23
+
false,
24
+
true
25
+
]
26
+
},
27
+
"hash": "90bcc8fb97f73a0b5f427971aca891936b3f906c2d4cdb4bf203dd6a4c9aa060"
28
+
}
+52
.sqlx/query-bee4276cbb537512cced16f7017d8f7c068d30f319ef965fa9ec9fb1a3490151.json
+52
.sqlx/query-bee4276cbb537512cced16f7017d8f7c068d30f319ef965fa9ec9fb1a3490151.json
···
···
1
+
{
2
+
"db_name": "PostgreSQL",
3
+
"query": "SELECT t.did, t.expires_at, u.deactivated_at, u.takedown_ref,\n k.key_bytes as \"key_bytes?\", k.encryption_version as \"encryption_version?\"\n FROM oauth_token t\n JOIN users u ON t.did = u.did\n LEFT JOIN user_keys k ON u.id = k.user_id\n WHERE t.token_id = $1",
4
+
"describe": {
5
+
"columns": [
6
+
{
7
+
"ordinal": 0,
8
+
"name": "did",
9
+
"type_info": "Text"
10
+
},
11
+
{
12
+
"ordinal": 1,
13
+
"name": "expires_at",
14
+
"type_info": "Timestamptz"
15
+
},
16
+
{
17
+
"ordinal": 2,
18
+
"name": "deactivated_at",
19
+
"type_info": "Timestamptz"
20
+
},
21
+
{
22
+
"ordinal": 3,
23
+
"name": "takedown_ref",
24
+
"type_info": "Text"
25
+
},
26
+
{
27
+
"ordinal": 4,
28
+
"name": "key_bytes?",
29
+
"type_info": "Bytea"
30
+
},
31
+
{
32
+
"ordinal": 5,
33
+
"name": "encryption_version?",
34
+
"type_info": "Int4"
35
+
}
36
+
],
37
+
"parameters": {
38
+
"Left": [
39
+
"Text"
40
+
]
41
+
},
42
+
"nullable": [
43
+
false,
44
+
false,
45
+
true,
46
+
true,
47
+
false,
48
+
true
49
+
]
50
+
},
51
+
"hash": "bee4276cbb537512cced16f7017d8f7c068d30f319ef965fa9ec9fb1a3490151"
52
+
}
-40
.sqlx/query-efe82a97fd456c85dc7f51ece87f85950cca79fe0fac4ef6caa44fecf0911b07.json
-40
.sqlx/query-efe82a97fd456c85dc7f51ece87f85950cca79fe0fac4ef6caa44fecf0911b07.json
···
1
-
{
2
-
"db_name": "PostgreSQL",
3
-
"query": "SELECT t.did, t.expires_at, u.deactivated_at, u.takedown_ref\n FROM oauth_token t\n JOIN users u ON t.did = u.did\n WHERE t.token_id = $1",
4
-
"describe": {
5
-
"columns": [
6
-
{
7
-
"ordinal": 0,
8
-
"name": "did",
9
-
"type_info": "Text"
10
-
},
11
-
{
12
-
"ordinal": 1,
13
-
"name": "expires_at",
14
-
"type_info": "Timestamptz"
15
-
},
16
-
{
17
-
"ordinal": 2,
18
-
"name": "deactivated_at",
19
-
"type_info": "Timestamptz"
20
-
},
21
-
{
22
-
"ordinal": 3,
23
-
"name": "takedown_ref",
24
-
"type_info": "Text"
25
-
}
26
-
],
27
-
"parameters": {
28
-
"Left": [
29
-
"Text"
30
-
]
31
-
},
32
-
"nullable": [
33
-
false,
34
-
false,
35
-
true,
36
-
true
37
-
]
38
-
},
39
-
"hash": "efe82a97fd456c85dc7f51ece87f85950cca79fe0fac4ef6caa44fecf0911b07"
40
-
}
···
+23
-15
src/api/actor/profile.rs
+23
-15
src/api/actor/profile.rs
···
73
async fn proxy_to_appview(
74
method: &str,
75
params: &HashMap<String, String>,
76
-
auth_header: Option<&str>,
77
) -> Result<(StatusCode, Value), Response> {
78
let appview_url = match std::env::var("APPVIEW_URL") {
79
Ok(url) => url,
···
87
info!("Proxying GET request to {}", target_url);
88
let client = proxy_client();
89
let mut request_builder = client.get(&target_url).query(params);
90
-
if let Some(auth) = auth_header {
91
-
request_builder = request_builder.header("Authorization", auth);
92
}
93
match request_builder.send().await {
94
Ok(resp) => {
···
118
Query(params): Query<GetProfileParams>,
119
) -> Response {
120
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
121
-
let auth_did = if let Some(h) = auth_header {
122
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
123
-
match crate::auth::validate_bearer_token(&state.db, &token).await {
124
-
Ok(user) => Some(user.did),
125
-
Err(_) => None,
126
-
}
127
} else {
128
None
129
}
130
} else {
131
None
132
};
133
let mut query_params = HashMap::new();
134
query_params.insert("actor".to_string(), params.actor.clone());
135
-
let (status, body) = match proxy_to_appview("app.bsky.actor.getProfile", &query_params, auth_header).await {
136
Ok(r) => r,
137
Err(e) => return e,
138
};
···
161
Query(params): Query<GetProfilesParams>,
162
) -> Response {
163
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
164
-
let auth_did = if let Some(h) = auth_header {
165
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
166
-
match crate::auth::validate_bearer_token(&state.db, &token).await {
167
-
Ok(user) => Some(user.did),
168
-
Err(_) => None,
169
-
}
170
} else {
171
None
172
}
173
} else {
174
None
175
};
176
let mut query_params = HashMap::new();
177
query_params.insert("actors".to_string(), params.actors.clone());
178
-
let (status, body) = match proxy_to_appview("app.bsky.actor.getProfiles", &query_params, auth_header).await {
179
Ok(r) => r,
180
Err(e) => return e,
181
};
···
73
async fn proxy_to_appview(
74
method: &str,
75
params: &HashMap<String, String>,
76
+
auth_did: &str,
77
+
auth_key_bytes: Option<&[u8]>,
78
) -> Result<(StatusCode, Value), Response> {
79
let appview_url = match std::env::var("APPVIEW_URL") {
80
Ok(url) => url,
···
88
info!("Proxying GET request to {}", target_url);
89
let client = proxy_client();
90
let mut request_builder = client.get(&target_url).query(params);
91
+
if let Some(key_bytes) = auth_key_bytes {
92
+
let appview_did = std::env::var("APPVIEW_DID").unwrap_or_else(|_| "did:web:api.bsky.app".to_string());
93
+
match crate::auth::create_service_token(auth_did, &appview_did, method, key_bytes) {
94
+
Ok(service_token) => {
95
+
request_builder = request_builder.header("Authorization", format!("Bearer {}", service_token));
96
+
}
97
+
Err(e) => {
98
+
error!("Failed to create service token: {:?}", e);
99
+
return Err((StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response());
100
+
}
101
+
}
102
}
103
match request_builder.send().await {
104
Ok(resp) => {
···
128
Query(params): Query<GetProfileParams>,
129
) -> Response {
130
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
131
+
let auth_user = if let Some(h) = auth_header {
132
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
133
+
crate::auth::validate_bearer_token(&state.db, &token).await.ok()
134
} else {
135
None
136
}
137
} else {
138
None
139
};
140
+
let auth_did = auth_user.as_ref().map(|u| u.did.clone());
141
+
let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone());
142
let mut query_params = HashMap::new();
143
query_params.insert("actor".to_string(), params.actor.clone());
144
+
let (status, body) = match proxy_to_appview("app.bsky.actor.getProfile", &query_params, auth_did.as_deref().unwrap_or(""), auth_key_bytes.as_deref()).await {
145
Ok(r) => r,
146
Err(e) => return e,
147
};
···
170
Query(params): Query<GetProfilesParams>,
171
) -> Response {
172
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
173
+
let auth_user = if let Some(h) = auth_header {
174
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
175
+
crate::auth::validate_bearer_token(&state.db, &token).await.ok()
176
} else {
177
None
178
}
179
} else {
180
None
181
};
182
+
let auth_did = auth_user.as_ref().map(|u| u.did.clone());
183
+
let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone());
184
let mut query_params = HashMap::new();
185
query_params.insert("actors".to_string(), params.actors.clone());
186
+
let (status, body) = match proxy_to_appview("app.bsky.actor.getProfiles", &query_params, auth_did.as_deref().unwrap_or(""), auth_key_bytes.as_deref()).await {
187
Ok(r) => r,
188
Err(e) => return e,
189
};
+12
-13
src/api/feed/actor_likes.rs
+12
-13
src/api/feed/actor_likes.rs
···
66
Query(params): Query<GetActorLikesParams>,
67
) -> Response {
68
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
69
-
let auth_did = if let Some(h) = auth_header {
70
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
71
-
match crate::auth::validate_bearer_token(&state.db, &token).await {
72
-
Ok(user) => Some(user.did),
73
-
Err(_) => None,
74
-
}
75
} else {
76
None
77
}
78
} else {
79
None
80
};
81
let mut query_params = HashMap::new();
82
query_params.insert("actor".to_string(), params.actor.clone());
83
if let Some(limit) = params.limit {
···
87
query_params.insert("cursor".to_string(), cursor.clone());
88
}
89
let proxy_result =
90
-
match proxy_to_appview("app.bsky.feed.getActorLikes", &query_params, auth_header).await {
91
Ok(r) => r,
92
Err(e) => return e,
93
};
94
if !proxy_result.status.is_success() {
95
-
return (proxy_result.status, proxy_result.body).into_response();
96
}
97
let rev = match extract_repo_rev(&proxy_result.headers) {
98
Some(r) => r,
99
-
None => return (proxy_result.status, proxy_result.body).into_response(),
100
};
101
let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
102
Ok(f) => f,
103
Err(e) => {
104
warn!("Failed to parse actor likes response: {:?}", e);
105
-
return (proxy_result.status, proxy_result.body).into_response();
106
}
107
};
108
-
let requester_did = match auth_did {
109
-
Some(d) => d,
110
None => return (StatusCode::OK, Json(feed_output)).into_response(),
111
};
112
let actor_did = if params.actor.starts_with("did:") {
···
127
Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(),
128
Err(e) => {
129
warn!("Database error resolving actor handle: {:?}", e);
130
-
return (proxy_result.status, proxy_result.body).into_response();
131
}
132
}
133
};
···
138
Ok(r) => r,
139
Err(e) => {
140
warn!("Failed to get local records: {}", e);
141
-
return (proxy_result.status, proxy_result.body).into_response();
142
}
143
};
144
if local_records.likes.is_empty() {
···
66
Query(params): Query<GetActorLikesParams>,
67
) -> Response {
68
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
69
+
let auth_user = if let Some(h) = auth_header {
70
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
71
+
crate::auth::validate_bearer_token(&state.db, &token).await.ok()
72
} else {
73
None
74
}
75
} else {
76
None
77
};
78
+
let auth_did = auth_user.as_ref().map(|u| u.did.clone());
79
+
let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone());
80
let mut query_params = HashMap::new();
81
query_params.insert("actor".to_string(), params.actor.clone());
82
if let Some(limit) = params.limit {
···
86
query_params.insert("cursor".to_string(), cursor.clone());
87
}
88
let proxy_result =
89
+
match proxy_to_appview("app.bsky.feed.getActorLikes", &query_params, auth_did.as_deref().unwrap_or(""), auth_key_bytes.as_deref()).await {
90
Ok(r) => r,
91
Err(e) => return e,
92
};
93
if !proxy_result.status.is_success() {
94
+
return proxy_result.into_response();
95
}
96
let rev = match extract_repo_rev(&proxy_result.headers) {
97
Some(r) => r,
98
+
None => return proxy_result.into_response(),
99
};
100
let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
101
Ok(f) => f,
102
Err(e) => {
103
warn!("Failed to parse actor likes response: {:?}", e);
104
+
return proxy_result.into_response();
105
}
106
};
107
+
let requester_did = match &auth_did {
108
+
Some(d) => d.clone(),
109
None => return (StatusCode::OK, Json(feed_output)).into_response(),
110
};
111
let actor_did = if params.actor.starts_with("did:") {
···
126
Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(),
127
Err(e) => {
128
warn!("Database error resolving actor handle: {:?}", e);
129
+
return proxy_result.into_response();
130
}
131
}
132
};
···
137
Ok(r) => r,
138
Err(e) => {
139
warn!("Failed to get local records: {}", e);
140
+
return proxy_result.into_response();
141
}
142
};
143
if local_records.likes.is_empty() {
+14
-5
src/api/feed/custom_feed.rs
+14
-5
src/api/feed/custom_feed.rs
···
30
Some(t) => t,
31
None => return ApiError::AuthenticationRequired.into_response(),
32
};
33
-
if let Err(e) = crate::auth::validate_bearer_token(&state.db, &token).await {
34
-
return ApiError::from(e).into_response();
35
};
36
if let Err(e) = validate_at_uri(¶ms.feed) {
37
return ApiError::InvalidRequest(format!("Invalid feed URI: {}", e)).into_response();
38
}
39
-
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
40
let appview_url = match std::env::var("APPVIEW_URL") {
41
Ok(url) => url,
42
Err(_) => {
···
60
info!(target = %target_url, feed = %params.feed, "Proxying getFeed request");
61
let client = proxy_client();
62
let mut request_builder = client.get(&target_url).query(&query_params);
63
-
if let Some(auth) = auth_header {
64
-
request_builder = request_builder.header("Authorization", auth);
65
}
66
match request_builder.send().await {
67
Ok(resp) => {
···
30
Some(t) => t,
31
None => return ApiError::AuthenticationRequired.into_response(),
32
};
33
+
let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
34
+
Ok(user) => user,
35
+
Err(e) => return ApiError::from(e).into_response(),
36
};
37
if let Err(e) = validate_at_uri(¶ms.feed) {
38
return ApiError::InvalidRequest(format!("Invalid feed URI: {}", e)).into_response();
39
}
40
let appview_url = match std::env::var("APPVIEW_URL") {
41
Ok(url) => url,
42
Err(_) => {
···
60
info!(target = %target_url, feed = %params.feed, "Proxying getFeed request");
61
let client = proxy_client();
62
let mut request_builder = client.get(&target_url).query(&query_params);
63
+
if let Some(key_bytes) = auth_user.key_bytes.as_ref() {
64
+
let appview_did = std::env::var("APPVIEW_DID").unwrap_or_else(|_| "did:web:api.bsky.app".to_string());
65
+
match crate::auth::create_service_token(&auth_user.did, &appview_did, "app.bsky.feed.getFeed", key_bytes) {
66
+
Ok(service_token) => {
67
+
request_builder = request_builder.header("Authorization", format!("Bearer {}", service_token));
68
+
}
69
+
Err(e) => {
70
+
error!(error = ?e, "Failed to create service token for getFeed");
71
+
return ApiError::InternalError.into_response();
72
+
}
73
+
}
74
}
75
match request_builder.send().await {
76
Ok(resp) => {
+9
-10
src/api/feed/post_thread.rs
+9
-10
src/api/feed/post_thread.rs
···
126
Query(params): Query<GetPostThreadParams>,
127
) -> Response {
128
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
129
-
let auth_did = if let Some(h) = auth_header {
130
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
131
-
match crate::auth::validate_bearer_token(&state.db, &token).await {
132
-
Ok(user) => Some(user.did),
133
-
Err(_) => None,
134
-
}
135
} else {
136
None
137
}
138
} else {
139
None
140
};
141
let mut query_params = HashMap::new();
142
query_params.insert("uri".to_string(), params.uri.clone());
143
if let Some(depth) = params.depth {
···
147
query_params.insert("parentHeight".to_string(), parent_height.to_string());
148
}
149
let proxy_result =
150
-
match proxy_to_appview("app.bsky.feed.getPostThread", &query_params, auth_header).await {
151
Ok(r) => r,
152
Err(e) => return e,
153
};
···
155
return handle_not_found(&state, ¶ms.uri, auth_did, &proxy_result.headers).await;
156
}
157
if !proxy_result.status.is_success() {
158
-
return (proxy_result.status, proxy_result.body).into_response();
159
}
160
let rev = match extract_repo_rev(&proxy_result.headers) {
161
Some(r) => r,
162
-
None => return (proxy_result.status, proxy_result.body).into_response(),
163
};
164
let mut thread_output: PostThreadOutput = match serde_json::from_slice(&proxy_result.body) {
165
Ok(t) => t,
166
Err(e) => {
167
warn!("Failed to parse post thread response: {:?}", e);
168
-
return (proxy_result.status, proxy_result.body).into_response();
169
}
170
};
171
let requester_did = match auth_did {
···
176
Ok(r) => r,
177
Err(e) => {
178
warn!("Failed to get local records: {}", e);
179
-
return (proxy_result.status, proxy_result.body).into_response();
180
}
181
};
182
if local_records.posts.is_empty() {
···
126
Query(params): Query<GetPostThreadParams>,
127
) -> Response {
128
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
129
+
let auth_user = if let Some(h) = auth_header {
130
if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
131
+
crate::auth::validate_bearer_token(&state.db, &token).await.ok()
132
} else {
133
None
134
}
135
} else {
136
None
137
};
138
+
let auth_did = auth_user.as_ref().map(|u| u.did.clone());
139
+
let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone());
140
let mut query_params = HashMap::new();
141
query_params.insert("uri".to_string(), params.uri.clone());
142
if let Some(depth) = params.depth {
···
146
query_params.insert("parentHeight".to_string(), parent_height.to_string());
147
}
148
let proxy_result =
149
+
match proxy_to_appview("app.bsky.feed.getPostThread", &query_params, auth_did.as_deref().unwrap_or(""), auth_key_bytes.as_deref()).await {
150
Ok(r) => r,
151
Err(e) => return e,
152
};
···
154
return handle_not_found(&state, ¶ms.uri, auth_did, &proxy_result.headers).await;
155
}
156
if !proxy_result.status.is_success() {
157
+
return proxy_result.into_response();
158
}
159
let rev = match extract_repo_rev(&proxy_result.headers) {
160
Some(r) => r,
161
+
None => return proxy_result.into_response(),
162
};
163
let mut thread_output: PostThreadOutput = match serde_json::from_slice(&proxy_result.body) {
164
Ok(t) => t,
165
Err(e) => {
166
warn!("Failed to parse post thread response: {:?}", e);
167
+
return proxy_result.into_response();
168
}
169
};
170
let requester_did = match auth_did {
···
175
Ok(r) => r,
176
Err(e) => {
177
warn!("Failed to get local records: {}", e);
178
+
return proxy_result.into_response();
179
}
180
};
181
if local_records.posts.is_empty() {
+8
-9
src/api/feed/timeline.rs
+8
-9
src/api/feed/timeline.rs
···
52
};
53
match std::env::var("APPVIEW_URL") {
54
Ok(url) if !url.starts_with("http://127.0.0.1") => {
55
-
return get_timeline_with_appview(&state, &headers, ¶ms, &auth_user.did).await;
56
}
57
_ => {}
58
}
···
61
62
async fn get_timeline_with_appview(
63
state: &AppState,
64
-
headers: &axum::http::HeaderMap,
65
params: &GetTimelineParams,
66
auth_did: &str,
67
) -> Response {
68
-
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
69
let mut query_params = HashMap::new();
70
if let Some(algo) = ¶ms.algorithm {
71
query_params.insert("algorithm".to_string(), algo.clone());
···
77
query_params.insert("cursor".to_string(), cursor.clone());
78
}
79
let proxy_result =
80
-
match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await {
81
Ok(r) => r,
82
Err(e) => return e,
83
};
84
if !proxy_result.status.is_success() {
85
-
return (proxy_result.status, proxy_result.body).into_response();
86
}
87
let rev = extract_repo_rev(&proxy_result.headers);
88
if rev.is_none() {
89
-
return (proxy_result.status, proxy_result.body).into_response();
90
}
91
let rev = rev.unwrap();
92
let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
93
Ok(f) => f,
94
Err(e) => {
95
warn!("Failed to parse timeline response: {:?}", e);
96
-
return (proxy_result.status, proxy_result.body).into_response();
97
}
98
};
99
let local_records = match get_records_since_rev(state, auth_did, &rev).await {
100
Ok(r) => r,
101
Err(e) => {
102
warn!("Failed to get local records: {}", e);
103
-
return (proxy_result.status, proxy_result.body).into_response();
104
}
105
};
106
if local_records.count == 0 {
107
-
return (proxy_result.status, proxy_result.body).into_response();
108
}
109
let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
110
.fetch_optional(&state.db)
···
52
};
53
match std::env::var("APPVIEW_URL") {
54
Ok(url) if !url.starts_with("http://127.0.0.1") => {
55
+
return get_timeline_with_appview(&state, ¶ms, &auth_user.did, auth_user.key_bytes.as_deref()).await;
56
}
57
_ => {}
58
}
···
61
62
async fn get_timeline_with_appview(
63
state: &AppState,
64
params: &GetTimelineParams,
65
auth_did: &str,
66
+
auth_key_bytes: Option<&[u8]>,
67
) -> Response {
68
let mut query_params = HashMap::new();
69
if let Some(algo) = ¶ms.algorithm {
70
query_params.insert("algorithm".to_string(), algo.clone());
···
76
query_params.insert("cursor".to_string(), cursor.clone());
77
}
78
let proxy_result =
79
+
match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_did, auth_key_bytes).await {
80
Ok(r) => r,
81
Err(e) => return e,
82
};
83
if !proxy_result.status.is_success() {
84
+
return proxy_result.into_response();
85
}
86
let rev = extract_repo_rev(&proxy_result.headers);
87
if rev.is_none() {
88
+
return proxy_result.into_response();
89
}
90
let rev = rev.unwrap();
91
let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
92
Ok(f) => f,
93
Err(e) => {
94
warn!("Failed to parse timeline response: {:?}", e);
95
+
return proxy_result.into_response();
96
}
97
};
98
let local_records = match get_records_since_rev(state, auth_did, &rev).await {
99
Ok(r) => r,
100
Err(e) => {
101
warn!("Failed to get local records: {}", e);
102
+
return proxy_result.into_response();
103
}
104
};
105
if local_records.count == 0 {
106
+
return proxy_result.into_response();
107
}
108
let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
109
.fetch_optional(&state.db)
+37
-11
src/api/proxy.rs
+37
-11
src/api/proxy.rs
···
7
};
8
use crate::api::proxy_client::proxy_client;
9
use std::collections::HashMap;
10
-
use tracing::{error, info};
11
12
pub async fn proxy_handler(
13
State(state): State<AppState>,
···
21
.get("atproto-proxy")
22
.and_then(|h| h.to_str().ok())
23
.map(|s| s.to_string());
24
-
let appview_url = match &proxy_header {
25
-
Some(url) => url.clone(),
26
-
None => match std::env::var("APPVIEW_URL") {
27
-
Ok(url) => url,
28
-
Err(_) => {
29
-
return (StatusCode::BAD_GATEWAY, "No upstream AppView configured").into_response();
30
-
}
31
-
},
32
};
33
let target_url = format!("{}/xrpc/{}", appview_url, method);
34
-
info!("Proxying {} request to {}", method_verb, target_url);
35
let client = proxy_client();
36
let mut request_builder = client.request(method_verb, &target_url).query(¶ms);
37
let mut auth_header_val = headers.get("Authorization").map(|h| h.clone());
38
-
if let Some(aud) = &proxy_header {
39
if let Some(token) = crate::auth::extract_bearer_token_from_header(
40
headers.get("Authorization").and_then(|h| h.to_str().ok())
41
) {
···
7
};
8
use crate::api::proxy_client::proxy_client;
9
use std::collections::HashMap;
10
+
use tracing::error;
11
+
12
+
fn resolve_service_did(did_with_fragment: &str) -> Option<(String, String)> {
13
+
if did_with_fragment.starts_with("did:web:") {
14
+
let without_prefix = &did_with_fragment[8..];
15
+
let host = without_prefix.split('#').next()?;
16
+
let url = format!("https://{}", host);
17
+
let did_without_fragment = format!("did:web:{}", host);
18
+
Some((url, did_without_fragment))
19
+
} else if did_with_fragment.starts_with("did:plc:") {
20
+
None
21
+
} else {
22
+
None
23
+
}
24
+
}
25
26
pub async fn proxy_handler(
27
State(state): State<AppState>,
···
35
.get("atproto-proxy")
36
.and_then(|h| h.to_str().ok())
37
.map(|s| s.to_string());
38
+
let (appview_url, service_aud) = match &proxy_header {
39
+
Some(did_str) => {
40
+
let (url, did_without_fragment) = match resolve_service_did(did_str) {
41
+
Some(resolved) => resolved,
42
+
None => {
43
+
error!(did = %did_str, "Could not resolve service DID");
44
+
return (StatusCode::BAD_GATEWAY, "Could not resolve service DID").into_response();
45
+
}
46
+
};
47
+
(url, Some(did_without_fragment))
48
+
}
49
+
None => {
50
+
let url = match std::env::var("APPVIEW_URL") {
51
+
Ok(url) => url,
52
+
Err(_) => {
53
+
return (StatusCode::BAD_GATEWAY, "No upstream AppView configured").into_response();
54
+
}
55
+
};
56
+
let aud = std::env::var("APPVIEW_DID").ok();
57
+
(url, aud)
58
+
}
59
};
60
let target_url = format!("{}/xrpc/{}", appview_url, method);
61
let client = proxy_client();
62
let mut request_builder = client.request(method_verb, &target_url).query(¶ms);
63
let mut auth_header_val = headers.get("Authorization").map(|h| h.clone());
64
+
if let Some(aud) = &service_aud {
65
if let Some(token) = crate::auth::extract_bearer_token_from_header(
66
headers.get("Authorization").and_then(|h| h.to_str().ok())
67
) {
+23
-3
src/api/read_after_write.rs
+23
-3
src/api/read_after_write.rs
···
229
pub body: bytes::Bytes,
230
}
231
232
pub async fn proxy_to_appview(
233
method: &str,
234
params: &HashMap<String, String>,
235
-
auth_header: Option<&str>,
236
) -> Result<ProxyResponse, Response> {
237
let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
238
ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
···
246
info!(target = %target_url, "Proxying request to appview");
247
let client = proxy_client();
248
let mut request_builder = client.get(&target_url).query(params);
249
-
if let Some(auth) = auth_header {
250
-
request_builder = request_builder.header("Authorization", auth);
251
}
252
match request_builder.send().await {
253
Ok(resp) => {
···
229
pub body: bytes::Bytes,
230
}
231
232
+
impl ProxyResponse {
233
+
pub fn into_response(self) -> Response {
234
+
let mut response = Response::builder().status(self.status);
235
+
for (key, value) in self.headers.iter() {
236
+
response = response.header(key, value);
237
+
}
238
+
response.body(axum::body::Body::from(self.body)).unwrap()
239
+
}
240
+
}
241
+
242
pub async fn proxy_to_appview(
243
method: &str,
244
params: &HashMap<String, String>,
245
+
auth_did: &str,
246
+
auth_key_bytes: Option<&[u8]>,
247
) -> Result<ProxyResponse, Response> {
248
let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
249
ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
···
257
info!(target = %target_url, "Proxying request to appview");
258
let client = proxy_client();
259
let mut request_builder = client.get(&target_url).query(params);
260
+
if let Some(key_bytes) = auth_key_bytes {
261
+
let appview_did = std::env::var("APPVIEW_DID").unwrap_or_else(|_| "did:web:api.bsky.app".to_string());
262
+
match crate::auth::create_service_token(auth_did, &appview_did, method, key_bytes) {
263
+
Ok(service_token) => {
264
+
request_builder = request_builder.header("Authorization", format!("Bearer {}", service_token));
265
+
}
266
+
Err(e) => {
267
+
error!(error = ?e, "Failed to create service token");
268
+
return Err(ApiError::InternalError.into_response());
269
+
}
270
+
}
271
}
272
match request_builder.send().await {
273
Ok(resp) => {
+1
src/api/repo/blob.rs
+1
src/api/repo/blob.rs
+52
-8
src/api/server/account_status.rs
+52
-8
src/api/server/account_status.rs
···
31
State(state): State<AppState>,
32
headers: axum::http::HeaderMap,
33
) -> Response {
34
-
let token = match crate::auth::extract_bearer_token_from_header(
35
headers.get("Authorization").and_then(|h| h.to_str().ok())
36
) {
37
Some(t) => t,
38
None => return ApiError::AuthenticationRequired.into_response(),
39
};
40
-
let did = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
41
Ok(user) => user.did,
42
Err(e) => return ApiError::from(e).into_response(),
43
};
···
101
State(state): State<AppState>,
102
headers: axum::http::HeaderMap,
103
) -> Response {
104
-
let token = match crate::auth::extract_bearer_token_from_header(
105
headers.get("Authorization").and_then(|h| h.to_str().ok())
106
) {
107
Some(t) => t,
108
None => return ApiError::AuthenticationRequired.into_response(),
109
};
110
-
let did = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
111
Ok(user) => user.did,
112
Err(e) => return ApiError::from(e).into_response(),
113
};
···
148
headers: axum::http::HeaderMap,
149
Json(_input): Json<DeactivateAccountInput>,
150
) -> Response {
151
-
let token = match crate::auth::extract_bearer_token_from_header(
152
headers.get("Authorization").and_then(|h| h.to_str().ok())
153
) {
154
Some(t) => t,
155
None => return ApiError::AuthenticationRequired.into_response(),
156
};
157
-
let did = match crate::auth::validate_bearer_token(&state.db, &token).await {
158
Ok(user) => user.did,
159
Err(e) => return ApiError::from(e).into_response(),
160
};
···
188
State(state): State<AppState>,
189
headers: axum::http::HeaderMap,
190
) -> Response {
191
-
let token = match crate::auth::extract_bearer_token_from_header(
192
headers.get("Authorization").and_then(|h| h.to_str().ok())
193
) {
194
Some(t) => t,
195
None => return ApiError::AuthenticationRequired.into_response(),
196
};
197
-
let did = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
198
Ok(user) => user.did,
199
Err(e) => return ApiError::from(e).into_response(),
200
};
···
31
State(state): State<AppState>,
32
headers: axum::http::HeaderMap,
33
) -> Response {
34
+
let extracted = match crate::auth::extract_auth_token_from_header(
35
headers.get("Authorization").and_then(|h| h.to_str().ok())
36
) {
37
Some(t) => t,
38
None => return ApiError::AuthenticationRequired.into_response(),
39
};
40
+
let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
41
+
let http_uri = format!("https://{}/xrpc/com.atproto.server.checkAccountStatus",
42
+
std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
43
+
let did = match crate::auth::validate_token_with_dpop(
44
+
&state.db,
45
+
&extracted.token,
46
+
extracted.is_dpop,
47
+
dpop_proof,
48
+
"GET",
49
+
&http_uri,
50
+
true,
51
+
).await {
52
Ok(user) => user.did,
53
Err(e) => return ApiError::from(e).into_response(),
54
};
···
112
State(state): State<AppState>,
113
headers: axum::http::HeaderMap,
114
) -> Response {
115
+
let extracted = match crate::auth::extract_auth_token_from_header(
116
headers.get("Authorization").and_then(|h| h.to_str().ok())
117
) {
118
Some(t) => t,
119
None => return ApiError::AuthenticationRequired.into_response(),
120
};
121
+
let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
122
+
let http_uri = format!("https://{}/xrpc/com.atproto.server.activateAccount",
123
+
std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
124
+
let did = match crate::auth::validate_token_with_dpop(
125
+
&state.db,
126
+
&extracted.token,
127
+
extracted.is_dpop,
128
+
dpop_proof,
129
+
"POST",
130
+
&http_uri,
131
+
true,
132
+
).await {
133
Ok(user) => user.did,
134
Err(e) => return ApiError::from(e).into_response(),
135
};
···
170
headers: axum::http::HeaderMap,
171
Json(_input): Json<DeactivateAccountInput>,
172
) -> Response {
173
+
let extracted = match crate::auth::extract_auth_token_from_header(
174
headers.get("Authorization").and_then(|h| h.to_str().ok())
175
) {
176
Some(t) => t,
177
None => return ApiError::AuthenticationRequired.into_response(),
178
};
179
+
let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
180
+
let http_uri = format!("https://{}/xrpc/com.atproto.server.deactivateAccount",
181
+
std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
182
+
let did = match crate::auth::validate_token_with_dpop(
183
+
&state.db,
184
+
&extracted.token,
185
+
extracted.is_dpop,
186
+
dpop_proof,
187
+
"POST",
188
+
&http_uri,
189
+
false,
190
+
).await {
191
Ok(user) => user.did,
192
Err(e) => return ApiError::from(e).into_response(),
193
};
···
221
State(state): State<AppState>,
222
headers: axum::http::HeaderMap,
223
) -> Response {
224
+
let extracted = match crate::auth::extract_auth_token_from_header(
225
headers.get("Authorization").and_then(|h| h.to_str().ok())
226
) {
227
Some(t) => t,
228
None => return ApiError::AuthenticationRequired.into_response(),
229
};
230
+
let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
231
+
let http_uri = format!("https://{}/xrpc/com.atproto.server.requestAccountDelete",
232
+
std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()));
233
+
let did = match crate::auth::validate_token_with_dpop(
234
+
&state.db,
235
+
&extracted.token,
236
+
extracted.is_dpop,
237
+
dpop_proof,
238
+
"POST",
239
+
&http_uri,
240
+
true,
241
+
).await {
242
Ok(user) => user.did,
243
Err(e) => return ApiError::from(e).into_response(),
244
};
+23
-5
src/api/server/session.rs
+23
-5
src/api/server/session.rs
···
29
"unknown".to_string()
30
}
31
32
#[derive(Deserialize)]
33
pub struct CreateSessionInput {
34
pub identifier: String,
···
62
)
63
.into_response();
64
}
65
let row = match sqlx::query!(
66
r#"SELECT
67
u.id, u.did, u.handle, u.password_hash,
···
70
FROM users u
71
JOIN user_keys k ON u.id = k.user_id
72
WHERE u.handle = $1 OR u.email = $1"#,
73
-
input.identifier
74
)
75
.fetch_optional(&state.db)
76
.await
···
152
error!("Failed to insert session: {:?}", e);
153
return ApiError::InternalError.into_response();
154
}
155
Json(CreateSessionOutput {
156
access_jwt: access_meta.token,
157
refresh_jwt: refresh_meta.token,
158
-
handle: row.handle,
159
did: row.did,
160
}).into_response()
161
}
···
182
crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified),
183
crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified),
184
};
185
Json(json!({
186
-
"handle": row.handle,
187
"did": auth_user.did,
188
"email": row.email,
189
"emailConfirmed": row.email_confirmed,
190
"preferredChannel": preferred_channel,
191
"preferredChannelVerified": preferred_channel_verified,
192
"didDoc": {}
193
})).into_response()
194
}
···
381
crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified),
382
crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified),
383
};
384
Json(json!({
385
"accessJwt": new_access_meta.token,
386
"refreshJwt": new_refresh_meta.token,
387
-
"handle": u.handle,
388
"did": session_row.did,
389
"email": u.email,
390
"emailConfirmed": u.email_confirmed,
391
"preferredChannel": preferred_channel,
392
-
"preferredChannelVerified": preferred_channel_verified
393
})).into_response()
394
}
395
Ok(None) => {
···
29
"unknown".to_string()
30
}
31
32
+
fn normalize_handle(identifier: &str, pds_hostname: &str) -> String {
33
+
let suffix = format!(".{}", pds_hostname);
34
+
if identifier.ends_with(&suffix) {
35
+
identifier[..identifier.len() - suffix.len()].to_string()
36
+
} else {
37
+
identifier.to_string()
38
+
}
39
+
}
40
+
41
#[derive(Deserialize)]
42
pub struct CreateSessionInput {
43
pub identifier: String,
···
71
)
72
.into_response();
73
}
74
+
let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
75
+
let normalized_identifier = normalize_handle(&input.identifier, &pds_hostname);
76
let row = match sqlx::query!(
77
r#"SELECT
78
u.id, u.did, u.handle, u.password_hash,
···
81
FROM users u
82
JOIN user_keys k ON u.id = k.user_id
83
WHERE u.handle = $1 OR u.email = $1"#,
84
+
normalized_identifier
85
)
86
.fetch_optional(&state.db)
87
.await
···
163
error!("Failed to insert session: {:?}", e);
164
return ApiError::InternalError.into_response();
165
}
166
+
let full_handle = format!("{}.{}", row.handle, pds_hostname);
167
Json(CreateSessionOutput {
168
access_jwt: access_meta.token,
169
refresh_jwt: refresh_meta.token,
170
+
handle: full_handle,
171
did: row.did,
172
}).into_response()
173
}
···
194
crate::notifications::NotificationChannel::Telegram => ("telegram", row.telegram_verified),
195
crate::notifications::NotificationChannel::Signal => ("signal", row.signal_verified),
196
};
197
+
let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
198
+
let full_handle = format!("{}.{}", row.handle, pds_hostname);
199
Json(json!({
200
+
"handle": full_handle,
201
"did": auth_user.did,
202
"email": row.email,
203
"emailConfirmed": row.email_confirmed,
204
"preferredChannel": preferred_channel,
205
"preferredChannelVerified": preferred_channel_verified,
206
+
"active": true,
207
"didDoc": {}
208
})).into_response()
209
}
···
396
crate::notifications::NotificationChannel::Telegram => ("telegram", u.telegram_verified),
397
crate::notifications::NotificationChannel::Signal => ("signal", u.signal_verified),
398
};
399
+
let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
400
+
let full_handle = format!("{}.{}", u.handle, pds_hostname);
401
Json(json!({
402
"accessJwt": new_access_meta.token,
403
"refreshJwt": new_refresh_meta.token,
404
+
"handle": full_handle,
405
"did": session_row.did,
406
"email": u.email,
407
"emailConfirmed": u.email_confirmed,
408
"preferredChannel": preferred_channel,
409
+
"preferredChannelVerified": preferred_channel_verified,
410
+
"active": true
411
})).into_response()
412
}
413
Ok(None) => {
+28
src/auth/extractor.rs
+28
src/auth/extractor.rs
···
94
Some(token.to_string())
95
}
96
97
+
pub struct ExtractedToken {
98
+
pub token: String,
99
+
pub is_dpop: bool,
100
+
}
101
+
102
+
pub fn extract_auth_token_from_header(auth_header: Option<&str>) -> Option<ExtractedToken> {
103
+
let header = auth_header?;
104
+
let header = header.trim();
105
+
106
+
if header.len() >= 7 && header[..7].eq_ignore_ascii_case("bearer ") {
107
+
let token = header[7..].trim();
108
+
if token.is_empty() {
109
+
return None;
110
+
}
111
+
return Some(ExtractedToken { token: token.to_string(), is_dpop: false });
112
+
}
113
+
114
+
if header.len() >= 5 && header[..5].eq_ignore_ascii_case("dpop ") {
115
+
let token = header[5..].trim();
116
+
if token.is_empty() {
117
+
return None;
118
+
}
119
+
return Some(ExtractedToken { token: token.to_string(), is_dpop: true });
120
+
}
121
+
122
+
None
123
+
}
124
+
125
impl FromRequestParts<AppState> for BearerAuth {
126
type Rejection = AuthError;
127
+73
-3
src/auth/mod.rs
+73
-3
src/auth/mod.rs
···
10
pub mod token;
11
pub mod verify;
12
13
-
pub use extractor::{BearerAuth, BearerAuthAllowDeactivated, AuthError, extract_bearer_token_from_header};
14
pub use token::{
15
create_access_token, create_refresh_token, create_service_token,
16
create_access_token_with_metadata, create_refresh_token_with_metadata,
···
195
196
if let Ok(oauth_info) = crate::oauth::verify::extract_oauth_token_info(token) {
197
if let Some(oauth_token) = sqlx::query!(
198
-
r#"SELECT t.did, t.expires_at, u.deactivated_at, u.takedown_ref
199
FROM oauth_token t
200
JOIN users u ON t.did = u.did
201
WHERE t.token_id = $1"#,
202
oauth_info.token_id
203
)
···
216
217
let now = chrono::Utc::now();
218
if oauth_token.expires_at > now {
219
return Ok(AuthenticatedUser {
220
did: oauth_token.did,
221
-
key_bytes: None,
222
is_oauth: true,
223
});
224
}
···
231
pub async fn invalidate_auth_cache(cache: &Arc<dyn Cache>, did: &str) {
232
let key_cache_key = format!("auth:key:{}", did);
233
let _ = cache.delete(&key_cache_key).await;
234
}
235
236
#[derive(Debug, Serialize, Deserialize)]
···
10
pub mod token;
11
pub mod verify;
12
13
+
pub use extractor::{BearerAuth, BearerAuthAllowDeactivated, AuthError, extract_bearer_token_from_header, extract_auth_token_from_header, ExtractedToken};
14
pub use token::{
15
create_access_token, create_refresh_token, create_service_token,
16
create_access_token_with_metadata, create_refresh_token_with_metadata,
···
195
196
if let Ok(oauth_info) = crate::oauth::verify::extract_oauth_token_info(token) {
197
if let Some(oauth_token) = sqlx::query!(
198
+
r#"SELECT t.did, t.expires_at, u.deactivated_at, u.takedown_ref,
199
+
k.key_bytes as "key_bytes?", k.encryption_version as "encryption_version?"
200
FROM oauth_token t
201
JOIN users u ON t.did = u.did
202
+
LEFT JOIN user_keys k ON u.id = k.user_id
203
WHERE t.token_id = $1"#,
204
oauth_info.token_id
205
)
···
218
219
let now = chrono::Utc::now();
220
if oauth_token.expires_at > now {
221
+
let key_bytes = if let (Some(kb), Some(ev)) = (&oauth_token.key_bytes, oauth_token.encryption_version) {
222
+
crate::config::decrypt_key(kb, Some(ev)).ok()
223
+
} else {
224
+
None
225
+
};
226
return Ok(AuthenticatedUser {
227
did: oauth_token.did,
228
+
key_bytes,
229
is_oauth: true,
230
});
231
}
···
238
pub async fn invalidate_auth_cache(cache: &Arc<dyn Cache>, did: &str) {
239
let key_cache_key = format!("auth:key:{}", did);
240
let _ = cache.delete(&key_cache_key).await;
241
+
}
242
+
243
+
pub async fn validate_token_with_dpop(
244
+
db: &PgPool,
245
+
token: &str,
246
+
is_dpop_token: bool,
247
+
dpop_proof: Option<&str>,
248
+
http_method: &str,
249
+
http_uri: &str,
250
+
allow_deactivated: bool,
251
+
) -> Result<AuthenticatedUser, TokenValidationError> {
252
+
if !is_dpop_token {
253
+
if allow_deactivated {
254
+
return validate_bearer_token_allow_deactivated(db, token).await;
255
+
} else {
256
+
return validate_bearer_token(db, token).await;
257
+
}
258
+
}
259
+
match crate::oauth::verify::verify_oauth_access_token(db, token, dpop_proof, http_method, http_uri).await {
260
+
Ok(result) => {
261
+
if !allow_deactivated {
262
+
let deactivated = sqlx::query_scalar!(
263
+
"SELECT deactivated_at FROM users WHERE did = $1",
264
+
result.did
265
+
)
266
+
.fetch_optional(db)
267
+
.await
268
+
.ok()
269
+
.flatten()
270
+
.flatten();
271
+
if deactivated.is_some() {
272
+
return Err(TokenValidationError::AccountDeactivated);
273
+
}
274
+
}
275
+
let takedown = sqlx::query_scalar!(
276
+
"SELECT takedown_ref FROM users WHERE did = $1",
277
+
result.did
278
+
)
279
+
.fetch_optional(db)
280
+
.await
281
+
.ok()
282
+
.flatten()
283
+
.flatten();
284
+
if takedown.is_some() {
285
+
return Err(TokenValidationError::AccountTakedown);
286
+
}
287
+
let key_bytes = sqlx::query!(
288
+
"SELECT k.key_bytes, k.encryption_version FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.did = $1",
289
+
result.did
290
+
)
291
+
.fetch_optional(db)
292
+
.await
293
+
.ok()
294
+
.flatten()
295
+
.and_then(|row| crate::config::decrypt_key(&row.key_bytes, row.encryption_version).ok());
296
+
Ok(AuthenticatedUser {
297
+
did: result.did,
298
+
key_bytes,
299
+
is_oauth: true,
300
+
})
301
+
}
302
+
Err(_) => Err(TokenValidationError::AuthenticationFailed),
303
+
}
304
}
305
306
#[derive(Debug, Serialize, Deserialize)]
+95
-13
src/oauth/client.rs
+95
-13
src/oauth/client.rs
···
86
}
87
}
88
89
pub async fn get(&self, client_id: &str) -> Result<ClientMetadata, OAuthError> {
90
{
91
let cache = self.cache.read().await;
92
if let Some(cached) = cache.get(client_id) {
···
250
metadata: &ClientMetadata,
251
redirect_uri: &str,
252
) -> Result<(), OAuthError> {
253
-
if !metadata.redirect_uris.contains(&redirect_uri.to_string()) {
254
-
return Err(OAuthError::InvalidRequest(
255
-
"redirect_uri not registered for client".to_string(),
256
-
));
257
}
258
-
Ok(())
259
}
260
261
fn validate_redirect_uri_format(&self, uri: &str) -> Result<(), OAuthError> {
···
344
metadata: &ClientMetadata,
345
client_assertion: &str,
346
) -> Result<(), OAuthError> {
347
-
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
348
let parts: Vec<&str> = client_assertion.split('.').collect();
349
if parts.len() != 3 {
350
return Err(OAuthError::InvalidClient("Invalid client_assertion format".to_string()));
351
}
352
let header_bytes = URL_SAFE_NO_PAD
353
.decode(parts[0])
354
.map_err(|_| OAuthError::InvalidClient("Invalid assertion header encoding".to_string()))?;
355
let header: serde_json::Value = serde_json::from_slice(&header_bytes)
356
.map_err(|_| OAuthError::InvalidClient("Invalid assertion header JSON".to_string()))?;
···
366
let kid = header.get("kid").and_then(|k| k.as_str());
367
let payload_bytes = URL_SAFE_NO_PAD
368
.decode(parts[1])
369
-
.map_err(|_| OAuthError::InvalidClient("Invalid assertion payload encoding".to_string()))?;
370
let payload: serde_json::Value = serde_json::from_slice(&payload_bytes)
371
.map_err(|_| OAuthError::InvalidClient("Invalid assertion payload JSON".to_string()))?;
372
let iss = payload.get("iss").and_then(|i| i.as_str()).ok_or_else(|| {
···
385
"client_assertion sub does not match client_id".to_string(),
386
));
387
}
388
-
let exp = payload.get("exp").and_then(|e| e.as_i64()).ok_or_else(|| {
389
-
OAuthError::InvalidClient("Missing exp in client_assertion".to_string())
390
-
})?;
391
let now = chrono::Utc::now().timestamp();
392
-
if exp < now {
393
-
return Err(OAuthError::InvalidClient("client_assertion has expired".to_string()));
394
}
395
-
let iat = payload.get("iat").and_then(|i| i.as_i64());
396
if let Some(iat) = iat {
397
if iat > now + 60 {
398
return Err(OAuthError::InvalidClient(
···
86
}
87
}
88
89
+
fn is_loopback_client(client_id: &str) -> bool {
90
+
if let Ok(url) = reqwest::Url::parse(client_id) {
91
+
url.scheme() == "http"
92
+
&& url.host_str() == Some("localhost")
93
+
&& url.port().is_none()
94
+
} else {
95
+
false
96
+
}
97
+
}
98
+
99
+
fn build_loopback_metadata(client_id: &str) -> Result<ClientMetadata, OAuthError> {
100
+
let url = reqwest::Url::parse(client_id).map_err(|_| {
101
+
OAuthError::InvalidClient("Invalid loopback client_id URL".to_string())
102
+
})?;
103
+
let mut redirect_uris = Vec::new();
104
+
for (key, value) in url.query_pairs() {
105
+
if key == "redirect_uri" {
106
+
redirect_uris.push(value.to_string());
107
+
}
108
+
}
109
+
if redirect_uris.is_empty() {
110
+
redirect_uris.push("http://127.0.0.1/callback".to_string());
111
+
redirect_uris.push("http://localhost/callback".to_string());
112
+
}
113
+
let scope = Some("atproto transition:generic transition:chat.bsky".to_string());
114
+
Ok(ClientMetadata {
115
+
client_id: client_id.to_string(),
116
+
client_name: Some("Loopback Client".to_string()),
117
+
client_uri: None,
118
+
logo_uri: None,
119
+
redirect_uris,
120
+
grant_types: vec!["authorization_code".to_string(), "refresh_token".to_string()],
121
+
response_types: vec!["code".to_string()],
122
+
scope,
123
+
token_endpoint_auth_method: Some("none".to_string()),
124
+
dpop_bound_access_tokens: Some(false),
125
+
jwks: None,
126
+
jwks_uri: None,
127
+
application_type: Some("native".to_string()),
128
+
})
129
+
}
130
+
131
pub async fn get(&self, client_id: &str) -> Result<ClientMetadata, OAuthError> {
132
+
if Self::is_loopback_client(client_id) {
133
+
return Self::build_loopback_metadata(client_id);
134
+
}
135
{
136
let cache = self.cache.read().await;
137
if let Some(cached) = cache.get(client_id) {
···
295
metadata: &ClientMetadata,
296
redirect_uri: &str,
297
) -> Result<(), OAuthError> {
298
+
if metadata.redirect_uris.contains(&redirect_uri.to_string()) {
299
+
return Ok(());
300
}
301
+
if Self::is_loopback_client(&metadata.client_id) {
302
+
if let Ok(req_url) = reqwest::Url::parse(redirect_uri) {
303
+
let req_host = req_url.host_str().unwrap_or("");
304
+
let is_loopback_redirect = req_url.scheme() == "http"
305
+
&& (req_host == "localhost" || req_host == "127.0.0.1" || req_host == "[::1]");
306
+
if is_loopback_redirect {
307
+
for registered in &metadata.redirect_uris {
308
+
if let Ok(reg_url) = reqwest::Url::parse(registered) {
309
+
let reg_host = reg_url.host_str().unwrap_or("");
310
+
let hosts_match = (req_host == "localhost" && reg_host == "localhost")
311
+
|| (req_host == "127.0.0.1" && reg_host == "127.0.0.1")
312
+
|| (req_host == "[::1]" && reg_host == "[::1]")
313
+
|| (req_host == "localhost" && reg_host == "127.0.0.1")
314
+
|| (req_host == "127.0.0.1" && reg_host == "localhost");
315
+
if hosts_match && req_url.path() == reg_url.path() {
316
+
return Ok(());
317
+
}
318
+
}
319
+
}
320
+
}
321
+
}
322
+
}
323
+
Err(OAuthError::InvalidRequest(
324
+
"redirect_uri not registered for client".to_string(),
325
+
))
326
}
327
328
fn validate_redirect_uri_format(&self, uri: &str) -> Result<(), OAuthError> {
···
411
metadata: &ClientMetadata,
412
client_assertion: &str,
413
) -> Result<(), OAuthError> {
414
+
use base64::{Engine as _, engine::general_purpose::{URL_SAFE_NO_PAD, STANDARD}};
415
let parts: Vec<&str> = client_assertion.split('.').collect();
416
if parts.len() != 3 {
417
return Err(OAuthError::InvalidClient("Invalid client_assertion format".to_string()));
418
}
419
let header_bytes = URL_SAFE_NO_PAD
420
.decode(parts[0])
421
+
.or_else(|_| STANDARD.decode(parts[0]))
422
.map_err(|_| OAuthError::InvalidClient("Invalid assertion header encoding".to_string()))?;
423
let header: serde_json::Value = serde_json::from_slice(&header_bytes)
424
.map_err(|_| OAuthError::InvalidClient("Invalid assertion header JSON".to_string()))?;
···
434
let kid = header.get("kid").and_then(|k| k.as_str());
435
let payload_bytes = URL_SAFE_NO_PAD
436
.decode(parts[1])
437
+
.or_else(|_| STANDARD.decode(parts[1]))
438
+
.map_err(|e| {
439
+
tracing::warn!(error = %e, payload_part = parts[1], "Invalid assertion payload encoding");
440
+
OAuthError::InvalidClient("Invalid assertion payload encoding".to_string())
441
+
})?;
442
let payload: serde_json::Value = serde_json::from_slice(&payload_bytes)
443
.map_err(|_| OAuthError::InvalidClient("Invalid assertion payload JSON".to_string()))?;
444
let iss = payload.get("iss").and_then(|i| i.as_str()).ok_or_else(|| {
···
457
"client_assertion sub does not match client_id".to_string(),
458
));
459
}
460
let now = chrono::Utc::now().timestamp();
461
+
let exp = payload.get("exp").and_then(|e| e.as_i64());
462
+
let iat = payload.get("iat").and_then(|i| i.as_i64());
463
+
if let Some(exp) = exp {
464
+
if exp < now {
465
+
return Err(OAuthError::InvalidClient("client_assertion has expired".to_string()));
466
+
}
467
+
} else if let Some(iat) = iat {
468
+
let max_age_secs = 300;
469
+
if now - iat > max_age_secs {
470
+
tracing::warn!(iat = iat, now = now, "client_assertion too old (no exp, using iat)");
471
+
return Err(OAuthError::InvalidClient("client_assertion is too old".to_string()));
472
+
}
473
+
} else {
474
+
return Err(OAuthError::InvalidClient(
475
+
"client_assertion must have exp or iat claim".to_string(),
476
+
));
477
}
478
if let Some(iat) = iat {
479
if iat > now + 60 {
480
return Err(OAuthError::InvalidClient(
+11
src/oauth/endpoints/metadata.rs
+11
src/oauth/endpoints/metadata.rs
···
31
#[serde(skip_serializing_if = "Option::is_none")]
32
pub token_endpoint_auth_methods_supported: Option<Vec<String>>,
33
#[serde(skip_serializing_if = "Option::is_none")]
34
pub code_challenge_methods_supported: Option<Vec<String>>,
35
#[serde(skip_serializing_if = "Option::is_none")]
36
pub pushed_authorization_request_endpoint: Option<String>,
···
44
pub revocation_endpoint: Option<String>,
45
#[serde(skip_serializing_if = "Option::is_none")]
46
pub introspection_endpoint: Option<String>,
47
}
48
49
pub async fn oauth_protected_resource(
···
86
"none".to_string(),
87
"private_key_jwt".to_string(),
88
]),
89
code_challenge_methods_supported: Some(vec!["S256".to_string()]),
90
pushed_authorization_request_endpoint: Some(format!("{}/oauth/par", issuer)),
91
require_pushed_authorization_requests: Some(true),
···
98
authorization_response_iss_parameter_supported: Some(true),
99
revocation_endpoint: Some(format!("{}/oauth/revoke", issuer)),
100
introspection_endpoint: Some(format!("{}/oauth/introspect", issuer)),
101
})
102
}
103
···
31
#[serde(skip_serializing_if = "Option::is_none")]
32
pub token_endpoint_auth_methods_supported: Option<Vec<String>>,
33
#[serde(skip_serializing_if = "Option::is_none")]
34
+
pub token_endpoint_auth_signing_alg_values_supported: Option<Vec<String>>,
35
+
#[serde(skip_serializing_if = "Option::is_none")]
36
pub code_challenge_methods_supported: Option<Vec<String>>,
37
#[serde(skip_serializing_if = "Option::is_none")]
38
pub pushed_authorization_request_endpoint: Option<String>,
···
46
pub revocation_endpoint: Option<String>,
47
#[serde(skip_serializing_if = "Option::is_none")]
48
pub introspection_endpoint: Option<String>,
49
+
#[serde(skip_serializing_if = "Option::is_none")]
50
+
pub client_id_metadata_document_supported: Option<bool>,
51
}
52
53
pub async fn oauth_protected_resource(
···
90
"none".to_string(),
91
"private_key_jwt".to_string(),
92
]),
93
+
token_endpoint_auth_signing_alg_values_supported: Some(vec![
94
+
"ES256".to_string(),
95
+
"ES384".to_string(),
96
+
"ES512".to_string(),
97
+
"EdDSA".to_string(),
98
+
]),
99
code_challenge_methods_supported: Some(vec!["S256".to_string()]),
100
pushed_authorization_request_endpoint: Some(format!("{}/oauth/par", issuer)),
101
require_pushed_authorization_requests: Some(true),
···
108
authorization_response_iss_parameter_supported: Some(true),
109
revocation_endpoint: Some(format!("{}/oauth/revoke", issuer)),
110
introspection_endpoint: Some(format!("{}/oauth/introspect", issuer)),
111
+
client_id_metadata_document_supported: Some(true),
112
})
113
}
114
+9
-13
src/oauth/endpoints/par.rs
+9
-13
src/oauth/endpoints/par.rs
···
50
State(state): State<AppState>,
51
headers: HeaderMap,
52
Form(request): Form<ParRequest>,
53
-
) -> Result<Json<ParResponse>, OAuthError> {
54
let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
55
if !state.check_rate_limit(RateLimitKind::OAuthPar, &client_ip).await {
56
tracing::warn!(ip = %client_ip, "OAuth PAR rate limit exceeded");
···
63
}
64
let code_challenge = request.code_challenge.as_ref()
65
.filter(|s| !s.is_empty())
66
-
.ok_or_else(|| OAuthError::InvalidRequest(
67
-
"code_challenge is required".to_string(),
68
-
))?;
69
let code_challenge_method = request.code_challenge_method.as_deref().unwrap_or("");
70
if code_challenge_method != "S256" {
71
return Err(OAuthError::InvalidRequest(
···
76
let client_metadata = client_cache.get(&request.client_id).await?;
77
client_cache.validate_redirect_uri(&client_metadata, &request.redirect_uri)?;
78
let client_auth = determine_client_auth(&request)?;
79
-
if client_metadata.requires_dpop() && request.dpop_jkt.is_none() {
80
-
return Err(OAuthError::InvalidRequest(
81
-
"dpop_jkt is required for this client".to_string(),
82
-
));
83
-
}
84
let validated_scope = validate_scope(&request.scope, &client_metadata)?;
85
let request_id = RequestId::generate();
86
let expires_at = Utc::now() + Duration::seconds(PAR_EXPIRY_SECONDS);
···
114
}
115
}
116
});
117
-
Ok(Json(ParResponse {
118
-
request_uri: request_id.0,
119
-
expires_in: PAR_EXPIRY_SECONDS as u64,
120
-
}))
121
}
122
123
fn determine_client_auth(request: &ParRequest) -> Result<ClientAuth, OAuthError> {
···
50
State(state): State<AppState>,
51
headers: HeaderMap,
52
Form(request): Form<ParRequest>,
53
+
) -> Result<(axum::http::StatusCode, Json<ParResponse>), OAuthError> {
54
let client_ip = crate::rate_limit::extract_client_ip(&headers, None);
55
if !state.check_rate_limit(RateLimitKind::OAuthPar, &client_ip).await {
56
tracing::warn!(ip = %client_ip, "OAuth PAR rate limit exceeded");
···
63
}
64
let code_challenge = request.code_challenge.as_ref()
65
.filter(|s| !s.is_empty())
66
+
.ok_or_else(|| OAuthError::InvalidRequest("code_challenge is required".to_string()))?;
67
let code_challenge_method = request.code_challenge_method.as_deref().unwrap_or("");
68
if code_challenge_method != "S256" {
69
return Err(OAuthError::InvalidRequest(
···
74
let client_metadata = client_cache.get(&request.client_id).await?;
75
client_cache.validate_redirect_uri(&client_metadata, &request.redirect_uri)?;
76
let client_auth = determine_client_auth(&request)?;
77
let validated_scope = validate_scope(&request.scope, &client_metadata)?;
78
let request_id = RequestId::generate();
79
let expires_at = Utc::now() + Duration::seconds(PAR_EXPIRY_SECONDS);
···
107
}
108
}
109
});
110
+
Ok((
111
+
axum::http::StatusCode::CREATED,
112
+
Json(ParResponse {
113
+
request_uri: request_id.0,
114
+
expires_in: PAR_EXPIRY_SECONDS as u64,
115
+
}),
116
+
))
117
}
118
119
fn determine_client_auth(request: &ParRequest) -> Result<ClientAuth, OAuthError> {
+17
-4
src/oauth/endpoints/token/grants.rs
+17
-4
src/oauth/endpoints/token/grants.rs
···
42
.did
43
.ok_or_else(|| OAuthError::InvalidGrant("Authorization not completed".to_string()))?;
44
let client_metadata_cache = ClientMetadataCache::new(3600);
45
-
let client_metadata = client_metadata_cache
46
-
.get(&auth_request.client_id)
47
-
.await?;
48
-
let client_auth = auth_request.client_auth.clone().unwrap_or(ClientAuth::None);
49
verify_client_auth(&client_metadata_cache, &client_metadata, &client_auth).await?;
50
verify_pkce(&auth_request.parameters.code_challenge, &code_verifier)?;
51
if let Some(redirect_uri) = &request.redirect_uri {
···
42
.did
43
.ok_or_else(|| OAuthError::InvalidGrant("Authorization not completed".to_string()))?;
44
let client_metadata_cache = ClientMetadataCache::new(3600);
45
+
let client_metadata = client_metadata_cache.get(&auth_request.client_id).await?;
46
+
let client_auth = if let (Some(assertion), Some(assertion_type)) = (&request.client_assertion, &request.client_assertion_type) {
47
+
if assertion_type != "urn:ietf:params:oauth:client-assertion-type:jwt-bearer" {
48
+
return Err(OAuthError::InvalidClient(
49
+
"Unsupported client_assertion_type".to_string(),
50
+
));
51
+
}
52
+
ClientAuth::PrivateKeyJwt {
53
+
client_assertion: assertion.clone(),
54
+
}
55
+
} else if let Some(secret) = &request.client_secret {
56
+
ClientAuth::SecretPost {
57
+
client_secret: secret.clone(),
58
+
}
59
+
} else {
60
+
ClientAuth::None
61
+
};
62
verify_client_auth(&client_metadata_cache, &client_metadata, &client_auth).await?;
63
verify_pkce(&auth_request.parameters.code_challenge, &code_verifier)?;
64
if let Some(redirect_uri) = &request.redirect_uri {
+1
-1
src/oauth/templates.rs
+1
-1
src/oauth/templates.rs
···
394
<label for="remember_device">Remember this device</label>
395
</div>
396
<div class="buttons">
397
-
<button type="submit" formaction="/oauth/authorize/deny" class="btn btn-secondary">Cancel</button>
398
<button type="submit" class="btn btn-primary">Sign in</button>
399
</div>
400
</form>
401
<div class="footer">
···
394
<label for="remember_device">Remember this device</label>
395
</div>
396
<div class="buttons">
397
<button type="submit" class="btn btn-primary">Sign in</button>
398
+
<button type="submit" formaction="/oauth/authorize/deny" class="btn btn-secondary">Cancel</button>
399
</div>
400
</form>
401
<div class="footer">