Server tools to backfill, tail, mirror, and verify PLC logs

fjall mirror mode

ptr.pet d472cadc bbc1774b

verified
+2229 -248
+285 -3
Cargo.lock
··· 32 32 dependencies = [ 33 33 "anyhow", 34 34 "async-compression", 35 + "async-trait", 35 36 "chrono", 37 + "cid", 36 38 "clap", 39 + "data-encoding", 40 + "fjall", 37 41 "futures", 38 42 "governor", 39 43 "http-body-util", ··· 50 54 "rustls", 51 55 "serde", 52 56 "serde_json", 57 + "tempfile", 53 58 "thiserror 2.0.16", 54 59 "tokio", 55 60 "tokio-postgres", ··· 261 266 ] 262 267 263 268 [[package]] 269 + name = "base-x" 270 + version = "0.2.11" 271 + source = "registry+https://github.com/rust-lang/crates.io-index" 272 + checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" 273 + 274 + [[package]] 275 + name = "base256emoji" 276 + version = "1.0.2" 277 + source = "registry+https://github.com/rust-lang/crates.io-index" 278 + checksum = "b5e9430d9a245a77c92176e649af6e275f20839a48389859d1661e9a128d077c" 279 + dependencies = [ 280 + "const-str", 281 + "match-lookup", 282 + ] 283 + 284 + [[package]] 264 285 name = "base64" 265 286 version = "0.22.1" 266 287 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 341 362 checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 342 363 343 364 [[package]] 365 + name = "byteorder-lite" 366 + version = "0.1.0" 367 + source = "registry+https://github.com/rust-lang/crates.io-index" 368 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 369 + 370 + [[package]] 344 371 name = "bytes" 345 372 version = "1.10.1" 346 373 source = "registry+https://github.com/rust-lang/crates.io-index" 347 374 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 375 + 376 + [[package]] 377 + name = "byteview" 378 + version = "0.10.1" 379 + source = "registry+https://github.com/rust-lang/crates.io-index" 380 + checksum = "1c53ba0f290bfc610084c05582d9c5d421662128fc69f4bf236707af6fd321b9" 348 381 349 382 [[package]] 350 383 name = "cc" ··· 394 427 ] 395 428 396 429 [[package]] 430 + name = "cid" 431 + version = "0.11.1" 432 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 433 + checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" 434 + dependencies = [ 435 + "core2", 436 + "multibase", 437 + "multihash", 438 + "unsigned-varint", 439 + ] 440 + 441 + [[package]] 397 442 name = "clang-sys" 398 443 version = "1.8.1" 399 444 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 460 505 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 461 506 462 507 [[package]] 508 + name = "compare" 509 + version = "0.0.6" 510 + source = "registry+https://github.com/rust-lang/crates.io-index" 511 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 512 + 513 + [[package]] 463 514 name = "compression-codecs" 464 515 version = "0.4.30" 465 516 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 480 531 checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" 481 532 482 533 [[package]] 534 + name = "const-str" 535 + version = "0.4.3" 536 + source = "registry+https://github.com/rust-lang/crates.io-index" 537 + checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3" 538 + 539 + [[package]] 483 540 name = "core-foundation" 484 541 version = "0.9.4" 485 542 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 506 563 checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" 507 564 508 565 [[package]] 566 + name = "core2" 567 + version = "0.4.0" 568 + source = "registry+https://github.com/rust-lang/crates.io-index" 569 + checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" 570 + dependencies = [ 571 + "memchr", 572 + ] 573 + 574 + [[package]] 509 575 name = "cpufeatures" 510 576 version = "0.2.17" 511 577 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 524 590 ] 525 591 526 592 [[package]] 593 + name = "crossbeam-epoch" 594 + version = "0.9.18" 595 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 596 + checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" 597 + dependencies = [ 598 + "crossbeam-utils", 599 + ] 600 + 601 + [[package]] 602 + name = "crossbeam-skiplist" 603 + version = "0.1.3" 604 + source = "registry+https://github.com/rust-lang/crates.io-index" 605 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 606 + dependencies = [ 607 + "crossbeam-epoch", 608 + "crossbeam-utils", 609 + ] 610 + 611 + [[package]] 527 612 name = "crossbeam-utils" 528 613 version = "0.8.21" 529 614 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 555 640 556 641 [[package]] 557 642 name = "data-encoding" 558 - version = "2.9.0" 643 + version = "2.10.0" 559 644 source = "registry+https://github.com/rust-lang/crates.io-index" 560 - checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" 645 + checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" 646 + 647 + [[package]] 648 + name = "data-encoding-macro" 649 + version = "0.1.19" 650 + source = "registry+https://github.com/rust-lang/crates.io-index" 651 + checksum = "8142a83c17aa9461d637e649271eae18bf2edd00e91f2e105df36c3c16355bdb" 652 + dependencies = [ 653 + "data-encoding", 654 + "data-encoding-macro-internal", 655 + ] 656 + 657 + [[package]] 658 + name = "data-encoding-macro-internal" 659 + version = "0.1.17" 660 + source = "registry+https://github.com/rust-lang/crates.io-index" 661 + checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" 662 + dependencies = [ 663 + "data-encoding", 664 + "syn", 665 + ] 561 666 562 667 [[package]] 563 668 name = "der-parser" ··· 626 731 ] 627 732 628 733 [[package]] 734 + name = "enum_dispatch" 735 + version = "0.3.13" 736 + source = "registry+https://github.com/rust-lang/crates.io-index" 737 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 738 + dependencies = [ 
739 + "once_cell", 740 + "proc-macro2", 741 + "quote", 742 + "syn", 743 + ] 744 + 745 + [[package]] 629 746 name = "equivalent" 630 747 version = "1.0.2" 631 748 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 660 777 checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" 661 778 662 779 [[package]] 780 + name = "fjall" 781 + version = "3.0.2" 782 + source = "registry+https://github.com/rust-lang/crates.io-index" 783 + checksum = "5a2799b4198427a08c774838e44d0b77f677208f19a1927671cd2cd36bb30d69" 784 + dependencies = [ 785 + "byteorder-lite", 786 + "byteview", 787 + "dashmap", 788 + "flume", 789 + "log", 790 + "lsm-tree", 791 + "lz4_flex", 792 + "tempfile", 793 + "xxhash-rust", 794 + ] 795 + 796 + [[package]] 663 797 name = "flate2" 664 798 version = "1.1.2" 665 799 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 670 804 ] 671 805 672 806 [[package]] 807 + name = "flume" 808 + version = "0.12.0" 809 + source = "registry+https://github.com/rust-lang/crates.io-index" 810 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 811 + dependencies = [ 812 + "spin", 813 + ] 814 + 815 + [[package]] 673 816 name = "fnv" 674 817 version = "1.0.7" 675 818 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 913 1056 "equivalent", 914 1057 "foldhash", 915 1058 ] 1059 + 1060 + [[package]] 1061 + name = "hashbrown" 1062 + version = "0.16.1" 1063 + source = "registry+https://github.com/rust-lang/crates.io-index" 1064 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 916 1065 917 1066 [[package]] 918 1067 name = "headers" ··· 1219 1368 checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" 1220 1369 dependencies = [ 1221 1370 "equivalent", 1222 - "hashbrown 0.15.5", 1371 + "hashbrown 0.16.1", 1223 1372 ] 1224 1373 1225 1374 [[package]] ··· 1235 1384 ] 1236 1385 1237 1386 [[package]] 1387 + name = "interval-heap" 1388 + 
version = "0.0.5" 1389 + source = "registry+https://github.com/rust-lang/crates.io-index" 1390 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 1391 + dependencies = [ 1392 + "compare", 1393 + ] 1394 + 1395 + [[package]] 1238 1396 name = "io-uring" 1239 1397 version = "0.7.10" 1240 1398 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 1528 checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 1371 1529 1372 1530 [[package]] 1531 + name = "lsm-tree" 1532 + version = "3.0.2" 1533 + source = "registry+https://github.com/rust-lang/crates.io-index" 1534 + checksum = "86e8d0b8e0cf2531a437788ce94d95570dbaabfe9888db20022c2d5ccec9b221" 1535 + dependencies = [ 1536 + "byteorder-lite", 1537 + "byteview", 1538 + "crossbeam-skiplist", 1539 + "enum_dispatch", 1540 + "interval-heap", 1541 + "log", 1542 + "lz4_flex", 1543 + "quick_cache", 1544 + "rustc-hash", 1545 + "self_cell", 1546 + "sfa", 1547 + "tempfile", 1548 + "varint-rs", 1549 + "xxhash-rust", 1550 + ] 1551 + 1552 + [[package]] 1553 + name = "lz4_flex" 1554 + version = "0.11.5" 1555 + source = "registry+https://github.com/rust-lang/crates.io-index" 1556 + checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" 1557 + dependencies = [ 1558 + "twox-hash", 1559 + ] 1560 + 1561 + [[package]] 1562 + name = "match-lookup" 1563 + version = "0.1.2" 1564 + source = "registry+https://github.com/rust-lang/crates.io-index" 1565 + checksum = "757aee279b8bdbb9f9e676796fd459e4207a1f986e87886700abf589f5abf771" 1566 + dependencies = [ 1567 + "proc-macro2", 1568 + "quote", 1569 + "syn", 1570 + ] 1571 + 1572 + [[package]] 1373 1573 name = "matchers" 1374 1574 version = "0.2.0" 1375 1575 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1427 1627 ] 1428 1628 1429 1629 [[package]] 1630 + name = "multibase" 1631 + version = "0.9.2" 1632 + source = "registry+https://github.com/rust-lang/crates.io-index" 1633 + checksum = 
"8694bb4835f452b0e3bb06dbebb1d6fc5385b6ca1caf2e55fd165c042390ec77" 1634 + dependencies = [ 1635 + "base-x", 1636 + "base256emoji", 1637 + "data-encoding", 1638 + "data-encoding-macro", 1639 + ] 1640 + 1641 + [[package]] 1642 + name = "multihash" 1643 + version = "0.19.3" 1644 + source = "registry+https://github.com/rust-lang/crates.io-index" 1645 + checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" 1646 + dependencies = [ 1647 + "core2", 1648 + "unsigned-varint", 1649 + ] 1650 + 1651 + [[package]] 1430 1652 name = "native-tls" 1431 1653 version = "0.2.14" 1432 1654 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1977 2199 ] 1978 2200 1979 2201 [[package]] 2202 + name = "quick_cache" 2203 + version = "0.6.18" 2204 + source = "registry+https://github.com/rust-lang/crates.io-index" 2205 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 2206 + dependencies = [ 2207 + "equivalent", 2208 + "hashbrown 0.16.1", 2209 + ] 2210 + 2211 + [[package]] 1980 2212 name = "quinn" 1981 2213 version = "0.11.9" 1982 2214 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2448 2680 ] 2449 2681 2450 2682 [[package]] 2683 + name = "self_cell" 2684 + version = "1.2.2" 2685 + source = "registry+https://github.com/rust-lang/crates.io-index" 2686 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 2687 + 2688 + [[package]] 2451 2689 name = "serde" 2452 2690 version = "1.0.226" 2453 2691 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2502 2740 ] 2503 2741 2504 2742 [[package]] 2743 + name = "sfa" 2744 + version = "1.0.0" 2745 + source = "registry+https://github.com/rust-lang/crates.io-index" 2746 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 2747 + dependencies = [ 2748 + "byteorder-lite", 2749 + "log", 2750 + "xxhash-rust", 2751 + ] 2752 + 2753 + [[package]] 2505 2754 name = "sha1" 2506 2755 version = "0.10.6" 2507 
2756 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2583 2832 dependencies = [ 2584 2833 "libc", 2585 2834 "windows-sys 0.59.0", 2835 + ] 2836 + 2837 + [[package]] 2838 + name = "spin" 2839 + version = "0.9.8" 2840 + source = "registry+https://github.com/rust-lang/crates.io-index" 2841 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 2842 + dependencies = [ 2843 + "lock_api", 2586 2844 ] 2587 2845 2588 2846 [[package]] ··· 3075 3333 version = "0.2.5" 3076 3334 source = "registry+https://github.com/rust-lang/crates.io-index" 3077 3335 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 3336 + 3337 + [[package]] 3338 + name = "twox-hash" 3339 + version = "2.1.2" 3340 + source = "registry+https://github.com/rust-lang/crates.io-index" 3341 + checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" 3078 3342 3079 3343 [[package]] 3080 3344 name = "typenum" ··· 3119 3383 checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" 3120 3384 3121 3385 [[package]] 3386 + name = "unsigned-varint" 3387 + version = "0.8.0" 3388 + source = "registry+https://github.com/rust-lang/crates.io-index" 3389 + checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" 3390 + 3391 + [[package]] 3122 3392 name = "untrusted" 3123 3393 version = "0.9.0" 3124 3394 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3153 3423 version = "0.1.1" 3154 3424 source = "registry+https://github.com/rust-lang/crates.io-index" 3155 3425 checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" 3426 + 3427 + [[package]] 3428 + name = "varint-rs" 3429 + version = "2.2.0" 3430 + source = "registry+https://github.com/rust-lang/crates.io-index" 3431 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 3156 3432 3157 3433 [[package]] 3158 3434 name = "vcpkg" ··· 3642 3918 "thiserror 2.0.16", 3643 3919 "time", 
3644 3920 ] 3921 + 3922 + [[package]] 3923 + name = "xxhash-rust" 3924 + version = "0.8.15" 3925 + source = "registry+https://github.com/rust-lang/crates.io-index" 3926 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 3645 3927 3646 3928 [[package]] 3647 3929 name = "yasna"
+8
Cargo.toml
··· 6 6 edition = "2024" 7 7 default-run = "allegedly" 8 8 9 + 10 + [dev-dependencies] 11 + tempfile = "3.10.1" 12 + 9 13 [dependencies] 10 14 anyhow = "1.0.99" 15 + async-trait = "0.1" 11 16 async-compression = { version = "0.4.30", features = ["futures-io", "tokio", "gzip"] } 12 17 chrono = { version = "0.4.42", features = ["serde"] } 13 18 clap = { version = "4.5.47", features = ["derive", "env"] } 19 + fjall = "3.0.2" 14 20 futures = "0.3.31" 15 21 governor = "0.10.1" 16 22 http-body-util = "0.1.3" ··· 35 41 tracing = "0.1.41" 36 42 tracing-opentelemetry = "0.31.0" 37 43 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } 44 + data-encoding = "2.10.0" 45 + cid = "0.11.1"
+61
flake.lock
··· 1 + { 2 + "nodes": { 3 + "nixpkgs": { 4 + "locked": { 5 + "lastModified": 1771177547, 6 + "narHash": "sha256-trTtk3WTOHz7hSw89xIIvahkgoFJYQ0G43IlqprFoMA=", 7 + "owner": "nixos", 8 + "repo": "nixpkgs", 9 + "rev": "ac055f38c798b0d87695240c7b761b82fc7e5bc2", 10 + "type": "github" 11 + }, 12 + "original": { 13 + "owner": "nixos", 14 + "ref": "nixpkgs-unstable", 15 + "repo": "nixpkgs", 16 + "type": "github" 17 + } 18 + }, 19 + "nixpkgs-lib": { 20 + "locked": { 21 + "lastModified": 1769909678, 22 + "narHash": "sha256-cBEymOf4/o3FD5AZnzC3J9hLbiZ+QDT/KDuyHXVJOpM=", 23 + "owner": "nix-community", 24 + "repo": "nixpkgs.lib", 25 + "rev": "72716169fe93074c333e8d0173151350670b824c", 26 + "type": "github" 27 + }, 28 + "original": { 29 + "owner": "nix-community", 30 + "repo": "nixpkgs.lib", 31 + "type": "github" 32 + } 33 + }, 34 + "parts": { 35 + "inputs": { 36 + "nixpkgs-lib": "nixpkgs-lib" 37 + }, 38 + "locked": { 39 + "lastModified": 1769996383, 40 + "narHash": "sha256-AnYjnFWgS49RlqX7LrC4uA+sCCDBj0Ry/WOJ5XWAsa0=", 41 + "owner": "hercules-ci", 42 + "repo": "flake-parts", 43 + "rev": "57928607ea566b5db3ad13af0e57e921e6b12381", 44 + "type": "github" 45 + }, 46 + "original": { 47 + "owner": "hercules-ci", 48 + "repo": "flake-parts", 49 + "type": "github" 50 + } 51 + }, 52 + "root": { 53 + "inputs": { 54 + "nixpkgs": "nixpkgs", 55 + "parts": "parts" 56 + } 57 + } 58 + }, 59 + "root": "root", 60 + "version": 7 61 + }
+33
flake.nix
··· 1 + { 2 + inputs.parts.url = "github:hercules-ci/flake-parts"; 3 + inputs.nixpkgs.url = "github:nixos/nixpkgs/nixpkgs-unstable"; 4 + 5 + outputs = 6 + inp: 7 + inp.parts.lib.mkFlake { inputs = inp; } { 8 + systems = [ "x86_64-linux" ]; 9 + perSystem = 10 + { 11 + pkgs, 12 + config, 13 + ... 14 + }: 15 + { 16 + packages.default = pkgs.callPackage ./default.nix {}; 17 + devShells = { 18 + default = pkgs.mkShell { 19 + packages = with pkgs; [ 20 + rustPlatform.rustLibSrc 21 + rust-analyzer 22 + cargo 23 + cargo-outdated 24 + rustc 25 + rustfmt 26 + openssl 27 + pkg-config 28 + ]; 29 + }; 30 + }; 31 + }; 32 + }; 33 + }
+11 -1
readme.md
··· 15 15 --wrap-pg "postgresql://user:pass@pg-host:5432/plc-db" 16 16 ``` 17 17 18 + - Run a fully self-contained mirror using an embedded fjall database (no postgres needed): 19 + 20 + ```bash 21 + # backfill first 22 + allegedly backfill --to-fjall ./plc-data 23 + 24 + # then run the mirror 25 + allegedly mirror --wrap-fjall ./plc-data 26 + ``` 27 + 18 28 - Wrap a plc server, maximalist edition: 19 29 20 30 ```bash ··· 89 99 - [ ] experimental: websocket version of /export 90 100 - [x] experimental: accept writes by forwarding them upstream 91 101 - [ ] experimental: serve a tlog 92 - - [ ] experimental: embed a log database directly for fast and efficient mirroring 102 + - [x] experimental: embed a log database directly for fast and efficient mirroring 93 103 - [ ] experimental: support multiple upstreams? 94 104 95 105 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
+30 -3
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 2 + Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, backfill, backfill_to_fjall, 3 + backfill_to_pg, 3 4 bin::{GlobalArgs, bin_init}, 4 - full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 5 + full_pages, logo, pages_to_fjall, pages_to_pg, pages_to_stdout, poll_upstream, 5 6 }; 6 7 use clap::Parser; 7 8 use reqwest::Url; ··· 45 46 /// only used if `--to-postgres` is present 46 47 #[arg(long, action)] 47 48 postgres_reset: bool, 49 + /// Bulk load into a local fjall embedded database 50 + /// 51 + /// Pass a directory path for the fjall database 52 + #[arg(long, conflicts_with_all = ["to_postgres", "postgres_cert", "postgres_reset"])] 53 + to_fjall: Option<PathBuf>, 54 + /// Delete all operations from the fjall db before starting 55 + /// 56 + /// only used if `--to-fjall` is present 57 + #[arg(long, action, requires = "to_fjall")] 58 + fjall_reset: bool, 48 59 /// Stop at the week ending before this date 49 60 #[arg(long)] 50 61 until: Option<Dt>, ··· 66 77 to_postgres, 67 78 postgres_cert, 68 79 postgres_reset, 80 + to_fjall, 81 + fjall_reset, 69 82 until, 70 83 catch_up, 71 84 }: Args, ··· 143 156 } 144 157 145 158 // set up sinks 146 - if let Some(pg_url) = to_postgres { 159 + if let Some(fjall_path) = to_fjall { 160 + log::trace!("opening fjall db at {fjall_path:?}..."); 161 + let db = FjallDb::open(&fjall_path)?; 162 + log::trace!("opened fjall db"); 163 + 164 + tasks.spawn(backfill_to_fjall( 165 + db.clone(), 166 + fjall_reset, 167 + bulk_out, 168 + found_last_tx, 169 + )); 170 + if catch_up { 171 + tasks.spawn(pages_to_fjall(db, full_out)); 172 + } 173 + } else if let Some(pg_url) = to_postgres { 147 174 log::trace!("connecting to postgres..."); 148 175 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 149 176 log::trace!("connected to postgres");
+46 -25
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 - Db, ExperimentalConf, ListenConf, 2 + Db, ExperimentalConf, FjallDb, ListenConf, 3 3 bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 - logo, pages_to_pg, poll_upstream, serve, 4 + logo, pages_to_fjall, pages_to_pg, poll_upstream, serve, serve_fjall, 5 5 }; 6 6 use clap::Parser; 7 7 use reqwest::Url; ··· 10 10 11 11 #[derive(Debug, clap::Args)] 12 12 pub struct Args { 13 - /// the wrapped did-method-plc server 13 + /// the wrapped did-method-plc server (not needed when using --wrap-fjall) 14 14 #[arg(long, env = "ALLEGEDLY_WRAP")] 15 - wrap: Url, 15 + wrap: Option<Url>, 16 16 /// the wrapped did-method-plc server's database (write access required) 17 - #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 17 + #[arg(long, env = "ALLEGEDLY_WRAP_PG", conflicts_with = "wrap_fjall")] 18 18 wrap_pg: Option<Url>, 19 19 /// path to tls cert for the wrapped postgres db, if needed 20 20 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 21 21 wrap_pg_cert: Option<PathBuf>, 22 + /// path to a local fjall database directory (alternative to postgres) 23 + #[arg(long, env = "ALLEGEDLY_WRAP_FJALL", conflicts_with_all = ["wrap_pg", "wrap_pg_cert"])] 24 + wrap_fjall: Option<PathBuf>, 22 25 /// wrapping server listen address 23 26 #[arg(short, long, env = "ALLEGEDLY_BIND")] 24 27 #[clap(default_value = "127.0.0.1:8000")] ··· 70 73 wrap, 71 74 wrap_pg, 72 75 wrap_pg_cert, 76 + wrap_fjall, 73 77 bind, 74 78 acme_domain, 75 79 acme_cache_path, ··· 106 110 107 111 let mut tasks = JoinSet::new(); 108 112 109 - let db = if sync { 110 - let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 111 - "a wrapped reference postgres must be provided to sync" 112 - ))?; 113 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 113 + if let Some(fjall_path) = wrap_fjall { 114 + let db = FjallDb::open(&fjall_path)?; 114 115 115 - // TODO: allow starting up with polling backfill from beginning? 
116 - log::debug!("getting the latest op from the db..."); 116 + log::debug!("getting the latest op from fjall..."); 117 117 let latest = db 118 - .get_latest() 119 - .await? 118 + .get_latest()? 120 119 .expect("there to be at least one op in the db. did you backfill?"); 121 120 122 121 let (send_page, recv_page) = mpsc::channel(8); ··· 126 125 let throttle = Duration::from_millis(upstream_throttle_ms); 127 126 128 127 tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 129 - tasks.spawn(pages_to_pg(db.clone(), recv_page)); 130 - Some(db) 128 + tasks.spawn(pages_to_fjall(db.clone(), recv_page)); 129 + 130 + tasks.spawn(serve_fjall(upstream, listen_conf, experimental_conf, db)); 131 131 } else { 132 - None 133 - }; 132 + let wrap = wrap.ok_or(anyhow::anyhow!( 133 + "--wrap is required unless using --wrap-fjall" 134 + ))?; 135 + 136 + let db: Option<Db> = if sync { 137 + let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 138 + "a wrapped reference postgres (--wrap-pg) or fjall db (--wrap-fjall) must be provided to sync" 139 + ))?; 140 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 141 + 142 + log::debug!("getting the latest op from the db..."); 143 + let latest = db 144 + .get_latest() 145 + .await? 146 + .expect("there to be at least one op in the db. 
did you backfill?"); 147 + 148 + let (send_page, recv_page) = mpsc::channel(8); 149 + 150 + let mut poll_url = upstream.clone(); 151 + poll_url.set_path("/export"); 152 + let throttle = Duration::from_millis(upstream_throttle_ms); 153 + 154 + tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 155 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 156 + Some(db) 157 + } else { 158 + None 159 + }; 134 160 135 - tasks.spawn(serve( 136 - upstream, 137 - wrap, 138 - listen_conf, 139 - experimental_conf, 140 - db.clone(), 141 - )); 161 + tasks.spawn(serve(upstream, wrap, listen_conf, experimental_conf, db)); 162 + } 142 163 143 164 while let Some(next) = tasks.join_next().await { 144 165 match next {
+399
src/doc.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use serde_json::Value; 3 + use std::borrow::Cow; 4 + use std::collections::BTreeMap; 5 + 6 + pub type CowStr<'a> = Cow<'a, str>; 7 + 8 + #[derive(Debug, Clone, Serialize, Deserialize)] 9 + pub struct Service<'a> { 10 + pub r#type: CowStr<'a>, 11 + pub endpoint: CowStr<'a>, 12 + } 13 + 14 + #[derive(Debug, Clone, Serialize, Deserialize)] 15 + #[serde(rename_all = "camelCase")] 16 + pub struct DocumentData<'a> { 17 + pub did: CowStr<'a>, 18 + pub rotation_keys: Vec<CowStr<'a>>, 19 + pub verification_methods: BTreeMap<CowStr<'a>, CowStr<'a>>, 20 + pub also_known_as: Vec<CowStr<'a>>, 21 + pub services: BTreeMap<CowStr<'a>, Service<'a>>, 22 + } 23 + 24 + #[derive(Debug, Clone, Serialize)] 25 + #[serde(rename_all = "camelCase")] 26 + pub struct DidDocument<'a> { 27 + #[serde(rename = "@context")] 28 + pub context: Vec<CowStr<'a>>, 29 + pub id: CowStr<'a>, 30 + pub also_known_as: Vec<CowStr<'a>>, 31 + pub verification_method: Vec<VerificationMethod<'a>>, 32 + pub service: Vec<DocService<'a>>, 33 + } 34 + 35 + #[derive(Debug, Clone, Serialize)] 36 + #[serde(rename_all = "camelCase")] 37 + pub struct VerificationMethod<'a> { 38 + pub id: CowStr<'a>, 39 + pub r#type: CowStr<'a>, 40 + pub controller: CowStr<'a>, 41 + pub public_key_multibase: CowStr<'a>, 42 + } 43 + 44 + #[derive(Debug, Clone, Serialize)] 45 + #[serde(rename_all = "camelCase")] 46 + pub struct DocService<'a> { 47 + pub id: CowStr<'a>, 48 + pub r#type: CowStr<'a>, 49 + pub service_endpoint: CowStr<'a>, 50 + } 51 + 52 + const P256_PREFIX: &str = "zDn"; 53 + const SECP256K1_PREFIX: &str = "zQ3"; 54 + 55 + fn key_context(multibase: &str) -> Option<&'static str> { 56 + if multibase.starts_with(P256_PREFIX) { 57 + Some("https://w3id.org/security/suites/ecdsa-2019/v1") 58 + } else if multibase.starts_with(SECP256K1_PREFIX) { 59 + Some("https://w3id.org/security/suites/secp256k1-2019/v1") 60 + } else { 61 + None 62 + } 63 + } 64 + 65 + pub fn format_did_doc<'a>(data: 
&'a DocumentData<'a>) -> DidDocument<'a> { 66 + let mut context = vec![ 67 + "https://www.w3.org/ns/did/v1".into(), 68 + "https://w3id.org/security/multikey/v1".into(), 69 + ]; 70 + 71 + let verification_method = data 72 + .verification_methods 73 + .iter() 74 + .map(|(keyid, did_key)| { 75 + let multibase: CowStr = did_key.strip_prefix("did:key:").unwrap_or(did_key).into(); 76 + 77 + if let Some(ctx) = key_context(&multibase) { 78 + if !context.iter().any(|c| c == ctx) { 79 + context.push(ctx.into()); 80 + } 81 + } 82 + VerificationMethod { 83 + id: format!("{}#{keyid}", data.did).into(), 84 + r#type: "Multikey".into(), 85 + controller: data.did.clone(), 86 + public_key_multibase: multibase, 87 + } 88 + }) 89 + .collect(); 90 + 91 + let service = data 92 + .services 93 + .iter() 94 + .map(|(service_id, svc)| DocService { 95 + id: format!("#{service_id}").into(), 96 + r#type: svc.r#type.clone(), 97 + service_endpoint: svc.endpoint.clone(), 98 + }) 99 + .collect(); 100 + 101 + DidDocument { 102 + context, 103 + id: data.did.clone(), 104 + also_known_as: data.also_known_as.clone(), 105 + verification_method, 106 + service, 107 + } 108 + } 109 + 110 + fn ensure_atproto_prefix(s: &str) -> CowStr<'_> { 111 + if s.starts_with("at://") { 112 + return s.into(); 113 + } 114 + let stripped = s 115 + .strip_prefix("http://") 116 + .or_else(|| s.strip_prefix("https://")) 117 + .unwrap_or(s); 118 + format!("at://{stripped}").into() 119 + } 120 + 121 + fn ensure_http_prefix(s: &str) -> CowStr<'_> { 122 + if s.starts_with("http://") || s.starts_with("https://") { 123 + return s.into(); 124 + } 125 + format!("https://{s}").into() 126 + } 127 + 128 + /// extract DocumentData from a single operation json blob. 129 + /// returns None for tombstones. 
130 + pub fn op_to_doc_data<'a>(did: &'a str, op: &'a Value) -> Option<DocumentData<'a>> { 131 + // TODO: this shouldnt just short circuit to None, we should provide better information about whats missing in an error 132 + let obj = op.as_object()?; 133 + let op_type = obj.get("type")?.as_str()?; 134 + 135 + match op_type { 136 + "plc_tombstone" => None, 137 + "create" => { 138 + let signing_key = obj.get("signingKey")?.as_str()?; 139 + let recovery_key = obj.get("recoveryKey")?.as_str()?; 140 + let handle = obj.get("handle")?.as_str()?; 141 + let service = obj.get("service")?.as_str()?; 142 + 143 + let mut verification_methods = BTreeMap::new(); 144 + verification_methods.insert("atproto".into(), signing_key.into()); 145 + 146 + let mut services = BTreeMap::new(); 147 + services.insert( 148 + "atproto_pds".into(), 149 + Service { 150 + r#type: "AtprotoPersonalDataServer".into(), 151 + endpoint: ensure_http_prefix(service), 152 + }, 153 + ); 154 + 155 + Some(DocumentData { 156 + did: Cow::Borrowed(did), 157 + rotation_keys: vec![Cow::Borrowed(recovery_key), Cow::Borrowed(signing_key)], 158 + verification_methods, 159 + also_known_as: vec![ensure_atproto_prefix(handle)], 160 + services, 161 + }) 162 + } 163 + "plc_operation" => { 164 + let rotation_keys = obj 165 + .get("rotationKeys")? 166 + .as_array()? 167 + .iter() 168 + .filter_map(|v| v.as_str().map(Cow::Borrowed)) 169 + .collect(); 170 + 171 + let verification_methods = obj 172 + .get("verificationMethods")? 173 + .as_object()? 174 + .iter() 175 + .filter_map(|(k, v)| Some((k.as_str().into(), v.as_str()?.into()))) 176 + .collect(); 177 + 178 + let also_known_as = obj 179 + .get("alsoKnownAs")? 180 + .as_array()? 181 + .iter() 182 + .filter_map(|v| v.as_str().map(Cow::Borrowed)) 183 + .collect(); 184 + 185 + let services = obj 186 + .get("services")? 187 + .as_object()? 
// (tail of a function whose opening lines precede this excerpt — presumably `op_to_doc_data`)
                .iter()
                .filter_map(|(k, v)| {
                    let svc: Service = Service::deserialize(v).ok()?;
                    Some((k.as_str().into(), svc))
                })
                .collect();

            Some(DocumentData {
                did: did.into(),
                rotation_keys,
                verification_methods,
                also_known_as,
                services,
            })
        }
        _ => None,
    }
}

