this repo has no description

ingest new events to database

vielle.dev 29d5fa5e 548d6489

verified
+313 -3
+196
src/ingest/ingest.rs
··· 1 + use std::{collections::VecDeque, sync::Arc}; 2 + 3 + use futures_util::future; 4 + use ipld_core::ipld::Ipld; 5 + use jacquard::{ 6 + api::com_atproto::sync::subscribe_repos::{Commit, SubscribeReposMessage, Sync}, 7 + types::string::Handle, 8 + }; 9 + use jacquard_repo::{BlockStore, MemoryBlockStore}; 10 + use sqlx::{Pool, Postgres, query}; 11 + use thiserror::Error; 12 + use tokio::{sync::Mutex, task::JoinHandle}; 13 + 14 + use crate::{backfill::backfill, utils::ipld_json::ipld_to_json_value}; 15 + 16 + trait Ingest { 17 + type Error; 18 + async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error>; 19 + } 20 + 21 + #[derive(Debug, Error)] 22 + enum CommitError { 23 + #[error("Error parsing #commit event: {}", .0)] 24 + ParseCarBytes(#[from] jacquard_repo::RepoError), 25 + } 26 + 27 + impl Ingest for Commit<'_> { 28 + type Error = CommitError; 29 + async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> { 30 + let car = jacquard_repo::car::parse_car_bytes(&self.blocks).await?; 31 + let storage = Arc::new(MemoryBlockStore::new_from_blocks(car.blocks)); 32 + 33 + let ops = future::join_all(self.ops.clone().into_iter().map(|op| async { 34 + // get block data by cid, or None if errors/not found 35 + if let Some(cid) = &op.cid { 36 + if let Ok(cid) = cid.0.to_ipld() 37 + && let Ok(contents) = storage.get(&cid).await 38 + && let Some(contents) = contents 39 + && let Ok(val) = serde_ipld_dagcbor::from_slice::<Ipld>(&contents) 40 + { 41 + (op, ipld_to_json_value(&val).ok()) 42 + } else { 43 + (op, None) 44 + } 45 + } else { 46 + (op, None) 47 + } 48 + })) 49 + .await; 50 + 51 + future::join_all(ops.into_iter().map(|(op, val)| async { 52 + let mut path = op.path.split("/"); 53 + let Some(collection) = path.next() else { 54 + eprintln!("Invalid path ({})", op.path.as_str()); 55 + return; 56 + }; 57 + let Some(rkey) = path.next() else { 58 + eprintln!("Invalid path ({})", op.path.as_str()); 59 + return; 60 + }; 61 + // assert the path is only collection/rkey 62 + if path.next().is_some() { 63 + eprintln!("Invalid path ({})", op.path.as_str()); 64 + return; 65 + }; 66 + match op.action.clone().as_str() { 67 + "create" => { 68 + let Some(cid) = op.cid.map(|x| x.0.to_string()) else { 69 + eprintln!("Missing cid for create {}/{}", collection, rkey); 70 + return; 71 + }; 72 + let Some(val) = val else { 73 + eprintln!("Missing value for create {}/{}/{}", collection, rkey, cid); 74 + return; 75 + }; 76 + if let Err(err) = query!( 77 + "INSERT INTO records (collection, rkey, cid, record) 78 + VALUES ($1, $2, $3, $4)", 79 + collection, 80 + rkey, 81 + cid, 82 + val 83 + ) 84 + .execute(&*conn) 85 + .await 86 + { 87 + eprintln!("Error creating {}/{}/{}\n{}", collection, rkey, cid, err); 88 + } else { 89 + println!("wrote {}/{}/{} successfully.", collection, rkey, cid); 90 + }; 91 + } 92 + "update" => { 93 + let Some(cid) = op.cid.map(|x| x.0.to_string()) else { 94 + eprintln!("Missing cid for update {}/{}", collection, rkey); 95 + return; 96 + }; 97 + let Some(val) = val else { 98 + eprintln!("Missing value for update {}/{}/{}", collection, rkey, cid); 99 + return; 100 + }; 101 + if let Err(err) = query!( 102 + "UPDATE records SET 103 + collection = $1, 104 + rkey = $2, 105 + cid = $3, 106 + record = $4 107 + WHERE 108 + collection = $1 109 + and rkey = $2", 110 + collection, 111 + rkey, 112 + cid, 113 + val 114 + ) 115 + .execute(&*conn) 116 + .await 117 + { 118 + eprintln!("Error updating {}/{}/{}\n{}", collection, rkey, cid, err); 119 + } else { 120 + println!("updated {}/{}/{} successfully.", collection, rkey, cid); 121 + }; 122 + } 123 + "delete" => { 124 + if let Err(err) = query!( 125 + "DELETE FROM records WHERE 126 + collection = $1 127 + and rkey = $2", 128 + collection, 129 + rkey, 130 + ) 131 + .execute(&*conn) 132 + .await 133 + { 134 + eprintln!("Error deleting {}/{}\n{}", collection, rkey, err); 135 + } else { 136 + println!("deleted {}/{} successfully.", collection, rkey); 137 + }; 138 + } 139 + _ => { 140 + println!("missing #{} {:#?} {:#?}", op.action.as_str(), op, val) 141 + } 142 + } 143 + })) 144 + .await; 145 + 146 + Ok(()) 147 + } 148 + } 149 + 150 + impl Ingest for Sync<'_> { 151 + type Error = crate::backfill::Error; 152 + async fn ingest(&self, conn: Arc<Pool<Postgres>>) -> Result<(), Self::Error> { 153 + backfill(conn, None).await 154 + } 155 + } 156 + 157 + pub fn ingest( 158 + queue: Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>, 159 + conn: Arc<Pool<Postgres>>, 160 + ) -> JoinHandle<()> { 161 + tokio::spawn(async move { 162 + loop { 163 + let Some(next) = queue.lock().await.pop_front() else { 164 + continue; 165 + }; 166 + 167 + match next { 168 + SubscribeReposMessage::Commit(commit) => { 169 + commit.ingest(conn.clone()).await.unwrap_or_else(|err| { 170 + eprintln!("error handling #commit({}): {:?}", commit.clone().rev, err) 171 + }) 172 + } 173 + SubscribeReposMessage::Sync(sync) => { 174 + sync.ingest(conn.clone()).await.unwrap_or_else(|err| { 175 + eprintln!("error handling #sync({}): {:?}", sync.clone().rev, err) 176 + }) 177 + } 178 + SubscribeReposMessage::Identity(identity) => println!( 179 + "ignoring #identity({}) event. has user migrated?", 180 + identity.handle.unwrap_or(Handle::raw("handle.invalid")) 181 + ), 182 + SubscribeReposMessage::Account(account) => println!( 183 + "ignoring #account({} {}) event. has user deactivated?", 184 + account.active, 185 + account.status.unwrap_or("unknown".into()) 186 + ), 187 + SubscribeReposMessage::Info(info) => { 188 + println!("ignoring #info({}) event", info.name) 189 + } 190 + SubscribeReposMessage::Unknown(_) => { 191 + println!("ignoring unknown event. is meview outdated?") 192 + } 193 + }; 194 + } 195 + }) 196 + }
+4
src/ingest/mod.rs
··· 1 + pub mod ingest; 2 + pub mod queue; 3 + pub use self::ingest::ingest; 4 + pub use self::queue::queue;
+94
src/ingest/queue.rs
··· 1 + use std::{collections::VecDeque, sync::Arc}; 2 + 3 + use futures_util::stream::StreamExt; 4 + use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 5 + use jacquard::url::Url; 6 + use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient}; 7 + use tokio::{ 8 + sync::Mutex, 9 + task::{self, JoinHandle}, 10 + }; 11 + 12 + use crate::config; 13 + 14 + pub async fn queue() -> ( 15 + Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>, 16 + JoinHandle<()>, 17 + ) { 18 + let queue = Arc::new(Mutex::new(VecDeque::new())); 19 + 20 + // USER_SUBSCRIBE_URL is formatted as a domain 21 + let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL)) 22 + .expect("Env var USER_SUBSCRIBE_URL should be formated as a domain."); 23 + let client = TungsteniteSubscriptionClient::from_base_uri(uri); 24 + let (_sink, mut messages) = client 25 + .subscribe(&SubscribeRepos::new().build()) 26 + .await 27 + .expect("Could not subscribe to new events") 28 + .into_stream(); 29 + 30 + let queue_clone = queue.clone(); 31 + let handle = task::spawn(async move { 32 + let queue = queue_clone; 33 + 34 + loop { 35 + if let Some(msg) = messages.next().await { 36 + let msg = match msg { 37 + Ok(val) => val, 38 + Err(err) => { 39 + eprintln!("Warning: Websocket error: {} ({:?})", err, err.source()); 40 + continue; 41 + } 42 + }; 43 + 44 + // filter messages by user did 45 + // note that #identity #account #info and #unknown will probably be ignored 46 + let ev = match msg.clone() { 47 + SubscribeReposMessage::Commit(commit) => { 48 + if commit.repo != *config::USER_DID { 49 + continue; 50 + } else { 51 + SubscribeReposMessage::Commit(commit) 52 + } 53 + } 54 + SubscribeReposMessage::Sync(sync) => { 55 + if sync.did != *config::USER_DID { 56 + continue; 57 + } else { 58 + SubscribeReposMessage::Sync(sync) 59 + } 60 + } 61 + 62 + SubscribeReposMessage::Identity(identity) => { 63 + if identity.did != *config::USER_DID { 64 + continue; 65 + } else { 66 + eprintln!( 67 + "Warning: Recieved #identity event. Configuration may be out of date" 68 + ); 69 + SubscribeReposMessage::Identity(identity) 70 + } 71 + } 72 + SubscribeReposMessage::Account(account) => { 73 + if account.did != *config::USER_DID { 74 + continue; 75 + } else { 76 + eprintln!( 77 + "Warning: Recieved #account event. Account active: `{}`. Account status: `{}`", 78 + account.active, 79 + account.status.clone().unwrap_or("Unknown".into()) 80 + ); 81 + SubscribeReposMessage::Account(account) 82 + } 83 + } 84 + SubscribeReposMessage::Info(info) => SubscribeReposMessage::Info(info), 85 + SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data), 86 + }; 87 + 88 + queue.lock().await.push_back(ev); 89 + } 90 + } 91 + }); 92 + 93 + (queue, handle) 94 + }
+19 -3
src/main.rs
··· 1 - use sqlx::{Pool, Postgres}; 2 - 3 1 use crate::backfill::backfill; 4 2 5 3 mod backfill; 6 4 mod config; 7 5 mod db; 6 + mod ingest; 8 7 mod utils; 9 8 10 9 #[derive(Debug)] ··· 33 32 config::USER_SUBSCRIBE_URL.get().await, 34 33 config::DATABASE_URL.get().await 35 34 ); 36 - let conn: Pool<Postgres> = db::conn().await; 35 + let conn = db::conn().await; 37 36 println!("Database connected and initialized"); 37 + 38 + let (queue, queue_handle) = ingest::queue().await; 38 39 39 40 println!("Starting backfill"); 40 41 let timer = std::time::Instant::now(); ··· 45 46 println!("Backfill complete. Took {:?}", timer.elapsed()); 46 47 47 48 println!("Completed sucessfully!"); 49 + 50 + // Set up Ctrl-C handler 51 + let (tx, rx) = tokio::sync::oneshot::channel(); 52 + tokio::spawn(async move { 53 + tokio::signal::ctrl_c().await.ok(); 54 + let _ = tx.send(()); 55 + }); 56 + 57 + println!("Handling new events. Ctrl+C to quit."); 58 + let ingest_handle = ingest::ingest(queue, conn.clone()); 59 + 60 + let _ = rx.await; 61 + queue_handle.abort(); 62 + ingest_handle.abort(); 63 + 48 64 Ok(()) 49 65 }