this repo has no description
1use crate::api::read_after_write::{ 2 FeedOutput, FeedViewPost, PostView, extract_repo_rev, format_local_post, 3 format_munged_response, get_local_lag, get_records_since_rev, insert_posts_into_feed, 4 proxy_to_appview_via_registry, 5}; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use jacquard_repo::storage::BlockStore; 14use serde::Deserialize; 15use serde_json::{Value, json}; 16use std::collections::HashMap; 17use tracing::warn; 18 19#[derive(Deserialize)] 20pub struct GetTimelineParams { 21 pub algorithm: Option<String>, 22 pub limit: Option<u32>, 23 pub cursor: Option<String>, 24} 25 26pub async fn get_timeline( 27 State(state): State<AppState>, 28 headers: axum::http::HeaderMap, 29 Query(params): Query<GetTimelineParams>, 30) -> Response { 31 let token = match crate::auth::extract_bearer_token_from_header( 32 headers.get("Authorization").and_then(|h| h.to_str().ok()), 33 ) { 34 Some(t) => t, 35 None => { 36 return ( 37 StatusCode::UNAUTHORIZED, 38 Json(json!({"error": "AuthenticationRequired"})), 39 ) 40 .into_response(); 41 } 42 }; 43 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 44 Ok(user) => user, 45 Err(_) => { 46 return ( 47 StatusCode::UNAUTHORIZED, 48 Json(json!({"error": "AuthenticationFailed"})), 49 ) 50 .into_response(); 51 } 52 }; 53 if state.appview_registry.get_appview_for_method("app.bsky.feed.getTimeline").await.is_some() { 54 return get_timeline_with_appview( 55 &state, 56 &params, 57 &auth_user.did, 58 auth_user.key_bytes.as_deref(), 59 ) 60 .await; 61 } 62 get_timeline_local_only(&state, &auth_user.did).await 63} 64 65async fn get_timeline_with_appview( 66 state: &AppState, 67 params: &GetTimelineParams, 68 auth_did: &str, 69 auth_key_bytes: Option<&[u8]>, 70) -> Response { 71 let mut query_params = HashMap::new(); 72 if let Some(algo) = &params.algorithm { 73 query_params.insert("algorithm".to_string(), algo.clone()); 74 } 75 if let Some(limit) = params.limit { 76 query_params.insert("limit".to_string(), limit.to_string()); 77 } 78 if let Some(cursor) = &params.cursor { 79 query_params.insert("cursor".to_string(), cursor.clone()); 80 } 81 let proxy_result = match proxy_to_appview_via_registry( 82 state, 83 "app.bsky.feed.getTimeline", 84 &query_params, 85 auth_did, 86 auth_key_bytes, 87 ) 88 .await 89 { 90 Ok(r) => r, 91 Err(e) => return e, 92 }; 93 if !proxy_result.status.is_success() { 94 return proxy_result.into_response(); 95 } 96 let rev = extract_repo_rev(&proxy_result.headers); 97 if rev.is_none() { 98 return proxy_result.into_response(); 99 } 100 let rev = rev.unwrap(); 101 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 102 Ok(f) => f, 103 Err(e) => { 104 warn!("Failed to parse timeline response: {:?}", e); 105 return proxy_result.into_response(); 106 } 107 }; 108 let local_records = match get_records_since_rev(state, auth_did, &rev).await { 109 Ok(r) => r, 110 Err(e) => { 111 warn!("Failed to get local records: {}", e); 112 return proxy_result.into_response(); 113 } 114 }; 115 if local_records.count == 0 { 116 return proxy_result.into_response(); 117 } 118 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 119 .fetch_optional(&state.db) 120 .await 121 { 122 Ok(Some(h)) => h, 123 Ok(None) => auth_did.to_string(), 124 Err(e) => { 125 warn!("Database error fetching handle: {:?}", e); 126 auth_did.to_string() 127 } 128 }; 129 let local_posts: Vec<_> = local_records 130 .posts 131 .iter() 132 .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 133 .collect(); 134 insert_posts_into_feed(&mut feed_output.feed, local_posts); 135 let lag = get_local_lag(&local_records); 136 format_munged_response(feed_output, lag) 137} 138 139async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 140 let user_id: uuid::Uuid = 141 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_did) 142 .fetch_optional(&state.db) 143 .await 144 { 145 Ok(Some(id)) => id, 146 Ok(None) => { 147 return ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError", "message": "User not found"})), 150 ) 151 .into_response(); 152 } 153 Err(e) => { 154 warn!("Database error fetching user: {:?}", e); 155 return ( 156 StatusCode::INTERNAL_SERVER_ERROR, 157 Json(json!({"error": "InternalError", "message": "Database error"})), 158 ) 159 .into_response(); 160 } 161 }; 162 let follows_query = sqlx::query!( 163 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 164 user_id 165 ) 166 .fetch_all(&state.db) 167 .await; 168 let follow_cids: Vec<String> = match follows_query { 169 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 170 Err(_) => { 171 return ( 172 StatusCode::INTERNAL_SERVER_ERROR, 173 Json(json!({"error": "InternalError"})), 174 ) 175 .into_response(); 176 } 177 }; 178 let mut followed_dids: Vec<String> = Vec::new(); 179 for cid_str in follow_cids { 180 let cid = match cid_str.parse::<cid::Cid>() { 181 Ok(c) => c, 182 Err(_) => continue, 183 }; 184 let block_bytes = match state.block_store.get(&cid).await { 185 Ok(Some(b)) => b, 186 _ => continue, 187 }; 188 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 189 Ok(v) => v, 190 Err(_) => continue, 191 }; 192 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 193 followed_dids.push(subject.to_string()); 194 } 195 } 196 if followed_dids.is_empty() { 197 return ( 198 StatusCode::OK, 199 Json(FeedOutput { 200 feed: vec![], 201 cursor: None, 202 }), 203 ) 204 .into_response(); 205 } 206 let posts_result = sqlx::query!( 207 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 208 FROM records r 209 JOIN repos rp ON r.repo_id = rp.user_id 210 JOIN users u ON rp.user_id = u.id 211 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 212 ORDER BY r.created_at DESC 213 LIMIT 50", 214 &followed_dids 215 ) 216 .fetch_all(&state.db) 217 .await; 218 let posts = match posts_result { 219 Ok(rows) => rows, 220 Err(_) => { 221 return ( 222 StatusCode::INTERNAL_SERVER_ERROR, 223 Json(json!({"error": "InternalError"})), 224 ) 225 .into_response(); 226 } 227 }; 228 let mut feed: Vec<FeedViewPost> = Vec::new(); 229 for row in posts { 230 let record_cid: String = row.record_cid; 231 let rkey: String = row.rkey; 232 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 233 let author_did: String = row.did; 234 let author_handle: String = row.handle; 235 let cid = match record_cid.parse::<cid::Cid>() { 236 Ok(c) => c, 237 Err(_) => continue, 238 }; 239 let block_bytes = match state.block_store.get(&cid).await { 240 Ok(Some(b)) => b, 241 _ => continue, 242 }; 243 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 244 Ok(v) => v, 245 Err(_) => continue, 246 }; 247 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 248 feed.push(FeedViewPost { 249 post: PostView { 250 uri, 251 cid: record_cid, 252 author: crate::api::read_after_write::AuthorView { 253 did: author_did, 254 handle: author_handle, 255 display_name: None, 256 avatar: None, 257 extra: HashMap::new(), 258 }, 259 record, 260 indexed_at: created_at.to_rfc3339(), 261 embed: None, 262 reply_count: 0, 263 repost_count: 0, 264 like_count: 0, 265 quote_count: 0, 266 extra: HashMap::new(), 267 }, 268 reply: None, 269 reason: None, 270 feed_context: None, 271 extra: HashMap::new(), 272 }); 273 } 274 (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 275}