/// Apply a sequence of operation JSON blobs and return the current document data.
///
/// Only the last entry of `ops` is consulted: whatever `op_to_doc_data` makes of
/// that operation is the result. Callers are therefore expected to pass the log in
/// order and to have removed nullified operations beforehand.
///
/// Returns `None` when `ops` is empty or when the last operation does not yield
/// document data (for example, when the DID is tombstoned).
pub fn apply_op_log<'a>(did: &'a str, ops: &'a [Value]) -> Option<DocumentData<'a>> {
    // TODO: we don't verify signature chain, we should do that...
    let latest = ops.last()?;
    op_to_doc_data(did, latest)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn normalize_legacy_create() {
        let op = serde_json::json!({
            "type": "create",
            "signingKey": "did:key:zDnaeSigningKey",
            "recoveryKey": "did:key:zQ3shRecoveryKey",
            "handle": "alice.bsky.social",
            "service": "pds.example.com",
            "prev": null,
            "sig": "abc"
        });

        let data = op_to_doc_data("did:plc:test", &op).unwrap();
        assert_eq!(data.rotation_keys.len(), 2);
        assert_eq!(data.rotation_keys[0], "did:key:zQ3shRecoveryKey");
        assert_eq!(data.rotation_keys[1], "did:key:zDnaeSigningKey");
        assert_eq!(
            data.verification_methods.get("atproto").unwrap(),
            "did:key:zDnaeSigningKey"
        );
        assert_eq!(data.also_known_as, vec!["at://alice.bsky.social"]);
        let pds = data.services.get("atproto_pds").unwrap();
        assert_eq!(pds.endpoint, "https://pds.example.com");
    }

    #[test]
    fn format_doc_p256_context() {
        let data = DocumentData {
            did: "did:plc:test123".into(),
            rotation_keys: vec!["did:key:zDnaeXYZ".into()],
            verification_methods: {
                let mut m = BTreeMap::new();
                m.insert("atproto".into(), "did:key:zDnaeXYZ".into());
                m
            },
            also_known_as: vec!["at://alice.test".into()],
            services: {
                let mut m = BTreeMap::new();
                m.insert(
                    "atproto_pds".into(),
                    Service {
                        r#type: "AtprotoPersonalDataServer".into(),
                        endpoint: "https://pds.test".into(),
                    },
                );
                m
            },
        };

        let doc = format_did_doc(&data);
        assert_eq!(doc.context.len(), 3);
        assert!(
            doc.context
                .iter()
                .any(|c| c == "https://w3id.org/security/suites/ecdsa-2019/v1")
        );
        assert_eq!(doc.verification_method[0].public_key_multibase, "zDnaeXYZ");
        assert_eq!(doc.verification_method[0].id, "did:plc:test123#atproto");
    }

    #[test]
    fn tombstone_returns_none() {
        let op = serde_json::json!({
            "type": "plc_tombstone",
            "prev": "bafyabc",
            "sig": "xyz"
        });
        assert!(op_to_doc_data("did:plc:test", &op).is_none());
    }

    #[test]
    fn apply_log_with_tombstone() {
        // NOTE(review): the service entry below uses the key `service_endpoint`, while
        // the `Service` struct field is `endpoint`. If `Service` does not accept this
        // spelling the entry is silently dropped by the `filter_map` in the op
        // conversion; this test would not notice — confirm the intended key.
        let create = serde_json::json!({
            "type": "plc_operation",
            "rotationKeys": ["did:key:zQ3shKey1"],
            "verificationMethods": {"atproto": "did:key:zDnaeKey1"},
            "alsoKnownAs": ["at://alice.test"],
            "services": {
                "atproto_pds": {"type": "AtprotoPersonalDataServer", "service_endpoint": "https://pds.test"}
            },
            "prev": null,
            "sig": "abc"
        });
        let tombstone = serde_json::json!({
            "type": "plc_tombstone",
            "prev": "bafyabc",
            "sig": "xyz"
        });

        let ops = vec![create.clone()];
        let result = apply_op_log("did:plc:test", &ops);
        assert!(result.is_some());

        let ops = vec![create, tombstone];
        let result = apply_op_log("did:plc:test", &ops);
        assert!(result.is_none());
    }

    /// Read `tests/fixtures/{name}` and parse it as a JSON array of audit-log entries.
    ///
    /// Panics with the fixture path in the message so a missing or malformed
    /// fixture is easy to identify.
    fn read_fixture_entries(name: &str) -> Vec<Value> {
        let path = format!("tests/fixtures/{name}");
        let data = std::fs::read_to_string(&path).unwrap_or_else(|e| panic!("{path}: {e}"));
        serde_json::from_str(&data).unwrap_or_else(|e| panic!("{path}: invalid JSON: {e}"))
    }

    /// The DID recorded on the first entry of a fixture.
    fn fixture_did(entries: &[Value]) -> String {
        entries
            .first()
            .and_then(|entry| entry["did"].as_str())
            .expect("fixture should start with an entry that has a string `did`")
            .to_string()
    }

    /// Load a fixture and return its DID plus its operations, nullified entries removed.
    fn load_fixture(name: &str) -> (String, Vec<Value>) {
        let entries = read_fixture_entries(name);
        let did = fixture_did(&entries);
        let ops: Vec<Value> = entries
            .iter()
            .filter(|e| !e["nullified"].as_bool().unwrap_or(false))
            .map(|e| e["operation"].clone())
            .collect();
        (did, ops)
    }

    #[test]
    fn interop_legacy_dholms() {
        let (did, ops) = load_fixture("log_legacy_dholms.json");
        assert_eq!(did, "did:plc:yk4dd2qkboz2yv6tpubpc6co");

        let data = apply_op_log(&did, &ops).expect("should reconstruct");
        assert_eq!(data.did, did);
        assert_eq!(data.also_known_as, vec!["at://dholms.xyz"]);
        assert_eq!(
            data.services.get("atproto_pds").unwrap().endpoint,
            "https://bsky.social"
        );
        assert_eq!(
            data.verification_methods.get("atproto").unwrap(),
            "did:key:zQ3shXjHeiBuRCKmM36cuYnm7YEMzhGnCmCyW92sRJ9pribSF"
        );

        let doc = format_did_doc(&data);
        assert_eq!(doc.id, did);
        assert!(
            doc.context
                .iter()
                .any(|c| c == "https://w3id.org/security/suites/secp256k1-2019/v1")
        );
    }

    #[test]
    fn interop_bskyapp() {
        let (did, ops) = load_fixture("log_bskyapp.json");
        assert_eq!(did, "did:plc:z72i7hdynmk6r22z27h6tvur");

        let data = apply_op_log(&did, &ops).expect("should reconstruct");
        assert_eq!(data.also_known_as, vec!["at://bsky.app"]);
        assert_eq!(
            data.verification_methods.get("atproto").unwrap(),
            "did:key:zQ3shXjHeiBuRCKmM36cuYnm7YEMzhGnCmCyW92sRJ9pribSF"
        );
        assert_eq!(
            data.services.get("atproto_pds").unwrap().endpoint,
            "https://bsky.social"
        );
    }

    #[test]
    fn interop_tombstone() {
        // Deliberately keeps every entry (no nullified filtering), as the log is
        // expected to end in a tombstone either way.
        let entries = read_fixture_entries("log_tombstone.json");
        let did = fixture_did(&entries);
        let ops: Vec<Value> = entries.iter().map(|e| e["operation"].clone()).collect();

        assert_eq!(did, "did:plc:6adr3q2labdllanslzhqkqd3");
        let result = apply_op_log(&did, &ops);
        assert!(result.is_none(), "tombstoned DID should return None");
    }

    #[test]
    fn interop_nullification() {
        let (did, ops) = load_fixture("log_nullification.json");
        assert_eq!(did, "did:plc:2s2mvm52ttz6r4hocmrq7x27");

        let data = apply_op_log(&did, &ops).expect("should reconstruct");
        assert_eq!(data.did, did);
        assert_eq!(data.rotation_keys.len(), 2);
        assert_eq!(
            data.rotation_keys[0],
            "did:key:zQ3shwPdax6jKMbhtzbueGwSjc7RnjsmPcNB1vQUpbKUCN1t1"
        );
    }
}
+5 -1
src/lib.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 + 2 3 use tokio::sync::{mpsc, oneshot}; 3 4 4 5 mod backfill; 5 6 mod cached_value; 6 7 mod client; 8 + pub mod doc; 7 9 mod mirror; 10 + mod plc_fjall; 8 11 mod plc_pg; 9 12 mod poll; 10 13 mod ratelimit; ··· 15 18 pub use backfill::backfill; 16 19 pub use cached_value::{CachedValue, Fetcher}; 17 20 pub use client::{CLIENT, UA}; 18 - pub use mirror::{ExperimentalConf, ListenConf, serve}; 21 + pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall}; 22 + pub use plc_fjall::{FjallDb, backfill_to_fjall, pages_to_fjall}; 19 23 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 20 24 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 21 25 pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
+25 -215
src/mirror.rs src/mirror/pg.rs
··· 1 - use crate::{ 2 - CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo, 3 - }; 4 - use futures::TryStreamExt; 5 - use governor::Quota; 6 - use poem::{ 7 - Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 8 - get, handler, 9 - http::{StatusCode, header::USER_AGENT}, 10 - listener::{Listener, TcpListener, acme::AutoCert}, 11 - middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 12 - web::{Data, Json, Path}, 13 - }; 14 - use reqwest::{Client, Url}; 15 - use std::{net::SocketAddr, path::PathBuf, time::Duration}; 1 + use super::*; 16 2 17 3 #[derive(Clone)] 18 4 struct State { ··· 30 16 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 31 17 } 32 18 19 + #[derive(Clone)] 20 + struct GetLatestAt(Db); 21 + impl Fetcher<Dt> for GetLatestAt { 22 + async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 23 + let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 24 + "expected to find at least one thing in the db" 25 + ))?; 26 + Ok(now) 27 + } 28 + } 29 + 30 + #[derive(Clone)] 31 + struct CheckUpstream(Url, Client); 32 + impl Fetcher<PlcStatus> for CheckUpstream { 33 + async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { 34 + Ok(plc_status(&self.0, &self.1).await) 35 + } 36 + } 37 + 33 38 #[handler] 34 39 fn hello( 35 40 Data(State { ··· 40 45 }): Data<&State>, 41 46 req: &Request, 42 47 ) -> String { 43 - // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 44 48 let pre_info = if sync_info.is_some() { 45 49 format!( 46 50 r#" ··· 113 117 "#, 114 118 logo("mirror") 115 119 ) 116 - } 117 - 118 - #[handler] 119 - fn favicon() -> impl IntoResponse { 120 - include_bytes!("../favicon.ico").with_content_type("image/x-icon") 121 - } 122 - 123 - fn failed_to_reach_named(name: &str) -> String { 124 - format!( 125 - r#"{} 126 - 127 - Failed to reach the {name} server. Sorry. 
128 - "#, 129 - logo("mirror 502 :( ") 130 - ) 131 - } 132 - 133 - fn bad_create_op(reason: &str) -> Response { 134 - Response::builder() 135 - .status(StatusCode::BAD_REQUEST) 136 - .body(format!( 137 - r#"{} 138 - 139 - NooOOOooooo: {reason} 140 - "#, 141 - logo("mirror 400 >:( ") 142 - )) 143 - } 144 - 145 - type PlcStatus = (bool, serde_json::Value); 146 - 147 - async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 148 - use serde_json::json; 149 - 150 - let mut url = url.clone(); 151 - url.set_path("/_health"); 152 - 153 - let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else { 154 - return (false, json!({"error": "cannot reach plc server"})); 155 - }; 156 - 157 - let status = response.status(); 158 - 159 - let Ok(text) = response.text().await else { 160 - return (false, json!({"error": "failed to read response body"})); 161 - }; 162 - 163 - let body = match serde_json::from_str(&text) { 164 - Ok(json) => json, 165 - Err(_) => serde_json::Value::String(text.to_string()), 166 - }; 167 - 168 - if status.is_success() { 169 - (true, body) 170 - } else { 171 - ( 172 - false, 173 - json!({ 174 - "error": "non-ok status", 175 - "status": status.as_str(), 176 - "status_code": status.as_u16(), 177 - "response": body, 178 - }), 179 - ) 180 - } 181 - } 182 - 183 - #[derive(Clone)] 184 - struct GetLatestAt(Db); 185 - impl Fetcher<Dt> for GetLatestAt { 186 - async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 187 - let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 188 - "expected to find at least one thing in the db" 189 - ))?; 190 - Ok(now) 191 - } 192 - } 193 - 194 - #[derive(Clone)] 195 - struct CheckUpstream(Url, Client); 196 - impl Fetcher<PlcStatus> for CheckUpstream { 197 - async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { 198 - Ok(plc_status(&self.0, &self.1).await) 199 - } 200 120 } 201 121 202 122 #[handler] ··· 213 133 if !ok { 214 134 overall_status = StatusCode::BAD_GATEWAY; 
215 135 } 136 + 216 137 if let Some(SyncInfo { 217 138 latest_at, 218 139 upstream_status, 219 140 }) = sync_info 220 141 { 221 - // mirror mode 222 142 let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 143 if !ok { 224 144 overall_status = StatusCode::BAD_GATEWAY; ··· 235 155 })), 236 156 ) 237 157 } else { 238 - // wrap mode 239 158 ( 240 159 overall_status, 241 160 Json(serde_json::json!({ ··· 247 166 } 248 167 } 249 168 250 - fn proxy_response(res: reqwest::Response) -> Response { 251 - let http_res: poem::http::Response<reqwest::Body> = res.into(); 252 - let (parts, reqw_body) = http_res.into_parts(); 253 - 254 - let parts = poem::ResponseParts { 255 - status: parts.status, 256 - version: parts.version, 257 - headers: parts.headers, 258 - extensions: parts.extensions, 259 - }; 260 - 261 - let body = http_body_util::BodyDataStream::new(reqw_body) 262 - .map_err(|e| std::io::Error::other(Box::new(e))); 263 - 264 - Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 265 - } 266 - 267 169 #[handler] 268 170 async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 269 171 let mut target = state.plc.clone(); ··· 272 174 let wrapped_res = state 273 175 .client 274 176 .get(target) 275 - .timeout(Duration::from_secs(3)) // should be low latency to wrapped server 177 + .timeout(Duration::from_secs(3)) 276 178 .headers(req.headers().clone()) 277 179 .send() 278 180 .await ··· 312 214 } 313 215 } 314 216 315 - // adjust proxied headers 316 217 let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 317 218 log::trace!("original request headers: {headers:?}"); 318 219 headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); ··· 332 233 target.set_path(&did); 333 234 let upstream_res = client 334 235 .post(target) 335 - .timeout(Duration::from_secs(15)) // be a little generous 236 + .timeout(Duration::from_secs(15)) 336 237 .headers(headers) 337 238 
.body(reqwest::Body::wrap_stream(body.into_bytes_stream())) 338 239 .send() ··· 366 267 ) 367 268 } 368 269 369 - #[derive(Debug)] 370 - pub enum ListenConf { 371 - Acme { 372 - domains: Vec<String>, 373 - cache_path: PathBuf, 374 - directory_url: String, 375 - ipv6: bool, 376 - }, 377 - Bind(SocketAddr), 378 - } 379 - 380 - #[derive(Debug, Clone)] 381 - pub struct ExperimentalConf { 382 - pub acme_domain: Option<String>, 383 - pub write_upstream: bool, 384 - } 385 - 386 270 pub async fn serve( 387 271 upstream: Url, 388 272 plc: Url, ··· 392 276 ) -> anyhow::Result<&'static str> { 393 277 log::info!("starting server..."); 394 278 395 - // not using crate CLIENT: don't want the retries etc 396 279 let client = Client::builder() 397 280 .user_agent(UA) 398 - .timeout(Duration::from_secs(10)) // fallback 281 + .timeout(Duration::from_secs(10)) 399 282 .build() 400 283 .expect("reqwest client to build"); 401 284 ··· 447 330 .with(CatchPanic::new()) 448 331 .with(Tracing); 449 332 450 - match listen { 451 - ListenConf::Acme { 452 - domains, 453 - cache_path, 454 - directory_url, 455 - ipv6, 456 - } => { 457 - rustls::crypto::aws_lc_rs::default_provider() 458 - .install_default() 459 - .expect("crypto provider to be installable"); 460 - 461 - let mut auto_cert = AutoCert::builder() 462 - .directory_url(directory_url) 463 - .cache_path(cache_path); 464 - for domain in domains { 465 - auto_cert = auto_cert.domain(domain); 466 - } 467 - let auto_cert = auto_cert.build().expect("acme config to build"); 468 - 469 - log::trace!("auto_cert: {auto_cert:?}"); 470 - 471 - let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 472 - let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 473 - let app_res = run(app, listener.acme(auto_cert)).await; 474 - log::warn!("server task ended, aborting insecure server task..."); 475 - notice_task.abort(); 476 - app_res?; 477 - notice_task.await??; 478 - } 479 - ListenConf::Bind(addr) => run(app, 
TcpListener::bind(addr)).await?, 480 - } 481 - 482 - Ok("server (uh oh?)") 483 - } 484 - 485 - async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> 486 - where 487 - A: Endpoint + 'static, 488 - L: Listener + 'static, 489 - { 490 - Server::new(listener) 491 - .name("allegedly (mirror)") 492 - .run(app) 493 - .await 494 - } 495 - 496 - /// kick off a tiny little server on a tokio task to tell people to use 443 497 - async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 498 - #[handler] 499 - fn oop_plz_be_secure() -> (StatusCode, String) { 500 - ( 501 - StatusCode::BAD_REQUEST, 502 - format!( 503 - r#"{} 504 - 505 - You probably want to change your request to use HTTPS instead of HTTP. 506 - "#, 507 - logo("mirror (tls on 443 please)") 508 - ), 509 - ) 510 - } 511 - 512 - let app = Route::new() 513 - .at("/favicon.ico", get(favicon)) 514 - .nest("/", get(oop_plz_be_secure)) 515 - .with(Tracing); 516 - Server::new(TcpListener::bind(if ipv6 { 517 - "[::]:80" 518 - } else { 519 - "0.0.0.0:80" 520 - })) 521 - .name("allegedly (mirror:80 helper)") 522 - .run(app) 523 - .await 333 + bind_or_acme(app, listen).await 524 334 }
+431
src/mirror/fjall.rs
··· 1 + use super::*; 2 + use futures::StreamExt; 3 + use poem::web::Query; 4 + use serde::Deserialize; 5 + 6 + #[derive(Clone)] 7 + struct FjallState { 8 + client: Client, 9 + upstream: Url, 10 + fjall: FjallDb, 11 + sync_info: FjallSyncInfo, 12 + experimental: ExperimentalConf, 13 + } 14 + 15 + #[derive(Clone)] 16 + struct FjallSyncInfo { 17 + latest_at: CachedValue<Dt, GetFjallLatestAt>, 18 + upstream_status: CachedValue<PlcStatus, CheckUpstream>, 19 + } 20 + 21 + #[derive(Clone)] 22 + struct GetFjallLatestAt(FjallDb); 23 + impl Fetcher<Dt> for GetFjallLatestAt { 24 + async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 25 + let db = self.0.clone(); 26 + let now = tokio::task::spawn_blocking(move || db.get_latest()) 27 + .await?? 28 + .ok_or(anyhow::anyhow!( 29 + "expected to find at least one thing in the db" 30 + ))?; 31 + Ok(now) 32 + } 33 + } 34 + 35 + #[derive(Clone)] 36 + struct CheckUpstream(Url, Client); 37 + impl Fetcher<PlcStatus> for CheckUpstream { 38 + async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { 39 + Ok(plc_status(&self.0, &self.1).await) 40 + } 41 + } 42 + 43 + #[handler] 44 + fn fjall_hello( 45 + Data(FjallState { 46 + upstream, 47 + experimental: exp, 48 + .. 49 + }): Data<&FjallState>, 50 + req: &Request, 51 + ) -> String { 52 + let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) { 53 + (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(), 54 + (_, None, _) => { 55 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 56 + } 57 + (_, Some(d), Some(f)) if f == d => { 58 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 59 + } 60 + (_, Some(d), _) => format!( 61 + r#" - POST /* Rejected, but experimental upstream op forwarding is 62 + available at `POST https://{d}/:did`!"# 63 + ), 64 + }; 65 + 66 + format!( 67 + r#"{} 68 + 69 + This is a PLC[1] mirror running Allegedly in fjall mirror mode. 
The PLC API 70 + is served from a the local database, which is mirrored from the upstream PLC 71 + server. 72 + 73 + 74 + Configured upstream: 75 + 76 + {upstream} 77 + 78 + 79 + Available APIs: 80 + 81 + - GET /_health Health and version info 82 + 83 + - GET /did:plc:{{did}} Resolve a DID document 84 + - GET /did:plc:{{did}}/log Operation log 85 + - GET /did:plc:{{did}}/log/audit Full audit log (including nullified ops) 86 + - GET /did:plc:{{did}}/log/last Last operation 87 + - GET /did:plc:{{did}}/data Parsed document data 88 + 89 + {post_info} 90 + 91 + 92 + Allegedly is a suite of open-source CLI tools for working with PLC logs, 93 + from microcosm: 94 + 95 + https://tangled.org/@microcosm.blue/Allegedly 96 + 97 + https://microcosm.blue 98 + 99 + 100 + [1] https://web.plc.directory 101 + [2] https://github.com/did-method-plc/did-method-plc 102 + "#, 103 + logo("mirror (fjall)") 104 + ) 105 + } 106 + 107 + #[handler] 108 + async fn fjall_health(Data(FjallState { sync_info, .. }): Data<&FjallState>) -> impl IntoResponse { 109 + let mut overall_status = StatusCode::OK; 110 + 111 + let (ok, upstream_status) = sync_info 112 + .upstream_status 113 + .get() 114 + .await 115 + .expect("plc_status infallible"); 116 + if !ok { 117 + overall_status = StatusCode::BAD_GATEWAY; 118 + } 119 + let latest = sync_info.latest_at.get().await.ok(); 120 + 121 + ( 122 + overall_status, 123 + Json(serde_json::json!({ 124 + "server": "allegedly (mirror/fjall)", 125 + "version": env!("CARGO_PKG_VERSION"), 126 + "upstream_plc": upstream_status, 127 + "latest_at": latest, 128 + })), 129 + ) 130 + } 131 + 132 + #[handler] 133 + async fn fjall_resolve(req: &Request, Data(state): Data<&FjallState>) -> Result<Response> { 134 + let path = req.uri().path(); 135 + let did_and_rest = path.strip_prefix("/").unwrap_or(path); 136 + 137 + let (did, sub_path) = match did_and_rest.find('/') { 138 + Some(i) => (&did_and_rest[..i], &did_and_rest[i..]), 139 + None => (did_and_rest, ""), 140 + }; 141 + 142 
+ if !did.starts_with("did:plc:") { 143 + return Err(Error::from_string("invalid DID", StatusCode::BAD_REQUEST)); 144 + } 145 + 146 + let did = did.to_string(); 147 + let db = state.fjall.clone(); 148 + let ops = tokio::task::spawn_blocking(move || db.ops_for_did(&did)) 149 + .await 150 + .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))? 151 + .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?; 152 + 153 + if ops.is_empty() { 154 + return Err(Error::from_string( 155 + format!( 156 + "DID not registered: {}", 157 + did_and_rest.split('/').next().unwrap_or(did_and_rest) 158 + ), 159 + StatusCode::NOT_FOUND, 160 + )); 161 + } 162 + 163 + let did_str = &ops[0].did; 164 + 165 + match sub_path { 166 + "" => { 167 + let op_values: Vec<serde_json::Value> = ops 168 + .iter() 169 + .filter(|op| !op.nullified) 170 + .filter_map(|op| serde_json::from_str(op.operation.get()).ok()) 171 + .collect(); 172 + 173 + let last_op = op_values.last(); 174 + let data = last_op.and_then(|op| doc::op_to_doc_data(did_str, op)); 175 + let Some(data) = data else { 176 + return Err(Error::from_string( 177 + format!("DID not available: {did_str}"), 178 + StatusCode::NOT_FOUND, 179 + )); 180 + }; 181 + let doc = doc::format_did_doc(&data); 182 + Ok(Response::builder() 183 + .content_type("application/did+ld+json") 184 + .body(serde_json::to_string(&doc).unwrap())) 185 + } 186 + "/log" => { 187 + let log: Vec<serde_json::Value> = ops 188 + .iter() 189 + .filter(|op| !op.nullified) 190 + .filter_map(|op| serde_json::from_str(op.operation.get()).ok()) 191 + .collect(); 192 + Ok(Response::builder() 193 + .content_type("application/json") 194 + .body(serde_json::to_string(&log).unwrap())) 195 + } 196 + "/log/audit" => { 197 + let audit: Vec<serde_json::Value> = ops 198 + .iter() 199 + .map(|op| { 200 + serde_json::json!({ 201 + "did": op.did, 202 + "operation": 
serde_json::from_str::<serde_json::Value>(op.operation.get()).unwrap_or_default(), 203 + "cid": op.cid, 204 + "nullified": op.nullified, 205 + "createdAt": op.created_at.to_rfc3339(), 206 + }) 207 + }) 208 + .collect(); 209 + Ok(Response::builder() 210 + .content_type("application/json") 211 + .body(serde_json::to_string(&audit).unwrap())) 212 + } 213 + "/log/last" => { 214 + let last = 215 + ops.iter().filter(|op| !op.nullified).last().and_then(|op| { 216 + serde_json::from_str::<serde_json::Value>(op.operation.get()).ok() 217 + }); 218 + let Some(last) = last else { 219 + return Err(Error::from_string( 220 + format!("DID not available: {did_str}"), 221 + StatusCode::NOT_FOUND, 222 + )); 223 + }; 224 + Ok(Response::builder() 225 + .content_type("application/json") 226 + .body(serde_json::to_string(&last).unwrap())) 227 + } 228 + "/data" => { 229 + let op_values: Vec<serde_json::Value> = ops 230 + .iter() 231 + .filter(|op| !op.nullified) 232 + .filter_map(|op| serde_json::from_str(op.operation.get()).ok()) 233 + .collect(); 234 + 235 + let last_op = op_values.last(); 236 + let data = last_op.and_then(|op| doc::op_to_doc_data(did_str, op)); 237 + let Some(data) = data else { 238 + return Err(Error::from_string( 239 + format!("DID not available: {did_str}"), 240 + StatusCode::NOT_FOUND, 241 + )); 242 + }; 243 + Ok(Response::builder() 244 + .content_type("application/json") 245 + .body(serde_json::to_string(&data).unwrap())) 246 + } 247 + _ => Err(Error::from_string("not found", StatusCode::NOT_FOUND)), 248 + } 249 + } 250 + 251 + #[derive(Deserialize)] 252 + struct ExportQuery { 253 + after: Option<String>, 254 + #[allow(dead_code)] // we just cap at 1000 for now, matching reference impl 255 + count: Option<usize>, 256 + } 257 + 258 + #[handler] 259 + async fn fjall_export( 260 + _req: &Request, 261 + Query(query): Query<ExportQuery>, 262 + Data(FjallState { fjall, .. 
}): Data<&FjallState>, 263 + ) -> Result<Body> { 264 + let after = if let Some(a) = query.after { 265 + Some( 266 + chrono::DateTime::parse_from_rfc3339(&a) 267 + .map_err(|e| Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))? 268 + .with_timezone(&chrono::Utc), 269 + ) 270 + } else { 271 + None 272 + }; 273 + 274 + let limit = 1000; 275 + let db = fjall.clone(); 276 + 277 + let ops = tokio::task::spawn_blocking(move || db.export_ops(after, limit)) 278 + .await 279 + .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))? 280 + .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?; 281 + 282 + let stream = futures::stream::iter(ops).map(|op| { 283 + let mut json = serde_json::to_string(&op).unwrap(); 284 + json.push('\n'); 285 + Ok::<_, std::io::Error>(json) 286 + }); 287 + 288 + Ok(Body::from_bytes_stream(stream)) 289 + } 290 + 291 + #[handler] 292 + async fn fjall_nope(Data(FjallState { upstream, .. }): Data<&FjallState>) -> (StatusCode, String) { 293 + ( 294 + StatusCode::BAD_REQUEST, 295 + format!( 296 + r#"{} 297 + 298 + Sorry, this server does not accept POST requests. 299 + 300 + You may wish to try sending that to our upstream: {upstream}. 301 + 302 + If you operate this server, try running with `--experimental-write-upstream`. 
303 + "#, 304 + logo("mirror (nope)") 305 + ), 306 + ) 307 + } 308 + 309 + pub async fn serve_fjall( 310 + upstream: Url, 311 + listen: ListenConf, 312 + experimental: ExperimentalConf, 313 + fjall: FjallDb, 314 + ) -> anyhow::Result<&'static str> { 315 + log::info!("starting fjall mirror server..."); 316 + 317 + let client = Client::builder() 318 + .user_agent(UA) 319 + .timeout(Duration::from_secs(10)) 320 + .build() 321 + .expect("reqwest client to build"); 322 + 323 + let sync_info = FjallSyncInfo { 324 + latest_at: CachedValue::new(GetFjallLatestAt(fjall.clone()), Duration::from_secs(2)), 325 + upstream_status: CachedValue::new( 326 + CheckUpstream(upstream.clone(), client.clone()), 327 + Duration::from_secs(6), 328 + ), 329 + }; 330 + 331 + let state = FjallState { 332 + client, 333 + upstream, 334 + fjall, 335 + sync_info, 336 + experimental: experimental.clone(), 337 + }; 338 + 339 + let mut app = Route::new() 340 + .at("/", get(fjall_hello)) 341 + .at("/favicon.ico", get(favicon)) 342 + .at("/_health", get(fjall_health)) 343 + .at("/export", get(fjall_export)); 344 + 345 + if experimental.write_upstream { 346 + log::info!("enabling experimental write forwarding to upstream"); 347 + 348 + let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 349 + let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); 350 + 351 + let upstream_proxier = fjall_forward_create_op_upstream 352 + .with(GovernorMiddleware::new(did_limiter)) 353 + .with(GovernorMiddleware::new(ip_limiter)); 354 + 355 + app = app.at("/did:plc:*", get(fjall_resolve).post(upstream_proxier)); 356 + } else { 357 + app = app.at("/did:plc:*", get(fjall_resolve).post(fjall_nope)); 358 + } 359 + 360 + let app = app 361 + .with(AddData::new(state)) 362 + .with(Cors::new().allow_credentials(false)) 363 + .with(Compression::new()) 364 + .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute( 365 + 3000.try_into().expect("ratelimit middleware to build"), 
366 + )))) 367 + .with(CatchPanic::new()) 368 + .with(Tracing); 369 + 370 + bind_or_acme(app, listen).await 371 + } 372 + 373 + #[handler] 374 + async fn fjall_forward_create_op_upstream( 375 + Data(FjallState { 376 + upstream, 377 + client, 378 + experimental, 379 + .. 380 + }): Data<&FjallState>, 381 + Path(did): Path<String>, 382 + req: &Request, 383 + body: Body, 384 + ) -> Result<Response> { 385 + if let Some(expected_domain) = &experimental.acme_domain { 386 + let Some(found_host) = req.uri().host() else { 387 + return Ok(bad_create_op(&format!( 388 + "missing `Host` header, expected {expected_domain:?} for experimental requests." 389 + ))); 390 + }; 391 + if found_host != expected_domain { 392 + return Ok(bad_create_op(&format!( 393 + "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}" 394 + ))); 395 + } 396 + } 397 + 398 + let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 399 + log::trace!("original request headers: {headers:?}"); 400 + headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 401 + let client_ua = headers 402 + .get(USER_AGENT) 403 + .map(|h| h.to_str().unwrap()) 404 + .unwrap_or("unknown"); 405 + headers.insert( 406 + USER_AGENT, 407 + format!("{UA} (forwarding from {client_ua:?})") 408 + .parse() 409 + .unwrap(), 410 + ); 411 + log::trace!("adjusted request headers: {headers:?}"); 412 + 413 + let mut target = upstream.clone(); 414 + target.set_path(&did); 415 + let upstream_res = client 416 + .post(target) 417 + .timeout(Duration::from_secs(15)) 418 + .headers(headers) 419 + .body(reqwest::Body::wrap_stream(body.into_bytes_stream())) 420 + .send() 421 + .await 422 + .map_err(|e| { 423 + log::warn!("upstream write fail: {e}"); 424 + Error::from_string( 425 + failed_to_reach_named("upstream PLC"), 426 + StatusCode::BAD_GATEWAY, 427 + ) 428 + })?; 429 + 430 + Ok(proxy_response(upstream_res)) 431 + }
+201
src/mirror/mod.rs
··· 1 + use crate::{ 2 + CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, FjallDb, GovernorMiddleware, IpLimiters, UA, 3 + doc, logo, 4 + }; 5 + use futures::TryStreamExt; 6 + use governor::Quota; 7 + use poem::{ 8 + Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 9 + get, handler, 10 + http::{StatusCode, header::USER_AGENT}, 11 + listener::{Listener, TcpListener, acme::AutoCert}, 12 + middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 13 + web::{Data, Json, Path}, 14 + }; 15 + use reqwest::{Client, Url}; 16 + use std::{net::SocketAddr, path::PathBuf, time::Duration}; 17 + 18 + pub mod fjall; 19 + pub mod pg; 20 + 21 + pub use fjall::serve_fjall; 22 + pub use pg::serve; 23 + 24 + #[derive(Debug)] 25 + pub enum ListenConf { 26 + Acme { 27 + domains: Vec<String>, 28 + cache_path: PathBuf, 29 + directory_url: String, 30 + ipv6: bool, 31 + }, 32 + Bind(SocketAddr), 33 + } 34 + 35 + #[derive(Debug, Clone)] 36 + pub struct ExperimentalConf { 37 + pub acme_domain: Option<String>, 38 + pub write_upstream: bool, 39 + } 40 + 41 + #[handler] 42 + pub fn favicon() -> impl IntoResponse { 43 + include_bytes!("../../favicon.ico").with_content_type("image/x-icon") 44 + } 45 + 46 + pub fn failed_to_reach_named(name: &str) -> String { 47 + format!( 48 + r#"{} 49 + 50 + Failed to reach the {name} server. Sorry. 
51 + "#, 52 + logo("mirror 502 :( ") 53 + ) 54 + } 55 + 56 + pub fn bad_create_op(reason: &str) -> Response { 57 + Response::builder() 58 + .status(StatusCode::BAD_REQUEST) 59 + .body(format!( 60 + r#"{} 61 + 62 + NooOOOooooo: {reason} 63 + "#, 64 + logo("mirror 400 >:( ") 65 + )) 66 + } 67 + 68 + pub type PlcStatus = (bool, serde_json::Value); 69 + 70 + pub async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 71 + use serde_json::json; 72 + 73 + let mut url = url.clone(); 74 + url.set_path("/_health"); 75 + 76 + let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else { 77 + return (false, json!({"error": "cannot reach plc server"})); 78 + }; 79 + 80 + let status = response.status(); 81 + 82 + let Ok(text) = response.text().await else { 83 + return (false, json!({"error": "failed to read response body"})); 84 + }; 85 + 86 + let body = match serde_json::from_str(&text) { 87 + Ok(json) => json, 88 + Err(_) => serde_json::Value::String(text.to_string()), 89 + }; 90 + 91 + if status.is_success() { 92 + (true, body) 93 + } else { 94 + ( 95 + false, 96 + json!({ 97 + "error": "non-ok status", 98 + "status": status.as_str(), 99 + "status_code": status.as_u16(), 100 + "response": body, 101 + }), 102 + ) 103 + } 104 + } 105 + 106 + pub fn proxy_response(res: reqwest::Response) -> Response { 107 + let http_res: poem::http::Response<reqwest::Body> = res.into(); 108 + let (parts, reqw_body) = http_res.into_parts(); 109 + 110 + let parts = poem::ResponseParts { 111 + status: parts.status, 112 + version: parts.version, 113 + headers: parts.headers, 114 + extensions: parts.extensions, 115 + }; 116 + 117 + let body = http_body_util::BodyDataStream::new(reqw_body) 118 + .map_err(|e| std::io::Error::other(Box::new(e))); 119 + 120 + Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 121 + } 122 + 123 + async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> 124 + where 125 + A: Endpoint + 'static, 126 + L: Listener + 
'static, 127 + { 128 + Server::new(listener) 129 + .name("allegedly (mirror)") 130 + .run(app) 131 + .await 132 + } 133 + 134 + /// kick off a tiny little server on a tokio task to tell people to use 443 135 + async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 136 + #[handler] 137 + fn oop_plz_be_secure() -> (StatusCode, String) { 138 + ( 139 + StatusCode::BAD_REQUEST, 140 + format!( 141 + r#"{} 142 + 143 + You probably want to change your request to use HTTPS instead of HTTP. 144 + "#, 145 + logo("mirror (tls on 443 please)") 146 + ), 147 + ) 148 + } 149 + 150 + let app = Route::new() 151 + .at("/favicon.ico", get(favicon)) 152 + .nest("/", get(oop_plz_be_secure)) 153 + .with(Tracing); 154 + Server::new(TcpListener::bind(if ipv6 { 155 + "[::]:80" 156 + } else { 157 + "0.0.0.0:80" 158 + })) 159 + .name("allegedly (mirror:80 helper)") 160 + .run(app) 161 + .await 162 + } 163 + 164 + pub async fn bind_or_acme<A>(app: A, listen: ListenConf) -> anyhow::Result<&'static str> 165 + where 166 + A: Endpoint + 'static, 167 + { 168 + match listen { 169 + ListenConf::Acme { 170 + domains, 171 + cache_path, 172 + directory_url, 173 + ipv6, 174 + } => { 175 + rustls::crypto::aws_lc_rs::default_provider() 176 + .install_default() 177 + .expect("crypto provider to be installable"); 178 + 179 + let mut auto_cert = AutoCert::builder() 180 + .directory_url(directory_url) 181 + .cache_path(cache_path); 182 + for domain in domains { 183 + auto_cert = auto_cert.domain(domain); 184 + } 185 + let auto_cert = auto_cert.build().expect("acme config to build"); 186 + 187 + log::trace!("auto_cert: {auto_cert:?}"); 188 + 189 + let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 190 + let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 191 + let app_res = run(app, listener.acme(auto_cert)).await; 192 + log::warn!("server task ended, aborting insecure server task..."); 193 + notice_task.abort(); 194 + app_res?; 195 + 
notice_task.await??; 196 + } 197 + ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 198 + } 199 + 200 + Ok("server (uh oh?)") 201 + }
+309
src/plc_fjall.rs
··· 1 + use crate::{Dt, ExportPage, Op, PageBoundaryState}; 2 + use data_encoding::BASE32_NOPAD; 3 + use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode}; 4 + use serde::{Deserialize, Serialize}; 5 + use std::path::Path; 6 + use std::sync::Arc; 7 + use std::time::Instant; 8 + use tokio::sync::{mpsc, oneshot}; 9 + 10 + const SEP: u8 = 0; 11 + 12 + type IpldCid = cid::CidGeneric<64>; 13 + 14 + // 24 bytes -> 15 bytes 15 + fn encode_did(buf: &mut Vec<u8>, did: &str) -> anyhow::Result<usize> { 16 + let input = did.trim_start_matches("did:plc:").to_uppercase(); 17 + let len = BASE32_NOPAD 18 + .decode_len(input.len()) 19 + .map_err(|_| anyhow::anyhow!("failed to calculate decode len for {did}"))?; 20 + 21 + let start = buf.len(); 22 + buf.resize(start + len, 0); 23 + 24 + BASE32_NOPAD 25 + .decode_mut(input.as_bytes(), &mut buf[start..]) 26 + .map_err(|_| anyhow::anyhow!("failed to encode did {did}")) 27 + } 28 + 29 + // 59 bytes -> 36 bytes 30 + fn encode_cid(buf: &mut Vec<u8>, s: &str) -> anyhow::Result<usize> { 31 + IpldCid::try_from(s)? 32 + .write_bytes(buf) 33 + .map_err(|e| anyhow::anyhow!("failed to encode cid {s}: {e}")) 34 + } 35 + 36 + fn decode_cid(bytes: &[u8]) -> anyhow::Result<String> { 37 + IpldCid::try_from(bytes) 38 + .map_err(|e| anyhow::anyhow!("failed to decode cid: {e}")) 39 + .map(|cid| cid.to_string()) 40 + } 41 + 42 + fn op_key(created_at: &Dt, cid: &str) -> anyhow::Result<Vec<u8>> { 43 + let micros = created_at.timestamp_micros() as u64; 44 + let mut key = Vec::with_capacity(8 + 1 + cid.len()); 45 + key.extend_from_slice(&micros.to_be_bytes()); 46 + key.push(SEP); 47 + encode_cid(&mut key, cid)?; 48 + Ok(key) 49 + } 50 + 51 + fn by_did_prefix(did: &str) -> anyhow::Result<Vec<u8>> { 52 + let mut p = Vec::with_capacity(BASE32_NOPAD.decode_len(did.len())? 
+ 1); 53 + encode_did(&mut p, did)?; 54 + p.push(SEP); 55 + Ok(p) 56 + } 57 + 58 + fn by_did_key(did: &str, cid: &str) -> anyhow::Result<Vec<u8>> { 59 + let mut key = by_did_prefix(did)?; 60 + encode_cid(&mut key, cid)?; 61 + Ok(key) 62 + } 63 + 64 + fn decode_timestamp(key: &[u8]) -> anyhow::Result<Dt> { 65 + let micros = u64::from_be_bytes( 66 + key.try_into() 67 + .map_err(|e| anyhow::anyhow!("invalid timestamp key {key:?}: {e}"))?, 68 + ); 69 + Dt::from_timestamp_micros(micros as i64) 70 + .ok_or_else(|| anyhow::anyhow!("invalid timestamp {micros}")) 71 + } 72 + 73 + // this is basically Op, but without the cid and created_at fields 74 + // since we have them in the key already 75 + #[derive(Debug, Deserialize, Serialize)] 76 + #[serde(rename_all = "camelCase")] 77 + struct DbOp { 78 + pub did: String, 79 + pub nullified: bool, 80 + pub operation: Box<serde_json::value::RawValue>, 81 + } 82 + 83 + #[derive(Clone)] 84 + pub struct FjallDb { 85 + inner: Arc<FjallInner>, 86 + } 87 + 88 + struct FjallInner { 89 + db: Database, 90 + ops: Keyspace, 91 + by_did: Keyspace, 92 + } 93 + 94 + impl FjallDb { 95 + pub fn open(path: impl AsRef<Path>) -> fjall::Result<Self> { 96 + let db = Database::builder(path).open()?; 97 + let ops = db.keyspace("ops", KeyspaceCreateOptions::default)?; 98 + let by_did = db.keyspace("by_did", KeyspaceCreateOptions::default)?; 99 + Ok(Self { 100 + inner: Arc::new(FjallInner { db, ops, by_did }), 101 + }) 102 + } 103 + 104 + pub fn clear(&self) -> fjall::Result<()> { 105 + self.inner.ops.clear()?; 106 + self.inner.by_did.clear()?; 107 + Ok(()) 108 + } 109 + 110 + pub fn persist(&self) -> fjall::Result<()> { 111 + self.inner.db.persist(PersistMode::SyncAll) 112 + } 113 + 114 + pub fn get_latest(&self) -> anyhow::Result<Option<Dt>> { 115 + let Some(guard) = self.inner.ops.last_key_value() else { 116 + return Ok(None); 117 + }; 118 + let key = guard 119 + .key() 120 + .map_err(|e| anyhow::anyhow!("fjall key error: {e}"))?; 121 + 122 + 
key.get(..8) 123 + .ok_or_else(|| anyhow::anyhow!("invalid timestamp key {key:?}")) 124 + .map(decode_timestamp) 125 + .flatten() 126 + .map(Some) 127 + } 128 + 129 + pub fn insert_op(&self, batch: &mut OwnedWriteBatch, op: &Op) -> anyhow::Result<usize> { 130 + let pk = by_did_key(&op.did, &op.cid)?; 131 + if self.inner.by_did.get(&pk)?.is_some() { 132 + return Ok(0); 133 + } 134 + let ts_key = op_key(&op.created_at, &op.cid)?; 135 + let value = serde_json::to_vec(op)?; 136 + batch.insert(&self.inner.ops, &ts_key, &value); 137 + batch.insert( 138 + &self.inner.by_did, 139 + &pk, 140 + &op.created_at.timestamp_micros().to_be_bytes(), 141 + ); 142 + Ok(1) 143 + } 144 + 145 + pub fn ops_for_did(&self, did: &str) -> anyhow::Result<Vec<Op>> { 146 + let prefix = by_did_prefix(did)?; 147 + 148 + let mut entries: Vec<(Dt, Vec<u8>)> = Vec::new(); 149 + for guard in self.inner.by_did.prefix(&prefix) { 150 + let (by_did_key, ts_bytes) = guard 151 + .into_inner() 152 + .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 153 + // construct op key from timestamp and cid 154 + let cid_bytes = by_did_key 155 + .get(prefix.len()..) 156 + .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key:?}"))?; 157 + let op_key = [ts_bytes.as_ref(), &[SEP], cid_bytes].concat(); 158 + entries.push((decode_timestamp(&ts_bytes)?, op_key)); 159 + } 160 + 161 + entries.sort_by_key(|(ts, _)| *ts); 162 + 163 + let mut ops = Vec::with_capacity(entries.len()); 164 + for (ts, op_key) in entries { 165 + let Some(value) = self.inner.ops.get(&op_key)? else { 166 + anyhow::bail!("op not found in db: {op_key:?}"); 167 + }; 168 + let op: DbOp = serde_json::from_slice(&value)?; 169 + let cid = decode_cid( 170 + op_key 171 + .get(9..) 
172 + .ok_or_else(|| anyhow::anyhow!("invalid op key {op_key:?}"))?, 173 + )?; 174 + let op = Op { 175 + did: op.did, 176 + cid, 177 + created_at: ts, 178 + nullified: op.nullified, 179 + operation: op.operation, 180 + }; 181 + ops.push(op); 182 + } 183 + Ok(ops) 184 + } 185 + 186 + pub fn export_ops(&self, after: Option<Dt>, limit: usize) -> anyhow::Result<Vec<Op>> { 187 + let iter = if let Some(after) = after { 188 + let next_micros = (after.timestamp_micros() as u64) + 1; 189 + let start = next_micros.to_be_bytes(); 190 + self.inner.ops.range(start..) 191 + } else { 192 + self.inner.ops.iter() 193 + }; 194 + 195 + let mut ops = Vec::with_capacity(limit); 196 + for item in iter.take(limit) { 197 + let (key, value) = item 198 + .into_inner() 199 + .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 200 + let op: DbOp = serde_json::from_slice(&value)?; 201 + let created_at = decode_timestamp( 202 + key.get(..8) 203 + .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?, 204 + )?; 205 + let cid = decode_cid( 206 + key.get(9..) 
207 + .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?, 208 + )?; 209 + let op = Op { 210 + did: op.did, 211 + cid, 212 + created_at, 213 + nullified: op.nullified, 214 + operation: op.operation, 215 + }; 216 + ops.push(op); 217 + } 218 + Ok(ops) 219 + } 220 + } 221 + 222 + pub async fn backfill_to_fjall( 223 + db: FjallDb, 224 + reset: bool, 225 + mut pages: mpsc::Receiver<ExportPage>, 226 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 227 + ) -> anyhow::Result<&'static str> { 228 + let t0 = Instant::now(); 229 + 230 + if reset { 231 + let db = db.clone(); 232 + tokio::task::spawn_blocking(move || db.clear()).await??; 233 + log::warn!("fjall reset: cleared all data"); 234 + } 235 + 236 + let mut last_at = None; 237 + let mut ops_inserted: usize = 0; 238 + 239 + while let Some(page) = pages.recv().await { 240 + let should_track = notify_last_at.is_some(); 241 + if should_track { 242 + if let Some(s) = PageBoundaryState::new(&page) { 243 + last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 244 + } 245 + } 246 + 247 + let db = db.clone(); 248 + let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 249 + let mut batch = db.inner.db.batch(); 250 + let mut count: usize = 0; 251 + for op in &page.ops { 252 + count += db.insert_op(&mut batch, op)?; 253 + } 254 + batch.commit()?; 255 + Ok(count) 256 + }) 257 + .await??; 258 + ops_inserted += count; 259 + } 260 + log::debug!("finished receiving bulk pages"); 261 + 262 + if let Some(notify) = notify_last_at { 263 + log::trace!("notifying last_at: {last_at:?}"); 264 + if notify.send(last_at).is_err() { 265 + log::error!("receiver for last_at dropped, can't notify"); 266 + }; 267 + } 268 + 269 + let db = db.clone(); 270 + tokio::task::spawn_blocking(move || db.persist()).await??; 271 + 272 + log::info!( 273 + "backfill_to_fjall: inserted {ops_inserted} ops in {:?}", 274 + t0.elapsed() 275 + ); 276 + Ok("backfill_to_fjall") 277 + } 278 + 279 + pub async fn pages_to_fjall( 
280 + db: FjallDb, 281 + mut pages: mpsc::Receiver<ExportPage>, 282 + ) -> anyhow::Result<&'static str> { 283 + log::info!("starting pages_to_fjall writer..."); 284 + 285 + let t0 = Instant::now(); 286 + let mut ops_inserted: usize = 0; 287 + 288 + while let Some(page) = pages.recv().await { 289 + log::trace!("writing page with {} ops", page.ops.len()); 290 + let db = db.clone(); 291 + let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 292 + let mut batch = db.inner.db.batch(); 293 + let mut count: usize = 0; 294 + for op in &page.ops { 295 + count += db.insert_op(&mut batch, op)?; 296 + } 297 + batch.commit()?; 298 + Ok(count) 299 + }) 300 + .await??; 301 + ops_inserted += count; 302 + } 303 + 304 + log::info!( 305 + "no more pages. inserted {ops_inserted} ops in {:?}", 306 + t0.elapsed() 307 + ); 308 + Ok("pages_to_fjall") 309 + }
+56
tests/fixtures/log_bskyapp.json
··· 1 + [ 2 + { 3 + "did": "did:plc:z72i7hdynmk6r22z27h6tvur", 4 + "operation": { 5 + "sig": "9NuYV7AqwHVTc0YuWzNV3CJafsSZWH7qCxHRUIP2xWlB-YexXC1OaYAnUayiCXLVzRQ8WBXIqF-SvZdNalwcjA", 6 + "prev": null, 7 + "type": "plc_operation", 8 + "services": { 9 + "atproto_pds": { 10 + "type": "AtprotoPersonalDataServer", 11 + "endpoint": "https://bsky.social" 12 + } 13 + }, 14 + "alsoKnownAs": [ 15 + "at://bluesky-team.bsky.social" 16 + ], 17 + "rotationKeys": [ 18 + "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg", 19 + "did:key:zQ3shpKnbdPx3g3CmPf5cRVTPe1HtSwVn5ish3wSnDPQCbLJK" 20 + ], 21 + "verificationMethods": { 22 + "atproto": "did:key:zQ3shXjHeiBuRCKmM36cuYnm7YEMzhGnCmCyW92sRJ9pribSF" 23 + } 24 + }, 25 + "cid": "bafyreigp6shzy6dlcxuowwoxz7u5nemdrkad2my5zwzpwilcnhih7bw6zm", 26 + "nullified": false, 27 + "createdAt": "2023-04-12T04:53:57.057Z" 28 + }, 29 + { 30 + "did": "did:plc:z72i7hdynmk6r22z27h6tvur", 31 + "operation": { 32 + "sig": "1mEWzRtFOgeRXH-YCSPTxb990JOXxa__n8Qw6BOKl7Ndm6OFFmwYKiiMqMCpAbxpnGjF5abfIsKc7u3a77Cbnw", 33 + "prev": "bafyreigp6shzy6dlcxuowwoxz7u5nemdrkad2my5zwzpwilcnhih7bw6zm", 34 + "type": "plc_operation", 35 + "services": { 36 + "atproto_pds": { 37 + "type": "AtprotoPersonalDataServer", 38 + "endpoint": "https://bsky.social" 39 + } 40 + }, 41 + "alsoKnownAs": [ 42 + "at://bsky.app" 43 + ], 44 + "rotationKeys": [ 45 + "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg", 46 + "did:key:zQ3shpKnbdPx3g3CmPf5cRVTPe1HtSwVn5ish3wSnDPQCbLJK" 47 + ], 48 + "verificationMethods": { 49 + "atproto": "did:key:zQ3shXjHeiBuRCKmM36cuYnm7YEMzhGnCmCyW92sRJ9pribSF" 50 + } 51 + }, 52 + "cid": "bafyreihmuvr3frdvd6vmdhucih277prdcfcezf67lasg5oekxoimnunjoq", 53 + "nullified": false, 54 + "createdAt": "2023-04-12T17:26:46.468Z" 55 + } 56 + ]
+125
tests/fixtures/log_legacy_dholms.json
··· 1 + [ 2 + { 3 + "did": "did:plc:yk4dd2qkboz2yv6tpubpc6co", 4 + "operation": { 5 + "sig": "7QTzqO1BcL3eDzP4P_YBxMmv5U4brHzAItkM9w5o8gZA7ElZkrVYEwsfQCfk5EoWLk58Z1y6fyNP9x1pthJnlw", 6 + "prev": null, 7 + "type": "create", 8 + "handle": "dan.bsky.social", 9 + "service": "https://bsky.social", 10 + "signingKey": "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ", 11 + "recoveryKey": "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg" 12 + }, 13 + "cid": "bafyreigcxay6ucqlwowfpu35alyxqtv3c4vsj7gmdtmnidsnqs6nblyarq", 14 + "nullified": false, 15 + "createdAt": "2022-11-17T01:07:13.996Z" 16 + }, 17 + { 18 + "did": "did:plc:yk4dd2qkboz2yv6tpubpc6co", 19 + "operation": { 20 + "sig": "n-VWsPZY4xkFN8wlg-kJBU_yzWTNd2oBnbjkjxXu3HdjbBLaEB7K39JHIPn_DZVALKRjts6bUicjSEecZy8eIw", 21 + "prev": "bafyreigcxay6ucqlwowfpu35alyxqtv3c4vsj7gmdtmnidsnqs6nblyarq", 22 + "type": "plc_operation", 23 + "services": { 24 + "atproto_pds": { 25 + "type": "AtprotoPersonalDataServer", 26 + "endpoint": "https://bsky.social" 27 + } 28 + }, 29 + "alsoKnownAs": [ 30 + "at://dholms.xyz" 31 + ], 32 + "rotationKeys": [ 33 + "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg", 34 + "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ" 35 + ], 36 + "verificationMethods": { 37 + "atproto": "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ" 38 + } 39 + }, 40 + "cid": "bafyreiho5sanautvnw3det66jcwic4vkeabc35y7iou3ygwj2l3xqcxdau", 41 + "nullified": false, 42 + "createdAt": "2023-03-06T18:47:09.501Z" 43 + }, 44 + { 45 + "did": "did:plc:yk4dd2qkboz2yv6tpubpc6co", 46 + "operation": { 47 + "sig": "HWgrfQXxUN3mhR5TR-nrwGJwVr9RDbyDn6eCmqBg32x2zIjhe98YxOtFOLI9jQkBlTTzqzUOwJh1KZd4O2pDOw", 48 + "prev": "bafyreiho5sanautvnw3det66jcwic4vkeabc35y7iou3ygwj2l3xqcxdau", 49 + "type": "plc_operation", 50 + "services": { 51 + "atproto_pds": { 52 + "type": "AtprotoPersonalDataServer", 53 + "endpoint": "https://bsky.social" 54 + } 55 + }, 56 + "alsoKnownAs": [ 57 + "at://dholms.bsky.social" 58 
+ ], 59 + "rotationKeys": [ 60 + "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg", 61 + "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ" 62 + ], 63 + "verificationMethods": { 64 + "atproto": "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ" 65 + } 66 + }, 67 + "cid": "bafyreic3am2nmgykxtwsxwigzn6faibxv5ef5kalcv7li3eatcqldcqrku", 68 + "nullified": false, 69 + "createdAt": "2023-03-06T19:50:49.987Z" 70 + }, 71 + { 72 + "did": "did:plc:yk4dd2qkboz2yv6tpubpc6co", 73 + "operation": { 74 + "sig": "9Fy2iHCSK5mtgLNCkS9CyI0r7lu6H1SVgusaD1jQdsMUySUU6apde0z7SobpYZKp4sThk4hxOWtO-bXhu1cNjg", 75 + "prev": "bafyreic3am2nmgykxtwsxwigzn6faibxv5ef5kalcv7li3eatcqldcqrku", 76 + "type": "plc_operation", 77 + "services": { 78 + "atproto_pds": { 79 + "type": "AtprotoPersonalDataServer", 80 + "endpoint": "https://bsky.social" 81 + } 82 + }, 83 + "alsoKnownAs": [ 84 + "at://dholms.xyz" 85 + ], 86 + "rotationKeys": [ 87 + "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg", 88 + "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ" 89 + ], 90 + "verificationMethods": { 91 + "atproto": "did:key:zQ3shP5TBe1sQfSttXty15FAEHV1DZgcxRZNxvEWnPfLFwLxJ" 92 + } 93 + }, 94 + "cid": "bafyreicwybxr6h6vkxpoarismso3liozdzswshmzcvl4tyckdazn5lxjte", 95 + "nullified": false, 96 + "createdAt": "2023-03-06T19:51:09.950Z" 97 + }, 98 + { 99 + "did": "did:plc:yk4dd2qkboz2yv6tpubpc6co", 100 + "operation": { 101 + "sig": "lBXd8rHZ84hCuQysGdi_5A9C8yPHTHasPibO4DZiuZVrehs2hiBcjAL0srLSTsF1kvsHTw1ddai-QwH0Wd_drQ", 102 + "prev": "bafyreicwybxr6h6vkxpoarismso3liozdzswshmzcvl4tyckdazn5lxjte", 103 + "type": "plc_operation", 104 + "services": { 105 + "atproto_pds": { 106 + "type": "AtprotoPersonalDataServer", 107 + "endpoint": "https://bsky.social" 108 + } 109 + }, 110 + "alsoKnownAs": [ 111 + "at://dholms.xyz" 112 + ], 113 + "rotationKeys": [ 114 + "did:key:zQ3shhCGUqDKjStzuDxPkTxN6ujddP4RkEKJJouJGRRkaLGbg", 115 + "did:key:zQ3shpKnbdPx3g3CmPf5cRVTPe1HtSwVn5ish3wSnDPQCbLJK" 116 + 
], 117 + "verificationMethods": { 118 + "atproto": "did:key:zQ3shXjHeiBuRCKmM36cuYnm7YEMzhGnCmCyW92sRJ9pribSF" 119 + } 120 + }, 121 + "cid": "bafyreidfrpuegbqd5r56shka4duythb7phb6d7i3bck2dkeb5fjppwd7gi", 122 + "nullified": false, 123 + "createdAt": "2023-03-09T23:18:31.709Z" 124 + } 125 + ]
+59
tests/fixtures/log_nullification.json
··· 1 + [ 2 + { 3 + "did": "did:plc:2s2mvm52ttz6r4hocmrq7x27", 4 + "operation": { 5 + "type": "plc_operation", 6 + "rotationKeys": [ 7 + "did:key:zQ3shwPdax6jKMbhtzbueGwSjc7RnjsmPcNB1vQUpbKUCN1t1", 8 + "did:key:zQ3shmMcsFLvSZQy2pSZFFKZNmQSNmyuSepPXMHpgTYrontu2" 9 + ], 10 + "verificationMethods": {}, 11 + "alsoKnownAs": [ 12 + "at://foo" 13 + ], 14 + "services": {}, 15 + "prev": null, 16 + "sig": "1pTjQD5-LA4gN6tz1t7-28Tzct9NI-oYMN8QVUQkB7493j-vKeZnZO0YqNNBCXsORatmfKUXwteC9zW82mi_yw" 17 + }, 18 + "cid": "bafyreiguwtflhou46pupb3qtemh56x6o3fn3y7ym2q3jgoms35tdu2d5ga", 19 + "nullified": false, 20 + "createdAt": "1970-01-01T00:00:00.000Z" 21 + }, 22 + { 23 + "did": "did:plc:2s2mvm52ttz6r4hocmrq7x27", 24 + "operation": { 25 + "prev": "bafyreiguwtflhou46pupb3qtemh56x6o3fn3y7ym2q3jgoms35tdu2d5ga", 26 + "type": "plc_operation", 27 + "services": {}, 28 + "alsoKnownAs": [ 29 + "at://foo" 30 + ], 31 + "rotationKeys": [], 32 + "verificationMethods": {}, 33 + "sig": "XfbRKnfzmJyRXUrajAISgdiJWNdNFMFgtDnlKvj-bIZhpfqGd39xiylJVXXu5Usw2cc2Js4EeSPh2xfhUQNxSA" 34 + }, 35 + "cid": "bafyreicbxgoelatj24iyqnevm7v4f22utxb75p756ztyu5v7cjxlwti3oa", 36 + "nullified": true, 37 + "createdAt": "1970-01-01T00:00:01.000Z" 38 + }, 39 + { 40 + "did": "did:plc:2s2mvm52ttz6r4hocmrq7x27", 41 + "operation": { 42 + "prev": "bafyreiguwtflhou46pupb3qtemh56x6o3fn3y7ym2q3jgoms35tdu2d5ga", 43 + "type": "plc_operation", 44 + "services": {}, 45 + "alsoKnownAs": [ 46 + "at://foo" 47 + ], 48 + "rotationKeys": [ 49 + "did:key:zQ3shwPdax6jKMbhtzbueGwSjc7RnjsmPcNB1vQUpbKUCN1t1", 50 + "did:key:zQ3shmMcsFLvSZQy2pSZFFKZNmQSNmyuSepPXMHpgTYrontu2" 51 + ], 52 + "verificationMethods": {}, 53 + "sig": "zCcbDgFbv5b0aGggLepLcARzNzH0sG6uuvq2GacyPsAsjDsLMZn6KmOxa50iWZEMRknQHsMYyMTS99eeTPu0fQ" 54 + }, 55 + "cid": "bafyreidef3g2vvlshwzf7vxjyxg3lraahujvhtkpo7y3g4br52ft3lzwem", 56 + "nullified": false, 57 + "createdAt": "1970-01-01T00:00:02.000Z" 58 + } 59 + ]
+33
tests/fixtures/log_tombstone.json
··· 1 + [ 2 + { 3 + "did": "did:plc:6adr3q2labdllanslzhqkqd3", 4 + "operation": { 5 + "sig": "ZznbxHinpBI3NgEYWzXUXLA65s2U1ezJooreZlscHYQRfd4EMlijqhpikGMabs-81Tsy4Mt9Iscpmk7Uz13aAg", 6 + "prev": null, 7 + "type": "plc_operation", 8 + "services": {}, 9 + "alsoKnownAs": [ 10 + "at://op0" 11 + ], 12 + "rotationKeys": [ 13 + "did:key:zQ3shmWf4f6ZwzNyjUDYw4oFQjgHWZoYZDjJYtz75YfYYrphB", 14 + "did:key:zQ3shr2PwdoF6kbuYYymyZq6YbGWShiTXAkdMioCPdSdPL9NV" 15 + ], 16 + "verificationMethods": {} 17 + }, 18 + "cid": "bafyreihqa4o4gsyai22ydms6j4cua6yr6tuc7trq2temvnycadk2daniru", 19 + "nullified": false, 20 + "createdAt": "2025-07-25T15:50:58.440Z" 21 + }, 22 + { 23 + "did": "did:plc:6adr3q2labdllanslzhqkqd3", 24 + "operation": { 25 + "sig": "iKFOLyEujXdHWujAAnwFhKfb9kkIcpMDGOV15eNWqlxLM2GtdQD3lVWCshTKWi0dF7InXGseVFDexS0kg0KmEQ", 26 + "prev": "bafyreihqa4o4gsyai22ydms6j4cua6yr6tuc7trq2temvnycadk2daniru", 27 + "type": "plc_tombstone" 28 + }, 29 + "cid": "bafyreifafe44xylxagcom47xb3lrya5pysu3b7zfjmmmitiuimhmgmerze", 30 + "nullified": false, 31 + "createdAt": "2025-07-25T15:50:58.837Z" 32 + } 33 + ]
+112
tests/fjall_mirror_test.rs
··· 1 + use allegedly::{ 2 + ExperimentalConf, FjallDb, ListenConf, backfill_to_fjall, poll_upstream, serve_fjall, 3 + }; 4 + use reqwest::Url; 5 + use std::time::Duration; 6 + use tokio::sync::mpsc; 7 + 8 + #[tokio::test] 9 + async fn test_fjall_mirror_mode() -> anyhow::Result<()> { 10 + let _ = tracing_subscriber::fmt::try_init(); 11 + 12 + // setup 13 + let temp_dir = tempfile::tempdir()?; 14 + let db_path = temp_dir.path().join("fjall.db"); 15 + let db = FjallDb::open(&db_path)?; 16 + 17 + // backfill (limited to 1 page) 18 + let (backfill_tx, backfill_rx) = mpsc::channel(1); 19 + let (upstream_tx, mut upstream_rx) = mpsc::channel(1); 20 + 21 + // spawn upstream poller 22 + let upstream_url: Url = "https://plc.directory/export".parse()?; 23 + tokio::spawn(async move { 24 + // poll fresh data so our data matches the upstream 25 + let start_at = chrono::Utc::now() - chrono::Duration::try_minutes(5).unwrap(); 26 + let _ = poll_upstream( 27 + Some(start_at), 28 + upstream_url, 29 + Duration::from_millis(100), 30 + upstream_tx, 31 + ) 32 + .await; 33 + }); 34 + 35 + // bridge: take 1 page from upstream and forward to backfill 36 + println!("waiting for page from upstream..."); 37 + let page = upstream_rx 38 + .recv() 39 + .await 40 + .expect("to receive page from upstream"); 41 + println!("received page with {} ops", page.ops.len()); 42 + let sample_did = page.ops.last().unwrap().did.clone(); 43 + 44 + backfill_tx.send(page).await?; 45 + drop(backfill_tx); // close backfill input 46 + 47 + backfill_to_fjall(db.clone(), false, backfill_rx, None).await?; 48 + 49 + // get free port 50 + let listener = std::net::TcpListener::bind("127.0.0.1:17548")?; 51 + let port = listener.local_addr()?.port(); 52 + drop(listener); 53 + 54 + let listen_conf = ListenConf::Bind(([127, 0, 0, 1], port).into()); 55 + let exp_conf = ExperimentalConf { 56 + acme_domain: None, 57 + write_upstream: false, 58 + }; 59 + 60 + let db_for_server = db.clone(); 61 + let server_handle = 
tokio::spawn(async move { 62 + let upstream: Url = "https://plc.directory".parse().unwrap(); 63 + serve_fjall(upstream, listen_conf, exp_conf, db_for_server).await 64 + }); 65 + 66 + // wait for server to be ready (retry loop) 67 + let client = reqwest::Client::new(); 68 + let base_url = format!("http://127.0.0.1:{}", port); 69 + let mut ready = false; 70 + for _ in 0..50 { 71 + if client 72 + .get(format!("{}/_health", base_url)) 73 + .send() 74 + .await 75 + .is_ok() 76 + { 77 + ready = true; 78 + break; 79 + } 80 + tokio::time::sleep(Duration::from_millis(100)).await; 81 + } 82 + assert!(ready, "server failed to start"); 83 + 84 + // verify health 85 + let resp = client.get(format!("{}/_health", base_url)).send().await?; 86 + assert!(resp.status().is_success()); 87 + let json: serde_json::Value = resp.json().await?; 88 + assert_eq!(json["server"], "allegedly (mirror/fjall)"); 89 + 90 + // verify did resolution against upstream 91 + let upstream_resp = client 92 + .get(format!("https://plc.directory/{}", sample_did)) 93 + .send() 94 + .await?; 95 + assert!(upstream_resp.status().is_success()); 96 + let upstream_doc: serde_json::Value = upstream_resp.json().await?; 97 + 98 + let resp = client 99 + .get(format!("{}/{}", base_url, sample_did)) 100 + .send() 101 + .await?; 102 + assert!(resp.status().is_success()); 103 + let doc: serde_json::Value = resp.json().await?; 104 + assert_eq!( 105 + doc, upstream_doc, 106 + "local doc != upstream doc.\nlocal: {:#?}\nupstream: {:#?}", 107 + doc, upstream_doc 108 + ); 109 + 110 + server_handle.abort(); 111 + Ok(()) 112 + }