+16  .sqlx/query-297e5495004fa601f86b3ada9e512815d4b7d73aacf3f3654628c93e5db8b791.json
···
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n VALUES ($1, $2, $3)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Uuid",
+        "Text",
+        "Text"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "297e5495004fa601f86b3ada9e512815d4b7d73aacf3f3654628c93e5db8b791"
+}
-23  .sqlx/query-423bbfd2ddf9b41d3bb339b8b94ac47524dc9233ec70cf2b6c5e9bc2de49b22d.json
···
-{
-  "db_name": "PostgreSQL",
-  "query": "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "one",
-        "type_info": "Int4"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Text",
-        "Uuid"
-      ]
-    },
-    "nullable": [
-      null
-    ]
-  },
-  "hash": "423bbfd2ddf9b41d3bb339b8b94ac47524dc9233ec70cf2b6c5e9bc2de49b22d"
-}
-28  .sqlx/query-49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f.json
···
-{
-  "db_name": "PostgreSQL",
-  "query": "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "repo_root_cid",
-        "type_info": "Text"
-      },
-      {
-        "ordinal": 1,
-        "name": "repo_rev",
-        "type_info": "Text"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Text"
-      ]
-    },
-    "nullable": [
-      false,
-      true
-    ]
-  },
-  "hash": "49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f"
-}
+30  .sqlx/query-6f88c5e63c1beb47733daed5295492d59c649a35ef78414c62dcdf4d0b2a3115.json
···
+{
+  "db_name": "PostgreSQL",
+  "query": "\n SELECT rb.blob_cid, rb.record_uri\n FROM record_blobs rb\n LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id\n WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2\n ORDER BY rb.blob_cid\n LIMIT $3\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "blob_cid",
+        "type_info": "Text"
+      },
+      {
+        "ordinal": 1,
+        "name": "record_uri",
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Uuid",
+        "Text",
+        "Int8"
+      ]
+    },
+    "nullable": [
+      false,
+      false
+    ]
+  },
+  "hash": "6f88c5e63c1beb47733daed5295492d59c649a35ef78414c62dcdf4d0b2a3115"
+}
-22  .sqlx/query-95165c49f57bb8e130bf1d8444566e1f9521777f994573a4f3cdee809fb63fd7.json
···
-{
-  "db_name": "PostgreSQL",
-  "query": "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "seq",
-        "type_info": "Int8"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Text"
-      ]
-    },
-    "nullable": [
-      false
-    ]
-  },
-  "hash": "95165c49f57bb8e130bf1d8444566e1f9521777f994573a4f3cdee809fb63fd7"
-}
+22  .sqlx/query-9f993908f6ab139a1a8c6f75a1147e6ee6ceac794350fc4543bb93e62748ced2.json
···
+{
+  "db_name": "PostgreSQL",
+  "query": "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "count",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Uuid"
+      ]
+    },
+    "nullable": [
+      null
+    ]
+  },
+  "hash": "9f993908f6ab139a1a8c6f75a1147e6ee6ceac794350fc4543bb93e62748ced2"
+}
+23  .sqlx/query-c4a99ff3485bfe5971b2a2c4144097ec168f9feb8c2500d5d4693c94ff6dbc75.json
···
+{
+  "db_name": "PostgreSQL",
+  "query": "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "seq",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Text",
+        "Text"
+      ]
+    },
+    "nullable": [
+      false
+    ]
+  },
+  "hash": "c4a99ff3485bfe5971b2a2c4144097ec168f9feb8c2500d5d4693c94ff6dbc75"
+}
+16  .sqlx/query-e155d44cb2bd48ff141a27c51f34dfebeb628992a03f4bd6b10ade365ef8dc5e.json
···
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n VALUES ($1, $2, $3)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Uuid",
+        "Text",
+        "Text"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "e155d44cb2bd48ff141a27c51f34dfebeb628992a03f4bd6b10ade365ef8dc5e"
+}
+26  .sqlx/query-f3a7d87d9479500a9ddff82ea6de30334870a272d1a06cd003181b11d8f3b304.json
···
+{
+  "db_name": "PostgreSQL",
+  "query": "\n SELECT DISTINCT u.id as user_id, u.did\n FROM users u\n JOIN records r ON r.repo_id = u.id\n WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)\n LIMIT 100\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "user_id",
+        "type_info": "Uuid"
+      },
+      {
+        "ordinal": 1,
+        "name": "did",
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Left": []
+    },
+    "nullable": [
+      false,
+      false
+    ]
+  },
+  "hash": "f3a7d87d9479500a9ddff82ea6de30334870a272d1a06cd003181b11d8f3b304"
+}
+3 -6  .sqlx/query-ff7899984ea138f1e608fa862def47402369a428ac9116c653890e5fcaa0015b.json → .sqlx/query-72d9db2d1287fa43f69666a5259d3243e5d87807551565948ab99f1400b8cc4c.json
···
 {
   "db_name": "PostgreSQL",
-  "query": "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
+  "query": "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
   "describe": {
     "columns": [
       {
···
     ],
     "parameters": {
       "Left": [
-        "Uuid",
-        "Text",
-        "Text",
-        "Int8"
+        "Uuid"
       ]
     },
     "nullable": [
···
       false
     ]
   },
-  "hash": "ff7899984ea138f1e608fa862def47402369a428ac9116c653890e5fcaa0015b"
+  "hash": "72d9db2d1287fa43f69666a5259d3243e5d87807551565948ab99f1400b8cc4c"
 }
-9  KNOWN_ISSUES.md
···
-# Known Issues
-
-## Account migration from bsky.social
-
-Migrating your account from bsky.social to this PDS works, but Bluesky's appview may not recognize your new signing key. This means you can post and your followers will see it, but some authenticated requests might fail with "jwt signature does not match jwt issuer".
-
-We've been trying hard to verify that our side is correct (PLC updated, signing keys match, relays have the account) but something about how we're emitting events isn't triggering Bluesky's appview to refresh its identity data. Still investigating.
-
-No workaround yet.
+8  frontend/src/components/migration/InboundWizard.svelte
···
 <div class="code-block">
   <pre>{`{
   "id": "${flow.state.sourceDid}",
+  "verificationMethod": [
+    {
+      "id": "${flow.state.sourceDid}#atproto",
+      "type": "Multikey",
+      "controller": "${flow.state.sourceDid}",
+      "publicKeyMultibase": "${flow.state.targetVerificationMethod?.replace('did:key:', '') || '...'}"
+    }
+  ],
   "service": [
     {
       "id": "#atproto_pds",
+8  frontend/src/lib/migration/flow.svelte.ts
···
   error: null,
   requires2FA: false,
   twoFactorCode: "",
+  targetVerificationMethod: null,
 });

 let sourceClient: AtprotoClient | null = null;
···
   }

   if (state.sourceDid.startsWith("did:web:")) {
+    const credentials = await localClient.getRecommendedDidCredentials();
+    state.targetVerificationMethod =
+      credentials.verificationMethods?.atproto || null;
     setStep("did-web-update");
   } else {
     setProgress({ currentOperation: "Requesting PLC operation token..." });
···
     state.targetPassword,
   );
   if (state.sourceDid.startsWith("did:web:")) {
+    const credentials = await localClient.getRecommendedDidCredentials();
+    state.targetVerificationMethod =
+      credentials.verificationMethods?.atproto || null;
     setStep("did-web-update");
   } else {
     await sourceClient.requestPlcOperationSignature();
···
     error: null,
     requires2FA: false,
     twoFactorCode: "",
+    targetVerificationMethod: null,
   };
   sourceClient = null;
   clearMigrationState();
+1  frontend/src/lib/migration/types.ts
+11  migrations/20251243_record_blobs.sql
···
+CREATE TABLE record_blobs (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    repo_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+    record_uri TEXT NOT NULL,
+    blob_cid TEXT NOT NULL,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    UNIQUE(repo_id, record_uri, blob_cid)
+);
+
+CREATE INDEX idx_record_blobs_repo_id ON record_blobs(repo_id);
+CREATE INDEX idx_record_blobs_blob_cid ON record_blobs(blob_cid);
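Note on the new table (not part of the diff): the UNIQUE(repo_id, record_uri, blob_cid) constraint is what lets the insert paths later in this change (src/api/repo/import.rs and src/scheduled.rs) use ON CONFLICT ... DO NOTHING and stay idempotent. A minimal sketch under that assumption, using a plain sqlx query; the helper name and the example DID/CID values are made up for illustration:

    // Illustrative only: writing the same (repo_id, record_uri, blob_cid) triple twice
    // is a no-op because of the UNIQUE constraint plus ON CONFLICT DO NOTHING.
    async fn record_blob_ref_example(db: &sqlx::PgPool) -> Result<(), sqlx::Error> {
        let repo_id = uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let record_uri = "at://did:plc:example/app.bsky.feed.post/3kexamplerkey"; // hypothetical
        let blob_cid = "bafkreiexampleblobcid"; // hypothetical
        for _ in 0..2 {
            // The second iteration affects zero rows instead of failing.
            sqlx::query(
                "INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                 VALUES ($1, $2, $3)
                 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING",
            )
            .bind(repo_id)
            .bind(record_uri)
            .bind(blob_cid)
            .execute(db)
            .await?;
        }
        Ok(())
    }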
+3 -2  src/api/identity/plc/submit.rs
···
         }
     }
     match sqlx::query!(
-        "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
-        did
+        "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq",
+        did,
+        user.handle
     )
     .fetch_one(&state.db)
     .await
+28 -72  src/api/repo/blob.rs
···
 use crate::auth::{ServiceTokenVerifier, is_service_token};
 use crate::delegation::{self, DelegationActionType};
 use crate::state::AppState;
-use crate::sync::import::find_blob_refs_ipld;
 use axum::body::Bytes;
 use axum::{
     Json,
···
     response::{IntoResponse, Response},
 };
 use cid::Cid;
-use ipld_core::ipld::Ipld;
-use jacquard_repo::storage::BlockStore;
 use multihash::Multihash;
 use serde::{Deserialize, Serialize};
 use serde_json::json;
 use sha2::{Digest, Sha256};
-use std::str::FromStr;
-use tracing::{debug, error, warn};
+use tracing::{debug, error};

 const MAX_BLOB_SIZE: usize = 10_000_000_000;
 const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000;
···
         }
     };
     let limit = params.limit.unwrap_or(500).clamp(1, 1000);
-    let cursor_str = params.cursor.unwrap_or_default();
-    let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
-        let parts: Vec<&str> = cursor_str.split('|').collect();
-        (parts[0].to_string(), parts[1].to_string())
-    } else {
-        (String::new(), String::new())
-    };
-    let records_query = sqlx::query!(
-        "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
+    let cursor_cid = params.cursor.as_deref().unwrap_or("");
+    let missing_query = sqlx::query!(
+        r#"
+        SELECT rb.blob_cid, rb.record_uri
+        FROM record_blobs rb
+        LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id
+        WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
+        ORDER BY rb.blob_cid
+        LIMIT $3
+        "#,
         user_id,
-        cursor_collection,
-        cursor_rkey,
-        limit
+        cursor_cid,
+        limit + 1
     )
     .fetch_all(&state.db)
     .await;
-    let records = match records_query {
+    let rows = match missing_query {
         Ok(r) => r,
         Err(e) => {
-            error!("DB error fetching records: {:?}", e);
+            error!("DB error fetching missing blobs: {:?}", e);
             return (
                 StatusCode::INTERNAL_SERVER_ERROR,
                 Json(json!({"error": "InternalError"})),
···
             .into_response();
         }
     };
-    let mut missing_blobs = Vec::new();
-    let mut last_cursor = None;
-    for row in &records {
-        let collection = &row.collection;
-        let rkey = &row.rkey;
-        let record_cid_str = &row.record_cid;
-        last_cursor = Some(format!("{}|{}", collection, rkey));
-        let record_cid = match Cid::from_str(record_cid_str) {
-            Ok(c) => c,
-            Err(_) => continue,
-        };
-        let block_bytes = match state.block_store.get(&record_cid).await {
-            Ok(Some(b)) => b,
-            _ => continue,
-        };
-        let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
-            Ok(v) => v,
-            Err(e) => {
-                warn!("Failed to parse record {} as IPLD: {:?}", record_cid_str, e);
-                continue;
-            }
-        };
-        let blob_refs = find_blob_refs_ipld(&record_ipld, 0);
-        for blob_ref in blob_refs {
-            let blob_cid_str = blob_ref.cid;
-            let exists = sqlx::query!(
-                "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
-                blob_cid_str,
-                user_id
-            )
-            .fetch_optional(&state.db)
-            .await;
-            match exists {
-                Ok(None) => {
-                    missing_blobs.push(RecordBlob {
-                        cid: blob_cid_str,
-                        record_uri: format!("at://{}/{}/{}", did, collection, rkey),
-                    });
-                }
-                Err(e) => {
-                    error!("DB error checking blob existence: {:?}", e);
-                }
-                _ => {}
-            }
-        }
-    }
-    // if we fetched fewer records than limit, we are done, so cursor is None.
-    // otherwise, cursor is the last one we saw.
-    // ...right?
-    let next_cursor = if records.len() < limit as usize {
+    let has_more = rows.len() > limit as usize;
+    let blobs: Vec<RecordBlob> = rows
+        .into_iter()
+        .take(limit as usize)
+        .map(|row| RecordBlob {
+            cid: row.blob_cid,
+            record_uri: row.record_uri,
+        })
+        .collect();
+    let next_cursor = if has_more {
+        blobs.last().map(|b| b.cid.clone())
+    } else {
         None
-    } else {
-        last_cursor
     };
     (
         StatusCode::OK,
         Json(ListMissingBlobsOutput {
             cursor: next_cursor,
-            blobs: missing_blobs,
+            blobs,
         }),
     )
     .into_response()
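The rewritten listMissingBlobs handler above relies on the fetch-one-extra pattern: it asks the database for limit + 1 rows, returns at most limit, and only emits a cursor when the extra row proves another page exists. A standalone sketch of that pattern, assuming keyset pagination on an ordered text key; the function and names here are illustrative, not the handler's actual code:

    // Fetch-one-extra pagination: `rows` holds up to limit + 1 items ordered by key.
    // Return at most `limit` items and a cursor (the last returned key) only when the
    // extra row shows more data exists; the next query then filters on key > cursor.
    fn paginate<T>(mut rows: Vec<T>, limit: usize, key: impl Fn(&T) -> String) -> (Vec<T>, Option<String>) {
        let has_more = rows.len() > limit;
        rows.truncate(limit);
        let cursor = if has_more { rows.last().map(key) } else { None };
        (rows, cursor)
    }

    // Example: three rows with limit 2 yields a page of two and a cursor of "b";
    // two rows with limit 2 yields both rows and no cursor.
    // let (page, cursor) = paginate(vec!["a", "b", "c"], 2, |s| s.to_string());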
+26  src/api/repo/import.rs
···
         import_result.records.len(),
         did
     );
+    let mut blob_ref_count = 0;
+    for record in &import_result.records {
+        for blob_ref in &record.blob_refs {
+            let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
+            if let Err(e) = sqlx::query!(
+                r#"
+                INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
+                VALUES ($1, $2, $3)
+                ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
+                "#,
+                user_id,
+                record_uri,
+                blob_ref.cid
+            )
+            .execute(&state.db)
+            .await
+            {
+                warn!("Failed to insert record_blob for {}: {:?}", record_uri, e);
+            } else {
+                blob_ref_count += 1;
+            }
+        }
+    }
+    if blob_ref_count > 0 {
+        info!("Recorded {} blob references for imported repo", blob_ref_count);
+    }
     let key_row = match sqlx::query!(
         r#"SELECT uk.key_bytes, uk.encryption_version
            FROM user_keys uk
+11 -3  src/api/server/account_status.rs
···
     .await
     .unwrap_or(Some(0))
     .unwrap_or(0);
-    let blob_count: i64 = sqlx::query_scalar!(
+    let imported_blobs: i64 = sqlx::query_scalar!(
         "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
+        user_id
+    )
+    .fetch_one(&state.db)
+    .await
+    .unwrap_or(Some(0))
+    .unwrap_or(0);
+    let expected_blobs: i64 = sqlx::query_scalar!(
+        "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
         user_id
     )
     .fetch_one(&state.db)
···
             repo_blocks: block_count as i64,
             indexed_records: record_count,
             private_state_values: 0,
-            expected_blobs: blob_count,
-            imported_blobs: blob_count,
+            expected_blobs,
+            imported_blobs,
         }),
     )
     .into_response()
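With the change above, expected_blobs counts distinct blob CIDs referenced by records (from record_blobs) while imported_blobs counts blobs actually uploaded, so the two can diverge mid-migration. A hypothetical consumer-side check built on that response shape; the field names mirror the struct above, everything else is illustrative:

    // Hypothetical consumer of the account-status response above: blobs are fully
    // imported once every referenced blob CID has a matching uploaded blob.
    struct AccountStatus {
        expected_blobs: i64,
        imported_blobs: i64,
    }

    fn blobs_fully_imported(status: &AccountStatus) -> bool {
        status.imported_blobs >= status.expected_blobs
    }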
+3 -2  src/main.rs
···
 use tracing::{error, info, warn};
 use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
 use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
-use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks};
+use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks};
 use tranquil_pds::state::AppState;

 #[tokio::main]
···
     tokio::spawn(async move {
         backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await;
         backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await;
-        backfill_user_blocks(&backfill_db, backfill_block_store).await;
+        backfill_user_blocks(&backfill_db, backfill_block_store.clone()).await;
+        backfill_record_blobs(&backfill_db, backfill_block_store).await;
     });

     let mut comms_service = CommsService::new(state.db.clone());
+106  src/scheduled.rs
···
     info!(success, failed, "Completed user_blocks backfill");
 }

+pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
+    let users_needing_backfill = match sqlx::query!(
+        r#"
+        SELECT DISTINCT u.id as user_id, u.did
+        FROM users u
+        JOIN records r ON r.repo_id = u.id
+        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
+        LIMIT 100
+        "#
+    )
+    .fetch_all(db)
+    .await
+    {
+        Ok(rows) => rows,
+        Err(e) => {
+            error!("Failed to query users for record_blobs backfill: {}", e);
+            return;
+        }
+    };
+
+    if users_needing_backfill.is_empty() {
+        debug!("No users need record_blobs backfill");
+        return;
+    }
+
+    info!(
+        count = users_needing_backfill.len(),
+        "Backfilling record_blobs for existing repos"
+    );
+
+    let mut success = 0;
+    let mut failed = 0;
+
+    for user in users_needing_backfill {
+        let records = match sqlx::query!(
+            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
+            user.user_id
+        )
+        .fetch_all(db)
+        .await
+        {
+            Ok(r) => r,
+            Err(e) => {
+                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
+                failed += 1;
+                continue;
+            }
+        };
+
+        let mut blob_refs_found = 0;
+        for record in records {
+            let record_cid = match Cid::from_str(&record.record_cid) {
+                Ok(c) => c,
+                Err(_) => continue,
+            };
+
+            let block_bytes = match block_store.get(&record_cid).await {
+                Ok(Some(b)) => b,
+                _ => continue,
+            };
+
+            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
+                Ok(v) => v,
+                Err(_) => continue,
+            };
+
+            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
+            for blob_ref in blob_refs {
+                let record_uri = format!(
+                    "at://{}/{}/{}",
+                    user.did, record.collection, record.rkey
+                );
+                if let Err(e) = sqlx::query!(
+                    r#"
+                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
+                    VALUES ($1, $2, $3)
+                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
+                    "#,
+                    user.user_id,
+                    record_uri,
+                    blob_ref.cid
+                )
+                .execute(db)
+                .await
+                {
+                    warn!(error = %e, "Failed to insert record_blob during backfill");
+                } else {
+                    blob_refs_found += 1;
+                }
+            }
+        }
+
+        if blob_refs_found > 0 {
+            info!(
+                user_id = %user.user_id,
+                did = %user.did,
+                blob_refs = blob_refs_found,
+                "Backfilled record_blobs"
+            );
+        }
+        success += 1;
+    }
+
+    info!(success, failed, "Completed record_blobs backfill");
+}
+
 pub async fn start_scheduled_tasks(
     db: PgPool,
     blob_store: Arc<dyn BlobStorage>,