//! Record CRUD operations and queries. //! //! This module handles all database operations related to ATProto records, //! including insertion, updates, deletion, and complex queries with filtering, //! sorting, and pagination. use super::client::Database; use super::cursor::{build_cursor_where_condition, decode_cursor, generate_cursor_from_record}; use super::query_builder::{ bind_where_parameters, bind_where_parameters_scalar, build_order_by_clause_with_field_info, build_where_conditions, }; use super::types::{SortField, WhereClause}; use crate::errors::DatabaseError; use crate::models::{IndexedRecord, Record}; use sqlx::Row; impl Database { /// Inserts a single record into the database. /// /// Uses ON CONFLICT to update existing records with matching URI and slice_uri. pub async fn insert_record(&self, record: &Record) -> Result<(), DatabaseError> { sqlx::query!( r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri") VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ON CONSTRAINT record_pkey DO UPDATE SET "cid" = EXCLUDED."cid", "json" = EXCLUDED."json", "indexed_at" = EXCLUDED."indexed_at""#, record.uri, record.cid, record.did, record.collection, record.json, record.indexed_at, record.slice_uri ) .execute(&self.pool) .await?; Ok(()) } /// Inserts multiple records in optimized batches. /// /// Automatically chunks records to stay within PostgreSQL parameter limits /// (65536 parameters, ~8000 records per batch with 7 fields each). pub async fn batch_insert_records(&self, records: &[Record]) -> Result<(), DatabaseError> { if records.is_empty() { return Ok(()); } const BATCH_SIZE: usize = 8000; for chunk in records.chunks(BATCH_SIZE) { self.batch_insert_records_chunk(chunk).await?; } Ok(()) } /// Internal helper to insert a single chunk of records. 
async fn batch_insert_records_chunk(&self, records: &[Record]) -> Result<(), DatabaseError> {
        // An empty VALUES list is a SQL syntax error, so there is nothing to send.
        if records.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        // Keep only the last record for each (uri, slice_uri) pair: a single
        // INSERT ... ON CONFLICT DO UPDATE statement may not touch the same row twice.
        let mut deduped = std::collections::HashMap::new();
        for record in records {
            deduped.insert((&record.uri, &record.slice_uri), record);
        }
        let records: Vec<&Record> = deduped.into_values().collect();

        let mut query = String::from(
            r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri") VALUES "#,
        );

        // One "($n, ..., $n+6)" placeholder tuple per record, numbered consecutively.
        for i in 0..records.len() {
            if i > 0 {
                query.push_str(", ");
            }
            let base = i * 7 + 1;
            query.push_str(&format!(
                "(${}, ${}, ${}, ${}, ${}, ${}, ${})",
                base,
                base + 1,
                base + 2,
                base + 3,
                base + 4,
                base + 5,
                base + 6
            ));
        }

        query.push_str(
            r#" ON CONFLICT ON CONSTRAINT record_pkey DO UPDATE SET "cid" = EXCLUDED."cid", "json" = EXCLUDED."json", "indexed_at" = EXCLUDED."indexed_at" "#,
        );

        // Bind values in the same column order as the placeholder tuples above.
        let mut sqlx_query = sqlx::query(&query);
        for record in records {
            sqlx_query = sqlx_query
                .bind(&record.uri)
                .bind(&record.cid)
                .bind(&record.did)
                .bind(&record.collection)
                .bind(&record.json)
                .bind(record.indexed_at)
                .bind(&record.slice_uri);
        }

        sqlx_query.execute(&mut *tx).await?;
        tx.commit().await?;

        Ok(())
    }

    /// Gets a map of existing record CIDs for a specific actor, collection, and slice.
    ///
    /// Used during sync to determine which records need updating vs inserting.
    ///
    /// # Arguments
    /// * `did` - value matched against the `did` column
    /// * `collection` - value matched against the `collection` column
    /// * `slice_uri` - value matched against the `slice_uri` column
    ///
    /// # Returns
    /// HashMap mapping URI -> CID
    ///
    /// # Errors
    /// Returns a `DatabaseError` if the query fails.
    pub async fn get_existing_record_cids_for_slice(
        &self,
        did: &str,
        collection: &str,
        slice_uri: &str,
    ) -> Result<std::collections::HashMap<String, String>, DatabaseError> {
        let rows = sqlx::query!(
            r#"SELECT "uri", "cid" FROM "record" WHERE "did" = $1 AND "collection" = $2 AND "slice_uri" = $3"#,
            did,
            collection,
            slice_uri
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows.into_iter().map(|row| (row.uri, row.cid)).collect())
    }

    /// Retrieves a single record by URI.
