//! A PLC mirror written in Rust.
//!
//! (Source view: `main` branch, 157 lines, 4.7 kB.)
1use crate::types::{PlcEntry, PlcOperationType}; 2use deadpool_postgres::{Object, Pool}; 3use reqwest::Client; 4use slog::{Logger, debug, error, info}; 5use std::env::var; 6use std::time::{Duration, Instant}; 7use tokio::sync::OnceCell; 8use tokio_postgres::Statement; 9use tokio_postgres::types::Type; 10 11// PLC rate limit is 100req/min - we aim a little lower 12const SLEEP_TIME_F: f32 = (60. / 90.) * 1000.; 13const SLEEP_TIME: Duration = Duration::from_millis(SLEEP_TIME_F as u64); 14 15static INSERT_DID_STMT: OnceCell<Statement> = OnceCell::const_new(); 16static INSERT_OP_STMT: OnceCell<Statement> = OnceCell::const_new(); 17 18pub async fn importer(log: Logger, pool: Pool, start_after: Option<String>) { 19 let plc_main = var("PLC_UPSTREAM").unwrap_or("https://plc.directory".to_string()); 20 let user_agent = var("PLC_USER_AGENT").unwrap_or("parakeet-plc".to_string()); 21 22 let mut after = start_after; 23 24 if let Some(after) = &after { 25 info!(log, "starting import at {after}"); 26 } 27 28 let client = Client::builder().user_agent(user_agent).build().unwrap(); 29 let conn = pool.get().await.unwrap(); 30 31 loop { 32 let start = Instant::now(); 33 34 match upstream_export(&client, &plc_main, after.clone()).await { 35 Ok((entries, errors)) => { 36 debug!( 37 log, 38 "exported {} entries, {} failed", 39 entries.len(), 40 errors.len() 41 ); 42 43 if !errors.is_empty() { 44 error!(log, "{} entries failed to decode", errors.len()); 45 } 46 47 for entry in &entries { 48 if let Err(e) = insert_did_and_op(&conn, entry).await { 49 error!(log, "failed to insert operation for did {}: {e}", entry.did); 50 } 51 } 52 53 if let Some(new_after) = entries.last() { 54 let new_after = new_after.created_at; 55 after = Some(new_after.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)); 56 } 57 } 58 Err(err) => error!(log, "failed to export PLC directory: {err}"), 59 } 60 61 let end = start.elapsed(); 62 tokio::time::sleep(SLEEP_TIME.saturating_sub(end)).await; 63 } 64} 65 66async 
fn upstream_export( 67 client: &Client, 68 plc: &str, 69 after: Option<String>, 70) -> eyre::Result<(Vec<PlcEntry>, Vec<serde_json::Error>)> { 71 let mut uri = format!("{plc}/export?count=1000"); 72 if let Some(after) = after { 73 uri += format!("&after={}", after).as_str(); 74 } 75 76 let export_res = client.get(uri).send().await?; 77 78 let export = export_res.text().await?; 79 let (exports, errors) = export 80 .lines() 81 .map(serde_json::from_str) 82 .fold((Vec::new(), Vec::new()), fold_exports); 83 84 Ok((exports, errors)) 85} 86 87fn fold_exports<A, B>( 88 (mut oks, mut errors): (Vec<A>, Vec<B>), 89 current: Result<A, B>, 90) -> (Vec<A>, Vec<B>) { 91 match current { 92 Ok(data) => oks.push(data), 93 Err(err) => errors.push(err), 94 }; 95 96 (oks, errors) 97} 98 99async fn insert_did_and_op(conn: &Object, entry: &PlcEntry) -> eyre::Result<()> { 100 let insert_did_stmt = INSERT_DID_STMT 101 .get_or_init(|| init_insert_did_stmt(conn)) 102 .await; 103 let insert_op_stmt = INSERT_OP_STMT 104 .get_or_init(|| init_insert_op_stmt(conn)) 105 .await; 106 107 let operation = serde_json::to_value(&entry.operation)?; 108 109 let (prev, sig) = match &entry.operation { 110 PlcOperationType::Create(op) => (op.prev.map(|v| v.to_string()), &op.sig), 111 PlcOperationType::Tombstone(op) => (op.prev.map(|v| v.to_string()), &op.sig), 112 PlcOperationType::Operation(op) => (op.prev.map(|v| v.to_string()), &op.sig), 113 }; 114 115 conn.execute(insert_did_stmt, &[&entry.did]).await?; 116 conn.execute( 117 insert_op_stmt, 118 &[ 119 &entry.did, 120 &entry.cid.to_string(), 121 &prev, 122 &sig, 123 &entry.nullified, 124 &operation, 125 &entry.created_at, 126 ], 127 ) 128 .await?; 129 130 Ok(()) 131} 132 133async fn init_insert_did_stmt(conn: &Object) -> Statement { 134 conn.prepare_typed( 135 "INSERT INTO dids (did) VALUES ($1) ON CONFLICT DO NOTHING", 136 &[Type::TEXT], 137 ) 138 .await 139 .unwrap() 140} 141 142async fn init_insert_op_stmt(conn: &Object) -> Statement { 143 
conn.prepare_typed( 144 include_str!("sql/insert_op.sql"), 145 &[ 146 Type::TEXT, 147 Type::TEXT, 148 Type::TEXT, 149 Type::TEXT, 150 Type::BOOL, 151 Type::JSONB, 152 Type::TIMESTAMPTZ, 153 ], 154 ) 155 .await 156 .unwrap() 157}