tangled
alpha
login
or
join now
tsiry-sandratraina.com
/
replay
4
fork
atom
Sniff and replay HTTP requests and responses — perfect for mocking APIs during testing.
4
fork
atom
overview
issues
pulls
pipelines
run cargo fmt
tsiry-sandratraina.com
7 months ago
c5fff251
f965ea97
1/1
fmt.yml
success
10s
+350
-326
3 changed files
expand all
collapse all
unified
split
src
main.rs
proxy.rs
replay.rs
+9
-8
src/main.rs
···
1
-
2
-
use std::sync::{Arc};
3
4
use clap::{Arg, Command};
5
use owo_colors::OwoColorize;
···
31
Arg::new("target")
32
.short('t')
33
.long("target")
34
-
.help("The target URL to replay the requests to")
35
)
36
.arg(
37
Arg::new("listen")
38
.short('l')
39
.long("listen")
40
.help("The address to listen on for incoming requests")
41
-
.default_value("127.0.0.1:6677")
42
)
43
.subcommand(
44
Command::new("mock")
45
-
.about("Read mocks from replay_mock.json and start a replay server")
46
)
47
.get_matches();
48
-
49
50
if let Some(_) = matches.subcommand_matches("mock") {
51
let logs = store::load_logs_from_file(proxy::PROXY_LOG_FILE)?;
52
let logs = Arc::new(Mutex::new(logs));
53
let listen = matches.get_one::<String>("listen").unwrap();
54
-
println!("Loaded {} mocks from {}", logs.lock().await.len().magenta(), proxy::PROXY_LOG_FILE.magenta());
0
0
0
0
55
println!("Replay server is running on {}", listen.magenta());
56
replay::start_replay_server(logs, listen).await?;
57
return Ok(());
58
}
59
-
60
61
let target = matches.get_one::<String>("target");
62
···
1
+
use std::sync::Arc;
0
2
3
use clap::{Arg, Command};
4
use owo_colors::OwoColorize;
···
30
Arg::new("target")
31
.short('t')
32
.long("target")
33
+
.help("The target URL to replay the requests to"),
34
)
35
.arg(
36
Arg::new("listen")
37
.short('l')
38
.long("listen")
39
.help("The address to listen on for incoming requests")
40
+
.default_value("127.0.0.1:6677"),
41
)
42
.subcommand(
43
Command::new("mock")
44
+
.about("Read mocks from replay_mock.json and start a replay server"),
45
)
46
.get_matches();
0
47
48
if let Some(_) = matches.subcommand_matches("mock") {
49
let logs = store::load_logs_from_file(proxy::PROXY_LOG_FILE)?;
50
let logs = Arc::new(Mutex::new(logs));
51
let listen = matches.get_one::<String>("listen").unwrap();
52
+
println!(
53
+
"Loaded {} mocks from {}",
54
+
logs.lock().await.len().magenta(),
55
+
proxy::PROXY_LOG_FILE.magenta()
56
+
);
57
println!("Replay server is running on {}", listen.magenta());
58
replay::start_replay_server(logs, listen).await?;
59
return Ok(());
60
}
0
61
62
let target = matches.get_one::<String>("target");
63
+243
-232
src/proxy.rs
···
1
-
use std::{process::exit, sync::Arc, thread, time::{SystemTime, UNIX_EPOCH}};
0
0
0
0
0
2
3
use http_body_util::{BodyExt, Full};
4
-
use hyper::{body::{Buf, Bytes, Incoming}, server::conn::http1, Request, Response};
0
0
0
0
0
5
use owo_colors::OwoColorize;
6
use serde::{Deserialize, Serialize};
7
use tokio::{net::TcpListener, sync::Mutex};
8
-
use hyper_util::rt::TokioIo;
9
10
-
use crate::{replay::start_replay_server, store::{save_logs_to_file, LogStore}};
0
0
0
11
12
pub const PROXY_LOG_FILE: &str = "replay_mocks.json";
13
···
30
31
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
32
pub struct ProxyLog {
33
-
pub request: RequestLog,
34
-
pub response: ResponseLog,
35
}
36
37
-
pub async fn start_server(target: &str, listen: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
38
-
let target_uri = target.parse::<hyper::Uri>()?;
39
-
let target_authority = target_uri.authority().ok_or("Invalid target URL")?;
40
-
let target_scheme = target_uri.scheme_str().ok_or("http")?;
41
-
let target_host = target_authority.host();
42
-
let target_port = target_authority.port_u16().unwrap_or(if target_scheme == "https" { 443 } else { 80 });
0
0
0
0
0
43
44
-
let logs = Arc::new(Mutex::new(Vec::<ProxyLog>::new()));
45
-
let logs_for_saving = logs.clone();
46
-
tokio::spawn(async move {
47
-
loop {
48
-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
49
-
save_logs_to_file(&logs_for_saving, PROXY_LOG_FILE).await
0
50
.unwrap_or_else(|e| eprintln!("Error saving logs to file: {}", e));
51
-
}
52
-
});
53
54
-
let logs_for_replay = logs.clone();
55
-
thread::spawn(move || {
56
-
let rt = tokio::runtime::Builder::new_multi_thread()
57
-
.enable_all()
58
-
.build()
59
-
.unwrap();
60
-
rt.block_on(async {
61
-
match start_replay_server(logs_for_replay, "127.0.0.1:6688").await {
62
-
Ok(_) => {
63
-
println!("Replay server stopped");
64
-
exit(0);
65
-
},
66
-
Err(e) => eprintln!("Replay server error: {}", e),
67
-
}
0
68
});
69
-
});
70
71
-
let listener = TcpListener::bind(listen).await?;
72
-
println!("Target URL: {}", target.magenta());
73
-
println!("Proxy server is listening on {}", listen.green());
74
-
println!("Replay server is running on {}", "127.0.0.1:6688".green());
75
76
-
loop {
77
-
let (stream, _) = listener.accept().await?;
78
-
let io = TokioIo::new(stream);
79
80
-
let target_host_str = target_host.to_string();
81
-
let target_scheme = target_scheme.to_string();
82
-
let logs_clone = logs.clone();
83
84
-
tokio::task::spawn(async move {
85
-
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
86
-
let target_host = target_host_str.clone();
87
-
let scheme = target_scheme.clone();
88
-
let logs = logs_clone.clone();
89
90
-
async move {
91
-
proxy_handler(req, &target_host, target_port, &scheme, logs).await
92
-
}
93
-
});
94
95
-
if let Err(err) = http1::Builder::new()
96
.keep_alive(false)
97
.max_buf_size(30 * 1024 * 1024)
98
-
.serve_connection(io, service)
99
-
.await
100
-
{
101
-
eprintln!("> Connection error: {}", err);
102
-
}
103
-
});
104
-
}
105
-
106
}
107
108
pub async fn proxy_handler(
109
-
req: Request<Incoming>,
110
-
target_host: &str,
111
-
target_port: u16,
112
-
scheme: &str,
113
-
logs: LogStore,
114
) -> Result<Response<Full<Bytes>>, hyper::Error> {
115
-
let timestamp = SystemTime::now()
116
-
.duration_since(UNIX_EPOCH)
117
-
.unwrap()
118
-
.as_secs();
119
120
-
let method = req.method().clone();
121
-
let path = req.uri().path().to_string();
122
-
let query = req.uri().query().map(|q| q.to_string());
123
124
-
let headers: Vec<(String, String)> = req
125
-
.headers()
126
-
.iter()
127
-
.map(|(name, value)| {
128
-
(
129
-
name.to_string(),
130
-
value.to_str().unwrap_or("").to_string(),
131
-
)
132
-
})
133
-
.collect();
134
135
-
let (parts, body) = req.into_parts();
136
-
let body_bytes = match body.collect().await {
137
-
Ok(collected) => collected.aggregate(),
138
-
Err(e) => {
139
-
eprintln!("Error collecting request body: {}", e);
140
-
return Ok(Response::builder()
141
-
.status(500)
142
-
.body(Full::new(Bytes::from("Internal Server Error")))
143
-
.unwrap());
144
-
}
145
-
};
146
147
-
let body_vec = body_bytes.chunk().to_vec();
148
-
let body_str = String::from_utf8(body_vec.clone()).ok();
149
150
-
let forward_uri = if target_port != 443 && target_port != 80 {
151
-
format!(
152
-
"{}://{}:{}{}{}",
153
-
scheme,
154
-
target_host,
155
-
target_port,
156
-
parts.uri.path(),
157
-
parts.uri.query().map_or(String::new(), |q| format!("?{}", q))
158
-
)
159
-
} else {
160
-
format!(
161
-
"{}://{}{}{}",
162
-
scheme,
163
-
target_host,
164
-
parts.uri.path(),
165
-
parts.uri.query().map_or(String::new(), |q| format!("?{}", q))
166
-
)
167
-
};
0
0
0
0
0
0
168
169
-
println!("{} {} {}", method.yellow(), path, forward_uri.magenta());
170
171
-
let client = reqwest::Client::builder()
172
-
.timeout(std::time::Duration::from_secs(30))
173
-
.danger_accept_invalid_certs(true)
174
-
.build()
175
-
.unwrap_or_else(|_| reqwest::Client::new());
176
177
-
let mut req_builder = match method.as_str() {
178
-
"GET" => client.get(&forward_uri),
179
-
"POST" => client.post(&forward_uri),
180
-
"PUT" => client.put(&forward_uri),
181
-
"DELETE" => client.delete(&forward_uri),
182
-
"HEAD" => client.head(&forward_uri),
183
-
"OPTIONS" => client.request(reqwest::Method::OPTIONS, &forward_uri),
184
-
"PATCH" => client.patch(&forward_uri),
185
-
_ => {
186
-
eprintln!("Unsupported method: {}", method);
187
-
return Ok(Response::builder()
188
-
.status(400)
189
-
.body(Full::new(Bytes::from("Bad Request: Unsupported Method")))
190
-
.unwrap());
191
-
}
192
-
};
193
194
-
for (name, value) in &headers {
195
-
if name.to_lowercase() != "host" &&
196
-
name.to_lowercase() != "connection" {
197
-
if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(name.as_bytes()) {
198
-
if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
199
-
req_builder = req_builder.header(header_name, header_value);
200
-
}
201
-
}
202
-
}
203
-
}
204
205
-
if !body_vec.is_empty() {
206
-
req_builder = req_builder.body(body_vec.clone());
207
-
}
208
209
-
let resp = match req_builder.send().await {
210
-
Ok(resp) => resp,
211
-
Err(e) => {
212
-
eprintln!("Error sending request: {}", e);
213
-
return Ok(Response::builder()
214
-
.status(502)
215
-
.body(Full::new(Bytes::from(format!("Bad Gateway: {}", e))))
216
-
.unwrap());
217
-
}
218
-
};
219
220
-
let status = resp.status().as_u16();
221
222
-
let resp_headers: Vec<(String, String)> = resp
223
-
.headers()
224
-
.iter()
225
-
.map(|(name, value)| {
226
-
(
227
-
name.to_string(),
228
-
value.to_str().unwrap_or("").to_string(),
229
-
)
230
-
})
231
-
.collect();
232
233
-
let resp_bytes = match resp.bytes().await {
234
-
Ok(bytes) => bytes,
235
-
Err(e) => {
236
-
eprintln!("Error reading response body: {}", e);
237
-
return Ok(Response::builder()
238
-
.status(500)
239
-
.body(Full::new(Bytes::from("Internal Server Error")))
240
-
.unwrap());
241
-
}
242
-
};
243
244
-
let resp_vec = resp_bytes.to_vec();
245
-
let resp_str = String::from_utf8(resp_vec.clone()).ok();
246
247
-
let log_entry = ProxyLog {
248
-
request: RequestLog {
249
-
timestamp,
250
-
method: method.to_string(),
251
-
path,
252
-
query_params: query,
253
-
headers,
254
-
body: body_str,
255
-
},
256
-
response: ResponseLog {
257
-
status,
258
-
headers: resp_headers.clone(),
259
-
body: resp_str.clone(),
260
-
},
261
-
};
262
263
-
{
264
-
let mut logs_guard = logs.lock().await;
265
-
if !logs_guard.iter()
266
-
.any(|log|
267
-
log.request.method == log_entry.request.method &&
268
-
log.request.path == log_entry.request.path &&
269
-
log.request.query_params == log_entry.request.query_params
270
-
) {
271
-
logs_guard.push(log_entry.clone());
272
}
273
-
}
274
275
-
println!("Saved request/response to log store {}", PROXY_LOG_FILE.magenta());
0
0
0
276
277
-
let mut builder = Response::builder().status(status);
278
279
-
for (name, value) in resp_headers {
280
-
if name.to_lowercase() != "connection" &&
281
-
name.to_lowercase() != "transfer-encoding" {
282
-
if let Ok(header_name) = hyper::header::HeaderName::from_bytes(name.as_bytes()) {
283
-
if let Ok(header_value) = hyper::header::HeaderValue::from_str(&value) {
284
-
builder = builder.header(header_name, header_value);
285
-
}
286
-
}
287
-
}
288
-
}
289
290
-
builder = builder.header("content-length", resp_vec.len());
291
-
builder = builder.header("connection", "close");
292
293
-
Ok(builder
294
-
.body(Full::new(Bytes::from(resp_vec)))
295
-
.unwrap_or_else(|_| {
296
-
Response::builder()
297
-
.status(500)
298
-
.body(Full::new(Bytes::from("Internal Server Error")))
299
-
.unwrap()
300
-
}))
301
-
}
···
1
+
use std::{
2
+
process::exit,
3
+
sync::Arc,
4
+
thread,
5
+
time::{SystemTime, UNIX_EPOCH},
6
+
};
7
8
use http_body_util::{BodyExt, Full};
9
+
use hyper::{
10
+
Request, Response,
11
+
body::{Buf, Bytes, Incoming},
12
+
server::conn::http1,
13
+
};
14
+
use hyper_util::rt::TokioIo;
15
use owo_colors::OwoColorize;
16
use serde::{Deserialize, Serialize};
17
use tokio::{net::TcpListener, sync::Mutex};
0
18
19
+
use crate::{
20
+
replay::start_replay_server,
21
+
store::{LogStore, save_logs_to_file},
22
+
};
23
24
pub const PROXY_LOG_FILE: &str = "replay_mocks.json";
25
···
42
43
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
44
pub struct ProxyLog {
45
+
pub request: RequestLog,
46
+
pub response: ResponseLog,
47
}
48
49
+
pub async fn start_server(
50
+
target: &str,
51
+
listen: &str,
52
+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
53
+
let target_uri = target.parse::<hyper::Uri>()?;
54
+
let target_authority = target_uri.authority().ok_or("Invalid target URL")?;
55
+
let target_scheme = target_uri.scheme_str().ok_or("http")?;
56
+
let target_host = target_authority.host();
57
+
let target_port = target_authority
58
+
.port_u16()
59
+
.unwrap_or(if target_scheme == "https" { 443 } else { 80 });
60
61
+
let logs = Arc::new(Mutex::new(Vec::<ProxyLog>::new()));
62
+
let logs_for_saving = logs.clone();
63
+
tokio::spawn(async move {
64
+
loop {
65
+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
66
+
save_logs_to_file(&logs_for_saving, PROXY_LOG_FILE)
67
+
.await
68
.unwrap_or_else(|e| eprintln!("Error saving logs to file: {}", e));
69
+
}
70
+
});
71
72
+
let logs_for_replay = logs.clone();
73
+
thread::spawn(move || {
74
+
let rt = tokio::runtime::Builder::new_multi_thread()
75
+
.enable_all()
76
+
.build()
77
+
.unwrap();
78
+
rt.block_on(async {
79
+
match start_replay_server(logs_for_replay, "127.0.0.1:6688").await {
80
+
Ok(_) => {
81
+
println!("Replay server stopped");
82
+
exit(0);
83
+
}
84
+
Err(e) => eprintln!("Replay server error: {}", e),
85
+
}
86
+
});
87
});
0
88
89
+
let listener = TcpListener::bind(listen).await?;
90
+
println!("Target URL: {}", target.magenta());
91
+
println!("Proxy server is listening on {}", listen.green());
92
+
println!("Replay server is running on {}", "127.0.0.1:6688".green());
93
94
+
loop {
95
+
let (stream, _) = listener.accept().await?;
96
+
let io = TokioIo::new(stream);
97
98
+
let target_host_str = target_host.to_string();
99
+
let target_scheme = target_scheme.to_string();
100
+
let logs_clone = logs.clone();
101
102
+
tokio::task::spawn(async move {
103
+
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
104
+
let target_host = target_host_str.clone();
105
+
let scheme = target_scheme.clone();
106
+
let logs = logs_clone.clone();
107
108
+
async move { proxy_handler(req, &target_host, target_port, &scheme, logs).await }
109
+
});
0
0
110
111
+
if let Err(err) = http1::Builder::new()
112
.keep_alive(false)
113
.max_buf_size(30 * 1024 * 1024)
114
+
.serve_connection(io, service)
115
+
.await
116
+
{
117
+
eprintln!("> Connection error: {}", err);
118
+
}
119
+
});
120
+
}
0
121
}
122
123
pub async fn proxy_handler(
124
+
req: Request<Incoming>,
125
+
target_host: &str,
126
+
target_port: u16,
127
+
scheme: &str,
128
+
logs: LogStore,
129
) -> Result<Response<Full<Bytes>>, hyper::Error> {
130
+
let timestamp = SystemTime::now()
131
+
.duration_since(UNIX_EPOCH)
132
+
.unwrap()
133
+
.as_secs();
134
135
+
let method = req.method().clone();
136
+
let path = req.uri().path().to_string();
137
+
let query = req.uri().query().map(|q| q.to_string());
138
139
+
let headers: Vec<(String, String)> = req
140
+
.headers()
141
+
.iter()
142
+
.map(|(name, value)| (name.to_string(), value.to_str().unwrap_or("").to_string()))
143
+
.collect();
0
0
0
0
0
144
145
+
let (parts, body) = req.into_parts();
146
+
let body_bytes = match body.collect().await {
147
+
Ok(collected) => collected.aggregate(),
148
+
Err(e) => {
149
+
eprintln!("Error collecting request body: {}", e);
150
+
return Ok(Response::builder()
151
+
.status(500)
152
+
.body(Full::new(Bytes::from("Internal Server Error")))
153
+
.unwrap());
154
+
}
155
+
};
156
157
+
let body_vec = body_bytes.chunk().to_vec();
158
+
let body_str = String::from_utf8(body_vec.clone()).ok();
159
160
+
let forward_uri = if target_port != 443 && target_port != 80 {
161
+
format!(
162
+
"{}://{}:{}{}{}",
163
+
scheme,
164
+
target_host,
165
+
target_port,
166
+
parts.uri.path(),
167
+
parts
168
+
.uri
169
+
.query()
170
+
.map_or(String::new(), |q| format!("?{}", q))
171
+
)
172
+
} else {
173
+
format!(
174
+
"{}://{}{}{}",
175
+
scheme,
176
+
target_host,
177
+
parts.uri.path(),
178
+
parts
179
+
.uri
180
+
.query()
181
+
.map_or(String::new(), |q| format!("?{}", q))
182
+
)
183
+
};
184
185
+
println!("{} {} {}", method.yellow(), path, forward_uri.magenta());
186
187
+
let client = reqwest::Client::builder()
188
+
.timeout(std::time::Duration::from_secs(30))
189
+
.danger_accept_invalid_certs(true)
190
+
.build()
191
+
.unwrap_or_else(|_| reqwest::Client::new());
192
193
+
let mut req_builder = match method.as_str() {
194
+
"GET" => client.get(&forward_uri),
195
+
"POST" => client.post(&forward_uri),
196
+
"PUT" => client.put(&forward_uri),
197
+
"DELETE" => client.delete(&forward_uri),
198
+
"HEAD" => client.head(&forward_uri),
199
+
"OPTIONS" => client.request(reqwest::Method::OPTIONS, &forward_uri),
200
+
"PATCH" => client.patch(&forward_uri),
201
+
_ => {
202
+
eprintln!("Unsupported method: {}", method);
203
+
return Ok(Response::builder()
204
+
.status(400)
205
+
.body(Full::new(Bytes::from("Bad Request: Unsupported Method")))
206
+
.unwrap());
207
+
}
208
+
};
209
210
+
for (name, value) in &headers {
211
+
if name.to_lowercase() != "host" && name.to_lowercase() != "connection" {
212
+
if let Ok(header_name) = reqwest::header::HeaderName::from_bytes(name.as_bytes()) {
213
+
if let Ok(header_value) = reqwest::header::HeaderValue::from_str(value) {
214
+
req_builder = req_builder.header(header_name, header_value);
215
+
}
216
+
}
217
+
}
218
+
}
0
219
220
+
if !body_vec.is_empty() {
221
+
req_builder = req_builder.body(body_vec.clone());
222
+
}
223
224
+
let resp = match req_builder.send().await {
225
+
Ok(resp) => resp,
226
+
Err(e) => {
227
+
eprintln!("Error sending request: {}", e);
228
+
return Ok(Response::builder()
229
+
.status(502)
230
+
.body(Full::new(Bytes::from(format!("Bad Gateway: {}", e))))
231
+
.unwrap());
232
+
}
233
+
};
234
235
+
let status = resp.status().as_u16();
236
237
+
let resp_headers: Vec<(String, String)> = resp
238
+
.headers()
239
+
.iter()
240
+
.map(|(name, value)| (name.to_string(), value.to_str().unwrap_or("").to_string()))
241
+
.collect();
0
0
0
0
0
242
243
+
let resp_bytes = match resp.bytes().await {
244
+
Ok(bytes) => bytes,
245
+
Err(e) => {
246
+
eprintln!("Error reading response body: {}", e);
247
+
return Ok(Response::builder()
248
+
.status(500)
249
+
.body(Full::new(Bytes::from("Internal Server Error")))
250
+
.unwrap());
251
+
}
252
+
};
253
254
+
let resp_vec = resp_bytes.to_vec();
255
+
let resp_str = String::from_utf8(resp_vec.clone()).ok();
256
257
+
let log_entry = ProxyLog {
258
+
request: RequestLog {
259
+
timestamp,
260
+
method: method.to_string(),
261
+
path,
262
+
query_params: query,
263
+
headers,
264
+
body: body_str,
265
+
},
266
+
response: ResponseLog {
267
+
status,
268
+
headers: resp_headers.clone(),
269
+
body: resp_str.clone(),
270
+
},
271
+
};
272
273
+
{
274
+
let mut logs_guard = logs.lock().await;
275
+
if !logs_guard.iter().any(|log| {
276
+
log.request.method == log_entry.request.method
277
+
&& log.request.path == log_entry.request.path
278
+
&& log.request.query_params == log_entry.request.query_params
279
+
}) {
280
+
logs_guard.push(log_entry.clone());
281
+
}
282
}
0
283
284
+
println!(
285
+
"Saved request/response to log store {}",
286
+
PROXY_LOG_FILE.magenta()
287
+
);
288
289
+
let mut builder = Response::builder().status(status);
290
291
+
for (name, value) in resp_headers {
292
+
if name.to_lowercase() != "connection" && name.to_lowercase() != "transfer-encoding" {
293
+
if let Ok(header_name) = hyper::header::HeaderName::from_bytes(name.as_bytes()) {
294
+
if let Ok(header_value) = hyper::header::HeaderValue::from_str(&value) {
295
+
builder = builder.header(header_name, header_value);
296
+
}
297
+
}
298
+
}
299
+
}
0
300
301
+
builder = builder.header("content-length", resp_vec.len());
302
+
builder = builder.header("connection", "close");
303
304
+
Ok(builder
305
+
.body(Full::new(Bytes::from(resp_vec)))
306
+
.unwrap_or_else(|_| {
307
+
Response::builder()
308
+
.status(500)
309
+
.body(Full::new(Bytes::from("Internal Server Error")))
310
+
.unwrap()
311
+
}))
312
+
}
+98
-86
src/replay.rs
···
1
use crate::proxy::PROXY_LOG_FILE;
2
-
use crate::store::{load_logs_from_file, LogStore};
3
-
use actix_web::{web, App, HttpServer, HttpRequest, HttpResponse, Responder};
4
use actix_web::http::header::{HeaderName, HeaderValue};
0
5
use owo_colors::OwoColorize;
6
7
-
pub async fn start_replay_server(logs: LogStore, bind_address: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8
-
if let Ok(file_logs) = load_logs_from_file(PROXY_LOG_FILE) {
9
-
let mut logs_guard = logs.lock().await;
10
-
for log in file_logs {
11
-
logs_guard.push(log);
12
-
}
13
-
}
0
0
0
14
15
-
fn build_request_key(method: &str, path: &str, query: &Option<String>) -> String {
16
-
if let Some(q) = query {
17
-
format!("{} {}?{}", method, path, q)
18
-
} else {
19
-
format!("{} {}", method, path)
20
-
}
21
-
}
22
23
-
async fn replay_handler(
24
-
req: HttpRequest,
25
-
_body: web::Bytes,
26
-
logs: web::Data<LogStore>,
27
-
) -> impl Responder {
28
-
let method = req.method().to_string();
29
-
let path = req.path().to_string();
30
-
let query = req.query_string();
31
-
let query_opt = if query.is_empty() { None } else { Some(query.to_string()) };
0
0
0
0
32
33
-
let key = build_request_key(&method, &path, &query_opt);
34
-
println!("Replay server received request: {}", key.magenta());
35
36
-
let response = {
37
-
let logs_guard = logs.lock().await;
38
-
logs_guard.iter()
39
-
.find(|log| {
40
-
let log_key = build_request_key(
41
-
&log.request.method,
42
-
&log.request.path,
43
-
&log.request.query_params
44
-
);
45
-
log_key == key
46
-
})
47
-
.cloned()
48
-
};
0
49
50
-
if let Some(log) = response {
51
-
println!("Found matching response for: {}", key.magenta());
52
53
-
let mut response_builder = HttpResponse::build(
54
-
actix_web::http::StatusCode::from_u16(log.response.status).unwrap_or(actix_web::http::StatusCode::OK)
55
-
);
0
56
57
-
for (name, value) in log.response.headers {
58
-
if let (Ok(header_name), Ok(header_value)) = (
59
-
HeaderName::try_from(name.as_str()),
60
-
HeaderValue::try_from(value.as_str())
61
-
) {
62
-
response_builder.append_header((header_name, header_value));
63
-
}
64
-
}
65
66
-
if let Some(body) = log.response.body {
67
-
response_builder.body(body)
68
-
} else {
69
-
response_builder.finish()
70
-
}
71
-
} else {
72
-
println!("No matching response found for: {}", key.magenta());
73
-
HttpResponse::NotFound().body("No matching request found in logs")
74
-
}
75
-
}
76
77
-
async fn list_requests(logs: web::Data<LogStore>) -> impl Responder {
78
-
let logs_guard = logs.lock().await;
79
-
let requests: Vec<_> = logs_guard.iter().map(|log| {
80
-
let key = build_request_key(
81
-
&log.request.method,
82
-
&log.request.path,
83
-
&log.request.query_params
84
-
);
85
-
(key, log.response.status)
86
-
}).collect();
0
0
0
87
88
-
web::Json(requests)
89
-
}
90
91
-
HttpServer::new(move || {
92
-
App::new()
93
-
.app_data(web::Data::new(logs.clone()))
94
-
.route("/admin/requests", web::get().to(list_requests))
95
-
.default_service(web::route().to(replay_handler))
96
-
})
97
-
.bind(bind_address)?
98
-
.run()
99
-
.await?;
100
101
-
Ok(())
102
-
}
···
1
use crate::proxy::PROXY_LOG_FILE;
2
+
use crate::store::{LogStore, load_logs_from_file};
0
3
use actix_web::http::header::{HeaderName, HeaderValue};
4
+
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, Responder, web};
5
use owo_colors::OwoColorize;
6
7
+
pub async fn start_replay_server(
8
+
logs: LogStore,
9
+
bind_address: &str,
10
+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
11
+
if let Ok(file_logs) = load_logs_from_file(PROXY_LOG_FILE) {
12
+
let mut logs_guard = logs.lock().await;
13
+
for log in file_logs {
14
+
logs_guard.push(log);
15
+
}
16
+
}
17
18
+
fn build_request_key(method: &str, path: &str, query: &Option<String>) -> String {
19
+
if let Some(q) = query {
20
+
format!("{} {}?{}", method, path, q)
21
+
} else {
22
+
format!("{} {}", method, path)
23
+
}
24
+
}
25
26
+
async fn replay_handler(
27
+
req: HttpRequest,
28
+
_body: web::Bytes,
29
+
logs: web::Data<LogStore>,
30
+
) -> impl Responder {
31
+
let method = req.method().to_string();
32
+
let path = req.path().to_string();
33
+
let query = req.query_string();
34
+
let query_opt = if query.is_empty() {
35
+
None
36
+
} else {
37
+
Some(query.to_string())
38
+
};
39
40
+
let key = build_request_key(&method, &path, &query_opt);
41
+
println!("Replay server received request: {}", key.magenta());
42
43
+
let response = {
44
+
let logs_guard = logs.lock().await;
45
+
logs_guard
46
+
.iter()
47
+
.find(|log| {
48
+
let log_key = build_request_key(
49
+
&log.request.method,
50
+
&log.request.path,
51
+
&log.request.query_params,
52
+
);
53
+
log_key == key
54
+
})
55
+
.cloned()
56
+
};
57
58
+
if let Some(log) = response {
59
+
println!("Found matching response for: {}", key.magenta());
60
61
+
let mut response_builder = HttpResponse::build(
62
+
actix_web::http::StatusCode::from_u16(log.response.status)
63
+
.unwrap_or(actix_web::http::StatusCode::OK),
64
+
);
65
66
+
for (name, value) in log.response.headers {
67
+
if let (Ok(header_name), Ok(header_value)) = (
68
+
HeaderName::try_from(name.as_str()),
69
+
HeaderValue::try_from(value.as_str()),
70
+
) {
71
+
response_builder.append_header((header_name, header_value));
72
+
}
73
+
}
74
75
+
if let Some(body) = log.response.body {
76
+
response_builder.body(body)
77
+
} else {
78
+
response_builder.finish()
79
+
}
80
+
} else {
81
+
println!("No matching response found for: {}", key.magenta());
82
+
HttpResponse::NotFound().body("No matching request found in logs")
83
+
}
84
+
}
85
86
+
async fn list_requests(logs: web::Data<LogStore>) -> impl Responder {
87
+
let logs_guard = logs.lock().await;
88
+
let requests: Vec<_> = logs_guard
89
+
.iter()
90
+
.map(|log| {
91
+
let key = build_request_key(
92
+
&log.request.method,
93
+
&log.request.path,
94
+
&log.request.query_params,
95
+
);
96
+
(key, log.response.status)
97
+
})
98
+
.collect();
99
100
+
web::Json(requests)
101
+
}
102
103
+
HttpServer::new(move || {
104
+
App::new()
105
+
.app_data(web::Data::new(logs.clone()))
106
+
.route("/admin/requests", web::get().to(list_requests))
107
+
.default_service(web::route().to(replay_handler))
108
+
})
109
+
.bind(bind_address)?
110
+
.run()
111
+
.await?;
112
113
+
Ok(())
114
+
}