AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] support pagination in GET /repos

ptr.pet e42e3de8 4fe44f8a

verified
+142 -16
+1 -1
AGENTS.md
··· 41 41 - **[`hydrant::backfill`]**: A dedicated worker that fetches full repository CAR files. Uses LIFO prioritization and adaptive concurrency to manage backfill load efficiently. 42 42 - **[`hydrant::api`]**: An Axum-based XRPC server implementing repository read methods (`getRecord`, `listRecords`) and system stats. It also provides a WebSocket event stream and management APIs: 43 43 - `/filter` (`GET`/`PATCH`): Configure indexing mode, signals, and collection patterns. 44 - - `/repos` (`GET`/`PUT`/`DELETE`): Bulk repository management using NDJSON or JSON arrays. 44 + - `/repos` (`GET`/`PUT`/`DELETE`): Bulk repository management using NDJSON or JSON arrays. Supports pagination and partitioning via query parameters. 45 45 - Persistence worker (in `src/main.rs`): Manages periodic background flushes of the LSM-tree and cursor state. 46 46 47 47 ### Lazy event inflation
+4 -1
README.md
··· 94 94 95 95 ### repository management 96 96 97 - - `GET /repos`: get an NDJSON stream of all repositories and their sync status. 97 + - `GET /repos`: get an NDJSON stream of repositories and their sync status. Supports pagination and filtering: 98 + - `limit`: max results (default 100, max 1000) 99 + - `cursor`: resume after this key (exclusive): a DID for the `all` and `resync` partitions, or a u64 index ID for the `pending` partition 100 + - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries) 98 101 - `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 99 102 - `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). optionally include `"deleteData": true` to also purge the repository from the database. 100 103
+137 -14
src/api/repos.rs
··· 47 47 pub delete_data: bool, 48 48 } 49 49 50 + #[derive(Deserialize)] 51 + pub struct GetReposParams { 52 + pub limit: Option<usize>, 53 + pub cursor: Option<String>, 54 + pub partition: Option<String>, 55 + } 56 + 50 57 pub async fn handle_get_repos( 51 58 State(state): State<Arc<AppState>>, 59 + Query(params): Query<GetReposParams>, 52 60 ) -> Result<Response, (StatusCode, String)> { 53 - let repos_ks = state.db.repos.clone(); 61 + let limit = params.limit.unwrap_or(100).min(1000); 62 + let partition = params.partition.unwrap_or_else(|| "all".to_string()); 63 + 64 + let items = tokio::task::spawn_blocking(move || { 65 + let db = &state.db; 66 + 67 + let results = match partition.as_str() { 68 + "all" => { 69 + let start_bound = if let Some(cursor) = params.cursor { 70 + let did = jacquard::types::did::Did::new_owned(&cursor) 71 + .map_err(|_| (StatusCode::BAD_REQUEST, "invalid cursor DID".to_string()))?; 72 + let did_key = keys::repo_key(&did); 73 + std::ops::Bound::Excluded(did_key) 74 + } else { 75 + std::ops::Bound::Unbounded 76 + }; 77 + 78 + let mut items = Vec::new(); 79 + for item in db 80 + .repos 81 + .range((start_bound, std::ops::Bound::Unbounded)) 82 + .take(limit) 83 + { 84 + let (k, v) = item 85 + .into_inner() 86 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 87 + let repo_state = crate::db::deser_repo_state(&v) 88 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 89 + let did = crate::db::types::TrimmedDid::try_from(k.as_ref()) 90 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
91 + .to_did(); 92 + 93 + items.push(RepoResponse { 94 + did: did.to_string(), 95 + status: repo_state.status.to_string(), 96 + tracked: repo_state.tracked, 97 + rev: repo_state.rev.as_ref().map(|r| r.to_string()), 98 + last_updated_at: repo_state.last_updated_at, 99 + }); 100 + } 101 + Ok::<_, (StatusCode, String)>(items) 102 + } 103 + "resync" => { 104 + let start_bound = if let Some(cursor) = params.cursor { 105 + let did = jacquard::types::did::Did::new_owned(&cursor) 106 + .map_err(|_| (StatusCode::BAD_REQUEST, "invalid cursor DID".to_string()))?; 107 + let did_key = keys::repo_key(&did); 108 + std::ops::Bound::Excluded(did_key) 109 + } else { 110 + std::ops::Bound::Unbounded 111 + }; 112 + 113 + let mut items = Vec::new(); 114 + for item in db 115 + .resync 116 + .range((start_bound, std::ops::Bound::Unbounded)) 117 + .take(limit) 118 + { 119 + let (k, _) = item 120 + .into_inner() 121 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 122 + 123 + if let Ok(Some(v)) = db.repos.get(&k) { 124 + let repo_state = crate::db::deser_repo_state(&v) 125 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 126 + let did = crate::db::types::TrimmedDid::try_from(k.as_ref()) 127 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
128 + .to_did(); 129 + 130 + items.push(RepoResponse { 131 + did: did.to_string(), 132 + status: repo_state.status.to_string(), 133 + tracked: repo_state.tracked, 134 + rev: repo_state.rev.as_ref().map(|r| r.to_string()), 135 + last_updated_at: repo_state.last_updated_at, 136 + }); 137 + } 138 + } 139 + Ok(items) 140 + } 141 + "pending" => { 142 + let start_bound = if let Some(cursor) = params.cursor { 143 + let id = cursor 144 + .parse::<u64>() 145 + .map_err(|_| (StatusCode::BAD_REQUEST, "invalid cursor id".to_string()))?; 146 + std::ops::Bound::Excluded(id.to_be_bytes().to_vec()) 147 + } else { 148 + std::ops::Bound::Unbounded 149 + }; 150 + 151 + let mut items = Vec::new(); 152 + for item in db 153 + .pending 154 + .range((start_bound, std::ops::Bound::Unbounded)) 155 + .take(limit) 156 + { 157 + let (_, did_key) = item 158 + .into_inner() 159 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 160 + 161 + if let Ok(Some(v)) = db.repos.get(&did_key) { 162 + let repo_state = crate::db::deser_repo_state(&v) 163 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 164 + let did = crate::db::types::TrimmedDid::try_from(did_key.as_ref()) 165 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
166 + .to_did(); 167 + 168 + items.push(RepoResponse { 169 + did: did.to_string(), 170 + status: repo_state.status.to_string(), 171 + tracked: repo_state.tracked, 172 + rev: repo_state.rev.as_ref().map(|r| r.to_string()), 173 + last_updated_at: repo_state.last_updated_at, 174 + }); 175 + } 176 + } 177 + Ok(items) 178 + } 179 + _ => Err((StatusCode::BAD_REQUEST, "invalid partition".to_string())), 180 + }?; 54 181 55 - let stream = futures::stream::iter(repos_ks.prefix(&[]).filter_map(|item| { 56 - let (k, v) = item.into_inner().ok()?; 57 - let did_str = std::str::from_utf8(&k[2..]).ok()?; 58 - let repo_state = crate::db::deser_repo_state(&v).ok()?; 182 + Ok::<_, (StatusCode, String)>(results) 183 + }) 184 + .await 185 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))??; 59 186 60 - let response = RepoResponse { 61 - did: did_str.to_string(), 62 - status: repo_state.status.to_string(), 63 - tracked: repo_state.tracked, 64 - rev: repo_state.rev.as_ref().map(|r| r.to_string()), 65 - last_updated_at: repo_state.last_updated_at, 66 - }; 187 + use futures::StreamExt; 67 188 68 - let json = serde_json::to_string(&response).ok()?; 189 + let stream = futures::stream::iter(items.into_iter().map(|item| { 190 + let json = serde_json::to_string(&item).ok()?; 69 191 Some(Ok::<_, std::io::Error>(format!("{json}\n"))) 70 - })); 192 + })) 193 + .filter_map(|x| futures::future::ready(x)); 71 194 72 195 let body = Body::from_stream(stream); 73 196