tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
feat(consumer): guess who rewrote backfill again?
mia.omg.lol
2 weeks ago
283d69b4
19c77fcd
verified
This commit was signed with the committer's
known signature
.
mia.omg.lol
SSH Key Fingerprint:
SHA256:eb+NhC0QEl+XKRuFP/97oH6LEz0TXTKPXGDIAI5y7CQ=
+523
-553
8 changed files
expand all
collapse all
unified
split
Cargo.lock
crates
consumer
Cargo.toml
src
backfill
downloader.rs
mod.rs
repo.rs
types.rs
utils.rs
config.rs
+166
-17
Cargo.lock
···
70
70
]
71
71
72
72
[[package]]
73
73
+
name = "allocator-api2"
74
74
+
version = "0.2.21"
75
75
+
source = "registry+https://github.com/rust-lang/crates.io-index"
76
76
+
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
77
77
+
78
78
+
[[package]]
73
79
name = "android_system_properties"
74
80
version = "0.1.5"
75
81
source = "registry+https://github.com/rust-lang/crates.io-index"
···
488
494
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
489
495
490
496
[[package]]
497
497
+
name = "byteorder-lite"
498
498
+
version = "0.1.0"
499
499
+
source = "registry+https://github.com/rust-lang/crates.io-index"
500
500
+
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
501
501
+
502
502
+
[[package]]
491
503
name = "bytes"
492
504
version = "1.11.0"
493
505
source = "registry+https://github.com/rust-lang/crates.io-index"
···
495
507
dependencies = [
496
508
"serde",
497
509
]
510
510
+
511
511
+
[[package]]
512
512
+
name = "byteview"
513
513
+
version = "0.10.1"
514
514
+
source = "registry+https://github.com/rust-lang/crates.io-index"
515
515
+
checksum = "1c53ba0f290bfc610084c05582d9c5d421662128fc69f4bf236707af6fd321b9"
498
516
499
517
[[package]]
500
518
name = "bzip2-sys"
···
702
720
]
703
721
704
722
[[package]]
723
723
+
name = "compare"
724
724
+
version = "0.0.6"
725
725
+
source = "registry+https://github.com/rust-lang/crates.io-index"
726
726
+
checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
727
727
+
728
728
+
[[package]]
705
729
name = "compression-codecs"
706
730
version = "0.4.36"
707
731
source = "registry+https://github.com/rust-lang/crates.io-index"
···
741
765
"deadpool-postgres",
742
766
"eyre",
743
767
"figment",
744
744
-
"flume",
745
745
-
"foldhash",
768
768
+
"foldhash 0.1.5",
746
769
"futures",
747
770
"ipld-core",
748
771
"iroh-car",
···
754
777
"parakeet-db",
755
778
"parakeet-index",
756
779
"redis",
780
780
+
"repo-stream",
757
781
"reqwest",
758
782
"serde",
759
783
"serde_bytes",
···
855
879
source = "registry+https://github.com/rust-lang/crates.io-index"
856
880
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
857
881
dependencies = [
882
882
+
"crossbeam-utils",
883
883
+
]
884
884
+
885
885
+
[[package]]
886
886
+
name = "crossbeam-skiplist"
887
887
+
version = "0.1.3"
888
888
+
source = "registry+https://github.com/rust-lang/crates.io-index"
889
889
+
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
890
890
+
dependencies = [
891
891
+
"crossbeam-epoch",
858
892
"crossbeam-utils",
859
893
]
860
894
···
1305
1339
]
1306
1340
1307
1341
[[package]]
1342
1342
+
name = "enum_dispatch"
1343
1343
+
version = "0.3.13"
1344
1344
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1345
1345
+
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
1346
1346
+
dependencies = [
1347
1347
+
"once_cell",
1348
1348
+
"proc-macro2",
1349
1349
+
"quote",
1350
1350
+
"syn 2.0.114",
1351
1351
+
]
1352
1352
+
1353
1353
+
[[package]]
1308
1354
name = "equivalent"
1309
1355
version = "1.0.2"
1310
1356
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1385
1431
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
1386
1432
1387
1433
[[package]]
1434
1434
+
name = "fjall"
1435
1435
+
version = "3.0.2"
1436
1436
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1437
1437
+
checksum = "5a2799b4198427a08c774838e44d0b77f677208f19a1927671cd2cd36bb30d69"
1438
1438
+
dependencies = [
1439
1439
+
"byteorder-lite",
1440
1440
+
"byteview",
1441
1441
+
"dashmap",
1442
1442
+
"flume",
1443
1443
+
"log",
1444
1444
+
"lsm-tree",
1445
1445
+
"tempfile",
1446
1446
+
"xxhash-rust",
1447
1447
+
]
1448
1448
+
1449
1449
+
[[package]]
1388
1450
name = "flate2"
1389
1451
version = "1.1.8"
1390
1452
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1396
1458
1397
1459
[[package]]
1398
1460
name = "flume"
1399
1399
-
version = "0.11.1"
1461
1461
+
version = "0.12.0"
1400
1462
source = "registry+https://github.com/rust-lang/crates.io-index"
1401
1401
-
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
1463
1463
+
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
1402
1464
dependencies = [
1403
1403
-
"futures-core",
1404
1404
-
"futures-sink",
1405
1405
-
"nanorand",
1406
1465
"spin 0.9.8",
1407
1466
]
1408
1467
···
1417
1476
version = "0.1.5"
1418
1477
source = "registry+https://github.com/rust-lang/crates.io-index"
1419
1478
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
1479
1479
+
1480
1480
+
[[package]]
1481
1481
+
name = "foldhash"
1482
1482
+
version = "0.2.0"
1483
1483
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1484
1484
+
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
1420
1485
1421
1486
[[package]]
1422
1487
name = "foreign-types"
···
1709
1774
source = "registry+https://github.com/rust-lang/crates.io-index"
1710
1775
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
1711
1776
dependencies = [
1712
1712
-
"foldhash",
1777
1777
+
"foldhash 0.1.5",
1713
1778
]
1714
1779
1715
1780
[[package]]
···
1717
1782
version = "0.16.1"
1718
1783
source = "registry+https://github.com/rust-lang/crates.io-index"
1719
1784
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
1785
1785
+
dependencies = [
1786
1786
+
"allocator-api2",
1787
1787
+
"equivalent",
1788
1788
+
"foldhash 0.2.0",
1789
1789
+
]
1720
1790
1721
1791
[[package]]
1722
1792
name = "headers"
···
2176
2246
]
2177
2247
2178
2248
[[package]]
2249
2249
+
name = "interval-heap"
2250
2250
+
version = "0.0.5"
2251
2251
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2252
2252
+
checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
2253
2253
+
dependencies = [
2254
2254
+
"compare",
2255
2255
+
]
2256
2256
+
2257
2257
+
[[package]]
2179
2258
name = "inventory"
2180
2259
version = "0.3.21"
2181
2260
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2604
2683
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
2605
2684
2606
2685
[[package]]
2686
2686
+
name = "lsm-tree"
2687
2687
+
version = "3.0.2"
2688
2688
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2689
2689
+
checksum = "86e8d0b8e0cf2531a437788ce94d95570dbaabfe9888db20022c2d5ccec9b221"
2690
2690
+
dependencies = [
2691
2691
+
"byteorder-lite",
2692
2692
+
"byteview",
2693
2693
+
"crossbeam-skiplist",
2694
2694
+
"enum_dispatch",
2695
2695
+
"interval-heap",
2696
2696
+
"log",
2697
2697
+
"quick_cache",
2698
2698
+
"rustc-hash",
2699
2699
+
"self_cell",
2700
2700
+
"sfa",
2701
2701
+
"tempfile",
2702
2702
+
"varint-rs",
2703
2703
+
"xxhash-rust",
2704
2704
+
]
2705
2705
+
2706
2706
+
[[package]]
2607
2707
name = "lz4-sys"
2608
2708
version = "1.11.1+lz4-1.10.0"
2609
2709
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2841
2941
"wasm-bindgen",
2842
2942
"wasm-bindgen-futures",
2843
2943
"web-time",
2844
2844
-
]
2845
2845
-
2846
2846
-
[[package]]
2847
2847
-
name = "nanorand"
2848
2848
-
version = "0.7.0"
2849
2849
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2850
2850
-
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
2851
2851
-
dependencies = [
2852
2852
-
"getrandom 0.2.17",
2853
2944
]
2854
2945
2855
2946
[[package]]
···
3645
3736
]
3646
3737
3647
3738
[[package]]
3739
3739
+
name = "quick_cache"
3740
3740
+
version = "0.6.18"
3741
3741
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3742
3742
+
checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3"
3743
3743
+
dependencies = [
3744
3744
+
"equivalent",
3745
3745
+
"hashbrown 0.16.1",
3746
3746
+
]
3747
3747
+
3748
3748
+
[[package]]
3648
3749
name = "quinn"
3649
3750
version = "0.11.9"
3650
3751
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3895
3996
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
3896
3997
3897
3998
[[package]]
3999
3999
+
name = "repo-stream"
4000
4000
+
version = "0.4.0"
4001
4001
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4002
4002
+
checksum = "6c95af4a1465ac3a5a5aa4921b60f8761c1e126deb53a5b1a2e40abd0ec6ae1f"
4003
4003
+
dependencies = [
4004
4004
+
"cid",
4005
4005
+
"fjall",
4006
4006
+
"hashbrown 0.16.1",
4007
4007
+
"iroh-car",
4008
4008
+
"log",
4009
4009
+
"serde",
4010
4010
+
"serde_bytes",
4011
4011
+
"serde_ipld_dagcbor",
4012
4012
+
"sha2",
4013
4013
+
"thiserror 2.0.18",
4014
4014
+
"tokio",
4015
4015
+
]
4016
4016
+
4017
4017
+
[[package]]
3898
4018
name = "reqwest"
3899
4019
version = "0.12.28"
3900
4020
source = "registry+https://github.com/rust-lang/crates.io-index"
···
4197
4317
]
4198
4318
4199
4319
[[package]]
4320
4320
+
name = "self_cell"
4321
4321
+
version = "1.2.2"
4322
4322
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4323
4323
+
checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89"
4324
4324
+
4325
4325
+
[[package]]
4200
4326
name = "semver"
4201
4327
version = "1.0.27"
4202
4328
source = "registry+https://github.com/rust-lang/crates.io-index"
···
4367
4493
"proc-macro2",
4368
4494
"quote",
4369
4495
"syn 2.0.114",
4496
4496
+
]
4497
4497
+
4498
4498
+
[[package]]
4499
4499
+
name = "sfa"
4500
4500
+
version = "1.0.0"
4501
4501
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4502
4502
+
checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175"
4503
4503
+
dependencies = [
4504
4504
+
"byteorder-lite",
4505
4505
+
"log",
4506
4506
+
"xxhash-rust",
4370
4507
]
4371
4508
4372
4509
[[package]]
···
5401
5538
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
5402
5539
5403
5540
[[package]]
5541
5541
+
name = "varint-rs"
5542
5542
+
version = "2.2.0"
5543
5543
+
source = "registry+https://github.com/rust-lang/crates.io-index"
5544
5544
+
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
5545
5545
+
5546
5546
+
[[package]]
5404
5547
name = "vcpkg"
5405
5548
version = "0.2.15"
5406
5549
source = "registry+https://github.com/rust-lang/crates.io-index"
···
5916
6059
version = "0.6.2"
5917
6060
source = "registry+https://github.com/rust-lang/crates.io-index"
5918
6061
checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
6062
6062
+
6063
6063
+
[[package]]
6064
6064
+
name = "xxhash-rust"
6065
6065
+
version = "0.8.15"
6066
6066
+
source = "registry+https://github.com/rust-lang/crates.io-index"
6067
6067
+
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
5919
6068
5920
6069
[[package]]
5921
6070
name = "yansi"
+1
-1
crates/consumer/Cargo.toml
···
10
10
deadpool-postgres = { version = "0.14", features = ["serde"] }
11
11
eyre = "0.6.12"
12
12
figment = { version = "0.10.19", features = ["env", "toml"] }
13
13
-
flume = { version = "0.11", features = ["async"] }
14
13
foldhash = "0.1"
15
14
futures = "0.3.31"
16
15
ipld-core = "0.4"
···
38
37
tokio-util = { version = "0.7", features = ["io", "rt"] }
39
38
tracing = "0.1"
40
39
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
40
40
+
repo-stream = "0.4.0"
-338
crates/consumer/src/backfill/downloader.rs
···
1
1
-
use super::DL_DONE_KEY;
2
2
-
use crate::db;
3
3
-
use chrono::prelude::*;
4
4
-
use deadpool_postgres::{Client as PgClient, Pool};
5
5
-
use futures::TryStreamExt;
6
6
-
use jacquard_common::types::did::Did;
7
7
-
use jacquard_common::types::string::Handle;
8
8
-
use jacquard_identity::resolver::IdentityResolver;
9
9
-
use jacquard_identity::JacquardResolver;
10
10
-
use metrics::{counter, histogram};
11
11
-
use parakeet_db::types::{ActorStatus, ActorSyncState};
12
12
-
use redis::aio::MultiplexedConnection;
13
13
-
use redis::AsyncTypedCommands;
14
14
-
use reqwest::header::HeaderMap;
15
15
-
use reqwest::Client as HttpClient;
16
16
-
use std::path::{Path, PathBuf};
17
17
-
use tokio::sync::watch::Receiver as WatchReceiver;
18
18
-
use tokio::time::{Duration, Instant};
19
19
-
use tokio_postgres::types::Type;
20
20
-
use tokio_util::io::StreamReader;
21
21
-
use tokio_util::task::TaskTracker;
22
22
-
use tracing::instrument;
23
23
-
24
24
-
const BF_RESET_KEY: &str = "bf_download_ratelimit_reset";
25
25
-
const BF_REM_KEY: &str = "bf_download_ratelimit_rem";
26
26
-
const DL_DUP_KEY: &str = "bf_downloaded";
27
27
-
28
28
-
pub async fn downloader(
29
29
-
mut rc: MultiplexedConnection,
30
30
-
pool: Pool,
31
31
-
resolver: JacquardResolver,
32
32
-
tmp_dir: PathBuf,
33
33
-
concurrency: usize,
34
34
-
buffer: usize,
35
35
-
tracker: TaskTracker,
36
36
-
stop: WatchReceiver<bool>,
37
37
-
) {
38
38
-
let (tx, rx) = flume::bounded(64);
39
39
-
let mut conn = pool.get().await.unwrap();
40
40
-
41
41
-
let http = HttpClient::new();
42
42
-
43
43
-
for _ in 0..concurrency {
44
44
-
tracker.spawn(download_thread(
45
45
-
rc.clone(),
46
46
-
pool.clone(),
47
47
-
resolver.clone(),
48
48
-
http.clone(),
49
49
-
rx.clone(),
50
50
-
tmp_dir.clone(),
51
51
-
));
52
52
-
}
53
53
-
54
54
-
let status_stmt = conn.prepare_typed_cached(
55
55
-
"INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, 'processing', NOW()) ON CONFLICT (did) DO UPDATE SET sync_state = 'processing', last_indexed=NOW()",
56
56
-
&[Type::TEXT]
57
57
-
).await.unwrap();
58
58
-
59
59
-
loop {
60
60
-
if stop.has_changed().unwrap_or(true) {
61
61
-
tracing::info!("stopping downloader");
62
62
-
break;
63
63
-
}
64
64
-
65
65
-
if let Ok(count) = rc.llen(DL_DONE_KEY).await {
66
66
-
if count > buffer {
67
67
-
tracing::info!("waiting due to full buffer");
68
68
-
tokio::time::sleep(Duration::from_secs(5)).await;
69
69
-
continue;
70
70
-
}
71
71
-
}
72
72
-
73
73
-
let did: String = match rc.lpop("backfill_queue", None).await {
74
74
-
Ok(Some(did)) => did,
75
75
-
Ok(None) => {
76
76
-
tokio::time::sleep(Duration::from_millis(250)).await;
77
77
-
continue;
78
78
-
}
79
79
-
Err(e) => {
80
80
-
tracing::error!("failed to get item from backfill queue: {e}");
81
81
-
continue;
82
82
-
}
83
83
-
};
84
84
-
85
85
-
tracing::trace!("resolving repo {did}");
86
86
-
87
87
-
// has the repo already been downloaded?
88
88
-
if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() {
89
89
-
tracing::info!("skipping duplicate repo {did}");
90
90
-
continue;
91
91
-
}
92
92
-
93
93
-
// check if they're already synced in DB too
94
94
-
match db::actor_get_statuses(&mut conn, &did).await {
95
95
-
Ok(Some((_, state))) => {
96
96
-
if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
97
97
-
tracing::info!("skipping duplicate repo {did}");
98
98
-
continue;
99
99
-
}
100
100
-
}
101
101
-
Ok(None) => {}
102
102
-
Err(e) => {
103
103
-
tracing::error!(did, "failed to check current repo status: {e}");
104
104
-
db::backfill_job_write(&mut conn, &did, "failed.resolve")
105
105
-
.await
106
106
-
.unwrap();
107
107
-
}
108
108
-
}
109
109
-
110
110
-
let jd = Did::raw(&did);
111
111
-
match resolver
112
112
-
.resolve_did_doc(&jd)
113
113
-
.await
114
114
-
.and_then(|v| v.into_owned())
115
115
-
{
116
116
-
Ok(did_doc) => {
117
117
-
let Some(service) = did_doc.pds_endpoint() else {
118
118
-
tracing::warn!("bad DID doc for {did}");
119
119
-
db::backfill_job_write(&mut conn, &did, "failed.resolve.did_svc")
120
120
-
.await
121
121
-
.unwrap();
122
122
-
continue;
123
123
-
};
124
124
-
let service = service.to_string();
125
125
-
126
126
-
// set the repo to processing
127
127
-
if let Err(e) = conn.execute(&status_stmt, &[&did.as_str()]).await {
128
128
-
tracing::error!("failed to update repo status for {did}: {e}");
129
129
-
continue;
130
130
-
}
131
131
-
132
132
-
let handle = did_doc.handles().first().cloned();
133
133
-
134
134
-
tracing::trace!("resolved repo {did} {service}");
135
135
-
if let Err(e) = tx.send_async((service, did, handle)).await {
136
136
-
tracing::error!("failed to send: {e}");
137
137
-
}
138
138
-
}
139
139
-
Err(e) => {
140
140
-
tracing::error!(did = did.as_str(), "failed to resolve DID doc: {e}");
141
141
-
db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
142
142
-
.await
143
143
-
.unwrap();
144
144
-
db::backfill_job_write(&mut conn, &did, "failed.resolve.did")
145
145
-
.await
146
146
-
.unwrap();
147
147
-
}
148
148
-
}
149
149
-
}
150
150
-
}
151
151
-
152
152
-
async fn download_thread(
153
153
-
mut rc: MultiplexedConnection,
154
154
-
pool: Pool,
155
155
-
resolver: JacquardResolver,
156
156
-
http: reqwest::Client,
157
157
-
rx: flume::Receiver<(String, String, Option<Handle<'static>>)>,
158
158
-
tmp_dir: PathBuf,
159
159
-
) {
160
160
-
tracing::debug!("spawning thread");
161
161
-
162
162
-
// this will return Err(_) and exit when all senders (only held above) are dropped
163
163
-
while let Ok((pds, did, maybe_handle)) = rx.recv_async().await {
164
164
-
if let Err(e) = enforce_ratelimit(&mut rc, &pds).await {
165
165
-
tracing::error!("ratelimiter error: {e}");
166
166
-
continue;
167
167
-
};
168
168
-
169
169
-
{
170
170
-
tracing::trace!("getting DB conn...");
171
171
-
let mut conn = pool.get().await.unwrap();
172
172
-
tracing::trace!("got DB conn...");
173
173
-
match check_and_update_repo_status(&http, &mut conn, &pds, &did).await {
174
174
-
Ok(true) => {}
175
175
-
Ok(false) => continue,
176
176
-
Err(e) => {
177
177
-
tracing::error!(pds, did, "failed to check repo status: {e}");
178
178
-
db::backfill_job_write(&mut conn, &did, "failed.resolve.status")
179
179
-
.await
180
180
-
.unwrap();
181
181
-
continue;
182
182
-
}
183
183
-
}
184
184
-
185
185
-
tracing::debug!("trying to resolve handle...");
186
186
-
if let Some(handle) = maybe_handle {
187
187
-
if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await {
188
188
-
tracing::error!(pds, did = did.as_str(), "failed to resolve handle: {e}");
189
189
-
db::backfill_job_write(&mut conn, &did, "failed.resolve.handle")
190
190
-
.await
191
191
-
.unwrap();
192
192
-
}
193
193
-
}
194
194
-
}
195
195
-
196
196
-
let start = Instant::now();
197
197
-
198
198
-
tracing::trace!("downloading repo {did}");
199
199
-
200
200
-
match download_car(&http, &tmp_dir, &pds, &did).await {
201
201
-
Ok(Some((rem, reset))) => {
202
202
-
let _ = rc.zadd(BF_REM_KEY, &pds, rem).await;
203
203
-
let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await;
204
204
-
}
205
205
-
Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."),
206
206
-
Err(e) => {
207
207
-
tracing::error!(pds, did, "failed to download repo: {e}");
208
208
-
continue;
209
209
-
}
210
210
-
}
211
211
-
212
212
-
histogram!("backfill_download_dur", "pds" => pds).record(start.elapsed().as_secs_f64());
213
213
-
214
214
-
let _ = rc.sadd(DL_DUP_KEY, &did).await;
215
215
-
if let Err(e) = rc.rpush(DL_DONE_KEY, &did).await {
216
216
-
tracing::error!(did, "failed to mark download complete: {e}");
217
217
-
} else {
218
218
-
counter!("backfill_downloaded").increment(1);
219
219
-
}
220
220
-
}
221
221
-
222
222
-
tracing::debug!("thread exiting");
223
223
-
}
224
224
-
225
225
-
async fn enforce_ratelimit(rc: &mut MultiplexedConnection, pds: &str) -> eyre::Result<()> {
226
226
-
let score = rc.zscore(BF_REM_KEY, pds).await?;
227
227
-
228
228
-
if let Some(rem) = score {
229
229
-
if (rem as i32) < 100 {
230
230
-
// if we've got None for some reason, just hope that the next req will contain the reset header.
231
231
-
if let Some(at) = rc.zscore(BF_RESET_KEY, pds).await? {
232
232
-
tracing::debug!("rate limit for {pds} resets at {at}");
233
233
-
let time = chrono::DateTime::from_timestamp(at as i64, 0).unwrap();
234
234
-
let delta = (time - Utc::now()).num_milliseconds().max(0);
235
235
-
236
236
-
tokio::time::sleep(Duration::from_millis(delta as u64)).await;
237
237
-
};
238
238
-
}
239
239
-
}
240
240
-
241
241
-
Ok(())
242
242
-
}
243
243
-
244
244
-
// you wouldn't...
245
245
-
#[instrument(skip(http, tmp_dir, pds))]
246
246
-
async fn download_car(
247
247
-
http: &HttpClient,
248
248
-
tmp_dir: &Path,
249
249
-
pds: &str,
250
250
-
did: &str,
251
251
-
) -> eyre::Result<Option<(i32, i32)>> {
252
252
-
let res = http
253
253
-
.get(format!("{pds}xrpc/com.atproto.sync.getRepo?did={did}"))
254
254
-
.send()
255
255
-
.await?
256
256
-
.error_for_status()?;
257
257
-
258
258
-
let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
259
259
-
260
260
-
let headers = res.headers();
261
261
-
let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
262
262
-
let ratelimit_reset = header_to_int(headers, "ratelimit-reset");
263
263
-
264
264
-
let strm = res.bytes_stream().map_err(std::io::Error::other);
265
265
-
let mut reader = StreamReader::new(strm);
266
266
-
267
267
-
tokio::io::copy(&mut reader, &mut file).await?;
268
268
-
269
269
-
Ok(ratelimit_rem.zip(ratelimit_reset))
270
270
-
}
271
271
-
272
272
-
// there's no ratelimit handling here because we pretty much always call download_car after.
273
273
-
#[instrument(skip(http, conn, pds))]
274
274
-
async fn check_and_update_repo_status(
275
275
-
http: &HttpClient,
276
276
-
conn: &mut PgClient,
277
277
-
pds: &str,
278
278
-
repo: &str,
279
279
-
) -> eyre::Result<bool> {
280
280
-
match super::check_pds_repo_status(http, pds, repo).await? {
281
281
-
Some(status) => {
282
282
-
if !status.active {
283
283
-
tracing::debug!("repo is inactive");
284
284
-
285
285
-
let status = status
286
286
-
.status
287
287
-
.unwrap_or(crate::firehose::AtpAccountStatus::Deleted);
288
288
-
conn.execute(
289
289
-
"UPDATE actors SET sync_state='dirty', status=$2 WHERE did=$1",
290
290
-
&[&repo, &ActorStatus::from(status)],
291
291
-
)
292
292
-
.await?;
293
293
-
294
294
-
Ok(false)
295
295
-
} else {
296
296
-
Ok(true)
297
297
-
}
298
298
-
}
299
299
-
None => {
300
300
-
// this repo can't be found - set dirty and assume deleted.
301
301
-
tracing::debug!("repo was deleted");
302
302
-
conn.execute(
303
303
-
"UPDATE actors SET sync_state='dirty', status='deleted' WHERE did=$1",
304
304
-
&[&repo],
305
305
-
)
306
306
-
.await?;
307
307
-
308
308
-
Ok(false)
309
309
-
}
310
310
-
}
311
311
-
}
312
312
-
313
313
-
async fn resolve_and_set_handle(
314
314
-
conn: &PgClient,
315
315
-
resolver: &JacquardResolver,
316
316
-
did: &str,
317
317
-
handle: &Handle<'_>,
318
318
-
) -> eyre::Result<()> {
319
319
-
let handle_did = resolver.resolve_handle(handle).await?;
320
320
-
if handle_did == Did::raw(did) {
321
321
-
conn.execute(
322
322
-
"UPDATE actors SET handle=$2 WHERE did=$1",
323
323
-
&[&did, &handle.as_str()],
324
324
-
)
325
325
-
.await?;
326
326
-
} else {
327
327
-
tracing::warn!("requested DID ({did}) doesn't match handle");
328
328
-
}
329
329
-
330
330
-
Ok(())
331
331
-
}
332
332
-
333
333
-
fn header_to_int(headers: &HeaderMap, name: &str) -> Option<i32> {
334
334
-
headers
335
335
-
.get(name)
336
336
-
.and_then(|v| v.to_str().ok())
337
337
-
.and_then(|v| v.parse().ok())
338
338
-
}
+128
-65
crates/consumer/src/backfill/mod.rs
···
5
5
use chrono::prelude::*;
6
6
use deadpool_postgres::{Object, Pool, Transaction};
7
7
use ipld_core::cid::Cid;
8
8
+
use jacquard_common::types::did::Did;
8
9
use jacquard_identity::JacquardResolver;
9
10
use lexica::StrongRef;
10
11
use metrics::counter;
11
11
-
use parakeet_db::types::{ActorStatus, ActorSyncState};
12
12
+
use parakeet_db::types::ActorSyncState;
12
13
use redis::aio::MultiplexedConnection;
13
14
use redis::AsyncTypedCommands;
14
14
-
use reqwest::{Client, StatusCode};
15
15
+
use reqwest::Client;
15
16
use std::path::PathBuf;
16
17
use std::str::FromStr;
17
18
use std::sync::Arc;
19
19
+
use std::time::Duration;
18
20
use tokio::sync::watch::Receiver as WatchReceiver;
19
21
use tokio::sync::Semaphore;
20
22
use tokio_util::task::TaskTracker;
21
23
use tracing::instrument;
22
24
23
23
-
mod downloader;
24
25
mod repo;
25
25
-
mod types;
26
26
+
mod utils;
26
27
27
27
-
const DL_DONE_KEY: &str = "bf_download_complete";
28
28
-
const PDS_SERVICE_ID: &str = "#atproto_pds";
28
28
+
const DL_DUP_KEY: &str = "bf_completed";
29
29
// There's a 4MiB limit on parakeet-index, so break delta batches up if there's loads.
30
30
// this should be plenty low enough to not trigger the size limit. (59k did slightly)
31
31
const DELTA_BATCH_SIZE: usize = 32 * 1024;
···
34
34
pub struct BackfillManagerInner {
35
35
index_client: Option<parakeet_index::Client>,
36
36
tmp_dir: PathBuf,
37
37
+
resolver: JacquardResolver,
38
38
+
client: Client,
37
39
}
38
40
39
41
pub struct BackfillManager {
40
42
pool: Pool,
41
43
redis: MultiplexedConnection,
42
42
-
resolver: JacquardResolver,
43
44
semaphore: Arc<Semaphore>,
44
44
-
opts: BackfillConfig,
45
45
inner: BackfillManagerInner,
46
46
}
47
47
···
55
55
) -> eyre::Result<Self> {
56
56
let semaphore = Arc::new(Semaphore::new(opts.workers as usize));
57
57
58
58
+
let client = Client::builder().brotli(true).build()?;
59
59
+
58
60
Ok(BackfillManager {
59
61
pool,
60
62
redis,
61
61
-
resolver,
62
63
semaphore,
63
64
inner: BackfillManagerInner {
64
65
index_client,
65
66
tmp_dir: PathBuf::from_str(&opts.download_tmp_dir)?,
67
67
+
resolver,
68
68
+
client,
66
69
},
67
67
-
opts,
68
70
})
69
71
}
70
72
71
73
pub async fn run(mut self, stop: WatchReceiver<bool>) -> eyre::Result<()> {
72
74
let tracker = TaskTracker::new();
73
75
74
74
-
tracker.spawn(downloader::downloader(
75
75
-
self.redis.clone(),
76
76
-
self.pool.clone(),
77
77
-
self.resolver,
78
78
-
self.inner.tmp_dir.clone(),
79
79
-
self.opts.download_workers,
80
80
-
self.opts.download_buffer,
81
81
-
tracker.clone(),
82
82
-
stop.clone(),
83
83
-
));
84
84
-
85
76
loop {
86
77
if stop.has_changed().unwrap_or(true) {
87
78
tracker.close();
···
89
80
break;
90
81
}
91
82
92
92
-
let Some(job): Option<String> = self.redis.lpop(DL_DONE_KEY, None).await? else {
93
93
-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
94
94
-
continue;
83
83
+
let did: String = match self.redis.lpop("backfill_queue", None).await {
84
84
+
Ok(Some(did)) => did,
85
85
+
Ok(None) => {
86
86
+
tokio::time::sleep(Duration::from_millis(250)).await;
87
87
+
continue;
88
88
+
}
89
89
+
Err(e) => {
90
90
+
tracing::error!("failed to get item from backfill queue: {e}");
91
91
+
continue;
92
92
+
}
95
93
};
96
94
97
95
let p = self.semaphore.clone().acquire_owned().await?;
98
96
99
99
-
let mut inner = self.inner.clone();
97
97
+
let inner = self.inner.clone();
100
98
let mut conn = self.pool.get().await?;
101
101
-
let mut rc = self.redis.clone();
99
99
+
let rc = self.redis.clone();
102
100
103
101
tracker.spawn(async move {
104
102
let _p = p;
105
105
-
tracing::trace!("backfilling {job}");
103
103
+
tracing::trace!("backfilling {did}");
106
104
107
107
-
if let Err(e) = backfill_actor(&mut conn, &mut rc, &mut inner, &job).await {
108
108
-
tracing::error!(did = &job, "backfill failed: {e}");
105
105
+
if let Err(e) = do_actor_backfill(&mut conn, rc, inner, &did).await {
106
106
+
tracing::error!(did, "backfill failed: {e}");
109
107
counter!("backfill_failure").increment(1);
110
110
-
111
111
-
db::backfill_job_write(&mut conn, &job, "failed.write")
112
112
-
.await
113
113
-
.unwrap();
114
108
} else {
115
109
counter!("backfill_success").increment(1);
116
110
117
117
-
db::backfill_job_write(&mut conn, &job, "successful")
111
111
+
db::backfill_job_write(&mut conn, &did, "successful")
118
112
.await
119
113
.unwrap();
120
120
-
}
121
121
-
122
122
-
if let Err(e) = tokio::fs::remove_file(inner.tmp_dir.join(&job)).await {
123
123
-
tracing::error!(did = &job, "failed to remove file: {e}");
124
114
}
125
115
});
126
116
}
···
131
121
}
132
122
}
133
123
124
124
+
async fn do_actor_backfill(
125
125
+
conn: &mut Object,
126
126
+
mut rc: MultiplexedConnection,
127
127
+
mut inner: BackfillManagerInner,
128
128
+
did: &str,
129
129
+
) -> eyre::Result<()> {
130
130
+
// has the repo already been downloaded?
131
131
+
if rc.sismember(DL_DUP_KEY, did).await.unwrap_or_default() {
132
132
+
tracing::info!("skipping duplicate repo {did}");
133
133
+
return Ok(());
134
134
+
}
135
135
+
136
136
+
// check if they're already synced in DB too
137
137
+
match db::actor_get_statuses(conn, did).await {
138
138
+
Ok(Some((_, state))) => {
139
139
+
if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
140
140
+
tracing::info!("skipping duplicate repo {did}");
141
141
+
return Ok(());
142
142
+
}
143
143
+
}
144
144
+
Ok(None) => {}
145
145
+
Err(e) => {
146
146
+
tracing::error!(did, "failed to check current repo status: {e}");
147
147
+
db::backfill_job_write(conn, did, "failed.resolve").await?;
148
148
+
}
149
149
+
}
150
150
+
151
151
+
// set the repo to processing
152
152
+
conn.execute(
153
153
+
r#"INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, 'processing', NOW())
154
154
+
ON CONFLICT (did) DO UPDATE SET sync_state = 'processing', last_indexed=NOW()"#,
155
155
+
&[&did],
156
156
+
)
157
157
+
.await?;
158
158
+
159
159
+
let jd = Did::raw(did);
160
160
+
let (pds, handle) = match utils::resolve_service(&inner.resolver, &jd).await {
161
161
+
Ok(Some(data)) => data,
162
162
+
Ok(None) => {
163
163
+
tracing::warn!("bad DID doc for {did}");
164
164
+
db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?;
165
165
+
db::backfill_job_write(conn, did, "failed.resolve.did_svc").await?;
166
166
+
return Ok(());
167
167
+
}
168
168
+
Err(e) => {
169
169
+
tracing::error!(did, "failed to resolve DID doc: {e}");
170
170
+
db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?;
171
171
+
db::backfill_job_write(conn, did, "failed.resolve.did").await?;
172
172
+
return Ok(());
173
173
+
}
174
174
+
};
175
175
+
176
176
+
match utils::check_and_update_repo_status(&reqwest::Client::new(), conn, &pds, did).await {
177
177
+
Ok(true) => {}
178
178
+
Ok(false) => return Ok(()),
179
179
+
Err(e) => {
180
180
+
tracing::error!(pds, did, "failed to check repo status: {e}");
181
181
+
db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?;
182
182
+
db::backfill_job_write(conn, did, "failed.resolve.status").await?;
183
183
+
}
184
184
+
}
185
185
+
186
186
+
if let Some(handle) = handle {
187
187
+
// validate the handle and set it. we don't fail the whole backfill if this fails bc it's not
188
188
+
// really *that* important.
189
189
+
if let Err(e) = utils::resolve_and_set_handle(conn, &inner.resolver, &jd, handle).await {
190
190
+
tracing::warn!(pds, did, "failed to resolve handle: {e}");
191
191
+
}
192
192
+
}
193
193
+
194
194
+
utils::enforce_ratelimit(&mut rc, &pds).await?;
195
195
+
196
196
+
match backfill_repo(conn, &mut rc, &mut inner, &pds, did).await {
197
197
+
Ok(Some((rem, reset))) => {
198
198
+
let _ = rc.zadd(utils::BF_REM_KEY, &pds, rem).await;
199
199
+
let _ = rc.zadd(utils::BF_RESET_KEY, &pds, reset).await;
200
200
+
}
201
201
+
Ok(None) => tracing::debug!(pds, "got response with no ratelimit headers."),
202
202
+
Err(e) => {
203
203
+
tracing::error!(did, "backfill failed: {e}");
204
204
+
db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?;
205
205
+
db::backfill_job_write(conn, did, "failed.write").await?;
206
206
+
}
207
207
+
}
208
208
+
209
209
+
Ok(())
210
210
+
}
211
211
+
134
212
#[instrument(skip(conn, rc, inner))]
135
135
-
async fn backfill_actor(
213
213
+
async fn backfill_repo(
136
214
conn: &mut Object,
137
215
rc: &mut MultiplexedConnection,
138
216
inner: &mut BackfillManagerInner,
217
217
+
pds: &str,
139
218
did: &str,
140
140
-
) -> eyre::Result<()> {
219
219
+
) -> eyre::Result<Option<(i32, i32)>> {
141
220
let mut t = conn.transaction().await?;
142
221
t.execute("SET CONSTRAINTS ALL DEFERRED", &[]).await?;
143
222
144
223
tracing::trace!("loading repo");
145
224
146
146
-
let (commit, mut deltas, copies) = repo::insert_repo(&mut t, rc, &inner.tmp_dir, did).await?;
225
225
+
let mut completed =
226
226
+
repo::process_repo(&mut t, rc, &inner.client, &inner.tmp_dir, pds, did).await?;
227
227
+
let rev = &completed.commit.rev;
147
228
148
148
-
db::actor_set_repo_state(&mut t, did, &commit.rev, commit.data).await?;
229
229
+
db::actor_set_repo_state(&mut t, did, rev, completed.commit.data).await?;
149
230
150
150
-
copies.submit(&mut t, did).await?;
231
231
+
completed.copies.submit(&mut t, did).await?;
151
232
152
233
t.execute(
153
234
"UPDATE actors SET sync_state=$2, last_indexed=$3 WHERE did=$1",
···
155
236
)
156
237
.await?;
157
238
158
158
-
handle_backfill_rows(&mut t, rc, &mut deltas, did, &commit.rev).await?;
239
239
+
handle_backfill_rows(&mut t, rc, &mut completed.deltas, did, rev).await?;
159
240
160
241
tracing::trace!("insertion finished");
161
242
162
243
if let Some(index_client) = &mut inner.index_client {
163
244
// submit the deltas
164
164
-
let delta_store = deltas
245
245
+
let delta_store = completed
246
246
+
.deltas
165
247
.into_iter()
166
248
.map(|((uri, typ), delta)| parakeet_index::AggregateDeltaReq {
167
249
typ,
···
190
272
191
273
t.commit().await?;
192
274
193
193
-
Ok(())
275
275
+
Ok(completed.ratelimit_rem.zip(completed.ratelimit_reset))
194
276
}
195
277
196
278
async fn handle_backfill_rows(
···
245
327
db::backfill_delete_rows(conn, repo).await?;
246
328
247
329
Ok(())
248
248
-
}
249
249
-
250
250
-
async fn check_pds_repo_status(
251
251
-
client: &Client,
252
252
-
pds: &str,
253
253
-
repo: &str,
254
254
-
) -> eyre::Result<Option<types::GetRepoStatusRes>> {
255
255
-
let res = client
256
256
-
.get(format!(
257
257
-
"{pds}xrpc/com.atproto.sync.getRepoStatus?did={repo}"
258
258
-
))
259
259
-
.send()
260
260
-
.await?;
261
261
-
262
262
-
if [StatusCode::NOT_FOUND, StatusCode::BAD_REQUEST].contains(&res.status()) {
263
263
-
return Ok(None);
264
264
-
}
265
265
-
266
266
-
Ok(res.json().await?)
267
330
}
268
331
269
332
#[derive(Debug, Default)]
+90
-66
crates/consumer/src/backfill/repo.rs
···
1
1
-
use super::{
2
2
-
types::{CarCommitEntry, CarEntry, CarRecordEntry},
3
3
-
CopyStore,
4
4
-
};
1
1
+
use super::{utils::header_to_int, CopyStore};
5
2
use crate::indexer::records;
6
3
use crate::indexer::types::{AggregateDeltaStore, RecordTypes};
7
4
use crate::utils::at_uri_is_by;
8
5
use crate::{db, indexer};
9
6
use deadpool_postgres::Transaction;
7
7
+
use futures::TryStreamExt;
10
8
use ipld_core::cid::Cid;
11
11
-
use iroh_car::CarReader;
12
9
use metrics::counter;
13
10
use parakeet_index::AggregateType;
14
11
use redis::aio::MultiplexedConnection;
12
12
+
use repo_stream::{Commit, DiskBuilder, Driver, DriverBuilder};
13
13
+
use reqwest::Client;
14
14
+
use serde::Deserialize;
15
15
use std::collections::HashMap;
16
16
use std::path::Path;
17
17
-
use tokio::io::BufReader;
17
17
+
use tokio_util::io::StreamReader;
18
18
19
19
type BackfillDeltaStore = HashMap<(String, i32), i32>;
20
20
21
21
-
pub async fn insert_repo(
21
21
+
#[derive(Debug, Deserialize)]
22
22
+
#[serde(untagged)]
23
23
+
pub enum CarRecordEntry {
24
24
+
Known(RecordTypes),
25
25
+
Other {
26
26
+
#[serde(rename = "$type")]
27
27
+
ty: String,
28
28
+
},
29
29
+
}
30
30
+
31
31
+
pub struct CompletedRepoData {
32
32
+
pub commit: Commit,
33
33
+
pub deltas: BackfillDeltaStore,
34
34
+
pub copies: CopyStore,
35
35
+
pub ratelimit_rem: Option<i32>,
36
36
+
pub ratelimit_reset: Option<i32>,
37
37
+
}
38
38
+
39
39
+
pub async fn process_repo(
22
40
t: &mut Transaction<'_>,
23
41
rc: &mut MultiplexedConnection,
42
42
+
client: &Client,
24
43
tmp_dir: &Path,
25
25
-
repo: &str,
26
26
-
) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> {
27
27
-
let car = tokio::fs::File::open(tmp_dir.join(repo)).await?;
28
28
-
let mut car_stream = CarReader::new(BufReader::new(car)).await?;
44
44
+
pds: &str,
45
45
+
did: &str,
46
46
+
) -> eyre::Result<CompletedRepoData> {
47
47
+
let res = client
48
48
+
.get(format!("{pds}xrpc/com.atproto.sync.getRepo?did={did}"))
49
49
+
.send()
50
50
+
.await?
51
51
+
.error_for_status()?;
29
52
30
30
-
// the root should be the commit block
31
31
-
let root = car_stream.header().roots().first().cloned().unwrap();
53
53
+
let headers = res.headers();
54
54
+
let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
55
55
+
let ratelimit_reset = header_to_int(headers, "ratelimit-reset");
32
56
33
33
-
let mut commit = None;
34
34
-
let mut mst_nodes: HashMap<Cid, String> = HashMap::new();
35
35
-
let mut records: HashMap<Cid, RecordTypes> = HashMap::new();
57
57
+
let strm = res.bytes_stream().map_err(std::io::Error::other);
58
58
+
let reader = StreamReader::new(strm);
59
59
+
36
60
let mut deltas = HashMap::new();
37
61
let mut copies = CopyStore::default();
38
62
39
39
-
while let Some((cid, block)) = car_stream.next_block().await? {
40
40
-
let Ok(block) = serde_ipld_dagcbor::from_slice::<CarEntry>(&block) else {
41
41
-
tracing::warn!("failed to parse block {cid}");
42
42
-
continue;
43
43
-
};
63
63
+
let commit = match DriverBuilder::new().load_car(reader).await? {
64
64
+
Driver::Memory(commit, mut driver) => {
65
65
+
tracing::debug!("parsing with memory driver");
66
66
+
while let Some(chunk) = driver.next_chunk(256).await? {
67
67
+
for output in chunk {
68
68
+
let path = output.rkey;
69
69
+
let cid = output.cid;
44
70
45
45
-
if root == cid {
46
46
-
if let CarEntry::Commit(commit_entry) = block {
47
47
-
commit = Some(commit_entry);
48
48
-
} else {
49
49
-
tracing::warn!("root did not point to a commit entry");
50
50
-
}
51
51
-
continue;
52
52
-
}
71
71
+
let record = serde_ipld_dagcbor::from_slice::<CarRecordEntry>(&output.data);
72
72
+
let record = match record {
73
73
+
Ok(CarRecordEntry::Known(record)) => record,
74
74
+
Ok(CarRecordEntry::Other { ty }) => {
75
75
+
tracing::debug!("repo contains unknown record type: {ty} ({cid})");
76
76
+
continue;
77
77
+
}
78
78
+
Err(err) => {
79
79
+
tracing::warn!("failed to parse block {cid}: {err}");
80
80
+
continue;
81
81
+
}
82
82
+
};
53
83
54
54
-
match block {
55
55
-
CarEntry::Commit(_) => {
56
56
-
tracing::debug!("got commit entry that was not in root")
57
57
-
}
58
58
-
CarEntry::Record(CarRecordEntry::Known(record)) => {
59
59
-
if let Some(path) = mst_nodes.remove(&cid) {
60
60
-
record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
61
61
-
} else {
62
62
-
records.insert(cid, record);
84
84
+
record_index(t, rc, &mut copies, &mut deltas, did, &path, cid, record).await?;
63
85
}
64
86
}
65
65
-
CarEntry::Record(CarRecordEntry::Other { ty }) => {
66
66
-
tracing::debug!("repo contains unknown record type: {ty} ({cid})");
67
67
-
}
68
68
-
CarEntry::Mst(mst) => {
69
69
-
let mut out = Vec::with_capacity(mst.e.len());
70
87
71
71
-
for node in mst.e {
72
72
-
let ks = String::from_utf8_lossy(&node.k);
88
88
+
commit
89
89
+
}
90
90
+
Driver::Disk(paused) => {
91
91
+
tracing::debug!("parsing with disk driver");
92
92
+
let disk_store = DiskBuilder::new().open(tmp_dir.join(did)).await?;
93
93
+
let (commit, mut driver) = paused.finish_loading(disk_store).await?;
73
94
74
74
-
let key = if node.p == 0 {
75
75
-
ks.to_string()
76
76
-
} else {
77
77
-
let (_, prev): &(Cid, String) = out.last().unwrap();
78
78
-
let prefix = &prev[..node.p as usize];
95
95
+
while let Some(chunk) = driver.next_chunk(256).await? {
96
96
+
for output in chunk {
97
97
+
let path = output.rkey;
98
98
+
let cid = output.cid;
79
99
80
80
-
format!("{prefix}{ks}")
100
100
+
let record = serde_ipld_dagcbor::from_slice::<CarRecordEntry>(&output.data);
101
101
+
let record = match record {
102
102
+
Ok(CarRecordEntry::Known(record)) => record,
103
103
+
Ok(CarRecordEntry::Other { ty }) => {
104
104
+
tracing::debug!("repo contains unknown record type: {ty} ({cid})");
105
105
+
continue;
106
106
+
}
107
107
+
Err(err) => {
108
108
+
tracing::warn!("failed to parse block {cid}: {err}");
109
109
+
continue;
110
110
+
}
81
111
};
82
112
83
83
-
out.push((node.v, key.to_string()));
113
113
+
record_index(t, rc, &mut copies, &mut deltas, did, &path, cid, record).await?;
84
114
}
85
85
-
86
86
-
mst_nodes.extend(out);
87
115
}
88
88
-
}
89
89
-
}
90
116
91
91
-
for (cid, record) in records {
92
92
-
if let Some(path) = mst_nodes.remove(&cid) {
93
93
-
record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
94
94
-
} else {
95
95
-
tracing::warn!("couldn't find MST node for record {cid}")
117
117
+
commit
96
118
}
97
97
-
}
98
98
-
99
99
-
let Some(commit) = commit else {
100
100
-
eyre::bail!("repo contained no commit?");
101
119
};
102
120
103
103
-
Ok((commit, deltas, copies))
121
121
+
Ok(CompletedRepoData {
122
122
+
commit,
123
123
+
deltas,
124
124
+
copies,
125
125
+
ratelimit_rem,
126
126
+
ratelimit_reset,
127
127
+
})
104
128
}
105
129
106
130
async fn record_index(
-54
crates/consumer/src/backfill/types.rs
···
1
1
-
use crate::indexer::types::RecordTypes;
2
2
-
use ipld_core::cid::Cid;
3
3
-
use serde::Deserialize;
4
4
-
use serde_bytes::ByteBuf;
5
5
-
6
6
-
#[derive(Debug, Deserialize)]
7
7
-
#[serde(untagged)]
8
8
-
pub enum CarEntry {
9
9
-
Mst(CarMstEntry),
10
10
-
Commit(CarCommitEntry),
11
11
-
Record(CarRecordEntry),
12
12
-
}
13
13
-
14
14
-
#[derive(Debug, Deserialize)]
15
15
-
pub struct CarMstEntry {
16
16
-
pub l: Option<Cid>,
17
17
-
pub e: Vec<CarMstEntryNode>,
18
18
-
}
19
19
-
20
20
-
#[derive(Debug, Deserialize)]
21
21
-
pub struct CarMstEntryNode {
22
22
-
pub p: i32,
23
23
-
pub k: ByteBuf,
24
24
-
pub v: Cid,
25
25
-
pub t: Option<Cid>,
26
26
-
}
27
27
-
28
28
-
#[derive(Debug, Deserialize)]
29
29
-
pub struct CarCommitEntry {
30
30
-
pub did: String,
31
31
-
pub version: i32,
32
32
-
pub data: Cid,
33
33
-
pub rev: String,
34
34
-
pub prev: Option<Cid>,
35
35
-
pub sig: ByteBuf,
36
36
-
}
37
37
-
38
38
-
#[derive(Debug, Deserialize)]
39
39
-
#[serde(untagged)]
40
40
-
pub enum CarRecordEntry {
41
41
-
Known(RecordTypes),
42
42
-
Other {
43
43
-
#[serde(rename = "$type")]
44
44
-
ty: String,
45
45
-
},
46
46
-
}
47
47
-
48
48
-
#[derive(Debug, Deserialize)]
49
49
-
pub struct GetRepoStatusRes {
50
50
-
pub did: String,
51
51
-
pub active: bool,
52
52
-
pub status: Option<crate::firehose::AtpAccountStatus>,
53
53
-
pub rev: Option<String>,
54
54
-
}
+138
crates/consumer/src/backfill/utils.rs
···
1
1
+
use chrono::prelude::*;
2
2
+
use deadpool_postgres::Object;
3
3
+
use jacquard_common::types::{did::Did, string::Handle};
4
4
+
use jacquard_identity::{resolver::IdentityResolver, JacquardResolver};
5
5
+
use parakeet_db::types::ActorStatus;
6
6
+
use redis::{aio::MultiplexedConnection, AsyncTypedCommands};
7
7
+
use reqwest::{header::HeaderMap, Client, StatusCode};
8
8
+
use serde::Deserialize;
9
9
+
use std::time::Duration;
10
10
+
use tracing::instrument;
11
11
+
12
12
+
pub const BF_RESET_KEY: &str = "bf_ratelimit_reset";
13
13
+
pub const BF_REM_KEY: &str = "bf_ratelimit_rem";
14
14
+
15
15
+
#[derive(Debug, Deserialize)]
16
16
+
pub struct GetRepoStatusRes {
17
17
+
pub did: String,
18
18
+
pub active: bool,
19
19
+
pub status: Option<crate::firehose::AtpAccountStatus>,
20
20
+
pub rev: Option<String>,
21
21
+
}
22
22
+
23
23
+
pub async fn resolve_service(
24
24
+
resolver: &JacquardResolver,
25
25
+
did: &Did<'_>,
26
26
+
) -> jacquard_identity::resolver::Result<Option<(String, Option<Handle<'static>>)>> {
27
27
+
let doc = resolver.resolve_did_doc_owned(did).await?;
28
28
+
29
29
+
let Some(service) = doc.pds_endpoint() else {
30
30
+
return Ok(None);
31
31
+
};
32
32
+
let service = service.to_string();
33
33
+
34
34
+
let handle = doc.handles().first().cloned();
35
35
+
36
36
+
Ok(Some((service, handle)))
37
37
+
}
38
38
+
39
39
+
pub async fn resolve_and_set_handle(
40
40
+
conn: &mut Object,
41
41
+
resolver: &JacquardResolver,
42
42
+
did: &Did<'_>,
43
43
+
handle: Handle<'_>,
44
44
+
) -> eyre::Result<()> {
45
45
+
let handle_did = resolver.resolve_handle(&handle).await?;
46
46
+
if handle_did == Did::raw(did) {
47
47
+
conn.execute(
48
48
+
"UPDATE actors SET handle=$2 WHERE did=$1",
49
49
+
&[&did.as_str(), &handle.as_str()],
50
50
+
)
51
51
+
.await?;
52
52
+
} else {
53
53
+
tracing::warn!("requested DID ({did}) doesn't match handle");
54
54
+
}
55
55
+
56
56
+
Ok(())
57
57
+
}
58
58
+
59
59
+
// there's no ratelimit handling here because we pretty much always call download_car after.
60
60
+
#[instrument(skip(client, conn, pds))]
61
61
+
pub async fn check_and_update_repo_status(
62
62
+
client: &Client,
63
63
+
conn: &mut Object,
64
64
+
pds: &str,
65
65
+
repo: &str,
66
66
+
) -> eyre::Result<bool> {
67
67
+
match check_pds_repo_status(client, pds, repo).await? {
68
68
+
Some(status) if status.active => return Ok(true),
69
69
+
Some(status) => {
70
70
+
tracing::debug!("repo is inactive");
71
71
+
72
72
+
let status = status
73
73
+
.status
74
74
+
.unwrap_or(crate::firehose::AtpAccountStatus::Deleted);
75
75
+
conn.execute(
76
76
+
"UPDATE actors SET sync_state='dirty', status=$2 WHERE did=$1",
77
77
+
&[&repo, &ActorStatus::from(status)],
78
78
+
)
79
79
+
.await?;
80
80
+
}
81
81
+
None => {
82
82
+
tracing::debug!("repo was deleted");
83
83
+
84
84
+
conn.execute(
85
85
+
"UPDATE actors SET sync_state='dirty', status='deleted' WHERE did=$1",
86
86
+
&[&repo],
87
87
+
)
88
88
+
.await?;
89
89
+
}
90
90
+
}
91
91
+
92
92
+
Ok(false)
93
93
+
}
94
94
+
95
95
+
pub async fn check_pds_repo_status(
96
96
+
client: &Client,
97
97
+
pds: &str,
98
98
+
repo: &str,
99
99
+
) -> eyre::Result<Option<GetRepoStatusRes>> {
100
100
+
let res = client
101
101
+
.get(format!(
102
102
+
"{pds}xrpc/com.atproto.sync.getRepoStatus?did={repo}"
103
103
+
))
104
104
+
.send()
105
105
+
.await?;
106
106
+
107
107
+
if [StatusCode::NOT_FOUND, StatusCode::BAD_REQUEST].contains(&res.status()) {
108
108
+
return Ok(None);
109
109
+
}
110
110
+
111
111
+
Ok(res.json().await?)
112
112
+
}
113
113
+
114
114
+
pub async fn enforce_ratelimit(rc: &mut MultiplexedConnection, pds: &str) -> eyre::Result<()> {
115
115
+
let score = rc.zscore(BF_REM_KEY, pds).await?;
116
116
+
117
117
+
if let Some(rem) = score {
118
118
+
if (rem as i32) < 100 {
119
119
+
// if we've got None for some reason, just hope that the next req will contain the reset header.
120
120
+
if let Some(at) = rc.zscore(BF_RESET_KEY, pds).await? {
121
121
+
tracing::debug!("rate limit for {pds} resets at {at}");
122
122
+
let time = DateTime::from_timestamp(at as i64, 0).unwrap();
123
123
+
let delta = (time - Utc::now()).num_milliseconds().max(0);
124
124
+
125
125
+
tokio::time::sleep(Duration::from_millis(delta as u64)).await;
126
126
+
};
127
127
+
}
128
128
+
}
129
129
+
130
130
+
Ok(())
131
131
+
}
132
132
+
133
133
+
pub fn header_to_int(headers: &HeaderMap, name: &str) -> Option<i32> {
134
134
+
headers
135
135
+
.get(name)
136
136
+
.and_then(|v| v.to_str().ok())
137
137
+
.and_then(|v| v.parse().ok())
138
138
+
}
-12
crates/consumer/src/config.rs
···
70
70
pub workers: u8,
71
71
#[serde(default)]
72
72
pub skip_aggregation: bool,
73
73
-
#[serde(default = "default_download_workers")]
74
74
-
pub download_workers: usize,
75
75
-
#[serde(default = "default_download_buffer")]
76
76
-
pub download_buffer: usize,
77
73
pub download_tmp_dir: String,
78
74
}
79
75
···
88
84
fn default_indexer_workers() -> u8 {
89
85
4
90
86
}
91
91
-
92
92
-
fn default_download_workers() -> usize {
93
93
-
25
94
94
-
}
95
95
-
96
96
-
fn default_download_buffer() -> usize {
97
97
-
25_000
98
98
-
}