this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 PostView, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use jacquard_repo::storage::BlockStore; 14use serde::Deserialize; 15use serde_json::{json, Value}; 16use std::collections::HashMap; 17use tracing::warn; 18#[derive(Deserialize)] 19pub struct GetTimelineParams { 20 pub algorithm: Option<String>, 21 pub limit: Option<u32>, 22 pub cursor: Option<String>, 23} 24pub async fn get_timeline( 25 State(state): State<AppState>, 26 headers: axum::http::HeaderMap, 27 Query(params): Query<GetTimelineParams>, 28) -> Response { 29 let token = match crate::auth::extract_bearer_token_from_header( 30 headers.get("Authorization").and_then(|h| h.to_str().ok()), 31 ) { 32 Some(t) => t, 33 None => { 34 return ( 35 StatusCode::UNAUTHORIZED, 36 Json(json!({"error": "AuthenticationRequired"})), 37 ) 38 .into_response(); 39 } 40 }; 41 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 42 Ok(user) => user, 43 Err(_) => { 44 return ( 45 StatusCode::UNAUTHORIZED, 46 Json(json!({"error": "AuthenticationFailed"})), 47 ) 48 .into_response(); 49 } 50 }; 51 match std::env::var("APPVIEW_URL") { 52 Ok(url) if !url.starts_with("http://127.0.0.1") => { 53 return get_timeline_with_appview(&state, &headers, &params, &auth_user.did).await; 54 } 55 _ => {} 56 } 57 get_timeline_local_only(&state, &auth_user.did).await 58} 59async fn get_timeline_with_appview( 60 state: &AppState, 61 headers: &axum::http::HeaderMap, 62 params: &GetTimelineParams, 63 auth_did: &str, 64) -> Response { 65 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 66 let mut query_params = HashMap::new(); 67 if let Some(algo) = &params.algorithm { 68 query_params.insert("algorithm".to_string(), algo.clone()); 69 } 70 if let Some(limit) = params.limit { 71 query_params.insert("limit".to_string(), limit.to_string()); 72 } 73 if let Some(cursor) = &params.cursor { 74 query_params.insert("cursor".to_string(), cursor.clone()); 75 } 76 let proxy_result = 77 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await { 78 Ok(r) => r, 79 Err(e) => return e, 80 }; 81 if !proxy_result.status.is_success() { 82 return (proxy_result.status, proxy_result.body).into_response(); 83 } 84 let rev = extract_repo_rev(&proxy_result.headers); 85 if rev.is_none() { 86 return (proxy_result.status, proxy_result.body).into_response(); 87 } 88 let rev = rev.unwrap(); 89 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 90 Ok(f) => f, 91 Err(e) => { 92 warn!("Failed to parse timeline response: {:?}", e); 93 return (proxy_result.status, proxy_result.body).into_response(); 94 } 95 }; 96 let local_records = match get_records_since_rev(state, auth_did, &rev).await { 97 Ok(r) => r, 98 Err(e) => { 99 warn!("Failed to get local records: {}", e); 100 return (proxy_result.status, proxy_result.body).into_response(); 101 } 102 }; 103 if local_records.count == 0 { 104 return (proxy_result.status, proxy_result.body).into_response(); 105 } 106 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 107 .fetch_optional(&state.db) 108 .await 109 { 110 Ok(Some(h)) => h, 111 Ok(None) => auth_did.to_string(), 112 Err(e) => { 113 warn!("Database error fetching handle: {:?}", e); 114 auth_did.to_string() 115 } 116 }; 117 let local_posts: Vec<_> = local_records 118 .posts 119 .iter() 120 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 121 .collect(); 122 insert_posts_into_feed(&mut feed_output.feed, local_posts); 123 let lag = get_local_lag(&local_records); 124 format_munged_response(feed_output, lag) 125} 126async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 127 let user_id: uuid::Uuid = match sqlx::query_scalar!( 128 "SELECT id FROM users WHERE did = $1", 129 auth_did 130 ) 131 .fetch_optional(&state.db) 132 .await 133 { 134 Ok(Some(id)) => id, 135 Ok(None) => { 136 return ( 137 StatusCode::INTERNAL_SERVER_ERROR, 138 Json(json!({"error": "InternalError", "message": "User not found"})), 139 ) 140 .into_response(); 141 } 142 Err(e) => { 143 warn!("Database error fetching user: {:?}", e); 144 return ( 145 StatusCode::INTERNAL_SERVER_ERROR, 146 Json(json!({"error": "InternalError", "message": "Database error"})), 147 ) 148 .into_response(); 149 } 150 }; 151 let follows_query = sqlx::query!( 152 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 153 user_id 154 ) 155 .fetch_all(&state.db) 156 .await; 157 let follow_cids: Vec<String> = match follows_query { 158 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 159 Err(_) => { 160 return ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError"})), 163 ) 164 .into_response(); 165 } 166 }; 167 let mut followed_dids: Vec<String> = Vec::new(); 168 for cid_str in follow_cids { 169 let cid = match cid_str.parse::<cid::Cid>() { 170 Ok(c) => c, 171 Err(_) => continue, 172 }; 173 let block_bytes = match state.block_store.get(&cid).await { 174 Ok(Some(b)) => b, 175 _ => continue, 176 }; 177 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 178 Ok(v) => v, 179 Err(_) => continue, 180 }; 181 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 182 followed_dids.push(subject.to_string()); 183 } 184 } 185 if followed_dids.is_empty() { 186 return ( 187 StatusCode::OK, 188 Json(FeedOutput { 189 feed: vec![], 190 cursor: None, 191 }), 192 ) 193 .into_response(); 194 } 195 let posts_result = sqlx::query!( 196 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 197 FROM records r 198 JOIN repos rp ON r.repo_id = rp.user_id 199 JOIN users u ON rp.user_id = u.id 200 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 201 ORDER BY r.created_at DESC 202 LIMIT 50", 203 &followed_dids 204 ) 205 .fetch_all(&state.db) 206 .await; 207 let posts = match posts_result { 208 Ok(rows) => rows, 209 Err(_) => { 210 return ( 211 StatusCode::INTERNAL_SERVER_ERROR, 212 Json(json!({"error": "InternalError"})), 213 ) 214 .into_response(); 215 } 216 }; 217 let mut feed: Vec<FeedViewPost> = Vec::new(); 218 for row in posts { 219 let record_cid: String = row.record_cid; 220 let rkey: String = row.rkey; 221 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 222 let author_did: String = row.did; 223 let author_handle: String = row.handle; 224 let cid = match record_cid.parse::<cid::Cid>() { 225 Ok(c) => c, 226 Err(_) => continue, 227 }; 228 let block_bytes = match state.block_store.get(&cid).await { 229 Ok(Some(b)) => b, 230 _ => continue, 231 }; 232 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 233 Ok(v) => v, 234 Err(_) => continue, 235 }; 236 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 237 feed.push(FeedViewPost { 238 post: PostView { 239 uri, 240 cid: record_cid, 241 author: crate::api::read_after_write::AuthorView { 242 did: author_did, 243 handle: author_handle, 244 display_name: None, 245 avatar: None, 246 extra: HashMap::new(), 247 }, 248 record, 249 indexed_at: created_at.to_rfc3339(), 250 embed: None, 251 reply_count: 0, 252 repost_count: 0, 253 like_count: 0, 254 quote_count: 0, 255 extra: HashMap::new(), 256 }, 257 reply: None, 258 reason: None, 259 feed_context: None, 260 extra: HashMap::new(), 261 }); 262 } 263 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 264}