this repo has no description
1use crate::api::read_after_write::{
2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag,
3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost,
4 PostView,
5};
6use crate::state::AppState;
7use axum::{
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 Json,
12};
13use jacquard_repo::storage::BlockStore;
14use serde::Deserialize;
15use serde_json::{json, Value};
16use std::collections::HashMap;
17use tracing::warn;
18
19#[derive(Deserialize)]
20pub struct GetTimelineParams {
21 pub algorithm: Option<String>,
22 pub limit: Option<u32>,
23 pub cursor: Option<String>,
24}
25
26pub async fn get_timeline(
27 State(state): State<AppState>,
28 headers: axum::http::HeaderMap,
29 Query(params): Query<GetTimelineParams>,
30) -> Response {
31 let token = match crate::auth::extract_bearer_token_from_header(
32 headers.get("Authorization").and_then(|h| h.to_str().ok()),
33 ) {
34 Some(t) => t,
35 None => {
36 return (
37 StatusCode::UNAUTHORIZED,
38 Json(json!({"error": "AuthenticationRequired"})),
39 )
40 .into_response();
41 }
42 };
43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
44 Ok(user) => user,
45 Err(_) => {
46 return (
47 StatusCode::UNAUTHORIZED,
48 Json(json!({"error": "AuthenticationFailed"})),
49 )
50 .into_response();
51 }
52 };
53 match std::env::var("APPVIEW_URL") {
54 Ok(url) if !url.starts_with("http://127.0.0.1") => {
55 return get_timeline_with_appview(&state, &headers, ¶ms, &auth_user.did).await;
56 }
57 _ => {}
58 }
59 get_timeline_local_only(&state, &auth_user.did).await
60}
61
62async fn get_timeline_with_appview(
63 state: &AppState,
64 headers: &axum::http::HeaderMap,
65 params: &GetTimelineParams,
66 auth_did: &str,
67) -> Response {
68 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
69 let mut query_params = HashMap::new();
70 if let Some(algo) = ¶ms.algorithm {
71 query_params.insert("algorithm".to_string(), algo.clone());
72 }
73 if let Some(limit) = params.limit {
74 query_params.insert("limit".to_string(), limit.to_string());
75 }
76 if let Some(cursor) = ¶ms.cursor {
77 query_params.insert("cursor".to_string(), cursor.clone());
78 }
79 let proxy_result =
80 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await {
81 Ok(r) => r,
82 Err(e) => return e,
83 };
84 if !proxy_result.status.is_success() {
85 return (proxy_result.status, proxy_result.body).into_response();
86 }
87 let rev = extract_repo_rev(&proxy_result.headers);
88 if rev.is_none() {
89 return (proxy_result.status, proxy_result.body).into_response();
90 }
91 let rev = rev.unwrap();
92 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
93 Ok(f) => f,
94 Err(e) => {
95 warn!("Failed to parse timeline response: {:?}", e);
96 return (proxy_result.status, proxy_result.body).into_response();
97 }
98 };
99 let local_records = match get_records_since_rev(state, auth_did, &rev).await {
100 Ok(r) => r,
101 Err(e) => {
102 warn!("Failed to get local records: {}", e);
103 return (proxy_result.status, proxy_result.body).into_response();
104 }
105 };
106 if local_records.count == 0 {
107 return (proxy_result.status, proxy_result.body).into_response();
108 }
109 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
110 .fetch_optional(&state.db)
111 .await
112 {
113 Ok(Some(h)) => h,
114 Ok(None) => auth_did.to_string(),
115 Err(e) => {
116 warn!("Database error fetching handle: {:?}", e);
117 auth_did.to_string()
118 }
119 };
120 let local_posts: Vec<_> = local_records
121 .posts
122 .iter()
123 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref()))
124 .collect();
125 insert_posts_into_feed(&mut feed_output.feed, local_posts);
126 let lag = get_local_lag(&local_records);
127 format_munged_response(feed_output, lag)
128}
129
130async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response {
131 let user_id: uuid::Uuid = match sqlx::query_scalar!(
132 "SELECT id FROM users WHERE did = $1",
133 auth_did
134 )
135 .fetch_optional(&state.db)
136 .await
137 {
138 Ok(Some(id)) => id,
139 Ok(None) => {
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError", "message": "User not found"})),
143 )
144 .into_response();
145 }
146 Err(e) => {
147 warn!("Database error fetching user: {:?}", e);
148 return (
149 StatusCode::INTERNAL_SERVER_ERROR,
150 Json(json!({"error": "InternalError", "message": "Database error"})),
151 )
152 .into_response();
153 }
154 };
155 let follows_query = sqlx::query!(
156 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000",
157 user_id
158 )
159 .fetch_all(&state.db)
160 .await;
161 let follow_cids: Vec<String> = match follows_query {
162 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
163 Err(_) => {
164 return (
165 StatusCode::INTERNAL_SERVER_ERROR,
166 Json(json!({"error": "InternalError"})),
167 )
168 .into_response();
169 }
170 };
171 let mut followed_dids: Vec<String> = Vec::new();
172 for cid_str in follow_cids {
173 let cid = match cid_str.parse::<cid::Cid>() {
174 Ok(c) => c,
175 Err(_) => continue,
176 };
177 let block_bytes = match state.block_store.get(&cid).await {
178 Ok(Some(b)) => b,
179 _ => continue,
180 };
181 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
182 Ok(v) => v,
183 Err(_) => continue,
184 };
185 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
186 followed_dids.push(subject.to_string());
187 }
188 }
189 if followed_dids.is_empty() {
190 return (
191 StatusCode::OK,
192 Json(FeedOutput {
193 feed: vec![],
194 cursor: None,
195 }),
196 )
197 .into_response();
198 }
199 let posts_result = sqlx::query!(
200 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
201 FROM records r
202 JOIN repos rp ON r.repo_id = rp.user_id
203 JOIN users u ON rp.user_id = u.id
204 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post'
205 ORDER BY r.created_at DESC
206 LIMIT 50",
207 &followed_dids
208 )
209 .fetch_all(&state.db)
210 .await;
211 let posts = match posts_result {
212 Ok(rows) => rows,
213 Err(_) => {
214 return (
215 StatusCode::INTERNAL_SERVER_ERROR,
216 Json(json!({"error": "InternalError"})),
217 )
218 .into_response();
219 }
220 };
221 let mut feed: Vec<FeedViewPost> = Vec::new();
222 for row in posts {
223 let record_cid: String = row.record_cid;
224 let rkey: String = row.rkey;
225 let created_at: chrono::DateTime<chrono::Utc> = row.created_at;
226 let author_did: String = row.did;
227 let author_handle: String = row.handle;
228 let cid = match record_cid.parse::<cid::Cid>() {
229 Ok(c) => c,
230 Err(_) => continue,
231 };
232 let block_bytes = match state.block_store.get(&cid).await {
233 Ok(Some(b)) => b,
234 _ => continue,
235 };
236 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
237 Ok(v) => v,
238 Err(_) => continue,
239 };
240 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
241 feed.push(FeedViewPost {
242 post: PostView {
243 uri,
244 cid: record_cid,
245 author: crate::api::read_after_write::AuthorView {
246 did: author_did,
247 handle: author_handle,
248 display_name: None,
249 avatar: None,
250 extra: HashMap::new(),
251 },
252 record,
253 indexed_at: created_at.to_rfc3339(),
254 embed: None,
255 reply_count: 0,
256 repost_count: 0,
257 like_count: 0,
258 quote_count: 0,
259 extra: HashMap::new(),
260 },
261 reply: None,
262 reason: None,
263 feed_context: None,
264 extra: HashMap::new(),
265 });
266 }
267 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response()
268}