Fast and robust atproto CAR file processing in Rust

next version #1

Merged · opened by bad-example.com · targeting main from fjall3

ended up being a significant rewrite, oops!

  • drop sqlite, pick up fjall v3 for some speeeeeeed (and code simplification and easier build requirements)
  • no more Processable trait, process functions are just Vec<u8> -> Vec<u8> now (bring your own ser/de). there's a potential small cost here: processors now need to actually go through serialization even for in-memory car walking, but i think zero-copy approaches (eg. rkyv) are low-cost enough
  • custom deserialize for MST nodes that does as much depth calculation and rkey validation as possible in-line. (not clear if it actually made anything faster)
  • check MST depth at every node properly (previously it could do some walking before being able to check and included some assumptions)
  • check MST for empty leaf nodes (which are not allowed)
  • shave 0.6 nanoseconds (really) from MST depth calculation (don't ask)
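the new processor shape can be sketched in a couple of lines — this mirrors the `ser` helper the updated benches use (the function name is incidental):

```rust
// A block processor under the new API is just bytes-in, bytes-out:
// no Processable trait, bring your own ser/de.
fn ser(block: Vec<u8>) -> Vec<u8> {
    // reduce each raw record block to its length, encoded as
    // native-endian bytes (the same processing the benches in this PR do)
    block.len().to_ne_bytes().to_vec()
}

fn main() {
    assert_eq!(ser(vec![0u8; 5]), 5usize.to_ne_bytes().to_vec());
}
```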

i guess i should put this in a changelog

AT URI
at://did:plc:hdhoaan3xa3jiuq4fg4mefid/sh.tangled.repo.pull/3mchuvd4loj22
+1056 -926
Diff #0
+228 -87
Cargo.lock
(generated lockfile churn, summarized: drops the sqlite/bincode dependency trees — `bincode`, `bincode_derive`, `fallible-iterator`, `fallible-streaming-iterator`, `hashlink`, `libsqlite3-sys`, `pkg-config`, `rusqlite`, `unty`, `vcpkg`, `virtue` — and adds the fjall v3 tree plus new deps — `allocator-api2`, `byteorder-lite`, `byteview`, `cc`, `compare`, `crossbeam-skiplist`, `dashmap`, `enum_dispatch`, `equivalent`, `find-msvc-tools`, `fjall`, `flume`, `hmac-sha256`, `interval-heap`, `libmimalloc-sys`, `lsm-tree`, `mimalloc`, `quick_cache`, `rustc-hash`, `self_cell`, `sfa`, `shlex`, `spin`, `varint-rs`, `xxhash-rust` — with bumps to `bytes` 1.10.1 → 1.11.0, `foldhash` 0.1.5 → 0.2.0, and `hashbrown` 0.15.5 → 0.14.5/0.16.1. the `repo-stream` package's own deps change from bincode/futures/futures-core/ipld-core/rusqlite to cid/fjall/hashbrown/hmac-sha256/mimalloc.)
+10 -7
Cargo.toml
```diff
--- a/Cargo.toml
+++ b/Cargo.toml
@@
 repository = "https://tangled.org/@microcosm.blue/repo-stream"
 
 [dependencies]
-bincode = { version = "2.0.1", features = ["serde"] }
-futures = "0.3.31"
-futures-core = "0.3.31"
-ipld-core = { version = "0.4.2", features = ["serde"] }
+fjall = { version = "3.0.1", default-features = false }
+hashbrown = "0.16.1"
+cid = { version = "0.11.1", features = ["serde"] }
 iroh-car = "0.5.1"
 log = "0.4.28"
-multibase = "0.9.2"
-rusqlite = "0.37.0"
 serde = { version = "1.0.228", features = ["derive"] }
 serde_bytes = "0.11.19"
 serde_ipld_dagcbor = "0.6.4"
-sha2 = "0.10.9"
+sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower
 thiserror = "2.0.17"
 tokio = { version = "1.47.1", features = ["rt", "sync"] }
@@
 multibase = "0.9.2"
 tempfile = "3.23.0"
 tokio = { version = "1.47.1", features = ["full"] }
+mimalloc = "0.1.48"
+hmac-sha256 = "1.1.12"
 
 [profile.profiling]
 inherits = "release"
@@
 [[bench]]
 name = "huge-car"
 harness = false
+
+# [[bench]]
+# name = "leading"
+# harness = false
```
+11 -4
benches/huge-car.rs
```diff
--- a/benches/huge-car.rs
+++ b/benches/huge-car.rs
@@
 use criterion::{Criterion, criterion_group, criterion_main};
 
+// use mimalloc::MiMalloc;
+// #[global_allocator]
+// static GLOBAL: MiMalloc = MiMalloc;
+
 pub fn criterion_benchmark(c: &mut Criterion) {
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
@@
     });
 }
 
+#[inline(always)]
+fn ser(block: Vec<u8>) -> Vec<u8> {
+    let s = block.len();
+    usize::to_ne_bytes(s).to_vec()
+}
+
 async fn drive_car(filename: impl AsRef<Path>) -> usize {
     let reader = tokio::fs::File::open(filename).await.unwrap();
     let reader = tokio::io::BufReader::new(reader);
 
-    let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
-        .await
-        .unwrap()
-    {
+    let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() {
         Driver::Memory(_, mem_driver) => mem_driver,
         Driver::Disk(_) => panic!("not doing disk for benchmark"),
     };
```
+66
benches/leading.rs
new file:

```rust
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use hmac_sha256::Hash;
use sha2::{Digest, Sha256};

pub fn compute(bytes: [u8; 32]) -> u32 {
    let mut zeros = 0;
    for byte in bytes {
        if byte == 0 {
            zeros += 8
        } else {
            zeros += byte.leading_zeros();
            break;
        }
    }
    zeros / 2
}

pub fn compute2(bytes: [u8; 32]) -> u32 {
    u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()).leading_zeros() / 2
}

fn from_key_old(key: &[u8]) -> u32 {
    compute2(Sha256::digest(key).into())
}

fn from_key_new(key: &[u8]) -> u32 {
    compute2(Hash::hash(key).into())
}

pub fn criterion_benchmark(c: &mut Criterion) {
    for (name, case) in [
        ("no zeros", [0xFF; 32]),
        ("two zeros", [0x3F; 32]),
        (
            "some zeros",
            [
                0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1,
            ],
        ),
        (
            "many zeros",
            [
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1,
            ],
        ),
    ] {
        let mut g = c.benchmark_group(name);
        g.bench_function("old", |b| {
            b.iter_batched(|| case.clone(), |c| compute(c), BatchSize::SmallInput)
        });
        g.bench_function("new", |b| {
            b.iter_batched(|| case.clone(), |c| compute2(c), BatchSize::SmallInput)
        });
    }

    for case in ["a", "aa", "aaa", "aaaa"] {
        let mut g = c.benchmark_group(case);
        g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes())));
        g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes())));
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
```
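for context on the bench above: atproto MST depth comes from the number of leading zero bits in sha256(key), counted in 2-bit steps (fanout 4). `compute2` swaps the per-byte loop for a single `u128::leading_zeros` over the first 16 digest bytes — this caps depth at 64, but it agrees with the loop everywhere a sha256 output will realistically land. a standalone sketch of the two variants:

```rust
// Sketch of the two depth calculations compared by the bench above.
// Depth = leading zero bits of the hash, in 2-bit steps.

// byte-at-a-time loop (the old version)
pub fn compute(bytes: [u8; 32]) -> u32 {
    let mut zeros = 0;
    for byte in bytes {
        if byte == 0 {
            zeros += 8;
        } else {
            zeros += byte.leading_zeros();
            break;
        }
    }
    zeros / 2
}

// single leading_zeros over the first 16 bytes (the new version);
// note: caps depth at 64, which a sha256 digest won't hit in practice
pub fn compute2(bytes: [u8; 32]) -> u32 {
    u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()).leading_zeros() / 2
}

fn main() {
    let mut some_zeros = [1u8; 32];
    some_zeros[..3].fill(0);
    for case in [[0xFF; 32], [0x3F; 32], some_zeros] {
        assert_eq!(compute(case), compute2(case));
    }
    assert_eq!(compute2([0xFF; 32]), 0); // no leading zero bits -> depth 0
    assert_eq!(compute2([0x3F; 32]), 1); // two leading zero bits -> depth 1
}
```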
+11 -4
benches/non-huge-cars.rs
```diff
--- a/benches/non-huge-cars.rs
+++ b/benches/non-huge-cars.rs
@@
 use criterion::{Criterion, criterion_group, criterion_main};
 
+// use mimalloc::MiMalloc;
+// #[global_allocator]
+// static GLOBAL: MiMalloc = MiMalloc;
+
 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
@@
     });
 }
 
+#[inline(always)]
+fn ser(block: Vec<u8>) -> Vec<u8> {
+    let s = block.len();
+    usize::to_ne_bytes(s).to_vec()
+}
+
 async fn drive_car(bytes: &[u8]) -> usize {
-    let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
-        .await
-        .unwrap()
-    {
+    let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
         Driver::Memory(_, mem_driver) => mem_driver,
         Driver::Disk(_) => panic!("not benching big cars here"),
     };
```
+10 -5
examples/disk-read-file/main.rs
```diff
--- a/examples/disk-read-file/main.rs
+++ b/examples/disk-read-file/main.rs
@@
  */
 
 extern crate repo_stream;
+
+use mimalloc::MiMalloc;
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
 use clap::Parser;
 use repo_stream::{DiskBuilder, Driver, DriverBuilder};
 use std::path::PathBuf;
@@
     // in this example we only bother handling CARs that are too big for memory
     // `noop` helper means: do no block processing, store the raw blocks
     let driver = match DriverBuilder::new()
-        .with_mem_limit_mb(10) // how much memory can be used before disk spill
+        .with_mem_limit_mb(32) // how much memory can be used before disk spill
         .load_car(reader)
         .await?
     {
@@
         // via the DID from the commit, and then verify the signature.
         log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit);
 
+        // log::info!("now is good time to check mem usage...");
+        // tokio::time::sleep(std::time::Duration::from_secs(15)).await;
+
         // pop the driver back out to get some code indentation relief
         driver
     }
@@
     log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
 
-    // clean up the database. would be nice to do this in drop so it happens
-    // automatically, but some blocking work happens, so that's not allowed in
-    // async rust. 🤷‍♀️
-    join.await?.reset_store().await?;
+    join.await?;
 
     log::info!("done. n={n} zeros={zeros}");
```
+1 -1
examples/read-file/main.rs
```diff
--- a/examples/read-file/main.rs
+++ b/examples/read-file/main.rs
@@
     let reader = tokio::io::BufReader::new(reader);
 
     let (commit, mut driver) = match DriverBuilder::new()
-        .with_block_processor(|block| block.len())
+        .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
         .load_car(reader)
         .await?
     {
```
+49 -10
readme.md
··· 50 total_size += size; 51 } 52 } 53 - 54 - // clean up the disk store (drop tables etc) 55 - driver.reset_store().await?; 56 } 57 }; 58 println!("sum of size of all records: {total_size}"); ··· 61 ``` 62 63 more recent todo 64 - 65 - - [ ] get an *emtpy* car for the test suite 66 - [x] implement a max size on disk limit 67 68 ··· 73 74 current car processing times (records processed into their length usize, phil's dev machine): 75 76 - - 128MiB CAR file: `347ms` 77 - - 5.0MiB: `6.1ms` 78 - - 279KiB: `139us` 79 - - 3.4KiB: `4.9us` 80 81 82 - running the huge-car benchmark 83 84 - to avoid committing it to the repo, you have to pass it in through the env for now. 85
··· 50 total_size += size; 51 } 52 } 53 } 54 }; 55 println!("sum of size of all records: {total_size}"); ··· 58 ``` 59 60 more recent todo 61 + - [ ] repo car slices 62 + - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 63 + - [x] get an *empty* car for the test suite 64 - [x] implement a max size on disk limit 65 66 ··· 71 72 current car processing times (records processed into their length usize, phil's dev machine): 73 74 + - 450MiB CAR file (huge): `1.3s` 75 + - 128MiB (huge): `350ms` 76 + - 5.0MiB: `6.8ms` 77 + - 279KiB: `160us` 78 + - 3.4KiB: `5.1us` 79 + - empty: `690ns` 80 + 81 + it's a little faster with `mimalloc` 82 + 83 + ```rust 84 + use mimalloc::MiMalloc; 85 + #[global_allocator] 86 + static GLOBAL: MiMalloc = MiMalloc; 87 + ``` 88 + 89 + - 450MiB CAR file: `1.2s` (-8%) 90 + - 128MiB: `300ms` (-14%) 91 + - 5.0MiB: `6.0ms` (-11%) 92 + - 279KiB: `150us` (-7%) 93 + - 3.4KiB: `4.7us` (-8%) 94 + - empty: `670ns` (-4%) 95 + 96 + processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!) 97 + 98 + 1. spill blocks to disk 99 + 2. inline block processing 100 + 101 + #### spill blocks to disk 102 + 103 + this is a little slower but can greatly reduce the memory used. there's nothing special you need to do for this. 104 + 105 + 106 + #### inline block processing 107 + 108 + if you don't need to store the complete records, you can have repo-stream try to optimistically apply a processing function to the raw blocks as they are streamed in. 109 + 110 + 111 + #### constrained mem perf comparison 112 113 + sketchy benchmark but hey. mimalloc is enabled, and the processing spills to disk.
inline processing reduces entire records to 8 bytes (usize of the raw record block size): 114 115 + - 450MiB CAR file: `5.0s` (4.5x slowdown for disk) 116 + - 128MiB: `1.27s` (4.1x slowdown) 117 + 118 + fortunately, most CARs in the ATmosphere are very small, so for eg. backfill purposes, the vast majority of inputs will not face this slowdown. 119 + 120 + 121 + #### running the huge-car benchmark 122 123 - to avoid committing it to the repo, you have to pass it in through the env for now. 124
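the inline-processing savings described in the readme can be sketched standalone (hypothetical block sizes; only the `Vec<u8> -> Vec<u8>` processor shape comes from this PR):

```rust
// Each record block is replaced by a small derived value as it streams
// in, so the buffered footprint stays constant per record instead of
// proportional to record size.
fn inline_process(block: Vec<u8>) -> Vec<u8> {
    block.len().to_ne_bytes().to_vec()
}

fn main() {
    let blocks = vec![vec![0u8; 4096], vec![0u8; 512], vec![0u8; 65536]];
    let raw_total: usize = blocks.iter().map(|b| b.len()).sum();
    let processed: Vec<Vec<u8>> = blocks.into_iter().map(inline_process).collect();
    let buffered: usize = processed.iter().map(|b| b.len()).sum();
    assert_eq!(raw_total, 70144);
    assert_eq!(buffered, 3 * std::mem::size_of::<usize>());
}
```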
+32 -99
src/disk.rs
··· 17 ``` 18 */ 19 20 - use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 22 use std::path::PathBuf; 23 24 #[derive(Debug, thiserror::Error)] ··· 28 /// (The wrapped err should probably be obscured to remove public-facing 29 /// sqlite bits) 30 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), ··· 38 /// limit. 39 #[error("Maximum disk size reached")] 40 MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 } 54 55 /// Builder-style disk store setup ··· 71 impl Default for DiskBuilder { 72 fn default() -> Self { 73 Self { 74 - cache_size_mb: 32, 75 max_stored_mb: 10 * 1024, // 10 GiB 76 } 77 } ··· 84 } 85 /// Set the in-memory cache allowance for the database 86 /// 87 - /// Default: 32 MiB 88 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 self.cache_size_mb = size; 90 self ··· 104 105 /// On-disk block storage 106 pub struct DiskStore { 107 - conn: rusqlite::Connection, 108 max_stored: usize, 109 stored: usize, 110 } ··· 117 max_stored_mb: usize, 118 ) -> Result<Self, DiskError> { 119 let max_stored = max_stored_mb * 2_usize.pow(20); 120 - let conn = tokio::task::spawn_blocking(move || { 121 - let conn = rusqlite::Connection::open(path)?; 122 - 123 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 - 125 - // conn.pragma_update(None, "journal_mode", "OFF")?; 126 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 - conn.pragma_update(None, "journal_mode", "WAL")?; 128 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; 
// this lets things get a bit big on disk 129 - conn.pragma_update(None, "synchronous", "OFF")?; 130 - conn.pragma_update( 131 - None, 132 - "cache_size", 133 - (cache_mb as i64 * sqlite_one_mb).to_string(), 134 - )?; 135 - Self::reset_tables(&conn)?; 136 137 - Ok::<_, DiskError>(conn) 138 }) 139 .await??; 140 141 Ok(Self { 142 - conn, 143 max_stored, 144 stored: 0, 145 }) 146 } 147 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 - let tx = self.conn.transaction()?; 149 - Ok(SqliteWriter { 150 - tx, 151 - stored: &mut self.stored, 152 - max: self.max_stored, 153 - }) 154 - } 155 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 - Ok(SqliteReader { select_stmt }) 158 - } 159 - /// Drop and recreate the kv table 160 - pub async fn reset(self) -> Result<Self, DiskError> { 161 - tokio::task::spawn_blocking(move || { 162 - Self::reset_tables(&self.conn)?; 163 - Ok(self) 164 - }) 165 - .await? 
166 - } 167 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 - conn.execute( 170 - "CREATE TABLE blocks ( 171 - key BLOB PRIMARY KEY NOT NULL, 172 - val BLOB NOT NULL 173 - ) WITHOUT ROWID", 174 - (), 175 - )?; 176 - Ok(()) 177 - } 178 - } 179 180 - pub(crate) struct SqliteWriter<'conn> { 181 - tx: rusqlite::Transaction<'conn>, 182 - stored: &'conn mut usize, 183 - max: usize, 184 - } 185 - 186 - impl SqliteWriter<'_> { 187 pub(crate) fn put_many( 188 &mut self, 189 - kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 190 ) -> Result<(), DriveError> { 191 - let mut insert_stmt = self 192 - .tx 193 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 - .map_err(DiskError::DbError)?; 195 - for pair in kv { 196 - let (k, v) = pair?; 197 - *self.stored += v.len(); 198 - if *self.stored > self.max { 199 return Err(DiskError::MaxSizeExceeded.into()); 200 } 201 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 202 } 203 - Ok(()) 204 - } 205 - pub fn commit(self) -> Result<(), DiskError> { 206 - self.tx.commit()?; 207 Ok(()) 208 } 209 - } 210 - 211 - pub(crate) struct SqliteReader<'conn> { 212 - select_stmt: rusqlite::Statement<'conn>, 213 - } 214 215 - impl SqliteReader<'_> { 216 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 - self.select_stmt 218 - .query_one((&key,), |row| row.get(0)) 219 - .optional() 220 } 221 }
··· 17 ``` 18 */ 19 20 + use crate::{Bytes, drive::DriveError}; 21 + use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22 use std::path::PathBuf; 23 24 #[derive(Debug, thiserror::Error)] ··· 28 /// (The wrapped err should probably be obscured to remove public-facing 29 /// fjall bits) 30 #[error(transparent)] 31 + DbError(#[from] FjallError), 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), ··· 38 /// limit. 39 #[error("Maximum disk size reached")] 40 MaxSizeExceeded, 41 } 42 43 /// Builder-style disk store setup ··· 59 impl Default for DiskBuilder { 60 fn default() -> Self { 61 Self { 62 + cache_size_mb: 64, 63 max_stored_mb: 10 * 1024, // 10 GiB 64 } 65 } ··· 72 } 73 /// Set the in-memory cache allowance for the database 74 /// 75 + /// Default: 64 MiB 76 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 77 self.cache_size_mb = size; 78 self ··· 92 93 /// On-disk block storage 94 pub struct DiskStore { 95 + #[allow(unused)] 96 + db: Database, 97 + partition: Keyspace, 98 max_stored: usize, 99 stored: usize, 100 } ··· 107 max_stored_mb: usize, 108 ) -> Result<Self, DiskError> { 109 let max_stored = max_stored_mb * 2_usize.pow(20); 110 + let (db, partition) = tokio::task::spawn_blocking(move || { 111 + let db = Database::builder(path) 112 + .manual_journal_persist(true) 113 + .worker_threads(1) 114 + .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2) 115 + .temporary(true) 116 + .open()?; 117 + let opts = KeyspaceCreateOptions::default() 118 + .expect_point_read_hits(true) 119 + .max_memtable_size(16 * 2_u64.pow(20)); 120 + let partition = db.keyspace("z", || opts)?; 121 122 + Ok::<_, DiskError>((db, partition)) 123 }) 124 .await??; 125 126 Ok(Self { 127 + db, 128 + partition, 129 max_stored, 130 stored: 0, 131 }) 132 } 133 134 pub(crate) fn put_many( 135 &mut self, 136 + kv: impl Iterator<Item = (Vec<u8>, Bytes)>, 137 ) -> Result<(),
DriveError> { 138 + let mut batch = self.db.batch(); 139 + for (k, v) in kv { 140 + self.stored += v.len(); 141 + if self.stored > self.max_stored { 142 return Err(DiskError::MaxSizeExceeded.into()); 143 } 144 + batch.insert(&self.partition, k, v); 145 } 146 + batch.commit().map_err(DiskError::DbError)?; 147 Ok(()) 148 } 149 150 + #[inline] 151 + pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 152 + self.partition.get(key) 153 } 154 }
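the size-cap accounting in `put_many` above can be exercised standalone (simplified sketch; the real code batches into fjall and surfaces `DiskError::MaxSizeExceeded`):

```rust
// Track total bytes written and refuse the write that would pass the
// configured maximum -- mirroring the stored / max_stored check above,
// where the counter is bumped before the comparison.
struct SizeCap {
    stored: usize,
    max: usize,
}

impl SizeCap {
    fn put(&mut self, value_len: usize) -> Result<(), &'static str> {
        self.stored += value_len;
        if self.stored > self.max {
            return Err("maximum disk size reached");
        }
        Ok(())
    }
}

fn main() {
    // 10 MiB cap for illustration (the builder default is 10 GiB)
    let mut cap = SizeCap { stored: 0, max: 10 * (1 << 20) };
    assert!(cap.put(6 * (1 << 20)).is_ok());
    assert!(cap.put(5 * (1 << 20)).is_err()); // 11 MiB total exceeds the cap
}
```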
+139 -215
src/drive.rs
··· 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 3 - use crate::disk::{DiskError, DiskStore}; 4 - use crate::process::Processable; 5 - use ipld_core::cid::Cid; 6 use iroh_car::CarReader; 7 - use serde::{Deserialize, Serialize}; 8 - use std::collections::HashMap; 9 use std::convert::Infallible; 10 use tokio::{io::AsyncRead, sync::mpsc}; 11 12 - use crate::mst::{Commit, Node}; 13 - use crate::walk::{Step, WalkError, Walker}; 14 15 /// Errors that can happen while consuming and emitting blocks and records 16 #[derive(Debug, thiserror::Error)] ··· 21 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 22 #[error("The Commit block reference by the root was not found")] 23 MissingCommit, 24 - #[error("The MST block {0} could not be found")] 25 - MissingBlock(Cid), 26 #[error("Failed to walk the mst tree: {0}")] 27 WalkError(#[from] WalkError), 28 #[error("CAR file had no roots")] 29 MissingRoot, 30 #[error("Storage error")] 31 StorageError(#[from] DiskError), 32 - #[error("Encode error: {0}")] 33 - BincodeEncodeError(#[from] bincode::error::EncodeError), 34 #[error("Tried to send on a closed channel")] 35 ChannelSendError, // SendError takes <T> which we don't need 36 #[error("Failed to join a task: {0}")] 37 JoinError(#[from] tokio::task::JoinError), 38 } 39 40 - #[derive(Debug, thiserror::Error)] 41 - pub enum DecodeError { 42 - #[error(transparent)] 43 - BincodeDecodeError(#[from] bincode::error::DecodeError), 44 - #[error("extra bytes remained after decoding")] 45 - ExtraGarbage, 46 - } 47 - 48 /// An in-order chunk of Rkey + (processed) Block pairs 49 - pub type BlockChunk<T> = Vec<(String, T)>; 50 51 - #[derive(Debug, Clone, Serialize, Deserialize)] 52 - pub(crate) enum MaybeProcessedBlock<T> { 53 /// A block that's *probably* a Node (but we can't know yet) 54 /// 55 /// It *can be* a record that suspiciously looks a lot like a node, so we 56 /// cannot eagerly turn it into a Node. 
We only know for sure what it is 57 /// when we actually walk down the MST 58 - Raw(Vec<u8>), 59 /// A processed record from a block that was definitely not a Node 60 /// 61 /// Processing has to be fallible because the CAR can have totally-unused ··· 71 /// There's an alternative here, which would be to kick unprocessable blocks 72 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 73 /// surface the typed error later if needed by trying to reprocess. 74 - Processed(T), 75 } 76 77 - impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 - /// TODO this is probably a little broken 79 - fn get_size(&self) -> usize { 80 - use std::{cmp::max, mem::size_of}; 81 - 82 - // enum is always as big as its biggest member? 83 - let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 - 85 - let extra = match self { 86 - Self::Raw(bytes) => bytes.len(), 87 - Self::Processed(t) => t.get_size(), 88 - }; 89 - 90 - base_size + extra 91 - } 92 - } 93 - 94 - impl<T> MaybeProcessedBlock<T> { 95 - fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 - if Node::could_be(&data) { 97 MaybeProcessedBlock::Raw(data) 98 } else { 99 MaybeProcessedBlock::Processed(process(data)) 100 } 101 } 102 } 103 104 /// Read a CAR file, buffering blocks in memory or to disk 105 - pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 /// All blocks fit within the memory limit 107 /// 108 /// You probably want to check the commit's signature. You can go ahead and 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver<T>), 111 /// Blocks exceed the memory limit 112 /// 113 /// You'll need to provide a disk storage to continue. The commit will be 114 /// returned and can be validated only once all blocks are loaded. 
115 - Disk(NeedDisk<R, T>), 116 } 117 118 /// Builder-style driver setup 119 #[derive(Debug, Clone)] 120 pub struct DriverBuilder { 121 pub mem_limit_mb: usize, 122 } 123 124 impl Default for DriverBuilder { 125 fn default() -> Self { 126 - Self { mem_limit_mb: 16 } 127 } 128 } 129 ··· 135 /// Set the in-memory size limit, in MiB 136 /// 137 /// Default: 16 MiB 138 - pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 - Self { 140 - mem_limit_mb: new_limit, 141 - } 142 } 143 /// Set the block processor 144 /// 145 /// Default: noop, raw blocks will be emitted 146 - pub fn with_block_processor<T: Processable>( 147 - self, 148 - p: fn(Vec<u8>) -> T, 149 - ) -> DriverBuilderWithProcessor<T> { 150 - DriverBuilderWithProcessor { 151 - mem_limit_mb: self.mem_limit_mb, 152 - block_processor: p, 153 - } 154 - } 155 - /// Begin processing an atproto MST from a CAR file 156 - pub async fn load_car<R: AsyncRead + Unpin>( 157 - &self, 158 - reader: R, 159 - ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 - Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 - } 162 - } 163 - 164 - /// Builder-style driver intermediate step 165 - /// 166 - /// start from `DriverBuilder` 167 - #[derive(Debug, Clone)] 168 - pub struct DriverBuilderWithProcessor<T: Processable> { 169 - pub mem_limit_mb: usize, 170 - pub block_processor: fn(Vec<u8>) -> T, 171 - } 172 - 173 - impl<T: Processable> DriverBuilderWithProcessor<T> { 174 - /// Set the in-memory size limit, in MiB 175 - /// 176 - /// Default: 16 MiB 177 - pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 - self.mem_limit_mb = new_limit; 179 self 180 } 181 /// Begin processing an atproto MST from a CAR file 182 - pub async fn load_car<R: AsyncRead + Unpin>( 183 - &self, 184 - reader: R, 185 - ) -> Result<Driver<R, T>, DriveError> { 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 } 188 } 189 190 - impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 
191 /// Begin processing an atproto MST from a CAR file 192 /// 193 /// Blocks will be loaded, processed, and buffered in memory. If the entire ··· 199 /// resumed by providing a `SqliteStorage` for on-disk block storage. 200 pub async fn load_car( 201 reader: R, 202 - process: fn(Vec<u8>) -> T, 203 mem_limit_mb: usize, 204 - ) -> Result<Driver<R, T>, DriveError> { 205 let max_size = mem_limit_mb * 2_usize.pow(20); 206 let mut mem_blocks = HashMap::new(); 207 ··· 231 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 233 // stash (maybe processed) blocks in memory as long as we have room 234 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 235 mem_blocks.insert(cid, maybe_processed); 236 if mem_size >= max_size { 237 return Ok(Driver::Disk(NeedDisk { ··· 248 // all blocks loaded and we fit in memory! hopefully we found the commit... 249 let commit = commit.ok_or(DriveError::MissingCommit)?; 250 251 - let walker = Walker::new(commit.data); 252 253 Ok(Driver::Memory( 254 commit, ··· 275 /// work the init function will do. We can drop the CAR reader before walking, 276 /// so the sync/async boundaries become a little easier to work around. 277 #[derive(Debug)] 278 - pub struct MemDriver<T: Processable> { 279 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 280 walker: Walker, 281 - process: fn(Vec<u8>) -> T, 282 } 283 284 - impl<T: Processable> MemDriver<T> { 285 /// Step through the record outputs, in rkey order 286 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 287 let mut out = Vec::with_capacity(n); 288 for _ in 0..n { 289 // walk as far as we can until we run out of blocks or find a record 290 - match self.walker.step(&mut self.blocks, self.process)? 
{ 291 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 292 - Step::Finish => break, 293 - Step::Found { rkey, data } => { 294 - out.push((rkey, data)); 295 - continue; 296 - } 297 }; 298 } 299 - 300 if out.is_empty() { 301 Ok(None) 302 } else { ··· 306 } 307 308 /// A partially memory-loaded car file that needs disk spillover to continue 309 - pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 310 car: CarReader<R>, 311 root: Cid, 312 - process: fn(Vec<u8>) -> T, 313 max_size: usize, 314 - mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 315 pub commit: Option<Commit>, 316 } 317 318 - fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 319 - bincode::serde::encode_to_vec(v, bincode::config::standard()) 320 - } 321 - 322 - pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 323 - let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 324 - if n != bytes.len() { 325 - return Err(DecodeError::ExtraGarbage); 326 - } 327 - Ok(t) 328 - } 329 - 330 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 331 pub async fn finish_loading( 332 mut self, 333 mut store: DiskStore, 334 - ) -> Result<(Commit, DiskDriver<T>), DriveError> { 335 // move store in and back out so we can manage lifetimes 336 // dump mem blocks into the store 337 store = tokio::task::spawn(async move { 338 - let mut writer = store.get_writer()?; 339 - 340 let kvs = self 341 .mem_blocks 342 .into_iter() 343 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 345 - writer.put_many(kvs)?; 346 - writer.commit()?; 347 Ok::<_, DriveError>(store) 348 }) 349 .await??; 350 351 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 352 353 let store_worker = tokio::task::spawn_blocking(move || { 354 - let mut writer = store.get_writer()?; 355 - 356 while let Some(chunk) = rx.blocking_recv() { 357 let kvs = chunk 358 .into_iter() 359 - .map(|(k, 
v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 - writer.put_many(kvs)?; 361 } 362 - 363 - writer.commit()?; 364 Ok::<_, DriveError>(store) 365 }); // await later 366 ··· 379 self.commit = Some(c); 380 continue; 381 } 382 // remaining possible types: node, record, other. optimistically process 383 // TODO: get the actual in-memory size to compute disk spill 384 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 385 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 386 chunk.push((cid, maybe_processed)); 387 - if mem_size >= self.max_size { 388 // soooooo if we're setting the db cache to max_size and then letting 389 // multiple chunks in the queue that are >= max_size, then at any time 390 // we might be using some multiple of max_size? ··· 407 408 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 409 410 - let walker = Walker::new(commit.data); 411 412 Ok(( 413 commit, ··· 425 } 426 427 /// MST walker that reads from disk instead of an in-memory hashmap 428 - pub struct DiskDriver<T: Clone> { 429 - process: fn(Vec<u8>) -> T, 430 state: Option<BigState>, 431 } 432 433 // for doctests only 434 #[doc(hidden)] 435 - pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 436 - use crate::process::noop; 437 DiskDriver { 438 process: noop, 439 state: None, 440 } 441 } 442 443 - impl<T: Processable + Send + 'static> DiskDriver<T> { 444 /// Walk the MST returning up to `n` rkey + record pairs 445 /// 446 /// ```no_run 447 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 448 /// # #[tokio::main] 449 /// # async fn main() -> Result<(), DriveError> { 450 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 453 /// println!("{rkey}: size={}", record.len()); 454 /// } 455 /// } 456 - /// let store = disk_driver.reset_store().await?; 457 /// # Ok(()) 458 /// # } 459 /// ``` 460 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 
461 let process = self.process; 462 463 // state should only *ever* be None transiently while inside here ··· 466 // the big pain here is that we don't want to leave self.state in an 467 // invalid state (None), so all the error paths have to make sure it 468 // comes out again. 469 - let (state, res) = tokio::task::spawn_blocking( 470 - move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 - let mut reader_res = state.store.get_reader(); 472 - let reader: &mut _ = match reader_res { 473 - Ok(ref mut r) => r, 474 - Err(ref mut e) => { 475 - // unfortunately we can't return the error directly because 476 - // (for some reason) it's attached to the lifetime of the 477 - // reader? 478 - // hack a mem::swap so we can get it out :/ 479 - let e_swapped = e.steal(); 480 - // the pain: `state` *has to* outlive the reader 481 - drop(reader_res); 482 - return (state, Err(e_swapped.into())); 483 - } 484 - }; 485 - 486 let mut out = Vec::with_capacity(n); 487 488 for _ in 0..n { 489 // walk as far as we can until we run out of blocks or find a record 490 - let step = match state.walker.disk_step(reader, process) { 491 Ok(s) => s, 492 Err(e) => { 493 - // the pain: `state` *has to* outlive the reader 494 - drop(reader_res); 495 return (state, Err(e.into())); 496 } 497 }; 498 - match step { 499 - Step::Missing(cid) => { 500 - // the pain: `state` *has to* outlive the reader 501 - drop(reader_res); 502 - return (state, Err(DriveError::MissingBlock(cid))); 503 - } 504 - Step::Finish => break, 505 - Step::Found { rkey, data } => out.push((rkey, data)), 506 }; 507 } 508 - 509 - // `state` *has to* outlive the reader 510 - drop(reader_res); 511 512 (state, Ok::<_, DriveError>(out)) 513 - }, 514 - ) 515 - .await?; // on tokio JoinError, we'll be left with invalid state :( 516 517 // *must* restore state before dealing with the actual result 518 self.state = Some(state); ··· 529 fn read_tx_blocking( 530 &mut self, 531 n: usize, 532 - tx: mpsc::Sender<Result<BlockChunk<T>, 
DriveError>>, 533 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 534 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 - let mut reader = match store.get_reader() { 536 - Ok(r) => r, 537 - Err(e) => return tx.blocking_send(Err(e.into())), 538 - }; 539 540 loop { 541 - let mut out: BlockChunk<T> = Vec::with_capacity(n); 542 543 for _ in 0..n { 544 // walk as far as we can until we run out of blocks or find a record 545 546 - let step = match walker.disk_step(&mut reader, self.process) { 547 Ok(s) => s, 548 Err(e) => return tx.blocking_send(Err(e.into())), 549 }; 550 551 - match step { 552 - Step::Missing(cid) => { 553 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 554 - } 555 - Step::Finish => return Ok(()), 556 - Step::Found { rkey, data } => { 557 - out.push((rkey, data)); 558 - continue; 559 - } 560 }; 561 } 562 563 if out.is_empty() { ··· 580 /// benefit over just using `.next_chunk(n)`. 581 /// 582 /// ```no_run 583 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 584 /// # #[tokio::main] 585 /// # async fn main() -> Result<(), DriveError> { 586 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 592 /// } 593 /// 594 /// } 595 - /// let store = join.await?.reset_store().await?; 596 /// # Ok(()) 597 /// # } 598 /// ``` ··· 600 mut self, 601 n: usize, 602 ) -> ( 603 - mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 604 tokio::task::JoinHandle<Self>, 605 ) { 606 - let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 607 608 // sketch: this worker is going to be allowed to execute without a join handle 609 let chan_task = tokio::task::spawn_blocking(move || { ··· 614 }); 615 616 (rx, chan_task) 617 - } 618 - 619 - /// Reset the disk storage so it can be reused. You must call this. 
620 - /// 621 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 622 - /// calls, that would be risky in an async context. For now you just have to 623 - /// carefully make sure you call it. 624 - /// 625 - /// The sqlite store is returned, so it can be reused for another 626 - /// `DiskDriver`. 627 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 628 - let BigState { store, .. } = self.state.take().expect("valid state"); 629 - Ok(store.reset().await?) 630 } 631 }
··· 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 3 + use crate::{ 4 + Bytes, HashMap, 5 + disk::{DiskError, DiskStore}, 6 + mst::MstNode, 7 + walk::Output, 8 + }; 9 + use cid::Cid; 10 use iroh_car::CarReader; 11 use std::convert::Infallible; 12 use tokio::{io::AsyncRead, sync::mpsc}; 13 14 + use crate::mst::Commit; 15 + use crate::walk::{WalkError, Walker}; 16 17 /// Errors that can happen while consuming and emitting blocks and records 18 #[derive(Debug, thiserror::Error)] ··· 23 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 24 #[error("The Commit block referenced by the root was not found")] 25 MissingCommit, 26 #[error("Failed to walk the mst tree: {0}")] 27 WalkError(#[from] WalkError), 28 #[error("CAR file had no roots")] 29 MissingRoot, 30 #[error("Storage error")] 31 StorageError(#[from] DiskError), 32 #[error("Tried to send on a closed channel")] 33 ChannelSendError, // SendError takes <T> which we don't need 34 #[error("Failed to join a task: {0}")] 35 JoinError(#[from] tokio::task::JoinError), 36 } 37 38 /// An in-order chunk of Rkey + (processed) Block pairs 39 + pub type BlockChunk = Vec<(String, Bytes)>; 40 41 + #[derive(Debug, Clone)] 42 + pub(crate) enum MaybeProcessedBlock { 43 /// A block that's *probably* a Node (but we can't know yet) 44 /// 45 /// It *can be* a record that suspiciously looks a lot like a node, so we 46 /// cannot eagerly turn it into a Node. We only know for sure what it is 47 /// when we actually walk down the MST 48 + Raw(Bytes), 49 /// A processed record from a block that was definitely not a Node 50 /// 51 /// Processing has to be fallible because the CAR can have totally-unused ··· 61 /// There's an alternative here, which would be to kick unprocessable blocks 62 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 63 /// surface the typed error later if needed by trying to reprocess.
64 + Processed(Bytes), 65 } 66 67 + impl MaybeProcessedBlock { 68 + pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 69 + if MstNode::could_be(&data) { 70 MaybeProcessedBlock::Raw(data) 71 } else { 72 MaybeProcessedBlock::Processed(process(data)) 73 } 74 } 75 + pub(crate) fn len(&self) -> usize { 76 + match self { 77 + MaybeProcessedBlock::Raw(b) => b.len(), 78 + MaybeProcessedBlock::Processed(b) => b.len(), 79 + } 80 + } 81 + pub(crate) fn into_bytes(self) -> Bytes { 82 + match self { 83 + MaybeProcessedBlock::Raw(mut b) => { 84 + b.push(0x00); 85 + b 86 + } 87 + MaybeProcessedBlock::Processed(mut b) => { 88 + b.push(0x01); 89 + b 90 + } 91 + } 92 + } 93 + pub(crate) fn from_bytes(mut b: Bytes) -> Self { 94 + // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 95 + let suffix = b.pop().unwrap(); 96 + if suffix == 0x00 { 97 + MaybeProcessedBlock::Raw(b) 98 + } else { 99 + MaybeProcessedBlock::Processed(b) 100 + } 101 + } 102 } 103 104 /// Read a CAR file, buffering blocks in memory or to disk 105 + pub enum Driver<R: AsyncRead + Unpin> { 106 /// All blocks fit within the memory limit 107 /// 108 /// You probably want to check the commit's signature. You can go ahead and 109 /// walk the MST right away. 110 + Memory(Commit, MemDriver), 111 /// Blocks exceed the memory limit 112 /// 113 /// You'll need to provide a disk storage to continue. The commit will be 114 /// returned and can be validated only once all blocks are loaded. 
115 + Disk(NeedDisk<R>), 116 + } 117 + 118 + /// Processor that just returns the raw blocks 119 + #[inline] 120 + pub fn noop(block: Bytes) -> Bytes { 121 + block 122 } 123 124 /// Builder-style driver setup 125 #[derive(Debug, Clone)] 126 pub struct DriverBuilder { 127 pub mem_limit_mb: usize, 128 + pub block_processor: fn(Bytes) -> Bytes, 129 } 130 131 impl Default for DriverBuilder { 132 fn default() -> Self { 133 + Self { 134 + mem_limit_mb: 16, 135 + block_processor: noop, 136 + } 137 } 138 } 139 ··· 145 /// Set the in-memory size limit, in MiB 146 /// 147 /// Default: 16 MiB 148 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 149 + self.mem_limit_mb = new_limit; 150 + self 151 } 152 + 153 /// Set the block processor 154 /// 155 /// Default: noop, raw blocks will be emitted 156 + pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder { 157 + self.block_processor = new_processor; 158 self 159 } 160 + 161 /// Begin processing an atproto MST from a CAR file 162 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 163 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 164 } 165 } 166 167 + impl<R: AsyncRead + Unpin> Driver<R> { 168 /// Begin processing an atproto MST from a CAR file 169 /// 170 /// Blocks will be loaded, processed, and buffered in memory. If the entire ··· 176 /// resumed by providing a `DiskStore` for on-disk block storage.
177 pub async fn load_car( 178 reader: R, 179 + process: fn(Bytes) -> Bytes, 180 mem_limit_mb: usize, 181 + ) -> Result<Driver<R>, DriveError> { 182 let max_size = mem_limit_mb * 2_usize.pow(20); 183 let mut mem_blocks = HashMap::new(); 184 ··· 208 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 209 210 // stash (maybe processed) blocks in memory as long as we have room 211 + mem_size += maybe_processed.len(); 212 mem_blocks.insert(cid, maybe_processed); 213 if mem_size >= max_size { 214 return Ok(Driver::Disk(NeedDisk { ··· 225 // all blocks loaded and we fit in memory! hopefully we found the commit... 226 let commit = commit.ok_or(DriveError::MissingCommit)?; 227 228 + // the commit always must point to a Node; empty node => empty MST special case 229 + let root_node: MstNode = match mem_blocks 230 + .get(&commit.data) 231 + .ok_or(DriveError::MissingCommit)? 232 + { 233 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 234 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 235 + }; 236 + let walker = Walker::new(root_node); 237 238 Ok(Driver::Memory( 239 commit, ··· 260 /// work the init function will do. We can drop the CAR reader before walking, 261 /// so the sync/async boundaries become a little easier to work around. 262 #[derive(Debug)] 263 + pub struct MemDriver { 264 + blocks: HashMap<Cid, MaybeProcessedBlock>, 265 walker: Walker, 266 + process: fn(Bytes) -> Bytes, 267 } 268 269 + impl MemDriver { 270 /// Step through the record outputs, in rkey order 271 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 272 let mut out = Vec::with_capacity(n); 273 for _ in 0..n { 274 // walk as far as we can until we run out of blocks or find a record 275 + let Some(Output { rkey, cid: _, data }) = 276 + self.walker.step(&mut self.blocks, self.process)? 
277 + else { 278 + break; 279 }; 280 + out.push((rkey, data)); 281 } 282 if out.is_empty() { 283 Ok(None) 284 } else { ··· 288 } 289 290 /// A partially memory-loaded car file that needs disk spillover to continue 291 + pub struct NeedDisk<R: AsyncRead + Unpin> { 292 car: CarReader<R>, 293 root: Cid, 294 + process: fn(Bytes) -> Bytes, 295 max_size: usize, 296 + mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 297 pub commit: Option<Commit>, 298 } 299 300 + impl<R: AsyncRead + Unpin> NeedDisk<R> { 301 pub async fn finish_loading( 302 mut self, 303 mut store: DiskStore, 304 + ) -> Result<(Commit, DiskDriver), DriveError> { 305 // move store in and back out so we can manage lifetimes 306 // dump mem blocks into the store 307 store = tokio::task::spawn(async move { 308 let kvs = self 309 .mem_blocks 310 .into_iter() 311 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 312 313 + store.put_many(kvs)?; 314 Ok::<_, DriveError>(store) 315 }) 316 .await??; 317 318 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 319 320 let store_worker = tokio::task::spawn_blocking(move || { 321 while let Some(chunk) = rx.blocking_recv() { 322 let kvs = chunk 323 .into_iter() 324 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 325 + store.put_many(kvs)?; 326 } 327 Ok::<_, DriveError>(store) 328 }); // await later 329 ··· 342 self.commit = Some(c); 343 continue; 344 } 345 + 346 + let data = Bytes::from(data); 347 + 348 // remaining possible types: node, record, other. optimistically process 349 // TODO: get the actual in-memory size to compute disk spill 350 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 351 + mem_size += maybe_processed.len(); 352 chunk.push((cid, maybe_processed)); 353 + if mem_size >= (self.max_size / 2) { 354 // soooooo if we're setting the db cache to max_size and then letting 355 // multiple chunks in the queue that are >= max_size, then at any time 356 // we might be using some multiple of max_size? 
···
373
374 let commit = self.commit.ok_or(DriveError::MissingCommit)?;
375
376 + // the commit must always point to a Node; empty node => empty MST special case
377 + let db_bytes = store
378 + .get(&commit.data.to_bytes())
379 + .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
380 + .ok_or(DriveError::MissingCommit)?;
381 +
382 + let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) {
383 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
384 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?,
385 + };
386 + let walker = Walker::new(node);
387
388 Ok((
389 commit,
···
401 }
402
403 /// MST walker that reads from disk instead of an in-memory hashmap
404 + pub struct DiskDriver {
405 + process: fn(Bytes) -> Bytes,
406 state: Option<BigState>,
407 }
408
409 // for doctests only
410 #[doc(hidden)]
411 + pub fn _get_fake_disk_driver() -> DiskDriver {
412 DiskDriver {
413 process: noop,
414 state: None,
415 }
416 }
417
418 + impl DiskDriver {
419 /// Walk the MST returning up to `n` rkey + record pairs
420 ///
421 /// ```no_run
422 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
423 /// # #[tokio::main]
424 /// # async fn main() -> Result<(), DriveError> {
425 /// # let mut disk_driver = _get_fake_disk_driver();
···
428 /// println!("{rkey}: size={}", record.len());
429 /// }
430 /// }
431 /// # Ok(())
432 /// # }
433 /// ```
434 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
435 let process = self.process;
436
437 // state should only *ever* be None transiently while inside here
···
440 // the big pain here is that we don't want to leave self.state in an
441 // invalid state (None), so all the error paths have to make sure it
442 // comes out again.
443 + let (state, res) = 444 + tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 445 let mut out = Vec::with_capacity(n); 446 447 for _ in 0..n { 448 // walk as far as we can until we run out of blocks or find a record 449 + let step = match state.walker.disk_step(&mut state.store, process) { 450 Ok(s) => s, 451 Err(e) => { 452 return (state, Err(e.into())); 453 } 454 }; 455 + let Some(Output { rkey, cid: _, data }) = step else { 456 + break; 457 }; 458 + out.push((rkey, data)); 459 } 460 461 (state, Ok::<_, DriveError>(out)) 462 + }) 463 + .await?; // on tokio JoinError, we'll be left with invalid state :( 464 465 // *must* restore state before dealing with the actual result 466 self.state = Some(state); ··· 477 fn read_tx_blocking( 478 &mut self, 479 n: usize, 480 + tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 481 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 482 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 483 484 loop { 485 + let mut out: BlockChunk = Vec::with_capacity(n); 486 487 for _ in 0..n { 488 // walk as far as we can until we run out of blocks or find a record 489 490 + let step = match walker.disk_step(store, self.process) { 491 Ok(s) => s, 492 Err(e) => return tx.blocking_send(Err(e.into())), 493 }; 494 495 + let Some(Output { rkey, cid: _, data }) = step else { 496 + break; 497 }; 498 + out.push((rkey, data)); 499 } 500 501 if out.is_empty() { ··· 518 /// benefit over just using `.next_chunk(n)`. 
519 /// 520 /// ```no_run 521 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 522 /// # #[tokio::main] 523 /// # async fn main() -> Result<(), DriveError> { 524 /// # let mut disk_driver = _get_fake_disk_driver(); ··· 530 /// } 531 /// 532 /// } 533 /// # Ok(()) 534 /// # } 535 /// ``` ··· 537 mut self, 538 n: usize, 539 ) -> ( 540 + mpsc::Receiver<Result<BlockChunk, DriveError>>, 541 tokio::task::JoinHandle<Self>, 542 ) { 543 + let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 544 545 // sketch: this worker is going to be allowed to execute without a join handle 546 let chan_task = tokio::task::spawn_blocking(move || { ··· 551 }); 552 553 (rx, chan_task) 554 } 555 }
+19 -9
src/lib.rs
··· 27 28 match DriverBuilder::new() 29 .with_mem_limit_mb(10) 30 - .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 .load_car(reader) 32 .await? 33 { ··· 35 // if all blocks fit within memory 36 Driver::Memory(_commit, mut driver) => { 37 while let Some(chunk) = driver.next_chunk(256).await? { 38 - for (_rkey, size) in chunk { 39 total_size += size; 40 } 41 } ··· 49 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 51 while let Some(chunk) = driver.next_chunk(256).await? { 52 - for (_rkey, size) in chunk { 53 total_size += size; 54 } 55 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 } 60 }; 61 println!("sum of size of all records: {total_size}"); ··· 79 80 pub mod disk; 81 pub mod drive; 82 - pub mod process; 83 84 pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 pub use mst::Commit; 87 - pub use process::Processable;
··· 27 28 match DriverBuilder::new() 29 .with_mem_limit_mb(10) 30 + .with_block_processor( 31 + |rec| rec.len().to_ne_bytes().to_vec().into() 32 + ) // block processing: just extract the raw record size 33 .load_car(reader) 34 .await? 35 { ··· 37 // if all blocks fit within memory 38 Driver::Memory(_commit, mut driver) => { 39 while let Some(chunk) = driver.next_chunk(256).await? { 40 + for (_rkey, bytes) in chunk { 41 + 42 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 43 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 44 + 45 total_size += size; 46 } 47 } ··· 55 let (_commit, mut driver) = paused.finish_loading(store).await?; 56 57 while let Some(chunk) = driver.next_chunk(256).await? { 58 + for (_rkey, bytes) in chunk { 59 + 60 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 61 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 62 + 63 total_size += size; 64 } 65 } 66 } 67 }; 68 println!("sum of size of all records: {total_size}"); ··· 86 87 pub mod disk; 88 pub mod drive; 89 90 pub use disk::{DiskBuilder, DiskError, DiskStore}; 91 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 92 pub use mst::Commit; 93 + 94 + // pub use bytes::Bytes; 95 + pub type Bytes = Vec<u8>; 96 + 97 + pub(crate) use hashbrown::HashMap;
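Note on the new processor contract shown in the `src/lib.rs` doc example: processors are now plain bytes-in/bytes-out (`fn(Bytes) -> Bytes`), so callers bring their own ser/de — the example packs each record's length with `to_ne_bytes` and unpacks it on the other side. A minimal sketch of that round trip, in Python purely for illustration (the `process` name and `struct` packing here are stand-ins, not crate API):

```python
import struct

def process(record: bytes) -> bytes:
    # pack the record's length as a native-endian 8-byte word,
    # analogous to `rec.len().to_ne_bytes().to_vec()` in the doc example
    return struct.pack("=Q", len(record))

# consumer side: recover the size from the processed bytes
packed = process(b"hello atproto")
(size,) = struct.unpack("=Q", packed)
assert size == 13
```

The cost noted in the PR description applies here: even in-memory walks now pay for this encode/decode step, which is why zero-copy serialization (e.g. rkyv) is suggested for heavier processors.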
+151 -28
src/mst.rs
··· 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6 - use ipld_core::cid::Cid; 7 use serde::Deserialize; 8 9 /// The top-level data object in a repository's tree is a signed commit. 10 #[derive(Debug, Deserialize)] ··· 33 pub prev: Option<Cid>, 34 /// cryptographic signature of this commit, as raw bytes 35 #[serde(with = "serde_bytes")] 36 - pub sig: Vec<u8>, 37 } 38 39 - /// MST node data schema 40 - #[derive(Debug, Deserialize, PartialEq)] 41 - #[serde(deny_unknown_fields)] 42 - pub(crate) struct Node { 43 - /// link to sub-tree Node on a lower level and with all keys sorting before 44 - /// keys at this node 45 - #[serde(rename = "l")] 46 - pub left: Option<Cid>, 47 - /// ordered list of TreeEntry objects 48 - /// 49 - /// atproto MSTs have a fanout of 4, so there can be max 4 entries. 50 - #[serde(rename = "e")] 51 - pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]? 52 } 53 54 - impl Node { 55 /// test if a block could possibly be a node 56 /// 57 /// we can't eagerly decode records except where we're *sure* they cannot be ··· 62 /// so if a block *could be* a node, any record converter must postpone 63 /// processing. if it turns out it happens to be a very node-looking record, 64 /// well, sorry, it just has to only be processed later when that's known. 65 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 const NODE_FINGERPRINT: [u8; 3] = [ 67 0xA2, // map length 2 (for "l" and "e" keys) ··· 76 .map(|b| b & 0b1110_0000 == 0x80) 77 .unwrap_or(false) 78 } 79 - 80 - /// Check if a node has any entries 81 - /// 82 - /// An empty repository with no records is represented as a single MST node 83 - /// with an empty array of entries. This is the only situation in which a 84 - /// tree may contain an empty leaf node which does not either contain keys 85 - /// ("entries") or point to a sub-tree containing entries. 
86 - pub(crate) fn is_empty(&self) -> bool { 87 - self.left.is_none() && self.entries.is_empty() 88 - } 89 } 90 91 /// TreeEntry object ··· 96 #[serde(rename = "p")] 97 pub prefix_len: usize, 98 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 99 - #[serde(rename = "k", with = "serde_bytes")] 100 - pub keysuffix: Vec<u8>, // can we String this here? 101 /// link to the record data (CBOR) for this entry 102 #[serde(rename = "v")] 103 pub value: Cid,
··· 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6 + use cid::Cid; 7 use serde::Deserialize; 8 + use sha2::{Digest, Sha256}; 9 10 /// The top-level data object in a repository's tree is a signed commit. 11 #[derive(Debug, Deserialize)] ··· 34 pub prev: Option<Cid>, 35 /// cryptographic signature of this commit, as raw bytes 36 #[serde(with = "serde_bytes")] 37 + pub sig: serde_bytes::ByteBuf, 38 + } 39 + 40 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41 + use std::fmt; 42 + 43 + pub type Depth = u32; 44 + 45 + #[inline(always)] 46 + pub fn atproto_mst_depth(key: &str) -> Depth { 47 + // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 + u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 49 + } 50 + 51 + #[derive(Debug)] 52 + pub(crate) struct MstNode { 53 + pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 + pub things: Vec<NodeThing>, 55 + } 56 + 57 + #[derive(Debug)] 58 + pub(crate) struct NodeThing { 59 + pub(crate) cid: Cid, 60 + pub(crate) kind: ThingKind, 61 + } 62 + 63 + #[derive(Debug)] 64 + pub(crate) enum ThingKind { 65 + Tree, 66 + Value { rkey: String }, 67 } 68 69 + impl<'de> Deserialize<'de> for MstNode { 70 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 71 + where 72 + D: Deserializer<'de>, 73 + { 74 + struct NodeVisitor; 75 + impl<'de> Visitor<'de> for NodeVisitor { 76 + type Value = MstNode; 77 + 78 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 79 + formatter.write_str("struct MstNode") 80 + } 81 + 82 + fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error> 83 + where 84 + V: MapAccess<'de>, 85 + { 86 + let mut found_left = false; 87 + let mut left = None; 88 + let mut found_entries = false; 89 + let mut things = Vec::new(); 90 + let mut depth = 
None; 91 + 92 + while let Some(key) = map.next_key()? { 93 + match key { 94 + "l" => { 95 + if found_left { 96 + return Err(de::Error::duplicate_field("l")); 97 + } 98 + found_left = true; 99 + if let Some(cid) = map.next_value()? { 100 + left = Some(NodeThing { 101 + cid, 102 + kind: ThingKind::Tree, 103 + }); 104 + } 105 + } 106 + "e" => { 107 + if found_entries { 108 + return Err(de::Error::duplicate_field("e")); 109 + } 110 + found_entries = true; 111 + 112 + let mut prefix: Vec<u8> = vec![]; 113 + 114 + for entry in map.next_value::<Vec<Entry>>()? { 115 + let mut rkey: Vec<u8> = vec![]; 116 + let pre_checked = 117 + prefix.get(..entry.prefix_len).ok_or_else(|| { 118 + de::Error::invalid_value( 119 + Unexpected::Bytes(&prefix), 120 + &"a prefix at least as long as the prefix_len", 121 + ) 122 + })?; 123 + 124 + rkey.extend_from_slice(pre_checked); 125 + rkey.extend_from_slice(&entry.keysuffix); 126 + 127 + let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| { 128 + de::Error::invalid_value( 129 + Unexpected::Bytes(&rkey), 130 + &"a valid utf-8 rkey", 131 + ) 132 + })?; 133 + 134 + let key_depth = atproto_mst_depth(&rkey_s); 135 + if depth.is_none() { 136 + depth = Some(key_depth); 137 + } else if Some(key_depth) != depth { 138 + return Err(de::Error::invalid_value( 139 + Unexpected::Bytes(&prefix), 140 + &"all rkeys to have equal MST depth", 141 + )); 142 + } 143 + 144 + things.push(NodeThing { 145 + cid: entry.value, 146 + kind: ThingKind::Value { rkey: rkey_s }, 147 + }); 148 + 149 + if let Some(cid) = entry.tree { 150 + things.push(NodeThing { 151 + cid, 152 + kind: ThingKind::Tree, 153 + }); 154 + } 155 + 156 + prefix = rkey; 157 + } 158 + } 159 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)), 160 + } 161 + } 162 + if !found_left { 163 + return Err(de::Error::missing_field("l")); 164 + } 165 + if !found_entries { 166 + return Err(de::Error::missing_field("e")); 167 + } 168 + 169 + things.reverse(); 170 + if let Some(l) = left { 171 + 
things.push(l); 172 + } 173 + 174 + Ok(MstNode { depth, things }) 175 + } 176 + } 177 + 178 + const NODE_FIELDS: &[&str] = &["l", "e"]; 179 + deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor) 180 + } 181 } 182 183 + impl MstNode { 184 + pub(crate) fn is_empty(&self) -> bool { 185 + self.things.is_empty() 186 + } 187 /// test if a block could possibly be a node 188 /// 189 /// we can't eagerly decode records except where we're *sure* they cannot be ··· 194 /// so if a block *could be* a node, any record converter must postpone 195 /// processing. if it turns out it happens to be a very node-looking record, 196 /// well, sorry, it just has to only be processed later when that's known. 197 + #[inline(always)] 198 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 199 const NODE_FINGERPRINT: [u8; 3] = [ 200 0xA2, // map length 2 (for "l" and "e" keys) ··· 209 .map(|b| b & 0b1110_0000 == 0x80) 210 .unwrap_or(false) 211 } 212 } 213 214 /// TreeEntry object ··· 219 #[serde(rename = "p")] 220 pub prefix_len: usize, 221 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 222 + #[serde(rename = "k")] 223 + pub keysuffix: serde_bytes::ByteBuf, 224 /// link to the record data (CBOR) for this entry 225 #[serde(rename = "v")] 226 pub value: Cid,
-108
src/process.rs
··· 1 - /*! 2 - Record processor function output trait 3 - 4 - The return type must satisfy the `Processable` trait, which requires: 5 - 6 - - `Clone` because two rkeys can refer to the same record by CID, which may 7 - only appear once in the CAR file. 8 - - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 - 10 - One required function must be implemented, `get_size()`: this should return the 11 - approximate total off-stack size of the type. (the on-stack size will be added 12 - automatically via `std::mem::get_size`). 13 - 14 - Note that it is **not guaranteed** that the `process` function will run on a 15 - block before storing it in memory or on disk: it's not possible to know if a 16 - block is a record without actually walking the MST, so the best we can do is 17 - apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 - store the raw block bytes. 19 - 20 - Here's a silly processing function that just collects 'eyy's found in the raw 21 - record bytes 22 - 23 - ``` 24 - # use repo_stream::Processable; 25 - # use serde::{Serialize, Deserialize}; 26 - #[derive(Debug, Clone, Serialize, Deserialize)] 27 - struct Eyy(usize, String); 28 - 29 - impl Processable for Eyy { 30 - fn get_size(&self) -> usize { 31 - // don't need to compute the usize, it's on the stack 32 - self.1.capacity() // in-mem size from the string's capacity, in bytes 33 - } 34 - } 35 - 36 - fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 - let mut out = Vec::new(); 38 - let to_find = "eyy".as_bytes(); 39 - for i in 0..(raw.len() - 3) { 40 - if &raw[i..(i+3)] == to_find { 41 - out.push(Eyy(i, "eyy".to_string())); 42 - } 43 - } 44 - out 45 - } 46 - ``` 47 - 48 - The memory sizing stuff is a little sketch but probably at least approximately 49 - works. 
50 - */ 51 - 52 - use serde::{Serialize, de::DeserializeOwned}; 53 - 54 - /// Output trait for record processing 55 - pub trait Processable: Clone + Serialize + DeserializeOwned { 56 - /// Any additional in-memory size taken by the processed type 57 - /// 58 - /// Do not include stack size (`std::mem::size_of`) 59 - fn get_size(&self) -> usize; 60 - } 61 - 62 - /// Processor that just returns the raw blocks 63 - #[inline] 64 - pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 - block 66 - } 67 - 68 - impl Processable for u8 { 69 - fn get_size(&self) -> usize { 70 - 0 71 - } 72 - } 73 - 74 - impl Processable for usize { 75 - fn get_size(&self) -> usize { 76 - 0 // no additional space taken, just its stack size (newtype is free) 77 - } 78 - } 79 - 80 - impl Processable for String { 81 - fn get_size(&self) -> usize { 82 - self.capacity() 83 - } 84 - } 85 - 86 - impl<Item: Sized + Processable> Processable for Vec<Item> { 87 - fn get_size(&self) -> usize { 88 - let slot_size = std::mem::size_of::<Item>(); 89 - let direct_size = slot_size * self.capacity(); 90 - let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 - direct_size + items_referenced_size 92 - } 93 - } 94 - 95 - impl<Item: Processable> Processable for Option<Item> { 96 - fn get_size(&self) -> usize { 97 - self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 - } 99 - } 100 - 101 - impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 - fn get_size(&self) -> usize { 103 - match self { 104 - Ok(item) => item.get_size(), 105 - Err(err) => err.get_size(), 106 - } 107 - } 108 - }
···
+105 -345
src/walk.rs
··· 1 //! Depth-first MST traversal 2 3 - use crate::disk::SqliteReader; 4 - use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 - use crate::mst::Node; 6 - use crate::process::Processable; 7 - use ipld_core::cid::Cid; 8 - use sha2::{Digest, Sha256}; 9 - use std::collections::HashMap; 10 use std::convert::Infallible; 11 12 /// Errors that can happen while walking ··· 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 - #[error("Decode error: {0}")] 24 - DecodeError(#[from] DecodeError), 25 } 26 27 /// Errors from invalid Rkeys 28 #[derive(Debug, PartialEq, thiserror::Error)] 29 pub enum MstError { 30 - #[error("Failed to compute an rkey due to invalid prefix_len")] 31 - EntryPrefixOutOfbounds, 32 - #[error("RKey was not utf-8")] 33 - EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 EmptyNode, 36 - #[error("Found an entry with rkey at the wrong depth")] 37 - WrongDepth, 38 - #[error("Lost track of our depth (possible bug?)")] 39 - LostDepth, 40 #[error("MST depth underflow: depth-0 node with child trees")] 41 DepthUnderflow, 42 - #[error("Encountered an rkey out of order while walking the MST")] 43 - RkeyOutOfOrder, 44 } 45 46 /// Walker outputs 47 - #[derive(Debug)] 48 - pub enum Step<T> { 49 - /// We needed this CID but it's not in the block store 50 - Missing(Cid), 51 - /// Reached the end of the MST! yay! 52 - Finish, 53 - /// A record was found! 
54 - Found { rkey: String, data: T }, 55 - } 56 - 57 - #[derive(Debug, Clone, PartialEq)] 58 - enum Need { 59 - Node { depth: Depth, cid: Cid }, 60 - Record { rkey: String, cid: Cid }, 61 - } 62 - 63 - #[derive(Debug, Clone, Copy, PartialEq)] 64 - enum Depth { 65 - Root, 66 - Depth(u32), 67 - } 68 - 69 - impl Depth { 70 - fn from_key(key: &[u8]) -> Self { 71 - let mut zeros = 0; 72 - for byte in Sha256::digest(key) { 73 - let leading = byte.leading_zeros(); 74 - zeros += leading; 75 - if leading < 8 { 76 - break; 77 - } 78 - } 79 - Self::Depth(zeros / 2) // truncating divide (rounds down) 80 - } 81 - fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 - match self { 83 - Self::Root => Ok(None), 84 - Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 - } 86 - } 87 - } 88 - 89 - fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST except in an empty MST 91 - if node.is_empty() { 92 - if parent_depth == Depth::Root { 93 - return Ok(()); // empty mst, nothing to push 94 - } else { 95 - return Err(MstError::EmptyNode); 96 - } 97 - } 98 - 99 - let mut entries = Vec::with_capacity(node.entries.len()); 100 - let mut prefix = vec![]; 101 - let mut this_depth = parent_depth.next_expected()?; 102 - 103 - for entry in &node.entries { 104 - let mut rkey = vec![]; 105 - let pre_checked = prefix 106 - .get(..entry.prefix_len) 107 - .ok_or(MstError::EntryPrefixOutOfbounds)?; 108 - rkey.extend_from_slice(pre_checked); 109 - rkey.extend_from_slice(&entry.keysuffix); 110 - 111 - let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 112 - return Err(MstError::WrongDepth); 113 - }; 114 - 115 - // this_depth is `none` if we are the deepest child (directly below root) 116 - // in that case we accept whatever highest depth is claimed 117 - let expected_depth = match this_depth { 118 - Some(d) => d, 119 - None => { 120 - this_depth = Some(key_depth); 
121 - key_depth 122 - } 123 - }; 124 - 125 - // all keys we find should be this depth 126 - if key_depth != expected_depth { 127 - return Err(MstError::DepthUnderflow); 128 - } 129 - 130 - prefix = rkey.clone(); 131 - 132 - entries.push(Need::Record { 133 - rkey: String::from_utf8(rkey)?, 134 - cid: entry.value, 135 - }); 136 - if let Some(ref tree) = entry.tree { 137 - entries.push(Need::Node { 138 - depth: Depth::Depth(key_depth), 139 - cid: *tree, 140 - }); 141 - } 142 - } 143 - 144 - entries.reverse(); 145 - stack.append(&mut entries); 146 - 147 - let d = this_depth.ok_or(MstError::LostDepth)?; 148 - 149 - if let Some(tree) = node.left { 150 - stack.push(Need::Node { 151 - depth: Depth::Depth(d), 152 - cid: tree, 153 - }); 154 - } 155 - Ok(()) 156 } 157 158 /// Traverser of an atproto MST ··· 160 /// Walks the tree from left-to-right in depth-first order 161 #[derive(Debug)] 162 pub struct Walker { 163 - stack: Vec<Need>, 164 - prev: String, 165 } 166 167 impl Walker { 168 - pub fn new(tree_root_cid: Cid) -> Self { 169 Self { 170 - stack: vec![Need::Node { 171 - depth: Depth::Root, 172 - cid: tree_root_cid, 173 - }], 174 - prev: "".to_string(), 175 } 176 } 177 178 - /// Advance through nodes until we find a record or can't go further 179 - pub fn step<T: Processable>( 180 &mut self, 181 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 182 - process: impl Fn(Vec<u8>) -> T, 183 - ) -> Result<Step<T>, WalkError> { 184 - loop { 185 - let Some(need) = self.stack.last_mut() else { 186 - log::trace!("tried to walk but we're actually done."); 187 - return Ok(Step::Finish); 188 - }; 189 - 190 - match need { 191 - &mut Need::Node { depth, cid } => { 192 - log::trace!("need node {cid:?}"); 193 - let Some(block) = blocks.remove(&cid) else { 194 - log::trace!("node not found, resting"); 195 - return Ok(Step::Missing(cid)); 196 - }; 197 - 198 - let MaybeProcessedBlock::Raw(data) = block else { 199 - return Err(WalkError::BadCommitFingerprint); 200 - }; 201 - let node = 
serde_ipld_dagcbor::from_slice::<Node>(&data) 202 - .map_err(WalkError::BadCommit)?; 203 - 204 - // found node, make sure we remember 205 - self.stack.pop(); 206 207 - // queue up work on the found node next 208 - push_from_node(&mut self.stack, &node, depth)?; 209 } 210 - Need::Record { rkey, cid } => { 211 - log::trace!("need record {cid:?}"); 212 - // note that we cannot *remove* a record block, sadly, since 213 - // there can be multiple rkeys pointing to the same cid. 214 - let Some(data) = blocks.get_mut(cid) else { 215 - return Ok(Step::Missing(*cid)); 216 - }; 217 - let rkey = rkey.clone(); 218 - let data = match data { 219 - MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 220 - MaybeProcessedBlock::Processed(t) => t.clone(), 221 - }; 222 - 223 - // found node, make sure we remember 224 - self.stack.pop(); 225 - 226 - // rkeys *must* be in order or else the tree is invalid (or 227 - // we have a bug) 228 - if rkey <= self.prev { 229 - return Err(MstError::RkeyOutOfOrder)?; 230 - } 231 - self.prev = rkey.clone(); 232 233 - return Ok(Step::Found { rkey, data }); 234 - } 235 } 236 - } 237 - } 238 239 - /// blocking!!!!!! 240 - pub fn disk_step<T: Processable>( 241 - &mut self, 242 - reader: &mut SqliteReader, 243 - process: impl Fn(Vec<u8>) -> T, 244 - ) -> Result<Step<T>, WalkError> { 245 - loop { 246 - let Some(need) = self.stack.last_mut() else { 247 - log::trace!("tried to walk but we're actually done."); 248 - return Ok(Step::Finish); 249 - }; 250 251 - match need { 252 - &mut Need::Node { depth, cid } => { 253 - let cid_bytes = cid.to_bytes(); 254 - log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(cid_bytes)? 
else { 256 - log::trace!("node not found, resting"); 257 - return Ok(Step::Missing(cid)); 258 - }; 259 260 - let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 261 - 262 - let MaybeProcessedBlock::Raw(data) = block else { 263 - return Err(WalkError::BadCommitFingerprint); 264 - }; 265 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 266 - .map_err(WalkError::BadCommit)?; 267 - 268 - // found node, make sure we remember 269 - self.stack.pop(); 270 - 271 - // queue up work on the found node next 272 - push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 273 } 274 - Need::Record { rkey, cid } => { 275 - log::trace!("need record {cid:?}"); 276 - let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(cid_bytes)? else { 278 - log::trace!("record block not found, resting"); 279 - return Ok(Step::Missing(*cid)); 280 - }; 281 - let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 282 - let rkey = rkey.clone(); 283 - let data = match data { 284 - MaybeProcessedBlock::Raw(data) => process(data), 285 - MaybeProcessedBlock::Processed(t) => t.clone(), 286 - }; 287 288 - // found node, make sure we remember 289 - self.stack.pop(); 290 - 291 - log::trace!("emitting a block as a step. 
depth={}", self.stack.len()); 292 - 293 - // rkeys *must* be in order or else the tree is invalid (or 294 - // we have a bug) 295 - if rkey <= self.prev { 296 - return Err(MstError::RkeyOutOfOrder)?; 297 - } 298 - self.prev = rkey.clone(); 299 - 300 - return Ok(Step::Found { rkey, data }); 301 - } 302 } 303 } 304 } 305 - } 306 307 - #[cfg(test)] 308 - mod test { 309 - use super::*; 310 - 311 - fn cid1() -> Cid { 312 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 313 - .parse() 314 - .unwrap() 315 } 316 317 - #[test] 318 - fn test_depth_spec_0() { 319 - let d = Depth::from_key(b"2653ae71"); 320 - assert_eq!(d, Depth::Depth(0)) 321 - } 322 - 323 - #[test] 324 - fn test_depth_spec_1() { 325 - let d = Depth::from_key(b"blue"); 326 - assert_eq!(d, Depth::Depth(1)) 327 - } 328 - 329 - #[test] 330 - fn test_depth_spec_4() { 331 - let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 332 - assert_eq!(d, Depth::Depth(4)) 333 - } 334 - 335 - #[test] 336 - fn test_depth_spec_8() { 337 - let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 338 - assert_eq!(d, Depth::Depth(8)) 339 - } 340 - 341 - #[test] 342 - fn test_depth_ietf_draft_0() { 343 - let d = Depth::from_key(b"key1"); 344 - assert_eq!(d, Depth::Depth(0)) 345 - } 346 - 347 - #[test] 348 - fn test_depth_ietf_draft_1() { 349 - let d = Depth::from_key(b"key7"); 350 - assert_eq!(d, Depth::Depth(1)) 351 - } 352 - 353 - #[test] 354 - fn test_depth_ietf_draft_4() { 355 - let d = Depth::from_key(b"key515"); 356 - assert_eq!(d, Depth::Depth(4)) 357 - } 358 - 359 - #[test] 360 - fn test_depth_interop() { 361 - // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 362 - for (k, expected) in [ 363 - ("", 0), 364 - ("asdf", 0), 365 - ("blue", 1), 366 - ("2653ae71", 0), 367 - ("88bfafc7", 2), 368 - ("2a92d355", 4), 369 - ("884976f5", 6), 370 - ("app.bsky.feed.post/454397e440ec", 4), 371 - ("app.bsky.feed.post/9adeb165882c", 8), 372 - ] { 373 - let d 
= Depth::from_key(k.as_bytes()); 374 - assert_eq!(d, Depth::Depth(expected), "key: {}", k); 375 } 376 - } 377 - 378 - #[test] 379 - fn test_push_empty_fails() { 380 - let empty_node = Node { 381 - left: None, 382 - entries: vec![], 383 - }; 384 - let mut stack = vec![]; 385 - let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 386 - assert_eq!(err, Err(MstError::EmptyNode)); 387 } 388 389 - #[test] 390 - fn test_push_one_node() { 391 - let node = Node { 392 - left: Some(cid1()), 393 - entries: vec![], 394 - }; 395 - let mut stack = vec![]; 396 - push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 397 - assert_eq!( 398 - stack.last(), 399 - Some(Need::Node { 400 - depth: Depth::Depth(3), 401 - cid: cid1() 402 - }) 403 - .as_ref() 404 - ); 405 } 406 }
··· 1 //! Depth-first MST traversal 2 3 + use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 + use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 5 + use cid::Cid; 6 use std::convert::Infallible; 7 8 /// Errors that can happen while walking ··· 15 #[error("Action node error: {0}")] 16 MstError(#[from] MstError), 17 #[error("storage error: {0}")] 18 + StorageError(#[from] fjall::Error), 19 + #[error("block not found: {0}")] 20 + MissingBlock(Cid), 21 } 22 23 /// Errors from invalid Rkeys 24 #[derive(Debug, PartialEq, thiserror::Error)] 25 pub enum MstError { 26 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 27 EmptyNode, 28 + #[error("Expected node to be at depth {expected}, but it was at {depth}")] 29 + WrongDepth { depth: Depth, expected: Depth }, 30 #[error("MST depth underflow: depth-0 node with child trees")] 31 DepthUnderflow, 32 + #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 + RkeyOutOfOrder { prev: String, rkey: String }, 34 } 35 36 /// Walker outputs 37 + #[derive(Debug, PartialEq)] 38 + pub struct Output { 39 + pub rkey: String, 40 + pub cid: Cid, 41 + pub data: Bytes, 42 } 43 44 /// Traverser of an atproto MST ··· 46 /// Walks the tree from left-to-right in depth-first order 47 #[derive(Debug)] 48 pub struct Walker { 49 + prev_rkey: String, 50 + root_depth: Depth, 51 + todo: Vec<Vec<NodeThing>>, 52 } 53 54 impl Walker { 55 + pub fn new(root_node: MstNode) -> Self { 56 Self { 57 + prev_rkey: "".to_string(), 58 + root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 59 + todo: vec![root_node.things], 60 } 61 } 62 63 + fn mpb_step( 64 &mut self, 65 + kind: ThingKind, 66 + cid: Cid, 67 + mpb: &MaybeProcessedBlock, 68 + process: impl Fn(Bytes) -> Bytes, 69 + ) -> Result<Option<Output>, WalkError> { 70 + match kind { 71 + ThingKind::Value { rkey } => { 72 + let data = match mpb { 73 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 74 + 
MaybeProcessedBlock::Processed(t) => t.clone(), 75 + }; 76 77 + if rkey <= self.prev_rkey { 78 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 79 + rkey, 80 + prev: self.prev_rkey.clone(), 81 + })); 82 } 83 + self.prev_rkey = rkey.clone(); 84 85 + log::trace!("val @ {rkey}"); 86 + Ok(Some(Output { rkey, cid, data })) 87 } 88 + ThingKind::Tree => { 89 + let MaybeProcessedBlock::Raw(data) = mpb else { 90 + return Err(WalkError::BadCommitFingerprint); 91 + }; 92 93 + let node: MstNode = 94 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 95 96 + if node.is_empty() { 97 + return Err(WalkError::MstError(MstError::EmptyNode)); 98 + } 99 100 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 101 + let next_depth = current_depth 102 + .checked_sub(1) 103 + .ok_or(MstError::DepthUnderflow)?; 104 + if let Some(d) = node.depth 105 + && d != next_depth 106 + { 107 + return Err(WalkError::MstError(MstError::WrongDepth { 108 + depth: d, 109 + expected: next_depth, 110 + })); 111 } 112 113 + log::trace!("node into depth {next_depth}"); 114 + self.todo.push(node.things); 115 + Ok(None) 116 } 117 } 118 } 119 120 + #[inline(always)] 121 + fn next_todo(&mut self) -> Option<NodeThing> { 122 + while let Some(last) = self.todo.last_mut() { 123 + let Some(thing) = last.pop() else { 124 + self.todo.pop(); 125 + continue; 126 + }; 127 + return Some(thing); 128 + } 129 + None 130 } 131 132 + /// Advance through nodes until we find a record or can't go further 133 + pub fn step( 134 + &mut self, 135 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 136 + process: impl Fn(Bytes) -> Bytes, 137 + ) -> Result<Option<Output>, WalkError> { 138 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 + let Some(mpb) = blocks.get(&cid) else { 140 + return Err(WalkError::MissingBlock(cid)); 141 + }; 142 + if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? 
{ 143 + return Ok(Some(out)); 144 + } 145 } 146 + Ok(None) 147 } 148 149 + /// blocking!!!!!! 150 + pub fn disk_step( 151 + &mut self, 152 + blocks: &mut DiskStore, 153 + process: impl Fn(Bytes) -> Bytes, 154 + ) -> Result<Option<Output>, WalkError> { 155 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 + let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 + return Err(WalkError::MissingBlock(cid)); 158 + }; 159 + let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 + if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 + return Ok(Some(out)); 162 + } 163 + } 164 + Ok(None) 165 } 166 }
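The heart of the new walker is `next_todo`: traversal state is just a stack of per-node entry lists, and the walker pops from the innermost non-empty level, discarding exhausted levels on the way back up. A standalone sketch of that loop (hypothetical free-function form; the real method lives on `Walker` and pops `NodeThing`s):

```rust
// Depth-first pop over a stack of per-node entry lists.
// Each inner Vec holds the remaining entries of one MST node;
// exhausted levels are discarded so the parent node resumes.
fn next_todo<T>(todo: &mut Vec<Vec<T>>) -> Option<T> {
    while let Some(last) = todo.last_mut() {
        if let Some(item) = last.pop() {
            return Some(item);
        }
        // this level is exhausted; pop back up toward the root
        todo.pop();
    }
    None
}

fn main() {
    // two levels: the inner level drains first, then the outer resumes
    let mut todo = vec![vec![1, 2], vec![3]];
    assert_eq!(next_todo(&mut todo), Some(3));
    assert_eq!(next_todo(&mut todo), Some(2));
    assert_eq!(next_todo(&mut todo), Some(1));
    assert_eq!(next_todo(&mut todo), None);
}
```

Because `todo.len()` always equals the number of levels currently on the stack, the walker can derive the current depth as `root_depth - (todo.len() - 1)` instead of storing a depth per entry.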
+212
tests/mst-depth.rs
···
··· 1 + // use repo_stream::Driver; 2 + use repo_stream::mst::atproto_mst_depth; 3 + 4 + // https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/example_keys.txt 5 + const INTEROP_EXAMPLE_KEYS: &str = "\ 6 + A0/374913 7 + A1/076595 8 + A2/827942 9 + A3/578971 10 + A4/055903 11 + A5/518415 12 + B0/601692 13 + B1/986427 14 + B2/827649 15 + B3/095483 16 + B4/774183 17 + B5/116729 18 + C0/451630 19 + C1/438573 20 + C2/014073 21 + C3/564755 22 + C4/134079 23 + C5/141153 24 + D0/952776 25 + D1/834852 26 + D2/269196 27 + D3/038750 28 + D4/052059 29 + D5/563177 30 + E0/670489 31 + E1/091396 32 + E2/819540 33 + E3/391311 34 + E4/820614 35 + E5/512478 36 + F0/697858 37 + F1/085263 38 + F2/483591 39 + F3/409933 40 + F4/789697 41 + F5/271416 42 + G0/765327 43 + G1/209912 44 + G2/611528 45 + G3/649394 46 + G4/585887 47 + G5/298495 48 + H0/131238 49 + H1/566929 50 + H2/618272 51 + H3/500151 52 + H4/841548 53 + H5/642354 54 + I0/536928 55 + I1/525517 56 + I2/800680 57 + I3/818503 58 + I4/561177 59 + I5/010047 60 + J0/453243 61 + J1/217783 62 + J2/960389 63 + J3/501274 64 + J4/042054 65 + J5/743154 66 + K0/125271 67 + K1/317361 68 + K2/453868 69 + K3/214010 70 + K4/164720 71 + K5/177856 72 + L0/502889 73 + L1/574576 74 + L2/596333 75 + L3/683657 76 + L4/724989 77 + L5/093883 78 + M0/141744 79 + M1/643368 80 + M2/919782 81 + M3/836327 82 + M4/177463 83 + M5/563354 84 + N0/370604 85 + N1/563732 86 + N2/177587 87 + N3/678428 88 + N4/599183 89 + N5/567564 90 + O0/523870 91 + O1/052141 92 + O2/037651 93 + O3/773808 94 + O4/140952 95 + O5/318605 96 + P0/133157 97 + P1/394633 98 + P2/521462 99 + P3/493488 100 + P4/908754 101 + P5/109455 102 + Q0/835234 103 + Q1/131542 104 + Q2/680035 105 + Q3/253381 106 + Q4/019053 107 + Q5/658167 108 + R0/129386 109 + R1/363149 110 + R2/742766 111 + R3/039235 112 + R4/482275 113 + R5/817312 114 + S0/340283 115 + S1/561525 116 + S2/914574 117 + S3/909434 118 + S4/789708 119 + S5/803866 120 + T0/255204 121 + T1/716687 122 + T2/256231 123 
+ T3/054247 124 + T4/419247 125 + T5/509584 126 + U0/298296 127 + U1/851680 128 + U2/342856 129 + U3/597327 130 + U4/311686 131 + U5/030156 132 + V0/221100 133 + V1/741554 134 + V2/267990 135 + V3/674163 136 + V4/739931 137 + V5/573718 138 + W0/034202 139 + W1/697411 140 + W2/460313 141 + W3/189647 142 + W4/847299 143 + W5/648086 144 + X0/287498 145 + X1/044093 146 + X2/613770 147 + X3/577587 148 + X4/779391 149 + X5/339246 150 + Y0/986350 151 + Y1/044567 152 + Y2/478044 153 + Y3/757097 154 + Y4/396913 155 + Y5/802264 156 + Z0/425878 157 + Z1/127557 158 + Z2/441927 159 + Z3/064474 160 + Z4/888344 161 + Z5/977983"; 162 + 163 + #[test] 164 + fn test_interop_example_keys() { 165 + for key in INTEROP_EXAMPLE_KEYS.split('\n') { 166 + let expected: u32 = key.chars().nth(1).unwrap().to_digit(16).unwrap(); 167 + let computed: u32 = atproto_mst_depth(key); 168 + assert_eq!(computed, expected); 169 + } 170 + } 171 + 172 + #[test] 173 + fn test_iterop_key_heights() { 174 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 175 + for (key, expected) in [ 176 + ("", 0), 177 + ("asdf", 0), 178 + ("blue", 1), 179 + ("2653ae71", 0), 180 + ("88bfafc7", 2), 181 + ("2a92d355", 4), 182 + ("884976f5", 6), 183 + ("app.bsky.feed.post/454397e440ec", 4), 184 + ("app.bsky.feed.post/9adeb165882c", 8), 185 + ] { 186 + let computed = atproto_mst_depth(key); 187 + assert_eq!(computed, expected); 188 + } 189 + } 190 + 191 + #[test] 192 + fn test_spec_example_keys() { 193 + // https://atproto.com/specs/repository#mst-structure 194 + for (key, expected) in [ 195 + ("2653ae71", 0), 196 + ("blue", 1), 197 + ("app.bsky.feed.post/454397e440ec", 4), 198 + ("app.bsky.feed.post/9adeb165882c", 8), 199 + ] { 200 + let computed = atproto_mst_depth(key); 201 + assert_eq!(computed, expected); 202 + } 203 + } 204 + 205 + #[test] 206 + fn test_ietf_example_keys() { 207 + // https://atproto.com/specs/repository#mst-structure 208 + for (key, expected) in 
[("key1", 0), ("key7", 1), ("key515", 4)] { 209 + let computed = atproto_mst_depth(key); 210 + assert_eq!(computed, expected); 211 + } 212 + }
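The expected depths in these tests follow the spec rule: SHA-256 the key, then count leading zero bits in 2-bit chunks. A minimal sketch of just the chunk-counting step (hypothetical helper name; it assumes the digest is already in hand, whereas the crate's `atproto_mst_depth` takes the key string itself):

```rust
/// Count leading zero 2-bit chunks of a (SHA-256) digest.
/// Per the atproto repo spec, this count is the key's MST depth.
fn depth_from_hash(hash: &[u8]) -> u32 {
    let mut depth = 0;
    for &byte in hash {
        if byte == 0 {
            // an all-zero byte contributes four zero 2-bit chunks
            depth += 4;
            continue;
        }
        // u8::leading_zeros counts over 8 bits; every full pair of
        // leading zero bits is one more zero chunk, then we stop
        depth += byte.leading_zeros() / 2;
        break;
    }
    depth
}

fn main() {
    // 0b0000_0100: chunks 00, 00, 01, 00 -> two leading zero chunks
    assert_eq!(depth_from_hash(&[0x04]), 2);
    // zero byte gives 4 chunks, then 0b01... stops the count
    assert_eq!(depth_from_hash(&[0x00, 0x40]), 4);
    // first chunk already nonzero -> depth 0
    assert_eq!(depth_from_hash(&[0xff]), 0);
}
```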
+12 -4
tests/non-huge-cars.rs
··· 12 expected_sum: usize, 13 expect_profile: bool, 14 ) { 15 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 - .await 17 - .unwrap() 18 { 19 Driver::Memory(_commit, mem_driver) => mem_driver, 20 Driver::Disk(_) => panic!("too big"), ··· 26 let mut prev_rkey = "".to_string(); 27 28 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 29 - for (rkey, size) in pairs { 30 records += 1; 31 sum += size; 32 if rkey == "app.bsky.actor.profile/self" { 33 found_bsky_profile = true;
··· 12 expected_sum: usize, 13 expect_profile: bool, 14 ) { 15 + let mut driver = match Driver::load_car( 16 + bytes, 17 + |block| block.len().to_ne_bytes().to_vec().into(), 18 + 10, /* MiB */ 19 + ) 20 + .await 21 + .unwrap() 22 { 23 Driver::Memory(_commit, mem_driver) => mem_driver, 24 Driver::Disk(_) => panic!("too big"), ··· 30 let mut prev_rkey = "".to_string(); 31 32 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 33 + for (rkey, bytes) in pairs { 34 records += 1; 35 + 36 + let (int_bytes, _) = bytes.split_at(size_of::<usize>()); 37 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 38 + 39 sum += size; 40 if rkey == "app.bsky.actor.profile/self" { 41 found_bsky_profile = true;
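With the `Processable` trait gone, process functions are plain bytes-to-bytes, so this test's processor has to actually serialize the block length and decode it again on the consuming side. The round-trip it performs, isolated as a sketch (hypothetical helper names):

```rust
use std::mem::size_of;

// Encode a block's length as native-endian bytes -- what the test's
// `|block| block.len().to_ne_bytes().to_vec().into()` closure produces.
fn encode_len(block: &[u8]) -> Vec<u8> {
    block.len().to_ne_bytes().to_vec()
}

// Decode it back, mirroring the split_at/from_ne_bytes in the test.
fn decode_len(bytes: &[u8]) -> usize {
    let (int_bytes, _) = bytes.split_at(size_of::<usize>());
    usize::from_ne_bytes(int_bytes.try_into().unwrap())
}

fn main() {
    assert_eq!(decode_len(&encode_len(&[1, 2, 3])), 3);
    assert_eq!(decode_len(&encode_len(&[])), 0);
}
```

Native-endian is fine here because the bytes never leave the process; anything persisted or sent over the wire would want a fixed endianness instead.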

History

2 rounds 0 comments
21 commits
fjall 3.0.0 buggy
sqlite -> fjall 2 for ~2x speedup
fjall v3
some settings that might not make sense
yay fixessss
use mimalloc for disk read example
bytes
just vec again
custom mst node deserialize
well it kinda works again
simpler depth handling
remove old code
reduce fjall mem
update timings etc
fix example
fmt
old unused error
get the api closer to what it was befor
clean up builder
drop empty-mst special-casing
get ready for release
pull request successfully merged
bad-example.com submitted #0
20 commits