Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

fix(consumer): tighten up error handling

mia.omg.lol 53a1abe4 05b9afbb

verified
+51 -46
+3 -1
consumer/src/backfill/repo.rs
··· 96 96 } 97 97 } 98 98 99 - let commit = commit.unwrap(); 99 + let Some(commit) = commit else { 100 + eyre::bail!("repo contained no commit?"); 101 + }; 100 102 101 103 Ok((commit, deltas, copies)) 102 104 }
+3 -3
consumer/src/db/actor.rs
··· 69 69 ) 70 70 .await?; 71 71 72 - Ok(res.map(|v| (v.get(0), v.get(1)))) 72 + res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 73 73 } 74 74 75 75 pub async fn actor_get_repo_status<C: GenericClient>( ··· 83 83 ) 84 84 .await?; 85 85 86 - Ok(res.map(|v| (v.get(0), v.get(1)))) 86 + res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 87 87 } 88 88 89 89 pub async fn actor_get_statuses<C: GenericClient>( ··· 97 97 ) 98 98 .await?; 99 99 100 - Ok(res.map(|v| (v.get(0), v.get(1)))) 100 + res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 101 101 }
+10 -9
consumer/src/db/backfill.rs
··· 51 51 ) 52 52 .await?; 53 53 54 - Ok(res 55 - .into_iter() 56 - .map(|row| BackfillRow { 57 - repo: row.get(0), 58 - repo_ver: row.get(1), 59 - cid: row.get(2), 60 - data: row.get(3), 61 - indexed_at: row.get(4), 54 + res.into_iter() 55 + .map(|row| { 56 + Ok(BackfillRow { 57 + repo: row.try_get(0)?, 58 + repo_ver: row.try_get(1)?, 59 + cid: row.try_get(2)?, 60 + data: row.try_get(3)?, 61 + indexed_at: row.try_get(4)?, 62 + }) 62 63 }) 63 - .collect()) 64 + .collect() 64 65 } 65 66 66 67 pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1 -1
consumer/src/db/copy.rs
··· 205 205 ) 206 206 .await? 207 207 .into_iter() 208 - .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 208 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).collect::<Result<_, _>>()?; 209 209 210 210 for (root, post, created_at) in threadgated { 211 211 match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
+17 -12
consumer/src/db/gates.rs
··· 47 47 &[&root_author, &post_author], 48 48 ) 49 49 .await? 50 - .map(|v| (v.get(0), v.get(1))); 50 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?; 51 51 52 52 if let Some((following, followed)) = profile_state { 53 53 if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { ··· 65 65 let mentions: Vec<String> = conn 66 66 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 67 .await? 68 - .map(|r| r.get(0)) 68 + .map(|r| r.try_get(0)) 69 + .transpose()? 69 70 .unwrap_or_default(); 70 71 71 72 if mentions.contains(&post_author.to_owned()) { ··· 84 85 &[&allow_lists, &post_author], 85 86 ) 86 87 .await? 87 - .get(0); 88 + .try_get(0)?; 88 89 if count != 0 { 89 90 return Ok(false); 90 91 } ··· 136 137 ) 137 138 .await? 138 139 .into_iter() 139 - .map(|row| row.get(0)) 140 - .collect(); 140 + .map(|row| row.try_get(0)) 141 + .collect::<Result<_, _>>()?; 141 142 142 143 // this will be empty if there are no replies. 143 144 if dids.is_empty() { ··· 160 161 let res = conn.query( 161 162 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 163 &[&root_author, &current_dids] 163 - ).await?; 164 + ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?; 164 165 165 - dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 + dids = &dids - &res; 166 167 } 167 168 168 169 if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { ··· 171 172 let res = conn.query( 172 173 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 173 174 &[&root_author, &current_dids] 174 - ).await?; 175 + ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?; 175 176 176 - dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 + dids = &dids - &res; 177 178 } 178 179 179 180 if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 181 let mentions: Vec<String> = conn 181 182 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 183 .await? 183 - .map(|r| r.get(0)) 184 + .map(|r| r.try_get(0)) 185 + .transpose()? 184 186 .unwrap_or_default(); 185 187 186 188 dids = &dids - &HashSet::from_iter(mentions); ··· 194 196 "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 197 &[&allowed_lists, &current_dids], 196 198 ) 197 - .await?; 199 + .await? 200 + .into_iter() 201 + .map(|row| row.try_get(0)) 202 + .collect::<Result<_, _>>()?; 198 203 199 - dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 204 + dids = &dids - &res; 200 205 } 201 206 202 207 let dids = dids.into_iter().collect::<Vec<_>>();
+17 -20
consumer/src/db/record.rs
··· 127 127 ], 128 128 ) 129 129 .await 130 - .map(|r| r.get::<_, i32>(0) == 0) 130 + .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 131 131 } 132 132 133 133 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 159 159 ) 160 160 .await?; 161 161 162 - Ok(res.map(|v| v.get(0))) 162 + res.map(|v| v.try_get(0)).transpose() 163 163 } 164 164 165 165 pub async fn labeler_upsert<C: GenericClient>( ··· 224 224 ) 225 225 .await?; 226 226 227 - Ok(res.map(|v| v.get(0))) 227 + res.map(|v| v.try_get(0)).transpose() 228 228 } 229 229 230 230 pub async fn list_upsert<C: GenericClient>( ··· 255 255 ], 256 256 ) 257 257 .await 258 - .map(|r| r.get::<_, i32>(0) == 0) 258 + .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 259 259 } 260 260 261 261 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 391 391 ) 392 392 .await?; 393 393 394 - Ok(res.map(|row| (row.get(0), row.get(1)))) 394 + res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?))) 395 + .transpose() 395 396 } 396 397 397 398 pub async fn post_embed_insert<C: GenericClient>( ··· 536 537 conn: &mut C, 537 538 post: &str, 538 539 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 539 - let res = conn 540 - .query_opt( 541 - "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 - &[&post], 543 - ) 544 - .await? 545 - .map(|v| (v.get(0), v.get(1), v.get(2))); 546 - 547 - Ok(res) 540 + conn.query_opt( 541 + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 + &[&post], 543 + ) 544 + .await? 545 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))) 546 + .transpose() 548 547 } 549 548 550 549 pub async fn postgate_upsert<C: GenericClient>( ··· 651 650 ) 652 651 .await?; 653 652 654 - Ok(res.map(|v| v.get(0))) 653 + res.map(|v| v.try_get(0)).transpose() 655 654 } 656 655 657 656 pub async fn starter_pack_upsert<C: GenericClient>( ··· 686 685 ], 687 686 ) 688 687 .await 689 - .map(|r| r.get::<_, i32>(0) == 0) 688 + .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 690 689 } 691 690 692 691 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 731 730 conn: &mut C, 732 731 post: &str, 733 732 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 734 - let res = conn 733 + conn 735 734 .query_opt( 736 735 "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 737 736 &[&post], 738 737 ) 739 738 .await? 740 - .map(|v| (v.get(0), v.get(1), v.get(2))); 741 - 742 - Ok(res) 739 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose() 743 740 } 744 741 745 742 pub async fn threadgate_upsert<C: GenericClient>(