///
    /// The lookup filters on `uri` only; no slice filter is applied, so when
    /// the same URI is stored under several slices the row returned is
    /// whichever one the database yields first.
    ///
    /// # Returns
    /// Some(IndexedRecord) if found, None otherwise
    ///
    /// # Errors
    /// Returns a `DatabaseError` if the query fails.
    pub async fn get_record(&self, uri: &str) -> Result<Option<IndexedRecord>, DatabaseError> {
        let record = sqlx::query_as::<_, Record>(
            r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri" FROM "record" WHERE "uri" = $1"#,
        )
        .bind(uri)
        .fetch_optional(&self.pool)
        .await?;

        // Convert the storage row into the API-facing shape: `json` becomes
        // `value` and the timestamp is rendered as an RFC 3339 string.
        Ok(record.map(|record| IndexedRecord {
            uri: record.uri,
            cid: record.cid,
            did: record.did,
            collection: record.collection,
            value: record.json,
            indexed_at: record.indexed_at.to_rfc3339(),
        }))
    }

    /// Updates an existing record.
    ///
    /// Overwrites `cid`, `json` and `indexed_at` on the row identified by the
    /// record's `uri` and `slice_uri`.
    ///
    /// # Errors
    /// Returns `DatabaseError::RecordNotFound` if no record with matching URI
    /// and slice_uri exists, or a `DatabaseError` if the statement fails.
    pub async fn update_record(&self, record: &Record) -> Result<(), DatabaseError> {
        let result = sqlx::query!(
            r#"UPDATE "record" SET "cid" = $1, "json" = $2, "indexed_at" = $3 WHERE "uri" = $4 AND "slice_uri" = $5"#,
            record.cid,
            record.json,
            record.indexed_at,
            record.uri,
            record.slice_uri
        )
        .execute(&self.pool)
        .await?;

        // Zero affected rows means the WHERE clause matched nothing.
        if result.rows_affected() == 0 {
            return Err(DatabaseError::RecordNotFound {
                uri: record.uri.clone(),
            });
        }

        Ok(())
    }

    /// Queries records for a slice with advanced filtering, sorting, and pagination.
