Streaming Tree ARchive format
at rust-impl 244 lines 7.2 kB view raw
1use anyhow::Context; 2use star::StarMstEntry; 3use star::StarMstNode; 4use star::calculate_height; 5use star::StarSerializer; 6use star::StarCommit; 7use star::RepoMstNode; 8use cid::Cid; 9use std::collections::HashMap; 10use iroh_car::CarReader; 11use std::env; 12use std::path::PathBuf; 13use std::sync::{ 14 Arc, 15 atomic::{AtomicUsize, Ordering}, 16}; 17use tokio::{io::AsyncRead, task::JoinSet}; 18use anyhow::Result; 19 20use mimalloc::MiMalloc; 21#[global_allocator] 22static GLOBAL: MiMalloc = MiMalloc; 23 24 25async fn get_cars( 26 cars_folder: PathBuf, 27 tx: async_channel::Sender<(tokio::io::BufReader<tokio::fs::File>, String)>, 28) -> Result<()> { 29 let mut dir = tokio::fs::read_dir(cars_folder).await?; 30 while let Some(entry) = dir.next_entry().await? { 31 if !entry.file_type().await?.is_file() { 32 continue; 33 } 34 let reader = tokio::fs::File::open(&entry.path()).await?; 35 let reader = tokio::io::BufReader::new(reader); 36 tx.send((reader, entry.file_name().to_string_lossy().into())).await?; 37 } 38 Ok(()) 39} 40 41async fn converter<R: AsyncRead + Unpin + Send + Sync + 'static>( 42 car_rx: async_channel::Receiver<(R, String)>, 43 star_folder: String, 44 n: Arc<AtomicUsize>, 45) -> Result<()> { 46 while let Ok((f, name)) = car_rx.recv().await { 47 n.fetch_add(1, Ordering::Relaxed); 48 // eprintln!("failed to drive mem for {name:?}: {e:?}, skipping"); 49 50 let mut car = match CarReader::new(f).await { 51 Ok(c) => c, 52 Err(e) => { 53 eprintln!("skipping car: {e}"); 54 continue; 55 } 56 }; 57 58 let roots = car.header().roots(); 59 assert_eq!(roots.len(), 1); 60 61 let commit_cid = *roots.first().expect("a root to be present"); 62 63 let mut blocks: HashMap<Cid, Vec<u8>> = HashMap::new(); 64 65 while let Some((cid, data)) = car.next_block().await? { 66 blocks.insert(cid, data); 67 } 68 let star_path = format!("{star_folder}/{}", name.replace(".car", ".star")); 69 let res = tokio::task::spawn_blocking(move || { 70 let output_file = std::fs::File::create(star_path)?; 71 72 let commit_bytes = blocks.get(&commit_cid).context("Commit block not found")?; 73 74 #[derive(serde::Deserialize)] 75 struct RepoCommit { 76 did: String, 77 version: i64, 78 data: Cid, 79 rev: String, 80 prev: Option<Cid>, 81 sig: Option<serde_bytes::ByteBuf>, 82 } 83 84 let repo_commit: RepoCommit = serde_ipld_dagcbor::from_slice(commit_bytes)?; 85 86 let root_bytes = blocks 87 .get(&repo_commit.data) 88 .context("repo data cannot be null")?; 89 let root_node: RepoMstNode = 90 serde_ipld_dagcbor::from_slice(root_bytes).context("root must be an mst node")?; 91 92 let star_data = if root_node.l.is_none() && root_node.e.is_empty() { 93 None 94 } else { 95 Some(repo_commit.data) 96 }; 97 98 let star_commit = StarCommit { 99 did: repo_commit.did, 100 version: repo_commit.version, 101 data: star_data, 102 rev: repo_commit.rev, 103 prev: repo_commit.prev, 104 sig: repo_commit.sig, 105 }; 106 107 let mut serializer = StarSerializer::new(output_file); 108 109 serializer.write_header(&star_commit)?; 110 111 if let Some(root_cid) = star_commit.data { 112 write_tree(root_cid, &blocks, &mut serializer)?; 113 } 114 115 serializer.finish()?; 116 Ok::<(), anyhow::Error>(()) 117 }).await?; 118 119 if let Err(e) = res { 120 eprintln!("failed: {name}: {e}"); 121 } 122 } 123 Ok(()) 124} 125 126fn write_tree( 127 node_cid: Cid, 128 blocks: &HashMap<Cid, Vec<u8>>, 129 serializer: &mut StarSerializer<std::fs::File>, 130) -> Result<(usize, usize)> { 131 // println!("writing tree under {node_cid:?}..."); 132 133 let mut nodes_written = 0; 134 let mut records_written = 0; 135 136 let block_bytes = blocks 137 .get(&node_cid) 138 .with_context(|| format!("Missing block {}", node_cid))?; 139 140 let repo_node: RepoMstNode = serde_ipld_dagcbor::from_slice(block_bytes)?; 141 142 let height = if let Some(first_entry) = repo_node.e.first() { 143 calculate_height(&first_entry.k) 144 } else { 145 0 146 }; 147 148 let star_node = StarMstNode { 149 l: repo_node.l, 150 l_archived: repo_node.l.map(|_| true), 151 e: repo_node 152 .e 153 .iter() 154 .map(|e| { 155 let v = if height == 0 { None } else { Some(e.v) }; 156 StarMstEntry { 157 p: e.p, 158 k: e.k.clone(), 159 v, 160 v_archived: Some(true), 161 t: e.t, 162 t_archived: e.t.map(|_| true), 163 } 164 }) 165 .collect(), 166 }; 167 168 serializer.write_node(&star_node)?; 169 nodes_written += 1; 170 171 if let Some(l_cid) = repo_node.l { 172 let (n, r) = write_tree(l_cid, blocks, serializer)?; 173 nodes_written += n; 174 records_written += r; 175 } 176 177 for e in repo_node.e { 178 let record_bytes = blocks 179 .get(&e.v) 180 .with_context(|| format!("Missing record {}", e.v))?; 181 // eprintln!("writing record {:?} (<= {node_cid:?})", e.v); 182 serializer.write_record(record_bytes)?; 183 records_written += 1; 184 185 if let Some(t_cid) = e.t { 186 let (n, r) = write_tree(t_cid, blocks, serializer)?; 187 nodes_written += n; 188 records_written += r; 189 } 190 } 191 192 Ok((nodes_written, records_written)) 193} 194 195#[tokio::main] 196async fn main() -> Result<()> { 197 let args: Vec<String> = env::args().collect(); 198 if args.len() != 3 { 199 eprintln!("Usage: car-to-star <cars-folder> <stars-folder>"); 200 std::process::exit(1); 201 } 202 let cars_path = &args[1]; 203 let stars_path = &args[2]; 204 205 let mut set = JoinSet::<Result<()>>::new(); 206 207 let (cars_tx, cars_rx) = async_channel::bounded(2); 208 set.spawn(get_cars(cars_path.into(), cars_tx)); 209 210 let n: Arc<AtomicUsize> = Arc::new(0.into()); 211 212 tokio::fs::create_dir_all(stars_path.clone()).await?; 213 for _ in 0..10 { 214 set.spawn(converter(cars_rx.clone(), stars_path.clone(), n.clone())); 215 } 216 drop(cars_rx); 217 218 tokio::fs::create_dir_all(stars_path.clone()).await?; 219 set.spawn({ 220 let n = n.clone(); 221 let mut last_n: usize = 0; 222 let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); 223 async move { 224 loop { 225 interval.tick().await; 226 let this_n = n.load(Ordering::Relaxed); 227 println!("{} ({this_n})", this_n - last_n); 228 if this_n > 1 && this_n == last_n { 229 eprintln!("done?"); 230 break Ok(()); 231 } 232 last_n = this_n; 233 } 234 } 235 }); 236 237 while let Some(res) = set.join_next().await { 238 println!("task from set joined: {res:?}"); 239 } 240 241 eprintln!("total repos converted: {n:?}"); 242 243 Ok(()) 244}