//! Record storage and retrieval for the actor store.
//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/record/mod.rs
//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
//!
//! Modified for SQLite backend

use crate::models::actor_store::{Backlink, Record, RepoBlock};
use anyhow::{Result, bail};
use cidv10::Cid;
use diesel::result::Error;
use diesel::*;
use futures::stream::{self, StreamExt};
use rsky_lexicon::com::atproto::admin::StatusAttr;
use rsky_pds::actor_store::record::{GetRecord, RecordsForCollection};
use rsky_repo::storage::Ipld;
use rsky_repo::types::{Ids, Lex, RepoRecord, WriteOpAction};
use rsky_repo::util::cbor_to_lex_record;
use rsky_syntax::aturi::AtUri;
use rsky_syntax::aturi_validation::ensure_valid_at_uri;
use rsky_syntax::did::ensure_valid_did;
use serde_json::Value as JsonValue;
use std::env;
use std::str::FromStr;

// @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs.
// For now, we just want to ensure we're tracking links from follows, blocks, likes, and reposts.
pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result> { if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(record_type)))) = record.get("$type") { if record_type == Ids::AppBskyGraphFollow.as_str() || record_type == Ids::AppBskyGraphBlock.as_str() { if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject)))) = record.get("subject") { match ensure_valid_did(uri) { Ok(_) => { return Ok(vec![Backlink { uri: uri.to_string(), path: "subject".to_owned(), link_to: subject.clone(), }]); } Err(e) => bail!("get_backlinks Error: invalid did {}", e), }; } } else if record_type == Ids::AppBskyFeedLike.as_str() || record_type == Ids::AppBskyFeedRepost.as_str() { if let Some(Lex::Map(ref_object)) = record.get("subject") { if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject_uri)))) = ref_object.get("uri") { match ensure_valid_at_uri(uri) { Ok(_) => { return Ok(vec![Backlink { uri: uri.to_string(), path: "subject.uri".to_owned(), link_to: subject_uri.clone(), }]); } Err(e) => bail!("get_backlinks Error: invalid AtUri {}", e), }; } } } } Ok(Vec::new()) } /// Combined handler for record operations with both read and write capabilities. pub(crate) struct RecordReader { /// Database connection. pub db: deadpool_diesel::Pool< deadpool_diesel::Manager, deadpool_diesel::sqlite::Object, >, /// DID of the actor. pub did: String, } impl RecordReader { /// Create a new record handler. pub(crate) const fn new( did: String, db: deadpool_diesel::Pool< deadpool_diesel::Manager, deadpool_diesel::sqlite::Object, >, ) -> Self { Self { did, db } } /// Count the total number of records. pub(crate) async fn record_count(&mut self) -> Result { use crate::schema::actor_store::record::dsl::*; let other_did = self.did.clone(); self.db .get() .await? .interact(move |conn| { let res: i64 = record.filter(did.eq(&other_did)).count().get_result(conn)?; Ok(res) }) .await .expect("Failed to count records") } /// List all collections in the repository. 
pub(crate) async fn list_collections(&self) -> Result> { use crate::schema::actor_store::record::dsl::*; let other_did = self.did.clone(); self.db .get() .await? .interact(move |conn| { let collections = record .filter(did.eq(&other_did)) .select(collection) .group_by(collection) .load::(conn)? .into_iter() .collect::>(); Ok(collections) }) .await .expect("Failed to list collections") } /// List records for a specific collection. pub(crate) async fn list_records_for_collection( &mut self, collection: String, limit: i64, reverse: bool, cursor: Option, rkey_start: Option, rkey_end: Option, include_soft_deleted: Option, ) -> Result> { use crate::schema::actor_store::record::dsl as RecordSchema; use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); let mut builder = RecordSchema::record .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) .limit(limit) .select((Record::as_select(), RepoBlock::as_select())) .filter(RecordSchema::did.eq(self.did.clone())) .filter(RecordSchema::collection.eq(collection)) .into_boxed(); if !include_soft_deleted { builder = builder.filter(RecordSchema::takedownRef.is_null()); } if reverse { builder = builder.order(RecordSchema::rkey.asc()); } else { builder = builder.order(RecordSchema::rkey.desc()); } if let Some(cursor) = cursor { if reverse { builder = builder.filter(RecordSchema::rkey.gt(cursor)); } else { builder = builder.filter(RecordSchema::rkey.lt(cursor)); } } else { if let Some(rkey_start) = rkey_start { builder = builder.filter(RecordSchema::rkey.gt(rkey_start)); } if let Some(rkey_end) = rkey_end { builder = builder.filter(RecordSchema::rkey.lt(rkey_end)); } } let res: Vec<(Record, RepoBlock)> = self .db .get() .await? 
.interact(move |conn| builder.load(conn)) .await .expect("Failed to load records")?; res.into_iter() .map(|row| { Ok(RecordsForCollection { uri: row.0.uri, cid: row.0.cid, value: cbor_to_lex_record(row.1.content)?, }) }) .collect::>>() } /// Get a specific record by URI. pub(crate) async fn get_record( &mut self, uri: &AtUri, cid: Option, include_soft_deleted: Option, ) -> Result> { use crate::schema::actor_store::record::dsl as RecordSchema; use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); let mut builder = RecordSchema::record .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) .select((Record::as_select(), RepoBlock::as_select())) .filter(RecordSchema::uri.eq(uri.to_string())) .into_boxed(); if !include_soft_deleted { builder = builder.filter(RecordSchema::takedownRef.is_null()); } if let Some(cid) = cid { builder = builder.filter(RecordSchema::cid.eq(cid)); } let record: Option<(Record, RepoBlock)> = self .db .get() .await? .interact(move |conn| builder.first(conn).optional()) .await .expect("Failed to load record")?; if let Some(record) = record { Ok(Some(GetRecord { uri: record.0.uri, cid: record.0.cid, value: cbor_to_lex_record(record.1.content)?, indexed_at: record.0.indexed_at, takedown_ref: record.0.takedown_ref, })) } else { Ok(None) } } /// Check if a record exists. 
pub(crate) async fn has_record( &mut self, uri: String, cid: Option, include_soft_deleted: Option, ) -> Result { use crate::schema::actor_store::record::dsl as RecordSchema; let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); let mut builder = RecordSchema::record .select(RecordSchema::uri) .filter(RecordSchema::uri.eq(uri)) .into_boxed(); if !include_soft_deleted { builder = builder.filter(RecordSchema::takedownRef.is_null()); } if let Some(cid) = cid { builder = builder.filter(RecordSchema::cid.eq(cid)); } let record_uri = self .db .get() .await? .interact(move |conn| builder.first::(conn).optional()) .await .expect("Failed to check record")?; Ok(record_uri.is_some()) } /// Get the takedown status of a record. pub(crate) async fn get_record_takedown_status( &self, uri: String, ) -> Result> { use crate::schema::actor_store::record::dsl as RecordSchema; let res = self .db .get() .await? .interact(move |conn| { RecordSchema::record .select(RecordSchema::takedownRef) .filter(RecordSchema::uri.eq(uri)) .first::>(conn) .optional() }) .await .expect("Failed to get takedown status")?; res.map_or_else( || Ok(None), |res| { res.map_or_else( || { Ok(Some(StatusAttr { applied: false, r#ref: None, })) }, |takedown_ref| { Ok(Some(StatusAttr { applied: true, r#ref: Some(takedown_ref), })) }, ) }, ) } /// Get the current CID for a record URI. pub(crate) async fn get_current_record_cid(&self, uri: String) -> Result> { use crate::schema::actor_store::record::dsl as RecordSchema; let res = self .db .get() .await? .interact(move |conn| { RecordSchema::record .select(RecordSchema::cid) .filter(RecordSchema::uri.eq(uri)) .first::(conn) .optional() }) .await .expect("Failed to get current CID")?; if let Some(res) = res { Ok(Some(Cid::from_str(&res)?)) } else { Ok(None) } } /// Get backlinks for a record. 
pub(crate) async fn get_record_backlinks( &self, collection: String, path: String, link_to: String, ) -> Result> { use crate::schema::actor_store::backlink::dsl as BacklinkSchema; use crate::schema::actor_store::record::dsl as RecordSchema; let res = self .db .get() .await? .interact(move |conn| { RecordSchema::record .inner_join( BacklinkSchema::backlink.on(BacklinkSchema::uri.eq(RecordSchema::uri)), ) .select(Record::as_select()) .filter(BacklinkSchema::path.eq(path)) .filter(BacklinkSchema::linkTo.eq(link_to)) .filter(RecordSchema::collection.eq(collection)) .load::(conn) }) .await .expect("Failed to get backlinks")?; Ok(res) } /// Get backlink conflicts for a record. pub(crate) async fn get_backlink_conflicts( &self, uri: &AtUri, record: &RepoRecord, ) -> Result> { let record_backlinks = get_backlinks(uri, record)?; let conflicts: Vec> = stream::iter(record_backlinks) .then(|backlink| async move { Ok::, anyhow::Error>( self.get_record_backlinks( uri.get_collection(), backlink.path, backlink.link_to, ) .await?, ) }) .collect::>() .await .into_iter() .collect::, _>>()?; Ok(conflicts .into_iter() .flatten() .filter_map(|record| { AtUri::make( env::var("BLUEPDS_HOST_NAME").unwrap_or("localhost".to_owned()), Some(String::from(uri.get_collection())), Some(record.rkey), ) .ok() }) .collect::>()) } // Transactor methods // ----------------- /// Index a record in the database. 
#[tracing::instrument(skip_all)] pub(crate) async fn index_record( &self, uri: AtUri, cid: Cid, record: Option, action: Option, // Create or update with a default of create repo_rev: String, timestamp: Option, ) -> Result<()> { tracing::debug!("@LOG DEBUG RecordReader::index_record, indexing record {uri}"); let collection = uri.get_collection(); let rkey = uri.get_rkey(); let hostname = uri.get_hostname().to_string(); let action = action.unwrap_or(WriteOpAction::Create); let indexed_at = timestamp.unwrap_or_else(rsky_common::now); let row = Record { did: self.did.clone(), uri: uri.to_string(), cid: cid.to_string(), collection: collection.clone(), rkey: rkey.to_string(), repo_rev: Some(repo_rev.clone()), indexed_at: indexed_at.clone(), takedown_ref: None, }; if !hostname.starts_with("did:") { bail!("Expected indexed URI to contain DID") } else if collection.is_empty() { bail!("Expected indexed URI to contain a collection") } else if rkey.is_empty() { bail!("Expected indexed URI to contain a record key") } use crate::schema::actor_store::record::dsl as RecordSchema; // Track current version of record let (record, uri) = self .db .get() .await? .interact(move |conn| { _ = insert_into(RecordSchema::record) .values(row) .on_conflict(RecordSchema::uri) .do_update() .set(( RecordSchema::cid.eq(cid.to_string()), RecordSchema::repoRev.eq(&repo_rev), RecordSchema::indexedAt.eq(&indexed_at), )) .execute(conn)?; Ok::<_, Error>((record, uri)) }) .await .expect("Failed to index record")?; if let Some(record) = record { // Maintain backlinks let backlinks = get_backlinks(&uri, &record)?; if action == WriteOpAction::Update { // On update just recreate backlinks from scratch for the record, so we can clear out // the old ones. E.g. for weird cases like updating a follow to be for a different did. 
self.remove_backlinks_by_uri(&uri).await?; } self.add_backlinks(backlinks).await?; } tracing::debug!("@LOG DEBUG RecordReader::index_record, indexed record {uri}"); Ok(()) } /// Delete a record from the database. #[tracing::instrument(skip_all)] pub(crate) async fn delete_record(&self, uri: &AtUri) -> Result<()> { tracing::debug!("@LOG DEBUG RecordReader::delete_record, deleting indexed record {uri}"); use crate::schema::actor_store::backlink::dsl as BacklinkSchema; use crate::schema::actor_store::record::dsl as RecordSchema; let uri = uri.to_string(); self.db .get() .await? .interact(move |conn| { _ = delete(RecordSchema::record) .filter(RecordSchema::uri.eq(&uri)) .execute(conn)?; _ = delete(BacklinkSchema::backlink) .filter(BacklinkSchema::uri.eq(&uri)) .execute(conn)?; tracing::debug!( "@LOG DEBUG RecordReader::delete_record, deleted indexed record {uri}" ); Ok(()) }) .await .expect("Failed to delete record") } /// Remove backlinks for a URI. pub(crate) async fn remove_backlinks_by_uri(&self, uri: &AtUri) -> Result<()> { use crate::schema::actor_store::backlink::dsl as BacklinkSchema; let uri = uri.to_string(); self.db .get() .await? .interact(move |conn| { _ = delete(BacklinkSchema::backlink) .filter(BacklinkSchema::uri.eq(uri)) .execute(conn)?; Ok(()) }) .await .expect("Failed to remove backlinks") } /// Add backlinks to the database. pub(crate) async fn add_backlinks(&self, backlinks: Vec) -> Result<()> { if backlinks.is_empty() { Ok(()) } else { use crate::schema::actor_store::backlink::dsl as BacklinkSchema; self.db .get() .await? .interact(move |conn| { _ = insert_or_ignore_into(BacklinkSchema::backlink) .values(&backlinks) .execute(conn)?; Ok(()) }) .await .expect("Failed to add backlinks") } } /// Update the takedown status of a record. 
pub(crate) async fn update_record_takedown_status( &self, uri: &AtUri, takedown: StatusAttr, ) -> Result<()> { use crate::schema::actor_store::record::dsl as RecordSchema; let takedown_ref: Option = match takedown.applied { true => takedown .r#ref .map_or_else(|| Some(rsky_common::now()), Some), false => None, }; let uri_string = uri.to_string(); self.db .get() .await? .interact(move |conn| { _ = update(RecordSchema::record) .filter(RecordSchema::uri.eq(uri_string)) .set(RecordSchema::takedownRef.eq(takedown_ref)) .execute(conn)?; Ok(()) }) .await .expect("Failed to update takedown status") } }