-- Create logs table for sync jobs and jetstream activity.
-- Unified logging sink: sync-job progress, jetstream events, and other
-- system activity all land here, distinguished by log_type.
CREATE TABLE IF NOT EXISTS logs (
    id BIGSERIAL PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    log_type VARCHAR(50) NOT NULL,              -- 'sync_job', 'jetstream', etc.
    job_id UUID NULL,                           -- For sync job logs, null for jetstream logs
    user_did TEXT NULL,                         -- User associated with the log (for filtering)
    slice_uri TEXT NULL,                        -- Slice associated with the log (for filtering)
    level VARCHAR(20) NOT NULL DEFAULT 'info',  -- See CHECK constraint below
    message TEXT NOT NULL,
    metadata JSONB NULL,                        -- Additional structured data (counts, errors, etc.)
    -- Enforce the documented level set at write time so a typo'd level
    -- can't silently escape every level-based filter.
    CONSTRAINT logs_level_check CHECK (level IN ('debug', 'info', 'warn', 'error'))
);

-- Create indexes for efficient queries.
CREATE INDEX IF NOT EXISTS idx_logs_type_job_id ON logs (log_type, job_id);
CREATE INDEX IF NOT EXISTS idx_logs_type_created_at ON logs (log_type, created_at);
-- user_did / slice_uri are NULL for many rows (e.g. jetstream system logs);
-- partial indexes skip those rows and stay smaller while serving the same
-- equality filters.
CREATE INDEX IF NOT EXISTS idx_logs_user_did ON logs (user_did) WHERE user_did IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_logs_slice_uri ON logs (slice_uri) WHERE slice_uri IS NOT NULL;

-- Add some helpful comments
COMMENT ON TABLE logs IS 'Unified logging table for sync jobs, jetstream, and other system activities';
COMMENT ON COLUMN logs.log_type IS 'Type of log entry: sync_job, jetstream, system, etc.';
COMMENT ON COLUMN logs.job_id IS 'Associated job ID for sync job logs, null for other log types';
COMMENT ON COLUMN logs.level IS 'Log level: debug, info, warn, error';
COMMENT ON COLUMN logs.metadata IS 'Additional structured data as JSON (progress, errors, counts, etc.)';
-- Change record table to use composite primary key (uri, slice_uri)
-- This allows the same record to exist in multiple slices

-- Drop the old primary key and add the composite one in a SINGLE ALTER TABLE.
-- Doing it in two statements leaves a window (and, if the migration runner is
-- not transactional, a possible permanent state) where the table has no
-- primary key at all; PostgreSQL applies comma-separated subcommands
-- atomically.
ALTER TABLE record
    DROP CONSTRAINT record_pkey,
    ADD CONSTRAINT record_pkey PRIMARY KEY (uri, slice_uri);

