tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
feat(consumer): Faster Backfill
mia.omg.lol
7 months ago
36c029c2
745ab2bd
+419
-128
8 changed files
expand all
collapse all
unified
split
Cargo.lock
consumer
Cargo.toml
src
backfill
downloader.rs
mod.rs
repo.rs
config.rs
indexer
mod.rs
main.rs
+25
Cargo.lock
···
752
752
"did-resolver",
753
753
"eyre",
754
754
"figment",
755
755
+
"flume",
755
756
"foldhash",
756
757
"futures",
757
758
"ipld-core",
···
1338
1339
version = "0.5.7"
1339
1340
source = "registry+https://github.com/rust-lang/crates.io-index"
1340
1341
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
1342
1342
+
1343
1343
+
[[package]]
1344
1344
+
name = "flume"
1345
1345
+
version = "0.11.1"
1346
1346
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1347
1347
+
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
1348
1348
+
dependencies = [
1349
1349
+
"futures-core",
1350
1350
+
"futures-sink",
1351
1351
+
"nanorand",
1352
1352
+
"spin",
1353
1353
+
]
1341
1354
1342
1355
[[package]]
1343
1356
name = "fnv"
···
2527
2540
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
2528
2541
2529
2542
[[package]]
2543
2543
+
name = "nanorand"
2544
2544
+
version = "0.7.0"
2545
2545
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2546
2546
+
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
2547
2547
+
dependencies = [
2548
2548
+
"getrandom 0.2.15",
2549
2549
+
]
2550
2550
+
2551
2551
+
[[package]]
2530
2552
name = "native-tls"
2531
2553
version = "0.2.12"
2532
2554
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3972
3994
version = "0.9.8"
3973
3995
source = "registry+https://github.com/rust-lang/crates.io-index"
3974
3996
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
3997
3997
+
dependencies = [
3998
3998
+
"lock_api",
3999
3999
+
]
3975
4000
3976
4001
[[package]]
3977
4002
name = "spki"
+1
consumer/Cargo.toml
···
11
11
did-resolver = { path = "../did-resolver" }
12
12
eyre = "0.6.12"
13
13
figment = { version = "0.10.19", features = ["env", "toml"] }
14
14
+
flume = { version = "0.11", features = ["async"] }
14
15
foldhash = "0.1.4"
15
16
futures = "0.3.31"
16
17
ipld-core = "0.4.1"
+334
consumer/src/backfill/downloader.rs
···
1
1
+
use super::{DL_DONE_KEY, PDS_SERVICE_ID};
2
2
+
use crate::db;
3
3
+
use chrono::prelude::*;
4
4
+
use deadpool_postgres::{Client as PgClient, Pool};
5
5
+
use did_resolver::Resolver;
6
6
+
use futures::TryStreamExt;
7
7
+
use metrics::{counter, histogram};
8
8
+
use parakeet_db::types::{ActorStatus, ActorSyncState};
9
9
+
use redis::aio::MultiplexedConnection;
10
10
+
use redis::AsyncTypedCommands;
11
11
+
use reqwest::header::HeaderMap;
12
12
+
use reqwest::Client as HttpClient;
13
13
+
use std::path::{Path, PathBuf};
14
14
+
use std::sync::Arc;
15
15
+
use tokio::sync::watch::Receiver as WatchReceiver;
16
16
+
use tokio::time::{Duration, Instant};
17
17
+
use tokio_postgres::types::Type;
18
18
+
use tokio_util::io::StreamReader;
19
19
+
use tokio_util::task::TaskTracker;
20
20
+
use tracing::instrument;
21
21
+
22
22
+
const BF_RESET_KEY: &str = "bf_download_ratelimit_reset";
23
23
+
const BF_REM_KEY: &str = "bf_download_ratelimit_rem";
24
24
+
const DL_DUP_KEY: &str = "bf_downloaded";
25
25
+
26
26
+
pub async fn downloader(
27
27
+
mut rc: MultiplexedConnection,
28
28
+
pool: Pool,
29
29
+
resolver: Arc<Resolver>,
30
30
+
tmp_dir: PathBuf,
31
31
+
concurrency: usize,
32
32
+
buffer: usize,
33
33
+
tracker: TaskTracker,
34
34
+
stop: WatchReceiver<bool>,
35
35
+
) {
36
36
+
let (tx, rx) = flume::bounded(64);
37
37
+
let mut conn = pool.get().await.unwrap();
38
38
+
39
39
+
let http = HttpClient::new();
40
40
+
41
41
+
for _ in 0..concurrency {
42
42
+
tracker.spawn(download_thread(
43
43
+
rc.clone(),
44
44
+
pool.clone(),
45
45
+
resolver.clone(),
46
46
+
http.clone(),
47
47
+
rx.clone(),
48
48
+
tmp_dir.clone(),
49
49
+
));
50
50
+
}
51
51
+
52
52
+
let status_stmt = conn.prepare_typed_cached(
53
53
+
"INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, 'processing', NOW()) ON CONFLICT (did) DO UPDATE SET sync_state = 'processing', last_indexed=NOW()",
54
54
+
&[Type::TEXT]
55
55
+
).await.unwrap();
56
56
+
57
57
+
loop {
58
58
+
if stop.has_changed().unwrap_or(true) {
59
59
+
tracing::info!("stopping downloader");
60
60
+
break;
61
61
+
}
62
62
+
63
63
+
if let Ok(count) = rc.llen(DL_DONE_KEY).await {
64
64
+
if count > buffer {
65
65
+
tracing::info!("waiting due to full buffer");
66
66
+
tokio::time::sleep(Duration::from_secs(5)).await;
67
67
+
continue;
68
68
+
}
69
69
+
}
70
70
+
71
71
+
let did: String = match rc.lpop("backfill_queue", None).await {
72
72
+
Ok(Some(did)) => did,
73
73
+
Ok(None) => {
74
74
+
tokio::time::sleep(Duration::from_millis(250)).await;
75
75
+
continue;
76
76
+
}
77
77
+
Err(e) => {
78
78
+
tracing::error!("failed to get item from backfill queue: {e}");
79
79
+
continue;
80
80
+
}
81
81
+
};
82
82
+
83
83
+
tracing::trace!("resolving repo {did}");
84
84
+
85
85
+
// has the repo already been downloaded?
86
86
+
if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() {
87
87
+
tracing::warn!("skipping duplicate repo {did}");
88
88
+
continue;
89
89
+
}
90
90
+
91
91
+
// check if they're already synced in DB too
92
92
+
match db::actor_get_statuses(&mut conn, &did).await {
93
93
+
Ok(Some((_, state))) => {
94
94
+
if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
95
95
+
tracing::warn!("skipping duplicate repo {did}");
96
96
+
continue;
97
97
+
}
98
98
+
}
99
99
+
Ok(None) => {}
100
100
+
Err(e) => {
101
101
+
tracing::error!(did, "failed to check current repo status: {e}");
102
102
+
db::backfill_job_write(&mut conn, &did, "failed.resolve")
103
103
+
.await
104
104
+
.unwrap();
105
105
+
}
106
106
+
}
107
107
+
108
108
+
match resolver.resolve_did(&did).await {
109
109
+
Ok(Some(did_doc)) => {
110
110
+
let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else {
111
111
+
tracing::warn!("bad DID doc for {did}");
112
112
+
db::backfill_job_write(&mut conn, &did, "failed.resolve")
113
113
+
.await
114
114
+
.unwrap();
115
115
+
continue;
116
116
+
};
117
117
+
let service = service.service_endpoint.clone();
118
118
+
119
119
+
// set the repo to processing
120
120
+
if let Err(e) = conn.execute(&status_stmt, &[&did]).await {
121
121
+
tracing::error!("failed to update repo status for {did}: {e}");
122
122
+
continue;
123
123
+
}
124
124
+
125
125
+
let handle = did_doc
126
126
+
.also_known_as
127
127
+
.and_then(|akas| akas.first().map(|v| v[5..].to_owned()));
128
128
+
129
129
+
tracing::trace!("resolved repo {did} {service}");
130
130
+
if let Err(e) = tx.send_async((service, did, handle)).await {
131
131
+
tracing::error!("failed to send: {e}");
132
132
+
}
133
133
+
}
134
134
+
Ok(None) => {
135
135
+
tracing::warn!(did, "bad DID doc");
136
136
+
db::backfill_job_write(&mut conn, &did, "failed.resolve")
137
137
+
.await
138
138
+
.unwrap();
139
139
+
}
140
140
+
Err(e) => {
141
141
+
tracing::error!(did, "failed to resolve DID doc: {e}");
142
142
+
db::backfill_job_write(&mut conn, &did, "failed.resolve")
143
143
+
.await
144
144
+
.unwrap();
145
145
+
}
146
146
+
}
147
147
+
}
148
148
+
}
149
149
+
150
150
+
async fn download_thread(
151
151
+
mut rc: MultiplexedConnection,
152
152
+
pool: Pool,
153
153
+
resolver: Arc<Resolver>,
154
154
+
http: reqwest::Client,
155
155
+
rx: flume::Receiver<(String, String, Option<String>)>,
156
156
+
tmp_dir: PathBuf,
157
157
+
) {
158
158
+
tracing::debug!("spawning thread");
159
159
+
160
160
+
// this will return Err(_) and exit when all senders (only held above) are dropped
161
161
+
while let Ok((pds, did, maybe_handle)) = rx.recv_async().await {
162
162
+
if let Err(e) = enforce_ratelimit(&mut rc, &pds).await {
163
163
+
tracing::error!("ratelimiter error: {e}");
164
164
+
continue;
165
165
+
};
166
166
+
167
167
+
{
168
168
+
tracing::trace!("getting DB conn...");
169
169
+
let mut conn = pool.get().await.unwrap();
170
170
+
tracing::trace!("got DB conn...");
171
171
+
match check_and_update_repo_status(&http, &mut conn, &pds, &did).await {
172
172
+
Ok(true) => {}
173
173
+
Ok(false) => continue,
174
174
+
Err(e) => {
175
175
+
tracing::error!(pds, did, "failed to check repo status: {e}");
176
176
+
db::backfill_job_write(&mut conn, &did, "failed.resolve")
177
177
+
.await
178
178
+
.unwrap();
179
179
+
continue;
180
180
+
}
181
181
+
}
182
182
+
183
183
+
tracing::debug!("trying to resolve handle...");
184
184
+
if let Some(handle) = maybe_handle {
185
185
+
if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await {
186
186
+
tracing::error!(pds, did, "failed to resolve handle: {e}");
187
187
+
db::backfill_job_write(&mut conn, &did, "failed.resolve")
188
188
+
.await
189
189
+
.unwrap();
190
190
+
}
191
191
+
}
192
192
+
}
193
193
+
194
194
+
let start = Instant::now();
195
195
+
196
196
+
tracing::trace!("downloading repo {did}");
197
197
+
198
198
+
match download_car(&http, &tmp_dir, &pds, &did).await {
199
199
+
Ok(Some((rem, reset))) => {
200
200
+
let _ = rc.zadd(BF_REM_KEY, &pds, rem).await;
201
201
+
let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await;
202
202
+
}
203
203
+
Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."),
204
204
+
Err(e) => {
205
205
+
tracing::error!(pds, did, "failed to download repo: {e}");
206
206
+
continue;
207
207
+
}
208
208
+
}
209
209
+
210
210
+
histogram!("backfill_download_dur", "pds" => pds).record(start.elapsed().as_secs_f64());
211
211
+
212
212
+
let _ = rc.sadd(DL_DUP_KEY, &did).await;
213
213
+
if let Err(e) = rc.rpush(DL_DONE_KEY, &did).await {
214
214
+
tracing::error!(did, "failed to mark download complete: {e}");
215
215
+
} else {
216
216
+
counter!("backfill_downloaded").increment(1);
217
217
+
}
218
218
+
}
219
219
+
220
220
+
tracing::debug!("thread exiting");
221
221
+
}
222
222
+
223
223
+
async fn enforce_ratelimit(rc: &mut MultiplexedConnection, pds: &str) -> eyre::Result<()> {
224
224
+
let score = rc.zscore(BF_REM_KEY, pds).await?;
225
225
+
226
226
+
if let Some(rem) = score {
227
227
+
if (rem as i32) < 100 {
228
228
+
// if we've got None for some reason, just hope that the next req will contain the reset header.
229
229
+
if let Some(at) = rc.zscore(BF_RESET_KEY, pds).await? {
230
230
+
tracing::debug!("rate limit for {pds} resets at {at}");
231
231
+
let time = chrono::DateTime::from_timestamp(at as i64, 0).unwrap();
232
232
+
let delta = (time - Utc::now()).num_milliseconds().max(0);
233
233
+
234
234
+
tokio::time::sleep(Duration::from_millis(delta as u64)).await;
235
235
+
};
236
236
+
}
237
237
+
}
238
238
+
239
239
+
Ok(())
240
240
+
}
241
241
+
242
242
+
// you wouldn't...
243
243
+
#[instrument(skip(http, tmp_dir, pds))]
244
244
+
async fn download_car(
245
245
+
http: &HttpClient,
246
246
+
tmp_dir: &Path,
247
247
+
pds: &str,
248
248
+
did: &str,
249
249
+
) -> eyre::Result<Option<(i32, i32)>> {
250
250
+
let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
251
251
+
252
252
+
let res = http
253
253
+
.get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}"))
254
254
+
.send()
255
255
+
.await?
256
256
+
.error_for_status()?;
257
257
+
258
258
+
let headers = res.headers();
259
259
+
let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
260
260
+
let ratelimit_reset = header_to_int(headers, "ratelimit-reset");
261
261
+
262
262
+
let strm = res.bytes_stream().map_err(std::io::Error::other);
263
263
+
let mut reader = StreamReader::new(strm);
264
264
+
265
265
+
tokio::io::copy(&mut reader, &mut file).await?;
266
266
+
267
267
+
Ok(ratelimit_rem.zip(ratelimit_reset))
268
268
+
}
269
269
+
270
270
+
// there's no ratelimit handling here because we pretty much always call download_car after.
271
271
+
#[instrument(skip(http, conn, pds))]
272
272
+
async fn check_and_update_repo_status(
273
273
+
http: &HttpClient,
274
274
+
conn: &mut PgClient,
275
275
+
pds: &str,
276
276
+
repo: &str,
277
277
+
) -> eyre::Result<bool> {
278
278
+
match super::check_pds_repo_status(http, pds, repo).await? {
279
279
+
Some(status) => {
280
280
+
if !status.active {
281
281
+
tracing::debug!("repo is inactive");
282
282
+
283
283
+
let status = status
284
284
+
.status
285
285
+
.unwrap_or(crate::firehose::AtpAccountStatus::Deleted);
286
286
+
conn.execute(
287
287
+
"UPDATE actors SET sync_state='dirty', status=$2 WHERE did=$1",
288
288
+
&[&repo, &ActorStatus::from(status)],
289
289
+
)
290
290
+
.await?;
291
291
+
292
292
+
Ok(false)
293
293
+
} else {
294
294
+
Ok(true)
295
295
+
}
296
296
+
}
297
297
+
None => {
298
298
+
// this repo can't be found - set dirty and assume deleted.
299
299
+
tracing::debug!("repo was deleted");
300
300
+
conn.execute(
301
301
+
"UPDATE actors SET sync_state='dirty', status='deleted' WHERE did=$1",
302
302
+
&[&repo],
303
303
+
)
304
304
+
.await?;
305
305
+
306
306
+
Ok(false)
307
307
+
}
308
308
+
}
309
309
+
}
310
310
+
311
311
+
async fn resolve_and_set_handle(
312
312
+
conn: &PgClient,
313
313
+
resolver: &Resolver,
314
314
+
did: &str,
315
315
+
handle: &str,
316
316
+
) -> eyre::Result<()> {
317
317
+
if let Some(handle_did) = resolver.resolve_handle(handle).await? {
318
318
+
if handle_did == did {
319
319
+
conn.execute("UPDATE actors SET handle=$2 WHERE did=$1", &[&did, &handle])
320
320
+
.await?;
321
321
+
} else {
322
322
+
tracing::warn!("requested DID ({did}) doesn't match handle");
323
323
+
}
324
324
+
}
325
325
+
326
326
+
Ok(())
327
327
+
}
328
328
+
329
329
+
fn header_to_int(headers: &HeaderMap, name: &str) -> Option<i32> {
330
330
+
headers
331
331
+
.get(name)
332
332
+
.and_then(|v| v.to_str().ok())
333
333
+
.and_then(|v| v.parse().ok())
334
334
+
}
+29
-105
consumer/src/backfill/mod.rs
···
9
9
use metrics::counter;
10
10
use parakeet_db::types::{ActorStatus, ActorSyncState};
11
11
use redis::aio::MultiplexedConnection;
12
12
-
use redis::{AsyncCommands, Direction};
12
12
+
use redis::AsyncTypedCommands;
13
13
use reqwest::{Client, StatusCode};
14
14
+
use std::path::PathBuf;
14
15
use std::str::FromStr;
15
16
use std::sync::Arc;
16
17
use tokio::sync::watch::Receiver as WatchReceiver;
···
18
19
use tokio_util::task::TaskTracker;
19
20
use tracing::instrument;
20
21
22
22
+
mod downloader;
21
23
mod repo;
22
24
mod types;
23
25
26
26
+
const DL_DONE_KEY: &str = "bf_download_complete";
24
27
const PDS_SERVICE_ID: &str = "#atproto_pds";
25
28
// There's a 4MiB limit on parakeet-index, so break delta batches up if there's loads.
26
29
// this should be plenty low enough to not trigger the size limit. (59k did slightly)
···
28
31
29
32
#[derive(Clone)]
30
33
pub struct BackfillManagerInner {
31
31
-
resolver: Arc<Resolver>,
32
32
-
client: Client,
33
34
index_client: Option<parakeet_index::Client>,
34
34
-
opts: BackfillConfig,
35
35
+
tmp_dir: PathBuf,
35
36
}
36
37
37
38
pub struct BackfillManager {
38
39
pool: Pool,
39
40
redis: MultiplexedConnection,
41
41
+
resolver: Arc<Resolver>,
40
42
semaphore: Arc<Semaphore>,
43
43
+
opts: BackfillConfig,
41
44
inner: BackfillManagerInner,
42
45
}
43
46
···
49
52
index_client: Option<parakeet_index::Client>,
50
53
opts: BackfillConfig,
51
54
) -> eyre::Result<Self> {
52
52
-
let client = Client::builder().brotli(true).build()?;
53
55
let semaphore = Arc::new(Semaphore::new(opts.backfill_workers as usize));
54
56
55
57
Ok(BackfillManager {
56
58
pool,
57
59
redis,
60
60
+
resolver,
58
61
semaphore,
59
62
inner: BackfillManagerInner {
60
60
-
resolver,
61
61
-
client,
62
63
index_client,
63
63
-
opts,
64
64
+
tmp_dir: PathBuf::from_str(&opts.download_tmp_dir)?,
64
65
},
66
66
+
opts,
65
67
})
66
68
}
67
69
68
70
pub async fn run(mut self, stop: WatchReceiver<bool>) -> eyre::Result<()> {
69
71
let tracker = TaskTracker::new();
70
72
73
73
+
tracker.spawn(downloader::downloader(
74
74
+
self.redis.clone(),
75
75
+
self.pool.clone(),
76
76
+
self.resolver,
77
77
+
self.inner.tmp_dir.clone(),
78
78
+
self.opts.download_workers,
79
79
+
self.opts.download_buffer,
80
80
+
tracker.clone(),
81
81
+
stop.clone(),
82
82
+
));
83
83
+
71
84
loop {
72
85
if stop.has_changed().unwrap_or(true) {
73
86
tracker.close();
87
87
+
tracing::info!("stopping backfiller");
74
88
break;
75
89
}
76
90
77
77
-
let Some(job) = self
78
78
-
.redis
79
79
-
.lmove::<_, _, Option<String>>(
80
80
-
"backfill_queue",
81
81
-
"backfill_processing",
82
82
-
Direction::Left,
83
83
-
Direction::Right,
84
84
-
)
85
85
-
.await?
86
86
-
else {
91
91
+
let Some(job): Option<String> = self.redis.lpop(DL_DONE_KEY, None).await? else {
87
92
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
88
93
continue;
89
94
};
···
92
97
93
98
let mut inner = self.inner.clone();
94
99
let mut conn = self.pool.get().await?;
95
95
-
let mut redis = self.redis.clone();
96
100
97
101
tracker.spawn(async move {
98
102
let _p = p;
···
102
106
tracing::error!(did = &job, "backfill failed: {e}");
103
107
counter!("backfill_failure").increment(1);
104
108
105
105
-
db::backfill_job_write(&mut conn, &job, "failed")
109
109
+
db::backfill_job_write(&mut conn, &job, "failed.write")
106
110
.await
107
111
.unwrap();
108
112
} else {
···
113
117
.unwrap();
114
118
}
115
119
116
116
-
redis
117
117
-
.lrem::<_, _, i32>("backfill_processing", 1, &job)
118
118
-
.await
119
119
-
.unwrap();
120
120
+
if let Err(e) = tokio::fs::remove_file(inner.tmp_dir.join(&job)).await {
121
121
+
tracing::error!(did = &job, "failed to remove file: {e}");
122
122
+
}
120
123
});
121
124
}
122
125
···
132
135
inner: &mut BackfillManagerInner,
133
136
did: &str,
134
137
) -> eyre::Result<()> {
135
135
-
let Some((status, sync_state)) = db::actor_get_statuses(conn, did).await? else {
136
136
-
tracing::error!("skipping backfill on unknown repo");
137
137
-
return Ok(());
138
138
-
};
139
139
-
140
140
-
if sync_state != ActorSyncState::Dirty || status != ActorStatus::Active {
141
141
-
tracing::debug!("skipping non-dirty or inactive repo");
142
142
-
return Ok(());
143
143
-
}
144
144
-
145
145
-
// resolve the did to a PDS (also validates the handle)
146
146
-
let Some(did_doc) = inner.resolver.resolve_did(did).await? else {
147
147
-
eyre::bail!("missing did doc");
148
148
-
};
149
149
-
150
150
-
let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else {
151
151
-
eyre::bail!("DID doc contained no service endpoint");
152
152
-
};
153
153
-
154
154
-
let pds_url = service.service_endpoint.clone();
155
155
-
156
156
-
// check the repo status before we attempt to resolve the handle. There's a case where we can't
157
157
-
// resolve the handle in the DID doc because the acc is already deleted.
158
158
-
let Some(repo_status) = check_pds_repo_status(&inner.client, &pds_url, did).await? else {
159
159
-
// this repo can't be found - set dirty and assume deleted.
160
160
-
tracing::debug!("repo was deleted");
161
161
-
db::actor_upsert(
162
162
-
conn,
163
163
-
did,
164
164
-
ActorStatus::Deleted,
165
165
-
ActorSyncState::Dirty,
166
166
-
Utc::now(),
167
167
-
)
168
168
-
.await?;
169
169
-
return Ok(());
170
170
-
};
171
171
-
172
172
-
if !repo_status.active {
173
173
-
tracing::debug!("repo is inactive");
174
174
-
let status = repo_status
175
175
-
.status
176
176
-
.unwrap_or(crate::firehose::AtpAccountStatus::Deleted);
177
177
-
db::actor_upsert(conn, did, status.into(), ActorSyncState::Dirty, Utc::now()).await?;
178
178
-
return Ok(());
179
179
-
}
180
180
-
181
181
-
if !inner.opts.skip_handle_validation {
182
182
-
// at this point, the account will be active and we can attempt to resolve the handle.
183
183
-
let Some(handle) = did_doc
184
184
-
.also_known_as
185
185
-
.and_then(|aka| aka.first().cloned())
186
186
-
.and_then(|handle| handle.strip_prefix("at://").map(String::from))
187
187
-
else {
188
188
-
eyre::bail!("DID doc contained no handle");
189
189
-
};
190
190
-
191
191
-
// in theory, we can use com.atproto.identity.resolveHandle against a PDS, but that seems
192
192
-
// like a way to end up with really sus handles.
193
193
-
if let Some(handle_did) = inner.resolver.resolve_handle(&handle).await? {
194
194
-
if handle_did != did {
195
195
-
tracing::warn!("requested DID doesn't match handle");
196
196
-
} else {
197
197
-
// set the handle from above
198
198
-
db::actor_upsert_handle(
199
199
-
conn,
200
200
-
did,
201
201
-
ActorSyncState::Processing,
202
202
-
Some(handle),
203
203
-
Utc::now(),
204
204
-
)
205
205
-
.await?;
206
206
-
}
207
207
-
}
208
208
-
}
209
209
-
210
210
-
// now we can start actually backfilling
211
211
-
db::actor_set_sync_status(conn, did, ActorSyncState::Processing, Utc::now()).await?;
212
212
-
213
138
let mut t = conn.transaction().await?;
214
139
t.execute("SET CONSTRAINTS ALL DEFERRED", &[]).await?;
215
140
216
216
-
tracing::trace!("pulling repo");
141
141
+
tracing::trace!("loading repo");
217
142
218
218
-
let (commit, mut deltas, copies) =
219
219
-
repo::stream_and_insert_repo(&mut t, &inner.client, did, &pds_url).await?;
143
143
+
let (commit, mut deltas, copies) = repo::insert_repo(&mut t, &inner.tmp_dir, did).await?;
220
144
221
145
db::actor_set_repo_state(&mut t, did, &commit.rev, commit.data).await?;
222
146
+5
-18
consumer/src/backfill/repo.rs
···
6
6
use crate::indexer::types::{AggregateDeltaStore, RecordTypes};
7
7
use crate::{db, indexer};
8
8
use deadpool_postgres::Transaction;
9
9
-
use futures::TryStreamExt;
10
9
use ipld_core::cid::Cid;
11
10
use iroh_car::CarReader;
12
11
use metrics::counter;
13
12
use parakeet_index::AggregateType;
14
14
-
use reqwest::Client;
15
13
use std::collections::HashMap;
16
16
-
use std::io::ErrorKind;
14
14
+
use std::path::Path;
17
15
use tokio::io::BufReader;
18
18
-
use tokio_util::io::StreamReader;
19
16
20
17
type BackfillDeltaStore = HashMap<(String, i32), i32>;
21
18
22
22
-
pub async fn stream_and_insert_repo(
19
19
+
pub async fn insert_repo(
23
20
t: &mut Transaction<'_>,
24
24
-
client: &Client,
21
21
+
tmp_dir: &Path,
25
22
repo: &str,
26
26
-
pds: &str,
27
23
) -> eyre::Result<(CarCommitEntry, BackfillDeltaStore, CopyStore)> {
28
28
-
let res = client
29
29
-
.get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={repo}"))
30
30
-
.send()
31
31
-
.await?
32
32
-
.error_for_status()?;
33
33
-
34
34
-
let strm = res
35
35
-
.bytes_stream()
36
36
-
.map_err(|err| std::io::Error::new(ErrorKind::Other, err));
37
37
-
let reader = StreamReader::new(strm);
38
38
-
let mut car_stream = CarReader::new(BufReader::new(reader)).await?;
24
24
+
let car = tokio::fs::File::open(tmp_dir.join(repo)).await?;
25
25
+
let mut car_stream = CarReader::new(BufReader::new(car)).await?;
39
26
40
27
// the root should be the commit block
41
28
let root = car_stream.header().roots().first().cloned().unwrap();
+16
-2
consumer/src/config.rs
···
40
40
/// You can use this to move handle resolution out of event handling and into another place.
41
41
#[serde(default)]
42
42
pub skip_handle_validation: bool,
43
43
+
/// Whether to submit backfill requests for new repos. (Only when history_mode == BackfillHistory).
44
44
+
#[serde(default)]
45
45
+
pub request_backfill: bool,
43
46
}
44
47
45
48
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Deserialize)]
···
57
60
pub backfill_workers: u8,
58
61
#[serde(default)]
59
62
pub skip_aggregation: bool,
60
60
-
#[serde(default)]
61
61
-
pub skip_handle_validation: bool,
63
63
+
#[serde(default = "default_download_workers")]
64
64
+
pub download_workers: usize,
65
65
+
#[serde(default = "default_download_buffer")]
66
66
+
pub download_buffer: usize,
67
67
+
pub download_tmp_dir: String,
62
68
}
63
69
64
70
fn default_backfill_workers() -> u8 {
···
68
74
fn default_indexer_workers() -> u8 {
69
75
4
70
76
}
77
77
+
78
78
+
fn default_download_workers() -> usize {
79
79
+
25
80
80
+
}
81
81
+
82
82
+
fn default_download_buffer() -> usize {
83
83
+
25_000
84
84
+
}
+8
-3
consumer/src/indexer/mod.rs
···
30
30
pub struct RelayIndexerOpts {
31
31
pub history_mode: HistoryMode,
32
32
pub skip_handle_validation: bool,
33
33
+
pub request_backfill: bool,
33
34
}
34
35
35
36
#[derive(Clone)]
···
38
39
resolver: Arc<Resolver>,
39
40
do_backfill: bool,
40
41
do_handle_res: bool,
42
42
+
req_backfill: bool,
41
43
}
42
44
43
45
pub struct RelayIndexer {
···
66
68
state: RelayIndexerState {
67
69
resolver,
68
70
do_backfill: opts.history_mode == HistoryMode::BackfillHistory,
71
71
+
req_backfill: opts.request_backfill,
69
72
do_handle_res: !opts.skip_handle_validation,
70
73
idxc_tx,
71
74
},
···
275
278
.map(ActorStatus::from)
276
279
.unwrap_or(ActorStatus::Active);
277
280
278
278
-
let trigger_bf = if state.do_backfill && status == ActorStatus::Active {
281
281
+
let trigger_bf = if state.do_backfill && state.req_backfill && status == ActorStatus::Active {
279
282
// check old status - if they exist (Some(*)), AND were previously != Active but not Deleted,
280
283
// AND have a rev == null, then trigger backfill.
281
284
db::actor_get_status_and_rev(conn, &account.did)
···
325
328
// backfill for them and they can be marked active and indexed normally.
326
329
// TODO: bridgy doesn't implement since atm - we need a special case
327
330
if commit.since.is_some() {
328
328
-
if state.do_backfill {
331
331
+
if state.do_backfill && state.req_backfill {
329
332
rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?;
330
333
}
331
334
return Ok(());
···
356
359
.await?;
357
360
358
361
if trigger_backfill {
359
359
-
rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?;
362
362
+
if state.req_backfill {
363
363
+
rc.rpush::<_, _, i32>("backfill_queue", commit.repo).await?;
364
364
+
}
360
365
return Ok(());
361
366
}
362
367
+1
consumer/src/main.rs
···
115
115
let indexer_opts = indexer::RelayIndexerOpts {
116
116
history_mode: indexer_cfg.history_mode,
117
117
skip_handle_validation: indexer_cfg.skip_handle_validation,
118
118
+
request_backfill: indexer_cfg.request_backfill,
118
119
};
119
120
120
121
let relay_indexer = indexer::RelayIndexer::new(