Fast and robust atproto CAR file processing in rust

next version #1

Merged · opened by bad-example.com · targeting main from fjall3

ended up being a significant rewrite oops!

  • drop sqlite, pick up fjall v3 for some speeeeeeed (and code simplification, and easier build requirements)
  • no more Processable trait, process functions are just Vec<u8> -> Vec<u8> now (bring your own ser/de). there's a potential small cost here where processors need to now actually go through serialization even for in-memory car walking, but i think zero-copy approaches (eg. rkyv) are low-cost enough
  • custom deserialize for MST nodes that does as much depth calculation and rkey validation as possible in-line. (not clear if it actually made anything faster)
  • check MST depth at every node properly (previously it could do some walking before being able to check and included some assumptions)
  • check MST for empty leaf nodes (which are not allowed)
  • shave 0.6 nanoseconds (really) from MST depth calculation (don't ask)

i guess i should put this in a changelog

Labels

None yet.

Assignees

None yet.

Participants 1
AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mchuvd4loj22
+1056 -926
Diff #0
+228 -87
Cargo.lock
··· 27 27 ] 28 28 29 29 [[package]] 30 + name = "allocator-api2" 31 + version = "0.2.21" 32 + source = "registry+https://github.com/rust-lang/crates.io-index" 33 + checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" 34 + 35 + [[package]] 30 36 name = "anes" 31 37 version = "0.1.6" 32 38 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 126 132 ] 127 133 128 134 [[package]] 129 - name = "bincode" 130 - version = "2.0.1" 131 - source = "registry+https://github.com/rust-lang/crates.io-index" 132 - checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" 133 - dependencies = [ 134 - "bincode_derive", 135 - "serde", 136 - "unty", 137 - ] 138 - 139 - [[package]] 140 - name = "bincode_derive" 141 - version = "2.0.1" 142 - source = "registry+https://github.com/rust-lang/crates.io-index" 143 - checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" 144 - dependencies = [ 145 - "virtue", 146 - ] 147 - 148 - [[package]] 149 135 name = "bitflags" 150 136 version = "2.9.4" 151 137 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 167 153 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 154 169 155 [[package]] 156 + name = "byteorder-lite" 157 + version = "0.1.0" 158 + source = "registry+https://github.com/rust-lang/crates.io-index" 159 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 160 + 161 + [[package]] 170 162 name = "bytes" 171 - version = "1.10.1" 163 + version = "1.11.0" 164 + source = "registry+https://github.com/rust-lang/crates.io-index" 165 + checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" 166 + 167 + [[package]] 168 + name = "byteview" 169 + version = "0.10.0" 172 170 source = "registry+https://github.com/rust-lang/crates.io-index" 173 - checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 171 + checksum = 
"dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 174 172 175 173 [[package]] 176 174 name = "cast" ··· 188 186 ] 189 187 190 188 [[package]] 189 + name = "cc" 190 + version = "1.2.52" 191 + source = "registry+https://github.com/rust-lang/crates.io-index" 192 + checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" 193 + dependencies = [ 194 + "find-msvc-tools", 195 + "shlex", 196 + ] 197 + 198 + [[package]] 191 199 name = "cfg-if" 192 200 version = "1.0.3" 193 201 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 281 289 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 290 283 291 [[package]] 292 + name = "compare" 293 + version = "0.0.6" 294 + source = "registry+https://github.com/rust-lang/crates.io-index" 295 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 296 + 297 + [[package]] 284 298 name = "const-str" 285 299 version = "0.4.3" 286 300 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 372 ] 359 373 360 374 [[package]] 375 + name = "crossbeam-skiplist" 376 + version = "0.1.3" 377 + source = "registry+https://github.com/rust-lang/crates.io-index" 378 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 379 + dependencies = [ 380 + "crossbeam-epoch", 381 + "crossbeam-utils", 382 + ] 383 + 384 + [[package]] 361 385 name = "crossbeam-utils" 362 386 version = "0.8.21" 363 387 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 380 404 ] 381 405 382 406 [[package]] 407 + name = "dashmap" 408 + version = "6.1.0" 409 + source = "registry+https://github.com/rust-lang/crates.io-index" 410 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 411 + dependencies = [ 412 + "cfg-if", 413 + "crossbeam-utils", 414 + "hashbrown 0.14.5", 415 + "lock_api", 416 + "once_cell", 417 + "parking_lot_core", 418 + ] 419 + 420 + [[package]] 383 421 name = "data-encoding" 
384 422 version = "2.9.0" 385 423 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 422 460 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 461 424 462 [[package]] 463 + name = "enum_dispatch" 464 + version = "0.3.13" 465 + source = "registry+https://github.com/rust-lang/crates.io-index" 466 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 467 + dependencies = [ 468 + "once_cell", 469 + "proc-macro2", 470 + "quote", 471 + "syn 2.0.106", 472 + ] 473 + 474 + [[package]] 425 475 name = "env_filter" 426 476 version = "0.1.3" 427 477 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 445 495 ] 446 496 447 497 [[package]] 498 + name = "equivalent" 499 + version = "1.0.2" 500 + source = "registry+https://github.com/rust-lang/crates.io-index" 501 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 502 + 503 + [[package]] 448 504 name = "errno" 449 505 version = "0.3.14" 450 506 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 455 511 ] 456 512 457 513 [[package]] 458 - name = "fallible-iterator" 459 - version = "0.3.0" 514 + name = "fastrand" 515 + version = "2.3.0" 460 516 source = "registry+https://github.com/rust-lang/crates.io-index" 461 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 517 + checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 462 518 463 519 [[package]] 464 - name = "fallible-streaming-iterator" 465 - version = "0.1.9" 520 + name = "find-msvc-tools" 521 + version = "0.1.7" 466 522 source = "registry+https://github.com/rust-lang/crates.io-index" 467 - checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 523 + checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" 524 + 525 + [[package]] 526 + name = "fjall" 527 + version = "3.0.1" 528 + source = "registry+https://github.com/rust-lang/crates.io-index" 
529 + checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093" 530 + dependencies = [ 531 + "byteorder-lite", 532 + "byteview", 533 + "dashmap", 534 + "flume", 535 + "log", 536 + "lsm-tree", 537 + "tempfile", 538 + "xxhash-rust", 539 + ] 468 540 469 541 [[package]] 470 - name = "fastrand" 471 - version = "2.3.0" 542 + name = "flume" 543 + version = "0.12.0" 472 544 source = "registry+https://github.com/rust-lang/crates.io-index" 473 - checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 545 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 546 + dependencies = [ 547 + "spin", 548 + ] 474 549 475 550 [[package]] 476 551 name = "foldhash" 477 - version = "0.1.5" 552 + version = "0.2.0" 478 553 source = "registry+https://github.com/rust-lang/crates.io-index" 479 - checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 554 + checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" 480 555 481 556 [[package]] 482 557 name = "futures" ··· 608 683 609 684 [[package]] 610 685 name = "hashbrown" 611 - version = "0.15.5" 686 + version = "0.14.5" 612 687 source = "registry+https://github.com/rust-lang/crates.io-index" 613 - checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 614 - dependencies = [ 615 - "foldhash", 616 - ] 688 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 617 689 618 690 [[package]] 619 - name = "hashlink" 620 - version = "0.10.0" 691 + name = "hashbrown" 692 + version = "0.16.1" 621 693 source = "registry+https://github.com/rust-lang/crates.io-index" 622 - checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 694 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 623 695 dependencies = [ 624 - "hashbrown", 696 + "allocator-api2", 697 + "equivalent", 698 + "foldhash", 625 699 ] 626 700 627 701 [[package]] ··· 631 705 checksum = 
"2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 706 633 707 [[package]] 708 + name = "hmac-sha256" 709 + version = "1.1.12" 710 + source = "registry+https://github.com/rust-lang/crates.io-index" 711 + checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" 712 + 713 + [[package]] 714 + name = "interval-heap" 715 + version = "0.0.5" 716 + source = "registry+https://github.com/rust-lang/crates.io-index" 717 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 718 + dependencies = [ 719 + "compare", 720 + ] 721 + 722 + [[package]] 634 723 name = "io-uring" 635 724 version = "0.7.10" 636 725 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 730 819 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 731 820 732 821 [[package]] 733 - name = "libsqlite3-sys" 734 - version = "0.35.0" 822 + name = "libmimalloc-sys" 823 + version = "0.1.44" 735 824 source = "registry+https://github.com/rust-lang/crates.io-index" 736 - checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 825 + checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870" 737 826 dependencies = [ 738 - "pkg-config", 739 - "vcpkg", 827 + "cc", 828 + "libc", 740 829 ] 741 830 742 831 [[package]] ··· 761 850 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 851 763 852 [[package]] 853 + name = "lsm-tree" 854 + version = "3.0.1" 855 + source = "registry+https://github.com/rust-lang/crates.io-index" 856 + checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 857 + dependencies = [ 858 + "byteorder-lite", 859 + "byteview", 860 + "crossbeam-skiplist", 861 + "enum_dispatch", 862 + "interval-heap", 863 + "log", 864 + "quick_cache", 865 + "rustc-hash", 866 + "self_cell", 867 + "sfa", 868 + "tempfile", 869 + "varint-rs", 870 + "xxhash-rust", 871 + ] 872 + 873 + [[package]] 764 874 name = "match-lookup" 765 875 
version = "0.1.1" 766 876 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 776 886 version = "2.7.6" 777 887 source = "registry+https://github.com/rust-lang/crates.io-index" 778 888 checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" 889 + 890 + [[package]] 891 + name = "mimalloc" 892 + version = "0.1.48" 893 + source = "registry+https://github.com/rust-lang/crates.io-index" 894 + checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8" 895 + dependencies = [ 896 + "libmimalloc-sys", 897 + ] 779 898 780 899 [[package]] 781 900 name = "miniz_oxide" ··· 892 1011 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 893 1012 894 1013 [[package]] 895 - name = "pkg-config" 896 - version = "0.3.32" 897 - source = "registry+https://github.com/rust-lang/crates.io-index" 898 - checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 899 - 900 - [[package]] 901 1014 name = "plotters" 902 1015 version = "0.3.7" 903 1016 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 947 1060 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 948 1061 dependencies = [ 949 1062 "unicode-ident", 1063 + ] 1064 + 1065 + [[package]] 1066 + name = "quick_cache" 1067 + version = "0.6.18" 1068 + source = "registry+https://github.com/rust-lang/crates.io-index" 1069 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 1070 + dependencies = [ 1071 + "equivalent", 1072 + "hashbrown 0.16.1", 950 1073 ] 951 1074 952 1075 [[package]] ··· 1026 1149 name = "repo-stream" 1027 1150 version = "0.2.2" 1028 1151 dependencies = [ 1029 - "bincode", 1152 + "cid", 1030 1153 "clap", 1031 1154 "criterion", 1032 1155 "env_logger", 1033 - "futures", 1034 - "futures-core", 1035 - "ipld-core", 1156 + "fjall", 1157 + "hashbrown 0.16.1", 1158 + "hmac-sha256", 1036 1159 "iroh-car", 1037 1160 "log", 1161 + "mimalloc", 1038 1162 "multibase", 
1039 - "rusqlite", 1040 1163 "serde", 1041 1164 "serde_bytes", 1042 1165 "serde_ipld_dagcbor", ··· 1047 1170 ] 1048 1171 1049 1172 [[package]] 1050 - name = "rusqlite" 1051 - version = "0.37.0" 1173 + name = "rustc-demangle" 1174 + version = "0.1.26" 1052 1175 source = "registry+https://github.com/rust-lang/crates.io-index" 1053 - checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1054 - dependencies = [ 1055 - "bitflags", 1056 - "fallible-iterator", 1057 - "fallible-streaming-iterator", 1058 - "hashlink", 1059 - "libsqlite3-sys", 1060 - "smallvec", 1061 - ] 1176 + checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1062 1177 1063 1178 [[package]] 1064 - name = "rustc-demangle" 1065 - version = "0.1.26" 1179 + name = "rustc-hash" 1180 + version = "2.1.1" 1066 1181 source = "registry+https://github.com/rust-lang/crates.io-index" 1067 - checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1182 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1068 1183 1069 1184 [[package]] 1070 1185 name = "rustix" ··· 1105 1220 version = "1.2.0" 1106 1221 source = "registry+https://github.com/rust-lang/crates.io-index" 1107 1222 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1223 + 1224 + [[package]] 1225 + name = "self_cell" 1226 + version = "1.2.2" 1227 + source = "registry+https://github.com/rust-lang/crates.io-index" 1228 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1108 1229 1109 1230 [[package]] 1110 1231 name = "serde" ··· 1169 1290 "ryu", 1170 1291 "serde", 1171 1292 "serde_core", 1293 + ] 1294 + 1295 + [[package]] 1296 + name = "sfa" 1297 + version = "1.0.0" 1298 + source = "registry+https://github.com/rust-lang/crates.io-index" 1299 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1300 + dependencies = [ 1301 + "byteorder-lite", 1302 + "log", 1303 + "xxhash-rust", 1172 
1304 ] 1173 1305 1174 1306 [[package]] ··· 1183 1315 ] 1184 1316 1185 1317 [[package]] 1318 + name = "shlex" 1319 + version = "1.3.0" 1320 + source = "registry+https://github.com/rust-lang/crates.io-index" 1321 + checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" 1322 + 1323 + [[package]] 1186 1324 name = "signal-hook-registry" 1187 1325 version = "1.4.6" 1188 1326 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1211 1349 dependencies = [ 1212 1350 "libc", 1213 1351 "windows-sys 0.59.0", 1352 + ] 1353 + 1354 + [[package]] 1355 + name = "spin" 1356 + version = "0.9.8" 1357 + source = "registry+https://github.com/rust-lang/crates.io-index" 1358 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1359 + dependencies = [ 1360 + "lock_api", 1214 1361 ] 1215 1362 1216 1363 [[package]] ··· 1360 1507 checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" 1361 1508 1362 1509 [[package]] 1363 - name = "unty" 1364 - version = "0.0.4" 1365 - source = "registry+https://github.com/rust-lang/crates.io-index" 1366 - checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" 1367 - 1368 - [[package]] 1369 1510 name = "utf8parse" 1370 1511 version = "0.2.2" 1371 1512 source = "registry+https://github.com/rust-lang/crates.io-index" 1372 1513 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1373 1514 1374 1515 [[package]] 1375 - name = "vcpkg" 1376 - version = "0.2.15" 1516 + name = "varint-rs" 1517 + version = "2.2.0" 1377 1518 source = "registry+https://github.com/rust-lang/crates.io-index" 1378 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1519 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1379 1520 1380 1521 [[package]] 1381 1522 name = "version_check" 1382 1523 version = "0.9.5" 1383 1524 source = "registry+https://github.com/rust-lang/crates.io-index" 1384 1525 
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1385 - 1386 - [[package]] 1387 - name = "virtue" 1388 - version = "0.0.18" 1389 - source = "registry+https://github.com/rust-lang/crates.io-index" 1390 - checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" 1391 1526 1392 1527 [[package]] 1393 1528 name = "walkdir" ··· 1659 1794 version = "0.46.0" 1660 1795 source = "registry+https://github.com/rust-lang/crates.io-index" 1661 1796 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1797 + 1798 + [[package]] 1799 + name = "xxhash-rust" 1800 + version = "0.8.15" 1801 + source = "registry+https://github.com/rust-lang/crates.io-index" 1802 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 1662 1803 1663 1804 [[package]] 1664 1805 name = "zerocopy"
+10 -7
Cargo.toml
··· 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 - bincode = { version = "2.0.1", features = ["serde"] } 11 - futures = "0.3.31" 12 - futures-core = "0.3.31" 13 - ipld-core = { version = "0.4.2", features = ["serde"] } 10 + fjall = { version = "3.0.1", default-features = false } 11 + hashbrown = "0.16.1" 12 + cid = { version = "0.11.1", features = ["serde"] } 14 13 iroh-car = "0.5.1" 15 14 log = "0.4.28" 16 - multibase = "0.9.2" 17 - rusqlite = "0.37.0" 18 15 serde = { version = "1.0.228", features = ["derive"] } 19 16 serde_bytes = "0.11.19" 20 17 serde_ipld_dagcbor = "0.6.4" 21 - sha2 = "0.10.9" 18 + sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower 22 19 thiserror = "2.0.17" 23 20 tokio = { version = "1.47.1", features = ["rt", "sync"] } 24 21 ··· 29 26 multibase = "0.9.2" 30 27 tempfile = "3.23.0" 31 28 tokio = { version = "1.47.1", features = ["full"] } 29 + mimalloc = "0.1.48" 30 + hmac-sha256 = "1.1.12" 32 31 33 32 [profile.profiling] 34 33 inherits = "release" ··· 44 43 [[bench]] 45 44 name = "huge-car" 46 45 harness = false 46 + 47 + # [[bench]] 48 + # name = "leading" 49 + # harness = false
+11 -4
benches/huge-car.rs
··· 4 4 5 5 use criterion::{Criterion, criterion_group, criterion_main}; 6 6 7 + // use mimalloc::MiMalloc; 8 + // #[global_allocator] 9 + // static GLOBAL: MiMalloc = MiMalloc; 10 + 7 11 pub fn criterion_benchmark(c: &mut Criterion) { 8 12 let rt = tokio::runtime::Builder::new_multi_thread() 9 13 .enable_all() ··· 18 22 }); 19 23 } 20 24 25 + #[inline(always)] 26 + fn ser(block: Vec<u8>) -> Vec<u8> { 27 + let s = block.len(); 28 + usize::to_ne_bytes(s).to_vec() 29 + } 30 + 21 31 async fn drive_car(filename: impl AsRef<Path>) -> usize { 22 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 23 33 let reader = tokio::io::BufReader::new(reader); 24 34 25 - let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 26 - .await 27 - .unwrap() 28 - { 35 + let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 29 36 Driver::Memory(_, mem_driver) => mem_driver, 30 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 31 38 };
+66
benches/leading.rs
··· 1 + use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; 2 + use hmac_sha256::Hash; 3 + use sha2::{Digest, Sha256}; 4 + 5 + pub fn compute(bytes: [u8; 32]) -> u32 { 6 + let mut zeros = 0; 7 + for byte in bytes { 8 + if byte == 0 { 9 + zeros += 8 10 + } else { 11 + zeros += byte.leading_zeros(); 12 + break; 13 + } 14 + } 15 + zeros / 2 16 + } 17 + 18 + pub fn compute2(bytes: [u8; 32]) -> u32 { 19 + u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()).leading_zeros() / 2 20 + } 21 + 22 + fn from_key_old(key: &[u8]) -> u32 { 23 + compute2(Sha256::digest(key).into()) 24 + } 25 + 26 + fn from_key_new(key: &[u8]) -> u32 { 27 + compute2(Hash::hash(key).into()) 28 + } 29 + 30 + pub fn criterion_benchmark(c: &mut Criterion) { 31 + for (name, case) in [ 32 + ("no zeros", [0xFF; 32]), 33 + ("two zeros", [0x3F; 32]), 34 + ( 35 + "some zeros", 36 + [ 37 + 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 38 + 1, 1, 1, 1, 39 + ], 40 + ), 41 + ( 42 + "many zeros", 43 + [ 44 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 45 + 1, 1, 1, 1, 46 + ], 47 + ), 48 + ] { 49 + let mut g = c.benchmark_group(name); 50 + g.bench_function("old", |b| { 51 + b.iter_batched(|| case.clone(), |c| compute(c), BatchSize::SmallInput) 52 + }); 53 + g.bench_function("new", |b| { 54 + b.iter_batched(|| case.clone(), |c| compute2(c), BatchSize::SmallInput) 55 + }); 56 + } 57 + 58 + for case in ["a", "aa", "aaa", "aaaa"] { 59 + let mut g = c.benchmark_group(case); 60 + g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes()))); 61 + g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes()))); 62 + } 63 + } 64 + 65 + criterion_group!(benches, criterion_benchmark); 66 + criterion_main!(benches);
+11 -4
benches/non-huge-cars.rs
··· 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 6 + // use mimalloc::MiMalloc; 7 + // #[global_allocator] 8 + // static GLOBAL: MiMalloc = MiMalloc; 9 + 6 10 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 7 11 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 8 12 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); ··· 28 32 }); 29 33 } 30 34 35 + #[inline(always)] 36 + fn ser(block: Vec<u8>) -> Vec<u8> { 37 + let s = block.len(); 38 + usize::to_ne_bytes(s).to_vec() 39 + } 40 + 31 41 async fn drive_car(bytes: &[u8]) -> usize { 32 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 33 - .await 34 - .unwrap() 35 - { 42 + let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 36 43 Driver::Memory(_, mem_driver) => mem_driver, 37 44 Driver::Disk(_) => panic!("not benching big cars here"), 38 45 };
+10 -5
examples/disk-read-file/main.rs
··· 3 3 */ 4 4 5 5 extern crate repo_stream; 6 + 7 + use mimalloc::MiMalloc; 8 + #[global_allocator] 9 + static GLOBAL: MiMalloc = MiMalloc; 10 + 6 11 use clap::Parser; 7 12 use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 13 use std::path::PathBuf; ··· 33 38 // in this example we only bother handling CARs that are too big for memory 34 39 // `noop` helper means: do no block processing, store the raw blocks 35 40 let driver = match DriverBuilder::new() 36 - .with_mem_limit_mb(10) // how much memory can be used before disk spill 41 + .with_mem_limit_mb(32) // how much memory can be used before disk spill 37 42 .load_car(reader) 38 43 .await? 39 44 { ··· 52 57 // via the DID from the commit, and then verify the signature. 53 58 log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 54 59 60 + // log::info!("now is good time to check mem usage..."); 61 + // tokio::time::sleep(std::time::Duration::from_secs(15)).await; 62 + 55 63 // pop the driver back out to get some code indentation relief 56 64 driver 57 65 } ··· 82 90 83 91 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 92 85 - // clean up the database. would be nice to do this in drop so it happens 86 - // automatically, but some blocking work happens, so that's not allowed in 87 - // async rust. 🤷‍♀️ 88 - join.await?.reset_store().await?; 93 + join.await?; 89 94 90 95 log::info!("done. n={n} zeros={zeros}"); 91 96
+1 -1
examples/read-file/main.rs
··· 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 26 let (commit, mut driver) = match DriverBuilder::new() 27 - .with_block_processor(|block| block.len()) 27 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 28 28 .load_car(reader) 29 29 .await? 30 30 {
+49 -10
readme.md
··· 50 50 total_size += size; 51 51 } 52 52 } 53 - 54 - // clean up the disk store (drop tables etc) 55 - driver.reset_store().await?; 56 53 } 57 54 }; 58 55 println!("sum of size of all records: {total_size}"); ··· 61 58 ``` 62 59 63 60 more recent todo 64 - 65 - - [ ] get an *emtpy* car for the test suite 61 + - [ ] repo car slices 62 + - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 63 + - [x] get an *emtpy* car for the test suite 66 64 - [x] implement a max size on disk limit 67 65 68 66 ··· 73 71 74 72 current car processing times (records processed into their length usize, phil's dev machine): 75 73 76 - - 128MiB CAR file: `347ms` 77 - - 5.0MiB: `6.1ms` 78 - - 279KiB: `139us` 79 - - 3.4KiB: `4.9us` 74 + - 450MiB CAR file (huge): `1.3s` 75 + - 128MiB (huge): `350ms` 76 + - 5.0MiB: `6.8ms` 77 + - 279KiB: `160us` 78 + - 3.4KiB: `5.1us` 79 + - empty: `690ns` 80 + 81 + it's a little faster with `mimalloc` 82 + 83 + ```rust 84 + use mimalloc::MiMalloc; 85 + #[global_allocator] 86 + static GLOBAL: MiMalloc = MiMalloc; 87 + ``` 88 + 89 + - 450MiB CAR file: `1.2s` (-8%) 90 + - 128MiB: `300ms` (-14%) 91 + - 5.0MiB: `6.0ms` (-11%) 92 + - 279KiB: `150us` (-7%) 93 + - 3.4KiB: `4.7us` (-8%) 94 + - empty: `670ns` (-4%) 95 + 96 + processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!) 97 + 98 + 1. spill blocks to disk 99 + 2. inline block processing 100 + 101 + #### spill blocks to disk 102 + 103 + this is a little slower but can greatly reduce the memory used. there's nothing special you need to do for this. 104 + 105 + 106 + #### inline block processing 107 + 108 + if you don't need to store the complete records, you can have repo-stream try to optimistically apply a processing function to the raw blocks as they are streamed in. 
109 + 110 + 111 + #### constrained mem perf comparison 80 112 113 + sketchy benchmark but hey. mimalloc is enabled, and the processing spills to disk. inline processing reduces entire records to 8 bytes (usize of the raw record block size): 81 114 82 - running the huge-car benchmark 115 + - 450MiB CAR file: `5.0s` (4.5x slowdown for disk) 116 + - 128MiB: `1.27s` (4.1x slowdown) 117 + 118 + fortunately, most CARs in the ATmosphere are very small, so for eg. backfill purposes, the vast majority of inputs will not face this slowdown. 119 + 120 + 121 + #### running the huge-car benchmark 83 122 84 123 - to avoid committing it to the repo, you have to pass it in through the env for now. 85 124
+32 -99
src/disk.rs
··· 17 17 ``` 18 18 */ 19 19 20 - use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 20 + use crate::{Bytes, drive::DriveError}; 21 + use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22 22 use std::path::PathBuf; 23 23 24 24 #[derive(Debug, thiserror::Error)] ··· 28 28 /// (The wrapped err should probably be obscured to remove public-facing 29 29 /// sqlite bits) 30 30 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 31 + DbError(#[from] FjallError), 32 32 /// A tokio blocking task failed to join 33 33 #[error("Failed to join a tokio blocking task: {0}")] 34 34 JoinError(#[from] tokio::task::JoinError), ··· 38 38 /// limit. 39 39 #[error("Maximum disk size reached")] 40 40 MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 41 } 54 42 55 43 /// Builder-style disk store setup ··· 71 59 impl Default for DiskBuilder { 72 60 fn default() -> Self { 73 61 Self { 74 - cache_size_mb: 32, 62 + cache_size_mb: 64, 75 63 max_stored_mb: 10 * 1024, // 10 GiB 76 64 } 77 65 } ··· 84 72 } 85 73 /// Set the in-memory cache allowance for the database 86 74 /// 87 - /// Default: 32 MiB 75 + /// Default: 64 MiB 88 76 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 77 self.cache_size_mb = size; 90 78 self ··· 104 92 105 93 /// On-disk block storage 106 94 pub struct DiskStore { 107 - conn: rusqlite::Connection, 95 + #[allow(unused)] 96 + db: Database, 97 + partition: Keyspace, 108 98 max_stored: usize, 109 99 stored: usize, 110 100 } ··· 117 107 max_stored_mb: usize, 118 108 ) -> Result<Self, DiskError> { 119 109 let max_stored = max_stored_mb * 2_usize.pow(20); 120 - let conn = tokio::task::spawn_blocking(move || 
{ 121 - let conn = rusqlite::Connection::open(path)?; 122 - 123 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 - 125 - // conn.pragma_update(None, "journal_mode", "OFF")?; 126 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 - conn.pragma_update(None, "journal_mode", "WAL")?; 128 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 129 - conn.pragma_update(None, "synchronous", "OFF")?; 130 - conn.pragma_update( 131 - None, 132 - "cache_size", 133 - (cache_mb as i64 * sqlite_one_mb).to_string(), 134 - )?; 135 - Self::reset_tables(&conn)?; 110 + let (db, partition) = tokio::task::spawn_blocking(move || { 111 + let db = Database::builder(path) 112 + .manual_journal_persist(true) 113 + .worker_threads(1) 114 + .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2) 115 + .temporary(true) 116 + .open()?; 117 + let opts = KeyspaceCreateOptions::default() 118 + .expect_point_read_hits(true) 119 + .max_memtable_size(16 * 2_u64.pow(20)); 120 + let partition = db.keyspace("z", || opts)?; 136 121 137 - Ok::<_, DiskError>(conn) 122 + Ok::<_, DiskError>((db, partition)) 138 123 }) 139 124 .await??; 140 125 141 126 Ok(Self { 142 - conn, 127 + db, 128 + partition, 143 129 max_stored, 144 130 stored: 0, 145 131 }) 146 132 } 147 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 - let tx = self.conn.transaction()?; 149 - Ok(SqliteWriter { 150 - tx, 151 - stored: &mut self.stored, 152 - max: self.max_stored, 153 - }) 154 - } 155 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 - Ok(SqliteReader { select_stmt }) 158 - } 159 - /// Drop and recreate the kv table 160 - pub async fn reset(self) -> Result<Self, DiskError> { 161 - tokio::task::spawn_blocking(move || { 162 - Self::reset_tables(&self.conn)?; 163 - Ok(self) 164 - }) 
165 - .await? 166 - } 167 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 - conn.execute( 170 - "CREATE TABLE blocks ( 171 - key BLOB PRIMARY KEY NOT NULL, 172 - val BLOB NOT NULL 173 - ) WITHOUT ROWID", 174 - (), 175 - )?; 176 - Ok(()) 177 - } 178 - } 179 133 180 - pub(crate) struct SqliteWriter<'conn> { 181 - tx: rusqlite::Transaction<'conn>, 182 - stored: &'conn mut usize, 183 - max: usize, 184 - } 185 - 186 - impl SqliteWriter<'_> { 187 134 pub(crate) fn put_many( 188 135 &mut self, 189 - kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 136 + kv: impl Iterator<Item = (Vec<u8>, Bytes)>, 190 137 ) -> Result<(), DriveError> { 191 - let mut insert_stmt = self 192 - .tx 193 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 - .map_err(DiskError::DbError)?; 195 - for pair in kv { 196 - let (k, v) = pair?; 197 - *self.stored += v.len(); 198 - if *self.stored > self.max { 138 + let mut batch = self.db.batch(); 139 + for (k, v) in kv { 140 + self.stored += v.len(); 141 + if self.stored > self.max_stored { 199 142 return Err(DiskError::MaxSizeExceeded.into()); 200 143 } 201 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 144 + batch.insert(&self.partition, k, v); 202 145 } 203 - Ok(()) 204 - } 205 - pub fn commit(self) -> Result<(), DiskError> { 206 - self.tx.commit()?; 146 + batch.commit().map_err(DiskError::DbError)?; 207 147 Ok(()) 208 148 } 209 - } 210 - 211 - pub(crate) struct SqliteReader<'conn> { 212 - select_stmt: rusqlite::Statement<'conn>, 213 - } 214 149 215 - impl SqliteReader<'_> { 216 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 - self.select_stmt 218 - .query_one((&key,), |row| row.get(0)) 219 - .optional() 150 + #[inline] 151 + pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 152 + self.partition.get(key) 220 153 } 221 154 }
+139 -215
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::disk::{DiskError, DiskStore}; 4 - use crate::process::Processable; 5 - use ipld_core::cid::Cid; 3 + use crate::{ 4 + Bytes, HashMap, 5 + disk::{DiskError, DiskStore}, 6 + mst::MstNode, 7 + walk::Output, 8 + }; 9 + use cid::Cid; 6 10 use iroh_car::CarReader; 7 - use serde::{Deserialize, Serialize}; 8 - use std::collections::HashMap; 9 11 use std::convert::Infallible; 10 12 use tokio::{io::AsyncRead, sync::mpsc}; 11 13 12 - use crate::mst::{Commit, Node}; 13 - use crate::walk::{Step, WalkError, Walker}; 14 + use crate::mst::Commit; 15 + use crate::walk::{WalkError, Walker}; 14 16 15 17 /// Errors that can happen while consuming and emitting blocks and records 16 18 #[derive(Debug, thiserror::Error)] ··· 21 23 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 22 24 #[error("The Commit block reference by the root was not found")] 23 25 MissingCommit, 24 - #[error("The MST block {0} could not be found")] 25 - MissingBlock(Cid), 26 26 #[error("Failed to walk the mst tree: {0}")] 27 27 WalkError(#[from] WalkError), 28 28 #[error("CAR file had no roots")] 29 29 MissingRoot, 30 30 #[error("Storage error")] 31 31 StorageError(#[from] DiskError), 32 - #[error("Encode error: {0}")] 33 - BincodeEncodeError(#[from] bincode::error::EncodeError), 34 32 #[error("Tried to send on a closed channel")] 35 33 ChannelSendError, // SendError takes <T> which we don't need 36 34 #[error("Failed to join a task: {0}")] 37 35 JoinError(#[from] tokio::task::JoinError), 38 36 } 39 37 40 - #[derive(Debug, thiserror::Error)] 41 - pub enum DecodeError { 42 - #[error(transparent)] 43 - BincodeDecodeError(#[from] bincode::error::DecodeError), 44 - #[error("extra bytes remained after decoding")] 45 - ExtraGarbage, 46 - } 47 - 48 38 /// An in-order chunk of Rkey + (processed) Block pairs 49 - pub type BlockChunk<T> = Vec<(String, T)>; 39 + pub type BlockChunk = Vec<(String, Bytes)>; 50 40 
51 - #[derive(Debug, Clone, Serialize, Deserialize)] 52 - pub(crate) enum MaybeProcessedBlock<T> { 41 + #[derive(Debug, Clone)] 42 + pub(crate) enum MaybeProcessedBlock { 53 43 /// A block that's *probably* a Node (but we can't know yet) 54 44 /// 55 45 /// It *can be* a record that suspiciously looks a lot like a node, so we 56 46 /// cannot eagerly turn it into a Node. We only know for sure what it is 57 47 /// when we actually walk down the MST 58 - Raw(Vec<u8>), 48 + Raw(Bytes), 59 49 /// A processed record from a block that was definitely not a Node 60 50 /// 61 51 /// Processing has to be fallible because the CAR can have totally-unused ··· 71 61 /// There's an alternative here, which would be to kick unprocessable blocks 72 62 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 73 63 /// surface the typed error later if needed by trying to reprocess. 74 - Processed(T), 64 + Processed(Bytes), 75 65 } 76 66 77 - impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 - /// TODO this is probably a little broken 79 - fn get_size(&self) -> usize { 80 - use std::{cmp::max, mem::size_of}; 81 - 82 - // enum is always as big as its biggest member? 
83 - let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 - 85 - let extra = match self { 86 - Self::Raw(bytes) => bytes.len(), 87 - Self::Processed(t) => t.get_size(), 88 - }; 89 - 90 - base_size + extra 91 - } 92 - } 93 - 94 - impl<T> MaybeProcessedBlock<T> { 95 - fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 - if Node::could_be(&data) { 67 + impl MaybeProcessedBlock { 68 + pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 69 + if MstNode::could_be(&data) { 97 70 MaybeProcessedBlock::Raw(data) 98 71 } else { 99 72 MaybeProcessedBlock::Processed(process(data)) 100 73 } 101 74 } 75 + pub(crate) fn len(&self) -> usize { 76 + match self { 77 + MaybeProcessedBlock::Raw(b) => b.len(), 78 + MaybeProcessedBlock::Processed(b) => b.len(), 79 + } 80 + } 81 + pub(crate) fn into_bytes(self) -> Bytes { 82 + match self { 83 + MaybeProcessedBlock::Raw(mut b) => { 84 + b.push(0x00); 85 + b 86 + } 87 + MaybeProcessedBlock::Processed(mut b) => { 88 + b.push(0x01); 89 + b 90 + } 91 + } 92 + } 93 + pub(crate) fn from_bytes(mut b: Bytes) -> Self { 94 + // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 95 + let suffix = b.pop().unwrap(); 96 + if suffix == 0x00 { 97 + MaybeProcessedBlock::Raw(b) 98 + } else { 99 + MaybeProcessedBlock::Processed(b) 100 + } 101 + } 102 102 } 103 103 104 104 /// Read a CAR file, buffering blocks in memory or to disk 105 - pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 105 + pub enum Driver<R: AsyncRead + Unpin> { 106 106 /// All blocks fit within the memory limit 107 107 /// 108 108 /// You probably want to check the commit's signature. You can go ahead and 109 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver<T>), 110 + Memory(Commit, MemDriver), 111 111 /// Blocks exceed the memory limit 112 112 /// 113 113 /// You'll need to provide a disk storage to continue. The commit will be 114 114 /// returned and can be validated only once all blocks are loaded. 
115 - Disk(NeedDisk<R, T>), 115 + Disk(NeedDisk<R>), 116 + } 117 + 118 + /// Processor that just returns the raw blocks 119 + #[inline] 120 + pub fn noop(block: Bytes) -> Bytes { 121 + block 116 122 } 117 123 118 124 /// Builder-style driver setup 119 125 #[derive(Debug, Clone)] 120 126 pub struct DriverBuilder { 121 127 pub mem_limit_mb: usize, 128 + pub block_processor: fn(Bytes) -> Bytes, 122 129 } 123 130 124 131 impl Default for DriverBuilder { 125 132 fn default() -> Self { 126 - Self { mem_limit_mb: 16 } 133 + Self { 134 + mem_limit_mb: 16, 135 + block_processor: noop, 136 + } 127 137 } 128 138 } 129 139 ··· 135 145 /// Set the in-memory size limit, in MiB 136 146 /// 137 147 /// Default: 16 MiB 138 - pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 - Self { 140 - mem_limit_mb: new_limit, 141 - } 148 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 149 + self.mem_limit_mb = new_limit; 150 + self 142 151 } 152 + 143 153 /// Set the block processor 144 154 /// 145 155 /// Default: noop, raw blocks will be emitted 146 - pub fn with_block_processor<T: Processable>( 147 - self, 148 - p: fn(Vec<u8>) -> T, 149 - ) -> DriverBuilderWithProcessor<T> { 150 - DriverBuilderWithProcessor { 151 - mem_limit_mb: self.mem_limit_mb, 152 - block_processor: p, 153 - } 154 - } 155 - /// Begin processing an atproto MST from a CAR file 156 - pub async fn load_car<R: AsyncRead + Unpin>( 157 - &self, 158 - reader: R, 159 - ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 - Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 - } 162 - } 163 - 164 - /// Builder-style driver intermediate step 165 - /// 166 - /// start from `DriverBuilder` 167 - #[derive(Debug, Clone)] 168 - pub struct DriverBuilderWithProcessor<T: Processable> { 169 - pub mem_limit_mb: usize, 170 - pub block_processor: fn(Vec<u8>) -> T, 171 - } 172 - 173 - impl<T: Processable> DriverBuilderWithProcessor<T> { 174 - /// Set the in-memory size limit, in MiB 175 - /// 
176 - /// Default: 16 MiB 177 - pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 - self.mem_limit_mb = new_limit; 156 + pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder { 157 + self.block_processor = new_processor; 179 158 self 180 159 } 160 + 181 161 /// Begin processing an atproto MST from a CAR file 182 - pub async fn load_car<R: AsyncRead + Unpin>( 183 - &self, 184 - reader: R, 185 - ) -> Result<Driver<R, T>, DriveError> { 162 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 186 163 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 164 } 188 165 } 189 166 190 - impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 167 + impl<R: AsyncRead + Unpin> Driver<R> { 191 168 /// Begin processing an atproto MST from a CAR file 192 169 /// 193 170 /// Blocks will be loaded, processed, and buffered in memory. If the entire ··· 199 176 /// resumed by providing a `SqliteStorage` for on-disk block storage. 200 177 pub async fn load_car( 201 178 reader: R, 202 - process: fn(Vec<u8>) -> T, 179 + process: fn(Bytes) -> Bytes, 203 180 mem_limit_mb: usize, 204 - ) -> Result<Driver<R, T>, DriveError> { 181 + ) -> Result<Driver<R>, DriveError> { 205 182 let max_size = mem_limit_mb * 2_usize.pow(20); 206 183 let mut mem_blocks = HashMap::new(); 207 184 ··· 231 208 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 209 233 210 // stash (maybe processed) blocks in memory as long as we have room 234 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 211 + mem_size += maybe_processed.len(); 235 212 mem_blocks.insert(cid, maybe_processed); 236 213 if mem_size >= max_size { 237 214 return Ok(Driver::Disk(NeedDisk { ··· 248 225 // all blocks loaded and we fit in memory! hopefully we found the commit... 
249 226 let commit = commit.ok_or(DriveError::MissingCommit)?; 250 227 251 - let walker = Walker::new(commit.data); 228 + // the commit always must point to a Node; empty node => empty MST special case 229 + let root_node: MstNode = match mem_blocks 230 + .get(&commit.data) 231 + .ok_or(DriveError::MissingCommit)? 232 + { 233 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 234 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 235 + }; 236 + let walker = Walker::new(root_node); 252 237 253 238 Ok(Driver::Memory( 254 239 commit, ··· 275 260 /// work the init function will do. We can drop the CAR reader before walking, 276 261 /// so the sync/async boundaries become a little easier to work around. 277 262 #[derive(Debug)] 278 - pub struct MemDriver<T: Processable> { 279 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 263 + pub struct MemDriver { 264 + blocks: HashMap<Cid, MaybeProcessedBlock>, 280 265 walker: Walker, 281 - process: fn(Vec<u8>) -> T, 266 + process: fn(Bytes) -> Bytes, 282 267 } 283 268 284 - impl<T: Processable> MemDriver<T> { 269 + impl MemDriver { 285 270 /// Step through the record outputs, in rkey order 286 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 271 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 287 272 let mut out = Vec::with_capacity(n); 288 273 for _ in 0..n { 289 274 // walk as far as we can until we run out of blocks or find a record 290 - match self.walker.step(&mut self.blocks, self.process)? { 291 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 292 - Step::Finish => break, 293 - Step::Found { rkey, data } => { 294 - out.push((rkey, data)); 295 - continue; 296 - } 275 + let Some(Output { rkey, cid: _, data }) = 276 + self.walker.step(&mut self.blocks, self.process)? 
277 + else { 278 + break; 297 279 }; 280 + out.push((rkey, data)); 298 281 } 299 - 300 282 if out.is_empty() { 301 283 Ok(None) 302 284 } else { ··· 306 288 } 307 289 308 290 /// A partially memory-loaded car file that needs disk spillover to continue 309 - pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 291 + pub struct NeedDisk<R: AsyncRead + Unpin> { 310 292 car: CarReader<R>, 311 293 root: Cid, 312 - process: fn(Vec<u8>) -> T, 294 + process: fn(Bytes) -> Bytes, 313 295 max_size: usize, 314 - mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 296 + mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 315 297 pub commit: Option<Commit>, 316 298 } 317 299 318 - fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 319 - bincode::serde::encode_to_vec(v, bincode::config::standard()) 320 - } 321 - 322 - pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 323 - let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 324 - if n != bytes.len() { 325 - return Err(DecodeError::ExtraGarbage); 326 - } 327 - Ok(t) 328 - } 329 - 330 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 300 + impl<R: AsyncRead + Unpin> NeedDisk<R> { 331 301 pub async fn finish_loading( 332 302 mut self, 333 303 mut store: DiskStore, 334 - ) -> Result<(Commit, DiskDriver<T>), DriveError> { 304 + ) -> Result<(Commit, DiskDriver), DriveError> { 335 305 // move store in and back out so we can manage lifetimes 336 306 // dump mem blocks into the store 337 307 store = tokio::task::spawn(async move { 338 - let mut writer = store.get_writer()?; 339 - 340 308 let kvs = self 341 309 .mem_blocks 342 310 .into_iter() 343 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 311 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 344 312 345 - writer.put_many(kvs)?; 346 - writer.commit()?; 313 + store.put_many(kvs)?; 347 314 Ok::<_, DriveError>(store) 348 315 }) 349 316 .await??; 350 317 351 - 
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 318 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 352 319 353 320 let store_worker = tokio::task::spawn_blocking(move || { 354 - let mut writer = store.get_writer()?; 355 - 356 321 while let Some(chunk) = rx.blocking_recv() { 357 322 let kvs = chunk 358 323 .into_iter() 359 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 - writer.put_many(kvs)?; 324 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 325 + store.put_many(kvs)?; 361 326 } 362 - 363 - writer.commit()?; 364 327 Ok::<_, DriveError>(store) 365 328 }); // await later 366 329 ··· 379 342 self.commit = Some(c); 380 343 continue; 381 344 } 345 + 346 + let data = Bytes::from(data); 347 + 382 348 // remaining possible types: node, record, other. optimistically process 383 349 // TODO: get the actual in-memory size to compute disk spill 384 350 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 385 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 351 + mem_size += maybe_processed.len(); 386 352 chunk.push((cid, maybe_processed)); 387 - if mem_size >= self.max_size { 353 + if mem_size >= (self.max_size / 2) { 388 354 // soooooo if we're setting the db cache to max_size and then letting 389 355 // multiple chunks in the queue that are >= max_size, then at any time 390 356 // we might be using some multiple of max_size? ··· 407 373 408 374 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 409 375 410 - let walker = Walker::new(commit.data); 376 + // the commit always must point to a Node; empty node => empty MST special case 377 + let db_bytes = store 378 + .get(&commit.data.to_bytes()) 379 + .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 
380 + .ok_or(DriveError::MissingCommit)?; 381 + 382 + let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 383 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 384 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 385 + }; 386 + let walker = Walker::new(node); 411 387 412 388 Ok(( 413 389 commit, ··· 425 401 } 426 402 427 403 /// MST walker that reads from disk instead of an in-memory hashmap 428 - pub struct DiskDriver<T: Clone> { 429 - process: fn(Vec<u8>) -> T, 404 + pub struct DiskDriver { 405 + process: fn(Bytes) -> Bytes, 430 406 state: Option<BigState>, 431 407 } 432 408 433 409 // for doctests only 434 410 #[doc(hidden)] 435 - pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 436 - use crate::process::noop; 411 + pub fn _get_fake_disk_driver() -> DiskDriver { 437 412 DiskDriver { 438 413 process: noop, 439 414 state: None, 440 415 } 441 416 } 442 417 443 - impl<T: Processable + Send + 'static> DiskDriver<T> { 418 + impl DiskDriver { 444 419 /// Walk the MST returning up to `n` rkey + record pairs 445 420 /// 446 421 /// ```no_run 447 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 422 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 448 423 /// # #[tokio::main] 449 424 /// # async fn main() -> Result<(), DriveError> { 450 425 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 453 428 /// println!("{rkey}: size={}", record.len()); 454 429 /// } 455 430 /// } 456 - /// let store = disk_driver.reset_store().await?; 457 431 /// # Ok(()) 458 432 /// # } 459 433 /// ``` 460 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 434 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 461 435 let process = self.process; 462 436 463 437 // state should only *ever* be None transiently while inside here ··· 
466 440 // the big pain here is that we don't want to leave self.state in an 467 441 // invalid state (None), so all the error paths have to make sure it 468 442 // comes out again. 469 - let (state, res) = tokio::task::spawn_blocking( 470 - move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 - let mut reader_res = state.store.get_reader(); 472 - let reader: &mut _ = match reader_res { 473 - Ok(ref mut r) => r, 474 - Err(ref mut e) => { 475 - // unfortunately we can't return the error directly because 476 - // (for some reason) it's attached to the lifetime of the 477 - // reader? 478 - // hack a mem::swap so we can get it out :/ 479 - let e_swapped = e.steal(); 480 - // the pain: `state` *has to* outlive the reader 481 - drop(reader_res); 482 - return (state, Err(e_swapped.into())); 483 - } 484 - }; 485 - 443 + let (state, res) = 444 + tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 486 445 let mut out = Vec::with_capacity(n); 487 446 488 447 for _ in 0..n { 489 448 // walk as far as we can until we run out of blocks or find a record 490 - let step = match state.walker.disk_step(reader, process) { 449 + let step = match state.walker.disk_step(&mut state.store, process) { 491 450 Ok(s) => s, 492 451 Err(e) => { 493 - // the pain: `state` *has to* outlive the reader 494 - drop(reader_res); 495 452 return (state, Err(e.into())); 496 453 } 497 454 }; 498 - match step { 499 - Step::Missing(cid) => { 500 - // the pain: `state` *has to* outlive the reader 501 - drop(reader_res); 502 - return (state, Err(DriveError::MissingBlock(cid))); 503 - } 504 - Step::Finish => break, 505 - Step::Found { rkey, data } => out.push((rkey, data)), 455 + let Some(Output { rkey, cid: _, data }) = step else { 456 + break; 506 457 }; 458 + out.push((rkey, data)); 507 459 } 508 - 509 - // `state` *has to* outlive the reader 510 - drop(reader_res); 511 460 512 461 (state, Ok::<_, DriveError>(out)) 513 - }, 514 - ) 515 - .await?; // on tokio 
JoinError, we'll be left with invalid state :( 462 + }) 463 + .await?; // on tokio JoinError, we'll be left with invalid state :( 516 464 517 465 // *must* restore state before dealing with the actual result 518 466 self.state = Some(state); ··· 529 477 fn read_tx_blocking( 530 478 &mut self, 531 479 n: usize, 532 - tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 533 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 480 + tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 481 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 534 482 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 - let mut reader = match store.get_reader() { 536 - Ok(r) => r, 537 - Err(e) => return tx.blocking_send(Err(e.into())), 538 - }; 539 483 540 484 loop { 541 - let mut out: BlockChunk<T> = Vec::with_capacity(n); 485 + let mut out: BlockChunk = Vec::with_capacity(n); 542 486 543 487 for _ in 0..n { 544 488 // walk as far as we can until we run out of blocks or find a record 545 489 546 - let step = match walker.disk_step(&mut reader, self.process) { 490 + let step = match walker.disk_step(store, self.process) { 547 491 Ok(s) => s, 548 492 Err(e) => return tx.blocking_send(Err(e.into())), 549 493 }; 550 494 551 - match step { 552 - Step::Missing(cid) => { 553 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 554 - } 555 - Step::Finish => return Ok(()), 556 - Step::Found { rkey, data } => { 557 - out.push((rkey, data)); 558 - continue; 559 - } 495 + let Some(Output { rkey, cid: _, data }) = step else { 496 + break; 560 497 }; 498 + out.push((rkey, data)); 561 499 } 562 500 563 501 if out.is_empty() { ··· 580 518 /// benefit over just using `.next_chunk(n)`. 
581 519 /// 582 520 /// ```no_run 583 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 521 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 584 522 /// # #[tokio::main] 585 523 /// # async fn main() -> Result<(), DriveError> { 586 524 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 592 530 /// } 593 531 /// 594 532 /// } 595 - /// let store = join.await?.reset_store().await?; 596 533 /// # Ok(()) 597 534 /// # } 598 535 /// ``` ··· 600 537 mut self, 601 538 n: usize, 602 539 ) -> ( 603 - mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 540 + mpsc::Receiver<Result<BlockChunk, DriveError>>, 604 541 tokio::task::JoinHandle<Self>, 605 542 ) { 606 - let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 543 + let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 607 544 608 545 // sketch: this worker is going to be allowed to execute without a join handle 609 546 let chan_task = tokio::task::spawn_blocking(move || { ··· 614 551 }); 615 552 616 553 (rx, chan_task) 617 - } 618 - 619 - /// Reset the disk storage so it can be reused. You must call this. 620 - /// 621 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 622 - /// calls, that would be risky in an async context. For now you just have to 623 - /// carefully make sure you call it. 624 - /// 625 - /// The sqlite store is returned, so it can be reused for another 626 - /// `DiskDriver`. 627 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 628 - let BigState { store, .. } = self.state.take().expect("valid state"); 629 - Ok(store.reset().await?) 630 554 } 631 555 }
+19 -9
src/lib.rs
··· 27 27 28 28 match DriverBuilder::new() 29 29 .with_mem_limit_mb(10) 30 - .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 30 + .with_block_processor( 31 + |rec| rec.len().to_ne_bytes().to_vec().into() 32 + ) // block processing: just extract the raw record size 31 33 .load_car(reader) 32 34 .await? 33 35 { ··· 35 37 // if all blocks fit within memory 36 38 Driver::Memory(_commit, mut driver) => { 37 39 while let Some(chunk) = driver.next_chunk(256).await? { 38 - for (_rkey, size) in chunk { 40 + for (_rkey, bytes) in chunk { 41 + 42 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 43 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 + 39 45 total_size += size; 40 46 } 41 47 } ··· 49 55 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 56 51 57 while let Some(chunk) = driver.next_chunk(256).await? { 52 - for (_rkey, size) in chunk { 58 + for (_rkey, bytes) in chunk { 59 + 60 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 61 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 62 + 53 63 total_size += size; 54 64 } 55 65 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 66 } 60 67 }; 61 68 println!("sum of size of all records: {total_size}"); ··· 79 86 80 87 pub mod disk; 81 88 pub mod drive; 82 - pub mod process; 83 89 84 90 pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 91 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 86 92 pub use mst::Commit; 87 - pub use process::Processable; 93 + 94 + // pub use bytes::Bytes; 95 + pub type Bytes = Vec<u8>; 96 + 97 + pub(crate) use hashbrown::HashMap;
+151 -28
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use ipld_core::cid::Cid; 6 + use cid::Cid; 7 7 use serde::Deserialize; 8 + use sha2::{Digest, Sha256}; 8 9 9 10 /// The top-level data object in a repository's tree is a signed commit. 10 11 #[derive(Debug, Deserialize)] ··· 33 34 pub prev: Option<Cid>, 34 35 /// cryptographic signature of this commit, as raw bytes 35 36 #[serde(with = "serde_bytes")] 36 - pub sig: Vec<u8>, 37 + pub sig: serde_bytes::ByteBuf, 38 + } 39 + 40 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41 + use std::fmt; 42 + 43 + pub type Depth = u32; 44 + 45 + #[inline(always)] 46 + pub fn atproto_mst_depth(key: &str) -> Depth { 47 + // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 + u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 49 + } 50 + 51 + #[derive(Debug)] 52 + pub(crate) struct MstNode { 53 + pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 + pub things: Vec<NodeThing>, 55 + } 56 + 57 + #[derive(Debug)] 58 + pub(crate) struct NodeThing { 59 + pub(crate) cid: Cid, 60 + pub(crate) kind: ThingKind, 61 + } 62 + 63 + #[derive(Debug)] 64 + pub(crate) enum ThingKind { 65 + Tree, 66 + Value { rkey: String }, 37 67 } 38 68 39 - /// MST node data schema 40 - #[derive(Debug, Deserialize, PartialEq)] 41 - #[serde(deny_unknown_fields)] 42 - pub(crate) struct Node { 43 - /// link to sub-tree Node on a lower level and with all keys sorting before 44 - /// keys at this node 45 - #[serde(rename = "l")] 46 - pub left: Option<Cid>, 47 - /// ordered list of TreeEntry objects 48 - /// 49 - /// atproto MSTs have a fanout of 4, so there can be max 4 entries. 50 - #[serde(rename = "e")] 51 - pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]? 
69 + impl<'de> Deserialize<'de> for MstNode { 70 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 71 + where 72 + D: Deserializer<'de>, 73 + { 74 + struct NodeVisitor; 75 + impl<'de> Visitor<'de> for NodeVisitor { 76 + type Value = MstNode; 77 + 78 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 79 + formatter.write_str("struct MstNode") 80 + } 81 + 82 + fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error> 83 + where 84 + V: MapAccess<'de>, 85 + { 86 + let mut found_left = false; 87 + let mut left = None; 88 + let mut found_entries = false; 89 + let mut things = Vec::new(); 90 + let mut depth = None; 91 + 92 + while let Some(key) = map.next_key()? { 93 + match key { 94 + "l" => { 95 + if found_left { 96 + return Err(de::Error::duplicate_field("l")); 97 + } 98 + found_left = true; 99 + if let Some(cid) = map.next_value()? { 100 + left = Some(NodeThing { 101 + cid, 102 + kind: ThingKind::Tree, 103 + }); 104 + } 105 + } 106 + "e" => { 107 + if found_entries { 108 + return Err(de::Error::duplicate_field("e")); 109 + } 110 + found_entries = true; 111 + 112 + let mut prefix: Vec<u8> = vec![]; 113 + 114 + for entry in map.next_value::<Vec<Entry>>()? 
{ 115 + let mut rkey: Vec<u8> = vec![]; 116 + let pre_checked = 117 + prefix.get(..entry.prefix_len).ok_or_else(|| { 118 + de::Error::invalid_value( 119 + Unexpected::Bytes(&prefix), 120 + &"a prefix at least as long as the prefix_len", 121 + ) 122 + })?; 123 + 124 + rkey.extend_from_slice(pre_checked); 125 + rkey.extend_from_slice(&entry.keysuffix); 126 + 127 + let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| { 128 + de::Error::invalid_value( 129 + Unexpected::Bytes(&rkey), 130 + &"a valid utf-8 rkey", 131 + ) 132 + })?; 133 + 134 + let key_depth = atproto_mst_depth(&rkey_s); 135 + if depth.is_none() { 136 + depth = Some(key_depth); 137 + } else if Some(key_depth) != depth { 138 + return Err(de::Error::invalid_value( 139 + Unexpected::Bytes(&prefix), 140 + &"all rkeys to have equal MST depth", 141 + )); 142 + } 143 + 144 + things.push(NodeThing { 145 + cid: entry.value, 146 + kind: ThingKind::Value { rkey: rkey_s }, 147 + }); 148 + 149 + if let Some(cid) = entry.tree { 150 + things.push(NodeThing { 151 + cid, 152 + kind: ThingKind::Tree, 153 + }); 154 + } 155 + 156 + prefix = rkey; 157 + } 158 + } 159 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)), 160 + } 161 + } 162 + if !found_left { 163 + return Err(de::Error::missing_field("l")); 164 + } 165 + if !found_entries { 166 + return Err(de::Error::missing_field("e")); 167 + } 168 + 169 + things.reverse(); 170 + if let Some(l) = left { 171 + things.push(l); 172 + } 173 + 174 + Ok(MstNode { depth, things }) 175 + } 176 + } 177 + 178 + const NODE_FIELDS: &[&str] = &["l", "e"]; 179 + deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor) 180 + } 52 181 } 53 182 54 - impl Node { 183 + impl MstNode { 184 + pub(crate) fn is_empty(&self) -> bool { 185 + self.things.is_empty() 186 + } 55 187 /// test if a block could possibly be a node 56 188 /// 57 189 /// we can't eagerly decode records except where we're *sure* they cannot be ··· 62 194 /// so if a block *could be* a node, any record 
converter must postpone 63 195 /// processing. if it turns out it happens to be a very node-looking record, 64 196 /// well, sorry, it just has to only be processed later when that's known. 197 + #[inline(always)] 65 198 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 199 const NODE_FINGERPRINT: [u8; 3] = [ 67 200 0xA2, // map length 2 (for "l" and "e" keys) ··· 76 209 .map(|b| b & 0b1110_0000 == 0x80) 77 210 .unwrap_or(false) 78 211 } 79 - 80 - /// Check if a node has any entries 81 - /// 82 - /// An empty repository with no records is represented as a single MST node 83 - /// with an empty array of entries. This is the only situation in which a 84 - /// tree may contain an empty leaf node which does not either contain keys 85 - /// ("entries") or point to a sub-tree containing entries. 86 - pub(crate) fn is_empty(&self) -> bool { 87 - self.left.is_none() && self.entries.is_empty() 88 - } 89 212 } 90 213 91 214 /// TreeEntry object ··· 96 219 #[serde(rename = "p")] 97 220 pub prefix_len: usize, 98 221 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 99 - #[serde(rename = "k", with = "serde_bytes")] 100 - pub keysuffix: Vec<u8>, // can we String this here? 222 + #[serde(rename = "k")] 223 + pub keysuffix: serde_bytes::ByteBuf, 101 224 /// link to the record data (CBOR) for this entry 102 225 #[serde(rename = "v")] 103 226 pub value: Cid,
-108
src/process.rs
··· 1 - /*! 2 - Record processor function output trait 3 - 4 - The return type must satisfy the `Processable` trait, which requires: 5 - 6 - - `Clone` because two rkeys can refer to the same record by CID, which may 7 - only appear once in the CAR file. 8 - - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 - 10 - One required function must be implemented, `get_size()`: this should return the 11 - approximate total off-stack size of the type. (the on-stack size will be added 12 - automatically via `std::mem::get_size`). 13 - 14 - Note that it is **not guaranteed** that the `process` function will run on a 15 - block before storing it in memory or on disk: it's not possible to know if a 16 - block is a record without actually walking the MST, so the best we can do is 17 - apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 - store the raw block bytes. 19 - 20 - Here's a silly processing function that just collects 'eyy's found in the raw 21 - record bytes 22 - 23 - ``` 24 - # use repo_stream::Processable; 25 - # use serde::{Serialize, Deserialize}; 26 - #[derive(Debug, Clone, Serialize, Deserialize)] 27 - struct Eyy(usize, String); 28 - 29 - impl Processable for Eyy { 30 - fn get_size(&self) -> usize { 31 - // don't need to compute the usize, it's on the stack 32 - self.1.capacity() // in-mem size from the string's capacity, in bytes 33 - } 34 - } 35 - 36 - fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 - let mut out = Vec::new(); 38 - let to_find = "eyy".as_bytes(); 39 - for i in 0..(raw.len() - 3) { 40 - if &raw[i..(i+3)] == to_find { 41 - out.push(Eyy(i, "eyy".to_string())); 42 - } 43 - } 44 - out 45 - } 46 - ``` 47 - 48 - The memory sizing stuff is a little sketch but probably at least approximately 49 - works. 
50 - */ 51 - 52 - use serde::{Serialize, de::DeserializeOwned}; 53 - 54 - /// Output trait for record processing 55 - pub trait Processable: Clone + Serialize + DeserializeOwned { 56 - /// Any additional in-memory size taken by the processed type 57 - /// 58 - /// Do not include stack size (`std::mem::size_of`) 59 - fn get_size(&self) -> usize; 60 - } 61 - 62 - /// Processor that just returns the raw blocks 63 - #[inline] 64 - pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 - block 66 - } 67 - 68 - impl Processable for u8 { 69 - fn get_size(&self) -> usize { 70 - 0 71 - } 72 - } 73 - 74 - impl Processable for usize { 75 - fn get_size(&self) -> usize { 76 - 0 // no additional space taken, just its stack size (newtype is free) 77 - } 78 - } 79 - 80 - impl Processable for String { 81 - fn get_size(&self) -> usize { 82 - self.capacity() 83 - } 84 - } 85 - 86 - impl<Item: Sized + Processable> Processable for Vec<Item> { 87 - fn get_size(&self) -> usize { 88 - let slot_size = std::mem::size_of::<Item>(); 89 - let direct_size = slot_size * self.capacity(); 90 - let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 - direct_size + items_referenced_size 92 - } 93 - } 94 - 95 - impl<Item: Processable> Processable for Option<Item> { 96 - fn get_size(&self) -> usize { 97 - self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 - } 99 - } 100 - 101 - impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 - fn get_size(&self) -> usize { 103 - match self { 104 - Ok(item) => item.get_size(), 105 - Err(err) => err.get_size(), 106 - } 107 - } 108 - }
+105 -345
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk::SqliteReader; 4 - use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 - use crate::mst::Node; 6 - use crate::process::Processable; 7 - use ipld_core::cid::Cid; 8 - use sha2::{Digest, Sha256}; 9 - use std::collections::HashMap; 3 + use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 + use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 5 + use cid::Cid; 10 6 use std::convert::Infallible; 11 7 12 8 /// Errors that can happen while walking ··· 19 15 #[error("Action node error: {0}")] 20 16 MstError(#[from] MstError), 21 17 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 - #[error("Decode error: {0}")] 24 - DecodeError(#[from] DecodeError), 18 + StorageError(#[from] fjall::Error), 19 + #[error("block not found: {0}")] 20 + MissingBlock(Cid), 25 21 } 26 22 27 23 /// Errors from invalid Rkeys 28 24 #[derive(Debug, PartialEq, thiserror::Error)] 29 25 pub enum MstError { 30 - #[error("Failed to compute an rkey due to invalid prefix_len")] 31 - EntryPrefixOutOfbounds, 32 - #[error("RKey was not utf-8")] 33 - EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 26 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 27 EmptyNode, 36 - #[error("Found an entry with rkey at the wrong depth")] 37 - WrongDepth, 38 - #[error("Lost track of our depth (possible bug?)")] 39 - LostDepth, 28 + #[error("Expected node to be at depth {expected}, but it was at {depth}")] 29 + WrongDepth { depth: Depth, expected: Depth }, 40 30 #[error("MST depth underflow: depth-0 node with child trees")] 41 31 DepthUnderflow, 42 - #[error("Encountered an rkey out of order while walking the MST")] 43 - RkeyOutOfOrder, 32 + #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 + RkeyOutOfOrder { prev: String, rkey: String }, 44 34 } 45 35 46 36 /// Walker outputs 47 - #[derive(Debug)] 48 - pub enum Step<T> { 49 - /// We needed this 
CID but it's not in the block store 50 - Missing(Cid), 51 - /// Reached the end of the MST! yay! 52 - Finish, 53 - /// A record was found! 54 - Found { rkey: String, data: T }, 55 - } 56 - 57 - #[derive(Debug, Clone, PartialEq)] 58 - enum Need { 59 - Node { depth: Depth, cid: Cid }, 60 - Record { rkey: String, cid: Cid }, 61 - } 62 - 63 - #[derive(Debug, Clone, Copy, PartialEq)] 64 - enum Depth { 65 - Root, 66 - Depth(u32), 67 - } 68 - 69 - impl Depth { 70 - fn from_key(key: &[u8]) -> Self { 71 - let mut zeros = 0; 72 - for byte in Sha256::digest(key) { 73 - let leading = byte.leading_zeros(); 74 - zeros += leading; 75 - if leading < 8 { 76 - break; 77 - } 78 - } 79 - Self::Depth(zeros / 2) // truncating divide (rounds down) 80 - } 81 - fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 - match self { 83 - Self::Root => Ok(None), 84 - Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 - } 86 - } 87 - } 88 - 89 - fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST except in an empty MST 91 - if node.is_empty() { 92 - if parent_depth == Depth::Root { 93 - return Ok(()); // empty mst, nothing to push 94 - } else { 95 - return Err(MstError::EmptyNode); 96 - } 97 - } 98 - 99 - let mut entries = Vec::with_capacity(node.entries.len()); 100 - let mut prefix = vec![]; 101 - let mut this_depth = parent_depth.next_expected()?; 102 - 103 - for entry in &node.entries { 104 - let mut rkey = vec![]; 105 - let pre_checked = prefix 106 - .get(..entry.prefix_len) 107 - .ok_or(MstError::EntryPrefixOutOfbounds)?; 108 - rkey.extend_from_slice(pre_checked); 109 - rkey.extend_from_slice(&entry.keysuffix); 110 - 111 - let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 112 - return Err(MstError::WrongDepth); 113 - }; 114 - 115 - // this_depth is `none` if we are the deepest child (directly below root) 116 - // in that case we accept whatever 
highest depth is claimed 117 - let expected_depth = match this_depth { 118 - Some(d) => d, 119 - None => { 120 - this_depth = Some(key_depth); 121 - key_depth 122 - } 123 - }; 124 - 125 - // all keys we find should be this depth 126 - if key_depth != expected_depth { 127 - return Err(MstError::DepthUnderflow); 128 - } 129 - 130 - prefix = rkey.clone(); 131 - 132 - entries.push(Need::Record { 133 - rkey: String::from_utf8(rkey)?, 134 - cid: entry.value, 135 - }); 136 - if let Some(ref tree) = entry.tree { 137 - entries.push(Need::Node { 138 - depth: Depth::Depth(key_depth), 139 - cid: *tree, 140 - }); 141 - } 142 - } 143 - 144 - entries.reverse(); 145 - stack.append(&mut entries); 146 - 147 - let d = this_depth.ok_or(MstError::LostDepth)?; 148 - 149 - if let Some(tree) = node.left { 150 - stack.push(Need::Node { 151 - depth: Depth::Depth(d), 152 - cid: tree, 153 - }); 154 - } 155 - Ok(()) 37 + #[derive(Debug, PartialEq)] 38 + pub struct Output { 39 + pub rkey: String, 40 + pub cid: Cid, 41 + pub data: Bytes, 156 42 } 157 43 158 44 /// Traverser of an atproto MST ··· 160 46 /// Walks the tree from left-to-right in depth-first order 161 47 #[derive(Debug)] 162 48 pub struct Walker { 163 - stack: Vec<Need>, 164 - prev: String, 49 + prev_rkey: String, 50 + root_depth: Depth, 51 + todo: Vec<Vec<NodeThing>>, 165 52 } 166 53 167 54 impl Walker { 168 - pub fn new(tree_root_cid: Cid) -> Self { 55 + pub fn new(root_node: MstNode) -> Self { 169 56 Self { 170 - stack: vec![Need::Node { 171 - depth: Depth::Root, 172 - cid: tree_root_cid, 173 - }], 174 - prev: "".to_string(), 57 + prev_rkey: "".to_string(), 58 + root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 59 + todo: vec![root_node.things], 175 60 } 176 61 } 177 62 178 - /// Advance through nodes until we find a record or can't go further 179 - pub fn step<T: Processable>( 63 + fn mpb_step( 180 64 &mut self, 181 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 182 - process: impl Fn(Vec<u8>) -> T, 
183 - ) -> Result<Step<T>, WalkError> { 184 - loop { 185 - let Some(need) = self.stack.last_mut() else { 186 - log::trace!("tried to walk but we're actually done."); 187 - return Ok(Step::Finish); 188 - }; 189 - 190 - match need { 191 - &mut Need::Node { depth, cid } => { 192 - log::trace!("need node {cid:?}"); 193 - let Some(block) = blocks.remove(&cid) else { 194 - log::trace!("node not found, resting"); 195 - return Ok(Step::Missing(cid)); 196 - }; 197 - 198 - let MaybeProcessedBlock::Raw(data) = block else { 199 - return Err(WalkError::BadCommitFingerprint); 200 - }; 201 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 202 - .map_err(WalkError::BadCommit)?; 203 - 204 - // found node, make sure we remember 205 - self.stack.pop(); 65 + kind: ThingKind, 66 + cid: Cid, 67 + mpb: &MaybeProcessedBlock, 68 + process: impl Fn(Bytes) -> Bytes, 69 + ) -> Result<Option<Output>, WalkError> { 70 + match kind { 71 + ThingKind::Value { rkey } => { 72 + let data = match mpb { 73 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 74 + MaybeProcessedBlock::Processed(t) => t.clone(), 75 + }; 206 76 207 - // queue up work on the found node next 208 - push_from_node(&mut self.stack, &node, depth)?; 77 + if rkey <= self.prev_rkey { 78 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 79 + rkey, 80 + prev: self.prev_rkey.clone(), 81 + })); 209 82 } 210 - Need::Record { rkey, cid } => { 211 - log::trace!("need record {cid:?}"); 212 - // note that we cannot *remove* a record block, sadly, since 213 - // there can be multiple rkeys pointing to the same cid. 
214 - let Some(data) = blocks.get_mut(cid) else { 215 - return Ok(Step::Missing(*cid)); 216 - }; 217 - let rkey = rkey.clone(); 218 - let data = match data { 219 - MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 220 - MaybeProcessedBlock::Processed(t) => t.clone(), 221 - }; 222 - 223 - // found node, make sure we remember 224 - self.stack.pop(); 225 - 226 - // rkeys *must* be in order or else the tree is invalid (or 227 - // we have a bug) 228 - if rkey <= self.prev { 229 - return Err(MstError::RkeyOutOfOrder)?; 230 - } 231 - self.prev = rkey.clone(); 83 + self.prev_rkey = rkey.clone(); 232 84 233 - return Ok(Step::Found { rkey, data }); 234 - } 85 + log::trace!("val @ {rkey}"); 86 + Ok(Some(Output { rkey, cid, data })) 235 87 } 236 - } 237 - } 88 + ThingKind::Tree => { 89 + let MaybeProcessedBlock::Raw(data) = mpb else { 90 + return Err(WalkError::BadCommitFingerprint); 91 + }; 238 92 239 - /// blocking!!!!!! 240 - pub fn disk_step<T: Processable>( 241 - &mut self, 242 - reader: &mut SqliteReader, 243 - process: impl Fn(Vec<u8>) -> T, 244 - ) -> Result<Step<T>, WalkError> { 245 - loop { 246 - let Some(need) = self.stack.last_mut() else { 247 - log::trace!("tried to walk but we're actually done."); 248 - return Ok(Step::Finish); 249 - }; 93 + let node: MstNode = 94 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 250 95 251 - match need { 252 - &mut Need::Node { depth, cid } => { 253 - let cid_bytes = cid.to_bytes(); 254 - log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(cid_bytes)? 
else { 256 - log::trace!("node not found, resting"); 257 - return Ok(Step::Missing(cid)); 258 - }; 96 + if node.is_empty() { 97 + return Err(WalkError::MstError(MstError::EmptyNode)); 98 + } 259 99 260 - let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 261 - 262 - let MaybeProcessedBlock::Raw(data) = block else { 263 - return Err(WalkError::BadCommitFingerprint); 264 - }; 265 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 266 - .map_err(WalkError::BadCommit)?; 267 - 268 - // found node, make sure we remember 269 - self.stack.pop(); 270 - 271 - // queue up work on the found node next 272 - push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 100 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 101 + let next_depth = current_depth 102 + .checked_sub(1) 103 + .ok_or(MstError::DepthUnderflow)?; 104 + if let Some(d) = node.depth 105 + && d != next_depth 106 + { 107 + return Err(WalkError::MstError(MstError::WrongDepth { 108 + depth: d, 109 + expected: next_depth, 110 + })); 273 111 } 274 - Need::Record { rkey, cid } => { 275 - log::trace!("need record {cid:?}"); 276 - let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(cid_bytes)? else { 278 - log::trace!("record block not found, resting"); 279 - return Ok(Step::Missing(*cid)); 280 - }; 281 - let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 282 - let rkey = rkey.clone(); 283 - let data = match data { 284 - MaybeProcessedBlock::Raw(data) => process(data), 285 - MaybeProcessedBlock::Processed(t) => t.clone(), 286 - }; 287 112 288 - // found node, make sure we remember 289 - self.stack.pop(); 290 - 291 - log::trace!("emitting a block as a step. 
depth={}", self.stack.len()); 292 - 293 - // rkeys *must* be in order or else the tree is invalid (or 294 - // we have a bug) 295 - if rkey <= self.prev { 296 - return Err(MstError::RkeyOutOfOrder)?; 297 - } 298 - self.prev = rkey.clone(); 299 - 300 - return Ok(Step::Found { rkey, data }); 301 - } 113 + log::trace!("node into depth {next_depth}"); 114 + self.todo.push(node.things); 115 + Ok(None) 302 116 } 303 117 } 304 118 } 305 - } 306 119 307 - #[cfg(test)] 308 - mod test { 309 - use super::*; 310 - 311 - fn cid1() -> Cid { 312 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 313 - .parse() 314 - .unwrap() 120 + #[inline(always)] 121 + fn next_todo(&mut self) -> Option<NodeThing> { 122 + while let Some(last) = self.todo.last_mut() { 123 + let Some(thing) = last.pop() else { 124 + self.todo.pop(); 125 + continue; 126 + }; 127 + return Some(thing); 128 + } 129 + None 315 130 } 316 131 317 - #[test] 318 - fn test_depth_spec_0() { 319 - let d = Depth::from_key(b"2653ae71"); 320 - assert_eq!(d, Depth::Depth(0)) 321 - } 322 - 323 - #[test] 324 - fn test_depth_spec_1() { 325 - let d = Depth::from_key(b"blue"); 326 - assert_eq!(d, Depth::Depth(1)) 327 - } 328 - 329 - #[test] 330 - fn test_depth_spec_4() { 331 - let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 332 - assert_eq!(d, Depth::Depth(4)) 333 - } 334 - 335 - #[test] 336 - fn test_depth_spec_8() { 337 - let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 338 - assert_eq!(d, Depth::Depth(8)) 339 - } 340 - 341 - #[test] 342 - fn test_depth_ietf_draft_0() { 343 - let d = Depth::from_key(b"key1"); 344 - assert_eq!(d, Depth::Depth(0)) 345 - } 346 - 347 - #[test] 348 - fn test_depth_ietf_draft_1() { 349 - let d = Depth::from_key(b"key7"); 350 - assert_eq!(d, Depth::Depth(1)) 351 - } 352 - 353 - #[test] 354 - fn test_depth_ietf_draft_4() { 355 - let d = Depth::from_key(b"key515"); 356 - assert_eq!(d, Depth::Depth(4)) 357 - } 358 - 359 - #[test] 360 - fn test_depth_interop() { 361 - // 
examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 362 - for (k, expected) in [ 363 - ("", 0), 364 - ("asdf", 0), 365 - ("blue", 1), 366 - ("2653ae71", 0), 367 - ("88bfafc7", 2), 368 - ("2a92d355", 4), 369 - ("884976f5", 6), 370 - ("app.bsky.feed.post/454397e440ec", 4), 371 - ("app.bsky.feed.post/9adeb165882c", 8), 372 - ] { 373 - let d = Depth::from_key(k.as_bytes()); 374 - assert_eq!(d, Depth::Depth(expected), "key: {}", k); 132 + /// Advance through nodes until we find a record or can't go further 133 + pub fn step( 134 + &mut self, 135 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 136 + process: impl Fn(Bytes) -> Bytes, 137 + ) -> Result<Option<Output>, WalkError> { 138 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 + let Some(mpb) = blocks.get(&cid) else { 140 + return Err(WalkError::MissingBlock(cid)); 141 + }; 142 + if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 143 + return Ok(Some(out)); 144 + } 375 145 } 376 - } 377 - 378 - #[test] 379 - fn test_push_empty_fails() { 380 - let empty_node = Node { 381 - left: None, 382 - entries: vec![], 383 - }; 384 - let mut stack = vec![]; 385 - let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 386 - assert_eq!(err, Err(MstError::EmptyNode)); 146 + Ok(None) 387 147 } 388 148 389 - #[test] 390 - fn test_push_one_node() { 391 - let node = Node { 392 - left: Some(cid1()), 393 - entries: vec![], 394 - }; 395 - let mut stack = vec![]; 396 - push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 397 - assert_eq!( 398 - stack.last(), 399 - Some(Need::Node { 400 - depth: Depth::Depth(3), 401 - cid: cid1() 402 - }) 403 - .as_ref() 404 - ); 149 + /// blocking!!!!!! 
150 + pub fn disk_step( 151 + &mut self, 152 + blocks: &mut DiskStore, 153 + process: impl Fn(Bytes) -> Bytes, 154 + ) -> Result<Option<Output>, WalkError> { 155 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 + let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 + return Err(WalkError::MissingBlock(cid)); 158 + }; 159 + let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 + if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 + return Ok(Some(out)); 162 + } 163 + } 164 + Ok(None) 405 165 } 406 166 }
+212
tests/mst-depth.rs
··· 1 + // use repo_stream::Driver; 2 + use repo_stream::mst::atproto_mst_depth; 3 + 4 + // https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/example_keys.txt 5 + const INTEROP_EXAMPLE_KEYS: &str = "\ 6 + A0/374913 7 + A1/076595 8 + A2/827942 9 + A3/578971 10 + A4/055903 11 + A5/518415 12 + B0/601692 13 + B1/986427 14 + B2/827649 15 + B3/095483 16 + B4/774183 17 + B5/116729 18 + C0/451630 19 + C1/438573 20 + C2/014073 21 + C3/564755 22 + C4/134079 23 + C5/141153 24 + D0/952776 25 + D1/834852 26 + D2/269196 27 + D3/038750 28 + D4/052059 29 + D5/563177 30 + E0/670489 31 + E1/091396 32 + E2/819540 33 + E3/391311 34 + E4/820614 35 + E5/512478 36 + F0/697858 37 + F1/085263 38 + F2/483591 39 + F3/409933 40 + F4/789697 41 + F5/271416 42 + G0/765327 43 + G1/209912 44 + G2/611528 45 + G3/649394 46 + G4/585887 47 + G5/298495 48 + H0/131238 49 + H1/566929 50 + H2/618272 51 + H3/500151 52 + H4/841548 53 + H5/642354 54 + I0/536928 55 + I1/525517 56 + I2/800680 57 + I3/818503 58 + I4/561177 59 + I5/010047 60 + J0/453243 61 + J1/217783 62 + J2/960389 63 + J3/501274 64 + J4/042054 65 + J5/743154 66 + K0/125271 67 + K1/317361 68 + K2/453868 69 + K3/214010 70 + K4/164720 71 + K5/177856 72 + L0/502889 73 + L1/574576 74 + L2/596333 75 + L3/683657 76 + L4/724989 77 + L5/093883 78 + M0/141744 79 + M1/643368 80 + M2/919782 81 + M3/836327 82 + M4/177463 83 + M5/563354 84 + N0/370604 85 + N1/563732 86 + N2/177587 87 + N3/678428 88 + N4/599183 89 + N5/567564 90 + O0/523870 91 + O1/052141 92 + O2/037651 93 + O3/773808 94 + O4/140952 95 + O5/318605 96 + P0/133157 97 + P1/394633 98 + P2/521462 99 + P3/493488 100 + P4/908754 101 + P5/109455 102 + Q0/835234 103 + Q1/131542 104 + Q2/680035 105 + Q3/253381 106 + Q4/019053 107 + Q5/658167 108 + R0/129386 109 + R1/363149 110 + R2/742766 111 + R3/039235 112 + R4/482275 113 + R5/817312 114 + S0/340283 115 + S1/561525 116 + S2/914574 117 + S3/909434 118 + S4/789708 119 + S5/803866 120 + T0/255204 121 + T1/716687 122 + T2/256231 123 
+ T3/054247 124 + T4/419247 125 + T5/509584 126 + U0/298296 127 + U1/851680 128 + U2/342856 129 + U3/597327 130 + U4/311686 131 + U5/030156 132 + V0/221100 133 + V1/741554 134 + V2/267990 135 + V3/674163 136 + V4/739931 137 + V5/573718 138 + W0/034202 139 + W1/697411 140 + W2/460313 141 + W3/189647 142 + W4/847299 143 + W5/648086 144 + X0/287498 145 + X1/044093 146 + X2/613770 147 + X3/577587 148 + X4/779391 149 + X5/339246 150 + Y0/986350 151 + Y1/044567 152 + Y2/478044 153 + Y3/757097 154 + Y4/396913 155 + Y5/802264 156 + Z0/425878 157 + Z1/127557 158 + Z2/441927 159 + Z3/064474 160 + Z4/888344 161 + Z5/977983"; 162 + 163 + #[test] 164 + fn test_interop_example_keys() { 165 + for key in INTEROP_EXAMPLE_KEYS.split('\n') { 166 + let expected: u32 = key.chars().nth(1).unwrap().to_digit(16).unwrap(); 167 + let computed: u32 = atproto_mst_depth(key); 168 + assert_eq!(computed, expected); 169 + } 170 + } 171 + 172 + #[test] 173 + fn test_iterop_key_heights() { 174 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 175 + for (key, expected) in [ 176 + ("", 0), 177 + ("asdf", 0), 178 + ("blue", 1), 179 + ("2653ae71", 0), 180 + ("88bfafc7", 2), 181 + ("2a92d355", 4), 182 + ("884976f5", 6), 183 + ("app.bsky.feed.post/454397e440ec", 4), 184 + ("app.bsky.feed.post/9adeb165882c", 8), 185 + ] { 186 + let computed = atproto_mst_depth(key); 187 + assert_eq!(computed, expected); 188 + } 189 + } 190 + 191 + #[test] 192 + fn test_spec_example_keys() { 193 + // https://atproto.com/specs/repository#mst-structure 194 + for (key, expected) in [ 195 + ("2653ae71", 0), 196 + ("blue", 1), 197 + ("app.bsky.feed.post/454397e440ec", 4), 198 + ("app.bsky.feed.post/9adeb165882c", 8), 199 + ] { 200 + let computed = atproto_mst_depth(key); 201 + assert_eq!(computed, expected); 202 + } 203 + } 204 + 205 + #[test] 206 + fn test_ietf_example_keys() { 207 + // https://atproto.com/specs/repository#mst-structure 208 + for (key, expected) in 
[("key1", 0), ("key7", 1), ("key515", 4)] { 209 + let computed = atproto_mst_depth(key); 210 + assert_eq!(computed, expected); 211 + } 212 + }
+12 -4
tests/non-huge-cars.rs
··· 12 12 expected_sum: usize, 13 13 expect_profile: bool, 14 14 ) { 15 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 - .await 17 - .unwrap() 15 + let mut driver = match Driver::load_car( 16 + bytes, 17 + |block| block.len().to_ne_bytes().to_vec().into(), 18 + 10, /* MiB */ 19 + ) 20 + .await 21 + .unwrap() 18 22 { 19 23 Driver::Memory(_commit, mem_driver) => mem_driver, 20 24 Driver::Disk(_) => panic!("too big"), ··· 26 30 let mut prev_rkey = "".to_string(); 27 31 28 32 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 29 - for (rkey, size) in pairs { 33 + for (rkey, bytes) in pairs { 30 34 records += 1; 35 + 36 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 37 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 38 + 31 39 sum += size; 32 40 if rkey == "app.bsky.actor.profile/self" { 33 41 found_bsky_profile = true;

History

2 rounds 0 comments
sign up or login to add to the discussion
21 commits
expand
fjall 3.0.0 buggy
sqlite -> fjall 2 for ~2x speedup
fjall v3
some settings that might not make sense
yay fixessss
use mimalloc for disk read example
bytes
just vec again
custom mst node deserialize
well it kinda works again
simpler depth handling
remove old code
reduce fjall mem
update timings etc
fix example
fmt
old unused error
get the api closer to what it was before
clean up builder
drop empty-mst special-casing
get ready for release
expand 0 comments
pull request successfully merged
bad-example.com submitted #0
20 commits
expand
fjall 3.0.0 buggy
sqlite -> fjall 2 for ~2x speedup
fjall v3
some settings that might not make sense
yay fixessss
use mimalloc for disk read example
bytes
just vec again
custom mst node deserialize
well it kinda works again
simpler depth handling
remove old code
reduce fjall mem
update timings etc
fix example
fmt
old unused error
get the api closer to what it was before
clean up builder
drop empty-mst special-casing
expand 0 comments