this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 PostView, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use jacquard_repo::storage::BlockStore; 14use serde::Deserialize; 15use serde_json::{json, Value}; 16use std::collections::HashMap; 17use tracing::warn; 18 19#[derive(Deserialize)] 20pub struct GetTimelineParams { 21 pub algorithm: Option<String>, 22 pub limit: Option<u32>, 23 pub cursor: Option<String>, 24} 25 26pub async fn get_timeline( 27 State(state): State<AppState>, 28 headers: axum::http::HeaderMap, 29 Query(params): Query<GetTimelineParams>, 30) -> Response { 31 let token = match crate::auth::extract_bearer_token_from_header( 32 headers.get("Authorization").and_then(|h| h.to_str().ok()), 33 ) { 34 Some(t) => t, 35 None => { 36 return ( 37 StatusCode::UNAUTHORIZED, 38 Json(json!({"error": "AuthenticationRequired"})), 39 ) 40 .into_response(); 41 } 42 }; 43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 44 Ok(user) => user, 45 Err(_) => { 46 return ( 47 StatusCode::UNAUTHORIZED, 48 Json(json!({"error": "AuthenticationFailed"})), 49 ) 50 .into_response(); 51 } 52 }; 53 match std::env::var("APPVIEW_URL") { 54 Ok(url) if !url.starts_with("http://127.0.0.1") => { 55 return get_timeline_with_appview(&state, &params, &auth_user.did, auth_user.key_bytes.as_deref()).await; 56 } 57 _ => {} 58 } 59 get_timeline_local_only(&state, &auth_user.did).await 60} 61 62async fn get_timeline_with_appview( 63 state: &AppState, 64 params: &GetTimelineParams, 65 auth_did: &str, 66 auth_key_bytes: Option<&[u8]>, 67) -> Response { 68 let mut query_params = HashMap::new(); 69 if let Some(algo) = &params.algorithm { 70 query_params.insert("algorithm".to_string(), algo.clone()); 71 } 72 if let Some(limit) = params.limit { 73 query_params.insert("limit".to_string(), limit.to_string()); 74 } 75 if let Some(cursor) = &params.cursor { 76 query_params.insert("cursor".to_string(), cursor.clone()); 77 } 78 let proxy_result = 79 match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_did, auth_key_bytes).await { 80 Ok(r) => r, 81 Err(e) => return e, 82 }; 83 if !proxy_result.status.is_success() { 84 return proxy_result.into_response(); 85 } 86 let rev = extract_repo_rev(&proxy_result.headers); 87 if rev.is_none() { 88 return proxy_result.into_response(); 89 } 90 let rev = rev.unwrap(); 91 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 92 Ok(f) => f, 93 Err(e) => { 94 warn!("Failed to parse timeline response: {:?}", e); 95 return proxy_result.into_response(); 96 } 97 }; 98 let local_records = match get_records_since_rev(state, auth_did, &rev).await { 99 Ok(r) => r, 100 Err(e) => { 101 warn!("Failed to get local records: {}", e); 102 return proxy_result.into_response(); 103 } 104 }; 105 if local_records.count == 0 { 106 return proxy_result.into_response(); 107 } 108 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 109 .fetch_optional(&state.db) 110 .await 111 { 112 Ok(Some(h)) => h, 113 Ok(None) => auth_did.to_string(), 114 Err(e) => { 115 warn!("Database error fetching handle: {:?}", e); 116 auth_did.to_string() 117 } 118 }; 119 let local_posts: Vec<_> = local_records 120 .posts 121 .iter() 122 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 123 .collect(); 124 insert_posts_into_feed(&mut feed_output.feed, local_posts); 125 let lag = get_local_lag(&local_records); 126 format_munged_response(feed_output, lag) 127} 128 129async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 130 let user_id: uuid::Uuid = match sqlx::query_scalar!( 131 "SELECT id FROM users WHERE did = $1", 132 auth_did 133 ) 134 .fetch_optional(&state.db) 135 .await 136 { 137 Ok(Some(id)) => id, 138 Ok(None) => { 139 return ( 140 StatusCode::INTERNAL_SERVER_ERROR, 141 Json(json!({"error": "InternalError", "message": "User not found"})), 142 ) 143 .into_response(); 144 } 145 Err(e) => { 146 warn!("Database error fetching user: {:?}", e); 147 return ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError", "message": "Database error"})), 150 ) 151 .into_response(); 152 } 153 }; 154 let follows_query = sqlx::query!( 155 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 156 user_id 157 ) 158 .fetch_all(&state.db) 159 .await; 160 let follow_cids: Vec<String> = match follows_query { 161 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 162 Err(_) => { 163 return ( 164 StatusCode::INTERNAL_SERVER_ERROR, 165 Json(json!({"error": "InternalError"})), 166 ) 167 .into_response(); 168 } 169 }; 170 let mut followed_dids: Vec<String> = Vec::new(); 171 for cid_str in follow_cids { 172 let cid = match cid_str.parse::<cid::Cid>() { 173 Ok(c) => c, 174 Err(_) => continue, 175 }; 176 let block_bytes = match state.block_store.get(&cid).await { 177 Ok(Some(b)) => b, 178 _ => continue, 179 }; 180 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 181 Ok(v) => v, 182 Err(_) => continue, 183 }; 184 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 185 followed_dids.push(subject.to_string()); 186 } 187 } 188 if followed_dids.is_empty() { 189 return ( 190 StatusCode::OK, 191 Json(FeedOutput { 192 feed: vec![], 193 cursor: None, 194 }), 195 ) 196 .into_response(); 197 } 198 let posts_result = sqlx::query!( 199 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 200 FROM records r 201 JOIN repos rp ON r.repo_id = rp.user_id 202 JOIN users u ON rp.user_id = u.id 203 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 204 ORDER BY r.created_at DESC 205 LIMIT 50", 206 &followed_dids 207 ) 208 .fetch_all(&state.db) 209 .await; 210 let posts = match posts_result { 211 Ok(rows) => rows, 212 Err(_) => { 213 return ( 214 StatusCode::INTERNAL_SERVER_ERROR, 215 Json(json!({"error": "InternalError"})), 216 ) 217 .into_response(); 218 } 219 }; 220 let mut feed: Vec<FeedViewPost> = Vec::new(); 221 for row in posts { 222 let record_cid: String = row.record_cid; 223 let rkey: String = row.rkey; 224 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 225 let author_did: String = row.did; 226 let author_handle: String = row.handle; 227 let cid = match record_cid.parse::<cid::Cid>() { 228 Ok(c) => c, 229 Err(_) => continue, 230 }; 231 let block_bytes = match state.block_store.get(&cid).await { 232 Ok(Some(b)) => b, 233 _ => continue, 234 }; 235 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 236 Ok(v) => v, 237 Err(_) => continue, 238 }; 239 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 240 feed.push(FeedViewPost { 241 post: PostView { 242 uri, 243 cid: record_cid, 244 author: crate::api::read_after_write::AuthorView { 245 did: author_did, 246 handle: author_handle, 247 display_name: None, 248 avatar: None, 249 extra: HashMap::new(), 250 }, 251 record, 252 indexed_at: created_at.to_rfc3339(), 253 embed: None, 254 reply_count: 0, 255 repost_count: 0, 256 like_count: 0, 257 quote_count: 0, 258 extra: HashMap::new(), 259 }, 260 reply: None, 261 reason: None, 262 feed_context: None, 263 extra: HashMap::new(), 264 }); 265 } 266 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 267}