/// /// Supports: /// - Cursor-based pagination /// - Multi-field sorting (with JSON path support) /// - Complex WHERE conditions (AND/OR, eq/in/contains operators) /// - Automatic handling of lexicon records vs regular records /// /// # Returns /// Tuple of (records, next_cursor) pub async fn get_slice_collections_records( &self, slice_uri: &str, limit: Option, cursor: Option<&str>, sort_by: Option<&Vec>, where_clause: Option<&WhereClause>, ) -> Result<(Vec, Option), DatabaseError> { // Default to 50 for API requests, but support unlimited queries for DataLoader let limit = limit.unwrap_or(50); let mut where_clauses = Vec::new(); let mut param_count = 1; // Extract collection name from where clause for lexicon lookup let collection = where_clause .as_ref() .and_then(|wc| wc.conditions.get("collection")) .and_then(|c| c.eq.as_ref()) .and_then(|v| v.as_str()); // Determine which sort fields are datetime fields let field_types: Option> = if let Some(sort_fields) = sort_by { if let Some(collection_name) = collection { // Fetch lexicons to check field types for JSON fields match self.get_lexicons_by_slice(slice_uri).await { Ok(lexicons) => { let types: Vec = sort_fields .iter() .map(|field| { // indexed_at is always a datetime table column if field.field == "indexed_at" { true } else { is_field_datetime(&lexicons, collection_name, &field.field) } }) .collect(); Some(types) } Err(_) => { // Fallback: mark indexed_at as datetime even without lexicons let types: Vec = sort_fields .iter() .map(|field| field.field == "indexed_at") .collect(); Some(types) } } } else { // No collection filter, but we can still identify table columns let types: Vec = sort_fields .iter() .map(|field| field.field == "indexed_at") .collect(); Some(types) } } else { None }; // Get first field type for ORDER BY (for backward compatibility) let primary_field_is_datetime = field_types .as_ref() .and_then(|types| types.first().copied()); // Build ORDER BY clause with datetime field information let 
order_by = build_order_by_clause_with_field_info(sort_by, primary_field_is_datetime); let is_lexicon = where_clause .as_ref() .and_then(|wc| wc.conditions.get("collection")) .and_then(|c| c.eq.as_ref()) .and_then(|v| v.as_str()) == Some("network.slices.lexicon"); // Check if where_clause already filters by "slice" field (for reverse joins) let has_slice_filter = where_clause .as_ref() .map(|wc| wc.conditions.contains_key("slice")) .unwrap_or(false); if is_lexicon && !has_slice_filter { // For lexicons without explicit slice filter, use the query slice_uri where_clauses.push(format!("json->>'slice' = ${}", param_count)); param_count += 1; } else if !is_lexicon { // For non-lexicons, use slice_uri column where_clauses.push(format!("slice_uri = ${}", param_count)); param_count += 1; } // Build all other WHERE conditions first (including collection filter) // For non-lexicon records, exclude the 'slice' field since we handle it via slice_uri let mut filtered_where_clause = None; let filtered_clause; if is_lexicon { filtered_where_clause = where_clause; } else if let Some(wc) = where_clause { let mut filtered_conditions = std::collections::HashMap::new(); for (field, condition) in &wc.conditions { if field != "slice" { filtered_conditions.insert(field.clone(), condition.clone()); } } filtered_clause = WhereClause { conditions: filtered_conditions, or_conditions: wc.or_conditions.clone(), and: wc.and.clone(), or: wc.or.clone(), }; filtered_where_clause = Some(&filtered_clause); } let (and_conditions, or_conditions) = build_where_conditions(filtered_where_clause, &mut param_count); where_clauses.extend(and_conditions); // Add cursor conditions last to ensure proper parameter order let mut cursor_bind_values = Vec::new(); if let Some(cursor_str) = cursor { match decode_cursor(cursor_str, sort_by) { Ok(decoded_cursor) => { // Use the datetime field information we already computed let field_type_slice = field_types.as_deref(); let (cursor_where, bind_values) = 
build_cursor_where_condition( &decoded_cursor, sort_by, &mut param_count, field_type_slice, ); where_clauses.push(cursor_where); cursor_bind_values = bind_values; } Err(e) => { // Log the error but don't fail the request eprintln!("Invalid cursor format: {}", e); } } } if !or_conditions.is_empty() { let or_clause = format!("({})", or_conditions.join(" OR ")); where_clauses.push(or_clause); } let where_sql = where_clauses .into_iter() .filter(|clause| !clause.is_empty()) .collect::>() .join(" AND "); // Assign limit parameter AFTER all other parameters let limit_param = param_count; let query = format!( "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE {} ORDER BY {} LIMIT ${}", where_sql, order_by, limit_param ); let mut query_builder = sqlx::query_as::<_, Record>(&query); // Bind slice_uri only if we added it to the query if is_lexicon && !has_slice_filter { query_builder = query_builder.bind(slice_uri); } else if !is_lexicon { query_builder = query_builder.bind(slice_uri); } // Bind WHERE condition parameters (including collection filter) query_builder = bind_where_parameters(query_builder, filtered_where_clause); // Bind cursor values after WHERE conditions for cursor_value in cursor_bind_values { query_builder = query_builder.bind(cursor_value); } query_builder = query_builder.bind(limit as i64); let mut records = query_builder.fetch_all(&self.pool).await?; // Deduplicate lexicon records by URI (same URI can exist with different slice_uri values) if is_lexicon { let mut seen_uris = std::collections::HashSet::new(); records.retain(|record| seen_uris.insert(record.uri.clone())); } // Only return cursor if we got a full page, indicating there might be more let cursor = if records.len() < limit as usize { None // Last page - no more results } else { records .last() .map(|record| generate_cursor_from_record(record, sort_by)) }; Ok((records, cursor)) } /// Counts records matching the given criteria. 
///
    /// Used for pagination metadata and statistics.
    ///
    /// # Arguments
    /// * `slice_uri` - slice whose records are counted
    /// * `where_clause` - optional filter conditions
    ///
    /// # Returns
    /// Number of matching rows
    ///
    /// # Errors
    /// Returns a `DatabaseError` if the query fails.
    pub async fn count_slice_collections_records(
        &self,
        slice_uri: &str,
        where_clause: Option<&WhereClause>,
    ) -> Result<i64, DatabaseError> {
        let mut where_clauses: Vec<String> = Vec::new();
        let mut param_count = 1;

        let is_lexicon = where_clause
            .and_then(|wc| wc.conditions.get("collection"))
            .and_then(|c| c.eq.as_ref())
            .and_then(|v| v.as_str())
            == Some("network.slices.lexicon");

        // Check if where_clause already filters by "slice" field (for reverse joins)
        let has_slice_filter = where_clause
            .map(|wc| wc.conditions.contains_key("slice"))
            .unwrap_or(false);

        // Non-lexicon records are scoped through the slice_uri column.
        // Lexicon records are scoped through json->>'slice', unless the caller
        // already filters on "slice", in which case no implicit filter is added.
        let slice_filter_column = if !is_lexicon {
            Some("slice_uri")
        } else if !has_slice_filter {
            Some("json->>'slice'")
        } else {
            None
        };
        if let Some(column) = slice_filter_column {
            where_clauses.push(format!("{} = ${}", column, param_count));
            param_count += 1;
        }

        // NOTE(review): unlike get_slice_collections_records, the 'slice'
        // condition is not removed for non-lexicon queries here, so the count
        // may be filtered more strictly than the listing — confirm intent.
        let (and_conditions, or_conditions) =
            build_where_conditions(where_clause, &mut param_count);
        where_clauses.extend(and_conditions);

        if !or_conditions.is_empty() {
            where_clauses.push(format!("({})", or_conditions.join(" OR ")));
        }

        let filtered_where_clauses: Vec<_> = where_clauses
            .into_iter()
            .filter(|clause| !clause.is_empty())
            .collect();

        let where_sql = if filtered_where_clauses.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", filtered_where_clauses.join(" AND "))
        };

        let query = format!("SELECT COUNT(*) as count FROM record{}", where_sql);

        let mut query_builder = sqlx::query_scalar::<_, i64>(&query);

        // Bind slice_uri only if we added it to the query
        if slice_filter_column.is_some() {
            query_builder = query_builder.bind(slice_uri);
        }

        query_builder = bind_where_parameters_scalar(query_builder, where_clause);

        let count = query_builder.fetch_one(&self.pool).await?;
        Ok(count)
    }

    /// Queries aggregated records with GROUP BY support.
