this repo has no description
1// Yes, I know, this endpoint is an appview one, not for PDS. Who cares!! 2// Yes, this only gets posts that our DB/instance knows about. Who cares!!! 3 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use jacquard_repo::storage::BlockStore; 12use serde::Serialize; 13use serde_json::{Value, json}; 14use tracing::error; 15 16#[derive(Serialize)] 17pub struct TimelineOutput { 18 pub feed: Vec<FeedViewPost>, 19 pub cursor: Option<String>, 20} 21 22#[derive(Serialize)] 23pub struct FeedViewPost { 24 pub post: PostView, 25} 26 27#[derive(Serialize)] 28#[serde(rename_all = "camelCase")] 29pub struct PostView { 30 pub uri: String, 31 pub cid: String, 32 pub author: AuthorView, 33 pub record: Value, 34 pub indexed_at: String, 35} 36 37#[derive(Serialize)] 38pub struct AuthorView { 39 pub did: String, 40 pub handle: String, 41} 42 43pub async fn get_timeline( 44 State(state): State<AppState>, 45 headers: axum::http::HeaderMap, 46) -> Response { 47 let auth_header = headers.get("Authorization"); 48 if auth_header.is_none() { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationRequired"})), 52 ) 53 .into_response(); 54 } 55 let token = auth_header 56 .unwrap() 57 .to_str() 58 .unwrap_or("") 59 .replace("Bearer ", ""); 60 61 let session = sqlx::query!( 62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 63 token 64 ) 65 .fetch_optional(&state.db) 66 .await 67 .unwrap_or(None); 68 69 let (did, key_bytes) = match session { 70 Some(row) => (row.did, row.key_bytes), 71 None => { 72 return ( 73 StatusCode::UNAUTHORIZED, 74 Json(json!({"error": "AuthenticationFailed"})), 75 ) 76 .into_response(); 77 } 78 }; 79 80 if crate::auth::verify_token(&token, &key_bytes).is_err() { 81 return ( 82 StatusCode::UNAUTHORIZED, 83 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 84 ) 85 .into_response(); 86 } 87 88 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 89 .fetch_optional(&state.db) 90 .await; 91 92 let user_id = match user_query { 93 Ok(Some(row)) => row.id, 94 _ => { 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError", "message": "User not found"})), 98 ) 99 .into_response(); 100 } 101 }; 102 103 let follows_query = sqlx::query!( 104 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'", 105 user_id 106 ) 107 .fetch_all(&state.db) 108 .await; 109 110 let follow_cids: Vec<String> = match follows_query { 111 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 112 Err(e) => { 113 error!("Failed to get follows: {:?}", e); 114 return ( 115 StatusCode::INTERNAL_SERVER_ERROR, 116 Json(json!({"error": "InternalError"})), 117 ) 118 .into_response(); 119 } 120 }; 121 122 let mut followed_dids: Vec<String> = Vec::new(); 123 for cid_str in follow_cids { 124 let cid = match cid_str.parse::<cid::Cid>() { 125 Ok(c) => c, 126 Err(_) => continue, 127 }; 128 129 let block_bytes = match state.block_store.get(&cid).await { 130 Ok(Some(b)) => b, 131 _ => continue, 132 }; 133 134 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 135 Ok(v) => v, 136 Err(_) => continue, 137 }; 138 139 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 140 followed_dids.push(subject.to_string()); 141 } 142 } 143 144 if followed_dids.is_empty() { 145 return ( 146 StatusCode::OK, 147 Json(TimelineOutput { 148 feed: vec![], 149 cursor: None, 150 }), 151 ) 152 .into_response(); 153 } 154 155 let posts_result = sqlx::query!( 156 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 157 FROM records r 158 JOIN repos rp ON r.repo_id = rp.user_id 159 JOIN users u ON rp.user_id = u.id 160 WHERE u.did = ANY($1) AND r.collection = 'app.bsky.feed.post' 161 ORDER BY r.created_at DESC 162 LIMIT 50", 163 &followed_dids 164 ) 165 .fetch_all(&state.db) 166 .await; 167 168 let posts = match posts_result { 169 Ok(rows) => rows, 170 Err(e) => { 171 error!("Failed to get posts: {:?}", e); 172 return ( 173 StatusCode::INTERNAL_SERVER_ERROR, 174 Json(json!({"error": "InternalError"})), 175 ) 176 .into_response(); 177 } 178 }; 179 180 let mut feed: Vec<FeedViewPost> = Vec::new(); 181 182 for row in posts { 183 let record_cid: String = row.record_cid; 184 let rkey: String = row.rkey; 185 let created_at: chrono::DateTime<chrono::Utc> = row.created_at; 186 let author_did: String = row.did; 187 let author_handle: String = row.handle; 188 189 let cid = match record_cid.parse::<cid::Cid>() { 190 Ok(c) => c, 191 Err(_) => continue, 192 }; 193 194 let block_bytes = match state.block_store.get(&cid).await { 195 Ok(Some(b)) => b, 196 _ => continue, 197 }; 198 199 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 200 Ok(v) => v, 201 Err(_) => continue, 202 }; 203 204 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 205 206 feed.push(FeedViewPost { 207 post: PostView { 208 uri, 209 cid: record_cid, 210 author: AuthorView { 211 did: author_did, 212 handle: author_handle, 213 }, 214 record, 215 indexed_at: created_at.to_rfc3339(), 216 }, 217 }); 218 } 219 220 (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response() 221}