this repo has no description
1// Yes, I know, this endpoint is an appview one, not for PDS. Who cares!! 2// Yes, this only gets posts that our DB/instance knows about. Who cares!!! 3 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use jacquard_repo::storage::BlockStore; 12use serde::Serialize; 13use serde_json::{Value, json}; 14use sqlx::Row; 15use tracing::error; 16 17#[derive(Serialize)] 18pub struct TimelineOutput { 19 pub feed: Vec<FeedViewPost>, 20 pub cursor: Option<String>, 21} 22 23#[derive(Serialize)] 24pub struct FeedViewPost { 25 pub post: PostView, 26} 27 28#[derive(Serialize)] 29#[serde(rename_all = "camelCase")] 30pub struct PostView { 31 pub uri: String, 32 pub cid: String, 33 pub author: AuthorView, 34 pub record: Value, 35 pub indexed_at: String, 36} 37 38#[derive(Serialize)] 39pub struct AuthorView { 40 pub did: String, 41 pub handle: String, 42} 43 44pub async fn get_timeline( 45 State(state): State<AppState>, 46 headers: axum::http::HeaderMap, 47) -> Response { 48 let auth_header = headers.get("Authorization"); 49 if auth_header.is_none() { 50 return ( 51 StatusCode::UNAUTHORIZED, 52 Json(json!({"error": "AuthenticationRequired"})), 53 ) 54 .into_response(); 55 } 56 let token = auth_header 57 .unwrap() 58 .to_str() 59 .unwrap_or("") 60 .replace("Bearer ", ""); 61 62 let session = sqlx::query( 63 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 64 ) 65 .bind(&token) 66 .fetch_optional(&state.db) 67 .await 68 .unwrap_or(None); 69 70 let (did, key_bytes) = match session { 71 Some(row) => ( 72 row.get::<String, _>("did"), 73 row.get::<Vec<u8>, _>("key_bytes"), 74 ), 75 None => { 76 return ( 77 StatusCode::UNAUTHORIZED, 78 Json(json!({"error": "AuthenticationFailed"})), 79 ) 80 .into_response(); 81 } 82 }; 83 84 if crate::auth::verify_token(&token, &key_bytes).is_err() { 85 return ( 86 StatusCode::UNAUTHORIZED, 87 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 88 ) 89 .into_response(); 90 } 91 92 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 93 .bind(&did) 94 .fetch_optional(&state.db) 95 .await; 96 97 let user_id: uuid::Uuid = match user_query { 98 Ok(Some(row)) => row.get("id"), 99 _ => { 100 return ( 101 StatusCode::INTERNAL_SERVER_ERROR, 102 Json(json!({"error": "InternalError", "message": "User not found"})), 103 ) 104 .into_response(); 105 } 106 }; 107 108 let follows_query = sqlx::query( 109 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'" 110 ) 111 .bind(user_id) 112 .fetch_all(&state.db) 113 .await; 114 115 let follow_cids: Vec<String> = match follows_query { 116 Ok(rows) => rows.iter().map(|r| r.get("record_cid")).collect(), 117 Err(e) => { 118 error!("Failed to get follows: {:?}", e); 119 return ( 120 StatusCode::INTERNAL_SERVER_ERROR, 121 Json(json!({"error": "InternalError"})), 122 ) 123 .into_response(); 124 } 125 }; 126 127 let mut followed_dids: Vec<String> = Vec::new(); 128 for cid_str in follow_cids { 129 let cid = match cid_str.parse::<cid::Cid>() { 130 Ok(c) => c, 131 Err(_) => continue, 132 }; 133 134 let block_bytes = match state.block_store.get(&cid).await { 135 Ok(Some(b)) => b, 136 _ => continue, 137 }; 138 139 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 140 Ok(v) => v, 141 Err(_) => continue, 142 }; 143 144 if let Some(subject) = record.get("subject").and_then(|s| s.as_str()) { 145 followed_dids.push(subject.to_string()); 146 } 147 } 148 149 if followed_dids.is_empty() { 150 return ( 151 StatusCode::OK, 152 Json(TimelineOutput { 153 feed: vec![], 154 cursor: None, 155 }), 156 ) 157 .into_response(); 158 } 159 160 let placeholders: Vec<String> = followed_dids 161 .iter() 162 .enumerate() 163 .map(|(i, _)| format!("${}", i + 1)) 164 .collect(); 165 166 let posts_query = format!( 167 "SELECT r.record_cid, r.rkey, r.created_at, u.did, u.handle 168 FROM records r 169 JOIN repos rp ON r.repo_id = rp.user_id 170 JOIN users u ON rp.user_id = u.id 171 WHERE u.did IN ({}) AND r.collection = 'app.bsky.feed.post' 172 ORDER BY r.created_at DESC 173 LIMIT 50", 174 placeholders.join(", ") 175 ); 176 177 let mut query = sqlx::query(&posts_query); 178 for did in &followed_dids { 179 query = query.bind(did); 180 } 181 182 let posts_result = query.fetch_all(&state.db).await; 183 184 let posts = match posts_result { 185 Ok(rows) => rows, 186 Err(e) => { 187 error!("Failed to get posts: {:?}", e); 188 return ( 189 StatusCode::INTERNAL_SERVER_ERROR, 190 Json(json!({"error": "InternalError"})), 191 ) 192 .into_response(); 193 } 194 }; 195 196 let mut feed: Vec<FeedViewPost> = Vec::new(); 197 198 for row in posts { 199 let record_cid: String = row.get("record_cid"); 200 let rkey: String = row.get("rkey"); 201 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at"); 202 let author_did: String = row.get("did"); 203 let author_handle: String = row.get("handle"); 204 205 let cid = match record_cid.parse::<cid::Cid>() { 206 Ok(c) => c, 207 Err(_) => continue, 208 }; 209 210 let block_bytes = match state.block_store.get(&cid).await { 211 Ok(Some(b)) => b, 212 _ => continue, 213 }; 214 215 let record: Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 216 Ok(v) => v, 217 Err(_) => continue, 218 }; 219 220 let uri = format!("at://{}/app.bsky.feed.post/{}", author_did, rkey); 221 222 feed.push(FeedViewPost { 223 post: PostView { 224 uri, 225 cid: record_cid, 226 author: AuthorView { 227 did: author_did, 228 handle: author_handle, 229 }, 230 record, 231 indexed_at: created_at.to_rfc3339(), 232 }, 233 }); 234 } 235 236 (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response() 237}