tangled
alpha
login
or
join now
desertthunder.dev
/
malfestio
5
fork
atom
learn and share notes on atproto (wip) 🦉
malfestio.stormlightlabs.org/
readability
solid
axum
atproto
srs
5
fork
atom
overview
issues
pulls
pipelines
feat: sync endpoints
desertthunder.dev
2 months ago
e0dfde3a
0e1675c8
+483
-13
11 changed files
expand all
collapse all
unified
split
crates
server
src
api
feed.rs
mod.rs
preferences.rs
review.rs
search.rs
social.rs
sync.rs
users.rs
lib.rs
state.rs
docs
todo.md
+3
crates/server/src/api/feed.rs
···
70
70
71
71
let search_repo = Arc::new(crate::repository::search::mock::MockSearchRepository::new())
72
72
as Arc<dyn crate::repository::search::SearchRepository>;
73
73
+
let sync_repo = Arc::new(crate::repository::sync::mock::MockSyncRepository::new())
74
74
+
as Arc<dyn crate::repository::sync::SyncRepository>;
73
75
74
76
let repos = crate::state::Repositories {
75
77
card: card_repo,
···
80
82
social: social_repo,
81
83
deck: deck_repo,
82
84
search: search_repo,
85
85
+
sync: sync_repo,
83
86
};
84
87
85
88
AppState::new(pool, repos, config)
+1
crates/server/src/api/mod.rs
···
10
10
pub mod review;
11
11
pub mod search;
12
12
pub mod social;
13
13
+
pub mod sync;
13
14
pub mod users;
+3
crates/server/src/api/preferences.rs
···
117
117
as Arc<dyn crate::repository::search::SearchRepository>;
118
118
let review_repo = Arc::new(crate::repository::review::mock::MockReviewRepository::new())
119
119
as Arc<dyn crate::repository::review::ReviewRepository>;
120
120
+
let sync_repo = Arc::new(crate::repository::sync::mock::MockSyncRepository::new())
121
121
+
as Arc<dyn crate::repository::sync::SyncRepository>;
120
122
121
123
let config = AppConfig { pds_url: "https://bsky.social".to_string() };
122
124
···
129
131
deck: deck_repo,
130
132
search: search_repo,
131
133
prefs: prefs_repo,
134
134
+
sync: sync_repo,
132
135
};
133
136
134
137
AppState::new(pool, repos, config)
+4
crates/server/src/api/review.rs
···
159
159
let search_repo = Arc::new(crate::repository::search::mock::MockSearchRepository::new())
160
160
as Arc<dyn crate::repository::search::SearchRepository>;
161
161
162
162
+
let sync_repo = Arc::new(crate::repository::sync::mock::MockSyncRepository::new())
163
163
+
as Arc<dyn crate::repository::sync::SyncRepository>;
164
164
+
162
165
let repos = crate::state::Repositories {
163
166
card: card_repo,
164
167
note: note_repo,
···
168
171
social: social_repo,
169
172
deck: deck_repo,
170
173
search: search_repo,
174
174
+
sync: sync_repo,
171
175
};
172
176
173
177
AppState::new(pool, repos, config)
+3
crates/server/src/api/search.rs
···
102
102
let search_repo_trait = search_repo.clone() as Arc<dyn SearchRepository>;
103
103
let prefs_repo = Arc::new(crate::repository::preferences::mock::MockPreferencesRepository::new())
104
104
as Arc<dyn crate::repository::preferences::PreferencesRepository>;
105
105
+
let sync_repo = Arc::new(crate::repository::sync::mock::MockSyncRepository::new())
106
106
+
as Arc<dyn crate::repository::sync::SyncRepository>;
105
107
106
108
Arc::new(AppState {
107
109
pool,
···
113
115
social_repo,
114
116
deck_repo,
115
117
search_repo: search_repo_trait,
118
118
+
sync_repo,
116
119
config,
117
120
auth_cache,
118
121
dpop_nonces: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+3
crates/server/src/api/social.rs
···
195
195
let search_repo = Arc::new(crate::repository::search::mock::MockSearchRepository::new())
196
196
as Arc<dyn crate::repository::search::SearchRepository>;
197
197
let auth_cache = Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
198
198
+
let sync_repo = Arc::new(crate::repository::sync::mock::MockSyncRepository::new())
199
199
+
as Arc<dyn crate::repository::sync::SyncRepository>;
198
200
199
201
Arc::new(AppState {
200
202
pool,
···
206
208
social_repo,
207
209
deck_repo,
208
210
search_repo,
211
211
+
sync_repo,
209
212
config,
210
213
auth_cache,
211
214
dpop_nonces: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+444
crates/server/src/api/sync.rs
···
1
1
+
//! Sync API endpoints for bi-directional PDS synchronization.
2
2
+
//!
3
3
+
//! Provides endpoints for pushing local changes to PDS, getting sync status,
4
4
+
//! and resolving conflicts.
5
5
+
6
6
+
use crate::middleware::auth::UserContext;
7
7
+
use crate::state::SharedState;
8
8
+
use crate::sync_service::{ConflictStrategy, SyncError, SyncService};
9
9
+
use axum::{
10
10
+
Json,
11
11
+
extract::{Extension, Path, State},
12
12
+
http::StatusCode,
13
13
+
response::IntoResponse,
14
14
+
};
15
15
+
use serde::{Deserialize, Serialize};
16
16
+
use serde_json::json;
17
17
+
use std::str::FromStr;
18
18
+
19
19
+
/// Response for sync push operation.
20
20
+
#[derive(Debug, Clone, Serialize)]
21
21
+
pub struct PushResponse {
22
22
+
pub entity_type: String,
23
23
+
pub entity_id: String,
24
24
+
pub pds_uri: Option<String>,
25
25
+
pub pds_cid: Option<String>,
26
26
+
pub version: i32,
27
27
+
pub status: String,
28
28
+
}
29
29
+
30
30
+
/// Response for sync status query.
31
31
+
#[derive(Debug, Clone, Serialize)]
32
32
+
pub struct SyncStatusResponse {
33
33
+
pub pending_count: usize,
34
34
+
pub conflict_count: usize,
35
35
+
pub pending_items: Vec<PendingItem>,
36
36
+
pub conflicts: Vec<ConflictItem>,
37
37
+
}
38
38
+
39
39
+
#[derive(Debug, Clone, Serialize)]
40
40
+
pub struct PendingItem {
41
41
+
pub entity_type: String,
42
42
+
pub entity_id: String,
43
43
+
}
44
44
+
45
45
+
#[derive(Debug, Clone, Serialize)]
46
46
+
pub struct ConflictItem {
47
47
+
pub entity_type: String,
48
48
+
pub entity_id: String,
49
49
+
pub local_version: i32,
50
50
+
pub remote_version: Option<i32>,
51
51
+
}
52
52
+
53
53
+
/// Request for conflict resolution.
54
54
+
#[derive(Debug, Clone, Deserialize)]
55
55
+
pub struct ResolveConflictRequest {
56
56
+
pub strategy: String,
57
57
+
}
58
58
+
59
59
+
/// Push a deck to the user's PDS.
60
60
+
///
61
61
+
/// POST /api/sync/push/deck/:id
62
62
+
pub async fn push_deck(
63
63
+
State(state): State<SharedState>, ctx: Option<Extension<UserContext>>, Path(deck_id): Path<String>,
64
64
+
) -> impl IntoResponse {
65
65
+
let user = match ctx {
66
66
+
Some(Extension(user)) => user,
67
67
+
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "Unauthorized"}))).into_response(),
68
68
+
};
69
69
+
70
70
+
let sync_service = create_sync_service(&state);
71
71
+
72
72
+
match sync_service.push_deck(&deck_id, &user).await {
73
73
+
Ok(result) => (
74
74
+
StatusCode::OK,
75
75
+
Json(PushResponse {
76
76
+
entity_type: result.entity_type,
77
77
+
entity_id: result.entity_id,
78
78
+
pds_uri: result.pds_uri,
79
79
+
pds_cid: result.pds_cid,
80
80
+
version: result.new_version,
81
81
+
status: result.status.to_string(),
82
82
+
}),
83
83
+
)
84
84
+
.into_response(),
85
85
+
Err(e) => sync_error_response(e),
86
86
+
}
87
87
+
}
88
88
+
89
89
+
/// Push a note to the user's PDS.
90
90
+
///
91
91
+
/// POST /api/sync/push/note/:id
92
92
+
pub async fn push_note(
93
93
+
State(state): State<SharedState>, ctx: Option<Extension<UserContext>>, Path(note_id): Path<String>,
94
94
+
) -> impl IntoResponse {
95
95
+
let user = match ctx {
96
96
+
Some(Extension(user)) => user,
97
97
+
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "Unauthorized"}))).into_response(),
98
98
+
};
99
99
+
100
100
+
let sync_service = create_sync_service(&state);
101
101
+
102
102
+
match sync_service.push_note(¬e_id, &user).await {
103
103
+
Ok(result) => (
104
104
+
StatusCode::OK,
105
105
+
Json(PushResponse {
106
106
+
entity_type: result.entity_type,
107
107
+
entity_id: result.entity_id,
108
108
+
pds_uri: result.pds_uri,
109
109
+
pds_cid: result.pds_cid,
110
110
+
version: result.new_version,
111
111
+
status: result.status.to_string(),
112
112
+
}),
113
113
+
)
114
114
+
.into_response(),
115
115
+
Err(e) => sync_error_response(e),
116
116
+
}
117
117
+
}
118
118
+
119
119
+
/// Get the current sync status for the authenticated user.
120
120
+
///
121
121
+
/// GET /api/sync/status
122
122
+
pub async fn get_sync_status(
123
123
+
State(state): State<SharedState>, ctx: Option<Extension<UserContext>>,
124
124
+
) -> impl IntoResponse {
125
125
+
let user = match ctx {
126
126
+
Some(Extension(user)) => user,
127
127
+
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "Unauthorized"}))).into_response(),
128
128
+
};
129
129
+
130
130
+
let sync_service = create_sync_service(&state);
131
131
+
132
132
+
match sync_service.get_sync_status(&user).await {
133
133
+
Ok(summary) => (
134
134
+
StatusCode::OK,
135
135
+
Json(SyncStatusResponse {
136
136
+
pending_count: summary.pending_count,
137
137
+
conflict_count: summary.conflict_count,
138
138
+
pending_items: summary
139
139
+
.pending_items
140
140
+
.into_iter()
141
141
+
.map(|(entity_type, entity_id)| PendingItem { entity_type, entity_id })
142
142
+
.collect(),
143
143
+
conflicts: summary
144
144
+
.conflicts
145
145
+
.into_iter()
146
146
+
.map(|c| ConflictItem {
147
147
+
entity_type: c.entity_type,
148
148
+
entity_id: c.entity_id,
149
149
+
local_version: c.local_version,
150
150
+
remote_version: c.remote_version,
151
151
+
})
152
152
+
.collect(),
153
153
+
}),
154
154
+
)
155
155
+
.into_response(),
156
156
+
Err(e) => sync_error_response(e),
157
157
+
}
158
158
+
}
159
159
+
160
160
+
/// Resolve a sync conflict.
161
161
+
///
162
162
+
/// POST /api/sync/resolve/:entity_type/:id
163
163
+
pub async fn resolve_conflict(
164
164
+
State(state): State<SharedState>, ctx: Option<Extension<UserContext>>,
165
165
+
Path((entity_type, entity_id)): Path<(String, String)>, Json(payload): Json<ResolveConflictRequest>,
166
166
+
) -> impl IntoResponse {
167
167
+
let user = match ctx {
168
168
+
Some(Extension(user)) => user,
169
169
+
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "Unauthorized"}))).into_response(),
170
170
+
};
171
171
+
172
172
+
let strategy = match ConflictStrategy::from_str(&payload.strategy) {
173
173
+
Ok(s) => s,
174
174
+
Err(_) => {
175
175
+
return (
176
176
+
StatusCode::BAD_REQUEST,
177
177
+
Json(json!({"error": "Invalid strategy. Use: last_write_wins, keep_local, or keep_remote"})),
178
178
+
)
179
179
+
.into_response();
180
180
+
}
181
181
+
};
182
182
+
183
183
+
let sync_service = create_sync_service(&state);
184
184
+
185
185
+
match sync_service
186
186
+
.resolve_conflict(&entity_type, &entity_id, strategy, &user)
187
187
+
.await
188
188
+
{
189
189
+
Ok(result) => (
190
190
+
StatusCode::OK,
191
191
+
Json(PushResponse {
192
192
+
entity_type: result.entity_type,
193
193
+
entity_id: result.entity_id,
194
194
+
pds_uri: result.pds_uri,
195
195
+
pds_cid: result.pds_cid,
196
196
+
version: result.new_version,
197
197
+
status: result.status.to_string(),
198
198
+
}),
199
199
+
)
200
200
+
.into_response(),
201
201
+
Err(e) => sync_error_response(e),
202
202
+
}
203
203
+
}
204
204
+
205
205
+
/// Create a SyncService from the app state.
206
206
+
fn create_sync_service(state: &SharedState) -> SyncService {
207
207
+
SyncService::new(
208
208
+
state.sync_repo.clone(),
209
209
+
state.deck_repo.clone(),
210
210
+
state.card_repo.clone(),
211
211
+
state.note_repo.clone(),
212
212
+
state.oauth_repo.clone(),
213
213
+
)
214
214
+
}
215
215
+
216
216
+
/// Convert SyncError to HTTP response.
217
217
+
fn sync_error_response(error: SyncError) -> axum::response::Response {
218
218
+
let (status, message) = match &error {
219
219
+
SyncError::NotFound(msg) => (StatusCode::NOT_FOUND, msg.clone()),
220
220
+
SyncError::AuthRequired(msg) => (StatusCode::UNAUTHORIZED, msg.clone()),
221
221
+
SyncError::NoTokens(msg) => (StatusCode::UNAUTHORIZED, msg.clone()),
222
222
+
SyncError::InvalidArgument(msg) => (StatusCode::BAD_REQUEST, msg.clone()),
223
223
+
SyncError::ConflictDetected(info) => (
224
224
+
StatusCode::CONFLICT,
225
225
+
format!("Conflict for {}:{}", info.entity_type, info.entity_id),
226
226
+
),
227
227
+
SyncError::PdsError(e) => (StatusCode::BAD_GATEWAY, e.to_string()),
228
228
+
SyncError::RepoError(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
229
229
+
};
230
230
+
231
231
+
tracing::error!("Sync error: {}", error);
232
232
+
(status, Json(json!({"error": message}))).into_response()
233
233
+
}
234
234
+
235
235
+
#[cfg(test)]
236
236
+
mod tests {
237
237
+
use super::*;
238
238
+
239
239
+
#[test]
240
240
+
fn test_push_response_serialization() {
241
241
+
let response = PushResponse {
242
242
+
entity_type: "deck".to_string(),
243
243
+
entity_id: "123".to_string(),
244
244
+
pds_uri: Some("at://did:plc:test/deck/tid".to_string()),
245
245
+
pds_cid: Some("bafycid".to_string()),
246
246
+
version: 2,
247
247
+
status: "synced".to_string(),
248
248
+
};
249
249
+
250
250
+
let json = serde_json::to_string(&response).unwrap();
251
251
+
assert!(json.contains("\"entity_type\":\"deck\""));
252
252
+
assert!(json.contains("\"version\":2"));
253
253
+
}
254
254
+
255
255
+
#[test]
256
256
+
fn test_sync_status_response_serialization() {
257
257
+
let response = SyncStatusResponse {
258
258
+
pending_count: 2,
259
259
+
conflict_count: 1,
260
260
+
pending_items: vec![
261
261
+
PendingItem { entity_type: "deck".to_string(), entity_id: "1".to_string() },
262
262
+
PendingItem { entity_type: "note".to_string(), entity_id: "2".to_string() },
263
263
+
],
264
264
+
conflicts: vec![ConflictItem {
265
265
+
entity_type: "deck".to_string(),
266
266
+
entity_id: "3".to_string(),
267
267
+
local_version: 5,
268
268
+
remote_version: Some(6),
269
269
+
}],
270
270
+
};
271
271
+
272
272
+
let json = serde_json::to_string(&response).unwrap();
273
273
+
assert!(json.contains("\"pending_count\":2"));
274
274
+
assert!(json.contains("\"conflict_count\":1"));
275
275
+
}
276
276
+
277
277
+
#[test]
278
278
+
fn test_resolve_conflict_request_deserialization() {
279
279
+
let json = r#"{"strategy": "last_write_wins"}"#;
280
280
+
let request: ResolveConflictRequest = serde_json::from_str(json).unwrap();
281
281
+
assert_eq!(request.strategy, "last_write_wins");
282
282
+
283
283
+
let json = r#"{"strategy": "keep_local"}"#;
284
284
+
let request: ResolveConflictRequest = serde_json::from_str(json).unwrap();
285
285
+
assert_eq!(request.strategy, "keep_local");
286
286
+
}
287
287
+
288
288
+
#[test]
289
289
+
fn test_sync_error_response_not_found() {
290
290
+
let error = SyncError::NotFound("deck:123".to_string());
291
291
+
let response = sync_error_response(error);
292
292
+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
293
293
+
}
294
294
+
295
295
+
#[test]
296
296
+
fn test_sync_error_response_unauthorized() {
297
297
+
let error = SyncError::AuthRequired("missing token".to_string());
298
298
+
let response = sync_error_response(error);
299
299
+
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
300
300
+
}
301
301
+
302
302
+
#[test]
303
303
+
fn test_sync_error_response_bad_request() {
304
304
+
let error = SyncError::InvalidArgument("bad entity type".to_string());
305
305
+
let response = sync_error_response(error);
306
306
+
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
307
307
+
}
308
308
+
309
309
+
#[test]
310
310
+
fn test_sync_error_response_conflict() {
311
311
+
let error = SyncError::ConflictDetected(crate::sync_service::ConflictInfo {
312
312
+
entity_type: "deck".to_string(),
313
313
+
entity_id: "123".to_string(),
314
314
+
local_version: 5,
315
315
+
remote_version: Some(6),
316
316
+
local_updated_at: None,
317
317
+
remote_updated_at: None,
318
318
+
});
319
319
+
let response = sync_error_response(error);
320
320
+
assert_eq!(response.status(), StatusCode::CONFLICT);
321
321
+
}
322
322
+
323
323
+
#[test]
324
324
+
fn test_pending_item_serialization() {
325
325
+
let item = PendingItem { entity_type: "note".to_string(), entity_id: "456".to_string() };
326
326
+
327
327
+
let json = serde_json::to_string(&item).unwrap();
328
328
+
assert!(json.contains("\"entity_type\":\"note\""));
329
329
+
assert!(json.contains("\"entity_id\":\"456\""));
330
330
+
}
331
331
+
332
332
+
#[test]
333
333
+
fn test_conflict_item_serialization() {
334
334
+
let item = ConflictItem {
335
335
+
entity_type: "deck".to_string(),
336
336
+
entity_id: "789".to_string(),
337
337
+
local_version: 3,
338
338
+
remote_version: Some(4),
339
339
+
};
340
340
+
341
341
+
let json = serde_json::to_string(&item).unwrap();
342
342
+
assert!(json.contains("\"local_version\":3"));
343
343
+
assert!(json.contains("\"remote_version\":4"));
344
344
+
}
345
345
+
346
346
+
#[test]
347
347
+
fn test_conflict_item_no_remote_version() {
348
348
+
let item = ConflictItem {
349
349
+
entity_type: "note".to_string(),
350
350
+
entity_id: "abc".to_string(),
351
351
+
local_version: 1,
352
352
+
remote_version: None,
353
353
+
};
354
354
+
355
355
+
let json = serde_json::to_string(&item).unwrap();
356
356
+
assert!(json.contains("\"remote_version\":null"));
357
357
+
}
358
358
+
359
359
+
#[tokio::test]
360
360
+
async fn test_push_deck_unauthorized() {
361
361
+
let pool = crate::db::create_mock_pool();
362
362
+
let repos = crate::state::Repositories::default();
363
363
+
let config = crate::state::AppConfig { pds_url: "https://test.example.com".to_string() };
364
364
+
let state = crate::state::AppState::new(pool, repos, config);
365
365
+
366
366
+
let response = push_deck(State(state), None, Path("deck-123".to_string()))
367
367
+
.await
368
368
+
.into_response();
369
369
+
370
370
+
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
371
371
+
}
372
372
+
373
373
+
#[tokio::test]
374
374
+
async fn test_push_note_unauthorized() {
375
375
+
let pool = crate::db::create_mock_pool();
376
376
+
let repos = crate::state::Repositories::default();
377
377
+
let config = crate::state::AppConfig { pds_url: "https://test.example.com".to_string() };
378
378
+
let state = crate::state::AppState::new(pool, repos, config);
379
379
+
380
380
+
let response = push_note(State(state), None, Path("note-456".to_string()))
381
381
+
.await
382
382
+
.into_response();
383
383
+
384
384
+
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
385
385
+
}
386
386
+
387
387
+
#[tokio::test]
388
388
+
async fn test_get_sync_status_unauthorized() {
389
389
+
let pool = crate::db::create_mock_pool();
390
390
+
let repos = crate::state::Repositories::default();
391
391
+
let config = crate::state::AppConfig { pds_url: "https://test.example.com".to_string() };
392
392
+
let state = crate::state::AppState::new(pool, repos, config);
393
393
+
394
394
+
let response = get_sync_status(State(state), None).await.into_response();
395
395
+
396
396
+
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
397
397
+
}
398
398
+
399
399
+
#[tokio::test]
400
400
+
async fn test_resolve_conflict_unauthorized() {
401
401
+
let pool = crate::db::create_mock_pool();
402
402
+
let repos = crate::state::Repositories::default();
403
403
+
let config = crate::state::AppConfig { pds_url: "https://test.example.com".to_string() };
404
404
+
let state = crate::state::AppState::new(pool, repos, config);
405
405
+
406
406
+
let response = resolve_conflict(
407
407
+
State(state),
408
408
+
None,
409
409
+
Path(("deck".to_string(), "123".to_string())),
410
410
+
Json(ResolveConflictRequest { strategy: "last_write_wins".to_string() }),
411
411
+
)
412
412
+
.await
413
413
+
.into_response();
414
414
+
415
415
+
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
416
416
+
}
417
417
+
418
418
+
#[tokio::test]
419
419
+
async fn test_resolve_conflict_invalid_strategy() {
420
420
+
let pool = crate::db::create_mock_pool();
421
421
+
let repos = crate::state::Repositories::default();
422
422
+
let config = crate::state::AppConfig { pds_url: "https://test.example.com".to_string() };
423
423
+
let state = crate::state::AppState::new(pool, repos, config);
424
424
+
425
425
+
let user = UserContext {
426
426
+
did: "did:plc:alice".to_string(),
427
427
+
handle: "alice.bsky.social".to_string(),
428
428
+
access_token: "test_token".to_string(),
429
429
+
pds_url: "https://bsky.social".to_string(),
430
430
+
has_dpop: false,
431
431
+
};
432
432
+
433
433
+
let response = resolve_conflict(
434
434
+
State(state),
435
435
+
Some(Extension(user)),
436
436
+
Path(("deck".to_string(), "123".to_string())),
437
437
+
Json(ResolveConflictRequest { strategy: "invalid_strategy".to_string() }),
438
438
+
)
439
439
+
.await
440
440
+
.into_response();
441
441
+
442
442
+
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
443
443
+
}
444
444
+
}
+2
crates/server/src/api/users.rs
···
52
52
social_repo: social_repo.clone() as Arc<dyn SocialRepository>,
53
53
deck_repo: Arc::new(MockDeckRepository::new()) as Arc<dyn crate::repository::deck::DeckRepository>,
54
54
search_repo: Arc::new(MockSearchRepository::new()) as Arc<dyn crate::repository::search::SearchRepository>,
55
55
+
sync_repo: Arc::new(crate::repository::sync::mock::MockSyncRepository::new())
56
56
+
as Arc<dyn crate::repository::sync::SyncRepository>,
55
57
config: crate::state::AppConfig { pds_url: "https://bsky.social".to_string() },
56
58
auth_cache: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
57
59
dpop_nonces: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+4
crates/server/src/lib.rs
···
82
82
.route("/preferences", get(api::preferences::get_preferences))
83
83
.route("/preferences", axum::routing::put(api::preferences::update_preferences))
84
84
.route("/export/{collection}", get(api::export::export_collection))
85
85
+
.route("/sync/push/deck/{id}", post(api::sync::push_deck))
86
86
+
.route("/sync/push/note/{id}", post(api::sync::push_note))
87
87
+
.route("/sync/status", get(api::sync::get_sync_status))
88
88
+
.route("/sync/resolve/{entity_type}/{id}", post(api::sync::resolve_conflict))
85
89
.layer(axum_middleware::from_fn_with_state(
86
90
state.clone(),
87
91
middleware::auth::auth_middleware,
+13
-9
crates/server/src/state.rs
···
1
1
use crate::db::DbPool;
2
2
use crate::middleware::auth::UserContext;
3
3
use crate::oauth::resolver::IdentityResolver;
4
4
-
use crate::repository;
5
5
-
use crate::repository::card::CardRepository;
6
6
-
use crate::repository::deck::DeckRepository;
7
7
-
use crate::repository::note::NoteRepository;
8
8
-
use crate::repository::oauth::OAuthRepository;
9
9
-
use crate::repository::preferences::PreferencesRepository;
10
10
-
use crate::repository::review::ReviewRepository;
11
11
-
use crate::repository::search::SearchRepository;
12
12
-
use crate::repository::social::SocialRepository;
4
4
+
use crate::repository::{
5
5
+
self, card::CardRepository, deck::DeckRepository, note::NoteRepository, oauth::OAuthRepository,
6
6
+
preferences::PreferencesRepository, review::ReviewRepository, search::SearchRepository, social::SocialRepository,
7
7
+
sync::SyncRepository,
8
8
+
};
13
9
14
10
use deadpool_postgres::Pool;
15
11
use std::collections::HashMap;
···
38
34
pub review: Arc<dyn ReviewRepository>,
39
35
pub social: Arc<dyn SocialRepository>,
40
36
pub search: Arc<dyn SearchRepository>,
37
37
+
pub sync: Arc<dyn SyncRepository>,
41
38
}
42
39
43
40
#[cfg(test)]
···
52
49
review: Arc::new(repository::review::mock::MockReviewRepository::new()),
53
50
social: Arc::new(repository::social::mock::MockSocialRepository::new()),
54
51
search: Arc::new(repository::search::mock::MockSearchRepository::new()),
52
52
+
sync: Arc::new(repository::sync::mock::MockSyncRepository::new()),
55
53
}
56
54
}
57
55
}
···
66
64
let review_repo = std::sync::Arc::new(repository::review::DbReviewRepository::new(pool.clone()));
67
65
let social_repo = std::sync::Arc::new(repository::social::DbSocialRepository::new(pool.clone()));
68
66
let search_repo = std::sync::Arc::new(repository::search::DbSearchRepository::new(pool.clone()));
67
67
+
let sync_repo = std::sync::Arc::new(repository::sync::DbSyncRepository::new(pool.clone()));
69
68
70
69
Self {
71
70
oauth: oauth_repo,
···
76
75
review: review_repo,
77
76
social: social_repo,
78
77
search: search_repo,
78
78
+
sync: sync_repo,
79
79
}
80
80
}
81
81
}
···
90
90
pub review_repo: Arc<dyn ReviewRepository>,
91
91
pub social_repo: Arc<dyn SocialRepository>,
92
92
pub search_repo: Arc<dyn SearchRepository>,
93
93
+
pub sync_repo: Arc<dyn SyncRepository>,
93
94
pub config: AppConfig,
94
95
pub auth_cache: AuthCache,
95
96
/// Cache of valid DPoP nonces. Nonces are single-use and expire after TTL.
···
113
114
review_repo: repos.review,
114
115
social_repo: repos.social,
115
116
search_repo: repos.search,
117
117
+
sync_repo: repos.sync,
116
118
config,
117
119
auth_cache,
118
120
dpop_nonces,
···
131
133
let social_repo = Arc::new(repository::social::mock::MockSocialRepository::new()) as Arc<dyn SocialRepository>;
132
134
let search_repo = Arc::new(repository::search::mock::MockSearchRepository::new()) as Arc<dyn SearchRepository>;
133
135
let deck_repo = Arc::new(repository::deck::mock::MockDeckRepository::new()) as Arc<dyn DeckRepository>;
136
136
+
let sync_repo = Arc::new(repository::sync::mock::MockSyncRepository::new()) as Arc<dyn SyncRepository>;
134
137
let config = AppConfig { pds_url: "https://bsky.social".to_string() };
135
138
let prefs_repo =
136
139
Arc::new(repository::preferences::mock::MockPreferencesRepository::new()) as Arc<dyn PreferencesRepository>;
···
144
147
social: social_repo,
145
148
search: search_repo,
146
149
deck: deck_repo,
150
150
+
sync: sync_repo,
147
151
};
148
152
149
153
Self::new(pool, repos, config)
+3
-4
docs/todo.md
···
32
32
33
33
- [x] Bi-directional sync infrastructure
34
34
- [x] Conflict resolution strategy
35
35
-
- [ ] API endpoints for sync operations
35
35
+
- [x] API endpoints for sync operations
36
36
- [ ] Offline queue for pending publishes
37
37
- [ ] Frontend sync store with IndexedDB persistence
38
38
- [ ] Sync status UI indicators
···
57
57
**Observability:**
58
58
59
59
- [ ] Structured logging with correlation IDs
60
60
-
- [ ] Metrics collection (Prometheus/OpenTelemetry)
61
61
-
- [ ] Distributed tracing for request flows
62
62
-
- [ ] Error tracking (Sentry or similar)
60
60
+
- [ ] Metrics collection (Tracing spans)
61
61
+
- [ ] Error tracking
63
62
64
63
**Reliability:**
65
64