/// /// # Arguments /// * `slice_uri` - AT-URI of the slice to query /// * `group_by_fields` - JSON paths to group by (e.g., ["releaseMbId", "releaseName"]) /// * `where_clause` - Optional WHERE conditions /// * `order_by_count` - Optional ordering ("asc" or "desc") /// * `limit` - Maximum number of groups to return /// /// # Returns /// Vec of (field_values, count) tuples pub async fn get_aggregated_records( &self, slice_uri: &str, group_by_fields: &[crate::models::GroupByField], where_clause: Option<&WhereClause>, order_by_count: Option<&str>, limit: Option, ) -> Result, DatabaseError> { if group_by_fields.is_empty() { return Ok(Vec::new()); } let limit = limit.unwrap_or(50).min(1000); let mut param_count = 1; // Build SELECT clause with JSON field extraction and optional date truncation let select_fields: Vec = group_by_fields .iter() .enumerate() .map(|(i, group_by_field)| { match group_by_field { crate::models::GroupByField::Simple(field) => { // Check if it's a table column if matches!( field.as_str(), "did" | "collection" | "uri" | "cid" | "indexed_at" ) { format!("\"{}\" as field_{}", field, i) } else { // JSON field format!("json->>'{}' as field_{}", field, i) } } crate::models::GroupByField::Truncated { field, interval } => { // Date truncation using PostgreSQL's date_trunc function let interval_str = interval.to_pg_interval(); // Check if it's a table column if field == "indexed_at" { format!( "date_trunc('{}', \"{}\")::text as field_{}", interval_str, field, i ) } else { // JSON field - cast to timestamp for date_trunc, then to text format!( "date_trunc('{}', (json->>'{}')::timestamp)::text as field_{}", interval_str, field, i ) } } } }) .collect(); let select_clause = format!("{}, COUNT(*) as count", select_fields.join(", ")); // Build GROUP BY clause let group_by_clause: Vec = (0..group_by_fields.len()) .map(|i| format!("field_{}", i)) .collect(); // Build WHERE clause let mut where_clauses = vec![format!("slice_uri = ${}", param_count)]; param_count 
+= 1; let (and_conditions, or_conditions) = build_where_conditions(where_clause, &mut param_count); where_clauses.extend(and_conditions); if !or_conditions.is_empty() { let or_clause = format!("({})", or_conditions.join(" OR ")); where_clauses.push(or_clause); } let where_sql = format!(" WHERE {}", where_clauses.join(" AND ")); // Build ORDER BY clause let order_by_sql = match order_by_count { Some("asc") => " ORDER BY count ASC", Some("desc") | Some(_) | None => " ORDER BY count DESC", }; let query = format!( "SELECT {} FROM record{} GROUP BY {} {} LIMIT {}", select_clause, where_sql, group_by_clause.join(", "), order_by_sql, limit ); tracing::debug!("Generated SQL: {}", query); let mut query_builder = sqlx::query(&query); query_builder = query_builder.bind(slice_uri); // Bind WHERE parameters manually if let Some(clause) = where_clause { for condition in clause.conditions.values() { if let Some(eq_value) = &condition.eq { if let Some(str_val) = eq_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(eq_value.to_string()); } } if let Some(in_values) = &condition.in_values { let str_values: Vec = in_values .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect(); query_builder = query_builder.bind(str_values); } if let Some(contains_value) = &condition.contains { query_builder = query_builder.bind(contains_value); } if let Some(gt_value) = &condition.gt { if let Some(str_val) = gt_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(gt_value.to_string()); } } if let Some(gte_value) = &condition.gte { if let Some(str_val) = gte_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(gte_value.to_string()); } } if let Some(lt_value) = &condition.lt { if let Some(str_val) = lt_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = 
query_builder.bind(lt_value.to_string()); } } if let Some(lte_value) = &condition.lte { if let Some(str_val) = lte_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(lte_value.to_string()); } } } if let Some(or_conditions) = &clause.or_conditions { for condition in or_conditions.values() { if let Some(eq_value) = &condition.eq { if let Some(str_val) = eq_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(eq_value.to_string()); } } if let Some(in_values) = &condition.in_values { let str_values: Vec = in_values .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect(); query_builder = query_builder.bind(str_values); } if let Some(contains_value) = &condition.contains { query_builder = query_builder.bind(contains_value); } if let Some(gt_value) = &condition.gt { if let Some(str_val) = gt_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(gt_value.to_string()); } } if let Some(gte_value) = &condition.gte { if let Some(str_val) = gte_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(gte_value.to_string()); } } if let Some(lt_value) = &condition.lt { if let Some(str_val) = lt_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(lt_value.to_string()); } } if let Some(lte_value) = &condition.lte { if let Some(str_val) = lte_value.as_str() { query_builder = query_builder.bind(str_val); } else { query_builder = query_builder.bind(lte_value.to_string()); } } } } } let rows = query_builder.fetch_all(&self.pool).await?; // Convert rows to JSON objects let mut results = Vec::new(); for row in rows { let mut obj = serde_json::Map::new(); // Extract grouped field values for (i, group_by_field) in group_by_fields.iter().enumerate() { let col_name = format!("field_{}", i); let value: Option = 
row.try_get(col_name.as_str()).ok(); // Try to parse as JSON first (for arrays/objects), otherwise use as string let json_value = if let Some(ref str_val) = value { // Check if it looks like JSON (starts with [ or {) if str_val.starts_with('[') || str_val.starts_with('{') { // Try to parse as JSON serde_json::from_str(str_val) .unwrap_or_else(|_| serde_json::Value::String(str_val.clone())) } else { serde_json::Value::String(str_val.clone()) } } else { serde_json::Value::Null }; obj.insert(group_by_field.field_name().to_string(), json_value); } // Extract count let count: i64 = row.try_get("count").unwrap_or(0); obj.insert("count".to_string(), serde_json::Value::Number(count.into())); results.push(serde_json::Value::Object(obj)); } Ok(results) } /// Deletes a record by URI. /// /// If slice_uri is provided, only deletes from that slice. /// Otherwise deletes from all slices. /// /// # Returns /// Number of rows affected pub async fn delete_record_by_uri( &self, uri: &str, slice_uri: Option<&str>, ) -> Result { let result = if let Some(slice_uri) = slice_uri { sqlx::query("DELETE FROM record WHERE uri = $1 AND slice_uri = $2") .bind(uri) .bind(slice_uri) .execute(&self.pool) .await? } else { sqlx::query("DELETE FROM record WHERE uri = $1") .bind(uri) .execute(&self.pool) .await? }; Ok(result.rows_affected()) } /// Deletes all records for a specific slice. /// /// This is a destructive operation that removes all indexed records /// from the specified slice. Records can be recovered by re-syncing. /// /// # Arguments /// * `slice_uri` - AT-URI of the slice to clear /// /// # Returns /// Number of records deleted pub async fn delete_all_records_for_slice( &self, slice_uri: &str, ) -> Result { let result = sqlx::query( "DELETE FROM record WHERE slice_uri = $1 AND collection NOT LIKE 'network.slices.%'", ) .bind(slice_uri) .execute(&self.pool) .await?; Ok(result.rows_affected()) } /// Deletes all records of a specific collection from a slice. 
/// /// Used when a lexicon is deleted to clean up all records of that type. /// /// # Arguments /// * `slice_uri` - AT-URI of the slice /// * `collection` - Collection name (NSID) to delete /// /// # Returns /// Number of records deleted pub async fn delete_records_by_collection( &self, slice_uri: &str, collection: &str, ) -> Result { let result = sqlx::query("DELETE FROM record WHERE slice_uri = $1 AND collection = $2") .bind(slice_uri) .bind(collection) .execute(&self.pool) .await?; Ok(result.rows_affected()) } /// Handles cascade deletion based on record type. /// /// When certain records are deleted, related data should be cleaned up: /// - Lexicon deletion: removes all records of that collection type /// - Slice deletion: removes all records and actors for that slice /// /// # Arguments /// * `uri` - AT-URI of the deleted record /// * `collection` - Collection name (e.g., "network.slices.lexicon") pub async fn handle_cascade_deletion( &self, uri: &str, collection: &str, ) -> Result<(), DatabaseError> { match collection { "network.slices.lexicon" => { // Get the lexicon record to extract collection name and slice URI if let Ok(Some(lexicon_record)) = self.get_record(uri).await && let (Some(nsid), Some(slice_uri_from_record)) = ( lexicon_record.value.get("nsid").and_then(|v| v.as_str()), lexicon_record.value.get("slice").and_then(|v| v.as_str()), ) { // Delete all records of this collection type from the slice let deleted = self .delete_records_by_collection(slice_uri_from_record, nsid) .await?; tracing::info!( "Cascade delete: removed {} records of collection {} from slice {}", deleted, nsid, slice_uri_from_record ); } } "network.slices.slice" => { // The URI itself is the slice URI let slice_uri = uri; // Delete all records for this slice let records_deleted = self.delete_all_records_for_slice(slice_uri).await?; tracing::info!( "Cascade delete: removed {} records from slice {}", records_deleted, slice_uri ); // Delete all actors for this slice let 
actors_deleted = super::client::Database::delete_all_actors_for_slice(self, slice_uri).await?; tracing::info!( "Cascade delete: removed {} actors from slice {}", actors_deleted, slice_uri ); } _ => { // No cascade deletion needed for other collections } } Ok(()) } /// Inserts or updates a record atomically. /// /// # Returns /// true if inserted (new record), false if updated (existing record) pub async fn upsert_record(&self, record: &Record) -> Result { let result = sqlx::query_scalar::<_, bool>( r#" INSERT INTO record (uri, cid, did, collection, json, indexed_at, slice_uri) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ON CONSTRAINT record_pkey DO UPDATE SET cid = EXCLUDED.cid, json = EXCLUDED.json, indexed_at = EXCLUDED.indexed_at RETURNING (xmax = 0) "#, ) .bind(&record.uri) .bind(&record.cid) .bind(&record.did) .bind(&record.collection) .bind(&record.json) .bind(record.indexed_at) .bind(&record.slice_uri) .fetch_one(&self.pool) .await?; Ok(result) } /// Gets lexicon definitions for a specific slice. /// /// Filters for network.slices.lexicon records and transforms them /// into the lexicon JSON format expected by the lexicon parser. 
pub async fn get_lexicons_by_slice(
        &self,
        slice_uri: &str,
    ) -> Result<Vec<serde_json::Value>, DatabaseError> {
        // Newest lexicon records first.
        let records = sqlx::query_as::<_, Record>(
            r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri" FROM "record" WHERE "collection" = 'network.slices.lexicon' AND "json"->>'slice' = $1 ORDER BY "indexed_at" DESC"#,
        )
        .bind(slice_uri)
        .fetch_all(&self.pool)
        .await?;

        // Records lacking a string "nsid", a string "definitions", or whose
        // "definitions" text is not valid JSON are silently skipped.
        let lexicon_definitions: Vec<serde_json::Value> = records
            .into_iter()
            .filter_map(|record| {
                let nsid = record.json.get("nsid")?.as_str()?;
                let definitions_str = record.json.get("definitions")?.as_str()?;
                let definitions: serde_json::Value =
                    serde_json::from_str(definitions_str).ok()?;

                Some(serde_json::json!({
                    "lexicon": 1,
                    "id": nsid,
                    "defs": definitions
                }))
            })
            .collect();

        Ok(lexicon_definitions)
    }
}

