at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] implement GET /repos/:did for getting a single repo state

ptr.pet 7035e18d 784213e0

verified
+37 -3
+1 -1
AGENTS.md
··· 41 - **[`hydrant::backfill`]**: A dedicated worker that fetches full repository CAR files. Uses LIFO prioritization and adaptive concurrency to manage backfill load efficiently. 42 - **[`hydrant::api`]**: An Axum-based XRPC server implementing repository read methods (`getRecord`, `listRecords`) and system stats. It also provides a WebSocket event stream and management APIs: 43 - `/filter` (`GET`/`PATCH`): Configure indexing mode, signals, and collection patterns. 44 - - `/repos` (`GET`/`PUT`/`DELETE`): Bulk repository management using NDJSON or JSON arrays. Supports pagination and partitioning via query parameters. 45 - Persistence worker (in `src/main.rs`): Manages periodic background flushes of the LSM-tree and cursor state. 46 47 ### Lazy event inflation
··· 41 - **[`hydrant::backfill`]**: A dedicated worker that fetches full repository CAR files. Uses LIFO prioritization and adaptive concurrency to manage backfill load efficiently. 42 - **[`hydrant::api`]**: An Axum-based XRPC server implementing repository read methods (`getRecord`, `listRecords`) and system stats. It also provides a WebSocket event stream and management APIs: 43 - `/filter` (`GET`/`PATCH`): Configure indexing mode, signals, and collection patterns. 44 + - `/repos` (`GET`/`PUT`/`DELETE`): Repository management. 45 - Persistence worker (in `src/main.rs`): Manages periodic background flushes of the LSM-tree and cursor state. 46 47 ### Lazy event inflation
+1
README.md
··· 98 - `limit`: max results (default 100, max 1000) 99 - `cursor`: DID or u64 index ID depending on partition 100 - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries) 101 - `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 102 - `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). optionally include `"deleteData": true` to also purge the repository from the database. 103
··· 98 - `limit`: max results (default 100, max 1000) 99 - `cursor`: DID or u64 index ID depending on partition 100 - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries) 101 + - `GET /repos/{did}`: get the sync status and metadata of a specific repository. 102 - `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 103 - `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). optionally include `"deleteData": true` to also purge the repository from the database. 104
+35 -2
src/api/repos.rs
··· 1 use std::sync::Arc; 2 3 use axum::{ 4 - Router, 5 body::Body, 6 - extract::{Query, State}, 7 http::{StatusCode, header}, 8 response::{IntoResponse, Response}, 9 routing::{delete, get, put}, ··· 20 pub fn router() -> Router<Arc<AppState>> { 21 Router::new() 22 .route("/repos", get(handle_get_repos)) 23 .route("/repos", put(handle_put_repos)) 24 .route("/repos", delete(handle_delete_repos)) 25 } ··· 152 let body = Body::from_stream(stream); 153 154 Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response()) 155 } 156 157 pub async fn handle_put_repos(
··· 1 use std::sync::Arc; 2 3 use axum::{ 4 + Json, Router, 5 body::Body, 6 + extract::{Path, Query, State}, 7 http::{StatusCode, header}, 8 response::{IntoResponse, Response}, 9 routing::{delete, get, put}, ··· 20 pub fn router() -> Router<Arc<AppState>> { 21 Router::new() 22 .route("/repos", get(handle_get_repos)) 23 + .route("/repos/{did}", get(handle_get_repo)) 24 .route("/repos", put(handle_put_repos)) 25 .route("/repos", delete(handle_delete_repos)) 26 } ··· 153 let body = Body::from_stream(stream); 154 155 Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response()) 156 + } 157 + 158 + pub async fn handle_get_repo( 159 + State(state): State<Arc<AppState>>, 160 + Path(did_str): Path<String>, 161 + ) -> Result<Json<RepoResponse>, (StatusCode, String)> { 162 + let did = Did::new(&did_str).map_err(bad_request)?; 163 + let did_key = keys::repo_key(&did); 164 + 165 + let item = tokio::task::spawn_blocking(move || { 166 + let db = &state.db; 167 + 168 + let repo_bytes = db.repos.get(&did_key).map_err(internal)?; 169 + let repo_state = repo_bytes 170 + .as_deref() 171 + .map(crate::db::deser_repo_state) 172 + .transpose() 173 + .map_err(internal)?; 174 + 175 + Ok(repo_state.map(|s| RepoResponse { 176 + did: did_str, 177 + status: s.status.to_string(), 178 + tracked: s.tracked, 179 + rev: s.rev.as_ref().map(|r| r.to_string()), 180 + last_updated_at: s.last_updated_at, 181 + })) 182 + }) 183 + .await 184 + .map_err(internal)??; 185 + 186 + item.map(Json) 187 + .ok_or_else(|| (StatusCode::NOT_FOUND, "repository not found".to_string())) 188 } 189 190 pub async fn handle_put_repos(