this repo has no description

Half-ass attempt at the local-first appview endpoints like ref impl

lewis b25a102f 61dcea2c

+23
.sqlx/query-5f02d646eb60f99f5cc1ae7b8b41e62d053a6b9f8e9452d5cef3526b8aef8288.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "val", 9 + "type_info": "Int4" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Uuid", 15 + "Text" 16 + ] 17 + }, 18 + "nullable": [ 19 + null 20 + ] 21 + }, 22 + "hash": "5f02d646eb60f99f5cc1ae7b8b41e62d053a6b9f8e9452d5cef3526b8aef8288" 23 + }
+2 -2
.sqlx/query-7bb1388dec372fe749462cd9b604e5802b770aeb110462208988141d31c86c92.json .sqlx/query-36001fc127d7a3ea4e53e43a559cd86107e74d02ddcc499afd81049ce3c6789b.json
··· 1 1 { 2 2 "db_name": "PostgreSQL", 3 - "query": "SELECT k.key_bytes, k.encryption_version FROM user_keys k JOIN users u ON k.user_id = u.id WHERE u.did = $1", 3 + "query": "SELECT key_bytes, encryption_version FROM user_keys k JOIN users u ON k.user_id = u.id WHERE u.did = $1", 4 4 "describe": { 5 5 "columns": [ 6 6 { ··· 24 24 true 25 25 ] 26 26 }, 27 - "hash": "7bb1388dec372fe749462cd9b604e5802b770aeb110462208988141d31c86c92" 27 + "hash": "36001fc127d7a3ea4e53e43a559cd86107e74d02ddcc499afd81049ce3c6789b" 28 28 }
+18
.sqlx/query-8a9e71f04ec779d5c10d79582cc398529e01be01a83898df3524bb35e3d2ed14.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) VALUES ($1, $2, $3, $4, $5)\n ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, repo_rev = $5, created_at = NOW()", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "Text", 10 + "Text", 11 + "Text", 12 + "Text" 13 + ] 14 + }, 15 + "nullable": [] 16 + }, 17 + "hash": "8a9e71f04ec779d5c10d79582cc398529e01be01a83898df3524bb35e3d2ed14" 18 + }
+2 -2
.sqlx/query-bf60faafb5c79a149ba237a984f78d068b5d691f6762641412a5aa1517605c04.json .sqlx/query-a3e7b0c0861eaf62dda8b3a2ea5573bbb64eef74473f2b73cb38e2948cb3d7cc.json
··· 1 1 { 2 2 "db_name": "PostgreSQL", 3 - "query": "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'", 3 + "query": "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 4 4 "describe": { 5 5 "columns": [ 6 6 { ··· 18 18 false 19 19 ] 20 20 }, 21 - "hash": "bf60faafb5c79a149ba237a984f78d068b5d691f6762641412a5aa1517605c04" 21 + "hash": "a3e7b0c0861eaf62dda8b3a2ea5573bbb64eef74473f2b73cb38e2948cb3d7cc" 22 22 }
-17
.sqlx/query-c61fc3b2fbdf6891269908ef21f13dcabdc3b032e9f767becae34ca176df18b6.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)\n ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Uuid", 9 - "Text", 10 - "Text", 11 - "Text" 12 - ] 13 - }, 14 - "nullable": [] 15 - }, 16 - "hash": "c61fc3b2fbdf6891269908ef21f13dcabdc3b032e9f767becae34ca176df18b6" 17 - }
+47
.sqlx/query-f3f1634b4f03a4c365afa02c4504de758bc420f49a19092d5cd1c526c7c7461e.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT record_cid, collection, rkey, created_at, repo_rev\n FROM records\n WHERE repo_id = $1 AND repo_rev > $2\n ORDER BY repo_rev ASC\n LIMIT 10\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "record_cid", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "collection", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "rkey", 19 + "type_info": "Text" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "created_at", 24 + "type_info": "Timestamptz" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "repo_rev", 29 + "type_info": "Text" 30 + } 31 + ], 32 + "parameters": { 33 + "Left": [ 34 + "Uuid", 35 + "Text" 36 + ] 37 + }, 38 + "nullable": [ 39 + false, 40 + false, 41 + false, 42 + false, 43 + true 44 + ] 45 + }, 46 + "hash": "f3f1634b4f03a4c365afa02c4504de758bc420f49a19092d5cd1c526c7c7461e" 47 + }
+7 -7
TODO.md
··· 168 168 - [x] Implement `app.bsky.actor.getProfiles` (PDS-level with proxy fallback). 169 169 170 170 ### Feed (`app.bsky.feed`) 171 - These are implemented at PDS level to enable local-first reads: 172 - - [ ] Implement `app.bsky.feed.getTimeline` (PDS-level with proxy). 173 - - [ ] Implement `app.bsky.feed.getAuthorFeed` (PDS-level with proxy). 174 - - [ ] Implement `app.bsky.feed.getActorLikes` (PDS-level with proxy). 175 - - [ ] Implement `app.bsky.feed.getPostThread` (PDS-level with proxy). 176 - - [ ] Implement `app.bsky.feed.getFeed` (PDS-level with proxy). 171 + These are implemented at PDS level to enable local-first reads (read-after-write pattern): 172 + - [x] Implement `app.bsky.feed.getTimeline` (PDS-level with proxy + RAW). 173 + - [x] Implement `app.bsky.feed.getAuthorFeed` (PDS-level with proxy + RAW). 174 + - [x] Implement `app.bsky.feed.getActorLikes` (PDS-level with proxy + RAW). 175 + - [x] Implement `app.bsky.feed.getPostThread` (PDS-level with proxy + RAW + NotFound handling). 176 + - [x] Implement `app.bsky.feed.getFeed` (proxy to feed generator). 177 177 178 178 ### Notification (`app.bsky.notification`) 179 - - [ ] Implement `app.bsky.notification.registerPush` (push notification registration). 179 + - [x] Implement `app.bsky.notification.registerPush` (push notification registration, proxied). 180 180 181 181 ## Deprecated Sync Endpoints (for compatibility) 182 182 - [ ] Implement `com.atproto.sync.getCheckout` (deprecated, still needed for compatibility).
+2
migrations/202512211600_add_repo_rev.sql
··· 1 + ALTER TABLE records ADD COLUMN repo_rev TEXT; 2 + CREATE INDEX idx_records_repo_rev ON records(repo_rev);
+24 -8
src/api/actor/profile.rs
··· 125 125 ) -> Response { 126 126 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 127 127 128 - let auth_did = auth_header.and_then(|h| { 129 - let token = crate::auth::extract_bearer_token_from_header(Some(h))?; 130 - crate::auth::get_did_from_token(&token).ok() 131 - }); 128 + let auth_did = if let Some(h) = auth_header { 129 + if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 130 + match crate::auth::validate_bearer_token(&state.db, &token).await { 131 + Ok(user) => Some(user.did), 132 + Err(_) => None, 133 + } 134 + } else { 135 + None 136 + } 137 + } else { 138 + None 139 + }; 132 140 133 141 let mut query_params = HashMap::new(); 134 142 query_params.insert("actor".to_string(), params.actor.clone()); ··· 167 175 ) -> Response { 168 176 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 169 177 170 - let auth_did = auth_header.and_then(|h| { 171 - let token = crate::auth::extract_bearer_token_from_header(Some(h))?; 172 - crate::auth::get_did_from_token(&token).ok() 173 - }); 178 + let auth_did = if let Some(h) = auth_header { 179 + if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 180 + match crate::auth::validate_bearer_token(&state.db, &token).await { 181 + Ok(user) => Some(user.did), 182 + Err(_) => None, 183 + } 184 + } else { 185 + None 186 + } 187 + } else { 188 + None 189 + }; 174 190 175 191 let mut query_params = HashMap::new(); 176 192 query_params.insert("actors".to_string(), params.actors.clone());
+33 -4
src/api/error.rs
··· 44 44 InvitesDisabled, 45 45 DatabaseError, 46 46 UpstreamFailure, 47 + UpstreamTimeout, 48 + UpstreamUnavailable(String), 49 + UpstreamError { status: u16, error: Option<String>, message: Option<String> }, 47 50 } 48 51 49 52 impl ApiError { 50 53 fn status_code(&self) -> StatusCode { 51 54 match self { 52 - Self::InternalError | Self::DatabaseError | Self::UpstreamFailure => { 53 - StatusCode::INTERNAL_SERVER_ERROR 55 + Self::InternalError | Self::DatabaseError => StatusCode::INTERNAL_SERVER_ERROR, 56 + Self::UpstreamFailure | Self::UpstreamUnavailable(_) => StatusCode::BAD_GATEWAY, 57 + Self::UpstreamTimeout => StatusCode::GATEWAY_TIMEOUT, 58 + Self::UpstreamError { status, .. } => { 59 + StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY) 54 60 } 55 61 Self::AuthenticationRequired 56 62 | Self::AuthenticationFailed ··· 83 89 84 90 fn error_name(&self) -> &'static str { 85 91 match self { 86 - Self::InternalError | Self::DatabaseError | Self::UpstreamFailure => "InternalError", 92 + Self::InternalError | Self::DatabaseError => "InternalError", 93 + Self::UpstreamFailure | Self::UpstreamUnavailable(_) => "UpstreamFailure", 94 + Self::UpstreamTimeout => "UpstreamTimeout", 95 + Self::UpstreamError { error, .. } => { 96 + if let Some(e) = error { 97 + return Box::leak(e.clone().into_boxed_str()); 98 + } 99 + "UpstreamError" 100 + } 87 101 Self::AuthenticationRequired => "AuthenticationRequired", 88 102 Self::AuthenticationFailed | Self::AuthenticationFailedMsg(_) => "AuthenticationFailed", 89 103 Self::InvalidToken => "InvalidToken", ··· 116 130 Self::AuthenticationFailedMsg(msg) 117 131 | Self::ExpiredTokenMsg(msg) 118 132 | Self::InvalidRequest(msg) 119 - | Self::RepoNotFoundMsg(msg) => Some(msg.clone()), 133 + | Self::RepoNotFoundMsg(msg) 134 + | Self::UpstreamUnavailable(msg) => Some(msg.clone()), 135 + Self::UpstreamError { message, .. } => message.clone(), 136 + Self::UpstreamTimeout => Some("Upstream service timed out".to_string()), 120 137 _ => None, 121 138 } 139 + } 140 + 141 + pub fn from_upstream_response( 142 + status: u16, 143 + body: &[u8], 144 + ) -> Self { 145 + if let Ok(parsed) = serde_json::from_slice::<serde_json::Value>(body) { 146 + let error = parsed.get("error").and_then(|v| v.as_str()).map(String::from); 147 + let message = parsed.get("message").and_then(|v| v.as_str()).map(String::from); 148 + return Self::UpstreamError { status, error, message }; 149 + } 150 + Self::UpstreamError { status, error: None, message: None } 122 151 } 123 152 } 124 153
+158
src/api/feed/actor_likes.rs
··· 1 + use crate::api::read_after_write::{ 2 + extract_repo_rev, format_munged_response, get_local_lag, get_records_since_rev, 3 + proxy_to_appview, FeedOutput, FeedViewPost, LikeRecord, PostView, RecordDescript, 4 + }; 5 + use crate::state::AppState; 6 + use axum::{ 7 + extract::{Query, State}, 8 + http::StatusCode, 9 + response::{IntoResponse, Response}, 10 + Json, 11 + }; 12 + use serde::Deserialize; 13 + use serde_json::Value; 14 + use std::collections::HashMap; 15 + use tracing::warn; 16 + 17 + #[derive(Deserialize)] 18 + pub struct GetActorLikesParams { 19 + pub actor: String, 20 + pub limit: Option<u32>, 21 + pub cursor: Option<String>, 22 + } 23 + 24 + fn insert_likes_into_feed(feed: &mut Vec<FeedViewPost>, likes: &[RecordDescript<LikeRecord>]) { 25 + for like in likes { 26 + let like_time = &like.indexed_at.to_rfc3339(); 27 + let idx = feed 28 + .iter() 29 + .position(|fi| &fi.post.indexed_at < like_time) 30 + .unwrap_or(feed.len()); 31 + 32 + let placeholder_post = PostView { 33 + uri: like.record.subject.uri.clone(), 34 + cid: like.record.subject.cid.clone(), 35 + author: crate::api::read_after_write::AuthorView { 36 + did: String::new(), 37 + handle: String::new(), 38 + display_name: None, 39 + avatar: None, 40 + extra: HashMap::new(), 41 + }, 42 + record: Value::Null, 43 + indexed_at: like.indexed_at.to_rfc3339(), 44 + embed: None, 45 + reply_count: 0, 46 + repost_count: 0, 47 + like_count: 0, 48 + quote_count: 0, 49 + extra: HashMap::new(), 50 + }; 51 + 52 + feed.insert( 53 + idx, 54 + FeedViewPost { 55 + post: placeholder_post, 56 + reply: None, 57 + reason: None, 58 + feed_context: None, 59 + extra: HashMap::new(), 60 + }, 61 + ); 62 + } 63 + } 64 + 65 + pub async fn get_actor_likes( 66 + State(state): State<AppState>, 67 + headers: axum::http::HeaderMap, 68 + Query(params): Query<GetActorLikesParams>, 69 + ) -> Response { 70 + let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 71 + 72 + let auth_did = if let Some(h) = auth_header { 73 + if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 74 + match crate::auth::validate_bearer_token(&state.db, &token).await { 75 + Ok(user) => Some(user.did), 76 + Err(_) => None, 77 + } 78 + } else { 79 + None 80 + } 81 + } else { 82 + None 83 + }; 84 + 85 + let mut query_params = HashMap::new(); 86 + query_params.insert("actor".to_string(), params.actor.clone()); 87 + if let Some(limit) = params.limit { 88 + query_params.insert("limit".to_string(), limit.to_string()); 89 + } 90 + if let Some(cursor) = &params.cursor { 91 + query_params.insert("cursor".to_string(), cursor.clone()); 92 + } 93 + 94 + let proxy_result = 95 + match proxy_to_appview("app.bsky.feed.getActorLikes", &query_params, auth_header).await { 96 + Ok(r) => r, 97 + Err(e) => return e, 98 + }; 99 + 100 + if !proxy_result.status.is_success() { 101 + return (proxy_result.status, proxy_result.body).into_response(); 102 + } 103 + 104 + let rev = match extract_repo_rev(&proxy_result.headers) { 105 + Some(r) => r, 106 + None => return (proxy_result.status, proxy_result.body).into_response(), 107 + }; 108 + 109 + let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 110 + Ok(f) => f, 111 + Err(e) => { 112 + warn!("Failed to parse actor likes response: {:?}", e); 113 + return (proxy_result.status, proxy_result.body).into_response(); 114 + } 115 + }; 116 + 117 + let requester_did = match auth_did { 118 + Some(d) => d, 119 + None => return (StatusCode::OK, Json(feed_output)).into_response(), 120 + }; 121 + 122 + let actor_did = if params.actor.starts_with("did:") { 123 + params.actor.clone() 124 + } else { 125 + match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", params.actor) 126 + .fetch_optional(&state.db) 127 + .await 128 + { 129 + Ok(Some(did)) => did, 130 + Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(), 131 + Err(e) => { 132 + warn!("Database error resolving actor handle: {:?}", e); 133 + return (proxy_result.status, proxy_result.body).into_response(); 134 + } 135 + } 136 + }; 137 + 138 + if actor_did != requester_did { 139 + return (StatusCode::OK, Json(feed_output)).into_response(); 140 + } 141 + 142 + let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 143 + Ok(r) => r, 144 + Err(e) => { 145 + warn!("Failed to get local records: {}", e); 146 + return (proxy_result.status, proxy_result.body).into_response(); 147 + } 148 + }; 149 + 150 + if local_records.likes.is_empty() { 151 + return (StatusCode::OK, Json(feed_output)).into_response(); 152 + } 153 + 154 + insert_likes_into_feed(&mut feed_output.feed, &local_records.likes); 155 + 156 + let lag = get_local_lag(&local_records); 157 + format_munged_response(feed_output, lag) 158 + }
+169
src/api/feed/author_feed.rs
··· 1 + use crate::api::read_after_write::{ 2 + extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 + get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 + ProfileRecord, RecordDescript, 5 + }; 6 + use crate::state::AppState; 7 + use axum::{ 8 + extract::{Query, State}, 9 + http::StatusCode, 10 + response::{IntoResponse, Response}, 11 + Json, 12 + }; 13 + use serde::Deserialize; 14 + use std::collections::HashMap; 15 + use tracing::warn; 16 + 17 + #[derive(Deserialize)] 18 + pub struct GetAuthorFeedParams { 19 + pub actor: String, 20 + pub limit: Option<u32>, 21 + pub cursor: Option<String>, 22 + pub filter: Option<String>, 23 + #[serde(rename = "includePins")] 24 + pub include_pins: Option<bool>, 25 + } 26 + 27 + fn update_author_profile_in_feed( 28 + feed: &mut [FeedViewPost], 29 + author_did: &str, 30 + local_profile: &RecordDescript<ProfileRecord>, 31 + ) { 32 + for item in feed.iter_mut() { 33 + if item.post.author.did == author_did { 34 + if let Some(ref display_name) = local_profile.record.display_name { 35 + item.post.author.display_name = Some(display_name.clone()); 36 + } 37 + } 38 + } 39 + } 40 + 41 + pub async fn get_author_feed( 42 + State(state): State<AppState>, 43 + headers: axum::http::HeaderMap, 44 + Query(params): Query<GetAuthorFeedParams>, 45 + ) -> Response { 46 + let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 47 + 48 + let auth_did = if let Some(h) = auth_header { 49 + if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 50 + match crate::auth::validate_bearer_token(&state.db, &token).await { 51 + Ok(user) => Some(user.did), 52 + Err(_) => None, 53 + } 54 + } else { 55 + None 56 + } 57 + } else { 58 + None 59 + }; 60 + 61 + let mut query_params = HashMap::new(); 62 + query_params.insert("actor".to_string(), params.actor.clone()); 63 + if let Some(limit) = params.limit { 64 + query_params.insert("limit".to_string(), limit.to_string()); 65 + } 66 + if let Some(cursor) = &params.cursor { 67 + query_params.insert("cursor".to_string(), cursor.clone()); 68 + } 69 + if let Some(filter) = &params.filter { 70 + query_params.insert("filter".to_string(), filter.clone()); 71 + } 72 + if let Some(include_pins) = params.include_pins { 73 + query_params.insert("includePins".to_string(), include_pins.to_string()); 74 + } 75 + 76 + let proxy_result = 77 + match proxy_to_appview("app.bsky.feed.getAuthorFeed", &query_params, auth_header).await { 78 + Ok(r) => r, 79 + Err(e) => return e, 80 + }; 81 + 82 + if !proxy_result.status.is_success() { 83 + return (proxy_result.status, proxy_result.body).into_response(); 84 + } 85 + 86 + let rev = match extract_repo_rev(&proxy_result.headers) { 87 + Some(r) => r, 88 + None => return (proxy_result.status, proxy_result.body).into_response(), 89 + }; 90 + 91 + let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 92 + Ok(f) => f, 93 + Err(e) => { 94 + warn!("Failed to parse author feed response: {:?}", e); 95 + return (proxy_result.status, proxy_result.body).into_response(); 96 + } 97 + }; 98 + 99 + let requester_did = match auth_did { 100 + Some(d) => d, 101 + None => return (StatusCode::OK, Json(feed_output)).into_response(), 102 + }; 103 + 104 + let actor_did = if params.actor.starts_with("did:") { 105 + params.actor.clone() 106 + } else { 107 + match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", params.actor) 108 + .fetch_optional(&state.db) 109 + .await 110 + { 111 + Ok(Some(did)) => did, 112 + Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(), 113 + Err(e) => { 114 + warn!("Database error resolving actor handle: {:?}", e); 115 + return (proxy_result.status, proxy_result.body).into_response(); 116 + } 117 + } 118 + }; 119 + 120 + if actor_did != requester_did { 121 + return (StatusCode::OK, Json(feed_output)).into_response(); 122 + } 123 + 124 + let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 125 + Ok(r) => r, 126 + Err(e) => { 127 + warn!("Failed to get local records: {}", e); 128 + return (proxy_result.status, proxy_result.body).into_response(); 129 + } 130 + }; 131 + 132 + if local_records.count == 0 { 133 + return (StatusCode::OK, Json(feed_output)).into_response(); 134 + } 135 + 136 + let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 137 + .fetch_optional(&state.db) 138 + .await 139 + { 140 + Ok(Some(h)) => h, 141 + Ok(None) => requester_did.clone(), 142 + Err(e) => { 143 + warn!("Database error fetching handle: {:?}", e); 144 + requester_did.clone() 145 + } 146 + }; 147 + 148 + if let Some(ref local_profile) = local_records.profile { 149 + update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile); 150 + } 151 + 152 + let local_posts: Vec<_> = local_records 153 + .posts 154 + .iter() 155 + .map(|p| { 156 + format_local_post( 157 + p, 158 + &requester_did, 159 + &handle, 160 + local_records.profile.as_ref(), 161 + ) 162 + }) 163 + .collect(); 164 + 165 + insert_posts_into_feed(&mut feed_output.feed, local_posts); 166 + 167 + let lag = get_local_lag(&local_records); 168 + format_munged_response(feed_output, lag) 169 + }
+131
src/api/feed/custom_feed.rs
··· 1 + use crate::api::proxy_client::{ 2 + is_ssrf_safe, proxy_client, validate_at_uri, validate_limit, MAX_RESPONSE_SIZE, 3 + }; 4 + use crate::api::ApiError; 5 + use crate::state::AppState; 6 + use axum::{ 7 + extract::{Query, State}, 8 + http::StatusCode, 9 + response::{IntoResponse, Response}, 10 + }; 11 + use serde::Deserialize; 12 + use std::collections::HashMap; 13 + use tracing::{error, info}; 14 + 15 + #[derive(Deserialize)] 16 + pub struct GetFeedParams { 17 + pub feed: String, 18 + pub limit: Option<u32>, 19 + pub cursor: Option<String>, 20 + } 21 + 22 + pub async fn get_feed( 23 + State(state): State<AppState>, 24 + headers: axum::http::HeaderMap, 25 + Query(params): Query<GetFeedParams>, 26 + ) -> Response { 27 + let token = match crate::auth::extract_bearer_token_from_header( 28 + headers.get("Authorization").and_then(|h| h.to_str().ok()), 29 + ) { 30 + Some(t) => t, 31 + None => return ApiError::AuthenticationRequired.into_response(), 32 + }; 33 + 34 + if let Err(e) = crate::auth::validate_bearer_token(&state.db, &token).await { 35 + return ApiError::from(e).into_response(); 36 + }; 37 + 38 + if let Err(e) = validate_at_uri(&params.feed) { 39 + return ApiError::InvalidRequest(format!("Invalid feed URI: {}", e)).into_response(); 40 + } 41 + 42 + let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 43 + 44 + let appview_url = match std::env::var("APPVIEW_URL") { 45 + Ok(url) => url, 46 + Err(_) => { 47 + return ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()) 48 + .into_response(); 49 + } 50 + }; 51 + 52 + if let Err(e) = is_ssrf_safe(&appview_url) { 53 + error!("SSRF check failed for appview URL: {}", e); 54 + return ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e)) 55 + .into_response(); 56 + } 57 + 58 + let limit = validate_limit(params.limit, 50, 100); 59 + let mut query_params = HashMap::new(); 60 + query_params.insert("feed".to_string(), params.feed.clone()); 61 + query_params.insert("limit".to_string(), limit.to_string()); 62 + if let Some(cursor) = &params.cursor { 63 + query_params.insert("cursor".to_string(), cursor.clone()); 64 + } 65 + 66 + let target_url = format!("{}/xrpc/app.bsky.feed.getFeed", appview_url); 67 + info!(target = %target_url, feed = %params.feed, "Proxying getFeed request"); 68 + 69 + let client = proxy_client(); 70 + let mut request_builder = client.get(&target_url).query(&query_params); 71 + 72 + if let Some(auth) = auth_header { 73 + request_builder = request_builder.header("Authorization", auth); 74 + } 75 + 76 + match request_builder.send().await { 77 + Ok(resp) => { 78 + let status = 79 + StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); 80 + 81 + let content_length = resp.content_length().unwrap_or(0); 82 + if content_length > MAX_RESPONSE_SIZE { 83 + error!( 84 + content_length, 85 + max = MAX_RESPONSE_SIZE, 86 + "getFeed response too large" 87 + ); 88 + return ApiError::UpstreamFailure.into_response(); 89 + } 90 + 91 + let resp_headers = resp.headers().clone(); 92 + let body = match resp.bytes().await { 93 + Ok(b) => { 94 + if b.len() as u64 > MAX_RESPONSE_SIZE { 95 + error!(len = b.len(), "getFeed response body exceeded limit"); 96 + return ApiError::UpstreamFailure.into_response(); 97 + } 98 + b 99 + } 100 + Err(e) => { 101 + error!(error = ?e, "Error reading getFeed response"); 102 + return ApiError::UpstreamFailure.into_response(); 103 + } 104 + }; 105 + 106 + let mut response_builder = axum::response::Response::builder().status(status); 107 + if let Some(ct) = resp_headers.get("content-type") { 108 + response_builder = response_builder.header("content-type", ct); 109 + } 110 + 111 + match response_builder.body(axum::body::Body::from(body)) { 112 + Ok(r) => r, 113 + Err(e) => { 114 + error!(error = ?e, "Error building getFeed response"); 115 + ApiError::UpstreamFailure.into_response() 116 + } 117 + } 118 + } 119 + Err(e) => { 120 + error!(error = ?e, "Error proxying getFeed"); 121 + if e.is_timeout() { 122 + ApiError::UpstreamTimeout.into_response() 123 + } else if e.is_connect() { 124 + ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string()) 125 + .into_response() 126 + } else { 127 + ApiError::UpstreamFailure.into_response() 128 + } 129 + } 130 + } 131 + }
+8
src/api/feed/mod.rs
··· 1 + mod actor_likes; 2 + mod author_feed; 3 + mod custom_feed; 4 + mod post_thread; 1 5 mod timeline; 2 6 7 + pub use actor_likes::get_actor_likes; 8 + pub use author_feed::get_author_feed; 9 + pub use custom_feed::get_feed; 10 + pub use post_thread::get_post_thread; 3 11 pub use timeline::get_timeline;
+322
src/api/feed/post_thread.rs
··· 1 + use crate::api::read_after_write::{ 2 + extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 + get_records_since_rev, proxy_to_appview, PostRecord, PostView, RecordDescript, 4 + }; 5 + use crate::state::AppState; 6 + use axum::{ 7 + extract::{Query, State}, 8 + http::StatusCode, 9 + response::{IntoResponse, Response}, 10 + Json, 11 + }; 12 + use serde::{Deserialize, Serialize}; 13 + use serde_json::{json, Value}; 14 + use std::collections::HashMap; 15 + use tracing::warn; 16 + 17 + #[derive(Deserialize)] 18 + pub struct GetPostThreadParams { 19 + pub uri: String, 20 + pub depth: Option<u32>, 21 + #[serde(rename = "parentHeight")] 22 + pub parent_height: Option<u32>, 23 + } 24 + 25 + #[derive(Debug, Clone, Serialize, Deserialize)] 26 + #[serde(rename_all = "camelCase")] 27 + pub struct ThreadViewPost { 28 + #[serde(rename = "$type")] 29 + pub thread_type: Option<String>, 30 + pub post: PostView, 31 + #[serde(skip_serializing_if = "Option::is_none")] 32 + pub parent: Option<Box<ThreadNode>>, 33 + #[serde(skip_serializing_if = "Option::is_none")] 34 + pub replies: Option<Vec<ThreadNode>>, 35 + #[serde(flatten)] 36 + pub extra: HashMap<String, Value>, 37 + } 38 + 39 + #[derive(Debug, Clone, Serialize, Deserialize)] 40 + #[serde(untagged)] 41 + pub enum ThreadNode { 42 + Post(ThreadViewPost), 43 + NotFound(ThreadNotFound), 44 + Blocked(ThreadBlocked), 45 + } 46 + 47 + #[derive(Debug, Clone, Serialize, Deserialize)] 48 + #[serde(rename_all = "camelCase")] 49 + pub struct ThreadNotFound { 50 + #[serde(rename = "$type")] 51 + pub thread_type: String, 52 + pub uri: String, 53 + pub not_found: bool, 54 + } 55 + 56 + #[derive(Debug, Clone, Serialize, Deserialize)] 57 + #[serde(rename_all = "camelCase")] 58 + pub struct ThreadBlocked { 59 + #[serde(rename = "$type")] 60 + pub thread_type: String, 61 + pub uri: String, 62 + pub blocked: bool, 63 + pub author: Value, 64 + } 65 + 66 + #[derive(Debug, Clone, Serialize, Deserialize)] 67 + pub struct PostThreadOutput { 68 + pub thread: ThreadNode, 69 + #[serde(skip_serializing_if = "Option::is_none")] 70 + pub threadgate: Option<Value>, 71 + } 72 + 73 + const MAX_THREAD_DEPTH: usize = 10; 74 + 75 + fn add_replies_to_thread( 76 + thread: &mut ThreadViewPost, 77 + local_posts: &[RecordDescript<PostRecord>], 78 + author_did: &str, 79 + author_handle: &str, 80 + depth: usize, 81 + ) { 82 + if depth >= MAX_THREAD_DEPTH { 83 + return; 84 + } 85 + 86 + let thread_uri = &thread.post.uri; 87 + 88 + let replies: Vec<_> = local_posts 89 + .iter() 90 + .filter(|p| { 91 + p.record 92 + .reply 93 + .as_ref() 94 + .and_then(|r| r.get("parent")) 95 + .and_then(|parent| parent.get("uri")) 96 + .and_then(|u| u.as_str()) 97 + == Some(thread_uri) 98 + }) 99 + .map(|p| { 100 + let post_view = format_local_post(p, author_did, author_handle, None); 101 + ThreadNode::Post(ThreadViewPost { 102 + thread_type: Some("app.bsky.feed.defs#threadViewPost".to_string()), 103 + post: post_view, 104 + parent: None, 105 + replies: None, 106 + extra: HashMap::new(), 107 + }) 108 + }) 109 + .collect(); 110 + 111 + if !replies.is_empty() { 112 + match &mut thread.replies { 113 + Some(existing) => existing.extend(replies), 114 + None => thread.replies = Some(replies), 115 + } 116 + } 117 + 118 + if let Some(ref mut existing_replies) = thread.replies { 119 + for reply in existing_replies.iter_mut() { 120 + if let ThreadNode::Post(reply_thread) = reply { 121 + add_replies_to_thread(reply_thread, local_posts, author_did, author_handle, depth + 1); 122 + } 123 + } 124 + } 125 + } 126 + 127 + pub async fn get_post_thread( 128 + State(state): State<AppState>, 129 + headers: axum::http::HeaderMap, 130 + Query(params): Query<GetPostThreadParams>, 131 + ) -> Response { 132 + let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 133 + 134 + let auth_did = if let Some(h) = auth_header { 135 + if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 136 + match crate::auth::validate_bearer_token(&state.db, &token).await { 137 + Ok(user) => Some(user.did), 138 + Err(_) => None, 139 + } 140 + } else { 141 + None 142 + } 143 + } else { 144 + None 145 + }; 146 + 147 + let mut query_params = HashMap::new(); 148 + query_params.insert("uri".to_string(), params.uri.clone()); 149 + if let Some(depth) = params.depth { 150 + query_params.insert("depth".to_string(), depth.to_string()); 151 + } 152 + if let Some(parent_height) = params.parent_height { 153 + query_params.insert("parentHeight".to_string(), parent_height.to_string()); 154 + } 155 + 156 + let proxy_result = 157 + match proxy_to_appview("app.bsky.feed.getPostThread", &query_params, auth_header).await { 158 + Ok(r) => r, 159 + Err(e) => return e, 160 + }; 161 + 162 + if proxy_result.status == StatusCode::NOT_FOUND { 163 + return handle_not_found(&state, &params.uri, auth_did, &proxy_result.headers).await; 164 + } 165 + 166 + if !proxy_result.status.is_success() { 167 + return (proxy_result.status, proxy_result.body).into_response(); 168 + } 169 + 170 + let rev = match extract_repo_rev(&proxy_result.headers) { 171 + Some(r) => r, 172 + None => return (proxy_result.status, proxy_result.body).into_response(), 173 + }; 174 + 175 + let mut thread_output: PostThreadOutput = match serde_json::from_slice(&proxy_result.body) { 176 + Ok(t) => t, 177 + Err(e) => { 178 + warn!("Failed to parse post thread response: {:?}", e); 179 + return (proxy_result.status, proxy_result.body).into_response(); 180 + } 181 + }; 182 + 183 + let requester_did = match auth_did { 184 + Some(d) => d, 185 + None => return (StatusCode::OK, Json(thread_output)).into_response(), 186 + }; 187 + 188 + let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 189 + Ok(r) => r, 190 + Err(e) => { 191 + warn!("Failed to get local records: {}", e); 192 + return (proxy_result.status, proxy_result.body).into_response(); 193 + } 194 + }; 195 + 196 + if local_records.posts.is_empty() { 197 + return (StatusCode::OK, Json(thread_output)).into_response(); 198 + } 199 + 200 + let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 201 + .fetch_optional(&state.db) 202 + .await 203 + { 204 + Ok(Some(h)) => h, 205 + Ok(None) => requester_did.clone(), 206 + Err(e) => { 207 + warn!("Database error fetching handle: {:?}", e); 208 + requester_did.clone() 209 + } 210 + }; 211 + 212 + if let ThreadNode::Post(ref mut thread_post) = thread_output.thread { 213 + add_replies_to_thread(thread_post, &local_records.posts, &requester_did, &handle, 0); 214 + } 215 + 216 + let lag = get_local_lag(&local_records); 217 + format_munged_response(thread_output, lag) 218 + } 219 + 220 + async fn handle_not_found( 221 + state: &AppState, 222 + uri: &str, 223 + auth_did: Option<String>, 224 + headers: &axum::http::HeaderMap, 225 + ) -> Response { 226 + let rev = match extract_repo_rev(headers) { 227 + Some(r) => r, 228 + None => { 229 + return ( 230 + StatusCode::NOT_FOUND, 231 + Json(json!({"error": "NotFound", "message": "Post not found"})), 232 + ) 233 + .into_response() 234 + } 235 + }; 236 + 237 + let requester_did = match auth_did { 238 + Some(d) => d, 239 + None => { 240 + return ( 241 + StatusCode::NOT_FOUND, 242 + Json(json!({"error": "NotFound", "message": "Post not found"})), 243 + ) 244 + .into_response() 245 + } 246 + }; 247 + 248 + let uri_parts: Vec<&str> = uri.trim_start_matches("at://").split('/').collect(); 249 + if uri_parts.len() != 3 { 250 + return ( 251 + StatusCode::NOT_FOUND, 252 + Json(json!({"error": "NotFound", "message": "Post not found"})), 253 + ) 254 + .into_response(); 255 + } 256 + 257 + let post_did = uri_parts[0]; 258 + if post_did != requester_did { 259 + return ( 260 + StatusCode::NOT_FOUND, 261 + Json(json!({"error": "NotFound", "message": "Post not found"})), 262 + ) 263 + .into_response(); 264 + } 265 + 266 + let local_records = match get_records_since_rev(state, &requester_did, &rev).await { 267 + Ok(r) => r, 268 + Err(_) => { 269 + return ( 270 + StatusCode::NOT_FOUND, 271 + Json(json!({"error": "NotFound", "message": "Post not found"})), 272 + ) 273 + .into_response() 274 + } 275 + }; 276 + 277 + let local_post = local_records.posts.iter().find(|p| p.uri == uri); 278 + 279 + let local_post = match local_post { 280 + Some(p) => p, 281 + None => { 282 + return ( 283 + StatusCode::NOT_FOUND, 284 + Json(json!({"error": "NotFound", "message": "Post not found"})), 285 + ) 286 + .into_response() 287 + } 288 + }; 289 + 290 + let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 291 + .fetch_optional(&state.db) 292 + .await 293 + { 294 + Ok(Some(h)) => h, 295 + Ok(None) => requester_did.clone(), 296 + Err(e) => { 297 + warn!("Database error fetching handle: {:?}", e); 298 + requester_did.clone() 299 + } 300 + }; 301 + 302 + let post_view = format_local_post( 303 + local_post, 304 + &requester_did, 305 + &handle, 306 + local_records.profile.as_ref(), 307 + ); 308 + 309 + let thread = PostThreadOutput { 310 + thread: ThreadNode::Post(ThreadViewPost { 311 + thread_type: Some("app.bsky.feed.defs#threadViewPost".to_string()), 312 + post: post_view, 313 + parent: None, 314 + replies: None, 315 + extra: HashMap::new(), 316 + }), 317 + threadgate: None, 318 + }; 319 + 320 + let lag = get_local_lag(&local_records); 321 + format_munged_response(thread, lag) 322 + }
+143 -48
src/api/feed/timeline.rs
··· 1 - // Yes, I know, this endpoint is an appview one, not for PDS. Who cares!! 2 - // Yes, this only gets posts that our DB/instance knows about. Who cares!!! 3 - 1 + use crate::api::read_after_write::{ 2 + extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 + get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 + PostView, 5 + }; 4 6 use crate::state::AppState; 5 7 use axum::{ 6 - Json, 7 - extract::State, 8 + extract::{Query, State}, 8 9 http::StatusCode, 9 10 response::{IntoResponse, Response}, 11 + Json, 10 12 }; 11 13 use jacquard_repo::storage::BlockStore; 12 - use serde::Serialize; 13 - use serde_json::{Value, json}; 14 - use tracing::error; 14 + use serde::Deserialize; 15 + use serde_json::{json, Value}; 16 + use std::collections::HashMap; 17 + use tracing::warn; 15 18 16 - #[derive(Serialize)] 17 - pub struct TimelineOutput { 18 - pub feed: Vec<FeedViewPost>, 19 + #[derive(Deserialize)] 20 + pub struct GetTimelineParams { 21 + pub algorithm: Option<String>, 22 + pub limit: Option<u32>, 19 23 pub cursor: Option<String>, 20 24 } 21 25 22 - #[derive(Serialize)] 23 - pub struct FeedViewPost { 24 - pub post: PostView, 25 - } 26 - 27 - #[derive(Serialize)] 28 - #[serde(rename_all = "camelCase")] 29 - pub struct PostView { 30 - pub uri: String, 31 - pub cid: String, 32 - pub author: AuthorView, 33 - pub record: Value, 34 - pub indexed_at: String, 35 - } 36 - 37 - #[derive(Serialize)] 38 - pub struct AuthorView { 39 - pub did: String, 40 - pub handle: String, 41 - } 42 - 43 26 pub async fn get_timeline( 44 27 State(state): State<AppState>, 45 28 headers: axum::http::HeaderMap, 29 + Query(params): Query<GetTimelineParams>, 46 30 ) -> Response { 47 31 let token = match crate::auth::extract_bearer_token_from_header( 48 - headers.get("Authorization").and_then(|h| h.to_str().ok()) 32 + headers.get("Authorization").and_then(|h| h.to_str().ok()), 49 33 ) { 50 34 Some(t) => t, 51 35 None => { ··· 68 52 } 69 53 }; 70 54 71 - let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", auth_user.did) 55 + match std::env::var("APPVIEW_URL") { 56 + Ok(url) if !url.starts_with("http://127.0.0.1") => { 57 + return get_timeline_with_appview(&state, &headers, &params, &auth_user.did).await; 58 + } 59 + _ => {} 60 + } 61 + 62 + get_timeline_local_only(&state, &auth_user.did).await 63 + } 64 + 65 + async fn get_timeline_with_appview( 66 + state: &AppState, 67 + headers: &axum::http::HeaderMap, 68 + params: &GetTimelineParams, 69 + auth_did: &str, 70 + ) -> Response { 71 + let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 72 + 73 + let mut query_params = HashMap::new(); 74 + if let Some(algo) = &params.algorithm { 75 + query_params.insert("algorithm".to_string(), algo.clone()); 76 + } 77 + if let Some(limit) = params.limit { 78 + query_params.insert("limit".to_string(), limit.to_string()); 79 + } 80 + if let Some(cursor) = &params.cursor { 81 + query_params.insert("cursor".to_string(), cursor.clone()); 82 + } 83 + 84 + let proxy_result = 85 + match proxy_to_appview("app.bsky.feed.getTimeline", &query_params, auth_header).await { 86 + Ok(r) => r, 87 + Err(e) => return e, 88 + }; 89 + 90 + if !proxy_result.status.is_success() { 91 + return (proxy_result.status, proxy_result.body).into_response(); 92 + } 93 + 94 + let rev = extract_repo_rev(&proxy_result.headers); 95 + if rev.is_none() { 96 + return (proxy_result.status, proxy_result.body).into_response(); 97 + } 98 + let rev = rev.unwrap(); 99 + 100 + let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 101 + Ok(f) => f, 102 + Err(e) => { 103 + warn!("Failed to parse timeline response: {:?}", e); 104 + return (proxy_result.status, proxy_result.body).into_response(); 105 + } 106 + }; 107 + 108 + let local_records = match get_records_since_rev(state, auth_did, &rev).await { 109 + Ok(r) => r, 110 + Err(e) => { 111 + warn!("Failed to get local records: {}", e); 112 + return (proxy_result.status, proxy_result.body).into_response(); 113 + } 114 + }; 115 + 116 + if local_records.count == 0 { 117 + return (proxy_result.status, proxy_result.body).into_response(); 118 + } 119 + 120 + let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", auth_did) 72 121 .fetch_optional(&state.db) 73 - .await; 122 + .await 123 + { 124 + Ok(Some(h)) => h, 125 + Ok(None) => auth_did.to_string(), 126 + Err(e) => { 127 + warn!("Database error fetching handle: {:?}", e); 128 + auth_did.to_string() 129 + } 130 + }; 74 131 75 - let user_id = match user_query { 76 - Ok(Some(row)) => row.id, 77 - _ => { 132 + let local_posts: Vec<_> = local_records 133 + .posts 134 + .iter() 135 + .map(|p| format_local_post(p, auth_did, &handle, local_records.profile.as_ref())) 136 + .collect(); 137 + 138 + insert_posts_into_feed(&mut feed_output.feed, local_posts); 139 + 140 + let lag = get_local_lag(&local_records); 141 + format_munged_response(feed_output, lag) 142 + } 143 + 144 + async fn get_timeline_local_only(state: &AppState, auth_did: &str) -> Response { 145 + let user_id: uuid::Uuid = match sqlx::query_scalar!( 146 + "SELECT id FROM users WHERE did = $1", 147 + auth_did 148 + ) 149 + .fetch_optional(&state.db) 150 + .await 151 + { 152 + Ok(Some(id)) => id, 153 + Ok(None) => { 78 154 return ( 79 155 StatusCode::INTERNAL_SERVER_ERROR, 80 156 Json(json!({"error": "InternalError", "message": "User not found"})), 81 157 ) 82 158 .into_response(); 83 159 } 160 + Err(e) => { 161 + warn!("Database error fetching user: {:?}", e); 162 + return ( 163 + StatusCode::INTERNAL_SERVER_ERROR, 164 + Json(json!({"error": "InternalError", "message": "Database error"})), 165 + ) 166 + .into_response(); 167 + } 84 168 }; 85 169 86 170 let follows_query = sqlx::query!( 87 - "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow'", 171 + "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.graph.follow' LIMIT 5000", 88 172 user_id 89 173 ) 90 - .fetch_all(&state.db) 91 - .await; 174 + .fetch_all(&state.db) 175 + .await; 92 176 93 177 let follow_cids: Vec<String> = match follows_query { 94 178 Ok(rows) => rows.iter().map(|r| r.record_cid.clone()).collect(), 95 - Err(e) => { 96 - error!("Failed to get follows: {:?}", e); 179 + Err(_) => { 97 180 return ( 98 181 StatusCode::INTERNAL_SERVER_ERROR, 99 182 Json(json!({"error": "InternalError"})), ··· 127 210 if followed_dids.is_empty() { 128 211 return ( 129 212 StatusCode::OK, 130 - Json(TimelineOutput { 213 + Json(FeedOutput { 131 214 feed: vec![], 132 215 cursor: None, 133 216 }), ··· 150 233 151 234 let posts = match posts_result { 152 235 Ok(rows) => rows, 153 - Err(e) => { 154 - error!("Failed to get posts: {:?}", e); 236 + Err(_) => { 155 237 return ( 156 238 StatusCode::INTERNAL_SERVER_ERROR, 157 239 Json(json!({"error": "InternalError"})), ··· 190 272 post: PostView { 191 273 uri, 192 274 cid: record_cid, 193 - author: AuthorView { 275 + author: crate::api::read_after_write::AuthorView { 194 276 did: author_did, 195 277 handle: author_handle, 278 + display_name: None, 279 + avatar: None, 280 + extra: HashMap::new(), 196 281 }, 197 282 record, 198 283 indexed_at: created_at.to_rfc3339(), 284 + embed: None, 285 + reply_count: 0, 286 + repost_count: 0, 287 + like_count: 0, 288 + quote_count: 0, 289 + extra: HashMap::new(), 199 290 }, 291 + reply: None, 292 + reason: None, 293 + feed_context: None, 294 + extra: HashMap::new(), 200 295 }); 201 296 } 202 297 203 - (StatusCode::OK, Json(TimelineOutput { feed, cursor: None })).into_response() 298 + (StatusCode::OK, Json(FeedOutput { feed, cursor: None })).into_response() 204 299 }
+4
src/api/mod.rs
··· 4 4 pub mod feed; 5 5 pub mod identity; 6 6 pub mod moderation; 7 + pub mod notification; 7 8 pub mod proxy; 9 + pub mod proxy_client; 10 + pub mod read_after_write; 8 11 pub mod repo; 9 12 pub mod server; 10 13 pub mod validation; 11 14 12 15 pub use error::ApiError; 16 + pub use proxy_client::{proxy_client, validate_at_uri, validate_did, validate_limit, AtUriParts};
+3
src/api/notification/mod.rs
··· 1 + mod register_push; 2 + 3 + pub use register_push::register_push;
+166
src/api/notification/register_push.rs
··· 1 + use crate::api::proxy_client::{is_ssrf_safe, proxy_client, validate_did}; 2 + use crate::api::ApiError; 3 + use crate::state::AppState; 4 + use axum::{ 5 + extract::State, 6 + http::{HeaderMap, StatusCode}, 7 + response::{IntoResponse, Response}, 8 + Json, 9 + }; 10 + use serde::Deserialize; 11 + use serde_json::json; 12 + use tracing::{error, info}; 13 + 14 + #[derive(Deserialize)] 15 + #[serde(rename_all = "camelCase")] 16 + pub struct RegisterPushInput { 17 + pub service_did: String, 18 + pub token: String, 19 + pub platform: String, 20 + pub app_id: String, 21 + } 22 + 23 + const VALID_PLATFORMS: &[&str] = &["ios", "android", "web"]; 24 + 25 + pub async fn register_push( 26 + State(state): State<AppState>, 27 + headers: HeaderMap, 28 + Json(input): Json<RegisterPushInput>, 29 + ) -> Response { 30 + let token = match crate::auth::extract_bearer_token_from_header( 31 + headers.get("Authorization").and_then(|h| h.to_str().ok()), 32 + ) { 33 + Some(t) => t, 34 + None => return ApiError::AuthenticationRequired.into_response(), 35 + }; 36 + 37 + let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 38 + Ok(user) => user, 39 + Err(e) => return ApiError::from(e).into_response(), 40 + }; 41 + 42 + if let Err(e) = validate_did(&input.service_did) { 43 + return ApiError::InvalidRequest(format!("Invalid serviceDid: {}", e)).into_response(); 44 + } 45 + 46 + if input.token.is_empty() || input.token.len() > 4096 { 47 + return ApiError::InvalidRequest("Invalid push token".to_string()).into_response(); 48 + } 49 + 50 + if !VALID_PLATFORMS.contains(&input.platform.as_str()) { 51 + return ApiError::InvalidRequest(format!( 52 + "Invalid platform. Must be one of: {}", 53 + VALID_PLATFORMS.join(", ") 54 + )) 55 + .into_response(); 56 + } 57 + 58 + if input.app_id.is_empty() || input.app_id.len() > 256 { 59 + return ApiError::InvalidRequest("Invalid appId".to_string()).into_response(); 60 + } 61 + 62 + let appview_url = match std::env::var("APPVIEW_URL") { 63 + Ok(url) => url, 64 + Err(_) => { 65 + return ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()) 66 + .into_response(); 67 + } 68 + }; 69 + 70 + if let Err(e) = is_ssrf_safe(&appview_url) { 71 + error!("SSRF check failed for appview URL: {}", e); 72 + return ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e)) 73 + .into_response(); 74 + } 75 + 76 + let key_row = match sqlx::query!( 77 + "SELECT key_bytes, encryption_version FROM user_keys k JOIN users u ON k.user_id = u.id WHERE u.did = $1", 78 + auth_user.did 79 + ) 80 + .fetch_optional(&state.db) 81 + .await 82 + { 83 + Ok(Some(row)) => row, 84 + Ok(None) => { 85 + error!(did = %auth_user.did, "No signing key found for user"); 86 + return ApiError::InternalError.into_response(); 87 + } 88 + Err(e) => { 89 + error!(error = ?e, "Database error fetching signing key"); 90 + return ApiError::DatabaseError.into_response(); 91 + } 92 + }; 93 + 94 + let decrypted_key = 95 + match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) { 96 + Ok(k) => k, 97 + Err(e) => { 98 + error!(error = ?e, "Failed to decrypt signing key"); 99 + return ApiError::InternalError.into_response(); 100 + } 101 + }; 102 + 103 + let service_token = match crate::auth::create_service_token( 104 + &auth_user.did, 105 + &input.service_did, 106 + "app.bsky.notification.registerPush", 107 + &decrypted_key, 108 + ) { 109 + Ok(t) => t, 110 + Err(e) => { 111 + error!(error = ?e, "Failed to create service token"); 112 + return ApiError::InternalError.into_response(); 113 + } 114 + }; 115 + 116 + let target_url = format!("{}/xrpc/app.bsky.notification.registerPush", appview_url); 117 + info!( 118 + target = %target_url, 119 + service_did = %input.service_did, 120 + platform = %input.platform, 121 + "Proxying registerPush request" 122 + ); 123 + 124 + let client = proxy_client(); 125 + let request_body = json!({ 126 + "serviceDid": input.service_did, 127 + "token": input.token, 128 + "platform": input.platform, 129 + "appId": input.app_id 130 + }); 131 + 132 + match client 133 + .post(&target_url) 134 + .header("Authorization", format!("Bearer {}", service_token)) 135 + .header("Content-Type", "application/json") 136 + .json(&request_body) 137 + .send() 138 + .await 139 + { 140 + Ok(resp) => { 141 + let status = 142 + StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); 143 + if status.is_success() { 144 + StatusCode::OK.into_response() 145 + } else { 146 + let body = resp.bytes().await.unwrap_or_default(); 147 + error!( 148 + status = %status, 149 + "registerPush upstream error" 150 + ); 151 + ApiError::from_upstream_response(status.as_u16(), &body).into_response() 152 + } 153 + } 154 + Err(e) => { 155 + error!(error = ?e, "Error proxying registerPush"); 156 + if e.is_timeout() { 157 + ApiError::UpstreamTimeout.into_response() 158 + } else if e.is_connect() { 159 + ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string()) 160 + .into_response() 161 + } else { 162 + ApiError::UpstreamFailure.into_response() 163 + } 164 + } 165 + } 166 + }
+8 -14
src/api/proxy.rs
··· 46 46 if let Some(token) = crate::auth::extract_bearer_token_from_header( 47 47 headers.get("Authorization").and_then(|h| h.to_str().ok()) 48 48 ) { 49 - if let Ok(did) = crate::auth::get_did_from_token(&token) { 50 - let key_row = sqlx::query!("SELECT k.key_bytes, k.encryption_version FROM user_keys k JOIN users u ON k.user_id = u.id WHERE u.did = $1", did) 51 - .fetch_optional(&state.db) 52 - .await; 53 - 54 - if let Ok(Some(row)) = key_row { 55 - if let Ok(decrypted_key) = crate::config::decrypt_key(&row.key_bytes, row.encryption_version) { 56 - if let Ok(new_token) = 57 - crate::auth::create_service_token(&did, aud, &method, &decrypted_key) 49 + if let Ok(auth_user) = crate::auth::validate_bearer_token(&state.db, &token).await { 50 + if let Some(key_bytes) = auth_user.key_bytes { 51 + if let Ok(new_token) = 52 + crate::auth::create_service_token(&auth_user.did, aud, &method, &key_bytes) 53 + { 54 + if let Ok(val) = 55 + axum::http::HeaderValue::from_str(&format!("Bearer {}", new_token)) 58 56 { 59 - if let Ok(val) = 60 - axum::http::HeaderValue::from_str(&format!("Bearer {}", new_token)) 61 - { 62 - auth_header_val = Some(val); 63 - } 57 + auth_header_val = Some(val); 64 58 } 65 59 } 66 60 }
+252
src/api/proxy_client.rs
··· 1 + use reqwest::{Client, ClientBuilder, Url}; 2 + use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; 3 + use std::sync::OnceLock; 4 + use std::time::Duration; 5 + use tracing::warn; 6 + 7 + pub const DEFAULT_HEADERS_TIMEOUT: Duration = Duration::from_secs(10); 8 + pub const DEFAULT_BODY_TIMEOUT: Duration = Duration::from_secs(30); 9 + pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); 10 + pub const MAX_RESPONSE_SIZE: u64 = 10 * 1024 * 1024; 11 + 12 + static PROXY_CLIENT: OnceLock<Client> = OnceLock::new(); 13 + 14 + pub fn proxy_client() -> &'static Client { 15 + PROXY_CLIENT.get_or_init(|| { 16 + ClientBuilder::new() 17 + .timeout(DEFAULT_BODY_TIMEOUT) 18 + .connect_timeout(DEFAULT_CONNECT_TIMEOUT) 19 + .pool_max_idle_per_host(10) 20 + .pool_idle_timeout(Duration::from_secs(90)) 21 + .redirect(reqwest::redirect::Policy::none()) 22 + .build() 23 + .expect("Failed to build HTTP client - this indicates a TLS or system configuration issue") 24 + }) 25 + } 26 + 27 + pub fn is_ssrf_safe(url: &str) -> Result<(), SsrfError> { 28 + let parsed = Url::parse(url).map_err(|_| SsrfError::InvalidUrl)?; 29 + 30 + let scheme = parsed.scheme(); 31 + if scheme != "https" { 32 + let allow_http = std::env::var("ALLOW_HTTP_PROXY").is_ok() 33 + || url.starts_with("http://127.0.0.1") 34 + || url.starts_with("http://localhost"); 35 + 36 + if !allow_http { 37 + return Err(SsrfError::InsecureProtocol(scheme.to_string())); 38 + } 39 + } 40 + 41 + let host = parsed.host_str().ok_or(SsrfError::NoHost)?; 42 + 43 + if host == "localhost" { 44 + return Ok(()); 45 + } 46 + 47 + if let Ok(ip) = host.parse::<IpAddr>() { 48 + if ip.is_loopback() { 49 + return Ok(()); 50 + } 51 + if !is_unicast_ip(&ip) { 52 + return Err(SsrfError::NonUnicastIp(ip.to_string())); 53 + } 54 + return Ok(()); 55 + } 56 + 57 + let port = parsed.port().unwrap_or(if scheme == "https" { 443 } else { 80 }); 58 + let socket_addrs: Vec<SocketAddr> = match (host, port).to_socket_addrs() { 59 + Ok(addrs) => addrs.collect(), 60 + Err(_) => return Err(SsrfError::DnsResolutionFailed(host.to_string())), 61 + }; 62 + 63 + for addr in &socket_addrs { 64 + if !is_unicast_ip(&addr.ip()) { 65 + warn!( 66 + "DNS resolution for {} returned non-unicast IP: {}", 67 + host, 68 + addr.ip() 69 + ); 70 + return Err(SsrfError::NonUnicastIp(addr.ip().to_string())); 71 + } 72 + } 73 + 74 + Ok(()) 75 + } 76 + 77 + fn is_unicast_ip(ip: &IpAddr) -> bool { 78 + match ip { 79 + IpAddr::V4(v4) => { 80 + !v4.is_loopback() 81 + && !v4.is_broadcast() 82 + && !v4.is_multicast() 83 + && !v4.is_unspecified() 84 + && !v4.is_link_local() 85 + && !is_private_v4(v4) 86 + } 87 + IpAddr::V6(v6) => !v6.is_loopback() && !v6.is_multicast() && !v6.is_unspecified(), 88 + } 89 + } 90 + 91 + fn is_private_v4(ip: &std::net::Ipv4Addr) -> bool { 92 + let octets = ip.octets(); 93 + octets[0] == 10 94 + || (octets[0] == 172 && (16..=31).contains(&octets[1])) 95 + || (octets[0] == 192 && octets[1] == 168) 96 + || (octets[0] == 169 && octets[1] == 254) 97 + } 98 + 99 + #[derive(Debug, Clone)] 100 + pub enum SsrfError { 101 + InvalidUrl, 102 + InsecureProtocol(String), 103 + NoHost, 104 + NonUnicastIp(String), 105 + DnsResolutionFailed(String), 106 + } 107 + 108 + impl std::fmt::Display for SsrfError { 109 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 110 + match self { 111 + SsrfError::InvalidUrl => write!(f, "Invalid URL"), 112 + SsrfError::InsecureProtocol(p) => write!(f, "Insecure protocol: {}", p), 113 + SsrfError::NoHost => write!(f, "No host in URL"), 114 + SsrfError::NonUnicastIp(ip) => write!(f, "Non-unicast IP address: {}", ip), 115 + SsrfError::DnsResolutionFailed(host) => write!(f, "DNS resolution failed for: {}", host), 116 + } 117 + } 118 + } 119 + 120 + impl std::error::Error for SsrfError {} 121 + 122 + pub const HEADERS_TO_FORWARD: &[&str] = &[ 123 + "accept-language", 124 + "atproto-accept-labelers", 125 + "x-bsky-topics", 126 + ]; 127 + 128 + pub const RESPONSE_HEADERS_TO_FORWARD: &[&str] = &[ 129 + "atproto-repo-rev", 130 + "atproto-content-labelers", 131 + "retry-after", 132 + "content-type", 133 + ]; 134 + 135 + pub fn validate_at_uri(uri: &str) -> Result<AtUriParts, &'static str> { 136 + if !uri.starts_with("at://") { 137 + return Err("URI must start with at://"); 138 + } 139 + 140 + let path = uri.trim_start_matches("at://"); 141 + let parts: Vec<&str> = path.split('/').collect(); 142 + 143 + if parts.is_empty() { 144 + return Err("URI missing DID"); 145 + } 146 + 147 + let did = parts[0]; 148 + if !did.starts_with("did:") { 149 + return Err("Invalid DID in URI"); 150 + } 151 + 152 + if parts.len() > 1 { 153 + let collection = parts[1]; 154 + if collection.is_empty() || !collection.contains('.') { 155 + return Err("Invalid collection NSID"); 156 + } 157 + } 158 + 159 + Ok(AtUriParts { 160 + did: did.to_string(), 161 + collection: parts.get(1).map(|s| s.to_string()), 162 + rkey: parts.get(2).map(|s| s.to_string()), 163 + }) 164 + } 165 + 166 + #[derive(Debug, Clone)] 167 + pub struct AtUriParts { 168 + pub did: String, 169 + pub collection: Option<String>, 170 + pub rkey: Option<String>, 171 + } 172 + 173 + pub fn validate_limit(limit: Option<u32>, default: u32, max: u32) -> u32 { 174 + match limit { 175 + Some(l) if l == 0 => default, 176 + Some(l) if l > max => max, 177 + Some(l) => l, 178 + None => default, 179 + } 180 + } 181 + 182 + pub fn validate_did(did: &str) -> Result<(), &'static str> { 183 + if !did.starts_with("did:") { 184 + return Err("Invalid DID format"); 185 + } 186 + 187 + let parts: Vec<&str> = did.split(':').collect(); 188 + if parts.len() < 3 { 189 + return Err("DID must have at least method and identifier"); 190 + } 191 + 192 + let method = parts[1]; 193 + if method != "plc" && method != "web" { 194 + return Err("Unsupported DID method"); 195 + } 196 + 197 + Ok(()) 198 + } 199 + 200 + #[cfg(test)] 201 + mod tests { 202 + use super::*; 203 + 204 + #[test] 205 + fn test_ssrf_safe_https() { 206 + assert!(is_ssrf_safe("https://api.bsky.app/xrpc/test").is_ok()); 207 + } 208 + 209 + #[test] 210 + fn test_ssrf_blocks_http_by_default() { 211 + let result = is_ssrf_safe("http://external.example.com/xrpc/test"); 212 + assert!(matches!(result, Err(SsrfError::InsecureProtocol(_)) | Err(SsrfError::DnsResolutionFailed(_)))); 213 + } 214 + 215 + #[test] 216 + fn test_ssrf_allows_localhost_http() { 217 + assert!(is_ssrf_safe("http://127.0.0.1:8080/test").is_ok()); 218 + assert!(is_ssrf_safe("http://localhost:8080/test").is_ok()); 219 + } 220 + 221 + #[test] 222 + fn test_validate_at_uri() { 223 + let result = validate_at_uri("at://did:plc:test/app.bsky.feed.post/abc123"); 224 + assert!(result.is_ok()); 225 + let parts = result.unwrap(); 226 + assert_eq!(parts.did, "did:plc:test"); 227 + assert_eq!(parts.collection, Some("app.bsky.feed.post".to_string())); 228 + assert_eq!(parts.rkey, Some("abc123".to_string())); 229 + } 230 + 231 + #[test] 232 + fn test_validate_at_uri_invalid() { 233 + assert!(validate_at_uri("https://example.com").is_err()); 234 + assert!(validate_at_uri("at://notadid/collection/rkey").is_err()); 235 + } 236 + 237 + #[test] 238 + fn test_validate_limit() { 239 + assert_eq!(validate_limit(None, 50, 100), 50); 240 + assert_eq!(validate_limit(Some(0), 50, 100), 50); 241 + assert_eq!(validate_limit(Some(200), 50, 100), 100); 242 + assert_eq!(validate_limit(Some(75), 50, 100), 75); 243 + } 244 + 245 + #[test] 246 + fn test_validate_did() { 247 + assert!(validate_did("did:plc:abc123").is_ok()); 248 + assert!(validate_did("did:web:example.com").is_ok()); 249 + assert!(validate_did("notadid").is_err()); 250 + assert!(validate_did("did:unknown:test").is_err()); 251 + } 252 + }
+433
src/api/read_after_write.rs
··· 1 + use crate::api::proxy_client::{ 2 + is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD, 3 + }; 4 + use crate::api::ApiError; 5 + use crate::state::AppState; 6 + use axum::{ 7 + http::{HeaderMap, HeaderValue, StatusCode}, 8 + response::{IntoResponse, Response}, 9 + Json, 10 + }; 11 + use chrono::{DateTime, Utc}; 12 + use jacquard_repo::storage::BlockStore; 13 + use serde::{Deserialize, Serialize}; 14 + use serde_json::Value; 15 + use std::collections::HashMap; 16 + use tracing::{error, info, warn}; 17 + use uuid::Uuid; 18 + 19 + pub const REPO_REV_HEADER: &str = "atproto-repo-rev"; 20 + pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag"; 21 + 22 + #[derive(Debug, Clone, Serialize, Deserialize)] 23 + #[serde(rename_all = "camelCase")] 24 + pub struct PostRecord { 25 + #[serde(rename = "$type")] 26 + pub record_type: Option<String>, 27 + pub text: String, 28 + pub created_at: String, 29 + #[serde(skip_serializing_if = "Option::is_none")] 30 + pub reply: Option<Value>, 31 + #[serde(skip_serializing_if = "Option::is_none")] 32 + pub embed: Option<Value>, 33 + #[serde(skip_serializing_if = "Option::is_none")] 34 + pub langs: Option<Vec<String>>, 35 + #[serde(skip_serializing_if = "Option::is_none")] 36 + pub labels: Option<Value>, 37 + #[serde(skip_serializing_if = "Option::is_none")] 38 + pub tags: Option<Vec<String>>, 39 + #[serde(flatten)] 40 + pub extra: HashMap<String, Value>, 41 + } 42 + 43 + #[derive(Debug, Clone, Serialize, Deserialize)] 44 + #[serde(rename_all = "camelCase")] 45 + pub struct ProfileRecord { 46 + #[serde(rename = "$type")] 47 + pub record_type: Option<String>, 48 + #[serde(skip_serializing_if = "Option::is_none")] 49 + pub display_name: Option<String>, 50 + #[serde(skip_serializing_if = "Option::is_none")] 51 + pub description: Option<String>, 52 + #[serde(skip_serializing_if = "Option::is_none")] 53 + pub avatar: Option<Value>, 54 + #[serde(skip_serializing_if = "Option::is_none")] 55 + pub banner: Option<Value>, 56 + #[serde(flatten)] 57 + pub extra: HashMap<String, Value>, 58 + } 59 + 60 + #[derive(Debug, Clone)] 61 + pub struct RecordDescript<T> { 62 + pub uri: String, 63 + pub cid: String, 64 + pub indexed_at: DateTime<Utc>, 65 + pub record: T, 66 + } 67 + 68 + #[derive(Debug, Clone, Serialize, Deserialize)] 69 + #[serde(rename_all = "camelCase")] 70 + pub struct LikeRecord { 71 + #[serde(rename = "$type")] 72 + pub record_type: Option<String>, 73 + pub subject: LikeSubject, 74 + pub created_at: String, 75 + #[serde(flatten)] 76 + pub extra: HashMap<String, Value>, 77 + } 78 + 79 + #[derive(Debug, Clone, Serialize, Deserialize)] 80 + #[serde(rename_all = "camelCase")] 81 + pub struct LikeSubject { 82 + pub uri: String, 83 + pub cid: String, 84 + } 85 + 86 + #[derive(Debug, Default)] 87 + pub struct LocalRecords { 88 + pub count: usize, 89 + pub profile: Option<RecordDescript<ProfileRecord>>, 90 + pub posts: Vec<RecordDescript<PostRecord>>, 91 + pub likes: Vec<RecordDescript<LikeRecord>>, 92 + } 93 + 94 + pub async fn get_records_since_rev( 95 + state: &AppState, 96 + did: &str, 97 + rev: &str, 98 + ) -> Result<LocalRecords, String> { 99 + let mut result = LocalRecords::default(); 100 + 101 + let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 102 + .fetch_optional(&state.db) 103 + .await 104 + .map_err(|e| format!("DB error: {}", e))? 105 + .ok_or_else(|| "User not found".to_string())?; 106 + 107 + let rows = sqlx::query!( 108 + r#" 109 + SELECT record_cid, collection, rkey, created_at, repo_rev 110 + FROM records 111 + WHERE repo_id = $1 AND repo_rev > $2 112 + ORDER BY repo_rev ASC 113 + LIMIT 10 114 + "#, 115 + user_id, 116 + rev 117 + ) 118 + .fetch_all(&state.db) 119 + .await 120 + .map_err(|e| format!("DB error fetching records: {}", e))?; 121 + 122 + if rows.is_empty() { 123 + return Ok(result); 124 + } 125 + 126 + let sanity_check = sqlx::query_scalar!( 127 + "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1", 128 + user_id, 129 + rev 130 + ) 131 + .fetch_optional(&state.db) 132 + .await 133 + .map_err(|e| format!("DB error sanity check: {}", e))?; 134 + 135 + if sanity_check.is_none() { 136 + warn!("Sanity check failed: no records found before rev {}", rev); 137 + return Ok(result); 138 + } 139 + 140 + for row in rows { 141 + result.count += 1; 142 + 143 + let cid: cid::Cid = match row.record_cid.parse() { 144 + Ok(c) => c, 145 + Err(_) => continue, 146 + }; 147 + 148 + let block_bytes = match state.block_store.get(&cid).await { 149 + Ok(Some(b)) => b, 150 + _ => continue, 151 + }; 152 + 153 + let uri = format!("at://{}/{}/{}", did, row.collection, row.rkey); 154 + let indexed_at = row.created_at; 155 + 156 + if row.collection == "app.bsky.actor.profile" && row.rkey == "self" { 157 + if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) { 158 + result.profile = Some(RecordDescript { 159 + uri, 160 + cid: row.record_cid, 161 + indexed_at, 162 + record, 163 + }); 164 + } 165 + } else if row.collection == "app.bsky.feed.post" { 166 + if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) { 167 + result.posts.push(RecordDescript { 168 + uri, 169 + cid: row.record_cid, 170 + indexed_at, 171 + record, 172 + }); 173 + } 174 + } else if row.collection == "app.bsky.feed.like" { 175 + if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) { 176 + result.likes.push(RecordDescript { 177 + uri, 178 + cid: row.record_cid, 179 + indexed_at, 180 + record, 181 + }); 182 + } 183 + } 184 + } 185 + 186 + Ok(result) 187 + } 188 + 189 + pub fn get_local_lag(local: &LocalRecords) -> Option<i64> { 190 + let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at); 191 + 192 + for post in &local.posts { 193 + match oldest { 194 + None => oldest = Some(post.indexed_at), 195 + Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at), 196 + _ => {} 197 + } 198 + } 199 + 200 + for like in &local.likes { 201 + match oldest { 202 + None => oldest = Some(like.indexed_at), 203 + Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at), 204 + _ => {} 205 + } 206 + } 207 + 208 + oldest.map(|o| (Utc::now() - o).num_milliseconds()) 209 + } 210 + 211 + pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> { 212 + headers 213 + .get(REPO_REV_HEADER) 214 + .and_then(|h| h.to_str().ok()) 215 + .map(|s| s.to_string()) 216 + } 217 + 218 + #[derive(Debug)] 219 + pub struct ProxyResponse { 220 + pub status: StatusCode, 221 + pub headers: HeaderMap, 222 + pub body: bytes::Bytes, 223 + } 224 + 225 + pub async fn proxy_to_appview( 226 + method: &str, 227 + params: &HashMap<String, String>, 228 + auth_header: Option<&str>, 229 + ) -> Result<ProxyResponse, Response> { 230 + let appview_url = std::env::var("APPVIEW_URL").map_err(|_| { 231 + ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response() 232 + })?; 233 + 234 + if let Err(e) = is_ssrf_safe(&appview_url) { 235 + error!("SSRF check failed for appview URL: {}", e); 236 + return Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e)) 237 + .into_response()); 238 + } 239 + 240 + let target_url = format!("{}/xrpc/{}", appview_url, method); 241 + info!(target = %target_url, "Proxying request to appview"); 242 + 243 + let client = proxy_client(); 244 + let mut request_builder = client.get(&target_url).query(params); 245 + 246 + if let Some(auth) = auth_header { 247 + request_builder = request_builder.header("Authorization", auth); 248 + } 249 + 250 + match request_builder.send().await { 251 + Ok(resp) => { 252 + let status = 253 + StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); 254 + 255 + let headers: HeaderMap = resp 256 + .headers() 257 + .iter() 258 + .filter(|(k, _)| { 259 + RESPONSE_HEADERS_TO_FORWARD 260 + .iter() 261 + .any(|h| k.as_str().eq_ignore_ascii_case(h)) 262 + }) 263 + .filter_map(|(k, v)| { 264 + let name = axum::http::HeaderName::try_from(k.as_str()).ok()?; 265 + let value = HeaderValue::from_bytes(v.as_bytes()).ok()?; 266 + Some((name, value)) 267 + }) 268 + .collect(); 269 + 270 + let content_length = resp 271 + .content_length() 272 + .unwrap_or(0); 273 + if content_length > MAX_RESPONSE_SIZE { 274 + error!( 275 + content_length, 276 + max = MAX_RESPONSE_SIZE, 277 + "Upstream response too large" 278 + ); 279 + return Err(ApiError::UpstreamFailure.into_response()); 280 + } 281 + 282 + let body = resp.bytes().await.map_err(|e| { 283 + error!(error = ?e, "Error reading proxy response body"); 284 + ApiError::UpstreamFailure.into_response() 285 + })?; 286 + 287 + if body.len() as u64 > MAX_RESPONSE_SIZE { 288 + error!( 289 + len = body.len(), 290 + max = MAX_RESPONSE_SIZE, 291 + "Upstream response body exceeded size limit" 292 + ); 293 + return Err(ApiError::UpstreamFailure.into_response()); 294 + } 295 + 296 + Ok(ProxyResponse { 297 + status, 298 + headers, 299 + body, 300 + }) 301 + } 302 + Err(e) => { 303 + error!(error = ?e, "Error sending proxy request"); 304 + if e.is_timeout() { 305 + Err(ApiError::UpstreamTimeout.into_response()) 306 + } else if e.is_connect() { 307 + Err(ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string()) 308 + .into_response()) 309 + } else { 310 + Err(ApiError::UpstreamFailure.into_response()) 311 + } 312 + } 313 + } 314 + } 315 + 316 + pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response { 317 + let mut response = (StatusCode::OK, Json(data)).into_response(); 318 + 319 + if let Some(lag_ms) = lag { 320 + if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) { 321 + response 322 + .headers_mut() 323 + .insert(UPSTREAM_LAG_HEADER, header_val); 324 + } 325 + } 326 + 327 + response 328 + } 329 + 330 + #[derive(Debug, Clone, Serialize, Deserialize)] 331 + #[serde(rename_all = "camelCase")] 332 + pub struct AuthorView { 333 + pub did: String, 334 + pub handle: String, 335 + #[serde(skip_serializing_if = "Option::is_none")] 336 + pub display_name: Option<String>, 337 + #[serde(skip_serializing_if = "Option::is_none")] 338 + pub avatar: Option<String>, 339 + #[serde(flatten)] 340 + pub extra: HashMap<String, Value>, 341 + } 342 + 343 + #[derive(Debug, Clone, Serialize, Deserialize)] 344 + #[serde(rename_all = "camelCase")] 345 + pub struct PostView { 346 + pub uri: String, 347 + pub cid: String, 348 + pub author: AuthorView, 349 + pub record: Value, 350 + pub indexed_at: String, 351 + #[serde(skip_serializing_if = "Option::is_none")] 352 + pub embed: Option<Value>, 353 + #[serde(default)] 354 + pub reply_count: i64, 355 + #[serde(default)] 356 + pub repost_count: i64, 357 + #[serde(default)] 358 + pub like_count: i64, 359 + #[serde(default)] 360 + pub quote_count: i64, 361 + #[serde(flatten)] 362 + pub extra: HashMap<String, Value>, 363 + } 364 + 365 + #[derive(Debug, Clone, Serialize, Deserialize)] 366 + #[serde(rename_all = "camelCase")] 367 + pub struct FeedViewPost { 368 + pub post: PostView, 369 + #[serde(skip_serializing_if = "Option::is_none")] 370 + pub reply: Option<Value>, 371 + #[serde(skip_serializing_if = "Option::is_none")] 372 + pub reason: Option<Value>, 373 + #[serde(skip_serializing_if = "Option::is_none")] 374 + pub feed_context: Option<String>, 375 + #[serde(flatten)] 376 + pub extra: HashMap<String, Value>, 377 + } 378 + 379 + #[derive(Debug, Clone, Serialize, Deserialize)] 380 + pub struct FeedOutput { 381 + pub feed: Vec<FeedViewPost>, 382 + #[serde(skip_serializing_if = "Option::is_none")] 383 + pub cursor: Option<String>, 384 + } 385 + 386 + pub fn format_local_post( 387 + descript: &RecordDescript<PostRecord>, 388 + author_did: &str, 389 + author_handle: &str, 390 + profile: Option<&RecordDescript<ProfileRecord>>, 391 + ) -> PostView { 392 + let display_name = profile.and_then(|p| p.record.display_name.clone()); 393 + 394 + PostView { 395 + uri: descript.uri.clone(), 396 + cid: descript.cid.clone(), 397 + author: AuthorView { 398 + did: author_did.to_string(), 399 + handle: author_handle.to_string(), 400 + display_name, 401 + avatar: None, 402 + extra: HashMap::new(), 403 + }, 404 + record: serde_json::to_value(&descript.record).unwrap_or(Value::Null), 405 + indexed_at: descript.indexed_at.to_rfc3339(), 406 + embed: descript.record.embed.clone(), 407 + reply_count: 0, 408 + repost_count: 0, 409 + like_count: 0, 410 + quote_count: 0, 411 + extra: HashMap::new(), 412 + } 413 + } 414 + 415 + pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) { 416 + if posts.is_empty() { 417 + return; 418 + } 419 + 420 + let new_items: Vec<FeedViewPost> = posts 421 + .into_iter() 422 + .map(|post| FeedViewPost { 423 + post, 424 + reply: None, 425 + reason: None, 426 + feed_context: None, 427 + extra: HashMap::new(), 428 + }) 429 + .collect(); 430 + 431 + feed.extend(new_items); 432 + feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at)); 433 + }
+6 -4
src/api/repo/record/utils.rs
··· 25 25 current_root_cid: Option<Cid>, 26 26 new_mst_root: Cid, 27 27 ops: Vec<RecordOp>, 28 - blocks_cids: &Vec<String>, 28 + blocks_cids: &[String], 29 29 ) -> Result<CommitResult, String> { 30 30 let key_row = sqlx::query!( 31 31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", ··· 63 63 .await 64 64 .map_err(|e| format!("DB Error (repos): {}", e))?; 65 65 66 + let rev_str = rev.to_string(); 66 67 for op in &ops { 67 68 match op { 68 69 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => { 69 70 sqlx::query!( 70 - "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 71 - ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 71 + "INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) VALUES ($1, $2, $3, $4, $5) 72 + ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, repo_rev = $5, created_at = NOW()", 72 73 user_id, 73 74 collection, 74 75 rkey, 75 - cid.to_string() 76 + cid.to_string(), 77 + rev_str 76 78 ) 77 79 .execute(&mut *tx) 78 80 .await
+20 -1
src/lib.rs
··· 296 296 "/xrpc/app.bsky.actor.getProfiles", 297 297 get(api::actor::get_profiles), 298 298 ) 299 - // I know I know, I'm not supposed to implement appview endpoints. Leave me be 300 299 .route( 301 300 "/xrpc/app.bsky.feed.getTimeline", 302 301 get(api::feed::get_timeline), 302 + ) 303 + .route( 304 + "/xrpc/app.bsky.feed.getAuthorFeed", 305 + get(api::feed::get_author_feed), 306 + ) 307 + .route( 308 + "/xrpc/app.bsky.feed.getActorLikes", 309 + get(api::feed::get_actor_likes), 310 + ) 311 + .route( 312 + "/xrpc/app.bsky.feed.getPostThread", 313 + get(api::feed::get_post_thread), 314 + ) 315 + .route( 316 + "/xrpc/app.bsky.feed.getFeed", 317 + get(api::feed::get_feed), 318 + ) 319 + .route( 320 + "/xrpc/app.bsky.notification.registerPush", 321 + post(api::notification::register_push), 303 322 ) 304 323 .route("/.well-known/did.json", get(api::identity::well_known_did)) 305 324 .route("/u/{handle}/did.json", get(api::identity::user_did_doc))
+149
tests/appview_integration.rs
··· 1 + mod common; 2 + 3 + use common::{base_url, client, create_account_and_login}; 4 + use reqwest::StatusCode; 5 + use serde_json::{json, Value}; 6 + 7 + #[tokio::test] 8 + async fn test_get_author_feed_returns_appview_data() { 9 + let client = client(); 10 + let base = base_url().await; 11 + let (jwt, did) = create_account_and_login(&client).await; 12 + 13 + let res = client 14 + .get(format!( 15 + "{}/xrpc/app.bsky.feed.getAuthorFeed?actor={}", 16 + base, did 17 + )) 18 + .header("Authorization", format!("Bearer {}", jwt)) 19 + .send() 20 + .await 21 + .unwrap(); 22 + 23 + assert_eq!(res.status(), StatusCode::OK); 24 + 25 + let body: Value = res.json().await.unwrap(); 26 + assert!(body["feed"].is_array(), "Response should have feed array"); 27 + let feed = body["feed"].as_array().unwrap(); 28 + assert_eq!(feed.len(), 1, "Feed should have 1 post from appview"); 29 + assert_eq!( 30 + feed[0]["post"]["record"]["text"].as_str(), 31 + Some("Author feed post from appview"), 32 + "Post text should match appview response" 33 + ); 34 + } 35 + 36 + #[tokio::test] 37 + async fn test_get_actor_likes_returns_appview_data() { 38 + let client = client(); 39 + let base = base_url().await; 40 + let (jwt, did) = create_account_and_login(&client).await; 41 + 42 + let res = client 43 + .get(format!( 44 + "{}/xrpc/app.bsky.feed.getActorLikes?actor={}", 45 + base, did 46 + )) 47 + .header("Authorization", format!("Bearer {}", jwt)) 48 + .send() 49 + .await 50 + .unwrap(); 51 + 52 + assert_eq!(res.status(), StatusCode::OK); 53 + 54 + let body: Value = res.json().await.unwrap(); 55 + assert!(body["feed"].is_array(), "Response should have feed array"); 56 + let feed = body["feed"].as_array().unwrap(); 57 + assert_eq!(feed.len(), 1, "Feed should have 1 liked post from appview"); 58 + assert_eq!( 59 + feed[0]["post"]["record"]["text"].as_str(), 60 + Some("Liked post from appview"), 61 + "Post text should match appview response" 62 + ); 63 + } 64 + 65 + #[tokio::test] 66 + async fn test_get_post_thread_returns_appview_data() { 67 + let client = client(); 68 + let base = base_url().await; 69 + let (jwt, did) = create_account_and_login(&client).await; 70 + 71 + let res = client 72 + .get(format!( 73 + "{}/xrpc/app.bsky.feed.getPostThread?uri=at://{}/app.bsky.feed.post/test123", 74 + base, did 75 + )) 76 + .header("Authorization", format!("Bearer {}", jwt)) 77 + .send() 78 + .await 79 + .unwrap(); 80 + 81 + assert_eq!(res.status(), StatusCode::OK); 82 + 83 + let body: Value = res.json().await.unwrap(); 84 + assert!(body["thread"].is_object(), "Response should have thread object"); 85 + assert_eq!( 86 + body["thread"]["$type"].as_str(), 87 + Some("app.bsky.feed.defs#threadViewPost"), 88 + "Thread should be a threadViewPost" 89 + ); 90 + assert_eq!( 91 + body["thread"]["post"]["record"]["text"].as_str(), 92 + Some("Thread post from appview"), 93 + "Post text should match appview response" 94 + ); 95 + } 96 + 97 + #[tokio::test] 98 + async fn test_get_feed_returns_appview_data() { 99 + let client = client(); 100 + let base = base_url().await; 101 + let (jwt, _did) = create_account_and_login(&client).await; 102 + 103 + let res = client 104 + .get(format!( 105 + "{}/xrpc/app.bsky.feed.getFeed?feed=at://did:plc:test/app.bsky.feed.generator/test", 106 + base 107 + )) 108 + .header("Authorization", format!("Bearer {}", jwt)) 109 + .send() 110 + .await 111 + .unwrap(); 112 + 113 + assert_eq!(res.status(), StatusCode::OK); 114 + 115 + let body: Value = res.json().await.unwrap(); 116 + assert!(body["feed"].is_array(), "Response should have feed array"); 117 + let feed = body["feed"].as_array().unwrap(); 118 + assert_eq!(feed.len(), 1, "Feed should have 1 post from appview"); 119 + assert_eq!( 120 + feed[0]["post"]["record"]["text"].as_str(), 121 + Some("Custom feed post from appview"), 122 + "Post text should match appview response" 123 + ); 124 + } 125 + 126 + #[tokio::test] 127 + async fn test_register_push_proxies_to_appview() { 128 + let client = client(); 129 + let base = base_url().await; 130 + let (jwt, _did) = create_account_and_login(&client).await; 131 + 132 + let res = client 133 + .post(format!( 134 + "{}/xrpc/app.bsky.notification.registerPush", 135 + base 136 + )) 137 + .header("Authorization", format!("Bearer {}", jwt)) 138 + .json(&json!({ 139 + "serviceDid": "did:web:example.com", 140 + "token": "test-push-token", 141 + "platform": "ios", 142 + "appId": "xyz.bsky.app" 143 + })) 144 + .send() 145 + .await 146 + .unwrap(); 147 + 148 + assert_eq!(res.status(), StatusCode::OK); 149 + }
+116
tests/common/mod.rs
··· 233 233 }))) 234 234 .mount(mock_server) 235 235 .await; 236 + 237 + Mock::given(method("GET")) 238 + .and(path("/xrpc/app.bsky.feed.getTimeline")) 239 + .respond_with( 240 + ResponseTemplate::new(200) 241 + .insert_header("atproto-repo-rev", "0") 242 + .set_body_json(json!({ 243 + "feed": [], 244 + "cursor": null 245 + })) 246 + ) 247 + .mount(mock_server) 248 + .await; 249 + 250 + Mock::given(method("GET")) 251 + .and(path("/xrpc/app.bsky.feed.getAuthorFeed")) 252 + .respond_with( 253 + ResponseTemplate::new(200) 254 + .insert_header("atproto-repo-rev", "0") 255 + .set_body_json(json!({ 256 + "feed": [{ 257 + "post": { 258 + "uri": "at://did:plc:mock-author/app.bsky.feed.post/from-appview-author", 259 + "cid": "bafyappview123", 260 + "author": {"did": "did:plc:mock-author", "handle": "mock.author"}, 261 + "record": { 262 + "$type": "app.bsky.feed.post", 263 + "text": "Author feed post from appview", 264 + "createdAt": "2025-01-01T00:00:00Z" 265 + }, 266 + "indexedAt": "2025-01-01T00:00:00Z" 267 + } 268 + }], 269 + "cursor": "author-cursor" 270 + })), 271 + ) 272 + .mount(mock_server) 273 + .await; 274 + 275 + Mock::given(method("GET")) 276 + .and(path("/xrpc/app.bsky.feed.getActorLikes")) 277 + .respond_with( 278 + ResponseTemplate::new(200) 279 + .insert_header("atproto-repo-rev", "0") 280 + .set_body_json(json!({ 281 + "feed": [{ 282 + "post": { 283 + "uri": "at://did:plc:mock-likes/app.bsky.feed.post/liked-post", 284 + "cid": "bafyliked123", 285 + "author": {"did": "did:plc:mock-likes", "handle": "mock.likes"}, 286 + "record": { 287 + "$type": "app.bsky.feed.post", 288 + "text": "Liked post from appview", 289 + "createdAt": "2025-01-01T00:00:00Z" 290 + }, 291 + "indexedAt": "2025-01-01T00:00:00Z" 292 + } 293 + }], 294 + "cursor": null 295 + })), 296 + ) 297 + .mount(mock_server) 298 + .await; 299 + 300 + Mock::given(method("GET")) 301 + .and(path("/xrpc/app.bsky.feed.getPostThread")) 302 + .respond_with( 303 + ResponseTemplate::new(200) 304 + .insert_header("atproto-repo-rev", "0") 305 + .set_body_json(json!({ 306 + "thread": { 307 + "$type": "app.bsky.feed.defs#threadViewPost", 308 + "post": { 309 + "uri": "at://did:plc:mock/app.bsky.feed.post/thread-post", 310 + "cid": "bafythread123", 311 + "author": {"did": "did:plc:mock", "handle": "mock.handle"}, 312 + "record": { 313 + "$type": "app.bsky.feed.post", 314 + "text": "Thread post from appview", 315 + "createdAt": "2025-01-01T00:00:00Z" 316 + }, 317 + "indexedAt": "2025-01-01T00:00:00Z" 318 + }, 319 + "replies": [] 320 + } 321 + })), 322 + ) 323 + .mount(mock_server) 324 + .await; 325 + 326 + Mock::given(method("GET")) 327 + .and(path("/xrpc/app.bsky.feed.getFeed")) 328 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 329 + "feed": [{ 330 + "post": { 331 + "uri": "at://did:plc:mock-feed/app.bsky.feed.post/custom-feed-post", 332 + "cid": "bafyfeed123", 333 + "author": {"did": "did:plc:mock-feed", "handle": "mock.feed"}, 334 + "record": { 335 + "$type": "app.bsky.feed.post", 336 + "text": "Custom feed post from appview", 337 + "createdAt": "2025-01-01T00:00:00Z" 338 + }, 339 + "indexedAt": "2025-01-01T00:00:00Z" 340 + } 341 + }], 342 + "cursor": null 343 + }))) 344 + .mount(mock_server) 345 + .await; 346 + 347 + Mock::given(method("POST")) 348 + .and(path("/xrpc/app.bsky.notification.registerPush")) 349 + .respond_with(ResponseTemplate::new(200)) 350 + .mount(mock_server) 351 + .await; 236 352 } 237 353 238 354 async fn spawn_app(database_url: String) -> String {
+122
tests/feed.rs
··· 1 + mod common; 2 + 3 + use common::{base_url, client, create_account_and_login}; 4 + use serde_json::json; 5 + 6 + #[tokio::test] 7 + async fn test_get_timeline_requires_auth() { 8 + let client = client(); 9 + let base = base_url().await; 10 + 11 + let res = client 12 + .get(format!("{}/xrpc/app.bsky.feed.getTimeline", base)) 13 + .send() 14 + .await 15 + .unwrap(); 16 + 17 + assert_eq!(res.status(), 401); 18 + } 19 + 20 + #[tokio::test] 21 + async fn test_get_author_feed_requires_actor() { 22 + let client = client(); 23 + let base = base_url().await; 24 + let (jwt, _did) = create_account_and_login(&client).await; 25 + 26 + let res = client 27 + .get(format!("{}/xrpc/app.bsky.feed.getAuthorFeed", base)) 28 + .header("Authorization", format!("Bearer {}", jwt)) 29 + .send() 30 + .await 31 + .unwrap(); 32 + 33 + assert_eq!(res.status(), 400); 34 + } 35 + 36 + #[tokio::test] 37 + async fn test_get_actor_likes_requires_actor() { 38 + let client = client(); 39 + let base = base_url().await; 40 + let (jwt, _did) = create_account_and_login(&client).await; 41 + 42 + let res = client 43 + .get(format!("{}/xrpc/app.bsky.feed.getActorLikes", base)) 44 + .header("Authorization", format!("Bearer {}", jwt)) 45 + .send() 46 + .await 47 + .unwrap(); 48 + 49 + assert_eq!(res.status(), 400); 50 + } 51 + 52 + #[tokio::test] 53 + async fn test_get_post_thread_requires_uri() { 54 + let client = client(); 55 + let base = base_url().await; 56 + let (jwt, _did) = create_account_and_login(&client).await; 57 + 58 + let res = client 59 + .get(format!("{}/xrpc/app.bsky.feed.getPostThread", base)) 60 + .header("Authorization", format!("Bearer {}", jwt)) 61 + .send() 62 + .await 63 + .unwrap(); 64 + 65 + assert_eq!(res.status(), 400); 66 + } 67 + 68 + #[tokio::test] 69 + async fn test_get_feed_requires_auth() { 70 + let client = client(); 71 + let base = base_url().await; 72 + 73 + let res = client 74 + .get(format!( 75 + "{}/xrpc/app.bsky.feed.getFeed?feed=at://did:plc:test/app.bsky.feed.generator/test", 76 + base 77 + )) 78 + .send() 79 + .await 80 + .unwrap(); 81 + 82 + assert_eq!(res.status(), 401); 83 + } 84 + 85 + #[tokio::test] 86 + async fn test_get_feed_requires_feed_param() { 87 + let client = client(); 88 + let base = base_url().await; 89 + let (jwt, _did) = create_account_and_login(&client).await; 90 + 91 + let res = client 92 + .get(format!("{}/xrpc/app.bsky.feed.getFeed", base)) 93 + .header("Authorization", format!("Bearer {}", jwt)) 94 + .send() 95 + .await 96 + .unwrap(); 97 + 98 + assert_eq!(res.status(), 400); 99 + } 100 + 101 + #[tokio::test] 102 + async fn test_register_push_requires_auth() { 103 + let client = client(); 104 + let base = base_url().await; 105 + 106 + let res = client 107 + .post(format!( 108 + "{}/xrpc/app.bsky.notification.registerPush", 109 + base 110 + )) 111 + .json(&json!({ 112 + "serviceDid": "did:web:example.com", 113 + "token": "test-token", 114 + "platform": "ios", 115 + "appId": "xyz.bsky.app" 116 + })) 117 + .send() 118 + .await 119 + .unwrap(); 120 + 121 + assert_eq!(res.status(), 401); 122 + }
+1 -1
tests/import_verification.rs
··· 23 23 async fn test_import_rejects_car_for_different_user() { 24 24 let client = client(); 25 25 26 - let (token_a, did_a) = create_account_and_login(&client).await; 26 + let (token_a, _did_a) = create_account_and_login(&client).await; 27 27 let (_token_b, did_b) = create_account_and_login(&client).await; 28 28 29 29 let export_res = client