this repo has no description
1use crate::api::read_after_write::{
2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag,
3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost,
4 PostView,
5};
6use crate::state::AppState;
7use axum::{
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 Json,
12};
13use jacquard_repo::storage::BlockStore;
14use serde::Deserialize;
15use serde_json::{json, Value};
16use std::collections::HashMap;
17use tracing::warn;
18#[derive(Deserialize)]
19pub struct GetTimelineParams {
20 pub algorithm: Option<String>,
21 pub limit: Option<u32>,
22 pub cursor: Option<String>,
23}
24pub async fn get_timeline(
25 State(state): State<AppState>,
26 headers: axum::http::HeaderMap,
27 Query(params): Query<GetTimelineParams>,
28) -> Response {
29 let token = match crate::auth::extract_bearer_token_from_header(
30 headers.get("Authorization").and_then(|h| h.to_str().ok()),
31 ) {
32 Some(t) => t,
33 None => {
34 return (
35 StatusCode::UNAUTHORIZED,
36 Json(json!({"error": "AuthenticationRequired"})),
37 )
38 .into_response();
39 }
40 };
41 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
42 Ok(user) => user,
43 Err(_) => {
44 return (
45 StatusCode::UNAUTHORIZED,
46 Json(json!({"error": "AuthenticationFailed"})),
47 )
48 .into_response();
49 }
50 };
51 match std::env::var("APPVIEW_URL") {
52 Ok(url) if !url.starts_with("http://127.0.0.1") => {
53 return get_timeline_with_appview(&state, &headers, ¶ms, &auth_user.did).await;
54 }
55 _ => {}
56 }
57 get_timeline_local_only(&state, &auth_user.did).await
58}
59async fn get_timeline_with_appview(
60 state: &AppState,
61 headers: &axum::http::HeaderMap,
62 params: &GetTimelineParams,
63 auth_did: &str,
64) -> Response {
65 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
66 let mut query_params = HashMap::new();
67 if let Some(algo) = ¶ms.algorithm {
68 query_params.insert("algorithm".to_string(), algo.clone());
69 }
70 if let Some(limit) = params.limit {
71 query_params.insert("limit".to_string(), limit.to_string());
72 }
73 if let Some(cursor) = ¶ms.cursor {
74 query_params.insert("cursor".to_string(), cursor.clone());
75 }
76 let proxy_result =
77 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await {
78 Ok(r) => r,
79 Err(e) => return e,
80 };
81 if !proxy_result.status.is_success() {
82 return (proxy_result.status, proxy_result.body).into_response();
83 }
84 let rev = extract_repo_rev(&proxy_result.headers);
85 if rev.is_none() {
86 return (proxy_result.status, proxy_result.body).into_response();
87 }
88 let rev = rev.unwrap();
89 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
90 Ok(f) => f,
91 Err(e) => {
92 warn!("Failed to parse timeline response: {:?}", e);
93 return (proxy_result.status, proxy_result.body).into_response();
94 }
95 };
96 let local_records = match get_records_since_rev(state, auth_did, &rev).await {
97 Ok(r) => r,
98 Err(e) => {
99 warn!("Failed to get local records: {}", e);
100 return (proxy_result.status, proxy_result.body).into_response();
101 }
102 };
103 if local_records.count == 0 {
104 return (proxy_result.status, proxy_result.body).into_response();
105 }
106 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did)
107 .fetch_optional(&state.db)
108 .await
109 {
110 Ok(Some(h)) => h,
111 Ok(None) => auth_did.to_string(),
112 Err(e) => {
113 warn!("Database error fetching handle: {:?}", e);
114 auth_did.to_string()
115 }
116 };
117 let local_posts: Vec<_> = local_records
118 .posts
119 .iter()
120 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref()))
121 .collect();
122 insert_posts_into_feed(&mut feed_output.feed, local_posts);
123 let lag = get_local_lag(&local_records);
124 format_munged_response(feed_output, lag)
125}
126async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response {
127 let user_id: uuid::Uuid = match sqlx::query_scalar!(
128 "SELECT id FROM users WHERE did = $1",
129 auth_did
130 )
131 .fetch_optional(&state.db)
132 .await
133 {
134 Ok(Some(id)) => id,
135 Ok(None) => {
136 return (
137 StatusCode::INTERNAL_SERVER_ERROR,
138 Json(json!({"error": "InternalError", "message": "User not found"})),
139 )
140 .into_response();
141 }
142 Err(e) => {
143 warn!("Database error fetching user: {:?}", e);
144 return (
145 StatusCode::INTERNAL_SERVER_ERROR,
146 Json(json!({"error": "InternalError", "message": "Database error"})),
147 )
148 .into_response();
149 }
150 };
151 let follows_query = sqlx::query!(
152 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000",
153 user_id
154 )
155 .fetch_all(&state.db)
156 .await;
157 let follow_cids: Vec<String> = match follows_query {
158 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(),
159 Err(_) => {
160 return (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError"})),
163 )
164 .into_response();
165 }
166 };
167 let mut followed_dids: Vec<String> = Vec::new();
168 for cid_str in follow_cids {
169 let cid = match cid_str.parse::<cid::Cid>() {
170 Ok(c) => c,
171 Err(_) => continue,
172 };
173 let block_bytes = match state.block_store.get(&cid).await {
174 Ok(Some(b)) => b,
175 _ => continue,
176 };
177 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
178 Ok(v) => v,
179 Err(_) => continue,
180 };
181 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) {
182 followed_dids.push(subject.to_string());
183 }
184 }
185 if followed_dids.is_empty() {
186 return (
187 StatusCode::OK,
188 Json(FeedOutput {
189 feed: vec![],
190 cursor: None,
191 }),
192 )
193 .into_response();
194 }
195 let posts_result = sqlx::query!(
196 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle
197 FROM records r
198 JOIN repos rp ON r.repo_id = rp.user_id
199 JOIN users u ON rp.user_id = u.id
200 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post'
201 ORDER BY r.created_at DESC
202 LIMIT 50",
203 &followed_dids
204 )
205 .fetch_all(&state.db)
206 .await;
207 let posts = match posts_result {
208 Ok(rows) => rows,
209 Err(_) => {
210 return (
211 StatusCode::INTERNAL_SERVER_ERROR,
212 Json(json!({"error": "InternalError"})),
213 )
214 .into_response();
215 }
216 };
217 let mut feed: Vec<FeedViewPost> = Vec::new();
218 for row in posts {
219 let record_cid: String = row.record_cid;
220 let rkey: String = row.rkey;
221 let created_at: chrono::DateTime<chrono::Utc> = row.created_at;
222 let author_did: String = row.did;
223 let author_handle: String = row.handle;
224 let cid = match record_cid.parse::<cid::Cid>() {
225 Ok(c) => c,
226 Err(_) => continue,
227 };
228 let block_bytes = match state.block_store.get(&cid).await {
229 Ok(Some(b)) => b,
230 _ => continue,
231 };
232 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) {
233 Ok(v) => v,
234 Err(_) => continue,
235 };
236 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey);
237 feed.push(FeedViewPost {
238 post: PostView {
239 uri,
240 cid: record_cid,
241 author: crate::api::read_after_write::AuthorView {
242 did: author_did,
243 handle: author_handle,
244 display_name: None,
245 avatar: None,
246 extra: HashMap::new(),
247 },
248 record,
249 indexed_at: created_at.to_rfc3339(),
250 embed: None,
251 reply_count: 0,
252 repost_count: 0,
253 like_count: 0,
254 quote_count: 0,
255 extra: HashMap::new(),
256 },
257 reply: None,
258 reason: None,
259 feed_context: None,
260 extra: HashMap::new(),
261 });
262 }
263 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response()
264}