this repo has no description
at main 289 lines 9.5 kB view raw
1use std::path::{Path, PathBuf}; 2use std::time::Duration; 3 4use anyhow::{Context, Result, anyhow}; 5use chromiumoxide::browser::{Browser, BrowserConfig}; 6use chromiumoxide::cdp::browser_protocol::page::{ 7 EventScreencastFrame, ScreencastFrameAckParams, StartScreencastFormat, StartScreencastParams, 8 StopScreencastParams, 9}; 10use chromiumoxide::handler::viewport::Viewport; 11use futures::StreamExt; 12use tokio::io::{AsyncBufReadExt, BufReader}; 13use tokio::sync::mpsc; 14use tokio::time::MissedTickBehavior; 15use tracing::{debug, error, info, warn}; 16 17use crate::cli::AppConfig; 18use crate::encoder::FfmpegEncoder; 19use crate::error::RuntimeError; 20use crate::frame::{RgbFrame, decode_screencast_frame}; 21 22pub async fn stream_browser_to_encoder( 23 config: &AppConfig, 24 chromium_path: &Path, 25 encoder: &mut FfmpegEncoder, 26) -> Result<()> { 27 let viewport = Viewport { 28 width: config.width, 29 height: config.height, 30 device_scale_factor: Some(1.0), 31 emulating_mobile: false, 32 is_landscape: config.width >= config.height, 33 has_touch: false, 34 }; 35 36 let mut browser_builder = BrowserConfig::builder() 37 .chrome_executable(chromium_path) 38 .window_size(config.width, config.height) 39 .new_headless_mode() 40 .viewport(viewport) 41 .arg("--autoplay-policy=no-user-gesture-required") 42 .arg("--disable-background-timer-throttling") 43 .arg("--disable-backgrounding-occluded-windows") 44 .arg("--disable-renderer-backgrounding"); 45 46 if no_sandbox_from_env() { 47 browser_builder = browser_builder.no_sandbox(); 48 } 49 50 let browser_config = browser_builder 51 .build() 52 .map_err(|err| anyhow!("failed to build browser config: {err}"))?; 53 54 info!(chromium = %chromium_path.display(), "starting chromium"); 55 56 let (mut browser, mut handler) = Browser::launch(browser_config) 57 .await 58 .context("failed to launch chromium")?; 59 60 let handler_task = tokio::spawn(async move { 61 while let Some(item) = handler.next().await { 62 if let Err(err) = item { 63 error!("chromium handler error: {err}"); 64 break; 65 } 66 } 67 }); 68 69 let page = browser 70 .new_page("about:blank") 71 .await 72 .context("failed to create page")?; 73 74 page.goto(config.website_url.as_str()) 75 .await 76 .with_context(|| format!("failed loading {}", config.website_url))?; 77 78 // `goto` waits for page load completion. Delay further for dynamic JS/CSS settling. 79 tokio::time::sleep(Duration::from_millis(config.startup_delay_ms)).await; 80 81 let mut frame_events = page 82 .event_listener::<EventScreencastFrame>() 83 .await 84 .context("failed to register screencast event listener")?; 85 86 let start_params = StartScreencastParams::builder() 87 .format(StartScreencastFormat::Jpeg) 88 .quality(80_i64) 89 .max_width(i64::from(config.width)) 90 .max_height(i64::from(config.height)) 91 .every_nth_frame(1_i64) 92 .build(); 93 94 page.execute(start_params) 95 .await 96 .context("failed to start screencast")?; 97 98 info!("runtime controls: type `r` then Enter to refresh the page"); 99 100 let mut control_rx = spawn_control_listener(); 101 let frame_interval = Duration::from_secs_f64(1.0_f64 / f64::from(config.fps)); 102 let mut frame_tick = tokio::time::interval(frame_interval); 103 frame_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); 104 frame_tick.tick().await; 105 let mut stats_tick = tokio::time::interval(Duration::from_secs(5)); 106 stats_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); 107 stats_tick.tick().await; 108 109 let first_frame_timeout = tokio::time::sleep(Duration::from_millis(config.frame_timeout_ms)); 110 tokio::pin!(first_frame_timeout); 111 let mut latest_frame: Option<RgbFrame> = None; 112 let mut decoded_frames: u64 = 0; 113 let mut encoded_frames: u64 = 0; 114 115 let stream_result: Result<()> = async { 116 loop { 117 tokio::select! { 118 biased; 119 _ = frame_tick.tick() => { 120 if let Some(frame) = latest_frame.as_ref() { 121 encoder.write_frame(frame).await?; 122 encoded_frames = encoded_frames.saturating_add(1); 123 } 124 } 125 maybe_event = frame_events.next() => { 126 let event = maybe_event.context("screencast event stream ended unexpectedly")?; 127 128 page.execute(ScreencastFrameAckParams::new(event.session_id)) 129 .await 130 .context("failed to ack screencast frame")?; 131 132 let frame = decode_screencast_frame(event.data.as_ref(), config.width, config.height) 133 .context("failed to decode screencast frame")?; 134 135 if latest_frame.is_none() { 136 info!("received first screencast frame"); 137 // Prime ffmpeg immediately so it can initialize output without waiting for the first tick. 138 encoder.write_frame(&frame).await?; 139 encoded_frames = encoded_frames.saturating_add(1); 140 } 141 decoded_frames = decoded_frames.saturating_add(1); 142 latest_frame = Some(frame); 143 } 144 _ = stats_tick.tick() => { 145 debug!( 146 decoded_frames, 147 encoded_frames, 148 has_frame = latest_frame.is_some(), 149 "streaming stats" 150 ); 151 } 152 command = control_rx.recv() => { 153 match command { 154 Some(ControlCommand::Refresh) => { 155 page.reload() 156 .await 157 .context("manual refresh failed")?; 158 info!("manual refresh applied"); 159 } 160 Some(ControlCommand::Help) => { 161 info!("runtime controls: `r` or `refresh` reloads the page"); 162 } 163 None => { 164 // stdin closed; continue streaming without runtime controls. 165 } 166 } 167 } 168 _ = tokio::signal::ctrl_c() => { 169 return Err(RuntimeError::ShutdownRequested.into()); 170 } 171 _ = &mut first_frame_timeout, if latest_frame.is_none() => { 172 return Err(RuntimeError::ScreencastTimeout.into()); 173 } 174 } 175 } 176 } 177 .await; 178 179 if let Err(err) = page.execute(StopScreencastParams::default()).await { 180 warn!("failed to stop screencast cleanly: {err}"); 181 } 182 183 if let Err(err) = browser.close().await { 184 warn!("failed to close browser cleanly: {err}"); 185 } 186 if let Err(err) = browser.wait().await { 187 warn!("failed to wait for browser process: {err}"); 188 } 189 190 handler_task.abort(); 191 192 stream_result 193} 194 195fn no_sandbox_from_env() -> bool { 196 match std::env::var("BROWSER_STREAM_NO_SANDBOX") { 197 Ok(value) => parse_truthy(&value), 198 Err(_) => false, 199 } 200} 201 202fn parse_truthy(value: &str) -> bool { 203 matches!( 204 value.trim().to_ascii_lowercase().as_str(), 205 "1" | "true" | "yes" 206 ) 207} 208 209pub fn chromium_executable_name() -> &'static str { 210 if cfg!(target_os = "windows") { 211 "headless_shell.exe" 212 } else { 213 "headless_shell" 214 } 215} 216 217pub fn default_chromium_sidecar_path(exe_dir: &Path) -> PathBuf { 218 exe_dir 219 .join("..") 220 .join("sidecar") 221 .join("chromium") 222 .join(chromium_executable_name()) 223} 224 225#[derive(Debug, Copy, Clone)] 226enum ControlCommand { 227 Refresh, 228 Help, 229} 230 231fn parse_control_command(input: &str) -> Option<ControlCommand> { 232 match input.trim().to_ascii_lowercase().as_str() { 233 "r" | "refresh" => Some(ControlCommand::Refresh), 234 "h" | "help" => Some(ControlCommand::Help), 235 _ => None, 236 } 237} 238 239fn spawn_control_listener() -> mpsc::UnboundedReceiver<ControlCommand> { 240 let (tx, rx) = mpsc::unbounded_channel(); 241 242 tokio::spawn(async move { 243 let mut lines = BufReader::new(tokio::io::stdin()).lines(); 244 while let Ok(Some(line)) = lines.next_line().await { 245 if let Some(command) = parse_control_command(&line) { 246 if tx.send(command).is_err() { 247 break; 248 } 249 } 250 } 251 }); 252 253 rx 254} 255 256#[cfg(test)] 257mod tests { 258 use super::{ControlCommand, parse_control_command, parse_truthy}; 259 260 #[test] 261 fn parses_refresh_shortcut() { 262 assert!(matches!( 263 parse_control_command("r"), 264 Some(ControlCommand::Refresh) 265 )); 266 } 267 268 #[test] 269 fn parses_refresh_word() { 270 assert!(matches!( 271 parse_control_command(" refresh "), 272 Some(ControlCommand::Refresh) 273 )); 274 } 275 276 #[test] 277 fn ignores_unknown_commands() { 278 assert!(parse_control_command("noop").is_none()); 279 } 280 281 #[test] 282 fn truthy_parser() { 283 assert!(parse_truthy("true")); 284 assert!(parse_truthy("1")); 285 assert!(parse_truthy("YES")); 286 assert!(!parse_truthy("0")); 287 assert!(!parse_truthy("false")); 288 } 289}