/// Helper function to check if a field is a datetime field in the lexicon.
///
/// Looks at lexicons whose `id` equals `collection` and reads
/// `defs.main.record.properties.<field>.format`. The first such lexicon that
/// declares a string `format` for the field decides the result: `true` when
/// that format is `"datetime"`, `false` otherwise. Returns `false` when no
/// lexicon declares a format for the field.
fn is_field_datetime(lexicons: &[serde_json::Value], collection: &str, field: &str) -> bool {
    lexicons
        .iter()
        .filter(|lexicon| lexicon.get("id").and_then(|v| v.as_str()) == Some(collection))
        .find_map(|lexicon| {
            lexicon
                .get("defs")?
                .get("main")?
                .get("record")?
                .get("properties")?
                .get(field)?
                .get("format")?
                .as_str()
        })
        .is_some_and(|format| format == "datetime")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_field_datetime_found() {
        let lexicons = vec![serde_json::json!({
            "lexicon": 1,
            "id": "app.bsky.feed.post",
            "defs": {
                "main": {
                    "record": {
                        "properties": {
                            "createdAt": {
                                "type": "string",
                                "format": "datetime"
                            },
                            "text": {
                                "type": "string"
                            }
                        }
                    }
                }
            }
        })];

        assert!(is_field_datetime(
            &lexicons,
            "app.bsky.feed.post",
            "createdAt"
        ));
    }

    #[test]
    fn test_is_field_datetime_not_datetime() {
        let lexicons = vec![serde_json::json!({
            "lexicon": 1,
            "id": "app.bsky.feed.post",
            "defs": {
                "main": {
                    "record": {
                        "properties": {
                            "text": {
                                "type": "string"
                            }
                        }
                    }
                }
            }
        })];

        assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text"));
    }

    #[test]
    fn test_is_field_datetime_missing_field() {
        let lexicons = vec![serde_json::json!({
            "lexicon": 1,
            "id": "app.bsky.feed.post",
            "defs": {
                "main": {
                    "record": {
                        "properties": {
                            "text": {
                                "type": "string"
                            }
                        }
                    }
                }
            }
        })];

        assert!(!is_field_datetime(
            &lexicons,
            "app.bsky.feed.post",
            "nonexistent"
        ));
    }

    #[test]
    fn test_is_field_datetime_wrong_collection() {
        let lexicons = vec![serde_json::json!({
            "lexicon": 1,
            "id": "app.bsky.feed.post",
            "defs": {
                "main": {
                    "record": {
                        "properties": {
                            "createdAt": {
                                "type": "string",
                                "format": "datetime"
                            }
                        }
                    }
                }
            }
        })];

        assert!(!is_field_datetime(
            &lexicons,
            "app.bsky.actor.profile",
            "createdAt"
        ));
    }

    #[test]
    fn test_is_field_datetime_multiple_lexicons() {
        let lexicons = vec![
            serde_json::json!({
                "lexicon": 1,
                "id": "app.bsky.feed.post",
                "defs": {
                    "main": {
                        "record": {
                            "properties": {
                                "text": {
                                    "type": "string"
                                }
                            }
                        }
                    }
                }
            }),
            serde_json::json!({
                "lexicon": 1,
                "id": "app.bsky.actor.profile",
                "defs": {
                    "main": {
                        "record": {
                            "properties": {
                                "createdAt": {
                                    "type": "string",
                                    "format": "datetime"
                                }
                            }
                        }
                    }
                }
            }),
        ];

        assert!(is_field_datetime(
            &lexicons,
            "app.bsky.actor.profile",
            "createdAt"
        ));
        assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text"));
    }
}