this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 PostView, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use jacquard_repo::storage::BlockStore; 14use serde::Deserialize; 15use serde_json::{json, Value}; 16use std::collections::HashMap; 17use tracing::warn; 18 19#[derive(Deserialize)] 20pub struct GetTimelineParams { 21 pub algorithm: Option<String>, 22 pub limit: Option<u32>, 23 pub cursor: Option<String>, 24} 25 26pub async fn get_timeline( 27 State(state): State<AppState>, 28 headers: axum::http::HeaderMap, 29 Query(params): Query<GetTimelineParams>, 30) -> Response { 31 let token = match crate::auth::extract_bearer_token_from_header( 32 headers.get("Authorization").and_then(|h| h.to_str().ok()), 33 ) { 34 Some(t) => t, 35 None => { 36 return ( 37 StatusCode::UNAUTHORIZED, 38 Json(json!({"error": "AuthenticationRequired"})), 39 ) 40 .into_response(); 41 } 42 }; 43 44 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 45 Ok(user) => user, 46 Err(_) => { 47 return ( 48 StatusCode::UNAUTHORIZED, 49 Json(json!({"error": "AuthenticationFailed"})), 50 ) 51 .into_response(); 52 } 53 }; 54 55 match std::env::var("APPVIEW_URL") { 56 Ok(url) if !url.starts_with("http://127.0.0.1") => { 57 return get_timeline_with_appview(&state, &headers, &params, &auth_user.did).await; 58 } 59 _ => {} 60 } 61 62 get_timeline_local_only(&state, &auth_user.did).await 63} 64 65async fn get_timeline_with_appview( 66 state: &AppState, 67 headers: &axum::http::HeaderMap, 68 params: &GetTimelineParams, 69 auth_did: &str, 70) -> Response { 71 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 72 73 let mut query_params = HashMap::new(); 74 if let Some(algo) = &params.algorithm { 75 query_params.insert("algorithm".to_string(), algo.clone()); 76 } 77 if let Some(limit) = params.limit { 78 query_params.insert("limit".to_string(), limit.to_string()); 79 } 80 if let Some(cursor) = &params.cursor { 81 query_params.insert("cursor".to_string(), cursor.clone()); 82 } 83 84 let proxy_result = 85 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await { 86 Ok(r) => r, 87 Err(e) => return e, 88 }; 89 90 if !proxy_result.status.is_success() { 91 return (proxy_result.status, proxy_result.body).into_response(); 92 } 93 94 let rev = extract_repo_rev(&proxy_result.headers); 95 if rev.is_none() { 96 return (proxy_result.status, proxy_result.body).into_response(); 97 } 98 let rev = rev.unwrap(); 99 100 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 101 Ok(f) => f, 102 Err(e) => { 103 warn!("Failed to parse timeline response: {:?}", e); 104 return (proxy_result.status, proxy_result.body).into_response(); 105 } 106 }; 107 108 let local_records = match get_records_since_rev(state, auth_did, &rev).await { 109 Ok(r) => r, 110 Err(e) => { 111 warn!("Failed to get local records: {}", e); 112 return (proxy_result.status, proxy_result.body).into_response(); 113 } 114 }; 115 116 if local_records.count == 0 { 117 return (proxy_result.status, proxy_result.body).into_response(); 118 } 119 120 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 121 .fetch_optional(&state.db) 122 .await 123 { 124 Ok(Some(h)) => h, 125 Ok(None) => auth_did.to_string(), 126 Err(e) => { 127 warn!("Database error fetching handle: {:?}", e); 128 auth_did.to_string() 129 } 130 }; 131 132 let local_posts: Vec<_> = local_records 133 .posts 134 .iter() 135 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 136 .collect(); 137 138 insert_posts_into_feed(&mut feed_output.feed, local_posts); 139 140 let lag = get_local_lag(&local_records); 141 format_munged_response(feed_output, lag) 142} 143 144async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 145 let user_id: uuid::Uuid = match sqlx::query_scalar!( 146 "SELECT id FROM users WHERE did = $1", 147 auth_did 148 ) 149 .fetch_optional(&state.db) 150 .await 151 { 152 Ok(Some(id)) => id, 153 Ok(None) => { 154 return ( 155 StatusCode::INTERNAL_SERVER_ERROR, 156 Json(json!({"error": "InternalError", "message": "User not found"})), 157 ) 158 .into_response(); 159 } 160 Err(e) => { 161 warn!("Database error fetching user: {:?}", e); 162 return ( 163 StatusCode::INTERNAL_SERVER_ERROR, 164 Json(json!({"error": "InternalError", "message": "Database error"})), 165 ) 166 .into_response(); 167 } 168 }; 169 170 let follows_query = sqlx::query!( 171 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 172 user_id 173 ) 174 .fetch_all(&state.db) 175 .await; 176 177 let follow_cids: Vec<String> = match follows_query { 178 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 179 Err(_) => { 180 return ( 181 StatusCode::INTERNAL_SERVER_ERROR, 182 Json(json!({"error": "InternalError"})), 183 ) 184 .into_response(); 185 } 186 }; 187 188 let mut followed_dids: Vec<String> = Vec::new(); 189 for cid_str in follow_cids { 190 let cid = match cid_str.parse::<cid::Cid>() { 191 Ok(c) => c, 192 Err(_) => continue, 193 }; 194 195 let block_bytes = match state.block_store.get(&cid).await { 196 Ok(Some(b)) => b, 197 _ => continue, 198 }; 199 200 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 201 Ok(v) => v, 202 Err(_) => continue, 203 }; 204 205 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 206 followed_dids.push(subject.to_string()); 207 } 208 } 209 210 if followed_dids.is_empty() { 211 return ( 212 StatusCode::OK, 213 Json(FeedOutput { 214 feed: vec![], 215 cursor: None, 216 }), 217 ) 218 .into_response(); 219 } 220 221 let posts_result = sqlx::query!( 222 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 223 FROM records r 224 JOIN repos rp ON r.repo_id = rp.user_id 225 JOIN users u ON rp.user_id = u.id 226 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 227 ORDER BY r.created_at DESC 228 LIMIT 50", 229 &followed_dids 230 ) 231 .fetch_all(&state.db) 232 .await; 233 234 let posts = match posts_result { 235 Ok(rows) => rows, 236 Err(_) => { 237 return ( 238 StatusCode::INTERNAL_SERVER_ERROR, 239 Json(json!({"error": "InternalError"})), 240 ) 241 .into_response(); 242 } 243 }; 244 245 let mut feed: Vec<FeedViewPost> = Vec::new(); 246 247 for row in posts { 248 let record_cid: String = row.record_cid; 249 let rkey: String = row.rkey; 250 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 251 let author_did: String = row.did; 252 let author_handle: String = row.handle; 253 254 let cid = match record_cid.parse::<cid::Cid>() { 255 Ok(c) => c, 256 Err(_) => continue, 257 }; 258 259 let block_bytes = match state.block_store.get(&cid).await { 260 Ok(Some(b)) => b, 261 _ => continue, 262 }; 263 264 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 265 Ok(v) => v, 266 Err(_) => continue, 267 }; 268 269 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 270 271 feed.push(FeedViewPost { 272 post: PostView { 273 uri, 274 cid: record_cid, 275 author: crate::api::read_after_write::AuthorView { 276 did: author_did, 277 handle: author_handle, 278 display_name: None, 279 avatar: None, 280 extra: HashMap::new(), 281 }, 282 record, 283 indexed_at: created_at.to_rfc3339(), 284 embed: None, 285 reply_count: 0, 286 repost_count: 0, 287 like_count: 0, 288 quote_count: 0, 289 extra: HashMap::new(), 290 }, 291 reply: None, 292 reason: None, 293 feed_context: None, 294 extra: HashMap::new(), 295 }); 296 } 297 298 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 299}