this repo has no description
1use crate::api::read_after_write::{ 2 FeedOutput, FeedViewPost, PostView, extract_repo_rev, format_local_post, 3 format_munged_response, get_local_lag, get_records_since_rev, insert_posts_into_feed, 4 proxy_to_appview, 5}; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use jacquard_repo::storage::BlockStore; 14use serde::Deserialize; 15use serde_json::{Value, json}; 16use std::collections::HashMap; 17use tracing::warn; 18 19#[derive(Deserialize)] 20pub struct GetTimelineParams { 21 pub algorithm: Option<String>, 22 pub limit: Option<u32>, 23 pub cursor: Option<String>, 24} 25 26pub async fn get_timeline( 27 State(state): State<AppState>, 28 headers: axum::http::HeaderMap, 29 Query(params): Query<GetTimelineParams>, 30) -> Response { 31 let token = match crate::auth::extract_bearer_token_from_header( 32 headers.get("Authorization").and_then(|h| h.to_str().ok()), 33 ) { 34 Some(t) => t, 35 None => { 36 return ( 37 StatusCode::UNAUTHORIZED, 38 Json(json!({"error": "AuthenticationRequired"})), 39 ) 40 .into_response(); 41 } 42 }; 43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 44 Ok(user) => user, 45 Err(_) => { 46 return ( 47 StatusCode::UNAUTHORIZED, 48 Json(json!({"error": "AuthenticationFailed"})), 49 ) 50 .into_response(); 51 } 52 }; 53 match std::env::var("APPVIEW_URL") { 54 Ok(url) if !url.starts_with("http://127.0.0.1") => { 55 return get_timeline_with_appview( 56 &state, 57 &params, 58 &auth_user.did, 59 auth_user.key_bytes.as_deref(), 60 ) 61 .await; 62 } 63 _ => {} 64 } 65 get_timeline_local_only(&state, &auth_user.did).await 66} 67 68async fn get_timeline_with_appview( 69 state: &AppState, 70 params: &GetTimelineParams, 71 auth_did: &str, 72 auth_key_bytes: Option<&[u8]>, 73) -> Response { 74 let mut query_params = HashMap::new(); 75 if let Some(algo) = &params.algorithm { 76 query_params.insert("algorithm".to_string(), algo.clone()); 77 } 78 if let Some(limit) = params.limit { 79 query_params.insert("limit".to_string(), limit.to_string()); 80 } 81 if let Some(cursor) = &params.cursor { 82 query_params.insert("cursor".to_string(), cursor.clone()); 83 } 84 let proxy_result = match proxy_to_appview( 85 "app.bsky.feed.getTimeline", 86 &query_params, 87 auth_did, 88 auth_key_bytes, 89 ) 90 .await 91 { 92 Ok(r) => r, 93 Err(e) => return e, 94 }; 95 if !proxy_result.status.is_success() { 96 return proxy_result.into_response(); 97 } 98 let rev = extract_repo_rev(&proxy_result.headers); 99 if rev.is_none() { 100 return proxy_result.into_response(); 101 } 102 let rev = rev.unwrap(); 103 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 104 Ok(f) => f, 105 Err(e) => { 106 warn!("Failed to parse timeline response: {:?}", e); 107 return proxy_result.into_response(); 108 } 109 }; 110 let local_records = match get_records_since_rev(state, auth_did, &rev).await { 111 Ok(r) => r, 112 Err(e) => { 113 warn!("Failed to get local records: {}", e); 114 return proxy_result.into_response(); 115 } 116 }; 117 if local_records.count == 0 { 118 return proxy_result.into_response(); 119 } 120 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 121 .fetch_optional(&state.db) 122 .await 123 { 124 Ok(Some(h)) => h, 125 Ok(None) => auth_did.to_string(), 126 Err(e) => { 127 warn!("Database error fetching handle: {:?}", e); 128 auth_did.to_string() 129 } 130 }; 131 let local_posts: Vec<_> = local_records 132 .posts 133 .iter() 134 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 135 .collect(); 136 insert_posts_into_feed(&mut feed_output.feed, local_posts); 137 let lag = get_local_lag(&local_records); 138 format_munged_response(feed_output, lag) 139} 140 141async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 142 let user_id: uuid::Uuid = 143 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_did) 144 .fetch_optional(&state.db) 145 .await 146 { 147 Ok(Some(id)) => id, 148 Ok(None) => { 149 return ( 150 StatusCode::INTERNAL_SERVER_ERROR, 151 Json(json!({"error": "InternalError", "message": "User not found"})), 152 ) 153 .into_response(); 154 } 155 Err(e) => { 156 warn!("Database error fetching user: {:?}", e); 157 return ( 158 StatusCode::INTERNAL_SERVER_ERROR, 159 Json(json!({"error": "InternalError", "message": "Database error"})), 160 ) 161 .into_response(); 162 } 163 }; 164 let follows_query = sqlx::query!( 165 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 166 user_id 167 ) 168 .fetch_all(&state.db) 169 .await; 170 let follow_cids: Vec<String> = match follows_query { 171 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 172 Err(_) => { 173 return ( 174 StatusCode::INTERNAL_SERVER_ERROR, 175 Json(json!({"error": "InternalError"})), 176 ) 177 .into_response(); 178 } 179 }; 180 let mut followed_dids: Vec<String> = Vec::new(); 181 for cid_str in follow_cids { 182 let cid = match cid_str.parse::<cid::Cid>() { 183 Ok(c) => c, 184 Err(_) => continue, 185 }; 186 let block_bytes = match state.block_store.get(&cid).await { 187 Ok(Some(b)) => b, 188 _ => continue, 189 }; 190 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 191 Ok(v) => v, 192 Err(_) => continue, 193 }; 194 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 195 followed_dids.push(subject.to_string()); 196 } 197 } 198 if followed_dids.is_empty() { 199 return ( 200 StatusCode::OK, 201 Json(FeedOutput { 202 feed: vec![], 203 cursor: None, 204 }), 205 ) 206 .into_response(); 207 } 208 let posts_result = sqlx::query!( 209 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 210 FROM records r 211 JOIN repos rp ON r.repo_id = rp.user_id 212 JOIN users u ON rp.user_id = u.id 213 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 214 ORDER BY r.created_at DESC 215 LIMIT 50", 216 &followed_dids 217 ) 218 .fetch_all(&state.db) 219 .await; 220 let posts = match posts_result { 221 Ok(rows) => rows, 222 Err(_) => { 223 return ( 224 StatusCode::INTERNAL_SERVER_ERROR, 225 Json(json!({"error": "InternalError"})), 226 ) 227 .into_response(); 228 } 229 }; 230 let mut feed: Vec<FeedViewPost> = Vec::new(); 231 for row in posts { 232 let record_cid: String = row.record_cid; 233 let rkey: String = row.rkey; 234 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 235 let author_did: String = row.did; 236 let author_handle: String = row.handle; 237 let cid = match record_cid.parse::<cid::Cid>() { 238 Ok(c) => c, 239 Err(_) => continue, 240 }; 241 let block_bytes = match state.block_store.get(&cid).await { 242 Ok(Some(b)) => b, 243 _ => continue, 244 }; 245 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 246 Ok(v) => v, 247 Err(_) => continue, 248 }; 249 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 250 feed.push(FeedViewPost { 251 post: PostView { 252 uri, 253 cid: record_cid, 254 author: crate::api::read_after_write::AuthorView { 255 did: author_did, 256 handle: author_handle, 257 display_name: None, 258 avatar: None, 259 extra: HashMap::new(), 260 }, 261 record, 262 indexed_at: created_at.to_rfc3339(), 263 embed: None, 264 reply_count: 0, 265 repost_count: 0, 266 like_count: 0, 267 quote_count: 0, 268 extra: HashMap::new(), 269 }, 270 reply: None, 271 reason: None, 272 feed_context: None, 273 extra: HashMap::new(), 274 }); 275 } 276 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 277}