+237
src/api/feed/timeline.rs
+237
src/api/feed/timeline.rs
···
1
+
// Yes, I know, this endpoint is an appview one, not for PDS. Who cares!!
2
+
// Yes, this only gets posts that our DB/instance knows about. Who cares!!!
3
+
4
+
use crate::state::AppState;
5
+
use axum::{
6
+
Json,
7
+
extract::State,
8
+
http::StatusCode,
9
+
response::{IntoResponse, Response},
10
+
};
11
+
use jacquard_repo::storage::BlockStore;
12
+
use serde::Serialize;
13
+
use serde_json::{Value, json};
14
+
use sqlx::Row;
15
+
use tracing::error;
16
+
17
+
#[derive(Serialize)]
18
+
pub struct TimelineOutput {
19
+
pub feed: Vec<FeedViewPost>,
20
+
pub cursor: Option<String>,
21
+
}
22
+
23
+
#[derive(Serialize)]
24
+
pub struct FeedViewPost {
25
+
pub post: PostView,
26
+
}
27
+
28
+
#[derive(Serialize)]
29
+
#[serde(rename_all = "camelCase")]
30
+
pub struct PostView {
31
+
pub uri: String,
32
+
pub cid: String,
33
+
pub author: AuthorView,
34
+
pub record: Value,
35
+
pub indexed_at: String,
36
+
}
37
+
38
+
#[derive(Serialize)]
39
+
pub struct AuthorView {
40
+
pub did: String,
41
+
pub handle: String,
42
+
}
43
+
44
+
pub async fn get_timeline(
45
+
State(state): State<AppState>,
46
+
headers: axum::http::HeaderMap,
47
+
) -> Response {
48
+
let auth_header = headers.get("Authorization");
49
+
if auth_header.is_none() {
50
+
return (
51
+
StatusCode::UNAUTHORIZED,
52
+
Json(json!({"error": "AuthenticationRequired"})),
53
+
)
54
+
.into_response();
55
+
}
56
+
let token = auth_header
57
+
.unwrap()
58
+
.to_str()
59
+
.unwrap_or("")
60
+
.replace("Bearer ", "");
61
+
62
+
let session = sqlx::query(
63
+
"SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
64
+
)
65
+
.bind(&token)
66
+
.fetch_optional(&state.db)
67
+
.await
68
+
.unwrap_or(None);
69
+
70
+
let (did, key_bytes) = match session {
71
+
Some(row) => (
72
+
row.get::<String, _>("did"),
73
+
row.get::<Vec<u8>, _>("key_bytes"),
74
+
),
75
+
None => {
76
+
return (
77
+
StatusCode::UNAUTHORIZED,
78
+
Json(json!({"error": "AuthenticationFailed"})),
79
+
)
80
+
.into_response();
81
+
}
82
+
};
83
+
84
+
if crate::auth::verify_token(&token, &key_bytes).is_err() {
85
+
return (
86
+
StatusCode::UNAUTHORIZED,
87
+
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
88
+
)
89
+
.into_response();
90
+
}
91
+
92
+
let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
93
+
.bind(&did)
94
+
.fetch_optional(&state.db)
95
+
.await;
96
+
97
+
let user_id: uuid::Uuid = match user_query {
98
+
Ok(Some(row)) => row.get("id"),
99
+
_ => {
100
+
return (
101
+
StatusCode::INTERNAL_SERVER_ERROR,
102
+
Json(json!({"error": "InternalError", "message": "User not found"})),
103
+
)
104
+
.into_response();
105
+
}
106
+
};
107
+
108
+
let follows_query = sqlx::query(
109
+
"SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'"
110
+
)
111
+
.bind(user_id)
112
+
.fetch_all(&state.db)
113
+
.await;
114
+
115
+
let follow_cids: Vec<String> = match follows_query {
116
+
Ok(rows) => rows.iter().map(|r| r.get("record_cid")).collect(),
117
+
Err(e) => {
118
+
error!("Failed to get follows: {:?}", e);
119
+
return (
120
+
StatusCode::INTERNAL_SERVER_ERROR,
121
+
Json(json!({"error": "InternalError"})),
122
+
)
123
+
.into_response();
124
+
}
125
+
};
126
+
127
+
let mut followed_dids: Vec<String> = Vec::new();
128
+
for cid_str in follow_cids {
129
+
let cid = match cid_str.parse::<cid::Cid>() {
130
+
Ok(c) => c,
131
+
Err(_) => continue,
132
+
};
133
+
134
+
let block_bytes = match state.block_store.get(&cid).await {
135
+
Ok(Some(b)) => b,
136
+
_ => continue,
137
+
};
138
+
139
+
let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
140
+
Ok(v) => v,
141
+
Err(_) => continue,
142
+
};
143
+
144
+
if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
145
+
followed_dids.push(subject.to_string());
146
+
}
147
+
}
148
+
149
+
if followed_dids.is_empty() {
150
+
return (
151
+
StatusCode::OK,
152
+
Json(TimelineOutput {
153
+
feed: vec![],
154
+
cursor: None,
155
+
}),
156
+
)
157
+
.into_response();
158
+
}
159
+
160
+
let placeholders: Vec<String> = followed_dids
161
+
.iter()
162
+
.enumerate()
163
+
.map(|(i, _)| format!("${}", i + 1))
164
+
.collect();
165
+
166
+
let posts_query = format!(
167
+
"SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
168
+
FROM records r
169
+
JOIN repos rp ON r.repo_id = rp.user_id
170
+
JOIN users u ON rp.user_id = u.id
171
+
WHERE u.did IN ({}) AND r.collection = 'app.bsky.feed.post'
172
+
ORDER BY r.created_at DESC
173
+
LIMIT 50",
174
+
placeholders.join(", ")
175
+
);
176
+
177
+
let mut query = sqlx::query(&posts_query);
178
+
for did in &followed_dids {
179
+
query = query.bind(did);
180
+
}
181
+
182
+
let posts_result = query.fetch_all(&state.db).await;
183
+
184
+
let posts = match posts_result {
185
+
Ok(rows) => rows,
186
+
Err(e) => {
187
+
error!("Failed to get posts: {:?}", e);
188
+
return (
189
+
StatusCode::INTERNAL_SERVER_ERROR,
190
+
Json(json!({"error": "InternalError"})),
191
+
)
192
+
.into_response();
193
+
}
194
+
};
195
+
196
+
let mut feed: Vec<FeedViewPost> = Vec::new();
197
+
198
+
for row in posts {
199
+
let record_cid: String = row.get("record_cid");
200
+
let rkey: String = row.get("rkey");
201
+
let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
202
+
let author_did: String = row.get("did");
203
+
let author_handle: String = row.get("handle");
204
+
205
+
let cid = match record_cid.parse::<cid::Cid>() {
206
+
Ok(c) => c,
207
+
Err(_) => continue,
208
+
};
209
+
210
+
let block_bytes = match state.block_store.get(&cid).await {
211
+
Ok(Some(b)) => b,
212
+
_ => continue,
213
+
};
214
+
215
+
let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
216
+
Ok(v) => v,
217
+
Err(_) => continue,
218
+
};
219
+
220
+
let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
221
+
222
+
feed.push(FeedViewPost {
223
+
post: PostView {
224
+
uri,
225
+
cid: record_cid,
226
+
author: AuthorView {
227
+
did: author_did,
228
+
handle: author_handle,
229
+
},
230
+
record,
231
+
indexed_at: created_at.to_rfc3339(),
232
+
},
233
+
});
234
+
}
235
+
236
+
(StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response()
237
+
}
+2
-2
src/api/identity/account.rs
+2
-2
src/api/identity/account.rs
···
206
206
}
207
207
208
208
let mst = Mst::new(Arc::new(state.block_store.clone()));
209
-
let mst_root = match mst.root().await {
209
+
let mst_root = match mst.persist().await {
210
210
Ok(c) => c,
211
211
Err(e) => {
212
-
error!("Error creating MST root: {:?}", e);
212
+
error!("Error persisting MST: {:?}", e);
213
213
return (
214
214
StatusCode::INTERNAL_SERVER_ERROR,
215
215
Json(json!({"error": "InternalError"})),
+11
-7
src/api/repo/record/delete.rs
+11
-7
src/api/repo/record/delete.rs
···
151
151
152
152
// TODO: Check swapRecord if provided? Skipping for brevity/robustness
153
153
154
-
if let Err(e) = mst.delete(&key).await {
155
-
error!("Failed to delete from MST: {:?}", e);
156
-
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to delete from MST: {:?}", e)}))).into_response();
157
-
}
154
+
let new_mst = match mst.delete(&key).await {
155
+
Ok(m) => m,
156
+
Err(e) => {
157
+
error!("Failed to delete from MST: {:?}", e);
158
+
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to delete from MST: {:?}", e)}))).into_response();
159
+
}
160
+
};
158
161
159
-
let new_mst_root = match mst.root().await {
162
+
let new_mst_root = match new_mst.persist().await {
160
163
Ok(c) => c,
161
-
Err(_e) => {
164
+
Err(e) => {
165
+
error!("Failed to persist MST: {:?}", e);
162
166
return (
163
167
StatusCode::INTERNAL_SERVER_ERROR,
164
-
Json(json!({"error": "InternalError", "message": "Failed to get new MST root"})),
168
+
Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
165
169
)
166
170
.into_response();
167
171
}
+81
-16
src/api/repo/record/write.rs
+81
-16
src/api/repo/record/write.rs
···
205
205
};
206
206
207
207
let key = format!("{}/{}", collection_nsid, rkey);
208
-
if let Err(e) = mst.update(&key, record_cid).await {
209
-
error!("Failed to update MST: {:?}", e);
210
-
return (
211
-
StatusCode::INTERNAL_SERVER_ERROR,
212
-
Json(json!({"error": "InternalError"})),
213
-
)
214
-
.into_response();
215
-
}
208
+
let new_mst = match mst.add(&key, record_cid).await {
209
+
Ok(m) => m,
210
+
Err(e) => {
211
+
error!("Failed to add to MST: {:?}", e);
212
+
return (
213
+
StatusCode::INTERNAL_SERVER_ERROR,
214
+
Json(json!({"error": "InternalError"})),
215
+
)
216
+
.into_response();
217
+
}
218
+
};
216
219
217
-
let new_mst_root = match mst.root().await {
220
+
let new_mst_root = match new_mst.persist().await {
218
221
Ok(c) => c,
219
222
Err(e) => {
220
-
error!("Failed to get new MST root: {:?}", e);
223
+
error!("Failed to persist MST: {:?}", e);
221
224
return (
222
225
StatusCode::INTERNAL_SERVER_ERROR,
223
226
Json(json!({"error": "InternalError"})),
···
317
320
pub record: serde_json::Value,
318
321
#[serde(rename = "swapCommit")]
319
322
pub swap_commit: Option<String>,
323
+
#[serde(rename = "swapRecord")]
324
+
pub swap_record: Option<String>,
320
325
}
321
326
322
327
#[derive(Serialize)]
···
490
495
};
491
496
492
497
let key = format!("{}/{}", collection_nsid, rkey);
493
-
if let Err(e) = mst.update(&key, record_cid).await {
494
-
error!("Failed to update MST: {:?}", e);
495
-
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
498
+
499
+
let existing = match mst.get(&key).await {
500
+
Ok(v) => v,
501
+
Err(e) => {
502
+
error!("Failed to check MST key: {:?}", e);
503
+
return (
504
+
StatusCode::INTERNAL_SERVER_ERROR,
505
+
Json(
506
+
json!({"error": "InternalError", "message": "Failed to check existing record"}),
507
+
),
508
+
)
509
+
.into_response();
510
+
}
511
+
};
512
+
513
+
if let Some(swap_record_str) = &input.swap_record {
514
+
let swap_record_cid = match Cid::from_str(swap_record_str) {
515
+
Ok(c) => c,
516
+
Err(_) => {
517
+
return (
518
+
StatusCode::BAD_REQUEST,
519
+
Json(
520
+
json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}),
521
+
),
522
+
)
523
+
.into_response();
524
+
}
525
+
};
526
+
match &existing {
527
+
Some(current_cid) if *current_cid != swap_record_cid => {
528
+
return (
529
+
StatusCode::CONFLICT,
530
+
Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})),
531
+
)
532
+
.into_response();
533
+
}
534
+
None => {
535
+
return (
536
+
StatusCode::CONFLICT,
537
+
Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})),
538
+
)
539
+
.into_response();
540
+
}
541
+
_ => {}
542
+
}
496
543
}
497
544
498
-
let new_mst_root = match mst.root().await {
545
+
let new_mst = if existing.is_some() {
546
+
match mst.update(&key, record_cid).await {
547
+
Ok(m) => m,
548
+
Err(e) => {
549
+
error!("Failed to update MST: {:?}", e);
550
+
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
551
+
}
552
+
}
553
+
} else {
554
+
match mst.add(&key, record_cid).await {
555
+
Ok(m) => m,
556
+
Err(e) => {
557
+
error!("Failed to add to MST: {:?}", e);
558
+
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response();
559
+
}
560
+
}
561
+
};
562
+
563
+
let new_mst_root = match new_mst.persist().await {
499
564
Ok(c) => c,
500
565
Err(e) => {
501
-
error!("Failed to get new MST root: {:?}", e);
566
+
error!("Failed to persist MST: {:?}", e);
502
567
return (
503
568
StatusCode::INTERNAL_SERVER_ERROR,
504
-
Json(json!({"error": "InternalError", "message": "Failed to get new MST root"})),
569
+
Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
505
570
)
506
571
.into_response();
507
572
}
+4
src/lib.rs
+4
src/lib.rs
···
65
65
"/xrpc/com.atproto.repo.uploadBlob",
66
66
post(api::repo::upload_blob),
67
67
)
68
+
.route(
69
+
"/xrpc/app.bsky.feed.getTimeline",
70
+
get(api::feed::get_timeline),
71
+
)
68
72
.route("/.well-known/did.json", get(api::identity::well_known_did))
69
73
.route("/u/{handle}/did.json", get(api::identity::user_did_doc))
70
74
.route("/xrpc/{*method}", any(api::proxy::proxy_handler))
+3
-6
tests/lifecycle.rs
+3
-6
tests/lifecycle.rs
···
53
53
}
54
54
55
55
#[tokio::test]
56
-
#[ignore]
57
56
async fn test_post_crud_lifecycle() {
58
57
let client = client();
59
58
let (did, jwt) = setup_new_user("lifecycle-crud").await;
···
221
220
}
222
221
223
222
#[tokio::test]
224
-
#[ignore]
225
223
async fn test_record_update_conflict_lifecycle() {
226
224
let client = client();
227
225
let (user_did, user_jwt) = setup_new_user("user-conflict").await;
···
277
275
"$type": "app.bsky.actor.profile",
278
276
"displayName": "Updated Name (v2)"
279
277
},
280
-
"swapCommit": cid_v1 // <-- Correctly point to v1
278
+
"swapRecord": cid_v1
281
279
});
282
280
let update_res_v2 = client
283
281
.post(format!(
···
308
306
"$type": "app.bsky.actor.profile",
309
307
"displayName": "Stale Update (v3)"
310
308
},
311
-
"swapCommit": cid_v1
309
+
"swapRecord": cid_v1
312
310
});
313
311
let update_res_v3_stale = client
314
312
.post(format!(
···
335
333
"$type": "app.bsky.actor.profile",
336
334
"displayName": "Good Update (v3)"
337
335
},
338
-
"swapCommit": cid_v2 // <-- Correct
336
+
"swapRecord": cid_v2
339
337
});
340
338
let update_res_v3_good = client
341
339
.post(format!(
···
448
446
}
449
447
450
448
#[tokio::test]
451
-
#[ignore]
452
449
async fn test_social_flow_lifecycle() {
453
450
let client = client();
454
451