-- NOTE: uri values may now appear once per slice. The existing idx_record_*
-- secondary indexes are non-unique and remain valid for lookups by uri alone;
-- no index changes are required here.
···23 StatusCode::INTERNAL_SERVER_ERROR => "Internal server error",
24 _ => "Request failed",
25 };
26-27 (status, Json(serde_json::json!({
28 "error": status.as_str(),
29 "message": message
···37 pub uri: String,
38 pub slice: String,
39}
40-41-42-43-4445// Dynamic XRPC handler that routes based on method name (for GET requests)
46pub async fn dynamic_xrpc_handler(
···103 .and_then(|v| v.as_str())
104 .ok_or(StatusCode::BAD_REQUEST)?
105 .to_string();
106-107 let limit = params.get("limit").and_then(|v| {
108 if let Some(s) = v.as_str() {
109 s.parse::<i32>().ok()
···111 v.as_i64().map(|i| i as i32)
112 }
113 });
114-115 let cursor = params.get("cursor")
116 .and_then(|v| v.as_str())
117 .map(|s| s.to_string());
118-119 // Parse sortBy from params - convert legacy sort string to new array format if present
120 let sort_by = params.get("sort")
121 .and_then(|v| v.as_str())
···139 }
140 sort_fields
141 });
142-143 // Parse where conditions from query params if present
144 let mut where_conditions = HashMap::new();
145-146 // Handle legacy author/authors params by converting to where clause
147 if let Some(author_str) = params.get("author").and_then(|v| v.as_str()) {
148 where_conditions.insert("did".to_string(), WhereCondition {
···161 contains: None,
162 });
163 }
164-165 // Handle legacy query param by converting to where clause with contains
166 if let Some(query_str) = params.get("query").and_then(|v| v.as_str()) {
167 let field = params.get("field")
···173 contains: Some(query_str.to_string()),
174 });
175 }
176-177 let where_clause = if where_conditions.is_empty() {
178 None
179 } else {
···182 or_conditions: None,
183 })
184 };
185-186 let records_params = SliceRecordsParams {
187 slice,
188 limit,
···190 where_clause,
191 sort_by: sort_by.clone(),
192 };
193-194 // First verify the collection belongs to this slice
195 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
196 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
197-198 // Special handling: social.slices.lexicon is always allowed as it defines the schema
199 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
200 return Err(StatusCode::NOT_FOUND);
201 }
202-203 // Use the unified database method
204 match state.database.get_slice_collections_records(
205 &records_params.slice,
···211 Ok((mut records, cursor)) => {
212 // Filter records to only include the specific collection
213 records.retain(|record| record.collection == collection);
214-215 let indexed_records: Vec<IndexedRecord> = records.into_iter().map(|record| IndexedRecord {
216 uri: record.uri,
217 cid: record.cid,
···220 value: record.json,
221 indexed_at: record.indexed_at.to_rfc3339(),
222 }).collect();
223-224 let output = SliceRecordsOutput {
225 success: true,
226 records: indexed_records,
227 cursor,
228 message: None,
229 };
230-231 Ok(Json(serde_json::to_value(output)
232 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?))
233 },
···249 // First verify the collection belongs to this slice
250 let slice_collections = state.database.get_slice_collections_list(&get_params.slice).await
251 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
252-253 // Special handling: social.slices.lexicon is always allowed as it defines the schema
254 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
255 return Err(StatusCode::NOT_FOUND);
···284 // First verify the collection belongs to this slice
285 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
286 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Database error"}))))?;
287-288 // Special handling: social.slices.lexicon is always allowed as it defines the schema
289 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
290 return Err((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Collection not found"}))));
291 }
292-293 // Add collection filter to where conditions
294 let mut where_clause = records_params.where_clause.unwrap_or(crate::models::WhereClause {
295 conditions: HashMap::new(),
···301 contains: None,
302 });
303 records_params.where_clause = Some(where_clause);
304-305 // Use the unified database method
306 match state.database.get_slice_collections_records(
307 &records_params.slice,
···312 ).await {
313 Ok((records, cursor)) => {
314 // No need to filter - collection filter is in the SQL query now
315-316 // Transform Record to IndexedRecord for the response
317 let indexed_records: Vec<IndexedRecord> = records.into_iter().map(|record| IndexedRecord {
318 uri: record.uri,
···329 cursor,
330 message: None,
331 };
332-333 Ok(Json(serde_json::to_value(output)
334 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Serialization error"}))))?))
335 },
···350 // First verify the collection belongs to this slice
351 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
352 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
353-354 // Special handling: social.slices.lexicon is always allowed as it defines the schema
355 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
356 return Err(StatusCode::NOT_FOUND);
357 }
358-359 // Add collection filter to where conditions
360 let mut where_clause = records_params.where_clause.unwrap_or(crate::models::WhereClause {
361 conditions: HashMap::new(),
···395 // First verify the collection belongs to this slice
396 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
397 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Database error"}))))?;
398-399 // Special handling: social.slices.lexicon is always allowed as it defines the schema
400 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
401 return Err((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Collection not found"}))));
402 }
403-404 // Add collection filter to where conditions
405 let mut where_clause = records_params.where_clause.unwrap_or(crate::models::WhereClause {
406 conditions: HashMap::new(),
···454 .and_then(|v| v.as_str())
455 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
456 .to_string();
457-458 let record_key = body.get("rkey")
459 .and_then(|v| v.as_str())
460 .filter(|s| !s.is_empty()) // Filter out empty strings
461 .map(|s| s.to_string());
462-463 let record_data = body.get("record")
464 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
465 .clone();
466-467-468 // Validate the record against its lexicon
469-470 // For social.slices.lexicon collection, validate against the system slice
471 let validation_slice_uri = if collection == "social.slices.lexicon" {
472 "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/social.slices.slice/3lwzmbjpqxk2q"
473 } else {
474 &slice_uri
475 };
476-477-478 match LexiconValidator::for_slice(&state.database, validation_slice_uri).await {
479 Ok(validator) => {
480-481 // Debug: Get lexicons from the system slice to see what's there
482 if collection == "social.slices.lexicon" {
483 }
484-485 if let Err(e) = validator.validate_record(&collection, &record_data) {
486 return Err((StatusCode::BAD_REQUEST, Json(serde_json::json!({
487 "error": "ValidationError",
···496 }
497498 // Create record using AT Protocol functions with DPoP
499-500 let create_request = CreateRecordRequest {
501 repo: repo.clone(),
502 collection: collection.clone(),
···566 .and_then(|v| v.as_str())
567 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
568 .to_string();
569-570 let rkey = body.get("rkey")
571 .and_then(|v| v.as_str())
572 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
573 .to_string();
574-575 let record_data = body.get("record")
576 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
577 .clone();
578579 // Extract repo from user info
580 let repo = user_info.did.unwrap_or(user_info.sub);
581-582 // Validate the record against its lexicon
583 match LexiconValidator::for_slice(&state.database, &slice_uri).await {
584 Ok(validator) => {
···677 .await
678 .map_err(|_| status_to_error_response(StatusCode::INTERNAL_SERVER_ERROR))?;
679680- // Also delete from local database
681 let uri = format!("at://{}/{}/{}", repo, collection, rkey);
682- let _ = state.database.delete_record(&uri).await;
683684 Ok(Json(serde_json::json!({})))
685}
···782783 let result = validator.validate_record("social.slices.testRecord", &invalid_record);
784 assert!(result.is_err(), "Invalid record should fail validation");
785-786 if let Err(e) = result {
787 let error_message = format!("{}", e);
788 assert!(!error_message.is_empty(), "Error message should not be empty");
789 // Error message should be user-friendly and descriptive
790- assert!(error_message.contains("aspectRatio") || error_message.contains("required"),
791 "Error message should indicate what's wrong: {}", error_message);
792 }
793 }
···808809 let result = validator.validate_record("social.slices.testRecord", &invalid_record);
810 assert!(result.is_err(), "Constraint violation should fail validation");
811-812 if let Err(e) = result {
813 let error_message = format!("{}", e);
814 // Should indicate the specific constraint that was violated
815- assert!(error_message.contains("length") || error_message.contains("maximum") || error_message.contains("100"),
816 "Error message should indicate length constraint: {}", error_message);
817 }
818 }
···23 StatusCode::INTERNAL_SERVER_ERROR => "Internal server error",
24 _ => "Request failed",
25 };
26+27 (status, Json(serde_json::json!({
28 "error": status.as_str(),
29 "message": message
···37 pub uri: String,
38 pub slice: String,
39}
00004041// Dynamic XRPC handler that routes based on method name (for GET requests)
42pub async fn dynamic_xrpc_handler(
···99 .and_then(|v| v.as_str())
100 .ok_or(StatusCode::BAD_REQUEST)?
101 .to_string();
102+103 let limit = params.get("limit").and_then(|v| {
104 if let Some(s) = v.as_str() {
105 s.parse::<i32>().ok()
···107 v.as_i64().map(|i| i as i32)
108 }
109 });
110+111 let cursor = params.get("cursor")
112 .and_then(|v| v.as_str())
113 .map(|s| s.to_string());
114+115 // Parse sortBy from params - convert legacy sort string to new array format if present
116 let sort_by = params.get("sort")
117 .and_then(|v| v.as_str())
···135 }
136 sort_fields
137 });
138+139 // Parse where conditions from query params if present
140 let mut where_conditions = HashMap::new();
141+142 // Handle legacy author/authors params by converting to where clause
143 if let Some(author_str) = params.get("author").and_then(|v| v.as_str()) {
144 where_conditions.insert("did".to_string(), WhereCondition {
···157 contains: None,
158 });
159 }
160+161 // Handle legacy query param by converting to where clause with contains
162 if let Some(query_str) = params.get("query").and_then(|v| v.as_str()) {
163 let field = params.get("field")
···169 contains: Some(query_str.to_string()),
170 });
171 }
172+173 let where_clause = if where_conditions.is_empty() {
174 None
175 } else {
···178 or_conditions: None,
179 })
180 };
181+182 let records_params = SliceRecordsParams {
183 slice,
184 limit,
···186 where_clause,
187 sort_by: sort_by.clone(),
188 };
189+190 // First verify the collection belongs to this slice
191 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
192 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
193+194 // Special handling: social.slices.lexicon is always allowed as it defines the schema
195 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
196 return Err(StatusCode::NOT_FOUND);
197 }
198+199 // Use the unified database method
200 match state.database.get_slice_collections_records(
201 &records_params.slice,
···207 Ok((mut records, cursor)) => {
208 // Filter records to only include the specific collection
209 records.retain(|record| record.collection == collection);
210+211 let indexed_records: Vec<IndexedRecord> = records.into_iter().map(|record| IndexedRecord {
212 uri: record.uri,
213 cid: record.cid,
···216 value: record.json,
217 indexed_at: record.indexed_at.to_rfc3339(),
218 }).collect();
219+220 let output = SliceRecordsOutput {
221 success: true,
222 records: indexed_records,
223 cursor,
224 message: None,
225 };
226+227 Ok(Json(serde_json::to_value(output)
228 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?))
229 },
···245 // First verify the collection belongs to this slice
246 let slice_collections = state.database.get_slice_collections_list(&get_params.slice).await
247 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
248+249 // Special handling: social.slices.lexicon is always allowed as it defines the schema
250 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
251 return Err(StatusCode::NOT_FOUND);
···280 // First verify the collection belongs to this slice
281 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
282 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Database error"}))))?;
283+284 // Special handling: social.slices.lexicon is always allowed as it defines the schema
285 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
286 return Err((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Collection not found"}))));
287 }
288+289 // Add collection filter to where conditions
290 let mut where_clause = records_params.where_clause.unwrap_or(crate::models::WhereClause {
291 conditions: HashMap::new(),
···297 contains: None,
298 });
299 records_params.where_clause = Some(where_clause);
300+301 // Use the unified database method
302 match state.database.get_slice_collections_records(
303 &records_params.slice,
···308 ).await {
309 Ok((records, cursor)) => {
310 // No need to filter - collection filter is in the SQL query now
311+312 // Transform Record to IndexedRecord for the response
313 let indexed_records: Vec<IndexedRecord> = records.into_iter().map(|record| IndexedRecord {
314 uri: record.uri,
···325 cursor,
326 message: None,
327 };
328+329 Ok(Json(serde_json::to_value(output)
330 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Serialization error"}))))?))
331 },
···346 // First verify the collection belongs to this slice
347 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
348 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
349+350 // Special handling: social.slices.lexicon is always allowed as it defines the schema
351 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
352 return Err(StatusCode::NOT_FOUND);
353 }
354+355 // Add collection filter to where conditions
356 let mut where_clause = records_params.where_clause.unwrap_or(crate::models::WhereClause {
357 conditions: HashMap::new(),
···391 // First verify the collection belongs to this slice
392 let slice_collections = state.database.get_slice_collections_list(&records_params.slice).await
393 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Database error"}))))?;
394+395 // Special handling: social.slices.lexicon is always allowed as it defines the schema
396 if collection != "social.slices.lexicon" && !slice_collections.contains(&collection) {
397 return Err((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Collection not found"}))));
398 }
399+400 // Add collection filter to where conditions
401 let mut where_clause = records_params.where_clause.unwrap_or(crate::models::WhereClause {
402 conditions: HashMap::new(),
···450 .and_then(|v| v.as_str())
451 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
452 .to_string();
453+454 let record_key = body.get("rkey")
455 .and_then(|v| v.as_str())
456 .filter(|s| !s.is_empty()) // Filter out empty strings
457 .map(|s| s.to_string());
458+459 let record_data = body.get("record")
460 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
461 .clone();
462+463+464 // Validate the record against its lexicon
465+466 // For social.slices.lexicon collection, validate against the system slice
467 let validation_slice_uri = if collection == "social.slices.lexicon" {
468 "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/social.slices.slice/3lwzmbjpqxk2q"
469 } else {
470 &slice_uri
471 };
472+473+474 match LexiconValidator::for_slice(&state.database, validation_slice_uri).await {
475 Ok(validator) => {
476+477 // Debug: Get lexicons from the system slice to see what's there
478 if collection == "social.slices.lexicon" {
479 }
480+481 if let Err(e) = validator.validate_record(&collection, &record_data) {
482 return Err((StatusCode::BAD_REQUEST, Json(serde_json::json!({
483 "error": "ValidationError",
···492 }
493494 // Create record using AT Protocol functions with DPoP
495+496 let create_request = CreateRecordRequest {
497 repo: repo.clone(),
498 collection: collection.clone(),
···562 .and_then(|v| v.as_str())
563 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
564 .to_string();
565+566 let rkey = body.get("rkey")
567 .and_then(|v| v.as_str())
568 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
569 .to_string();
570+571 let record_data = body.get("record")
572 .ok_or_else(|| status_to_error_response(StatusCode::BAD_REQUEST))?
573 .clone();
574575 // Extract repo from user info
576 let repo = user_info.did.unwrap_or(user_info.sub);
577+578 // Validate the record against its lexicon
579 match LexiconValidator::for_slice(&state.database, &slice_uri).await {
580 Ok(validator) => {
···673 .await
674 .map_err(|_| status_to_error_response(StatusCode::INTERNAL_SERVER_ERROR))?;
675676+ // Also delete from local database (from all slices)
677 let uri = format!("at://{}/{}/{}", repo, collection, rkey);
678+ let _ = state.database.delete_record_by_uri(&uri, None).await;
679680 Ok(Json(serde_json::json!({})))
681}
···778779 let result = validator.validate_record("social.slices.testRecord", &invalid_record);
780 assert!(result.is_err(), "Invalid record should fail validation");
781+782 if let Err(e) = result {
783 let error_message = format!("{}", e);
784 assert!(!error_message.is_empty(), "Error message should not be empty");
785 // Error message should be user-friendly and descriptive
786+ assert!(error_message.contains("aspectRatio") || error_message.contains("required"),
787 "Error message should indicate what's wrong: {}", error_message);
788 }
789 }
···804805 let result = validator.validate_record("social.slices.testRecord", &invalid_record);
806 assert!(result.is_err(), "Constraint violation should fail validation");
807+808 if let Err(e) = result {
809 let error_message = format!("{}", e);
810 // Should indicate the specific constraint that was violated
811+ assert!(error_message.contains("length") || error_message.contains("maximum") || error_message.contains("100"),
812 "Error message should indicate length constraint: {}", error_message);
813 }
814 }
+1-1
api/src/jetstream.rs
···264 // DID is an actor in our system, delete the record globally
265 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
266267- match self.database.delete_record_by_uri(&uri).await {
268 Ok(rows_affected) => {
269 if rows_affected > 0 {
270 info!("✓ Deleted record globally: {} ({} rows)", uri, rows_affected);
···264 // DID is an actor in our system, delete the record globally
265 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey);
266267+ match self.database.delete_record_by_uri(&uri, None).await {
268 Ok(rows_affected) => {
269 if rows_affected > 0 {
270 info!("✓ Deleted record globally: {} ({} rows)", uri, rows_affected);
+108-5
api/src/jobs.rs
···4use uuid::Uuid;
5use crate::sync::SyncService;
6use crate::models::BulkSyncParams;
007use tracing::{info, error};
89/// Payload for sync jobs
···26 pub message: String,
27}
2829-/// Initialize the job registry with all job handlers
30pub fn registry() -> JobRegistry {
31 JobRegistry::new(&[sync_job])
32}
···41 payload.job_id, payload.user_did, payload.slice_uri
42 );
4344- // Get database pool from job context
45 let pool = current_job.pool();
04647- // Create sync service
00000000000000000048 let database = crate::database::Database::from_pool(pool.clone());
49 let relay_endpoint = std::env::var("RELAY_ENDPOINT")
50 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
51- let sync_service = SyncService::new(database.clone(), relay_endpoint);
0000005253 // Track progress
54 let start_time = std::time::Instant::now();
···65 .await
66 {
67 Ok((repos_processed, records_synced)) => {
068 let result = SyncJobResult {
69 success: true,
70 total_records: records_synced,
···75 repos_processed,
76 message: format!(
77 "Sync completed successfully in {:?}",
78- start_time.elapsed()
79 ),
80 };
81000000000000000082 // Store result in database before completing the job
83 store_job_result(
84 pool,
···107 Err(e) => {
108 error!("Sync job {} failed: {}", payload.job_id, e);
1090000000000000110 let result = SyncJobResult {
111 success: false,
112 total_records: 0,
···191 slice_uri: String,
192 params: BulkSyncParams,
193) -> Result<Uuid, Box<dyn std::error::Error + Send + Sync>> {
0000000000000000000000000000000000000000000000194 let job_id = Uuid::new_v4();
195196 let payload = SyncJobPayload {
···4use uuid::Uuid;
5use crate::sync::SyncService;
6use crate::models::BulkSyncParams;
7+use crate::logging::LogLevel;
8+use serde_json::json;
9use tracing::{info, error};
1011/// Payload for sync jobs
···28 pub message: String,
29}
3031+/// Initialize the job registry with all job handlers
32pub fn registry() -> JobRegistry {
33 JobRegistry::new(&[sync_job])
34}
···43 payload.job_id, payload.user_did, payload.slice_uri
44 );
4546+ // Get database pool and logger
47 let pool = current_job.pool();
48+ let logger = crate::logging::Logger::global();
4950+ // Log job start
51+ logger.log_sync_job(
52+ payload.job_id,
53+ &payload.user_did,
54+ &payload.slice_uri,
55+ LogLevel::Info,
56+ &format!("Starting sync job for {} collections",
57+ payload.params.collections.as_ref().map(|c| c.len()).unwrap_or(0) +
58+ payload.params.external_collections.as_ref().map(|c| c.len()).unwrap_or(0)
59+ ),
60+ Some(json!({
61+ "collections": payload.params.collections,
62+ "external_collections": payload.params.external_collections,
63+ "repos": payload.params.repos,
64+ "skip_validation": payload.params.skip_validation
65+ }))
66+ );
67+68+ // Create sync service with logging
69 let database = crate::database::Database::from_pool(pool.clone());
70 let relay_endpoint = std::env::var("RELAY_ENDPOINT")
71 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
72+ let sync_service = SyncService::with_logging(
73+ database.clone(),
74+ relay_endpoint,
75+ logger.clone(),
76+ payload.job_id,
77+ payload.user_did.clone()
78+ );
7980 // Track progress
81 let start_time = std::time::Instant::now();
···92 .await
93 {
94 Ok((repos_processed, records_synced)) => {
95+ let elapsed = start_time.elapsed();
96 let result = SyncJobResult {
97 success: true,
98 total_records: records_synced,
···103 repos_processed,
104 message: format!(
105 "Sync completed successfully in {:?}",
106+ elapsed
107 ),
108 };
109110+ // Log successful completion
111+ logger.log_sync_job(
112+ payload.job_id,
113+ &payload.user_did,
114+ &payload.slice_uri,
115+ LogLevel::Info,
116+ &format!("Sync completed successfully: {} repos, {} records in {:?}",
117+ repos_processed, records_synced, elapsed),
118+ Some(json!({
119+ "repos_processed": repos_processed,
120+ "records_synced": records_synced,
121+ "duration_secs": elapsed.as_secs_f64(),
122+ "collections_synced": result.collections_synced
123+ }))
124+ );
125+126 // Store result in database before completing the job
127 store_job_result(
128 pool,
···151 Err(e) => {
152 error!("Sync job {} failed: {}", payload.job_id, e);
153154+ // Log error
155+ logger.log_sync_job(
156+ payload.job_id,
157+ &payload.user_did,
158+ &payload.slice_uri,
159+ LogLevel::Error,
160+ &format!("Sync job failed: {}", e),
161+ Some(json!({
162+ "error": e.to_string(),
163+ "duration_secs": start_time.elapsed().as_secs_f64()
164+ }))
165+ );
166+167 let result = SyncJobResult {
168 success: false,
169 total_records: 0,
···248 slice_uri: String,
249 params: BulkSyncParams,
250) -> Result<Uuid, Box<dyn std::error::Error + Send + Sync>> {
251+ // Check if there's already a running sync job for this user+slice combination
252+ // We do this by checking:
253+ // 1. If there are any jobs in mq_msgs for sync_queue channel that haven't been processed yet
254+ // 2. If there are any recent job_results entries that indicate a job might still be running
255+ let existing_running_msg = sqlx::query!(
256+ r#"
257+ SELECT m.id
258+ FROM mq_msgs m
259+ JOIN mq_payloads p ON m.id = p.id
260+ WHERE m.channel_name = 'sync_queue'
261+ AND m.id != '00000000-0000-0000-0000-000000000000'
262+ AND p.payload_json->>'user_did' = $1
263+ AND p.payload_json->>'slice_uri' = $2
264+ AND m.attempt_at <= NOW()
265+ "#,
266+ user_did,
267+ slice_uri
268+ )
269+ .fetch_optional(pool)
270+ .await?;
271+272+ // Also check if there's a very recent job that might still be running
273+ // (within the last 10 minutes and no completion record)
274+ let recent_start = sqlx::query!(
275+ r#"
276+ SELECT m.id
277+ FROM mq_msgs m
278+ JOIN mq_payloads p ON m.id = p.id
279+ LEFT JOIN job_results jr ON (p.payload_json->>'job_id')::uuid = jr.job_id
280+ WHERE m.channel_name = 'sync_queue'
281+ AND m.id != '00000000-0000-0000-0000-000000000000'
282+ AND p.payload_json->>'user_did' = $1
283+ AND p.payload_json->>'slice_uri' = $2
284+ AND m.created_at > NOW() - INTERVAL '10 minutes'
285+ AND jr.job_id IS NULL
286+ "#,
287+ user_did,
288+ slice_uri
289+ )
290+ .fetch_optional(pool)
291+ .await?;
292+293+ if existing_running_msg.is_some() || recent_start.is_some() {
294+ return Err("A sync job is already running for this slice. Please wait for it to complete before starting another.".into());
295+ }
296+297 let job_id = Uuid::new_v4();
298299 let payload = SyncJobPayload {