this repo has no description
1use crate::api::read_after_write::{
2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag,
3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost,
4 PostView,
5};
6use crate::state::AppState;
7use axum::{
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 Json,
12};
13use jacquard_repo::storage::BlockStore;
14use serde::Deserialize;
15use serde_json::{json, Value};
16use std::collections::HashMap;
17use tracing::warn;
18
19#[derive(Deserialize)]
20pub struct GetTimelineParams {
21 pub algorithm: Option<String>,
22 pub limit: Option<u32>,
23 pub cursor: Option<String>,
24}
25
26pub async fn get_timeline(
27 State(state): State<AppState>,
28 headers: axum::http::HeaderMap,
29 Query(params): Query<GetTimelineParams>,
30) -> Response {
31 let token = match crate::auth::extract_bearer_token_from_header(
32 headers.get("Authorization").and_then(|h| h.to_str().ok()),
33 ) {
34 Some(t) => t,
35 None => {
36 return (
37 StatusCode::UNAUTHORIZED,
38 Json(json!({"error": "AuthenticationRequired"})),
39 )
40 .into_response();
41 }
42 };
43
44 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
45 Ok(user) => user,
46 Err(_) => {
47 return (
48 StatusCode::UNAUTHORIZED,
49 Json(json!({"error": "AuthenticationFailed"})),
50 )
51 .into_response();
52 }
53 };
54
55 match std::env::var("APPVIEW_URL") {
56 Ok(url) if !url.starts_with("http://127.0.0.1") => {
57 return get_timeline_with_appview(&state, &headers, ¶ms, &auth_user.did).await;
58 }
59 _ => {}
60 }
61
62 get_timeline_local_only(&state, &auth_user.did).await
63}
64
65async fn get_timeline_with_appview(
66 state: &AppState,
67 headers: &axum::http::HeaderMap,
68 params: &GetTimelineParams,
69 auth_did: &str,
70) -> Response {
71 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
72
73 let mut query_params = HashMap::new();
74 if let Some(algo) = ¶ms.algorithm {
75 query_params.insert("algorithm".to_string(), algo.clone());
76 }
77 if let Some(limit) = params.limit {
78 query_params.insert("limit".to_string(), limit.to_string());
79 }
80 if let Some(cursor) = ¶ms.cursor {
81 query_params.insert("cursor".to_string(), cursor.clone());
82 }
83
84 let proxy_result =
85 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await {
86 Ok(r) => r,
87 Err(e) => return e,
88 };
89
90 if !proxy_result.status.is_success() {
91 return (proxy_result.status, proxy_result.body).into_response();
92 }
93
94 let rev = extract_repo_rev(&proxy_result.headers);
95 if rev.is_none() {
96 return (proxy_result.status, proxy_result.body).into_response();
97 }
98 let rev = rev.unwrap();
99
100 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
101 Ok(f) => f,
102 Err(e) => {
103 warn!("Failed to parse timeline response: {:?}", e);
104 return (proxy_result.status, proxy_result.body).into_response();
105 }
106 };
107
108 let local_records = match get_records_since_rev(state, auth_did, &rev).await {
109 Ok(r) => r,
110 Err(e) => {
111 warn!("Failed to get local records: {}", e);
112 return (proxy_result.status, proxy_result.body).into_response();
113 }
114 };
115
116 if local_records.count == 0 {
117 return (proxy_result.status, proxy_result.body).into_response();
118 }
119
120 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
121 .fetch_optional(&state.db)
122 .await
123 {
124 Ok(Some(h)) => h,
125 Ok(None) => auth_did.to_string(),
126 Err(e) => {
127 warn!("Database error fetching handle: {:?}", e);
128 auth_did.to_string()
129 }
130 };
131
132 let local_posts: Vec<_> = local_records
133 .posts
134 .iter()
135 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref()))
136 .collect();
137
138 insert_posts_into_feed(&mut feed_output.feed, local_posts);
139
140 let lag = get_local_lag(&local_records);
141 format_munged_response(feed_output, lag)
142}
143
144async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response {
145 let user_id: uuid::Uuid = match sqlx::query_scalar!(
146 "SELECT id FROM users WHERE did = $1",
147 auth_did
148 )
149 .fetch_optional(&state.db)
150 .await
151 {
152 Ok(Some(id)) => id,
153 Ok(None) => {
154 return (
155 StatusCode::INTERNAL_SERVER_ERROR,
156 Json(json!({"error": "InternalError", "message": "User not found"})),
157 )
158 .into_response();
159 }
160 Err(e) => {
161 warn!("Database error fetching user: {:?}", e);
162 return (
163 StatusCode::INTERNAL_SERVER_ERROR,
164 Json(json!({"error": "InternalError", "message": "Database error"})),
165 )
166 .into_response();
167 }
168 };
169
170 let follows_query = sqlx::query!(
171 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000",
172 user_id
173 )
174 .fetch_all(&state.db)
175 .await;
176
177 let follow_cids: Vec<String> = match follows_query {
178 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
179 Err(_) => {
180 return (
181 StatusCode::INTERNAL_SERVER_ERROR,
182 Json(json!({"error": "InternalError"})),
183 )
184 .into_response();
185 }
186 };
187
188 let mut followed_dids: Vec<String> = Vec::new();
189 for cid_str in follow_cids {
190 let cid = match cid_str.parse::<cid::Cid>() {
191 Ok(c) => c,
192 Err(_) => continue,
193 };
194
195 let block_bytes = match state.block_store.get(&cid).await {
196 Ok(Some(b)) => b,
197 _ => continue,
198 };
199
200 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
201 Ok(v) => v,
202 Err(_) => continue,
203 };
204
205 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
206 followed_dids.push(subject.to_string());
207 }
208 }
209
210 if followed_dids.is_empty() {
211 return (
212 StatusCode::OK,
213 Json(FeedOutput {
214 feed: vec![],
215 cursor: None,
216 }),
217 )
218 .into_response();
219 }
220
221 let posts_result = sqlx::query!(
222 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
223 FROM records r
224 JOIN repos rp ON r.repo_id = rp.user_id
225 JOIN users u ON rp.user_id = u.id
226 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post'
227 ORDER BY r.created_at DESC
228 LIMIT 50",
229 &followed_dids
230 )
231 .fetch_all(&state.db)
232 .await;
233
234 let posts = match posts_result {
235 Ok(rows) => rows,
236 Err(_) => {
237 return (
238 StatusCode::INTERNAL_SERVER_ERROR,
239 Json(json!({"error": "InternalError"})),
240 )
241 .into_response();
242 }
243 };
244
245 let mut feed: Vec<FeedViewPost> = Vec::new();
246
247 for row in posts {
248 let record_cid: String = row.record_cid;
249 let rkey: String = row.rkey;
250 let created_at: chrono::DateTime<chrono::Utc> = row.created_at;
251 let author_did: String = row.did;
252 let author_handle: String = row.handle;
253
254 let cid = match record_cid.parse::<cid::Cid>() {
255 Ok(c) => c,
256 Err(_) => continue,
257 };
258
259 let block_bytes = match state.block_store.get(&cid).await {
260 Ok(Some(b)) => b,
261 _ => continue,
262 };
263
264 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
265 Ok(v) => v,
266 Err(_) => continue,
267 };
268
269 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
270
271 feed.push(FeedViewPost {
272 post: PostView {
273 uri,
274 cid: record_cid,
275 author: crate::api::read_after_write::AuthorView {
276 did: author_did,
277 handle: author_handle,
278 display_name: None,
279 avatar: None,
280 extra: HashMap::new(),
281 },
282 record,
283 indexed_at: created_at.to_rfc3339(),
284 embed: None,
285 reply_count: 0,
286 repost_count: 0,
287 like_count: 0,
288 quote_count: 0,
289 extra: HashMap::new(),
290 },
291 reply: None,
292 reason: None,
293 feed_context: None,
294 extra: HashMap::new(),
295 });
296 }
297
298 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response()
299}