Fast and robust atproto CAR file processing in Rust

fjall 3.0.0 buggy

+246 -66
+200 -1
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 + source = "registry+https://github.com/rust-lang/crates.io-index" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 + 175 + [[package]] 170 176 name = "bytes" 171 177 version = "1.10.1" 172 178 source = "registry+https://github.com/rust-lang/crates.io-index" 173 179 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 180 + 181 + [[package]] 182 + name = "byteview" 183 + version = "0.10.0" 184 + source = "registry+https://github.com/rust-lang/crates.io-index" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 174 186 175 187 [[package]] 176 188 name = "cast" ··· 281 293 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 294 283 295 [[package]] 296 + name = "compare" 297 + version = "0.0.6" 298 + source = "registry+https://github.com/rust-lang/crates.io-index" 299 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 300 + 301 + [[package]] 284 302 name = "const-str" 285 303 version = "0.4.3" 286 304 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 376 ] 359 377 360 378 [[package]] 379 + name = "crossbeam-skiplist" 380 + version = "0.1.3" 381 + source = "registry+https://github.com/rust-lang/crates.io-index" 382 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 383 + dependencies = [ 384 + "crossbeam-epoch", 385 + "crossbeam-utils", 386 + ] 387 + 388 + [[package]] 361 389 name = "crossbeam-utils" 362 390 version = "0.8.21" 363 391 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 377 405 dependencies = [ 378 406 "generic-array", 379 407 "typenum", 408 + ] 409 + 410 + [[package]] 411 + name = "dashmap" 412 + version = "6.1.0" 413 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 414 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 415 + dependencies = [ 416 + "cfg-if", 417 + "crossbeam-utils", 418 + "hashbrown 0.14.5", 419 + "lock_api", 420 + "once_cell", 421 + "parking_lot_core", 380 422 ] 381 423 382 424 [[package]] ··· 422 464 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 465 424 466 [[package]] 467 + name = "enum_dispatch" 468 + version = "0.3.13" 469 + source = "registry+https://github.com/rust-lang/crates.io-index" 470 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 471 + dependencies = [ 472 + "once_cell", 473 + "proc-macro2", 474 + "quote", 475 + "syn 2.0.106", 476 + ] 477 + 478 + [[package]] 425 479 name = "env_filter" 426 480 version = "0.1.3" 427 481 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 443 497 "jiff", 444 498 "log", 445 499 ] 500 + 501 + [[package]] 502 + name = "equivalent" 503 + version = "1.0.2" 504 + source = "registry+https://github.com/rust-lang/crates.io-index" 505 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 446 506 447 507 [[package]] 448 508 name = "errno" ··· 471 531 version = "2.3.0" 472 532 source = "registry+https://github.com/rust-lang/crates.io-index" 473 533 checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 534 + 535 + [[package]] 536 + name = "fjall" 537 + version = "3.0.0" 538 + source = "registry+https://github.com/rust-lang/crates.io-index" 539 + checksum = "4986f550347ed1666561f36e8bf1be3c97df72850ecef0140129da6e2d0aa911" 540 + dependencies = [ 541 + "byteorder-lite", 542 + "byteview", 543 + "dashmap", 544 + "flume", 545 + "log", 546 + "lsm-tree", 547 + "lz4_flex", 548 + "tempfile", 549 + "xxhash-rust", 550 + ] 551 + 552 + [[package]] 553 + name = "flume" 554 + version = "0.12.0" 555 + source = "registry+https://github.com/rust-lang/crates.io-index" 556 + 
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 557 + dependencies = [ 558 + "spin", 559 + ] 474 560 475 561 [[package]] 476 562 name = "foldhash" ··· 608 694 609 695 [[package]] 610 696 name = "hashbrown" 697 + version = "0.14.5" 698 + source = "registry+https://github.com/rust-lang/crates.io-index" 699 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 700 + 701 + [[package]] 702 + name = "hashbrown" 611 703 version = "0.15.5" 612 704 source = "registry+https://github.com/rust-lang/crates.io-index" 613 705 checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" ··· 616 708 ] 617 709 618 710 [[package]] 711 + name = "hashbrown" 712 + version = "0.16.1" 713 + source = "registry+https://github.com/rust-lang/crates.io-index" 714 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 715 + 716 + [[package]] 619 717 name = "hashlink" 620 718 version = "0.10.0" 621 719 source = "registry+https://github.com/rust-lang/crates.io-index" 622 720 checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 721 dependencies = [ 624 - "hashbrown", 722 + "hashbrown 0.15.5", 625 723 ] 626 724 627 725 [[package]] ··· 631 729 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 730 633 731 [[package]] 732 + name = "interval-heap" 733 + version = "0.0.5" 734 + source = "registry+https://github.com/rust-lang/crates.io-index" 735 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 736 + dependencies = [ 737 + "compare", 738 + ] 739 + 740 + [[package]] 634 741 name = "io-uring" 635 742 version = "0.7.10" 636 743 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 761 868 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 869 763 870 [[package]] 871 + name = "lsm-tree" 872 + version = "3.0.0" 873 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 874 + checksum = "3a206e87e8bc38114045060ec1fc6bc4e4559748a37e9622b910d80e48863e87" 875 + dependencies = [ 876 + "byteorder-lite", 877 + "byteview", 878 + "crossbeam-skiplist", 879 + "enum_dispatch", 880 + "interval-heap", 881 + "log", 882 + "lz4_flex", 883 + "quick_cache", 884 + "rustc-hash", 885 + "self_cell", 886 + "sfa", 887 + "tempfile", 888 + "varint-rs", 889 + "xxhash-rust", 890 + ] 891 + 892 + [[package]] 893 + name = "lz4_flex" 894 + version = "0.11.5" 895 + source = "registry+https://github.com/rust-lang/crates.io-index" 896 + checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" 897 + dependencies = [ 898 + "twox-hash", 899 + ] 900 + 901 + [[package]] 764 902 name = "match-lookup" 765 903 version = "0.1.1" 766 904 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 950 1088 ] 951 1089 952 1090 [[package]] 1091 + name = "quick_cache" 1092 + version = "0.6.18" 1093 + source = "registry+https://github.com/rust-lang/crates.io-index" 1094 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 1095 + dependencies = [ 1096 + "equivalent", 1097 + "hashbrown 0.16.1", 1098 + ] 1099 + 1100 + [[package]] 953 1101 name = "quote" 954 1102 version = "1.0.41" 955 1103 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1030 1178 "clap", 1031 1179 "criterion", 1032 1180 "env_logger", 1181 + "fjall", 1033 1182 "futures", 1034 1183 "futures-core", 1035 1184 "ipld-core", ··· 1067 1216 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1068 1217 1069 1218 [[package]] 1219 + name = "rustc-hash" 1220 + version = "2.1.1" 1221 + source = "registry+https://github.com/rust-lang/crates.io-index" 1222 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1223 + 1224 + [[package]] 1070 1225 name = "rustix" 1071 1226 version = "1.1.2" 1072 1227 source = 
"registry+https://github.com/rust-lang/crates.io-index" ··· 1105 1260 version = "1.2.0" 1106 1261 source = "registry+https://github.com/rust-lang/crates.io-index" 1107 1262 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1263 + 1264 + [[package]] 1265 + name = "self_cell" 1266 + version = "1.2.2" 1267 + source = "registry+https://github.com/rust-lang/crates.io-index" 1268 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1108 1269 1109 1270 [[package]] 1110 1271 name = "serde" ··· 1172 1333 ] 1173 1334 1174 1335 [[package]] 1336 + name = "sfa" 1337 + version = "1.0.0" 1338 + source = "registry+https://github.com/rust-lang/crates.io-index" 1339 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1340 + dependencies = [ 1341 + "byteorder-lite", 1342 + "log", 1343 + "xxhash-rust", 1344 + ] 1345 + 1346 + [[package]] 1175 1347 name = "sha2" 1176 1348 version = "0.10.9" 1177 1349 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1214 1386 ] 1215 1387 1216 1388 [[package]] 1389 + name = "spin" 1390 + version = "0.9.8" 1391 + source = "registry+https://github.com/rust-lang/crates.io-index" 1392 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1393 + dependencies = [ 1394 + "lock_api", 1395 + ] 1396 + 1397 + [[package]] 1217 1398 name = "strsim" 1218 1399 version = "0.11.1" 1219 1400 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1336 1517 ] 1337 1518 1338 1519 [[package]] 1520 + name = "twox-hash" 1521 + version = "2.1.2" 1522 + source = "registry+https://github.com/rust-lang/crates.io-index" 1523 + checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" 1524 + 1525 + [[package]] 1339 1526 name = "typenum" 1340 1527 version = "1.19.0" 1341 1528 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 1557 version = "0.2.2" 1371 1558 source = 
"registry+https://github.com/rust-lang/crates.io-index" 1372 1559 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1560 + 1561 + [[package]] 1562 + name = "varint-rs" 1563 + version = "2.2.0" 1564 + source = "registry+https://github.com/rust-lang/crates.io-index" 1565 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1373 1566 1374 1567 [[package]] 1375 1568 name = "vcpkg" ··· 1659 1852 version = "0.46.0" 1660 1853 source = "registry+https://github.com/rust-lang/crates.io-index" 1661 1854 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1855 + 1856 + [[package]] 1857 + name = "xxhash-rust" 1858 + version = "0.8.15" 1859 + source = "registry+https://github.com/rust-lang/crates.io-index" 1860 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 1662 1861 1663 1862 [[package]] 1664 1863 name = "zerocopy"
+1
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 + fjall = "3.0.0" 11 12 futures = "0.3.31" 12 13 futures-core = "0.3.31" 13 14 ipld-core = { version = "0.4.2", features = ["serde"] }
-5
examples/disk-read-file/main.rs
··· 82 82 83 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 84 85 - // clean up the database. would be nice to do this in drop so it happens 86 - // automatically, but some blocking work happens, so that's not allowed in 87 - // async rust. 🤷‍♀️ 88 - join.await?.reset_store().await?; 89 - 90 85 log::info!("done. n={n} zeros={zeros}"); 91 86 92 87 Ok(())
+44 -56
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 21 + use fjall::{Database, Keyspace, KeyspaceCreateOptions, Error as FjallError}; 22 22 use std::path::PathBuf; 23 23 24 24 #[derive(Debug, thiserror::Error)] ··· 28 28 /// (The wrapped err should probably be obscured to remove public-facing 29 29 /// sqlite bits) 30 30 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 31 + DbError(#[from] FjallError), 32 32 /// A tokio blocking task failed to join 33 33 #[error("Failed to join a tokio blocking task: {0}")] 34 34 JoinError(#[from] tokio::task::JoinError), ··· 71 71 impl Default for DiskBuilder { 72 72 fn default() -> Self { 73 73 Self { 74 - cache_size_mb: 32, 74 + cache_size_mb: 64, 75 75 max_stored_mb: 10 * 1024, // 10 GiB 76 76 } 77 77 } ··· 84 84 } 85 85 /// Set the in-memory cache allowance for the database 86 86 /// 87 - /// Default: 32 MiB 87 + /// Default: 64 MiB 88 88 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 89 self.cache_size_mb = size; 90 90 self ··· 104 104 105 105 /// On-disk block storage 106 106 pub struct DiskStore { 107 - conn: rusqlite::Connection, 107 + #[allow(unused)] 108 + db: Database, 109 + ks: Keyspace, 108 110 max_stored: usize, 109 111 stored: usize, 110 112 } ··· 117 119 max_stored_mb: usize, 118 120 ) -> Result<Self, DiskError> { 119 121 let max_stored = max_stored_mb * 2_usize.pow(20); 120 - let conn = tokio::task::spawn_blocking(move || { 121 - let conn = rusqlite::Connection::open(path)?; 122 - 123 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 - 125 - // conn.pragma_update(None, "journal_mode", "OFF")?; 126 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 - conn.pragma_update(None, "journal_mode", "WAL")?; 128 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 129 - conn.pragma_update(None, "synchronous", "OFF")?; 130 - conn.pragma_update( 131 - None, 132 
- "cache_size", 133 - (cache_mb as i64 * sqlite_one_mb).to_string(), 122 + let (db, ks) = tokio::task::spawn_blocking(move || { 123 + let db = Database::builder(path) 124 + // .manual_journal_persist(true) 125 + // .worker_threads(1) 126 + // .cache_size(cache_mb as u64 * 2_u64.pow(20)) 127 + // .temporary(true) 128 + .open()?; 129 + let ks = db.keyspace("z", || 130 + KeyspaceCreateOptions::default() 131 + // .expect_point_read_hits(true) 132 + // .manual_journal_persist(true) 134 133 )?; 135 - Self::reset_tables(&conn)?; 134 + 135 + // Self::reset_tables(&ks)?; 136 136 137 - Ok::<_, DiskError>(conn) 137 + Ok::<_, DiskError>((db, ks)) 138 138 }) 139 139 .await??; 140 140 141 141 Ok(Self { 142 - conn, 142 + db, 143 + ks, 143 144 max_stored, 144 145 stored: 0, 145 146 }) 146 147 } 147 148 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 - let tx = self.conn.transaction()?; 149 149 Ok(SqliteWriter { 150 - tx, 150 + ks: self.ks.clone(), 151 151 stored: &mut self.stored, 152 152 max: self.max_stored, 153 153 }) 154 154 } 155 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 - Ok(SqliteReader { select_stmt }) 155 + pub(crate) fn get_reader(&self) -> Result<SqliteReader, DiskError> { 156 + Ok(SqliteReader { 157 + ks: self.ks.clone(), 158 + }) 158 159 } 159 160 /// Drop and recreate the kv table 160 161 pub async fn reset(self) -> Result<Self, DiskError> { 161 162 tokio::task::spawn_blocking(move || { 162 - Self::reset_tables(&self.conn)?; 163 + Self::reset_tables(&self.ks)?; 163 164 Ok(self) 164 165 }) 165 166 .await? 
166 167 } 167 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 - conn.execute( 170 - "CREATE TABLE blocks ( 171 - key BLOB PRIMARY KEY NOT NULL, 172 - val BLOB NOT NULL 173 - ) WITHOUT ROWID", 174 - (), 175 - )?; 168 + fn reset_tables(ks: &Keyspace) -> Result<(), DiskError> { 169 + ks.clear()?; 176 170 Ok(()) 177 171 } 178 172 } 179 173 180 - pub(crate) struct SqliteWriter<'conn> { 181 - tx: rusqlite::Transaction<'conn>, 182 - stored: &'conn mut usize, 174 + pub(crate) struct SqliteWriter<'a> { 175 + ks: Keyspace, 176 + stored: &'a mut usize, 183 177 max: usize, 184 178 } 185 179 ··· 188 182 &mut self, 189 183 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 190 184 ) -> Result<(), DriveError> { 191 - let mut insert_stmt = self 192 - .tx 193 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 - .map_err(DiskError::DbError)?; 195 185 for pair in kv { 196 186 let (k, v) = pair?; 197 187 *self.stored += v.len(); 198 188 if *self.stored > self.max { 199 189 return Err(DiskError::MaxSizeExceeded.into()); 200 190 } 201 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 191 + self.ks.insert(k, v).map_err(DiskError::DbError)?; 202 192 } 203 193 Ok(()) 204 194 } 205 - pub fn commit(self) -> Result<(), DiskError> { 206 - self.tx.commit()?; 207 - Ok(()) 208 - } 209 195 } 210 196 211 - pub(crate) struct SqliteReader<'conn> { 212 - select_stmt: rusqlite::Statement<'conn>, 197 + pub(crate) struct SqliteReader { 198 + ks: Keyspace, 213 199 } 214 200 215 - impl SqliteReader<'_> { 216 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 - self.select_stmt 218 - .query_one((&key,), |row| row.get(0)) 219 - .optional() 201 + impl SqliteReader { 202 + pub(crate) fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, FjallError> { 203 + let rv = self 204 + .ks 205 + .get(&key)? 
206 + .map(|v| v.as_ref().into()); 207 + Ok(rv) 220 208 } 221 209 }
-3
src/drive.rs
··· 343 343 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 344 345 345 writer.put_many(kvs)?; 346 - writer.commit()?; 347 346 Ok::<_, DriveError>(store) 348 347 }) 349 348 .await??; ··· 359 358 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 359 writer.put_many(kvs)?; 361 360 } 362 - 363 - writer.commit()?; 364 361 Ok::<_, DriveError>(store) 365 362 }); // await later 366 363
+1 -1
src/walk.rs
··· 19 19 #[error("Action node error: {0}")] 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 22 + StorageError(#[from] fjall::Error), 23 23 #[error("Decode error: {0}")] 24 24 DecodeError(#[from] DecodeError), 25 25 }