use anyhow::Context; use star::StarMstEntry; use star::StarMstNode; use star::calculate_height; use star::StarSerializer; use star::StarCommit; use star::RepoMstNode; use cid::Cid; use std::collections::HashMap; use iroh_car::CarReader; use std::env; use std::path::PathBuf; use std::sync::{ Arc, atomic::{AtomicUsize, Ordering}, }; use tokio::{io::AsyncRead, task::JoinSet}; use anyhow::Result; use mimalloc::MiMalloc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; async fn get_cars( cars_folder: PathBuf, tx: async_channel::Sender<(tokio::io::BufReader, String)>, ) -> Result<()> { let mut dir = tokio::fs::read_dir(cars_folder).await?; while let Some(entry) = dir.next_entry().await? { if !entry.file_type().await?.is_file() { continue; } let reader = tokio::fs::File::open(&entry.path()).await?; let reader = tokio::io::BufReader::new(reader); tx.send((reader, entry.file_name().to_string_lossy().into())).await?; } Ok(()) } async fn converter( car_rx: async_channel::Receiver<(R, String)>, star_folder: String, n: Arc, ) -> Result<()> { while let Ok((f, name)) = car_rx.recv().await { n.fetch_add(1, Ordering::Relaxed); // eprintln!("failed to drive mem for {name:?}: {e:?}, skipping"); let mut car = match CarReader::new(f).await { Ok(c) => c, Err(e) => { eprintln!("skipping car: {e}"); continue; } }; let roots = car.header().roots(); assert_eq!(roots.len(), 1); let commit_cid = *roots.first().expect("a root to be present"); let mut blocks: HashMap> = HashMap::new(); while let Some((cid, data)) = car.next_block().await? { blocks.insert(cid, data); } let star_path = format!("{star_folder}/{}", name.replace(".car", ".star")); let res = tokio::task::spawn_blocking(move || { let output_file = std::fs::File::create(star_path)?; let commit_bytes = blocks.get(&commit_cid).context("Commit block not found")?; #[derive(serde::Deserialize)] struct RepoCommit { did: String, version: i64, data: Cid, rev: String, prev: Option, sig: Option, } let repo_commit: RepoCommit = serde_ipld_dagcbor::from_slice(commit_bytes)?; let root_bytes = blocks .get(&repo_commit.data) .context("repo data cannot be null")?; let root_node: RepoMstNode = serde_ipld_dagcbor::from_slice(root_bytes).context("root must be an mst node")?; let star_data = if root_node.l.is_none() && root_node.e.is_empty() { None } else { Some(repo_commit.data) }; let star_commit = StarCommit { did: repo_commit.did, version: repo_commit.version, data: star_data, rev: repo_commit.rev, prev: repo_commit.prev, sig: repo_commit.sig, }; let mut serializer = StarSerializer::new(output_file); serializer.write_header(&star_commit)?; if let Some(root_cid) = star_commit.data { write_tree(root_cid, &blocks, &mut serializer)?; } serializer.finish()?; Ok::<(), anyhow::Error>(()) }).await?; if let Err(e) = res { eprintln!("failed: {name}: {e}"); } } Ok(()) } fn write_tree( node_cid: Cid, blocks: &HashMap>, serializer: &mut StarSerializer, ) -> Result<(usize, usize)> { // println!("writing tree under {node_cid:?}..."); let mut nodes_written = 0; let mut records_written = 0; let block_bytes = blocks .get(&node_cid) .with_context(|| format!("Missing block {}", node_cid))?; let repo_node: RepoMstNode = serde_ipld_dagcbor::from_slice(block_bytes)?; let height = if let Some(first_entry) = repo_node.e.first() { calculate_height(&first_entry.k) } else { 0 }; let star_node = StarMstNode { l: repo_node.l, l_archived: repo_node.l.map(|_| true), e: repo_node .e .iter() .map(|e| { let v = if height == 0 { None } else { Some(e.v) }; StarMstEntry { p: e.p, k: e.k.clone(), v, v_archived: Some(true), t: e.t, t_archived: e.t.map(|_| true), } }) .collect(), }; serializer.write_node(&star_node)?; nodes_written += 1; if let Some(l_cid) = repo_node.l { let (n, r) = write_tree(l_cid, blocks, serializer)?; nodes_written += n; records_written += r; } for e in repo_node.e { let record_bytes = blocks .get(&e.v) .with_context(|| format!("Missing record {}", e.v))?; // eprintln!("writing record {:?} (<= {node_cid:?})", e.v); serializer.write_record(record_bytes)?; records_written += 1; if let Some(t_cid) = e.t { let (n, r) = write_tree(t_cid, blocks, serializer)?; nodes_written += n; records_written += r; } } Ok((nodes_written, records_written)) } #[tokio::main] async fn main() -> Result<()> { let args: Vec = env::args().collect(); if args.len() != 3 { eprintln!("Usage: car-to-star "); std::process::exit(1); } let cars_path = &args[1]; let stars_path = &args[2]; let mut set = JoinSet::>::new(); let (cars_tx, cars_rx) = async_channel::bounded(2); set.spawn(get_cars(cars_path.into(), cars_tx)); let n: Arc = Arc::new(0.into()); tokio::fs::create_dir_all(stars_path.clone()).await?; for _ in 0..10 { set.spawn(converter(cars_rx.clone(), stars_path.clone(), n.clone())); } drop(cars_rx); tokio::fs::create_dir_all(stars_path.clone()).await?; set.spawn({ let n = n.clone(); let mut last_n: usize = 0; let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); async move { loop { interval.tick().await; let this_n = n.load(Ordering::Relaxed); println!("{} ({this_n})", this_n - last_n); if this_n > 1 && this_n == last_n { eprintln!("done?"); break Ok(()); } last_n = this_n; } } }); while let Some(res) = set.join_next().await { println!("task from set joined: {res:?}"); } eprintln!("total repos converted: {n:?}"); Ok(()) }