tangled
alpha
login
or
join now
slices.network
/
slices
137
fork
atom
Highly ambitious ATProtocol AppView service and sdks
137
fork
atom
overview
issues
10
pulls
3
pipelines
refactor handlers into api folder
chadtmiller.com
5 months ago
71a3ac7c
2999a736
+179
-201
16 changed files
expand all
collapse all
unified
split
api
src
api
actors.rs
jetstream.rs
jobs.rs
logs.rs
mod.rs
oauth.rs
openapi.rs
records.rs
sparkline.rs
stats.rs
sync.rs
upload_blob.rs
xrpc_dynamic.rs
handler_sync.rs
handler_sync_user_collections.rs
main.rs
+12
api/src/api/mod.rs
···
1
1
+
pub mod actors;
2
2
+
pub mod jetstream;
3
3
+
pub mod jobs;
4
4
+
pub mod logs;
5
5
+
pub mod oauth;
6
6
+
pub mod openapi;
7
7
+
pub mod records;
8
8
+
pub mod sparkline;
9
9
+
pub mod stats;
10
10
+
pub mod sync;
11
11
+
pub mod upload_blob;
12
12
+
pub mod xrpc_dynamic;
+146
api/src/api/sync.rs
···
1
1
+
use crate::AppState;
2
2
+
use crate::auth;
3
3
+
use crate::jobs;
4
4
+
use crate::models::BulkSyncParams;
5
5
+
use axum::{
6
6
+
extract::State,
7
7
+
http::{HeaderMap, StatusCode},
8
8
+
response::Json,
9
9
+
};
10
10
+
use serde::{Deserialize, Serialize};
11
11
+
use tracing::{info, warn};
12
12
+
use uuid::Uuid;
13
13
+
14
14
+
#[derive(Debug, Deserialize)]
15
15
+
#[serde(rename_all = "camelCase")]
16
16
+
pub struct SyncRequest {
17
17
+
#[serde(flatten)]
18
18
+
pub params: BulkSyncParams,
19
19
+
pub slice: String,
20
20
+
}
21
21
+
22
22
+
#[derive(Debug, Serialize)]
23
23
+
#[serde(rename_all = "camelCase")]
24
24
+
pub struct SyncJobResponse {
25
25
+
pub success: bool,
26
26
+
pub job_id: Option<Uuid>,
27
27
+
pub message: String,
28
28
+
}
29
29
+
30
30
+
pub async fn sync(
31
31
+
State(state): State<AppState>,
32
32
+
headers: HeaderMap,
33
33
+
axum::extract::Json(request): axum::extract::Json<SyncRequest>,
34
34
+
) -> Result<Json<SyncJobResponse>, StatusCode> {
35
35
+
let token = auth::extract_bearer_token(&headers)?;
36
36
+
let user_info = auth::verify_oauth_token(&token, &state.config.auth_base_url).await?;
37
37
+
38
38
+
let user_did = user_info.sub;
39
39
+
let slice_uri = request.slice;
40
40
+
41
41
+
match jobs::enqueue_sync_job(&state.database_pool, user_did, slice_uri, request.params).await {
42
42
+
Ok(job_id) => Ok(Json(SyncJobResponse {
43
43
+
success: true,
44
44
+
job_id: Some(job_id),
45
45
+
message: format!("Sync job {} enqueued successfully", job_id),
46
46
+
})),
47
47
+
Err(e) => {
48
48
+
tracing::error!("Failed to enqueue sync job: {}", e);
49
49
+
Ok(Json(SyncJobResponse {
50
50
+
success: false,
51
51
+
job_id: None,
52
52
+
message: format!("Failed to enqueue sync job: {}", e),
53
53
+
}))
54
54
+
}
55
55
+
}
56
56
+
}
57
57
+
58
58
+
#[derive(Deserialize)]
59
59
+
#[serde(rename_all = "camelCase")]
60
60
+
pub struct SyncUserCollectionsRequest {
61
61
+
pub slice: String,
62
62
+
#[serde(default = "default_timeout")]
63
63
+
pub timeout_seconds: u64,
64
64
+
}
65
65
+
66
66
+
fn default_timeout() -> u64 {
67
67
+
30
68
68
+
}
69
69
+
70
70
+
pub async fn sync_user_collections(
71
71
+
State(state): State<AppState>,
72
72
+
headers: HeaderMap,
73
73
+
Json(request): Json<SyncUserCollectionsRequest>,
74
74
+
) -> Result<Json<crate::sync::SyncUserCollectionsResult>, (StatusCode, Json<serde_json::Value>)> {
75
75
+
let token = auth::extract_bearer_token(&headers).map_err(|e| {
76
76
+
(
77
77
+
StatusCode::UNAUTHORIZED,
78
78
+
Json(serde_json::json!({
79
79
+
"error": "AuthenticationRequired",
80
80
+
"message": format!("Bearer token required: {}", e)
81
81
+
})),
82
82
+
)
83
83
+
})?;
84
84
+
85
85
+
let user_info = auth::verify_oauth_token(&token, &state.config.auth_base_url)
86
86
+
.await
87
87
+
.map_err(|e| {
88
88
+
(
89
89
+
StatusCode::UNAUTHORIZED,
90
90
+
Json(serde_json::json!({
91
91
+
"error": "InvalidToken",
92
92
+
"message": format!("Token verification failed: {}", e)
93
93
+
})),
94
94
+
)
95
95
+
})?;
96
96
+
97
97
+
let user_did = user_info.did.unwrap_or(user_info.sub);
98
98
+
99
99
+
info!(
100
100
+
"🔄 Starting user collections sync for {} on slice {} (timeout: {}s)",
101
101
+
user_did, request.slice, request.timeout_seconds
102
102
+
);
103
103
+
104
104
+
if request.timeout_seconds > 300 {
105
105
+
return Err((
106
106
+
StatusCode::BAD_REQUEST,
107
107
+
Json(serde_json::json!({
108
108
+
"error": "InvalidTimeout",
109
109
+
"message": "Maximum timeout is 300 seconds (5 minutes)"
110
110
+
})),
111
111
+
));
112
112
+
}
113
113
+
114
114
+
let sync_service =
115
115
+
crate::sync::SyncService::new(state.database.clone(), state.config.relay_endpoint.clone());
116
116
+
117
117
+
match sync_service
118
118
+
.sync_user_collections(&user_did, &request.slice, request.timeout_seconds)
119
119
+
.await
120
120
+
{
121
121
+
Ok(result) => {
122
122
+
if result.timed_out {
123
123
+
info!(
124
124
+
"⏰ Sync timed out for user {}, suggesting async job",
125
125
+
user_did
126
126
+
);
127
127
+
} else {
128
128
+
info!(
129
129
+
"✅ Sync completed for user {}: {} repos, {} records",
130
130
+
user_did, result.repos_processed, result.records_synced
131
131
+
);
132
132
+
}
133
133
+
Ok(Json(result))
134
134
+
}
135
135
+
Err(e) => {
136
136
+
warn!("❌ Sync failed for user {}: {}", user_did, e);
137
137
+
Err((
138
138
+
StatusCode::INTERNAL_SERVER_ERROR,
139
139
+
Json(serde_json::json!({
140
140
+
"error": "SyncFailed",
141
141
+
"message": format!("Sync operation failed: {}", e)
142
142
+
})),
143
143
+
))
144
144
+
}
145
145
+
}
146
146
+
}
api/src/handler_get_actors.rs
api/src/api/actors.rs
api/src/handler_get_records.rs
api/src/api/records.rs
api/src/handler_jetstream_status.rs
api/src/api/jetstream.rs
api/src/handler_jobs.rs
api/src/api/jobs.rs
api/src/handler_logs.rs
api/src/api/logs.rs
api/src/handler_oauth_clients.rs
api/src/api/oauth.rs
api/src/handler_openapi_spec.rs
api/src/api/openapi.rs
api/src/handler_sparkline.rs
api/src/api/sparkline.rs
api/src/handler_stats.rs
api/src/api/stats.rs
-58
api/src/handler_sync.rs
···
1
1
-
use crate::AppState;
2
2
-
use crate::auth;
3
3
-
use crate::jobs;
4
4
-
use crate::models::BulkSyncParams;
5
5
-
use axum::{
6
6
-
extract::State,
7
7
-
http::{HeaderMap, StatusCode},
8
8
-
response::Json,
9
9
-
};
10
10
-
use serde::{Deserialize, Serialize};
11
11
-
use uuid::Uuid;
12
12
-
13
13
-
#[derive(Debug, Deserialize)]
14
14
-
#[serde(rename_all = "camelCase")]
15
15
-
pub struct SyncRequest {
16
16
-
#[serde(flatten)]
17
17
-
pub params: BulkSyncParams,
18
18
-
pub slice: String, // The slice URI
19
19
-
}
20
20
-
21
21
-
#[derive(Debug, Serialize)]
22
22
-
#[serde(rename_all = "camelCase")]
23
23
-
pub struct SyncJobResponse {
24
24
-
pub success: bool,
25
25
-
pub job_id: Option<Uuid>,
26
26
-
pub message: String,
27
27
-
}
28
28
-
29
29
-
/// Start a sync job (enqueue it for background processing)
30
30
-
pub async fn sync(
31
31
-
State(state): State<AppState>,
32
32
-
headers: HeaderMap,
33
33
-
axum::extract::Json(request): axum::extract::Json<SyncRequest>,
34
34
-
) -> Result<Json<SyncJobResponse>, StatusCode> {
35
35
-
// Extract and verify authentication
36
36
-
let token = auth::extract_bearer_token(&headers)?;
37
37
-
let user_info = auth::verify_oauth_token(&token, &state.config.auth_base_url).await?;
38
38
-
39
39
-
let user_did = user_info.sub;
40
40
-
let slice_uri = request.slice;
41
41
-
42
42
-
// Enqueue the sync job with authenticated user information
43
43
-
match jobs::enqueue_sync_job(&state.database_pool, user_did, slice_uri, request.params).await {
44
44
-
Ok(job_id) => Ok(Json(SyncJobResponse {
45
45
-
success: true,
46
46
-
job_id: Some(job_id),
47
47
-
message: format!("Sync job {} enqueued successfully", job_id),
48
48
-
})),
49
49
-
Err(e) => {
50
50
-
tracing::error!("Failed to enqueue sync job: {}", e);
51
51
-
Ok(Json(SyncJobResponse {
52
52
-
success: false,
53
53
-
job_id: None,
54
54
-
message: format!("Failed to enqueue sync job: {}", e),
55
55
-
}))
56
56
-
}
57
57
-
}
58
58
-
}
-108
api/src/handler_sync_user_collections.rs
···
1
1
-
use axum::{
2
2
-
extract::State,
3
3
-
http::{HeaderMap, StatusCode},
4
4
-
response::Json,
5
5
-
};
6
6
-
use serde::Deserialize;
7
7
-
use tracing::{info, warn};
8
8
-
9
9
-
use crate::AppState;
10
10
-
use crate::auth::{extract_bearer_token, verify_oauth_token};
11
11
-
use crate::sync::{SyncService, SyncUserCollectionsResult};
12
12
-
13
13
-
#[derive(Deserialize)]
14
14
-
#[serde(rename_all = "camelCase")]
15
15
-
pub struct SyncUserCollectionsRequest {
16
16
-
pub slice: String,
17
17
-
#[serde(default = "default_timeout")]
18
18
-
pub timeout_seconds: u64,
19
19
-
}
20
20
-
21
21
-
fn default_timeout() -> u64 {
22
22
-
30 // 30 second default timeout for login scenarios
23
23
-
}
24
24
-
25
25
-
/// Handler for network.slices.slice.syncUserCollections
26
26
-
/// Synchronously syncs external collections for the authenticated user with timeout protection
27
27
-
/// Automatically discovers external collections based on the slice's domain configuration
28
28
-
pub async fn sync_user_collections(
29
29
-
State(state): State<AppState>,
30
30
-
headers: HeaderMap,
31
31
-
Json(request): Json<SyncUserCollectionsRequest>,
32
32
-
) -> Result<Json<SyncUserCollectionsResult>, (StatusCode, Json<serde_json::Value>)> {
33
33
-
// Extract and verify OAuth token
34
34
-
let token = extract_bearer_token(&headers).map_err(|e| {
35
35
-
(
36
36
-
StatusCode::UNAUTHORIZED,
37
37
-
Json(serde_json::json!({
38
38
-
"error": "AuthenticationRequired",
39
39
-
"message": format!("Bearer token required: {}", e)
40
40
-
})),
41
41
-
)
42
42
-
})?;
43
43
-
44
44
-
let user_info = verify_oauth_token(&token, &state.config.auth_base_url)
45
45
-
.await
46
46
-
.map_err(|e| {
47
47
-
(
48
48
-
StatusCode::UNAUTHORIZED,
49
49
-
Json(serde_json::json!({
50
50
-
"error": "InvalidToken",
51
51
-
"message": format!("Token verification failed: {}", e)
52
52
-
})),
53
53
-
)
54
54
-
})?;
55
55
-
56
56
-
let user_did = user_info.did.unwrap_or(user_info.sub);
57
57
-
58
58
-
info!(
59
59
-
"🔄 Starting user collections sync for {} on slice {} (timeout: {}s)",
60
60
-
user_did, request.slice, request.timeout_seconds
61
61
-
);
62
62
-
63
63
-
// Validate timeout (max 5 minutes for sync operations)
64
64
-
if request.timeout_seconds > 300 {
65
65
-
return Err((
66
66
-
StatusCode::BAD_REQUEST,
67
67
-
Json(serde_json::json!({
68
68
-
"error": "InvalidTimeout",
69
69
-
"message": "Maximum timeout is 300 seconds (5 minutes)"
70
70
-
})),
71
71
-
));
72
72
-
}
73
73
-
74
74
-
// Create sync service
75
75
-
let sync_service =
76
76
-
SyncService::new(state.database.clone(), state.config.relay_endpoint.clone());
77
77
-
78
78
-
// Perform timeout-protected sync with auto-discovered external collections
79
79
-
match sync_service
80
80
-
.sync_user_collections(&user_did, &request.slice, request.timeout_seconds)
81
81
-
.await
82
82
-
{
83
83
-
Ok(result) => {
84
84
-
if result.timed_out {
85
85
-
info!(
86
86
-
"⏰ Sync timed out for user {}, suggesting async job",
87
87
-
user_did
88
88
-
);
89
89
-
} else {
90
90
-
info!(
91
91
-
"✅ Sync completed for user {}: {} repos, {} records",
92
92
-
user_did, result.repos_processed, result.records_synced
93
93
-
);
94
94
-
}
95
95
-
Ok(Json(result))
96
96
-
}
97
97
-
Err(e) => {
98
98
-
warn!("❌ Sync failed for user {}: {}", user_did, e);
99
99
-
Err((
100
100
-
StatusCode::INTERNAL_SERVER_ERROR,
101
101
-
Json(serde_json::json!({
102
102
-
"error": "SyncFailed",
103
103
-
"message": format!("Sync operation failed: {}", e)
104
104
-
})),
105
105
-
))
106
106
-
}
107
107
-
}
108
108
-
}
api/src/handler_upload_blob.rs
api/src/api/upload_blob.rs
api/src/handler_xrpc_dynamic.rs
api/src/api/xrpc_dynamic.rs
+21
-35
api/src/main.rs
···
1
1
mod actor_resolver;
2
2
+
mod api;
2
3
mod atproto_extensions;
3
4
mod auth;
4
5
mod database;
5
6
mod errors;
6
6
-
mod handler_get_actors;
7
7
-
mod handler_get_records;
8
8
-
mod handler_jetstream_status;
9
9
-
mod handler_jobs;
10
10
-
mod handler_logs;
11
11
-
mod handler_oauth_clients;
12
12
-
mod handler_openapi_spec;
13
13
-
mod handler_sparkline;
14
14
-
mod handler_stats;
15
15
-
mod handler_sync;
16
16
-
mod handler_sync_user_collections;
17
17
-
mod handler_upload_blob;
18
18
-
mod handler_xrpc_dynamic;
19
7
mod jetstream;
20
8
mod jobs;
21
9
mod logging;
···
281
269
"#
282
270
}),
283
271
)
284
284
-
// AT Protocol blob upload endpoint (must come before wildcard routes)
272
272
+
// XRPC endpoints
285
273
.route(
286
274
"/xrpc/com.atproto.repo.uploadBlob",
287
287
-
post(handler_upload_blob::upload_blob),
275
275
+
post(api::upload_blob::upload_blob),
288
276
)
289
289
-
// XRPC endpoints
290
277
.route(
291
278
"/xrpc/network.slices.slice.startSync",
292
292
-
post(handler_sync::sync),
279
279
+
post(api::sync::sync),
293
280
)
294
281
.route(
295
282
"/xrpc/network.slices.slice.syncUserCollections",
296
296
-
post(handler_sync_user_collections::sync_user_collections),
283
283
+
post(api::sync::sync_user_collections),
297
284
)
298
285
.route(
299
286
"/xrpc/network.slices.slice.getJobStatus",
300
300
-
get(handler_jobs::get_job_status),
287
287
+
get(api::jobs::get_job_status),
301
288
)
302
289
.route(
303
290
"/xrpc/network.slices.slice.getJobHistory",
304
304
-
get(handler_jobs::get_slice_job_history),
291
291
+
get(api::jobs::get_slice_job_history),
305
292
)
306
293
.route(
307
294
"/xrpc/network.slices.slice.getJobLogs",
308
308
-
get(handler_logs::get_sync_job_logs_handler),
295
295
+
get(api::logs::get_sync_job_logs_handler),
309
296
)
310
297
.route(
311
298
"/xrpc/network.slices.slice.getJetstreamLogs",
312
312
-
get(handler_logs::get_jetstream_logs_handler),
299
299
+
get(api::logs::get_jetstream_logs_handler),
313
300
)
314
301
.route(
315
302
"/xrpc/network.slices.slice.stats",
316
316
-
post(handler_stats::stats),
303
303
+
post(api::stats::stats),
317
304
)
318
305
.route(
319
306
"/xrpc/network.slices.slice.getSparklines",
320
320
-
post(handler_sparkline::batch_sparkline),
307
307
+
post(api::sparkline::batch_sparkline),
321
308
)
322
309
.route(
323
310
"/xrpc/network.slices.slice.getSliceRecords",
324
324
-
post(handler_get_records::get_records),
311
311
+
post(api::records::get_records),
325
312
)
326
313
.route(
327
314
"/xrpc/network.slices.slice.openapi",
328
328
-
get(handler_openapi_spec::get_openapi_spec),
315
315
+
get(api::openapi::get_openapi_spec),
329
316
)
330
317
.route(
331
318
"/xrpc/network.slices.slice.getJetstreamStatus",
332
332
-
get(handler_jetstream_status::get_jetstream_status),
319
319
+
get(api::jetstream::get_jetstream_status),
333
320
)
334
321
.route(
335
322
"/xrpc/network.slices.slice.getActors",
336
336
-
post(handler_get_actors::get_actors),
323
323
+
post(api::actors::get_actors),
337
324
)
338
338
-
// OAuth client management endpoints
339
325
.route(
340
326
"/xrpc/network.slices.slice.createOAuthClient",
341
341
-
post(handler_oauth_clients::create_oauth_client),
327
327
+
post(api::oauth::create_oauth_client),
342
328
)
343
329
.route(
344
330
"/xrpc/network.slices.slice.getOAuthClients",
345
345
-
get(handler_oauth_clients::get_oauth_clients),
331
331
+
get(api::oauth::get_oauth_clients),
346
332
)
347
333
.route(
348
334
"/xrpc/network.slices.slice.updateOAuthClient",
349
349
-
post(handler_oauth_clients::update_oauth_client),
335
335
+
post(api::oauth::update_oauth_client),
350
336
)
351
337
.route(
352
338
"/xrpc/network.slices.slice.deleteOAuthClient",
353
353
-
post(handler_oauth_clients::delete_oauth_client),
339
339
+
post(api::oauth::delete_oauth_client),
354
340
)
355
341
// Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
356
342
.route(
357
343
"/xrpc/*method",
358
358
-
get(handler_xrpc_dynamic::dynamic_xrpc_handler),
344
344
+
get(api::xrpc_dynamic::dynamic_xrpc_handler),
359
345
)
360
346
.route(
361
347
"/xrpc/*method",
362
362
-
post(handler_xrpc_dynamic::dynamic_xrpc_post_handler),
348
348
+
post(api::xrpc_dynamic::dynamic_xrpc_post_handler),
363
349
)
364
350
.layer(TraceLayer::new_for_http())
365
351
.layer(CorsLayer::permissive())