this repo has no description
1use axum::{ 2 extract::{Path, Query, State}, 3 http::{HeaderMap, Method, StatusCode}, 4 response::{IntoResponse, Response}, 5 body::Bytes, 6}; 7use reqwest::Client; 8use tracing::{info, error}; 9use std::collections::HashMap; 10use crate::state::AppState; 11use sqlx::Row; 12 13pub async fn proxy_handler( 14 State(state): State<AppState>, 15 Path(method): Path<String>, 16 method_verb: Method, 17 headers: HeaderMap, 18 Query(params): Query<HashMap<String, String>>, 19 body: Bytes, 20) -> Response { 21 22 let proxy_header = headers.get("atproto-proxy") 23 .and_then(|h| h.to_str().ok()) 24 .map(|s| s.to_string()); 25 26 let appview_url = match &proxy_header { 27 Some(url) => url.clone(), 28 None => match std::env::var("APPVIEW_URL") { 29 Ok(url) => url, 30 Err(_) => return (StatusCode::BAD_GATEWAY, "No upstream AppView configured").into_response(), 31 }, 32 }; 33 34 let target_url = format!("{}/xrpc/{}", appview_url, method); 35 36 info!("Proxying {} request to {}", method_verb, target_url); 37 38 let client = Client::new(); 39 40 let mut request_builder = client 41 .request(method_verb, &target_url) 42 .query(&params); 43 44 let mut auth_header_val = headers.get("Authorization").map(|h| h.clone()); 45 46 if let Some(aud) = &proxy_header { 47 if let Some(auth_val) = &auth_header_val { 48 if let Ok(token) = auth_val.to_str() { 49 let token = token.replace("Bearer ", ""); 50 if let Ok(did) = crate::auth::get_did_from_token(&token) { 51 let key_row = sqlx::query("SELECT k.key_bytes FROM user_keys k JOIN users u ON k.user_id = u.id WHERE u.did = $1") 52 .bind(&did) 53 .fetch_optional(&state.db) 54 .await; 55 56 if let Ok(Some(row)) = key_row { 57 let key_bytes: Vec<u8> = row.get("key_bytes"); 58 if let Ok(new_token) = crate::auth::create_service_token(&did, aud, &method, &key_bytes) { 59 if let Ok(val) = axum::http::HeaderValue::from_str(&format!("Bearer {}", new_token)) { 60 auth_header_val = Some(val); 61 } 62 } 63 } 64 } 65 } 66 } 67 } 68 69 if let Some(val) = auth_header_val { 70 request_builder = request_builder.header("Authorization", val); 71 } 72 73 for (key, value) in headers.iter() { 74 if key != "host" && key != "content-length" && key != "authorization" { 75 request_builder = request_builder.header(key, value); 76 } 77 } 78 79 request_builder = request_builder.body(body); 80 81 match request_builder.send().await { 82 Ok(resp) => { 83 let status = resp.status(); 84 let headers = resp.headers().clone(); 85 let body = match resp.bytes().await { 86 Ok(b) => b, 87 Err(e) => { 88 error!("Error reading proxy response body: {:?}", e); 89 return (StatusCode::BAD_GATEWAY, "Error reading upstream response").into_response(); 90 } 91 }; 92 93 let mut response_builder = Response::builder().status(status); 94 95 for (key, value) in headers.iter() { 96 response_builder = response_builder.header(key, value); 97 } 98 99 match response_builder.body(axum::body::Body::from(body)) { 100 Ok(r) => r, 101 Err(e) => { 102 error!("Error building proxy response: {:?}", e); 103 (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error").into_response() 104 } 105 } 106 }, 107 Err(e) => { 108 error!("Error sending proxy request: {:?}", e); 109 if e.is_timeout() { 110 (StatusCode::GATEWAY_TIMEOUT, "Upstream Timeout").into_response() 111 } else { 112 (StatusCode::BAD_GATEWAY, "Upstream Error").into_response() 113 } 114 } 115 } 116}