this repo has no description
// NOTE: strictly speaking this is an AppView endpoint, not a PDS one; it is served here anyway.
// NOTE: it only returns posts that this instance's own database knows about.
3
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use jacquard_repo::storage::BlockStore;
12use serde::Serialize;
13use serde_json::{Value, json};
14use tracing::error;
15
16#[derive(Serialize)]
17pub struct TimelineOutput {
18 pub feed: Vec<FeedViewPost>,
19 pub cursor: Option<String>,
20}
21
22#[derive(Serialize)]
23pub struct FeedViewPost {
24 pub post: PostView,
25}
26
27#[derive(Serialize)]
28#[serde(rename_all = "camelCase")]
29pub struct PostView {
30 pub uri: String,
31 pub cid: String,
32 pub author: AuthorView,
33 pub record: Value,
34 pub indexed_at: String,
35}
36
37#[derive(Serialize)]
38pub struct AuthorView {
39 pub did: String,
40 pub handle: String,
41}
42
43pub async fn get_timeline(
44 State(state): State<AppState>,
45 headers: axum::http::HeaderMap,
46) -> Response {
47 let auth_header = headers.get("Authorization");
48 if auth_header.is_none() {
49 return (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationRequired"})),
52 )
53 .into_response();
54 }
55 let token = auth_header
56 .unwrap()
57 .to_str()
58 .unwrap_or("")
59 .replace("Bearer ", "");
60
61 let session = sqlx::query!(
62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
63 token
64 )
65 .fetch_optional(&state.db)
66 .await
67 .unwrap_or(None);
68
69 let (did, key_bytes) = match session {
70 Some(row) => (row.did, row.key_bytes),
71 None => {
72 return (
73 StatusCode::UNAUTHORIZED,
74 Json(json!({"error": "AuthenticationFailed"})),
75 )
76 .into_response();
77 }
78 };
79
80 if crate::auth::verify_token(&token, &key_bytes).is_err() {
81 return (
82 StatusCode::UNAUTHORIZED,
83 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
84 )
85 .into_response();
86 }
87
88 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
89 .fetch_optional(&state.db)
90 .await;
91
92 let user_id = match user_query {
93 Ok(Some(row)) => row.id,
94 _ => {
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError", "message": "User not found"})),
98 )
99 .into_response();
100 }
101 };
102
103 let follows_query = sqlx::query!(
104 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'",
105 user_id
106 )
107 .fetch_all(&state.db)
108 .await;
109
110 let follow_cids: Vec<String> = match follows_query {
111 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
112 Err(e) => {
113 error!("Failed to get follows: {:?}", e);
114 return (
115 StatusCode::INTERNAL_SERVER_ERROR,
116 Json(json!({"error": "InternalError"})),
117 )
118 .into_response();
119 }
120 };
121
122 let mut followed_dids: Vec<String> = Vec::new();
123 for cid_str in follow_cids {
124 let cid = match cid_str.parse::<cid::Cid>() {
125 Ok(c) => c,
126 Err(_) => continue,
127 };
128
129 let block_bytes = match state.block_store.get(&cid).await {
130 Ok(Some(b)) => b,
131 _ => continue,
132 };
133
134 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
135 Ok(v) => v,
136 Err(_) => continue,
137 };
138
139 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
140 followed_dids.push(subject.to_string());
141 }
142 }
143
144 if followed_dids.is_empty() {
145 return (
146 StatusCode::OK,
147 Json(TimelineOutput {
148 feed: vec![],
149 cursor: None,
150 }),
151 )
152 .into_response();
153 }
154
155 let posts_result = sqlx::query!(
156 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
157 FROM records r
158 JOIN repos rp ON r.repo_id = rp.user_id
159 JOIN users u ON rp.user_id = u.id
160 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post'
161 ORDER BY r.created_at DESC
162 LIMIT 50",
163 &followed_dids
164 )
165 .fetch_all(&state.db)
166 .await;
167
168 let posts = match posts_result {
169 Ok(rows) => rows,
170 Err(e) => {
171 error!("Failed to get posts: {:?}", e);
172 return (
173 StatusCode::INTERNAL_SERVER_ERROR,
174 Json(json!({"error": "InternalError"})),
175 )
176 .into_response();
177 }
178 };
179
180 let mut feed: Vec<FeedViewPost> = Vec::new();
181
182 for row in posts {
183 let record_cid: String = row.record_cid;
184 let rkey: String = row.rkey;
185 let created_at: chrono::DateTime<chrono::Utc> = row.created_at;
186 let author_did: String = row.did;
187 let author_handle: String = row.handle;
188
189 let cid = match record_cid.parse::<cid::Cid>() {
190 Ok(c) => c,
191 Err(_) => continue,
192 };
193
194 let block_bytes = match state.block_store.get(&cid).await {
195 Ok(Some(b)) => b,
196 _ => continue,
197 };
198
199 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
200 Ok(v) => v,
201 Err(_) => continue,
202 };
203
204 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
205
206 feed.push(FeedViewPost {
207 post: PostView {
208 uri,
209 cid: record_cid,
210 author: AuthorView {
211 did: author_did,
212 handle: author_handle,
213 },
214 record,
215 indexed_at: created_at.to_rfc3339(),
216 },
217 });
218 }
219
220 (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response()
221}