Streaming Tree ARchive format
1use anyhow::Context;
2use star::StarMstEntry;
3use star::StarMstNode;
4use star::calculate_height;
5use star::StarSerializer;
6use star::StarCommit;
7use star::RepoMstNode;
8use cid::Cid;
9use std::collections::HashMap;
10use iroh_car::CarReader;
11use std::env;
12use std::path::PathBuf;
13use std::sync::{
14 Arc,
15 atomic::{AtomicUsize, Ordering},
16};
17use tokio::{io::AsyncRead, task::JoinSet};
18use anyhow::Result;
19
20use mimalloc::MiMalloc;
21#[global_allocator]
22static GLOBAL: MiMalloc = MiMalloc;
23
24
25async fn get_cars(
26 cars_folder: PathBuf,
27 tx: async_channel::Sender<(tokio::io::BufReader<tokio::fs::File>, String)>,
28) -> Result<()> {
29 let mut dir = tokio::fs::read_dir(cars_folder).await?;
30 while let Some(entry) = dir.next_entry().await? {
31 if !entry.file_type().await?.is_file() {
32 continue;
33 }
34 let reader = tokio::fs::File::open(&entry.path()).await?;
35 let reader = tokio::io::BufReader::new(reader);
36 tx.send((reader, entry.file_name().to_string_lossy().into())).await?;
37 }
38 Ok(())
39}
40
41async fn converter<R: AsyncRead + Unpin + Send + Sync + 'static>(
42 car_rx: async_channel::Receiver<(R, String)>,
43 star_folder: String,
44 n: Arc<AtomicUsize>,
45) -> Result<()> {
46 while let Ok((f, name)) = car_rx.recv().await {
47 n.fetch_add(1, Ordering::Relaxed);
48 // eprintln!("failed to drive mem for {name:?}: {e:?}, skipping");
49
50 let mut car = match CarReader::new(f).await {
51 Ok(c) => c,
52 Err(e) => {
53 eprintln!("skipping car: {e}");
54 continue;
55 }
56 };
57
58 let roots = car.header().roots();
59 assert_eq!(roots.len(), 1);
60
61 let commit_cid = *roots.first().expect("a root to be present");
62
63 let mut blocks: HashMap<Cid, Vec<u8>> = HashMap::new();
64
65 while let Some((cid, data)) = car.next_block().await? {
66 blocks.insert(cid, data);
67 }
68 let star_path = format!("{star_folder}/{}", name.replace(".car", ".star"));
69 let res = tokio::task::spawn_blocking(move || {
70 let output_file = std::fs::File::create(star_path)?;
71
72 let commit_bytes = blocks.get(&commit_cid).context("Commit block not found")?;
73
74 #[derive(serde::Deserialize)]
75 struct RepoCommit {
76 did: String,
77 version: i64,
78 data: Cid,
79 rev: String,
80 prev: Option<Cid>,
81 sig: Option<serde_bytes::ByteBuf>,
82 }
83
84 let repo_commit: RepoCommit = serde_ipld_dagcbor::from_slice(commit_bytes)?;
85
86 let root_bytes = blocks
87 .get(&repo_commit.data)
88 .context("repo data cannot be null")?;
89 let root_node: RepoMstNode =
90 serde_ipld_dagcbor::from_slice(root_bytes).context("root must be an mst node")?;
91
92 let star_data = if root_node.l.is_none() && root_node.e.is_empty() {
93 None
94 } else {
95 Some(repo_commit.data)
96 };
97
98 let star_commit = StarCommit {
99 did: repo_commit.did,
100 version: repo_commit.version,
101 data: star_data,
102 rev: repo_commit.rev,
103 prev: repo_commit.prev,
104 sig: repo_commit.sig,
105 };
106
107 let mut serializer = StarSerializer::new(output_file);
108
109 serializer.write_header(&star_commit)?;
110
111 if let Some(root_cid) = star_commit.data {
112 write_tree(root_cid, &blocks, &mut serializer)?;
113 }
114
115 serializer.finish()?;
116 Ok::<(), anyhow::Error>(())
117 }).await?;
118
119 if let Err(e) = res {
120 eprintln!("failed: {name}: {e}");
121 }
122 }
123 Ok(())
124}
125
126fn write_tree(
127 node_cid: Cid,
128 blocks: &HashMap<Cid, Vec<u8>>,
129 serializer: &mut StarSerializer<std::fs::File>,
130) -> Result<(usize, usize)> {
131 // println!("writing tree under {node_cid:?}...");
132
133 let mut nodes_written = 0;
134 let mut records_written = 0;
135
136 let block_bytes = blocks
137 .get(&node_cid)
138 .with_context(|| format!("Missing block {}", node_cid))?;
139
140 let repo_node: RepoMstNode = serde_ipld_dagcbor::from_slice(block_bytes)?;
141
142 let height = if let Some(first_entry) = repo_node.e.first() {
143 calculate_height(&first_entry.k)
144 } else {
145 0
146 };
147
148 let star_node = StarMstNode {
149 l: repo_node.l,
150 l_archived: repo_node.l.map(|_| true),
151 e: repo_node
152 .e
153 .iter()
154 .map(|e| {
155 let v = if height == 0 { None } else { Some(e.v) };
156 StarMstEntry {
157 p: e.p,
158 k: e.k.clone(),
159 v,
160 v_archived: Some(true),
161 t: e.t,
162 t_archived: e.t.map(|_| true),
163 }
164 })
165 .collect(),
166 };
167
168 serializer.write_node(&star_node)?;
169 nodes_written += 1;
170
171 if let Some(l_cid) = repo_node.l {
172 let (n, r) = write_tree(l_cid, blocks, serializer)?;
173 nodes_written += n;
174 records_written += r;
175 }
176
177 for e in repo_node.e {
178 let record_bytes = blocks
179 .get(&e.v)
180 .with_context(|| format!("Missing record {}", e.v))?;
181 // eprintln!("writing record {:?} (<= {node_cid:?})", e.v);
182 serializer.write_record(record_bytes)?;
183 records_written += 1;
184
185 if let Some(t_cid) = e.t {
186 let (n, r) = write_tree(t_cid, blocks, serializer)?;
187 nodes_written += n;
188 records_written += r;
189 }
190 }
191
192 Ok((nodes_written, records_written))
193}
194
195#[tokio::main]
196async fn main() -> Result<()> {
197 let args: Vec<String> = env::args().collect();
198 if args.len() != 3 {
199 eprintln!("Usage: car-to-star <cars-folder> <stars-folder>");
200 std::process::exit(1);
201 }
202 let cars_path = &args[1];
203 let stars_path = &args[2];
204
205 let mut set = JoinSet::<Result<()>>::new();
206
207 let (cars_tx, cars_rx) = async_channel::bounded(2);
208 set.spawn(get_cars(cars_path.into(), cars_tx));
209
210 let n: Arc<AtomicUsize> = Arc::new(0.into());
211
212 tokio::fs::create_dir_all(stars_path.clone()).await?;
213 for _ in 0..10 {
214 set.spawn(converter(cars_rx.clone(), stars_path.clone(), n.clone()));
215 }
216 drop(cars_rx);
217
218 tokio::fs::create_dir_all(stars_path.clone()).await?;
219 set.spawn({
220 let n = n.clone();
221 let mut last_n: usize = 0;
222 let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
223 async move {
224 loop {
225 interval.tick().await;
226 let this_n = n.load(Ordering::Relaxed);
227 println!("{} ({this_n})", this_n - last_n);
228 if this_n > 1 && this_n == last_n {
229 eprintln!("done?");
230 break Ok(());
231 }
232 last_n = this_n;
233 }
234 }
235 });
236
237 while let Some(res) = set.join_next().await {
238 println!("task from set joined: {res:?}");
239 }
240
241 eprintln!("total repos converted: {n:?}");
242
243 Ok(())
244}