High-performance implementation of plcbundle written in Rust
at main 169 lines 5.9 kB view raw
1// Bundle-related handlers 2 3use crate::constants; 4use crate::server::ServerState; 5use crate::server::error::{ 6 bad_request, internal_error, is_not_found_error, not_found, task_join_error, 7}; 8use crate::server::utils::{bundle_download_headers, parse_operation_pointer}; 9use axum::{ 10 body::Body, 11 extract::{Path, State}, 12 http::{HeaderMap, HeaderValue, StatusCode}, 13 response::IntoResponse, 14}; 15use std::sync::Arc; 16use std::time::Instant; 17use tokio_util::io::ReaderStream; 18 19pub async fn handle_bundle( 20 State(state): State<ServerState>, 21 Path(number): Path<u32>, 22) -> impl IntoResponse { 23 match state.manager.get_bundle_metadata(number) { 24 Ok(Some(meta)) => (StatusCode::OK, axum::Json(meta)).into_response(), 25 Ok(None) => not_found("Bundle not found").into_response(), 26 Err(e) => internal_error(&e.to_string()).into_response(), 27 } 28} 29 30pub async fn handle_bundle_data( 31 State(state): State<ServerState>, 32 Path(number): Path<u32>, 33) -> impl IntoResponse { 34 // Use BundleManager API to get bundle file stream 35 let file_result = tokio::task::spawn_blocking({ 36 let manager = Arc::clone(&state.manager); 37 move || manager.stream_bundle_raw(number) 38 }) 39 .await; 40 41 match file_result { 42 Ok(Ok(std_file)) => { 43 // Convert std::fs::File to tokio::fs::File 44 let file = tokio::fs::File::from_std(std_file); 45 let stream = ReaderStream::new(file); 46 let body = Body::from_stream(stream); 47 48 let headers = 49 bundle_download_headers("application/zstd", &constants::bundle_filename(number)); 50 51 (StatusCode::OK, headers, body).into_response() 52 } 53 Ok(Err(e)) => { 54 // Handle errors from BundleManager 55 if is_not_found_error(&e) { 56 not_found("Bundle not found").into_response() 57 } else { 58 internal_error(&e.to_string()).into_response() 59 } 60 } 61 Err(e) => task_join_error(e).into_response(), 62 } 63} 64 65pub async fn handle_bundle_jsonl( 66 State(state): State<ServerState>, 67 Path(number): Path<u32>, 68) -> impl IntoResponse { 69 // For streaming decompressed data, read in spawn_blocking and stream chunks 70 // TODO: Implement true async streaming when tokio-util supports it better 71 match tokio::task::spawn_blocking({ 72 let manager = Arc::clone(&state.manager); 73 move || { 74 let mut reader = manager.stream_bundle_decompressed(number)?; 75 use std::io::Read; 76 let mut buf = Vec::new(); 77 reader.read_to_end(&mut buf)?; 78 Ok::<Vec<u8>, anyhow::Error>(buf) 79 } 80 }) 81 .await 82 { 83 Ok(Ok(data)) => { 84 let filename = constants::bundle_filename(number).replace(".zst", ""); 85 let headers = bundle_download_headers("application/x-ndjson", &filename); 86 87 (StatusCode::OK, headers, data).into_response() 88 } 89 Ok(Err(e)) => { 90 if is_not_found_error(&e) { 91 not_found("Bundle not found").into_response() 92 } else { 93 internal_error(&e.to_string()).into_response() 94 } 95 } 96 Err(_) => internal_error("Task join error").into_response(), 97 } 98} 99 100pub async fn handle_operation( 101 State(state): State<ServerState>, 102 Path(pointer): Path<String>, 103) -> impl IntoResponse { 104 // Parse pointer: "bundle:position" or global position 105 let (bundle_num, position) = match parse_operation_pointer(&pointer) { 106 Ok((b, p)) => (b, p), 107 Err(e) => return bad_request(&e.to_string()).into_response(), 108 }; 109 110 if position >= constants::BUNDLE_SIZE { 111 return bad_request("Position must be 0-9999").into_response(); 112 } 113 114 let total_start = Instant::now(); 115 let load_start = Instant::now(); 116 117 // get_operation_raw performs blocking file I/O, so we need to use spawn_blocking 118 let json_result = tokio::task::spawn_blocking({ 119 let manager = Arc::clone(&state.manager); 120 move || manager.get_operation_raw(bundle_num, position) 121 }) 122 .await; 123 124 match json_result { 125 Ok(Ok(json)) => { 126 let load_duration = load_start.elapsed(); 127 let total_duration = total_start.elapsed(); 128 129 let global_pos = crate::constants::bundle_position_to_global(bundle_num, position); 130 131 let mut headers = HeaderMap::new(); 132 headers.insert("X-Bundle-Number", HeaderValue::from(bundle_num)); 133 headers.insert("X-Position", HeaderValue::from(position)); 134 headers.insert( 135 "X-Global-Position", 136 HeaderValue::from_str(&global_pos.to_string()).unwrap(), 137 ); 138 headers.insert( 139 "X-Pointer", 140 HeaderValue::from_str(&format!("{}:{}", bundle_num, position)).unwrap(), 141 ); 142 headers.insert( 143 "X-Load-Time-Ms", 144 HeaderValue::from_str(&format!("{:.3}", load_duration.as_secs_f64() * 1000.0)) 145 .unwrap(), 146 ); 147 headers.insert( 148 "X-Total-Time-Ms", 149 HeaderValue::from_str(&format!("{:.3}", total_duration.as_secs_f64() * 1000.0)) 150 .unwrap(), 151 ); 152 headers.insert( 153 "Cache-Control", 154 HeaderValue::from_static("public, max-age=31536000, immutable"), 155 ); 156 headers.insert("Content-Type", HeaderValue::from_static("application/json")); 157 158 (StatusCode::OK, headers, json).into_response() 159 } 160 Ok(Err(e)) => { 161 if is_not_found_error(&e) { 162 not_found("Operation not found").into_response() 163 } else { 164 internal_error(&e.to_string()).into_response() 165 } 166 } 167 Err(e) => task_join_error(e).into_response(), 168 } 169}