this repo has no description
1use crate::api::read_after_write::{
2 FeedOutput, FeedViewPost, PostView, extract_repo_rev, format_local_post,
3 format_munged_response, get_local_lag, get_records_since_rev, insert_posts_into_feed,
4 proxy_to_appview,
5};
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use jacquard_repo::storage::BlockStore;
14use serde::Deserialize;
15use serde_json::{Value, json};
16use std::collections::HashMap;
17use tracing::warn;
18
19#[derive(Deserialize)]
20pub struct GetTimelineParams {
21 pub algorithm: Option<String>,
22 pub limit: Option<u32>,
23 pub cursor: Option<String>,
24}
25
26pub async fn get_timeline(
27 State(state): State<AppState>,
28 headers: axum::http::HeaderMap,
29 Query(params): Query<GetTimelineParams>,
30) -> Response {
31 let token = match crate::auth::extract_bearer_token_from_header(
32 headers.get("Authorization").and_then(|h| h.to_str().ok()),
33 ) {
34 Some(t) => t,
35 None => {
36 return (
37 StatusCode::UNAUTHORIZED,
38 Json(json!({"error": "AuthenticationRequired"})),
39 )
40 .into_response();
41 }
42 };
43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
44 Ok(user) => user,
45 Err(_) => {
46 return (
47 StatusCode::UNAUTHORIZED,
48 Json(json!({"error": "AuthenticationFailed"})),
49 )
50 .into_response();
51 }
52 };
53 match std::env::var("APPVIEW_URL") {
54 Ok(url) if !url.starts_with("http://127.0.0.1") => {
55 return get_timeline_with_appview(
56 &state,
57 ¶ms,
58 &auth_user.did,
59 auth_user.key_bytes.as_deref(),
60 )
61 .await;
62 }
63 _ => {}
64 }
65 get_timeline_local_only(&state, &auth_user.did).await
66}
67
68async fn get_timeline_with_appview(
69 state: &AppState,
70 params: &GetTimelineParams,
71 auth_did: &str,
72 auth_key_bytes: Option<&[u8]>,
73) -> Response {
74 let mut query_params = HashMap::new();
75 if let Some(algo) = ¶ms.algorithm {
76 query_params.insert("algorithm".to_string(), algo.clone());
77 }
78 if let Some(limit) = params.limit {
79 query_params.insert("limit".to_string(), limit.to_string());
80 }
81 if let Some(cursor) = ¶ms.cursor {
82 query_params.insert("cursor".to_string(), cursor.clone());
83 }
84 let proxy_result = match proxy_to_appview(
85 "app.bsky.feed.getTimeline",
86 &query_params,
87 auth_did,
88 auth_key_bytes,
89 )
90 .await
91 {
92 Ok(r) => r,
93 Err(e) => return e,
94 };
95 if !proxy_result.status.is_success() {
96 return proxy_result.into_response();
97 }
98 let rev = extract_repo_rev(&proxy_result.headers);
99 if rev.is_none() {
100 return proxy_result.into_response();
101 }
102 let rev = rev.unwrap();
103 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
104 Ok(f) => f,
105 Err(e) => {
106 warn!("Failed to parse timeline response: {:?}", e);
107 return proxy_result.into_response();
108 }
109 };
110 let local_records = match get_records_since_rev(state, auth_did, &rev).await {
111 Ok(r) => r,
112 Err(e) => {
113 warn!("Failed to get local records: {}", e);
114 return proxy_result.into_response();
115 }
116 };
117 if local_records.count == 0 {
118 return proxy_result.into_response();
119 }
120 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
121 .fetch_optional(&state.db)
122 .await
123 {
124 Ok(Some(h)) => h,
125 Ok(None) => auth_did.to_string(),
126 Err(e) => {
127 warn!("Database error fetching handle: {:?}", e);
128 auth_did.to_string()
129 }
130 };
131 let local_posts: Vec<_> = local_records
132 .posts
133 .iter()
134 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref()))
135 .collect();
136 insert_posts_into_feed(&mut feed_output.feed, local_posts);
137 let lag = get_local_lag(&local_records);
138 format_munged_response(feed_output, lag)
139}
140
141async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response {
142 let user_id: uuid::Uuid =
143 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_did)
144 .fetch_optional(&state.db)
145 .await
146 {
147 Ok(Some(id)) => id,
148 Ok(None) => {
149 return (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({"error": "InternalError", "message": "User not found"})),
152 )
153 .into_response();
154 }
155 Err(e) => {
156 warn!("Database error fetching user: {:?}", e);
157 return (
158 StatusCode::INTERNAL_SERVER_ERROR,
159 Json(json!({"error": "InternalError", "message": "Database error"})),
160 )
161 .into_response();
162 }
163 };
164 let follows_query = sqlx::query!(
165 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000",
166 user_id
167 )
168 .fetch_all(&state.db)
169 .await;
170 let follow_cids: Vec<String> = match follows_query {
171 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
172 Err(_) => {
173 return (
174 StatusCode::INTERNAL_SERVER_ERROR,
175 Json(json!({"error": "InternalError"})),
176 )
177 .into_response();
178 }
179 };
180 let mut followed_dids: Vec<String> = Vec::new();
181 for cid_str in follow_cids {
182 let cid = match cid_str.parse::<cid::Cid>() {
183 Ok(c) => c,
184 Err(_) => continue,
185 };
186 let block_bytes = match state.block_store.get(&cid).await {
187 Ok(Some(b)) => b,
188 _ => continue,
189 };
190 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
191 Ok(v) => v,
192 Err(_) => continue,
193 };
194 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
195 followed_dids.push(subject.to_string());
196 }
197 }
198 if followed_dids.is_empty() {
199 return (
200 StatusCode::OK,
201 Json(FeedOutput {
202 feed: vec![],
203 cursor: None,
204 }),
205 )
206 .into_response();
207 }
208 let posts_result = sqlx::query!(
209 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
210 FROM records r
211 JOIN repos rp ON r.repo_id = rp.user_id
212 JOIN users u ON rp.user_id = u.id
213 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post'
214 ORDER BY r.created_at DESC
215 LIMIT 50",
216 &followed_dids
217 )
218 .fetch_all(&state.db)
219 .await;
220 let posts = match posts_result {
221 Ok(rows) => rows,
222 Err(_) => {
223 return (
224 StatusCode::INTERNAL_SERVER_ERROR,
225 Json(json!({"error": "InternalError"})),
226 )
227 .into_response();
228 }
229 };
230 let mut feed: Vec<FeedViewPost> = Vec::new();
231 for row in posts {
232 let record_cid: String = row.record_cid;
233 let rkey: String = row.rkey;
234 let created_at: chrono::DateTime<chrono::Utc> = row.created_at;
235 let author_did: String = row.did;
236 let author_handle: String = row.handle;
237 let cid = match record_cid.parse::<cid::Cid>() {
238 Ok(c) => c,
239 Err(_) => continue,
240 };
241 let block_bytes = match state.block_store.get(&cid).await {
242 Ok(Some(b)) => b,
243 _ => continue,
244 };
245 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
246 Ok(v) => v,
247 Err(_) => continue,
248 };
249 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
250 feed.push(FeedViewPost {
251 post: PostView {
252 uri,
253 cid: record_cid,
254 author: crate::api::read_after_write::AuthorView {
255 did: author_did,
256 handle: author_handle,
257 display_name: None,
258 avatar: None,
259 extra: HashMap::new(),
260 },
261 record,
262 indexed_at: created_at.to_rfc3339(),
263 embed: None,
264 reply_count: 0,
265 repost_count: 0,
266 like_count: 0,
267 quote_count: 0,
268 extra: HashMap::new(),
269 },
270 reply: None,
271 reason: None,
272 feed_context: None,
273 extra: HashMap::new(),
274 });
275 }
276 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response()
277}