this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 body::Bytes, 6 extract::{Path, RawQuery, State}, 7 http::{HeaderMap, Method, StatusCode}, 8 response::{IntoResponse, Response}, 9}; 10use serde_json::json; 11use tracing::{error, info, warn}; 12 13pub async fn proxy_handler( 14 State(state): State<AppState>, 15 Path(method): Path<String>, 16 method_verb: Method, 17 headers: HeaderMap, 18 RawQuery(query): RawQuery, 19 body: Bytes, 20) -> Response { 21 let proxy_header = match headers 22 .get("atproto-proxy") 23 .and_then(|h| h.to_str().ok()) 24 { 25 Some(h) => h.to_string(), 26 None => { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({ 30 "error": "InvalidRequest", 31 "message": "Missing required atproto-proxy header" 32 })), 33 ) 34 .into_response(); 35 } 36 }; 37 38 let did = proxy_header.split('#').next().unwrap_or(&proxy_header); 39 let resolved = match state.did_resolver.resolve_did(did).await { 40 Some(r) => r, 41 None => { 42 error!(did = %did, "Could not resolve service DID"); 43 return ( 44 StatusCode::BAD_GATEWAY, 45 Json(json!({ 46 "error": "UpstreamFailure", 47 "message": "Could not resolve service DID" 48 })), 49 ) 50 .into_response(); 51 } 52 }; 53 54 let target_url = match &query { 55 Some(q) => format!("{}/xrpc/{}?{}", resolved.url, method, q), 56 None => format!("{}/xrpc/{}", resolved.url, method), 57 }; 58 info!("Proxying {} request to {}", method_verb, target_url); 59 60 let client = proxy_client(); 61 let mut request_builder = client.request(method_verb, &target_url); 62 63 let mut auth_header_val = headers.get("Authorization").cloned(); 64 if let Some(token) = crate::auth::extract_bearer_token_from_header( 65 headers.get("Authorization").and_then(|h| h.to_str().ok()), 66 ) { 67 match crate::auth::validate_bearer_token(&state.db, &token).await { 68 Ok(auth_user) => { 69 if let Some(key_bytes) = auth_user.key_bytes { 70 match crate::auth::create_service_token( 71 &auth_user.did, 72 &resolved.did, 73 &method, 74 &key_bytes, 75 ) { 76 Ok(new_token) => { 77 if let Ok(val) = 78 axum::http::HeaderValue::from_str(&format!("Bearer {}", new_token)) 79 { 80 auth_header_val = Some(val); 81 } 82 } 83 Err(e) => { 84 warn!("Failed to create service token: {:?}", e); 85 } 86 } 87 } 88 } 89 Err(e) => { 90 warn!("Token validation failed: {:?}", e); 91 } 92 } 93 } 94 95 if let Some(val) = auth_header_val { 96 request_builder = request_builder.header("Authorization", val); 97 } 98 for header_name in crate::api::proxy_client::HEADERS_TO_FORWARD { 99 if let Some(val) = headers.get(*header_name) { 100 request_builder = request_builder.header(*header_name, val); 101 } 102 } 103 if !body.is_empty() { 104 request_builder = request_builder.body(body); 105 } 106 107 match request_builder.send().await { 108 Ok(resp) => { 109 let status = resp.status(); 110 let headers = resp.headers().clone(); 111 let body = match resp.bytes().await { 112 Ok(b) => b, 113 Err(e) => { 114 error!("Error reading proxy response body: {:?}", e); 115 return (StatusCode::BAD_GATEWAY, "Error reading upstream response") 116 .into_response(); 117 } 118 }; 119 let mut response_builder = Response::builder().status(status); 120 for header_name in crate::api::proxy_client::RESPONSE_HEADERS_TO_FORWARD { 121 if let Some(val) = headers.get(*header_name) { 122 response_builder = response_builder.header(*header_name, val); 123 } 124 } 125 match response_builder.body(axum::body::Body::from(body)) { 126 Ok(r) => r, 127 Err(e) => { 128 error!("Error building proxy response: {:?}", e); 129 (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error").into_response() 130 } 131 } 132 } 133 Err(e) => { 134 error!("Error sending proxy request: {:?}", e); 135 if e.is_timeout() { 136 (StatusCode::GATEWAY_TIMEOUT, "Upstream Timeout").into_response() 137 } else { 138 (StatusCode::BAD_GATEWAY, "Upstream Error").into_response() 139 } 140 } 141 } 142}