// this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 PostView, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use jacquard_repo::storage::BlockStore; 14use serde::Deserialize; 15use serde_json::{json, Value}; 16use std::collections::HashMap; 17use tracing::warn; 18 19#[derive(Deserialize)] 20pub struct GetTimelineParams { 21 pub algorithm: Option<String>, 22 pub limit: Option<u32>, 23 pub cursor: Option<String>, 24} 25 26pub async fn get_timeline( 27 State(state): State<AppState>, 28 headers: axum::http::HeaderMap, 29 Query(params): Query<GetTimelineParams>, 30) -> Response { 31 let token = match crate::auth::extract_bearer_token_from_header( 32 headers.get("Authorization").and_then(|h| h.to_str().ok()), 33 ) { 34 Some(t) => t, 35 None => { 36 return ( 37 StatusCode::UNAUTHORIZED, 38 Json(json!({"error": "AuthenticationRequired"})), 39 ) 40 .into_response(); 41 } 42 }; 43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 44 Ok(user) => user, 45 Err(_) => { 46 return ( 47 StatusCode::UNAUTHORIZED, 48 Json(json!({"error": "AuthenticationFailed"})), 49 ) 50 .into_response(); 51 } 52 }; 53 match std::env::var("APPVIEW_URL") { 54 Ok(url) if !url.starts_with("http://127.0.0.1") => { 55 return get_timeline_with_appview(&state, &headers, &params, &auth_user.did).await; 56 } 57 _ => {} 58 } 59 get_timeline_local_only(&state, &auth_user.did).await 60} 61 62async fn get_timeline_with_appview( 63 state: &AppState, 64 headers: &axum::http::HeaderMap, 65 params: &GetTimelineParams, 66 auth_did: &str, 67) -> Response { 68 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 69 let mut query_params = HashMap::new(); 70 if let Some(algo) = &params.algorithm { 71 
query_params.insert("algorithm".to_string(), algo.clone()); 72 } 73 if let Some(limit) = params.limit { 74 query_params.insert("limit".to_string(), limit.to_string()); 75 } 76 if let Some(cursor) = &params.cursor { 77 query_params.insert("cursor".to_string(), cursor.clone()); 78 } 79 let proxy_result = 80 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await { 81 Ok(r) => r, 82 Err(e) => return e, 83 }; 84 if !proxy_result.status.is_success() { 85 return (proxy_result.status, proxy_result.body).into_response(); 86 } 87 let rev = extract_repo_rev(&proxy_result.headers); 88 if rev.is_none() { 89 return (proxy_result.status, proxy_result.body).into_response(); 90 } 91 let rev = rev.unwrap(); 92 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 93 Ok(f) => f, 94 Err(e) => { 95 warn!("Failed to parse timeline response: {:?}", e); 96 return (proxy_result.status, proxy_result.body).into_response(); 97 } 98 }; 99 let local_records = match get_records_since_rev(state, auth_did, &rev).await { 100 Ok(r) => r, 101 Err(e) => { 102 warn!("Failed to get local records: {}", e); 103 return (proxy_result.status, proxy_result.body).into_response(); 104 } 105 }; 106 if local_records.count == 0 { 107 return (proxy_result.status, proxy_result.body).into_response(); 108 } 109 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 110 .fetch_optional(&state.db) 111 .await 112 { 113 Ok(Some(h)) => h, 114 Ok(None) => auth_did.to_string(), 115 Err(e) => { 116 warn!("Database error fetching handle: {:?}", e); 117 auth_did.to_string() 118 } 119 }; 120 let local_posts: Vec<_> = local_records 121 .posts 122 .iter() 123 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 124 .collect(); 125 insert_posts_into_feed(&mut feed_output.feed, local_posts); 126 let lag = get_local_lag(&local_records); 127 format_munged_response(feed_output, lag) 128} 129 130async fn 
get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 131 let user_id: uuid::Uuid = match sqlx::query_scalar!( 132 "SELECT id FROM users WHERE did = $1", 133 auth_did 134 ) 135 .fetch_optional(&state.db) 136 .await 137 { 138 Ok(Some(id)) => id, 139 Ok(None) => { 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError", "message": "User not found"})), 143 ) 144 .into_response(); 145 } 146 Err(e) => { 147 warn!("Database error fetching user: {:?}", e); 148 return ( 149 StatusCode::INTERNAL_SERVER_ERROR, 150 Json(json!({"error": "InternalError", "message": "Database error"})), 151 ) 152 .into_response(); 153 } 154 }; 155 let follows_query = sqlx::query!( 156 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 157 user_id 158 ) 159 .fetch_all(&state.db) 160 .await; 161 let follow_cids: Vec<String> = match follows_query { 162 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 163 Err(_) => { 164 return ( 165 StatusCode::INTERNAL_SERVER_ERROR, 166 Json(json!({"error": "InternalError"})), 167 ) 168 .into_response(); 169 } 170 }; 171 let mut followed_dids: Vec<String> = Vec::new(); 172 for cid_str in follow_cids { 173 let cid = match cid_str.parse::<cid::Cid>() { 174 Ok(c) => c, 175 Err(_) => continue, 176 }; 177 let block_bytes = match state.block_store.get(&cid).await { 178 Ok(Some(b)) => b, 179 _ => continue, 180 }; 181 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 182 Ok(v) => v, 183 Err(_) => continue, 184 }; 185 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 186 followed_dids.push(subject.to_string()); 187 } 188 } 189 if followed_dids.is_empty() { 190 return ( 191 StatusCode::OK, 192 Json(FeedOutput { 193 feed: vec![], 194 cursor: None, 195 }), 196 ) 197 .into_response(); 198 } 199 let posts_result = sqlx::query!( 200 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 201 FROM records r 
202 JOIN repos rp ON r.repo_id = rp.user_id 203 JOIN users u ON rp.user_id = u.id 204 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 205 ORDER BY r.created_at DESC 206 LIMIT 50", 207 &followed_dids 208 ) 209 .fetch_all(&state.db) 210 .await; 211 let posts = match posts_result { 212 Ok(rows) => rows, 213 Err(_) => { 214 return ( 215 StatusCode::INTERNAL_SERVER_ERROR, 216 Json(json!({"error": "InternalError"})), 217 ) 218 .into_response(); 219 } 220 }; 221 let mut feed: Vec<FeedViewPost> = Vec::new(); 222 for row in posts { 223 let record_cid: String = row.record_cid; 224 let rkey: String = row.rkey; 225 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 226 let author_did: String = row.did; 227 let author_handle: String = row.handle; 228 let cid = match record_cid.parse::<cid::Cid>() { 229 Ok(c) => c, 230 Err(_) => continue, 231 }; 232 let block_bytes = match state.block_store.get(&cid).await { 233 Ok(Some(b)) => b, 234 _ => continue, 235 }; 236 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 237 Ok(v) => v, 238 Err(_) => continue, 239 }; 240 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 241 feed.push(FeedViewPost { 242 post: PostView { 243 uri, 244 cid: record_cid, 245 author: crate::api::read_after_write::AuthorView { 246 did: author_did, 247 handle: author_handle, 248 display_name: None, 249 avatar: None, 250 extra: HashMap::new(), 251 }, 252 record, 253 indexed_at: created_at.to_rfc3339(), 254 embed: None, 255 reply_count: 0, 256 repost_count: 0, 257 like_count: 0, 258 quote_count: 0, 259 extra: HashMap::new(), 260 }, 261 reply: None, 262 reason: None, 263 feed_context: None, 264 extra: HashMap::new(), 265 }); 266 } 267 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 268}