Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

correct threadgate backfilling

needed to do a reorder of COPY to make sure everything was in before calculating gates

mia.omg.lol 87cfa87c c0f3cd76

verified
+119 -3
+6 -1
consumer/src/backfill/mod.rs
··· 275 275 follows: Vec<(String, String, DateTime<Utc>)>, 276 276 list_items: Vec<(String, records::AppBskyGraphListItem)>, 277 277 verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>, 278 + threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last. 278 279 records: Vec<(String, Cid)>, 279 280 } 280 281 281 282 impl CopyStore { 282 283 async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> { 283 284 db::copy::copy_likes(t, did, self.likes).await?; 284 - db::copy::copy_posts(t, did, self.posts).await?; 285 285 db::copy::copy_reposts(t, did, self.reposts).await?; 286 286 db::copy::copy_blocks(t, did, self.blocks).await?; 287 287 db::copy::copy_follows(t, did, self.follows).await?; 288 288 db::copy::copy_list_items(t, self.list_items).await?; 289 289 db::copy::copy_verification(t, did, self.verifications).await?; 290 + db::copy::copy_posts(t, did, self.posts).await?; 291 + for (at_uri, cid, record) in self.threadgates { 292 + db::threadgate_enforce_backfill(t, did, &record).await?; 293 + db::threadgate_upsert(t, &at_uri, cid, record).await?; 294 + } 290 295 db::copy::copy_records(t, did, self.records).await?; 291 296 292 297 Ok(())
+10
consumer/src/backfill/repo.rs
··· 4 4 }; 5 5 use crate::indexer::records; 6 6 use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; 7 + use crate::utils::at_uri_is_by; 7 8 use crate::{db, indexer}; 8 9 use deadpool_postgres::Transaction; 9 10 use ipld_core::cid::Cid; ··· 165 166 copies 166 167 .reposts 167 168 .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 169 + } 170 + RecordTypes::AppBskyFeedThreadgate(record) => { 171 + if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 173 + return Ok(()); 174 + } 175 + 176 + copies.push_record(&at_uri, cid); 177 + copies.threadgates.push((at_uri, cid, record)); 168 178 } 169 179 RecordTypes::AppBskyGraphBlock(rec) => { 170 180 copies.push_record(&at_uri, cid);
+103 -2
consumer/src/db/gates.rs
··· 1 1 use super::{PgExecResult, PgResult}; 2 2 use crate::indexer::records::{ 3 - THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, THREADGATE_RULE_LIST, 4 - THREADGATE_RULE_MENTION, 3 + AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, 4 + THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION, 5 5 }; 6 6 use chrono::prelude::*; 7 7 use chrono::{DateTime, Utc}; ··· 105 105 ) 106 106 .await 107 107 } 108 + 109 + // variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB 110 + pub async fn threadgate_enforce_backfill<C: GenericClient>( 111 + conn: &mut C, 112 + root_author: &str, 113 + threadgate: &AppBskyFeedThreadgate, 114 + ) -> PgExecResult { 115 + // pull out allow - if it's None we can skip this gate. 116 + let Some(allow) = threadgate.allow.as_ref() else { 117 + return Ok(0); 118 + }; 119 + 120 + let root = &threadgate.post; 121 + 122 + if allow.is_empty() { 123 + // blind update everything 124 + return conn.execute( 125 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 126 + &[&root, &root_author, &threadgate.created_at], 127 + ).await; 128 + } 129 + 130 + // pull authors with our root_uri where the author is not the root author and are dated after created_at 131 + // this is mutable because we'll remove ALLOWED dids 132 + let mut dids: HashSet<String> = conn 133 + .query( 134 + "SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 135 + &[&root, &root_author, &threadgate.created_at], 136 + ) 137 + .await? 138 + .into_iter() 139 + .map(|row| row.get(0)) 140 + .collect(); 141 + 142 + // this will be empty if there are no replies. 143 + if dids.is_empty() { 144 + return Ok(0); 145 + } 146 + 147 + let allowed_lists = allow 148 + .iter() 149 + .filter_map(|rule| match rule { 150 + ThreadgateRule::List { list } => Some(list), 151 + _ => None, 152 + }) 153 + .collect::<Vec<_>>(); 154 + 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 156 + 157 + if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 158 + let current_dids: Vec<_> = dids.iter().collect(); 159 + 160 + let res = conn.query( 161 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 + &[&root_author, &current_dids] 163 + ).await?; 164 + 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 + } 167 + 168 + if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { 169 + let current_dids: Vec<_> = dids.iter().collect(); 170 + 171 + let res = conn.query( 172 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 173 + &[&root_author, &current_dids] 174 + ).await?; 175 + 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 + } 178 + 179 + if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 + let mentions: Vec<String> = conn 181 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 + .await? 183 + .map(|r| r.get(0)) 184 + .unwrap_or_default(); 185 + 186 + dids = &dids - &HashSet::from_iter(mentions); 187 + } 188 + 189 + if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() { 190 + let current_dids: Vec<_> = dids.iter().collect(); 191 + 192 + let res = conn 193 + .query( 194 + "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 + &[&allowed_lists, &current_dids], 196 + ) 197 + .await?; 198 + 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 200 + } 201 + 202 + let dids = dids.into_iter().collect::<Vec<_>>(); 203 + 204 + conn.execute( 205 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3", 206 + &[&threadgate.post, &dids, &threadgate.created_at] 207 + ).await 208 + }