this repo has no description

WIP Add sqlx, insert posts into postgres

+780 -633
+73 -117
indexer/Cargo.lock
··· 19 19 20 20 [[package]] 21 21 name = "ahash" 22 - version = "0.8.11" 22 + version = "0.8.12" 23 23 source = "registry+https://github.com/rust-lang/crates.io-index" 24 - checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" 24 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 25 25 dependencies = [ 26 26 "cfg-if", 27 27 "once_cell", 28 28 "version_check", 29 - "zerocopy 0.7.35", 29 + "zerocopy", 30 30 ] 31 31 32 32 [[package]] ··· 128 128 129 129 [[package]] 130 130 name = "backtrace" 131 - version = "0.3.74" 131 + version = "0.3.75" 132 132 source = "registry+https://github.com/rust-lang/crates.io-index" 133 - checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" 133 + checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" 134 134 dependencies = [ 135 135 "addr2line", 136 136 "cfg-if", ··· 222 222 223 223 [[package]] 224 224 name = "cc" 225 - version = "1.2.21" 225 + version = "1.2.22" 226 226 source = "registry+https://github.com/rust-lang/crates.io-index" 227 - checksum = "8691782945451c1c383942c4874dbe63814f61cb57ef773cda2972682b7bb3c0" 227 + checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1" 228 228 dependencies = [ 229 229 "jobserver", 230 230 "libc", ··· 507 507 508 508 [[package]] 509 509 name = "eyewall-indexer" 510 - version = "0.1.0" 510 + version = "0.0.1" 511 511 dependencies = [ 512 512 "anyhow", 513 513 "async-trait", ··· 668 668 669 669 [[package]] 670 670 name = "getrandom" 671 - version = "0.3.2" 671 + version = "0.3.3" 672 672 source = "registry+https://github.com/rust-lang/crates.io-index" 673 - checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" 673 + checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 674 674 dependencies = [ 675 675 "cfg-if", 676 676 "libc", ··· 762 762 763 763 [[package]] 764 764 name = "icu_collections" 765 - version = "1.5.0" 765 + version = 
"2.0.0" 766 766 source = "registry+https://github.com/rust-lang/crates.io-index" 767 - checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" 767 + checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" 768 768 dependencies = [ 769 769 "displaydoc", 770 + "potential_utf", 770 771 "yoke", 771 772 "zerofrom", 772 773 "zerovec", 773 774 ] 774 775 775 776 [[package]] 776 - name = "icu_locid" 777 - version = "1.5.0" 777 + name = "icu_locale_core" 778 + version = "2.0.0" 778 779 source = "registry+https://github.com/rust-lang/crates.io-index" 779 - checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" 780 + checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" 780 781 dependencies = [ 781 782 "displaydoc", 782 783 "litemap", ··· 786 787 ] 787 788 788 789 [[package]] 789 - name = "icu_locid_transform" 790 - version = "1.5.0" 791 - source = "registry+https://github.com/rust-lang/crates.io-index" 792 - checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" 793 - dependencies = [ 794 - "displaydoc", 795 - "icu_locid", 796 - "icu_locid_transform_data", 797 - "icu_provider", 798 - "tinystr", 799 - "zerovec", 800 - ] 801 - 802 - [[package]] 803 - name = "icu_locid_transform_data" 804 - version = "1.5.1" 805 - source = "registry+https://github.com/rust-lang/crates.io-index" 806 - checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d" 807 - 808 - [[package]] 809 790 name = "icu_normalizer" 810 - version = "1.5.0" 791 + version = "2.0.0" 811 792 source = "registry+https://github.com/rust-lang/crates.io-index" 812 - checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" 793 + checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" 813 794 dependencies = [ 814 795 "displaydoc", 815 796 "icu_collections", ··· 817 798 "icu_properties", 818 799 "icu_provider", 819 800 "smallvec", 820 - "utf16_iter", 821 
- "utf8_iter", 822 - "write16", 823 801 "zerovec", 824 802 ] 825 803 826 804 [[package]] 827 805 name = "icu_normalizer_data" 828 - version = "1.5.1" 806 + version = "2.0.0" 829 807 source = "registry+https://github.com/rust-lang/crates.io-index" 830 - checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7" 808 + checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" 831 809 832 810 [[package]] 833 811 name = "icu_properties" 834 - version = "1.5.1" 812 + version = "2.0.0" 835 813 source = "registry+https://github.com/rust-lang/crates.io-index" 836 - checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" 814 + checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a" 837 815 dependencies = [ 838 816 "displaydoc", 839 817 "icu_collections", 840 - "icu_locid_transform", 818 + "icu_locale_core", 841 819 "icu_properties_data", 842 820 "icu_provider", 843 - "tinystr", 821 + "potential_utf", 822 + "zerotrie", 844 823 "zerovec", 845 824 ] 846 825 847 826 [[package]] 848 827 name = "icu_properties_data" 849 - version = "1.5.1" 828 + version = "2.0.0" 850 829 source = "registry+https://github.com/rust-lang/crates.io-index" 851 - checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2" 830 + checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04" 852 831 853 832 [[package]] 854 833 name = "icu_provider" 855 - version = "1.5.0" 834 + version = "2.0.0" 856 835 source = "registry+https://github.com/rust-lang/crates.io-index" 857 - checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" 836 + checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" 858 837 dependencies = [ 859 838 "displaydoc", 860 - "icu_locid", 861 - "icu_provider_macros", 839 + "icu_locale_core", 862 840 "stable_deref_trait", 863 841 "tinystr", 864 842 "writeable", 865 843 "yoke", 866 844 "zerofrom", 845 + "zerotrie", 867 846 "zerovec", 
868 847 ] 869 848 870 849 [[package]] 871 - name = "icu_provider_macros" 872 - version = "1.5.0" 873 - source = "registry+https://github.com/rust-lang/crates.io-index" 874 - checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" 875 - dependencies = [ 876 - "proc-macro2", 877 - "quote", 878 - "syn", 879 - ] 880 - 881 - [[package]] 882 850 name = "ident_case" 883 851 version = "1.0.1" 884 852 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 897 865 898 866 [[package]] 899 867 name = "idna_adapter" 900 - version = "1.2.0" 868 + version = "1.2.1" 901 869 source = "registry+https://github.com/rust-lang/crates.io-index" 902 - checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" 870 + checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" 903 871 dependencies = [ 904 872 "icu_normalizer", 905 873 "icu_properties", ··· 966 934 source = "registry+https://github.com/rust-lang/crates.io-index" 967 935 checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" 968 936 dependencies = [ 969 - "getrandom 0.3.2", 937 + "getrandom 0.3.3", 970 938 "libc", 971 939 ] 972 940 ··· 1019 987 1020 988 [[package]] 1021 989 name = "litemap" 1022 - version = "0.7.5" 990 + version = "0.8.0" 1023 991 source = "registry+https://github.com/rust-lang/crates.io-index" 1024 - checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" 992 + checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" 1025 993 1026 994 [[package]] 1027 995 name = "lock_api" ··· 1277 1245 ] 1278 1246 1279 1247 [[package]] 1248 + name = "potential_utf" 1249 + version = "0.1.2" 1250 + source = "registry+https://github.com/rust-lang/crates.io-index" 1251 + checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" 1252 + dependencies = [ 1253 + "zerovec", 1254 + ] 1255 + 1256 + [[package]] 1280 1257 name = "ppv-lite86" 1281 1258 version = "0.2.21" 1282 1259 
source = "registry+https://github.com/rust-lang/crates.io-index" 1283 1260 checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" 1284 1261 dependencies = [ 1285 - "zerocopy 0.8.25", 1262 + "zerocopy", 1286 1263 ] 1287 1264 1288 1265 [[package]] ··· 1997 1974 checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" 1998 1975 dependencies = [ 1999 1976 "fastrand", 2000 - "getrandom 0.3.2", 1977 + "getrandom 0.3.3", 2001 1978 "once_cell", 2002 1979 "rustix", 2003 1980 "windows-sys 0.59.0", ··· 2055 2032 2056 2033 [[package]] 2057 2034 name = "tinystr" 2058 - version = "0.7.6" 2035 + version = "0.8.1" 2059 2036 source = "registry+https://github.com/rust-lang/crates.io-index" 2060 - checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" 2037 + checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" 2061 2038 dependencies = [ 2062 2039 "displaydoc", 2063 2040 "zerovec", ··· 2080 2057 2081 2058 [[package]] 2082 2059 name = "tokio" 2083 - version = "1.44.2" 2060 + version = "1.45.0" 2084 2061 source = "registry+https://github.com/rust-lang/crates.io-index" 2085 - checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" 2062 + checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" 2086 2063 dependencies = [ 2087 2064 "backtrace", 2088 2065 "bytes", ··· 2276 2253 version = "0.7.6" 2277 2254 source = "registry+https://github.com/rust-lang/crates.io-index" 2278 2255 checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 2279 - 2280 - [[package]] 2281 - name = "utf16_iter" 2282 - version = "1.0.5" 2283 - source = "registry+https://github.com/rust-lang/crates.io-index" 2284 - checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" 2285 2256 2286 2257 [[package]] 2287 2258 name = "utf8_iter" ··· 2605 2576 ] 2606 2577 2607 2578 [[package]] 2608 - name = "write16" 2609 - version = "1.0.0" 2610 - source = 
"registry+https://github.com/rust-lang/crates.io-index" 2611 - checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" 2612 - 2613 - [[package]] 2614 2579 name = "writeable" 2615 - version = "0.5.5" 2580 + version = "0.6.1" 2616 2581 source = "registry+https://github.com/rust-lang/crates.io-index" 2617 - checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" 2582 + checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" 2618 2583 2619 2584 [[package]] 2620 2585 name = "yoke" 2621 - version = "0.7.5" 2586 + version = "0.8.0" 2622 2587 source = "registry+https://github.com/rust-lang/crates.io-index" 2623 - checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" 2588 + checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" 2624 2589 dependencies = [ 2625 2590 "serde", 2626 2591 "stable_deref_trait", ··· 2630 2595 2631 2596 [[package]] 2632 2597 name = "yoke-derive" 2633 - version = "0.7.5" 2598 + version = "0.8.0" 2634 2599 source = "registry+https://github.com/rust-lang/crates.io-index" 2635 - checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" 2600 + checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" 2636 2601 dependencies = [ 2637 2602 "proc-macro2", 2638 2603 "quote", ··· 2642 2607 2643 2608 [[package]] 2644 2609 name = "zerocopy" 2645 - version = "0.7.35" 2646 - source = "registry+https://github.com/rust-lang/crates.io-index" 2647 - checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" 2648 - dependencies = [ 2649 - "zerocopy-derive 0.7.35", 2650 - ] 2651 - 2652 - [[package]] 2653 - name = "zerocopy" 2654 2610 version = "0.8.25" 2655 2611 source = "registry+https://github.com/rust-lang/crates.io-index" 2656 2612 checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" 2657 2613 dependencies = [ 2658 - "zerocopy-derive 0.8.25", 2659 - ] 2660 - 2661 - 
[[package]] 2662 - name = "zerocopy-derive" 2663 - version = "0.7.35" 2664 - source = "registry+https://github.com/rust-lang/crates.io-index" 2665 - checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" 2666 - dependencies = [ 2667 - "proc-macro2", 2668 - "quote", 2669 - "syn", 2614 + "zerocopy-derive", 2670 2615 ] 2671 2616 2672 2617 [[package]] ··· 2708 2653 checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" 2709 2654 2710 2655 [[package]] 2656 + name = "zerotrie" 2657 + version = "0.2.2" 2658 + source = "registry+https://github.com/rust-lang/crates.io-index" 2659 + checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" 2660 + dependencies = [ 2661 + "displaydoc", 2662 + "yoke", 2663 + "zerofrom", 2664 + ] 2665 + 2666 + [[package]] 2711 2667 name = "zerovec" 2712 - version = "0.10.4" 2668 + version = "0.11.2" 2713 2669 source = "registry+https://github.com/rust-lang/crates.io-index" 2714 - checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" 2670 + checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" 2715 2671 dependencies = [ 2716 2672 "yoke", 2717 2673 "zerofrom", ··· 2720 2676 2721 2677 [[package]] 2722 2678 name = "zerovec-derive" 2723 - version = "0.10.3" 2679 + version = "0.11.1" 2724 2680 source = "registry+https://github.com/rust-lang/crates.io-index" 2725 - checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" 2681 + checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" 2726 2682 dependencies = [ 2727 2683 "proc-macro2", 2728 2684 "quote",
+2 -1
indexer/Cargo.toml
··· 1 1 [package] 2 2 name = "eyewall-indexer" 3 - version = "0.1.0" 3 + version = "0.0.1" 4 4 edition = "2024" 5 5 6 6 [dependencies] ··· 11 11 log = "0.4.27" 12 12 rocketman = "0.2.0" 13 13 serde_json = "1.0.140" 14 + sqlx = { version = "0.8.5", features = [ "runtime-tokio", "postgres", "tls-rustls" ] } 14 15 tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "sync"] }
+28
indexer/migrations/20250513173752_create_bsky_post_tables.sql
-- Creates the eyewall schema and the tables that record Bluesky posts and the
-- reply / quote edges between them.
CREATE SCHEMA IF NOT EXISTS eyewall;

-- One row per post, identified by the author's DID plus the record key.
-- created_at is nullable: posts that are only known as the parent of a reply
-- or quote are inserted without a timestamp.
-- NOTE(review): the column is TIMESTAMP (without time zone) but is filled from
-- to_timestamp(), which returns timestamptz, so the stored value depends on the
-- session TimeZone setting. Consider TIMESTAMPTZ; confirm with readers first.
CREATE TABLE eyewall.bsky_posts (
    author_did TEXT NOT NULL,
    rkey TEXT NOT NULL,
    created_at TIMESTAMP,
    PRIMARY KEY (author_did, rkey)
);

-- Reply edge: (author_did, rkey) is the replying post, (parent_author_did,
-- parent_rkey) the post it replies to. The primary key allows at most one
-- parent per post.
-- Foreign keys are schema-qualified so the migration does not depend on
-- "eyewall" being present in the session's search_path.
CREATE TABLE eyewall.bsky_post_replies (
    author_did TEXT NOT NULL,
    rkey TEXT NOT NULL,
    parent_author_did TEXT NOT NULL,
    parent_rkey TEXT NOT NULL,
    PRIMARY KEY (author_did, rkey),
    FOREIGN KEY (author_did, rkey) REFERENCES eyewall.bsky_posts(author_did, rkey),
    FOREIGN KEY (parent_author_did, parent_rkey) REFERENCES eyewall.bsky_posts(author_did, rkey)
);

-- Quote edge: same layout as bsky_post_replies, for posts that embed (quote)
-- another post.
CREATE TABLE eyewall.bsky_post_quotes (
    author_did TEXT NOT NULL,
    rkey TEXT NOT NULL,
    parent_author_did TEXT NOT NULL,
    parent_rkey TEXT NOT NULL,
    PRIMARY KEY (author_did, rkey),
    FOREIGN KEY (author_did, rkey) REFERENCES eyewall.bsky_posts(author_did, rkey),
    FOREIGN KEY (parent_author_did, parent_rkey) REFERENCES eyewall.bsky_posts(author_did, rkey)
);
+8
indexer/migrations/20250514150409_create_ingest_state_table.sql
-- Single-row table holding ingestion progress. The id column defaults to 1 and
-- is CHECK-constrained to 1, so together with the primary key the table can
-- never contain more than one row.
CREATE TABLE eyewall.ingest_state (
    id INTEGER NOT NULL DEFAULT 1
        CONSTRAINT ingest_state_pk PRIMARY KEY
        CONSTRAINT ingest_state_id_chk CHECK (id = 1),
    bsky_post_cursor TIMESTAMP NOT NULL
);

-- Seed the only row. 1735711200 seconds since the Unix epoch is
-- 2025-01-01 06:00:00 UTC; id is filled in by its default.
INSERT INTO eyewall.ingest_state (bsky_post_cursor)
VALUES (to_timestamp(1735711200.000000));
+53 -12
indexer/src/ingestion.rs
··· 1 1 use std::{collections::HashMap, sync::{Arc, Mutex}}; 2 2 3 - use anyhow::{bail, Result}; 3 + use anyhow::{Result}; 4 4 use async_trait::async_trait; 5 5 use log::{debug, error, info}; 6 6 use rocketman::{connection::JetstreamConnection, endpoints::JetstreamEndpoints, handler, ingestion::LexiconIngestor, options::JetstreamOptions, types::event::{Event, Kind}}; 7 7 use serde_json::Value; 8 8 use tokio::{select, sync::Notify, time::{self, Duration}}; 9 9 10 - use crate::storage; 10 + use crate::storage::{self, BskyPostRecord}; 11 11 12 12 static BSKY_POST_NSID: &'static str = "app.bsky.feed.post"; 13 13 static BSKY_EMBED_RECORD_NSID: &'static str = "app.bsky.embed.record"; 14 14 15 - struct PostIngestor; 15 + 16 + struct PostIngestor { 17 + pool: storage::db::DbPool 18 + } 19 + 20 + impl PostIngestor { 21 + async fn new() -> Result<PostIngestor> { 22 + let pool = storage::db::connect().await?; 23 + Ok(PostIngestor { pool }) 24 + } 25 + } 16 26 17 27 #[async_trait] 18 28 impl LexiconIngestor for PostIngestor { 29 + 19 30 async fn ingest(&self, message: Event<Value>) -> Result<()> { 20 31 if message.commit.is_none() { 21 32 return Ok(()); ··· 47 58 48 59 debug!("{:?}", message); 49 60 50 - let post_id = storage::BskyPostId::new(message.did, message.commit.unwrap().rkey); 61 + let post_id = storage::BskyPostId::new(&message.did[..], &commit.rkey[..]); 62 + 63 + let reply_to = reply.map(|val| { 64 + if let Value::Object(map) = val { 65 + if let Some(Value::Object(parent_map)) = map.get("parent") { 66 + if let Some(Value::String(parent_uri)) = parent_map.get("uri") { 67 + let parts: Vec<_> = parent_uri.split('/').collect(); 68 + let parent_id = storage::BskyPostId::new(parts[2], parts[4]); 69 + if let Some(Value::Object(root_map)) = map.get("root") { 70 + if let Some(Value::String(root_uri)) = root_map.get("uri") { 71 + let parts: Vec<_> = root_uri.split('/').collect(); 72 + let root_id = storage::BskyPostId::new(parts[2], parts[4]); 73 + return 
storage::BskyPostReplyTo::new(parent_id, root_id); 74 + } 75 + } 76 + } 77 + } 78 + } 79 + panic!("Expected reply to be object"); 80 + }); 51 81 52 - // if let Some(Value::Object(embed_record)) = embed_map.get("record") { 53 - // if let Some(uri) = embed_record.get("uri") { 54 - // } 55 - // } 82 + let mut quote_of = None; 83 + if let Some(Value::Object(embed_record)) = embed_record { 84 + if let Some(Value::String(uri)) = embed_record.get("uri") { 85 + let parts: Vec<_> = uri.split('/').collect(); 86 + let author_did = parts[2]; 87 + let rkey = parts[4]; 88 + quote_of = Some(storage::BskyPostId::new(author_did, rkey)); 89 + } 90 + } 91 + 92 + let post = BskyPostRecord::new(post_id, reply_to, quote_of, message.time_us); 93 + storage::db::insert_bsky_post(&self.pool, post).await?; 56 94 57 95 }, 58 96 _ => { ··· 60 98 } 61 99 } 62 100 63 - // Event { did: "did:plc:i5o7ybb4yg45zhnhigrzeiqn", time_us: Some(1746977528899856), kind: Commit, commit: Some(Commit { 64 - // rev: "3lovrnkjkf32t", operation: Create, collection: "app.bsky.feed.post", rkey: "3lovrnkbesk2z", 101 + // Event { 102 + // did: "did:plc:i5o7ybb4yg45zhnhigrzeiqn", time_us: Some(1746977528899856), kind: Commit, commit: Some(Commit { 103 + // rev: "3lovrnkjkf32t", operation: Create, collection: "app.bsky.feed.post", 104 + // rkey: "3lovrnkbesk2z", 65 105 // record: Some(Object { 66 106 // "$type": String("app.bsky.feed.post"), "createdAt": String("2025-05-11T15:32:08.458Z"), "langs": Array [String("en")], 67 107 // "reply": Object { 68 108 // "parent": Object { 69 - // "cid": String("bafyreifhgwuhg25cdh7r4xg54c7zyt3miq37caeixkfdwlpsxbsucdedey"), "uri": String("at://did:plc:qsmmhv4u2ygx2thvepti77zc/app.bsky.feed.post/3lovq4eaoxc2y") 109 + // "cid": String("bafyreifhgwuhg25cdh7r4xg54c7zyt3miq37caeixkfdwlpsxbsucdedey"), 110 + // "uri": String("at://did:plc:qsmmhv4u2ygx2thvepti77zc/app.bsky.feed.post/3lovq4eaoxc2y") 70 111 // }, 71 112 // "root": Object { 72 113 // "cid": 
String("bafyreifhgwuhg25cdh7r4xg54c7zyt3miq37caeixkfdwlpsxbsucdedey"), "uri": String("at://did:plc:qsmmhv4u2ygx2thvepti77zc/app.bsky.feed.post/3lovq4eaoxc2y") ··· 129 170 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); 130 171 ingestors.insert( 131 172 BSKY_POST_NSID.to_string(), 132 - Box::new(PostIngestor) 173 + Box::new(PostIngestor::new().await.expect("Could not connect to DB")) 133 174 ); 134 175 135 176 let cursor = Arc::new(Mutex::new(cursor_val));
+80
indexer/src/storage/db.rs
··· 1 + use std::env; 2 + 3 + use sqlx::{postgres::{PgPoolOptions, PgRow}, Pool, Postgres, Row}; 4 + 5 + use super::{BskyPostId, BskyPostRecord}; 6 + 7 + pub type DbPool = Pool<Postgres>; 8 + 9 + pub async fn connect() -> Result<DbPool, sqlx::Error> { 10 + let db_url = env::var("DATABASE_URL").expect("Missing DATABASE_URL env var"); 11 + PgPoolOptions::new() 12 + .max_connections(5) 13 + .connect(&db_url).await 14 + } 15 + 16 + pub async fn insert_bsky_post<S: AsRef<str>>(pool: &DbPool, post: BskyPostRecord<BskyPostId<S>>) -> Result<(), sqlx::Error> { 17 + let result = sqlx::query("SELECT COUNT(*) as count FROM eyewall.bsky_posts AS posts WHERE posts.author_did = $1 and posts.rkey = $2") 18 + .bind(post.id.did.as_ref()) 19 + .bind(post.id.rkey.as_ref()) 20 + .fetch_one(pool) 21 + .await?; 22 + let count: i64 = result.try_get("count")?; 23 + if count == 1 { 24 + return Ok(()); 25 + } 26 + 27 + let created_at = post.created_at.map(|ts| (ts as f64) / 1e6); 28 + sqlx::query("INSERT INTO eyewall.bsky_posts (author_did, rkey, created_at) VALUES ($1, $2, to_timestamp($3))") 29 + .bind(post.id.did.as_ref()) 30 + .bind(post.id.rkey.as_ref()) 31 + .bind(created_at) 32 + .execute(pool) 33 + .await?; 34 + 35 + if let Some(reply) = post.reply_to { 36 + let result = sqlx::query("SELECT COUNT(*) as count FROM eyewall.bsky_posts AS posts WHERE posts.author_did = $1 and posts.rkey = $2") 37 + .bind(reply.target.did.as_ref()) 38 + .bind(reply.target.rkey.as_ref()) 39 + .fetch_one(pool) 40 + .await?; 41 + let count: i64 = result.try_get("count")?; 42 + if count == 0 { 43 + sqlx::query("INSERT INTO eyewall.bsky_posts (author_did, rkey) VALUES ($1, $2)") 44 + .bind(reply.target.did.as_ref()) 45 + .bind(reply.target.rkey.as_ref()) 46 + .execute(pool) 47 + .await?; 48 + } 49 + sqlx::query("INSERT INTO eyewall.bsky_post_replies (author_did, rkey, parent_author_did, parent_rkey) VALUES ($1, $2, $3, $4)") 50 + .bind(post.id.did.as_ref()) 51 + .bind(post.id.rkey.as_ref()) 52 + 
.bind(reply.target.did.as_ref()) 53 + .bind(reply.target.rkey.as_ref()) 54 + .execute(pool) 55 + .await?; 56 + } 57 + if let Some(quote) = post.quote_of { 58 + let result = sqlx::query("SELECT COUNT(*) as count FROM eyewall.bsky_posts AS posts WHERE posts.author_did = $1 and posts.rkey = $2") 59 + .bind(quote.did.as_ref()) 60 + .bind(quote.rkey.as_ref()) 61 + .fetch_one(pool) 62 + .await?; 63 + let count: i64 = result.try_get("count")?; 64 + if count == 0 { 65 + sqlx::query("INSERT INTO eyewall.bsky_posts (author_did, rkey) VALUES ($1, $2)") 66 + .bind(quote.did.as_ref()) 67 + .bind(quote.rkey.as_ref()) 68 + .execute(pool) 69 + .await?; 70 + } 71 + sqlx::query("INSERT INTO eyewall.bsky_post_quotes (author_did, rkey, parent_author_did, parent_rkey) VALUES ($1, $2, $3, $4)") 72 + .bind(post.id.did.as_ref()) 73 + .bind(post.id.rkey.as_ref()) 74 + .bind(quote.did.as_ref()) 75 + .bind(quote.rkey.as_ref()) 76 + .execute(pool) 77 + .await?; 78 + } 79 + Ok(()) 80 + }
+22 -503
indexer/src/storage/mod.rs
··· 1 - use std::{collections::HashMap, hash::Hash}; 1 + pub mod db; 2 + pub mod uf; 3 + 4 + use std::hash::Hash; 2 5 3 6 #[derive(Clone, Debug, Eq, Hash, PartialEq)] 4 - pub struct BskyPostId { 5 - did: String, 6 - rkey: String, 7 + pub struct BskyPostId<S: AsRef<str>> { 8 + did: S, 9 + rkey: S, 7 10 } 8 11 9 - impl BskyPostId { 10 - pub fn new(did: String, rkey: String) -> BskyPostId { 12 + impl<S: AsRef<str>> BskyPostId<S> { 13 + pub fn new(did: S, rkey: S) -> BskyPostId<S> { 11 14 BskyPostId { did: did, rkey: rkey } 12 15 } 13 16 } 14 17 15 - impl BskyPostId { 16 - fn from(did: &str, rkey: &str) -> BskyPostId { 17 - BskyPostId { did: String::from(did), rkey: String::from(rkey) } 18 - } 19 - } 20 - 21 - #[derive(Clone, Copy, Debug, Eq, PartialEq)] 22 - enum SubgraphType { 23 - Reply, 24 - Quote, 25 - ReplyQuote, 26 - } 27 - 28 - 29 18 #[derive(Clone, Eq, Hash, PartialEq)] 30 - struct BskyPostReplyTo<Id> { 19 + pub struct BskyPostReplyTo<Id> { 31 20 target: Id, 32 21 root: Id, 33 22 } 34 23 24 + impl<Id> BskyPostReplyTo<Id> { 25 + pub fn new(target: Id, root: Id) -> BskyPostReplyTo<Id> { 26 + BskyPostReplyTo { target, root} 27 + } 28 + } 29 + 30 + 35 31 #[derive(Clone, Eq, Hash, PartialEq)] 36 - struct BskyPostRecord<Id> { 32 + pub struct BskyPostRecord<Id> { 37 33 id: Id, 38 34 reply_to: Option<BskyPostReplyTo<Id>>, 39 35 quote_of: Option<Id>, 36 + created_at: Option<u64>, 40 37 } 41 38 42 - type UFIndex = u32; 43 - type UFSize = u32; 44 - 45 - #[derive(Debug)] 46 - struct BskyPostUnionFind<Id> { 47 - id_to_index: HashMap<Id, UFIndex>, 48 - parents_ro: HashMap<UFIndex, UFIndex>, 49 - parents_qo: HashMap<UFIndex, UFIndex>, 50 - parents_rq: HashMap<UFIndex, UFIndex>, 51 - sizes_ro: HashMap<UFIndex, UFSize>, 52 - sizes_qo: HashMap<UFIndex, UFSize>, 53 - sizes_rq: HashMap<UFIndex, UFSize>, 54 - next_index: UFIndex, 55 - } 56 - 57 - impl<Id: Clone + Eq + Hash> BskyPostUnionFind<Id> { 58 - pub fn new() -> BskyPostUnionFind<Id> { 59 - BskyPostUnionFind { 60 - 
id_to_index: HashMap::new(), 61 - parents_ro: HashMap::new(), 62 - parents_qo: HashMap::new(), 63 - parents_rq: HashMap::new(), 64 - sizes_ro: HashMap::new(), 65 - sizes_qo: HashMap::new(), 66 - sizes_rq: HashMap::new(), 67 - next_index: 0, 68 - } 69 - } 70 - 71 - fn get_index(&mut self, id: &Id) -> UFIndex { 72 - match self.id_to_index.get(id) { 73 - None => { 74 - let idx = self.next_index; 75 - self.id_to_index.insert(id.clone(), idx); 76 - self.next_index += 1; 77 - idx 78 - }, 79 - Some(idx) => *idx, 80 - } 81 - } 82 - 83 - pub fn ingest_post(&mut self, record: BskyPostRecord<Id>) -> bool { 84 - if record.reply_to.is_none() && record.quote_of.is_none() { 85 - return false; 86 - } 87 - let post_idx = self.get_index(&record.id); 88 - 89 - if let Some(ref reply_to) = record.reply_to { 90 - let root_idx = self.get_index(&reply_to.root); 91 - let parent_idx = self.get_index(&reply_to.target); 92 - 93 - // assuming that root has a parents_ro entry iff it has a sizes_ro entry. 94 - // this is *only true* iff the root of the reply tree is always the UF root. 95 - // but by construction it is, since we never have to call union() or find() 96 - // for a reply tree. 
97 - 98 - let root_ro_size = match self.parents_ro.insert(root_idx, root_idx) { 99 - None => 1, 100 - Some(_) => *self.sizes_ro.get(&root_idx).unwrap(), 101 - }; 102 - let mut new_root_ro_size = root_ro_size; 103 - match self.parents_ro.insert(parent_idx, root_idx) { 104 - None => { 105 - new_root_ro_size += 1; 106 - }, 107 - Some(_) => (), 108 - } 109 - match self.parents_ro.insert(post_idx, root_idx) { 110 - None => { 111 - new_root_ro_size += 1; 112 - }, 113 - Some(_) => (), 114 - } 115 - 116 - if new_root_ro_size > root_ro_size { 117 - self.sizes_ro.insert(root_idx, new_root_ro_size); 118 - } 119 - 120 - let mut rq_to_add = Vec::new(); 121 - if !self.parents_rq.contains_key(&post_idx) { 122 - rq_to_add.push(post_idx); 123 - } 124 - if !self.parents_rq.contains_key(&root_idx) { 125 - rq_to_add.push(root_idx); 126 - } 127 - if parent_idx != root_idx && !self.parents_rq.contains_key(&parent_idx) { 128 - rq_to_add.push(parent_idx); 129 - } 130 - 131 - for idx in rq_to_add { 132 - self.parents_rq.insert(idx, idx); 133 - self.sizes_rq.insert(idx, 1); 134 - } 135 - if parent_idx != root_idx { 136 - self.union_rq(root_idx, parent_idx); 137 - } 138 - self.union_rq(parent_idx, post_idx); 139 - } 140 - 141 - if let Some(ref quote_of) = record.quote_of { 142 - let parent_idx = self.get_index(&quote_of); 143 - // assuming: if it's not in parents_qo, it's also not in sizes_qo 144 - 145 - match (self.parents_qo.get(&parent_idx), self.parents_qo.get(&post_idx)) { 146 - (None, None) => { 147 - self.parents_qo.insert(parent_idx, parent_idx); 148 - self.parents_qo.insert(post_idx, parent_idx); 149 - self.sizes_qo.insert(parent_idx, 2); 150 - }, 151 - (None, Some(post_parent)) => { 152 - let post_parent_idx = *post_parent; 153 - self.parents_qo.insert(parent_idx, parent_idx); 154 - self.sizes_qo.insert(parent_idx, 1); 155 - self.union_qo(parent_idx, post_parent_idx); 156 - }, 157 - (Some(parent_parent), None) => { 158 - let parent_root_idx = self.find_qo(*parent_parent).unwrap(); 
159 - self.parents_qo.insert(post_idx, parent_root_idx); 160 - self.sizes_qo.entry(parent_root_idx).and_modify(|e| *e += 1); 161 - }, 162 - (Some(parent_parent), Some(post_parent)) => { 163 - self.union_qo(*parent_parent, *post_parent); 164 - }, 165 - } 166 - 167 - let mut rq_to_add = Vec::new(); 168 - if !self.parents_rq.contains_key(&post_idx) { 169 - rq_to_add.push(post_idx); 170 - } 171 - if !self.parents_rq.contains_key(&parent_idx) { 172 - rq_to_add.push(parent_idx); 173 - } 174 - for idx in rq_to_add { 175 - self.parents_rq.insert(idx, idx); 176 - self.sizes_rq.insert(idx, 1); 177 - } 178 - self.union_rq(parent_idx, post_idx); 179 - 180 - } 181 - 182 - true 183 - } 184 - 185 - fn find(parents: &mut HashMap<UFIndex, UFIndex>, idx: UFIndex) -> Option<UFIndex> { 186 - let mut curr_idx = idx; 187 - let mut pa_idx = *parents.get(&curr_idx)?; 188 - while curr_idx != pa_idx { 189 - curr_idx = pa_idx; 190 - pa_idx = *parents.get(&curr_idx).unwrap(); 191 - } 192 - let rep = curr_idx; 193 - 194 - curr_idx = idx; 195 - pa_idx = *parents.get(&curr_idx).unwrap(); 196 - while curr_idx != pa_idx { 197 - parents.insert(curr_idx, rep); 198 - curr_idx = pa_idx; 199 - pa_idx = *parents.get(&curr_idx).unwrap(); 200 - } 201 - Some(curr_idx) 202 - } 203 - 204 - // find the representative node for a given index 205 - fn find_qo(&mut self, idx: UFIndex) -> Option<UFIndex> { 206 - Self::find(&mut self.parents_qo, idx) 207 - } 208 - 209 - fn find_rq(&mut self, idx: UFIndex) -> Option<UFIndex> { 210 - Self::find(&mut self.parents_rq, idx) 211 - } 212 - 213 - 214 - fn union_qo(&mut self, idx1: UFIndex, idx2: UFIndex) { 215 - let pa1 = self.find_qo(idx1).unwrap(); 216 - let pa2 = self.find_qo(idx2).unwrap(); 217 - 218 - if pa1 == pa2 { 219 - return; 220 - } 221 - 222 - let pa1_size = self.sizes_qo.get(&pa1).unwrap(); 223 - let pa2_size = self.sizes_qo.get(&pa2).unwrap(); 224 - 225 - if pa1_size >= pa2_size { 226 - self.parents_qo.insert(pa2, pa1); 227 - self.sizes_qo.insert(pa1, 
pa1_size + pa2_size); 228 - self.sizes_qo.remove(&pa2); 229 - } else { 230 - self.parents_qo.insert(pa1, pa2); 231 - self.sizes_qo.insert(pa2, pa1_size + pa2_size); 232 - self.sizes_qo.remove(&pa1); 233 - } 234 - } 235 - 236 - fn union_rq(&mut self, idx1: UFIndex, idx2: UFIndex) { 237 - let pa1 = self.find_rq(idx1).unwrap(); 238 - let pa2 = self.find_rq(idx2).unwrap(); 239 - 240 - if pa1 == pa2 { 241 - return; 242 - } 243 - 244 - let pa1_size = self.sizes_rq.get(&pa1).unwrap(); 245 - let pa2_size = self.sizes_rq.get(&pa2).unwrap(); 246 - 247 - if pa1_size >= pa2_size { 248 - self.parents_rq.insert(pa2, pa1); 249 - self.sizes_rq.insert(pa1, pa1_size + pa2_size); 250 - self.sizes_rq.remove(&pa2); 251 - } else { 252 - self.parents_rq.insert(pa1, pa2); 253 - self.sizes_rq.insert(pa2, pa1_size + pa2_size); 254 - self.sizes_rq.remove(&pa1); 255 - } 256 - } 257 - 258 - pub fn component_size(&mut self, subgraph_type: SubgraphType, id: &Id) -> Option<UFSize> { 259 - let idx = self.id_to_index.get(id).map(|idx| *idx)?; 260 - match subgraph_type { 261 - SubgraphType::Reply => { 262 - // special structure of replies means all non-root nodes are direct children of root 263 - let pa = *self.parents_ro.get(&idx)?; 264 - self.sizes_ro.get(&pa).map(|idx| *idx) 265 - }, 266 - SubgraphType::Quote => { 267 - let pa = self.find_qo(idx)?; 268 - self.sizes_qo.get(&pa).map(|idx| *idx) 269 - }, 270 - SubgraphType::ReplyQuote => { 271 - let pa = self.find_rq(idx)?; 272 - self.sizes_rq.get(&pa).map(|idx| *idx) 273 - }, 274 - } 39 + impl<Id> BskyPostRecord<Id> { 40 + pub fn new(id: Id, reply_to: Option<BskyPostReplyTo<Id>>, quote_of: Option<Id>, created_at: Option<u64>) -> BskyPostRecord<Id> { 41 + BskyPostRecord { id, reply_to, quote_of, created_at } 275 42 } 276 43 } 277 - 278 - #[cfg(test)] 279 - mod tests { 280 - use itertools::Itertools; 281 - 282 - use super::{BskyPostId, BskyPostRecord, BskyPostReplyTo, BskyPostUnionFind, SubgraphType}; 283 - 284 - #[test] 285 - fn 
test_simple_reply_thread_1() { 286 - let mut uf = BskyPostUnionFind::new(); 287 - 288 - let id1 = BskyPostId::from("a", "1"); 289 - let id2 = BskyPostId::from("b", "1"); 290 - let id3 = BskyPostId::from("a", "2"); 291 - let id4 = BskyPostId::from("a", "3"); 292 - uf.ingest_post(BskyPostRecord { 293 - id: id2.clone(), 294 - reply_to: Some(super::BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 295 - quote_of: None 296 - }); 297 - uf.ingest_post(BskyPostRecord { 298 - id: id3.clone(), 299 - reply_to: Some(super::BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 300 - quote_of: None 301 - }); 302 - uf.ingest_post(BskyPostRecord { 303 - id: id4.clone(), 304 - reply_to: Some(super::BskyPostReplyTo { target: id3.clone(), root: id1.clone() }), 305 - quote_of: None 306 - }); 307 - 308 - assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(4)); 309 - assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(4)); 310 - assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(4)); 311 - assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(4)); 312 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 313 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 314 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 315 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 316 - 317 - } 318 - 319 - #[test] 320 - fn test_simple_reply_thread_2() { 321 - let mut uf = BskyPostUnionFind::new(); 322 - 323 - let id1 = BskyPostId::from("a", "1"); 324 - let id2 = BskyPostId::from("b", "1"); 325 - let id3 = BskyPostId::from("a", "2"); 326 - let id4 = BskyPostId::from("a", "3"); 327 - uf.ingest_post(BskyPostRecord { 328 - id: id4.clone(), 329 - reply_to: Some(super::BskyPostReplyTo { target: id3.clone(), root: id1.clone() }), 330 - quote_of: None 331 - }); 332 - uf.ingest_post(BskyPostRecord { 333 - id: id3.clone(), 334 - reply_to: Some(super::BskyPostReplyTo { 
target: id2.clone(), root: id1.clone() }), 335 - quote_of: None 336 - }); 337 - uf.ingest_post(BskyPostRecord { 338 - id: id2.clone(), 339 - reply_to: Some(super::BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 340 - quote_of: None 341 - }); 342 - 343 - assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(4)); 344 - assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(4)); 345 - assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(4)); 346 - assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(4)); 347 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 348 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 349 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 350 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 351 - 352 - } 353 - 354 - #[test] 355 - fn test_simple_quote_chain() { 356 - let id1 = BskyPostId::from("a", "1"); 357 - let id2 = BskyPostId::from("b", "1"); 358 - let id3 = BskyPostId::from("a", "2"); 359 - let id4 = BskyPostId::from("a", "3"); 360 - let mut records: Vec<BskyPostRecord<BskyPostId>> = vec![ 361 - BskyPostRecord { 362 - id: id2.clone(), 363 - reply_to: None, 364 - quote_of: Some(id1.clone()) 365 - }, 366 - BskyPostRecord { 367 - id: id3.clone(), 368 - reply_to: None, 369 - quote_of: Some(id2.clone()) 370 - }, 371 - BskyPostRecord { 372 - id: id4.clone(), 373 - reply_to: None, 374 - quote_of: Some(id3.clone()) 375 - } 376 - ]; 377 - 378 - for (i, perm) in records.iter().permutations(records.len()).enumerate() { 379 - let mut uf = BskyPostUnionFind::new(); 380 - uf.ingest_post(perm[0].clone()); 381 - uf.ingest_post(perm[1].clone()); 382 - uf.ingest_post(perm[2].clone()); 383 - assert_eq!(uf.component_size(SubgraphType::Quote, &id1), Some(4)); 384 - assert_eq!(uf.component_size(SubgraphType::Quote, &id2), Some(4)); 385 - assert_eq!(uf.component_size(SubgraphType::Quote, &id3), Some(4)); 386 - 
assert_eq!(uf.component_size(SubgraphType::Quote, &id4), Some(4)); 387 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 388 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 389 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 390 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 391 - } 392 - 393 - 394 - 395 - } 396 - 397 - #[test] 398 - fn test_simple_reply_quote_graph_1() { 399 - let id1 = BskyPostId::from("a", "1"); 400 - let id2 = BskyPostId::from("b", "1"); 401 - let id3 = BskyPostId::from("a", "2"); 402 - let id4 = BskyPostId::from("a", "3"); 403 - let mut records: Vec<BskyPostRecord<BskyPostId>> = vec![ 404 - BskyPostRecord { 405 - id: id2.clone(), 406 - reply_to: Some(BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 407 - quote_of: None 408 - }, 409 - BskyPostRecord { 410 - id: id3.clone(), 411 - reply_to: Some(BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 412 - quote_of: None 413 - }, 414 - BskyPostRecord { 415 - id: id4.clone(), 416 - reply_to: None, 417 - quote_of: Some(id3.clone()) 418 - } 419 - ]; 420 - 421 - for perm in records.iter().permutations(records.len()) { 422 - let mut uf = BskyPostUnionFind::new(); 423 - uf.ingest_post(perm[0].clone()); 424 - uf.ingest_post(perm[1].clone()); 425 - uf.ingest_post(perm[2].clone()); 426 - assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(3)); 427 - assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(3)); 428 - assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(3)); 429 - assert_eq!(uf.component_size(SubgraphType::Reply, &id4), None); 430 - assert_eq!(uf.component_size(SubgraphType::Quote, &id3), Some(2)); 431 - assert_eq!(uf.component_size(SubgraphType::Quote, &id4), Some(2)); 432 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 433 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 434 - 
assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 435 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 436 - } 437 - } 438 - 439 - #[test] 440 - fn test_simple_reply_quote_graph_2() { 441 - let id1 = BskyPostId::from("a", "1"); 442 - let id2 = BskyPostId::from("b", "1"); 443 - let id3 = BskyPostId::from("a", "2"); 444 - let id4 = BskyPostId::from("a", "3"); 445 - let mut records: Vec<BskyPostRecord<BskyPostId>> = vec![ 446 - BskyPostRecord { 447 - id: id2.clone(), 448 - reply_to: Some(BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 449 - quote_of: None 450 - }, 451 - BskyPostRecord { 452 - id: id4.clone(), 453 - reply_to: Some(BskyPostReplyTo { target: id3.clone(), root: id3.clone() }), 454 - quote_of: Some(id2.clone()) 455 - }, 456 - ]; 457 - 458 - for (i, perm) in records.iter().permutations(records.len()).enumerate() { 459 - let mut uf = BskyPostUnionFind::new(); 460 - uf.ingest_post(perm[0].clone()); 461 - uf.ingest_post(perm[1].clone()); 462 - assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(2)); 463 - assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(2)); 464 - assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(2)); 465 - assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(2)); 466 - assert_eq!(uf.component_size(SubgraphType::Quote, &id1), None); 467 - assert_eq!(uf.component_size(SubgraphType::Quote, &id2), Some(2)); 468 - assert_eq!(uf.component_size(SubgraphType::Quote, &id3), None); 469 - assert_eq!(uf.component_size(SubgraphType::Quote, &id4), Some(2)); 470 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 471 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 472 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 473 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 474 - } 475 - 476 - } 477 - 478 - #[test] 479 - fn test_reply_quote_graph() { 480 - let id1 
= BskyPostId::from("a", "1"); 481 - let id2 = BskyPostId::from("b", "1"); 482 - let id3 = BskyPostId::from("c", "1"); 483 - let id4 = BskyPostId::from("a", "2"); 484 - let id5 = BskyPostId::from("a", "3"); 485 - let id6 = BskyPostId::from("a", "4"); 486 - let mut records: Vec<BskyPostRecord<BskyPostId>> = vec![ 487 - BskyPostRecord { 488 - id: id3.clone(), 489 - reply_to: Some(BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 490 - quote_of: None 491 - }, 492 - BskyPostRecord { 493 - id: id6.clone(), 494 - reply_to: Some(BskyPostReplyTo { target: id5.clone(), root: id4.clone() }), 495 - quote_of: Some(id3.clone()) 496 - }, 497 - ]; 498 - 499 - for (i, perm) in records.iter().permutations(records.len()).enumerate() { 500 - let mut uf = BskyPostUnionFind::new(); 501 - uf.ingest_post(perm[0].clone()); 502 - uf.ingest_post(perm[1].clone()); 503 - assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(3)); 504 - assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(3)); 505 - assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(3)); 506 - assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(3)); 507 - assert_eq!(uf.component_size(SubgraphType::Reply, &id5), Some(3)); 508 - assert_eq!(uf.component_size(SubgraphType::Reply, &id6), Some(3)); 509 - assert_eq!(uf.component_size(SubgraphType::Quote, &id1), None); 510 - assert_eq!(uf.component_size(SubgraphType::Quote, &id2), None); 511 - assert_eq!(uf.component_size(SubgraphType::Quote, &id3), Some(2)); 512 - assert_eq!(uf.component_size(SubgraphType::Quote, &id4), None); 513 - assert_eq!(uf.component_size(SubgraphType::Quote, &id5), None); 514 - assert_eq!(uf.component_size(SubgraphType::Quote, &id6), Some(2)); 515 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(6)); 516 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(6)); 517 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(6)); 518 - 
assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(6)); 519 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id5), Some(6)); 520 - assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id6), Some(6)); 521 - } 522 - 523 - } 524 - }
+514
indexer/src/storage/uf.rs
··· 1 + use std::{collections::HashMap, hash::Hash}; 2 + 3 + use super::BskyPostRecord; 4 + 5 + 6 + #[derive(Clone, Copy, Debug, Eq, PartialEq)] 7 + enum SubgraphType { 8 + Reply, 9 + Quote, 10 + ReplyQuote, 11 + } 12 + 13 + 14 + 15 + type UFIndex = u32; 16 + type UFSize = u32; 17 + 18 + #[derive(Debug)] 19 + struct BskyPostUnionFind<Id> { 20 + id_to_index: HashMap<Id, UFIndex>, 21 + parents_ro: HashMap<UFIndex, UFIndex>, 22 + parents_qo: HashMap<UFIndex, UFIndex>, 23 + parents_rq: HashMap<UFIndex, UFIndex>, 24 + sizes_ro: HashMap<UFIndex, UFSize>, 25 + sizes_qo: HashMap<UFIndex, UFSize>, 26 + sizes_rq: HashMap<UFIndex, UFSize>, 27 + next_index: UFIndex, 28 + } 29 + 30 + impl<Id: Clone + Eq + Hash> BskyPostUnionFind<Id> { 31 + pub fn new() -> BskyPostUnionFind<Id> { 32 + BskyPostUnionFind { 33 + id_to_index: HashMap::new(), 34 + parents_ro: HashMap::new(), 35 + parents_qo: HashMap::new(), 36 + parents_rq: HashMap::new(), 37 + sizes_ro: HashMap::new(), 38 + sizes_qo: HashMap::new(), 39 + sizes_rq: HashMap::new(), 40 + next_index: 0, 41 + } 42 + } 43 + 44 + fn get_index(&mut self, id: &Id) -> UFIndex { 45 + match self.id_to_index.get(id) { 46 + None => { 47 + let idx = self.next_index; 48 + self.id_to_index.insert(id.clone(), idx); 49 + self.next_index += 1; 50 + idx 51 + }, 52 + Some(idx) => *idx, 53 + } 54 + } 55 + 56 + pub fn ingest_post(&mut self, record: BskyPostRecord<Id>) -> bool { 57 + if record.reply_to.is_none() && record.quote_of.is_none() { 58 + return false; 59 + } 60 + let post_idx = self.get_index(&record.id); 61 + 62 + if let Some(ref reply_to) = record.reply_to { 63 + let root_idx = self.get_index(&reply_to.root); 64 + let parent_idx = self.get_index(&reply_to.target); 65 + 66 + // assuming that root has a parents_ro entry iff it has a sizes_ro entry. 67 + // this is *only true* iff the root of the reply tree is always the UF root. 68 + // but by construction it is, since we never have to call union() or find() 69 + // for a reply tree. 
70 + 71 + let root_ro_size = match self.parents_ro.insert(root_idx, root_idx) { 72 + None => 1, 73 + Some(_) => *self.sizes_ro.get(&root_idx).unwrap(), 74 + }; 75 + let mut new_root_ro_size = root_ro_size; 76 + match self.parents_ro.insert(parent_idx, root_idx) { 77 + None => { 78 + new_root_ro_size += 1; 79 + }, 80 + Some(_) => (), 81 + } 82 + match self.parents_ro.insert(post_idx, root_idx) { 83 + None => { 84 + new_root_ro_size += 1; 85 + }, 86 + Some(_) => (), 87 + } 88 + 89 + if new_root_ro_size > root_ro_size { 90 + self.sizes_ro.insert(root_idx, new_root_ro_size); 91 + } 92 + 93 + let mut rq_to_add = Vec::new(); 94 + if !self.parents_rq.contains_key(&post_idx) { 95 + rq_to_add.push(post_idx); 96 + } 97 + if !self.parents_rq.contains_key(&root_idx) { 98 + rq_to_add.push(root_idx); 99 + } 100 + if parent_idx != root_idx && !self.parents_rq.contains_key(&parent_idx) { 101 + rq_to_add.push(parent_idx); 102 + } 103 + 104 + for idx in rq_to_add { 105 + self.parents_rq.insert(idx, idx); 106 + self.sizes_rq.insert(idx, 1); 107 + } 108 + if parent_idx != root_idx { 109 + self.union_rq(root_idx, parent_idx); 110 + } 111 + self.union_rq(parent_idx, post_idx); 112 + } 113 + 114 + if let Some(ref quote_of) = record.quote_of { 115 + let parent_idx = self.get_index(&quote_of); 116 + // assuming: if it's not in parents_qo, it's also not in sizes_qo 117 + 118 + match (self.parents_qo.get(&parent_idx), self.parents_qo.get(&post_idx)) { 119 + (None, None) => { 120 + self.parents_qo.insert(parent_idx, parent_idx); 121 + self.parents_qo.insert(post_idx, parent_idx); 122 + self.sizes_qo.insert(parent_idx, 2); 123 + }, 124 + (None, Some(post_parent)) => { 125 + let post_parent_idx = *post_parent; 126 + self.parents_qo.insert(parent_idx, parent_idx); 127 + self.sizes_qo.insert(parent_idx, 1); 128 + self.union_qo(parent_idx, post_parent_idx); 129 + }, 130 + (Some(parent_parent), None) => { 131 + let parent_root_idx = self.find_qo(*parent_parent).unwrap(); 132 + 
self.parents_qo.insert(post_idx, parent_root_idx); 133 + self.sizes_qo.entry(parent_root_idx).and_modify(|e| *e += 1); 134 + }, 135 + (Some(parent_parent), Some(post_parent)) => { 136 + self.union_qo(*parent_parent, *post_parent); 137 + }, 138 + } 139 + 140 + let mut rq_to_add = Vec::new(); 141 + if !self.parents_rq.contains_key(&post_idx) { 142 + rq_to_add.push(post_idx); 143 + } 144 + if !self.parents_rq.contains_key(&parent_idx) { 145 + rq_to_add.push(parent_idx); 146 + } 147 + for idx in rq_to_add { 148 + self.parents_rq.insert(idx, idx); 149 + self.sizes_rq.insert(idx, 1); 150 + } 151 + self.union_rq(parent_idx, post_idx); 152 + 153 + } 154 + 155 + true 156 + } 157 + 158 + fn find(parents: &mut HashMap<UFIndex, UFIndex>, idx: UFIndex) -> Option<UFIndex> { 159 + let mut curr_idx = idx; 160 + let mut pa_idx = *parents.get(&curr_idx)?; 161 + while curr_idx != pa_idx { 162 + curr_idx = pa_idx; 163 + pa_idx = *parents.get(&curr_idx).unwrap(); 164 + } 165 + let rep = curr_idx; 166 + 167 + curr_idx = idx; 168 + pa_idx = *parents.get(&curr_idx).unwrap(); 169 + while curr_idx != pa_idx { 170 + parents.insert(curr_idx, rep); 171 + curr_idx = pa_idx; 172 + pa_idx = *parents.get(&curr_idx).unwrap(); 173 + } 174 + Some(curr_idx) 175 + } 176 + 177 + // find the representative node for a given index 178 + fn find_qo(&mut self, idx: UFIndex) -> Option<UFIndex> { 179 + Self::find(&mut self.parents_qo, idx) 180 + } 181 + 182 + fn find_rq(&mut self, idx: UFIndex) -> Option<UFIndex> { 183 + Self::find(&mut self.parents_rq, idx) 184 + } 185 + 186 + 187 + fn union_qo(&mut self, idx1: UFIndex, idx2: UFIndex) { 188 + let pa1 = self.find_qo(idx1).unwrap(); 189 + let pa2 = self.find_qo(idx2).unwrap(); 190 + 191 + if pa1 == pa2 { 192 + return; 193 + } 194 + 195 + let pa1_size = self.sizes_qo.get(&pa1).unwrap(); 196 + let pa2_size = self.sizes_qo.get(&pa2).unwrap(); 197 + 198 + if pa1_size >= pa2_size { 199 + self.parents_qo.insert(pa2, pa1); 200 + self.sizes_qo.insert(pa1, pa1_size + 
pa2_size); 201 + self.sizes_qo.remove(&pa2); 202 + } else { 203 + self.parents_qo.insert(pa1, pa2); 204 + self.sizes_qo.insert(pa2, pa1_size + pa2_size); 205 + self.sizes_qo.remove(&pa1); 206 + } 207 + } 208 + 209 + fn union_rq(&mut self, idx1: UFIndex, idx2: UFIndex) { 210 + let pa1 = self.find_rq(idx1).unwrap(); 211 + let pa2 = self.find_rq(idx2).unwrap(); 212 + 213 + if pa1 == pa2 { 214 + return; 215 + } 216 + 217 + let pa1_size = self.sizes_rq.get(&pa1).unwrap(); 218 + let pa2_size = self.sizes_rq.get(&pa2).unwrap(); 219 + 220 + if pa1_size >= pa2_size { 221 + self.parents_rq.insert(pa2, pa1); 222 + self.sizes_rq.insert(pa1, pa1_size + pa2_size); 223 + self.sizes_rq.remove(&pa2); 224 + } else { 225 + self.parents_rq.insert(pa1, pa2); 226 + self.sizes_rq.insert(pa2, pa1_size + pa2_size); 227 + self.sizes_rq.remove(&pa1); 228 + } 229 + } 230 + 231 + pub fn component_size(&mut self, subgraph_type: SubgraphType, id: &Id) -> Option<UFSize> { 232 + let idx = self.id_to_index.get(id).map(|idx| *idx)?; 233 + match subgraph_type { 234 + SubgraphType::Reply => { 235 + // special structure of replies means all non-root nodes are direct children of root 236 + let pa = *self.parents_ro.get(&idx)?; 237 + self.sizes_ro.get(&pa).map(|idx| *idx) 238 + }, 239 + SubgraphType::Quote => { 240 + let pa = self.find_qo(idx)?; 241 + self.sizes_qo.get(&pa).map(|idx| *idx) 242 + }, 243 + SubgraphType::ReplyQuote => { 244 + let pa = self.find_rq(idx)?; 245 + self.sizes_rq.get(&pa).map(|idx| *idx) 246 + }, 247 + } 248 + } 249 + } 250 + 251 + #[cfg(test)] 252 + mod tests { 253 + use itertools::Itertools; 254 + 255 + use super::super::{BskyPostId, BskyPostRecord, BskyPostReplyTo}; 256 + use super::{BskyPostUnionFind, SubgraphType}; 257 + 258 + #[test] 259 + fn test_simple_reply_thread_1() { 260 + let mut uf = BskyPostUnionFind::new(); 261 + 262 + let id1 = BskyPostId::new("a", "1"); 263 + let id2 = BskyPostId::new("b", "1"); 264 + let id3 = BskyPostId::new("a", "2"); 265 + let id4 = 
BskyPostId::new("a", "3"); 266 + uf.ingest_post(BskyPostRecord { 267 + id: id2.clone(), 268 + reply_to: Some(BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 269 + quote_of: None, 270 + created_at: None 271 + }); 272 + uf.ingest_post(BskyPostRecord { 273 + id: id3.clone(), 274 + reply_to: Some(BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 275 + quote_of: None, 276 + created_at: None 277 + }); 278 + uf.ingest_post(BskyPostRecord { 279 + id: id4.clone(), 280 + reply_to: Some(BskyPostReplyTo { target: id3.clone(), root: id1.clone() }), 281 + quote_of: None, 282 + created_at: None 283 + }); 284 + 285 + assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(4)); 286 + assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(4)); 287 + assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(4)); 288 + assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(4)); 289 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 290 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 291 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 292 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 293 + 294 + } 295 + 296 + #[test] 297 + fn test_simple_reply_thread_2() { 298 + let mut uf = BskyPostUnionFind::new(); 299 + 300 + let id1 = BskyPostId::new("a", "1"); 301 + let id2 = BskyPostId::new("b", "1"); 302 + let id3 = BskyPostId::new("a", "2"); 303 + let id4 = BskyPostId::new("a", "3"); 304 + uf.ingest_post(BskyPostRecord { 305 + id: id4.clone(), 306 + reply_to: Some(BskyPostReplyTo { target: id3.clone(), root: id1.clone() }), 307 + quote_of: None, 308 + created_at: None 309 + }); 310 + uf.ingest_post(BskyPostRecord { 311 + id: id3.clone(), 312 + reply_to: Some(BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 313 + quote_of: None, 314 + created_at: None 315 + }); 316 + uf.ingest_post(BskyPostRecord { 317 + id: id2.clone(), 318 + 
reply_to: Some(BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 319 + quote_of: None, 320 + created_at: None 321 + }); 322 + 323 + assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(4)); 324 + assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(4)); 325 + assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(4)); 326 + assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(4)); 327 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 328 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 329 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 330 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 331 + 332 + } 333 + 334 + #[test] 335 + fn test_simple_quote_chain() { 336 + let id1 = BskyPostId::new("a", "1"); 337 + let id2 = BskyPostId::new("b", "1"); 338 + let id3 = BskyPostId::new("a", "2"); 339 + let id4 = BskyPostId::new("a", "3"); 340 + let records: Vec<BskyPostRecord<_>> = vec![ 341 + BskyPostRecord { 342 + id: id2.clone(), 343 + reply_to: None, 344 + quote_of: Some(id1.clone()), 345 + created_at: None 346 + }, 347 + BskyPostRecord { 348 + id: id3.clone(), 349 + reply_to: None, 350 + quote_of: Some(id2.clone()), 351 + created_at: None 352 + }, 353 + BskyPostRecord { 354 + id: id4.clone(), 355 + reply_to: None, 356 + quote_of: Some(id3.clone()), 357 + created_at: None 358 + } 359 + ]; 360 + 361 + for (i, perm) in records.iter().permutations(records.len()).enumerate() { 362 + let mut uf = BskyPostUnionFind::new(); 363 + uf.ingest_post(perm[0].clone()); 364 + uf.ingest_post(perm[1].clone()); 365 + uf.ingest_post(perm[2].clone()); 366 + assert_eq!(uf.component_size(SubgraphType::Quote, &id1), Some(4)); 367 + assert_eq!(uf.component_size(SubgraphType::Quote, &id2), Some(4)); 368 + assert_eq!(uf.component_size(SubgraphType::Quote, &id3), Some(4)); 369 + assert_eq!(uf.component_size(SubgraphType::Quote, &id4), Some(4)); 370 + 
assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 371 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 372 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 373 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 374 + } 375 + 376 + 377 + 378 + } 379 + 380 + #[test] 381 + fn test_simple_reply_quote_graph_1() { 382 + let id1 = BskyPostId::new("a", "1"); 383 + let id2 = BskyPostId::new("b", "1"); 384 + let id3 = BskyPostId::new("a", "2"); 385 + let id4 = BskyPostId::new("a", "3"); 386 + let records: Vec<BskyPostRecord<_>> = vec![ 387 + BskyPostRecord { 388 + id: id2.clone(), 389 + reply_to: Some(BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 390 + quote_of: None, 391 + created_at: None 392 + }, 393 + BskyPostRecord { 394 + id: id3.clone(), 395 + reply_to: Some(BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 396 + quote_of: None, 397 + created_at: None 398 + }, 399 + BskyPostRecord { 400 + id: id4.clone(), 401 + reply_to: None, 402 + quote_of: Some(id3.clone()), 403 + created_at: None 404 + } 405 + ]; 406 + 407 + for perm in records.iter().permutations(records.len()) { 408 + let mut uf = BskyPostUnionFind::new(); 409 + uf.ingest_post(perm[0].clone()); 410 + uf.ingest_post(perm[1].clone()); 411 + uf.ingest_post(perm[2].clone()); 412 + assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(3)); 413 + assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(3)); 414 + assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(3)); 415 + assert_eq!(uf.component_size(SubgraphType::Reply, &id4), None); 416 + assert_eq!(uf.component_size(SubgraphType::Quote, &id3), Some(2)); 417 + assert_eq!(uf.component_size(SubgraphType::Quote, &id4), Some(2)); 418 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 419 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 420 + 
assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 421 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 422 + } 423 + } 424 + 425 + #[test] 426 + fn test_simple_reply_quote_graph_2() { 427 + let id1 = BskyPostId::new("a", "1"); 428 + let id2 = BskyPostId::new("b", "1"); 429 + let id3 = BskyPostId::new("a", "2"); 430 + let id4 = BskyPostId::new("a", "3"); 431 + let records: Vec<BskyPostRecord<_>> = vec![ 432 + BskyPostRecord { 433 + id: id2.clone(), 434 + reply_to: Some(BskyPostReplyTo { target: id1.clone(), root: id1.clone() }), 435 + quote_of: None, 436 + created_at: None 437 + }, 438 + BskyPostRecord { 439 + id: id4.clone(), 440 + reply_to: Some(BskyPostReplyTo { target: id3.clone(), root: id3.clone() }), 441 + quote_of: Some(id2.clone()), 442 + created_at: None 443 + }, 444 + ]; 445 + 446 + for (i, perm) in records.iter().permutations(records.len()).enumerate() { 447 + let mut uf = BskyPostUnionFind::new(); 448 + uf.ingest_post(perm[0].clone()); 449 + uf.ingest_post(perm[1].clone()); 450 + assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(2)); 451 + assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(2)); 452 + assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(2)); 453 + assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(2)); 454 + assert_eq!(uf.component_size(SubgraphType::Quote, &id1), None); 455 + assert_eq!(uf.component_size(SubgraphType::Quote, &id2), Some(2)); 456 + assert_eq!(uf.component_size(SubgraphType::Quote, &id3), None); 457 + assert_eq!(uf.component_size(SubgraphType::Quote, &id4), Some(2)); 458 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(4)); 459 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(4)); 460 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(4)); 461 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(4)); 462 + } 463 + 464 + } 465 + 466 + #[test] 467 + fn 
test_reply_quote_graph() { 468 + let id1 = BskyPostId::new("a", "1"); 469 + let id2 = BskyPostId::new("b", "1"); 470 + let id3 = BskyPostId::new("c", "1"); 471 + let id4 = BskyPostId::new("a", "2"); 472 + let id5 = BskyPostId::new("a", "3"); 473 + let id6 = BskyPostId::new("a", "4"); 474 + let records: Vec<BskyPostRecord<_>> = vec![ 475 + BskyPostRecord { 476 + id: id3.clone(), 477 + reply_to: Some(BskyPostReplyTo { target: id2.clone(), root: id1.clone() }), 478 + quote_of: None, 479 + created_at: None 480 + }, 481 + BskyPostRecord { 482 + id: id6.clone(), 483 + reply_to: Some(BskyPostReplyTo { target: id5.clone(), root: id4.clone() }), 484 + quote_of: Some(id3.clone()), 485 + created_at: None 486 + }, 487 + ]; 488 + 489 + for (i, perm) in records.iter().permutations(records.len()).enumerate() { 490 + let mut uf = BskyPostUnionFind::new(); 491 + uf.ingest_post(perm[0].clone()); 492 + uf.ingest_post(perm[1].clone()); 493 + assert_eq!(uf.component_size(SubgraphType::Reply, &id1), Some(3)); 494 + assert_eq!(uf.component_size(SubgraphType::Reply, &id2), Some(3)); 495 + assert_eq!(uf.component_size(SubgraphType::Reply, &id3), Some(3)); 496 + assert_eq!(uf.component_size(SubgraphType::Reply, &id4), Some(3)); 497 + assert_eq!(uf.component_size(SubgraphType::Reply, &id5), Some(3)); 498 + assert_eq!(uf.component_size(SubgraphType::Reply, &id6), Some(3)); 499 + assert_eq!(uf.component_size(SubgraphType::Quote, &id1), None); 500 + assert_eq!(uf.component_size(SubgraphType::Quote, &id2), None); 501 + assert_eq!(uf.component_size(SubgraphType::Quote, &id3), Some(2)); 502 + assert_eq!(uf.component_size(SubgraphType::Quote, &id4), None); 503 + assert_eq!(uf.component_size(SubgraphType::Quote, &id5), None); 504 + assert_eq!(uf.component_size(SubgraphType::Quote, &id6), Some(2)); 505 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id1), Some(6)); 506 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id2), Some(6)); 507 + 
assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id3), Some(6)); 508 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id4), Some(6)); 509 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id5), Some(6)); 510 + assert_eq!(uf.component_size(SubgraphType::ReplyQuote, &id6), Some(6)); 511 + } 512 + 513 + } 514 + }