//! Actor management operations. //! //! This module handles database operations for ATProto actors (users/DIDs) //! tracked within slices, including batch insertion, querying, and filtering. use super::client::Database; use super::types::{WhereClause, WhereCondition}; use crate::errors::DatabaseError; use crate::models::Actor; impl Database { /// Inserts multiple actors in batches with conflict resolution. /// /// Updates handle and indexed_at if an actor already exists for the /// (did, slice_uri) pair. pub async fn batch_insert_actors(&self, actors: &[Actor]) -> Result<(), DatabaseError> { if actors.is_empty() { return Ok(()); } let mut tx = self.pool.begin().await?; const CHUNK_SIZE: usize = 1000; for chunk in actors.chunks(CHUNK_SIZE) { for actor in chunk { sqlx::query!( r#"INSERT INTO "actor" ("did", "handle", "slice_uri", "indexed_at") VALUES ($1, $2, $3, $4) ON CONFLICT ("did", "slice_uri") DO UPDATE SET "handle" = EXCLUDED."handle", "indexed_at" = EXCLUDED."indexed_at""#, actor.did, actor.handle, actor.slice_uri, actor.indexed_at ) .execute(&mut *tx) .await?; } } tx.commit().await?; Ok(()) } /// Queries actors for a slice with advanced filtering and cursor-based pagination. /// /// Supports: /// - Complex WHERE conditions (AND/OR, eq/in/contains operators) /// - Cursor-based pagination /// /// # Returns /// Tuple of (actors, next_cursor) where cursor is the last DID pub async fn get_slice_actors( &self, slice_uri: &str, limit: Option, cursor: Option<&str>, where_clause: Option<&WhereClause>, ) -> Result<(Vec, Option), DatabaseError> { let limit = limit.unwrap_or(50).min(100); let mut where_clauses = vec![format!("slice_uri = $1")]; let mut param_count = 2; // Build WHERE conditions for actors (handle table columns properly) let (and_conditions, or_conditions) = build_actor_where_conditions(where_clause, &mut param_count); where_clauses.extend(and_conditions); if !or_conditions.is_empty() { let or_clause = format!("({})", or_conditions.join(" OR ")); where_clauses.push(or_clause); } // Add cursor condition if let Some(_cursor_did) = cursor { where_clauses.push(format!("did > ${}", param_count)); param_count += 1; } let where_sql = format!("WHERE {}", where_clauses.join(" AND ")); let query = format!( r#" SELECT did, handle, slice_uri, indexed_at FROM actor {} ORDER BY did ASC LIMIT ${} "#, where_sql, param_count ); let mut sqlx_query = sqlx::query_as::<_, Actor>(&query); // Bind parameters in order sqlx_query = sqlx_query.bind(slice_uri); // Bind WHERE clause parameters if let Some(clause) = where_clause { for condition in clause.conditions.values() { if let Some(eq_value) = &condition.eq { if let Some(str_val) = eq_value.as_str() { sqlx_query = sqlx_query.bind(str_val); } else { sqlx_query = sqlx_query.bind(eq_value); } } if let Some(in_values) = &condition.in_values { let str_values: Vec = in_values .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect(); sqlx_query = sqlx_query.bind(str_values); } if let Some(contains_value) = &condition.contains { sqlx_query = sqlx_query.bind(contains_value); } } // Bind OR conditions if let Some(or_conditions) = &clause.or_conditions { for condition in or_conditions.values() { if let Some(eq_value) = &condition.eq { if let Some(str_val) = eq_value.as_str() { sqlx_query = sqlx_query.bind(str_val); } else { sqlx_query = sqlx_query.bind(eq_value); } } if let Some(in_values) = &condition.in_values { let str_values: Vec = in_values .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect(); sqlx_query = sqlx_query.bind(str_values); } if let Some(contains_value) = &condition.contains { sqlx_query = sqlx_query.bind(contains_value); } } } } // Bind cursor parameter if let Some(cursor_did) = cursor { sqlx_query = sqlx_query.bind(cursor_did); } // Bind limit sqlx_query = sqlx_query.bind(limit as i64); let records = sqlx_query.fetch_all(&self.pool).await?; let cursor = if records.len() < limit as usize { None // Last page - no more results } else { records.last().map(|actor| actor.did.clone()) }; Ok((records, cursor)) } /// Gets all actors across all slices. /// /// # Returns /// Vector of (did, slice_uri) tuples pub async fn get_all_actors(&self) -> Result, DatabaseError> { let rows = sqlx::query!( r#" SELECT did, slice_uri FROM actor "# ) .fetch_all(&self.pool) .await?; Ok(rows .into_iter() .map(|row| (row.did, row.slice_uri)) .collect()) } /// Checks if an actor has any records in a slice. /// /// Used before actor deletion to maintain referential integrity. pub async fn actor_has_records( &self, did: &str, slice_uri: &str, ) -> Result { let count = sqlx::query!( r#" SELECT COUNT(*) as count FROM record WHERE did = $1 AND slice_uri = $2 "#, did, slice_uri ) .fetch_one(&self.pool) .await?; Ok(count.count.unwrap_or(0) > 0) } /// Deletes an actor from a specific slice. /// /// # Returns /// Number of rows affected pub async fn delete_actor(&self, did: &str, slice_uri: &str) -> Result { let result = sqlx::query!( r#" DELETE FROM actor WHERE did = $1 AND slice_uri = $2 "#, did, slice_uri ) .execute(&self.pool) .await?; Ok(result.rows_affected()) } /// Deletes all actors for a specific slice. /// /// This is a destructive operation that removes all tracked actors /// from the specified slice. Actors will be recreated when records /// are re-indexed during sync. /// /// # Arguments /// * `slice_uri` - AT-URI of the slice to clear /// /// # Returns /// Number of actors deleted pub async fn delete_all_actors_for_slice(&self, slice_uri: &str) -> Result { let result = sqlx::query!( r#" DELETE FROM actor WHERE slice_uri = $1 "#, slice_uri ) .execute(&self.pool) .await?; Ok(result.rows_affected()) } /// Resolves actor handles to DIDs for a specific slice. /// /// # Arguments /// * `handles` - List of handles to resolve /// * `slice_uri` - AT-URI of the slice /// /// # Returns /// Vec of DIDs corresponding to the handles pub async fn resolve_handles_to_dids( &self, handles: &[String], slice_uri: &str, ) -> Result, DatabaseError> { if handles.is_empty() { return Ok(Vec::new()); } let placeholders: Vec = (1..=handles.len()).map(|i| format!("${}", i)).collect(); let query_sql = format!( "SELECT DISTINCT did FROM actor WHERE handle = ANY(ARRAY[{}]) AND slice_uri = ${}", placeholders.join(", "), handles.len() + 1 ); let mut query = sqlx::query_scalar::<_, String>(&query_sql); for handle in handles { query = query.bind(handle); } query = query.bind(slice_uri); Ok(query.fetch_all(&self.pool).await?) } /// Resolves actor handles to DIDs using pattern matching (ILIKE). /// /// # Arguments /// * `pattern` - Handle pattern to search for (partial match) /// * `slice_uri` - AT-URI of the slice /// /// # Returns /// Vec of DIDs matching the handle pattern pub async fn resolve_handle_pattern_to_dids( &self, pattern: &str, slice_uri: &str, ) -> Result, DatabaseError> { let query_sql = "SELECT DISTINCT did FROM actor WHERE handle ILIKE '%' || $1 || '%' AND slice_uri = $2"; let dids = sqlx::query_scalar::<_, String>(query_sql) .bind(pattern) .bind(slice_uri) .fetch_all(&self.pool) .await?; Ok(dids) } /// Resolves actor handles to DIDs using fuzzy matching (trigram similarity). /// /// # Arguments /// * `pattern` - Handle pattern to fuzzy match /// * `slice_uri` - AT-URI of the slice /// /// # Returns /// Vec of DIDs with similar handles pub async fn resolve_handle_fuzzy_to_dids( &self, pattern: &str, slice_uri: &str, ) -> Result, DatabaseError> { let query_sql = "SELECT DISTINCT did FROM actor WHERE handle % $1 AND slice_uri = $2"; let dids = sqlx::query_scalar::<_, String>(query_sql) .bind(pattern) .bind(slice_uri) .fetch_all(&self.pool) .await?; Ok(dids) } } /// Builds WHERE conditions specifically for actor queries. /// /// Unlike the general query builder, this handles actor table columns directly /// rather than treating them as JSON paths. fn build_actor_where_conditions( where_clause: Option<&WhereClause>, param_count: &mut usize, ) -> (Vec, Vec) { let mut where_clauses = Vec::new(); let mut or_clauses = Vec::new(); if let Some(clause) = where_clause { for (field, condition) in &clause.conditions { let field_clause = build_actor_single_condition(field, condition, param_count); if !field_clause.is_empty() { where_clauses.push(field_clause); } } if let Some(or_conditions) = &clause.or_conditions { for (field, condition) in or_conditions { let field_clause = build_actor_single_condition(field, condition, param_count); if !field_clause.is_empty() { or_clauses.push(field_clause); } } } } (where_clauses, or_clauses) } /// Builds a single SQL condition clause for actor fields. fn build_actor_single_condition( field: &str, condition: &WhereCondition, param_count: &mut usize, ) -> String { if let Some(_eq_value) = &condition.eq { let clause = format!("{} = ${}", field, param_count); *param_count += 1; clause } else if let Some(_in_values) = &condition.in_values { let clause = format!("{} = ANY(${})", field, param_count); *param_count += 1; clause } else if let Some(_contains_value) = &condition.contains { let clause = format!("{} ILIKE '%' || ${} || '%'", field, param_count); *param_count += 1; clause } else { String::new() } }