this repo has no description
1use crate::api::read_after_write::{
2 FeedOutput, FeedViewPost, PostView, extract_repo_rev, format_local_post,
3 format_munged_response, get_local_lag, get_records_since_rev, insert_posts_into_feed,
4 proxy_to_appview_via_registry,
5};
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use jacquard_repo::storage::BlockStore;
14use serde::Deserialize;
15use serde_json::{Value, json};
16use std::collections::HashMap;
17use tracing::warn;
18
19#[derive(Deserialize)]
20pub struct GetTimelineParams {
21 pub algorithm: Option<String>,
22 pub limit: Option<u32>,
23 pub cursor: Option<String>,
24}
25
26pub async fn get_timeline(
27 State(state): State<AppState>,
28 headers: axum::http::HeaderMap,
29 Query(params): Query<GetTimelineParams>,
30) -> Response {
31 let token = match crate::auth::extract_bearer_token_from_header(
32 headers.get("Authorization").and_then(|h| h.to_str().ok()),
33 ) {
34 Some(t) => t,
35 None => {
36 return (
37 StatusCode::UNAUTHORIZED,
38 Json(json!({"error": "AuthenticationRequired"})),
39 )
40 .into_response();
41 }
42 };
43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
44 Ok(user) => user,
45 Err(_) => {
46 return (
47 StatusCode::UNAUTHORIZED,
48 Json(json!({"error": "AuthenticationFailed"})),
49 )
50 .into_response();
51 }
52 };
53 if state.appview_registry.get_appview_for_method("app.bsky.feed.getTimeline").await.is_some() {
54 return get_timeline_with_appview(
55 &state,
56 ¶ms,
57 &auth_user.did,
58 auth_user.key_bytes.as_deref(),
59 )
60 .await;
61 }
62 get_timeline_local_only(&state, &auth_user.did).await
63}
64
65async fn get_timeline_with_appview(
66 state: &AppState,
67 params: &GetTimelineParams,
68 auth_did: &str,
69 auth_key_bytes: Option<&[u8]>,
70) -> Response {
71 let mut query_params = HashMap::new();
72 if let Some(algo) = ¶ms.algorithm {
73 query_params.insert("algorithm".to_string(), algo.clone());
74 }
75 if let Some(limit) = params.limit {
76 query_params.insert("limit".to_string(), limit.to_string());
77 }
78 if let Some(cursor) = ¶ms.cursor {
79 query_params.insert("cursor".to_string(), cursor.clone());
80 }
81 let proxy_result = match proxy_to_appview_via_registry(
82 state,
83 "app.bsky.feed.getTimeline",
84 &query_params,
85 auth_did,
86 auth_key_bytes,
87 )
88 .await
89 {
90 Ok(r) => r,
91 Err(e) => return e,
92 };
93 if !proxy_result.status.is_success() {
94 return proxy_result.into_response();
95 }
96 let rev = extract_repo_rev(&proxy_result.headers);
97 if rev.is_none() {
98 return proxy_result.into_response();
99 }
100 let rev = rev.unwrap();
101 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
102 Ok(f) => f,
103 Err(e) => {
104 warn!("Failed to parse timeline response: {:?}", e);
105 return proxy_result.into_response();
106 }
107 };
108 let local_records = match get_records_since_rev(state, auth_did, &rev).await {
109 Ok(r) => r,
110 Err(e) => {
111 warn!("Failed to get local records: {}", e);
112 return proxy_result.into_response();
113 }
114 };
115 if local_records.count == 0 {
116 return proxy_result.into_response();
117 }
118 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
119 .fetch_optional(&state.db)
120 .await
121 {
122 Ok(Some(h)) => h,
123 Ok(None) => auth_did.to_string(),
124 Err(e) => {
125 warn!("Database error fetching handle: {:?}", e);
126 auth_did.to_string()
127 }
128 };
129 let local_posts: Vec<_> = local_records
130 .posts
131 .iter()
132 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref()))
133 .collect();
134 insert_posts_into_feed(&mut feed_output.feed, local_posts);
135 let lag = get_local_lag(&local_records);
136 format_munged_response(feed_output, lag)
137}
138
139async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response {
140 let user_id: uuid::Uuid =
141 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_did)
142 .fetch_optional(&state.db)
143 .await
144 {
145 Ok(Some(id)) => id,
146 Ok(None) => {
147 return (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError", "message": "User not found"})),
150 )
151 .into_response();
152 }
153 Err(e) => {
154 warn!("Database error fetching user: {:?}", e);
155 return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(json!({"error": "InternalError", "message": "Database error"})),
158 )
159 .into_response();
160 }
161 };
162 let follows_query = sqlx::query!(
163 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000",
164 user_id
165 )
166 .fetch_all(&state.db)
167 .await;
168 let follow_cids: Vec<String> = match follows_query {
169 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
170 Err(_) => {
171 return (
172 StatusCode::INTERNAL_SERVER_ERROR,
173 Json(json!({"error": "InternalError"})),
174 )
175 .into_response();
176 }
177 };
178 let mut followed_dids: Vec<String> = Vec::new();
179 for cid_str in follow_cids {
180 let cid = match cid_str.parse::<cid::Cid>() {
181 Ok(c) => c,
182 Err(_) => continue,
183 };
184 let block_bytes = match state.block_store.get(&cid).await {
185 Ok(Some(b)) => b,
186 _ => continue,
187 };
188 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
189 Ok(v) => v,
190 Err(_) => continue,
191 };
192 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
193 followed_dids.push(subject.to_string());
194 }
195 }
196 if followed_dids.is_empty() {
197 return (
198 StatusCode::OK,
199 Json(FeedOutput {
200 feed: vec![],
201 cursor: None,
202 }),
203 )
204 .into_response();
205 }
206 let posts_result = sqlx::query!(
207 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
208 FROM records r
209 JOIN repos rp ON r.repo_id = rp.user_id
210 JOIN users u ON rp.user_id = u.id
211 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post'
212 ORDER BY r.created_at DESC
213 LIMIT 50",
214 &followed_dids
215 )
216 .fetch_all(&state.db)
217 .await;
218 let posts = match posts_result {
219 Ok(rows) => rows,
220 Err(_) => {
221 return (
222 StatusCode::INTERNAL_SERVER_ERROR,
223 Json(json!({"error": "InternalError"})),
224 )
225 .into_response();
226 }
227 };
228 let mut feed: Vec<FeedViewPost> = Vec::new();
229 for row in posts {
230 let record_cid: String = row.record_cid;
231 let rkey: String = row.rkey;
232 let created_at: chrono::DateTime<chrono::Utc> = row.created_at;
233 let author_did: String = row.did;
234 let author_handle: String = row.handle;
235 let cid = match record_cid.parse::<cid::Cid>() {
236 Ok(c) => c,
237 Err(_) => continue,
238 };
239 let block_bytes = match state.block_store.get(&cid).await {
240 Ok(Some(b)) => b,
241 _ => continue,
242 };
243 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
244 Ok(v) => v,
245 Err(_) => continue,
246 };
247 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
248 feed.push(FeedViewPost {
249 post: PostView {
250 uri,
251 cid: record_cid,
252 author: crate::api::read_after_write::AuthorView {
253 did: author_did,
254 handle: author_handle,
255 display_name: None,
256 avatar: None,
257 extra: HashMap::new(),
258 },
259 record,
260 indexed_at: created_at.to_rfc3339(),
261 embed: None,
262 reply_count: 0,
263 repost_count: 0,
264 like_count: 0,
265 quote_count: 0,
266 extra: HashMap::new(),
267 },
268 reply: None,
269 reason: None,
270 feed_context: None,
271 extra: HashMap::new(),
272 });
273 }
274 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response()
275}