use dotenvy::dotenv; use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::env; use std::time::Duration; use sysinfo::{Disks, System}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing::{error, info, instrument}; use tracing_subscriber::FmtSubscriber; use url::Url; #[derive(Debug, Serialize, Deserialize, Clone)] struct Job { id: String, payload: String, } #[derive(Debug, Serialize, Deserialize, Clone)] struct Bid { job_id: String, available_cpu_cores: u32, available_ram_mb: u32, available_storage_mb: u32, } #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] enum ServerMessage { PingForBids { job_id: String }, AssignJob { job: Job }, Acknowledge, } #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] enum WorkerMessage { BidResponse(Bid), } fn get_system_resources() -> (u32, u32, u32) { let mut sys = System::new(); sys.refresh_cpu_all(); sys.refresh_memory(); let cpu_cores = sys.cpus().len() as u32; let available_ram_mb = (sys.available_memory() / (1024 * 1024)) as u32; let available_storage_mb: u32; let disks = Disks::new_with_refreshed_list(); let exe_path = std::env::current_exe().unwrap_or_else(|_| std::path::PathBuf::from("/")); let mut best_disk_mount: Option<&std::path::Path> = None; let mut best_disk_space: u64 = 0; for disk in disks.iter() { let mount_point = disk.mount_point(); if exe_path.starts_with(mount_point) { if best_disk_mount.is_none() || mount_point.as_os_str().len() > best_disk_mount.unwrap().as_os_str().len() { best_disk_mount = Some(mount_point); best_disk_space = disk.available_space(); } } } if best_disk_mount.is_some() { available_storage_mb = (best_disk_space / (1024 * 1024)) as u32; } else { let mut total_space: u64 = 0; for disk in disks.iter() { match disk.kind() { sysinfo::DiskKind::HDD | sysinfo::DiskKind::SSD => { total_space += disk.available_space(); } _ => {} } } if total_space == 0 { for disk in disks.iter() { total_space += disk.available_space(); } } available_storage_mb = (total_space / (1024 * 1024)) as u32; } (cpu_cores, available_ram_mb, available_storage_mb) } async fn spin_up_job(job: Job) { info!("Spinning up job: {}", job.id); tokio::time::sleep(Duration::from_secs(5)).await; info!("Job {} completed. Payload: {}", job.id, job.payload); } #[tokio::main] #[instrument] async fn main() { dotenv().ok(); FmtSubscriber::builder() .with_max_level(tracing::Level::INFO) .init(); let server_url = env::var("CONTROL_PLANE_URL").unwrap_or_else(|_| "ws://127.0.0.1:8080".to_string()); info!("Attempting to connect to job board at {}", server_url); let url = Url::parse(&server_url).expect("Failed to parse server URL"); loop { match connect_async(&url).await { Ok((ws_stream, _response)) => { info!("Successfully connected to job board"); let (mut write, mut read) = ws_stream.split(); while let Some(msg) = read.next().await { match msg { Ok(Message::Text(text)) => { match serde_json::from_str::(&text) { Ok(server_msg) => match server_msg { ServerMessage::PingForBids { job_id } => { info!("Received bid request for job_id: {}", job_id); let (cpu, ram, storage) = get_system_resources(); let bid = Bid { job_id: job_id.clone(), available_cpu_cores: cpu, available_ram_mb: ram, available_storage_mb: storage, }; let response_msg = WorkerMessage::BidResponse(bid); let response_json = serde_json::to_string(&response_msg) .expect("Failed to serialize bid response"); if let Err(e) = write.send(Message::Text(response_json.clone())).await { error!("Failed to send bid response: {}", e); break; } info!("Sent bid for job_id: {}", job_id); } ServerMessage::AssignJob { job } => { info!("Won bid! Assigned job: {}", job.id); tokio::spawn(spin_up_job(job)); } ServerMessage::Acknowledge => { info!("Received Acknowledge from server."); } }, Err(e) => { error!( "Failed to deserialize server message: {}. Raw text: {}", e, text ); } } } Ok(Message::Close(_)) => { info!("Server closed the connection."); break; } Err(e) => { error!("Error receiving message: {}", e); break; } _ => {} } } } Err(e) => { error!("Failed to connect: {}", e); } } info!("Disconnected. Reconnecting in 5 seconds..."); tokio::time::sleep(Duration::from_secs(5)).await; } }