this repo has no description
1// Yes, I know, this endpoint is an appview one, not for PDS. Who cares!! 2// Yes, this only gets posts that our DB/instance knows about. Who cares!!! 3 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use jacquard_repo::storage::BlockStore; 12use serde::Serialize; 13use serde_json::{Value, json}; 14use sqlx::Row; 15use tracing::error; 16 17#[derive(Serialize)] 18pub struct TimelineOutput { 19 pub feed: Vec<FeedViewPost>, 20 pub cursor: Option<String>, 21} 22 23#[derive(Serialize)] 24pub struct FeedViewPost { 25 pub post: PostView, 26} 27 28#[derive(Serialize)] 29#[serde(rename_all = "camelCase")] 30pub struct PostView { 31 pub uri: String, 32 pub cid: String, 33 pub author: AuthorView, 34 pub record: Value, 35 pub indexed_at: String, 36} 37 38#[derive(Serialize)] 39pub struct AuthorView { 40 pub did: String, 41 pub handle: String, 42} 43 44pub async fn get_timeline( 45 State(state): State<AppState>, 46 headers: axum::http::HeaderMap, 47) -> Response { 48 let auth_header = headers.get("Authorization"); 49 if auth_header.is_none() { 50 return ( 51 StatusCode::UNAUTHORIZED, 52 Json(json!({"error": "AuthenticationRequired"})), 53 ) 54 .into_response(); 55 } 56 let token = auth_header 57 .unwrap() 58 .to_str() 59 .unwrap_or("") 60 .replace("Bearer ", ""); 61 62 let session = sqlx::query!( 63 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 64 token 65 ) 66 .fetch_optional(&state.db) 67 .await 68 .unwrap_or(None); 69 70 let (did, key_bytes) = match session { 71 Some(row) => (row.did, row.key_bytes), 72 None => { 73 return ( 74 StatusCode::UNAUTHORIZED, 75 Json(json!({"error": "AuthenticationFailed"})), 76 ) 77 .into_response(); 78 } 79 }; 80 81 if crate::auth::verify_token(&token, &key_bytes).is_err() { 82 return ( 83 StatusCode::UNAUTHORIZED, 84 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 85 ) 86 .into_response(); 87 } 88 89 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 90 .fetch_optional(&state.db) 91 .await; 92 93 let user_id = match user_query { 94 Ok(Some(row)) => row.id, 95 _ => { 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError", "message": "User not found"})), 99 ) 100 .into_response(); 101 } 102 }; 103 104 let follows_query = sqlx::query!( 105 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'", 106 user_id 107 ) 108 .fetch_all(&state.db) 109 .await; 110 111 let follow_cids: Vec<String> = match follows_query { 112 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 113 Err(e) => { 114 error!("Failed to get follows: {:?}", e); 115 return ( 116 StatusCode::INTERNAL_SERVER_ERROR, 117 Json(json!({"error": "InternalError"})), 118 ) 119 .into_response(); 120 } 121 }; 122 123 let mut followed_dids: Vec<String> = Vec::new(); 124 for cid_str in follow_cids { 125 let cid = match cid_str.parse::<cid::Cid>() { 126 Ok(c) => c, 127 Err(_) => continue, 128 }; 129 130 let block_bytes = match state.block_store.get(&cid).await { 131 Ok(Some(b)) => b, 132 _ => continue, 133 }; 134 135 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 136 Ok(v) => v, 137 Err(_) => continue, 138 }; 139 140 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 141 followed_dids.push(subject.to_string()); 142 } 143 } 144 145 if followed_dids.is_empty() { 146 return ( 147 StatusCode::OK, 148 Json(TimelineOutput { 149 feed: vec![], 150 cursor: None, 151 }), 152 ) 153 .into_response(); 154 } 155 156 let placeholders: Vec<String> = followed_dids 157 .iter() 158 .enumerate() 159 .map(|(i, _)| format!("${}", i + 1)) 160 .collect(); 161 162 let posts_query = format!( 163 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 164 FROM records r 165 JOIN repos rp ON r.repo_id = rp.user_id 166 JOIN users u ON rp.user_id = u.id 167 WHERE u.did IN ({}) AND r.collection = 'app.bsky.feed.post' 168 ORDER BY r.created_at DESC 169 LIMIT 50", 170 placeholders.join(", ") 171 ); 172 173 let mut query = sqlx::query(&posts_query); 174 for did in &followed_dids { 175 query = query.bind(did); 176 } 177 178 let posts_result = query.fetch_all(&state.db).await; 179 180 let posts = match posts_result { 181 Ok(rows) => rows, 182 Err(e) => { 183 error!("Failed to get posts: {:?}", e); 184 return ( 185 StatusCode::INTERNAL_SERVER_ERROR, 186 Json(json!({"error": "InternalError"})), 187 ) 188 .into_response(); 189 } 190 }; 191 192 let mut feed: Vec<FeedViewPost> = Vec::new(); 193 194 for row in posts { 195 let record_cid: String = row.get("record_cid"); 196 let rkey: String = row.get("rkey"); 197 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at"); 198 let author_did: String = row.get("did"); 199 let author_handle: String = row.get("handle"); 200 201 let cid = match record_cid.parse::<cid::Cid>() { 202 Ok(c) => c, 203 Err(_) => continue, 204 }; 205 206 let block_bytes = match state.block_store.get(&cid).await { 207 Ok(Some(b)) => b, 208 _ => continue, 209 }; 210 211 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 212 Ok(v) => v, 213 Err(_) => continue, 214 }; 215 216 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 217 218 feed.push(FeedViewPost { 219 post: PostView { 220 uri, 221 cid: record_cid, 222 author: AuthorView { 223 did: author_did, 224 handle: author_handle, 225 }, 226 record, 227 indexed_at: created_at.to_rfc3339(), 228 }, 229 }); 230 } 231 232 (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response() 233}