forked from
atscan.net/plcbundle-rs
High-performance implementation of plcbundle written in Rust
1// Bundle-related handlers
2
3use crate::constants;
4use crate::server::ServerState;
5use crate::server::error::{
6 bad_request, internal_error, is_not_found_error, not_found, task_join_error,
7};
8use crate::server::utils::{bundle_download_headers, parse_operation_pointer};
9use axum::{
10 body::Body,
11 extract::{Path, State},
12 http::{HeaderMap, HeaderValue, StatusCode},
13 response::IntoResponse,
14};
15use std::sync::Arc;
16use std::time::Instant;
17use tokio_util::io::ReaderStream;
18
19pub async fn handle_bundle(
20 State(state): State<ServerState>,
21 Path(number): Path<u32>,
22) -> impl IntoResponse {
23 match state.manager.get_bundle_metadata(number) {
24 Ok(Some(meta)) => (StatusCode::OK, axum::Json(meta)).into_response(),
25 Ok(None) => not_found("Bundle not found").into_response(),
26 Err(e) => internal_error(&e.to_string()).into_response(),
27 }
28}
29
30pub async fn handle_bundle_data(
31 State(state): State<ServerState>,
32 Path(number): Path<u32>,
33) -> impl IntoResponse {
34 // Use BundleManager API to get bundle file stream
35 let file_result = tokio::task::spawn_blocking({
36 let manager = Arc::clone(&state.manager);
37 move || manager.stream_bundle_raw(number)
38 })
39 .await;
40
41 match file_result {
42 Ok(Ok(std_file)) => {
43 // Convert std::fs::File to tokio::fs::File
44 let file = tokio::fs::File::from_std(std_file);
45 let stream = ReaderStream::new(file);
46 let body = Body::from_stream(stream);
47
48 let headers =
49 bundle_download_headers("application/zstd", &constants::bundle_filename(number));
50
51 (StatusCode::OK, headers, body).into_response()
52 }
53 Ok(Err(e)) => {
54 // Handle errors from BundleManager
55 if is_not_found_error(&e) {
56 not_found("Bundle not found").into_response()
57 } else {
58 internal_error(&e.to_string()).into_response()
59 }
60 }
61 Err(e) => task_join_error(e).into_response(),
62 }
63}
64
65pub async fn handle_bundle_jsonl(
66 State(state): State<ServerState>,
67 Path(number): Path<u32>,
68) -> impl IntoResponse {
69 // For streaming decompressed data, read in spawn_blocking and stream chunks
70 // TODO: Implement true async streaming when tokio-util supports it better
71 match tokio::task::spawn_blocking({
72 let manager = Arc::clone(&state.manager);
73 move || {
74 let mut reader = manager.stream_bundle_decompressed(number)?;
75 use std::io::Read;
76 let mut buf = Vec::new();
77 reader.read_to_end(&mut buf)?;
78 Ok::<Vec<u8>, anyhow::Error>(buf)
79 }
80 })
81 .await
82 {
83 Ok(Ok(data)) => {
84 let filename = constants::bundle_filename(number).replace(".zst", "");
85 let headers = bundle_download_headers("application/x-ndjson", &filename);
86
87 (StatusCode::OK, headers, data).into_response()
88 }
89 Ok(Err(e)) => {
90 if is_not_found_error(&e) {
91 not_found("Bundle not found").into_response()
92 } else {
93 internal_error(&e.to_string()).into_response()
94 }
95 }
96 Err(_) => internal_error("Task join error").into_response(),
97 }
98}
99
100pub async fn handle_operation(
101 State(state): State<ServerState>,
102 Path(pointer): Path<String>,
103) -> impl IntoResponse {
104 // Parse pointer: "bundle:position" or global position
105 let (bundle_num, position) = match parse_operation_pointer(&pointer) {
106 Ok((b, p)) => (b, p),
107 Err(e) => return bad_request(&e.to_string()).into_response(),
108 };
109
110 if position >= constants::BUNDLE_SIZE {
111 return bad_request("Position must be 0-9999").into_response();
112 }
113
114 let total_start = Instant::now();
115 let load_start = Instant::now();
116
117 // get_operation_raw performs blocking file I/O, so we need to use spawn_blocking
118 let json_result = tokio::task::spawn_blocking({
119 let manager = Arc::clone(&state.manager);
120 move || manager.get_operation_raw(bundle_num, position)
121 })
122 .await;
123
124 match json_result {
125 Ok(Ok(json)) => {
126 let load_duration = load_start.elapsed();
127 let total_duration = total_start.elapsed();
128
129 let global_pos = crate::constants::bundle_position_to_global(bundle_num, position);
130
131 let mut headers = HeaderMap::new();
132 headers.insert("X-Bundle-Number", HeaderValue::from(bundle_num));
133 headers.insert("X-Position", HeaderValue::from(position));
134 headers.insert(
135 "X-Global-Position",
136 HeaderValue::from_str(&global_pos.to_string()).unwrap(),
137 );
138 headers.insert(
139 "X-Pointer",
140 HeaderValue::from_str(&format!("{}:{}", bundle_num, position)).unwrap(),
141 );
142 headers.insert(
143 "X-Load-Time-Ms",
144 HeaderValue::from_str(&format!("{:.3}", load_duration.as_secs_f64() * 1000.0))
145 .unwrap(),
146 );
147 headers.insert(
148 "X-Total-Time-Ms",
149 HeaderValue::from_str(&format!("{:.3}", total_duration.as_secs_f64() * 1000.0))
150 .unwrap(),
151 );
152 headers.insert(
153 "Cache-Control",
154 HeaderValue::from_static("public, max-age=31536000, immutable"),
155 );
156 headers.insert("Content-Type", HeaderValue::from_static("application/json"));
157
158 (StatusCode::OK, headers, json).into_response()
159 }
160 Ok(Err(e)) => {
161 if is_not_found_error(&e) {
162 not_found("Operation not found").into_response()
163 } else {
164 internal_error(&e.to_string()).into_response()
165 }
166 }
167 Err(e) => task_join_error(e).into_response(),
168 }
169}