this repo has no description
1// Yes, I know, this endpoint is an appview one, not for PDS. Who cares!!
2// Yes, this only gets posts that our DB/instance knows about. Who cares!!!
3
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use jacquard_repo::storage::BlockStore;
12use serde::Serialize;
13use serde_json::{Value, json};
14use sqlx::Row;
15use tracing::error;
16
17#[derive(Serialize)]
18pub struct TimelineOutput {
19 pub feed: Vec<FeedViewPost>,
20 pub cursor: Option<String>,
21}
22
23#[derive(Serialize)]
24pub struct FeedViewPost {
25 pub post: PostView,
26}
27
28#[derive(Serialize)]
29#[serde(rename_all = "camelCase")]
30pub struct PostView {
31 pub uri: String,
32 pub cid: String,
33 pub author: AuthorView,
34 pub record: Value,
35 pub indexed_at: String,
36}
37
38#[derive(Serialize)]
39pub struct AuthorView {
40 pub did: String,
41 pub handle: String,
42}
43
44pub async fn get_timeline(
45 State(state): State<AppState>,
46 headers: axum::http::HeaderMap,
47) -> Response {
48 let auth_header = headers.get("Authorization");
49 if auth_header.is_none() {
50 return (
51 StatusCode::UNAUTHORIZED,
52 Json(json!({"error": "AuthenticationRequired"})),
53 )
54 .into_response();
55 }
56 let token = auth_header
57 .unwrap()
58 .to_str()
59 .unwrap_or("")
60 .replace("Bearer ", "");
61
62 let session = sqlx::query!(
63 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
64 token
65 )
66 .fetch_optional(&state.db)
67 .await
68 .unwrap_or(None);
69
70 let (did, key_bytes) = match session {
71 Some(row) => (row.did, row.key_bytes),
72 None => {
73 return (
74 StatusCode::UNAUTHORIZED,
75 Json(json!({"error": "AuthenticationFailed"})),
76 )
77 .into_response();
78 }
79 };
80
81 if crate::auth::verify_token(&token, &key_bytes).is_err() {
82 return (
83 StatusCode::UNAUTHORIZED,
84 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
85 )
86 .into_response();
87 }
88
89 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
90 .fetch_optional(&state.db)
91 .await;
92
93 let user_id = match user_query {
94 Ok(Some(row)) => row.id,
95 _ => {
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError", "message": "User not found"})),
99 )
100 .into_response();
101 }
102 };
103
104 let follows_query = sqlx::query!(
105 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'",
106 user_id
107 )
108 .fetch_all(&state.db)
109 .await;
110
111 let follow_cids: Vec<String> = match follows_query {
112 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
113 Err(e) => {
114 error!("Failed to get follows: {:?}", e);
115 return (
116 StatusCode::INTERNAL_SERVER_ERROR,
117 Json(json!({"error": "InternalError"})),
118 )
119 .into_response();
120 }
121 };
122
123 let mut followed_dids: Vec<String> = Vec::new();
124 for cid_str in follow_cids {
125 let cid = match cid_str.parse::<cid::Cid>() {
126 Ok(c) => c,
127 Err(_) => continue,
128 };
129
130 let block_bytes = match state.block_store.get(&cid).await {
131 Ok(Some(b)) => b,
132 _ => continue,
133 };
134
135 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
136 Ok(v) => v,
137 Err(_) => continue,
138 };
139
140 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
141 followed_dids.push(subject.to_string());
142 }
143 }
144
145 if followed_dids.is_empty() {
146 return (
147 StatusCode::OK,
148 Json(TimelineOutput {
149 feed: vec![],
150 cursor: None,
151 }),
152 )
153 .into_response();
154 }
155
156 let placeholders: Vec<String> = followed_dids
157 .iter()
158 .enumerate()
159 .map(|(i, _)| format!("${}", i + 1))
160 .collect();
161
162 let posts_query = format!(
163 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
164 FROM records r
165 JOIN repos rp ON r.repo_id = rp.user_id
166 JOIN users u ON rp.user_id = u.id
167 WHERE u.did IN ({}) AND r.collection = 'app.bsky.feed.post'
168 ORDER BY r.created_at DESC
169 LIMIT 50",
170 placeholders.join(", ")
171 );
172
173 let mut query = sqlx::query(&posts_query);
174 for did in &followed_dids {
175 query = query.bind(did);
176 }
177
178 let posts_result = query.fetch_all(&state.db).await;
179
180 let posts = match posts_result {
181 Ok(rows) => rows,
182 Err(e) => {
183 error!("Failed to get posts: {:?}", e);
184 return (
185 StatusCode::INTERNAL_SERVER_ERROR,
186 Json(json!({"error": "InternalError"})),
187 )
188 .into_response();
189 }
190 };
191
192 let mut feed: Vec<FeedViewPost> = Vec::new();
193
194 for row in posts {
195 let record_cid: String = row.get("record_cid");
196 let rkey: String = row.get("rkey");
197 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
198 let author_did: String = row.get("did");
199 let author_handle: String = row.get("handle");
200
201 let cid = match record_cid.parse::<cid::Cid>() {
202 Ok(c) => c,
203 Err(_) => continue,
204 };
205
206 let block_bytes = match state.block_store.get(&cid).await {
207 Ok(Some(b)) => b,
208 _ => continue,
209 };
210
211 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
212 Ok(v) => v,
213 Err(_) => continue,
214 };
215
216 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
217
218 feed.push(FeedViewPost {
219 post: PostView {
220 uri,
221 cid: record_cid,
222 author: AuthorView {
223 did: author_did,
224 handle: author_handle,
225 },
226 record,
227 indexed_at: created_at.to_rfc3339(),
228 },
229 });
230 }
231
232 (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response()
233}