···11+[package]
22+name = "job-board-orchestrator"
33+version = "0.1.0"
44+edition = "2024"
55+66+[dependencies]
77+# The core async runtime for Rust
88+tokio = { version = "1", features = ["full"] }
99+1010+# WebSocket library that integrates with Tokio
1111+tokio-tungstenite = "0.23"
1212+1313+# Utilities for working with async streams and sinks
1414+futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
1515+1616+# Framework for serializing and deserializing Rust data structures
1717+serde = { version = "1.0", features = ["derive"] }
1818+1919+# A JSON implementation for Serde
2020+serde_json = "1.0"
2121+2222+# For generating unique identifiers for jobs
2323+uuid = { version = "1.8", features = ["v4"] }
2424+2525+# A framework for structured, event-based logging
2626+tracing = "0.1"
2727+2828+# A subscriber for the tracing framework that formats and outputs logs
2929+tracing-subscriber = "0.3"
3030+
+11
control-plane/job-board/README.md
···11+# Job board
This posts jobs for worker nodes to try to bid to receive. So far in development, the node with the most available resources wins.
44+55+More coming soon.
66+77+Here's an example response from a worker node in case you want to `wscat` to the job board to test responses:
```json
1010+{ "type": "BidResponse", "job_id": "PASTE_THE_JOB_ID_HERE", "available_cpu_cores": 999, "available_ram_mb": 99999, "available_storage_mb": 999999 }
1111+```
+265
control-plane/job-board/src/main.rs
···11+use futures_util::{
22+ stream::{SplitSink},
33+ SinkExt, StreamExt,
44+};
55+use serde::{Deserialize, Serialize};
66+use std::{
77+ collections::{HashMap, VecDeque},
88+ net::SocketAddr,
99+ sync::Arc,
1010+ time::{Duration, Instant},
1111+};
1212+use tokio::{
1313+ net::{TcpListener, TcpStream},
1414+ sync::Mutex,
1515+};
1616+use tokio_tungstenite::{
1717+ accept_async,
1818+ tungstenite::{Message},
1919+ WebSocketStream,
2020+};
2121+use tracing::{error, info, instrument};
2222+use tracing_subscriber::FmtSubscriber;
2323+use uuid::Uuid;
/// Connected workers, keyed by socket address, each holding the write half of
/// its WebSocket so other tasks can push messages to it.
type PeerMap = Arc<Mutex<HashMap<SocketAddr, SplitSink<WebSocketStream<TcpStream>, Message>>>>;
/// FIFO queue of jobs waiting to be assigned.
type JobQueue = Arc<Mutex<VecDeque<Job>>>;
/// Bids collected during the current bidding round, keyed by worker address
/// (one bid per worker; a newer bid overwrites the older one).
type Bids = Arc<Mutex<HashMap<SocketAddr, Bid>>>;
/// A unit of work posted to the board and eventually assigned to one worker.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Job {
    // Unique identifier — a v4 UUID rendered as a string (see main()).
    id: String,
    // Opaque work description delivered verbatim to the winning worker.
    payload: String,
}
/// A worker's claim of spare capacity for a specific job, sent in response
/// to a `PingForBids` message.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Bid {
    // The job this bid refers to (matches Job::id).
    job_id: String,
    available_cpu_cores: u32,
    available_ram_mb: u32,
    available_storage_mb: u32,
}
4242+4343+impl Bid {
4444+ fn score(&self) -> u64 {
4545+ (self.available_cpu_cores as u64 * 1000)
4646+ + self.available_ram_mb as u64
4747+ + (self.available_storage_mb as u64 / 10)
4848+ }
4949+}
/// Messages the job board sends to workers. Serialized as JSON with a
/// `"type"` field carrying the variant name (serde internal tagging).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
enum ServerMessage {
    // Invites every connected worker to bid on the given job.
    PingForBids { job_id: String },
    // Awards the job to the winning bidder.
    AssignJob { job: Job },
    // Greeting sent immediately after a successful WebSocket handshake.
    Acknowledge,
}
/// Messages workers send to the job board. Same `"type"`-tagged JSON framing
/// as `ServerMessage`, so a bid arrives as `{"type": "BidResponse", ...}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
enum WorkerMessage {
    BidResponse(Bid),
}
6464+6565+#[tokio::main]
6666+async fn main() {
6767+ FmtSubscriber::builder()
6868+ .with_max_level(tracing::Level::INFO)
6969+ .init();
7070+7171+ let peers: PeerMap = Arc::new(Mutex::new(HashMap::new()));
7272+ let job_queue: JobQueue = Arc::new(Mutex::new(VecDeque::new()));
7373+ let bids: Bids = Arc::new(Mutex::new(HashMap::new()));
7474+7575+ {
7676+ let mut queue = job_queue.lock().await;
7777+ info!("Populating job queue with initial jobs...");
7878+ for i in 1..=5 {
7979+ queue.push_back(Job {
8080+ id: Uuid::new_v4().to_string(),
8181+ payload: format!("This is job number {}", i),
8282+ });
8383+ }
8484+ info!("{} jobs added to the queue.", queue.len());
8585+ }
8686+8787+ let peers_clone = Arc::clone(&peers);
8888+ let queue_clone = Arc::clone(&job_queue);
8989+ let bids_clone = Arc::clone(&bids);
9090+ tokio::spawn(async move {
9191+ queue_monitor(queue_clone, peers_clone, bids_clone).await;
9292+ });
9393+9494+ let addr = "127.0.0.1:9001";
9595+ let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
9696+ info!("Job Board listening on: {}", addr);
9797+9898+ while let Ok((stream, addr)) = listener.accept().await {
9999+ info!("New worker connection from: {}", addr);
100100+ tokio::spawn(handle_connection(
101101+ Arc::clone(&peers),
102102+ Arc::clone(&bids),
103103+ stream,
104104+ addr,
105105+ ));
106106+ }
107107+}
108108+109109+#[instrument(skip(peers, bids, stream))]
110110+async fn handle_connection(
111111+ peers: PeerMap,
112112+ bids: Bids,
113113+ stream: TcpStream,
114114+ addr: SocketAddr,
115115+) {
116116+ let ws_stream = match accept_async(stream).await {
117117+ Ok(ws) => ws,
118118+ Err(e) => {
119119+ error!("Error during WebSocket handshake: {}", e);
120120+ return;
121121+ }
122122+ };
123123+ info!("WebSocket connection established: {}", addr);
124124+125125+ let (mut ws_sender, mut ws_receiver) = ws_stream.split();
126126+127127+ let ack_msg = ServerMessage::Acknowledge;
128128+ let ack_json = serde_json::to_string(&ack_msg).unwrap();
129129+ if ws_sender.send(Message::Text(ack_json)).await.is_err() {
130130+ error!("Failed to send Acknowledge to {}", addr);
131131+ return;
132132+ }
133133+134134+ peers.lock().await.insert(addr, ws_sender);
135135+136136+ while let Some(msg) = ws_receiver.next().await {
137137+ match msg {
138138+ Ok(Message::Text(text)) => {
139139+ match serde_json::from_str::<WorkerMessage>(&text) {
140140+ Ok(WorkerMessage::BidResponse(bid)) => {
141141+ info!("Received bid from {}: {:?}", addr, bid);
142142+ bids.lock().await.insert(addr, bid);
143143+ }
144144+ Err(e) => {
145145+ error!("Error deserializing message from {}: {}", addr, e);
146146+ }
147147+ }
148148+ }
149149+ Ok(Message::Close(_)) => {
150150+ info!("Received Close message from {}", addr);
151151+ break;
152152+ }
153153+ Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed) => {
154154+ info!("Connection to {} closed gracefully.", addr);
155155+ break;
156156+ }
157157+ Err(e) => {
158158+ error!("Error receiving message from {}: {}", addr, e);
159159+ break;
160160+ }
161161+ _ => {}
162162+ }
163163+ }
164164+165165+ info!("Worker {} disconnected.", addr);
166166+ peers.lock().await.remove(&addr);
167167+}
168168+169169+#[instrument(skip(job_queue, peers, bids))]
170170+async fn queue_monitor(job_queue: JobQueue, peers: PeerMap, bids: Bids) {
171171+ loop {
172172+ tokio::time::sleep(Duration::from_secs(3)).await;
173173+174174+ let job = {
175175+ let mut queue = job_queue.lock().await;
176176+ queue.pop_front()
177177+ };
178178+179179+ if let Some(job) = job {
180180+ info!("Found job {} in queue. Starting bidding process.", job.id);
181181+182182+ bids.lock().await.clear();
183183+184184+ let ping_msg = ServerMessage::PingForBids {
185185+ job_id: job.id.clone(),
186186+ };
187187+ let ping_json = serde_json::to_string(&ping_msg).unwrap();
188188+ let mut disconnected_peers = Vec::new();
189189+190190+ let expected_bids = {
191191+ let mut peers_map = peers.lock().await;
192192+ if peers_map.is_empty() {
193193+ info!("No workers connected. Returning job to the front of the queue.");
194194+ job_queue.lock().await.push_front(job);
195195+ continue;
196196+ }
197197+198198+ info!("Pinging {} connected workers for bids.", peers_map.len());
199199+ for (addr, sender) in peers_map.iter_mut() {
200200+ if sender.send(Message::Text(ping_json.clone())).await.is_err() {
201201+ error!("Failed to send ping to {}. Marking for removal.", addr);
202202+ disconnected_peers.push(*addr);
203203+ }
204204+ }
205205+ peers_map.len() - disconnected_peers.len()
206206+ };
207207+208208+ if !disconnected_peers.is_empty() {
209209+ let mut peers_map = peers.lock().await;
210210+ for addr in disconnected_peers {
211211+ peers_map.remove(&addr);
212212+ }
213213+ }
214214+215215+ if expected_bids == 0 {
216216+ info!("No active workers to ping. Returning job to the front of the queue.");
217217+ job_queue.lock().await.push_front(job);
218218+ continue;
219219+ }
220220+221221+ info!("Waiting up to 3 seconds for {} workers to submit bids...", expected_bids);
222222+ let bidding_deadline = Instant::now() + Duration::from_secs(3);
223223+224224+ while Instant::now() < bidding_deadline {
225225+ let bids_received = bids.lock().await.len();
226226+ if bids_received >= expected_bids {
227227+ info!("All {} expected bids received early.", expected_bids);
228228+ break;
229229+ }
230230+ // tokio::time::sleep(Duration::from_millis(100)).await;
231231+ }
232232+233233+ let winner = {
234234+ let bids_map = bids.lock().await;
235235+ info!("Evaluating {} bids received.", bids_map.len());
236236+ bids_map
237237+ .iter()
238238+ .max_by_key(|(_, bid)| bid.score())
239239+ .map(|(addr, _)| *addr)
240240+ };
241241+242242+ if let Some(winner_addr) = winner {
243243+ info!(
244244+ "Bidding complete. Winner is {}. Assigning job {}.",
245245+ winner_addr, job.id
246246+ );
247247+ let mut peers_map = peers.lock().await;
248248+ if let Some(winner_sender) = peers_map.get_mut(&winner_addr) {
249249+ let assign_msg = ServerMessage::AssignJob { job };
250250+ let assign_json = serde_json::to_string(&assign_msg).unwrap();
251251+ if winner_sender
252252+ .send(Message::Text(assign_json))
253253+ .await
254254+ .is_err()
255255+ {
256256+ error!("Failed to assign job to winner {}.", winner_addr);
257257+ }
258258+ }
259259+ } else {
260260+ info!("No bids received for job {}. Returning it to the front of the queue.", job.id);
261261+ job_queue.lock().await.push_front(job);
262262+ }
263263+ }
264264+ }
265265+}
···11+# The VM that contains a given pod
22+33+## Manual installation
44+55+Add the firecracker binary to your system.
Download a vmlinux* kernel image that has virtio and related drivers built into the kernel itself rather than compiled as modules. For example, AWS provides ready-made ones, per the firecracker docs:
88+99+```
1010+ARCH="$(uname -m)"
1111+release_url="https://github.com/firecracker-microvm/firecracker/releases"
1212+latest_version=$(basename $(curl -fsSLI -o /dev/null -w %{url_effective} ${release_url}/latest))
1313+CI_VERSION=${latest_version%.*}
1414+latest_kernel_key=$(curl "http://spec.ccfc.min.s3.amazonaws.com/?prefix=firecracker-ci/$CI_VERSION/$ARCH/vmlinux-&list-type=2" \
1515+ | grep -oP "(?<=<Key>)(firecracker-ci/$CI_VERSION/$ARCH/vmlinux-[0-9]+\.[0-9]+\.[0-9]{1,3})(?=</Key>)" \
1616+ | sort -V | tail -1)
1717+1818+# Download a linux kernel binary
1919+wget "https://s3.amazonaws.com/spec.ccfc.min/${latest_kernel_key}"
2020+```
2121+2222+Add the following network rules to your system (not necessary at this stage of the project but good to have)
2323+2424+```
2525+TAP_DEV="tap0"
2626+TAP_IP="172.16.0.1"
2727+MASK_SHORT="/30"
2828+HOST_IFACE=$(ip -j route list default | jq -r '.[0].dev')
2929+3030+# Setup network interface on the host
3131+sudo ip link del "$TAP_DEV" 2> /dev/null || true
3232+sudo ip tuntap add dev "$TAP_DEV" mode tap
3333+sudo ip addr add "${TAP_IP}${MASK_SHORT}" dev "$TAP_DEV"
3434+sudo ip link set dev "$TAP_DEV" up
3535+3636+# Enable IP forwarding and masquerading
3737+sudo sh -c "echo 1 > /proc/sys/net/ipv4/ip_forward"
3838+sudo iptables -P FORWARD ACCEPT
3939+sudo iptables -t nat -A POSTROUTING -o "$HOST_IFACE" -j MASQUERADE
4040+```
4141+4242+Allow execution (chmod +x) on the create_alpine_rootfs.sh, then run it.
4343+4444+`touch firecracker.log`
4545+4646+Finally, to run the image:
4747+4848+`sudo rm -f /tmp/firecracker.socket && sudo ./firecracker --api-sock /tmp/firecracker.socket --config-file firecracker-config.json`
The username and password are both `root`. Change that in the create_alpine_rootfs.sh file if you want.
5151+5252+To exit the tty you'll have to `shutdown` or `reboot`.
5353+5454+If you want to wire up the networking, complete the guest side of the host networking that we added on the host earlier:
5555+5656+```
5757+ip addr add 172.16.0.2/30 dev eth0
5858+ip link set eth0 up
5959+ip route add default via 172.16.0.1 dev eth0
6060+echo "nameserver 8.8.8.8" > /etc/resolv.conf
6161+```
6262+### TODO: prod machines that dynamically assign internal IPs on rootfs creation time
6363+6464+Add to /etc/network/interfaces
6565+6666+```
6767+auto lo
6868+iface lo inet loopback
6969+7070+auto eth0
7171+iface eth0 inet static
7272+ address 172.16.0.2
7373+ netmask 255.255.255.252
7474+ gateway 172.16.0.1
7575+```
7676+7777+and at startup
7878+7979+```
8080+rc-update add networking boot
8181+rc-service networking start
8282+```
8383+8484+Hmm.. also should do something for ipv6 too.