this repo has no description
// NOTE: This is an AppView endpoint rather than a PDS one; it is implemented here anyway, on purpose.
// NOTE: It only returns posts that this instance's database knows about, which is an accepted limitation.
3
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use jacquard_repo::storage::BlockStore;
12use serde::Serialize;
13use serde_json::{Value, json};
14use sqlx::Row;
15use tracing::error;
16
17#[derive(Serialize)]
18pub struct TimelineOutput {
19 pub feed: Vec<FeedViewPost>,
20 pub cursor: Option<String>,
21}
22
23#[derive(Serialize)]
24pub struct FeedViewPost {
25 pub post: PostView,
26}
27
28#[derive(Serialize)]
29#[serde(rename_all = "camelCase")]
30pub struct PostView {
31 pub uri: String,
32 pub cid: String,
33 pub author: AuthorView,
34 pub record: Value,
35 pub indexed_at: String,
36}
37
38#[derive(Serialize)]
39pub struct AuthorView {
40 pub did: String,
41 pub handle: String,
42}
43
44pub async fn get_timeline(
45 State(state): State<AppState>,
46 headers: axum::http::HeaderMap,
47) -> Response {
48 let auth_header = headers.get("Authorization");
49 if auth_header.is_none() {
50 return (
51 StatusCode::UNAUTHORIZED,
52 Json(json!({"error": "AuthenticationRequired"})),
53 )
54 .into_response();
55 }
56 let token = auth_header
57 .unwrap()
58 .to_str()
59 .unwrap_or("")
60 .replace("Bearer ", "");
61
62 let session = sqlx::query(
63 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
64 )
65 .bind(&token)
66 .fetch_optional(&state.db)
67 .await
68 .unwrap_or(None);
69
70 let (did, key_bytes) = match session {
71 Some(row) => (
72 row.get::<String, _>("did"),
73 row.get::<Vec<u8>, _>("key_bytes"),
74 ),
75 None => {
76 return (
77 StatusCode::UNAUTHORIZED,
78 Json(json!({"error": "AuthenticationFailed"})),
79 )
80 .into_response();
81 }
82 };
83
84 if crate::auth::verify_token(&token, &key_bytes).is_err() {
85 return (
86 StatusCode::UNAUTHORIZED,
87 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
88 )
89 .into_response();
90 }
91
92 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
93 .bind(&did)
94 .fetch_optional(&state.db)
95 .await;
96
97 let user_id: uuid::Uuid = match user_query {
98 Ok(Some(row)) => row.get("id"),
99 _ => {
100 return (
101 StatusCode::INTERNAL_SERVER_ERROR,
102 Json(json!({"error": "InternalError", "message": "User not found"})),
103 )
104 .into_response();
105 }
106 };
107
108 let follows_query = sqlx::query(
109 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'"
110 )
111 .bind(user_id)
112 .fetch_all(&state.db)
113 .await;
114
115 let follow_cids: Vec<String> = match follows_query {
116 Ok(rows) => rows.iter().map(|r| r.get("record_cid")).collect(),
117 Err(e) => {
118 error!("Failed to get follows: {:?}", e);
119 return (
120 StatusCode::INTERNAL_SERVER_ERROR,
121 Json(json!({"error": "InternalError"})),
122 )
123 .into_response();
124 }
125 };
126
127 let mut followed_dids: Vec<String> = Vec::new();
128 for cid_str in follow_cids {
129 let cid = match cid_str.parse::<cid::Cid>() {
130 Ok(c) => c,
131 Err(_) => continue,
132 };
133
134 let block_bytes = match state.block_store.get(&cid).await {
135 Ok(Some(b)) => b,
136 _ => continue,
137 };
138
139 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
140 Ok(v) => v,
141 Err(_) => continue,
142 };
143
144 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
145 followed_dids.push(subject.to_string());
146 }
147 }
148
149 if followed_dids.is_empty() {
150 return (
151 StatusCode::OK,
152 Json(TimelineOutput {
153 feed: vec![],
154 cursor: None,
155 }),
156 )
157 .into_response();
158 }
159
160 let placeholders: Vec<String> = followed_dids
161 .iter()
162 .enumerate()
163 .map(|(i, _)| format!("${}", i + 1))
164 .collect();
165
166 let posts_query = format!(
167 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
168 FROM records r
169 JOIN repos rp ON r.repo_id = rp.user_id
170 JOIN users u ON rp.user_id = u.id
171 WHERE u.did IN ({}) AND r.collection = 'app.bsky.feed.post'
172 ORDER BY r.created_at DESC
173 LIMIT 50",
174 placeholders.join(", ")
175 );
176
177 let mut query = sqlx::query(&posts_query);
178 for did in &followed_dids {
179 query = query.bind(did);
180 }
181
182 let posts_result = query.fetch_all(&state.db).await;
183
184 let posts = match posts_result {
185 Ok(rows) => rows,
186 Err(e) => {
187 error!("Failed to get posts: {:?}", e);
188 return (
189 StatusCode::INTERNAL_SERVER_ERROR,
190 Json(json!({"error": "InternalError"})),
191 )
192 .into_response();
193 }
194 };
195
196 let mut feed: Vec<FeedViewPost> = Vec::new();
197
198 for row in posts {
199 let record_cid: String = row.get("record_cid");
200 let rkey: String = row.get("rkey");
201 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
202 let author_did: String = row.get("did");
203 let author_handle: String = row.get("handle");
204
205 let cid = match record_cid.parse::<cid::Cid>() {
206 Ok(c) => c,
207 Err(_) => continue,
208 };
209
210 let block_bytes = match state.block_store.get(&cid).await {
211 Ok(Some(b)) => b,
212 _ => continue,
213 };
214
215 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
216 Ok(v) => v,
217 Err(_) => continue,
218 };
219
220 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
221
222 feed.push(FeedViewPost {
223 post: PostView {
224 uri,
225 cid: record_cid,
226 author: AuthorView {
227 did: author_did,
228 handle: author_handle,
229 },
230 record,
231 indexed_at: created_at.to_rfc3339(),
232 },
233 });
234 }
235
236 (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response()
237}