A PLC Mirror written in Rust

automatically resume from the latest TS in the DB

+41 -5
+11
src/db.rs
··· 54 54 55 55 Ok(ops.into_iter().map(Operation::from).collect()) 56 56 } 57 + 58 + pub async fn get_last_operation_ts( 59 + conn: &Object, 60 + ) -> Result<Option<DateTime<Utc>>, tokio_postgres::Error> { 61 + conn.query_opt( 62 + "SELECT created_at FROM operations ORDER BY created_at DESC LIMIT 1", 63 + &[], 64 + ) 65 + .await 66 + .map(|v| v.map(|row| row.get(0))) 67 + }
+7 -3
src/import.rs
··· 1 1 use crate::types::{PlcEntry, PlcOperationType}; 2 2 use deadpool_postgres::{Object, Pool}; 3 3 use reqwest::Client; 4 - use slog::{Logger, debug, error}; 4 + use slog::{Logger, debug, error, info}; 5 5 use std::env::var; 6 6 use std::time::{Duration, Instant}; 7 7 use tokio::sync::OnceCell; ··· 15 15 static INSERT_DID_STMT: OnceCell<Statement> = OnceCell::const_new(); 16 16 static INSERT_OP_STMT: OnceCell<Statement> = OnceCell::const_new(); 17 17 18 - pub async fn importer(log: Logger, pool: Pool) { 18 + pub async fn importer(log: Logger, pool: Pool, start_after: Option<String>) { 19 19 let plc_main = var("PLC_UPSTREAM").unwrap_or("https://plc.directory".to_string()); 20 20 let user_agent = var("PLC_USER_AGENT").unwrap_or("parakeet-plc".to_string()); 21 21 22 - let mut after = var("PLC_START_AFTER").ok(); 22 + let mut after = start_after; 23 + 24 + if let Some(after) = &after { 25 + info!(log, "starting import at {after}"); 26 + } 23 27 24 28 let client = Client::builder().user_agent(user_agent).build().unwrap(); 25 29 let conn = pool.get().await.unwrap();
+13
src/lib.rs
··· 59 59 60 60 Ok(pool) 61 61 } 62 + 63 + pub async fn get_start_after(pool: &Pool) -> eyre::Result<Option<String>> { 64 + if let Some(env_start_after) = var("PLC_START_AFTER").ok() { 65 + return Ok(Some(env_start_after)); 66 + } 67 + 68 + let obj = pool.get().await?; 69 + let last_ts = db::get_last_operation_ts(&obj) 70 + .await? 71 + .map(|v| v.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)); 72 + 73 + Ok(last_ts) 74 + }
+7 -1
src/main.rs
··· 14 14 15 15 let pool = plc_mirror::connect_db().await?; 16 16 17 - tokio::spawn(plc_mirror::import::importer(log.clone(), pool.clone())); 17 + let start_after = plc_mirror::get_start_after(&pool).await?; 18 + 19 + tokio::spawn(plc_mirror::import::importer( 20 + log.clone(), 21 + pool.clone(), 22 + start_after, 23 + )); 18 24 19 25 let bind_address = var("PLC_BIND_ADDR") 20 26 .unwrap_or(DEFAULT_BIND.to_string())
+3 -1
src/sql/init.sql
··· 16 16 created_at timestamptz not null default now(), 17 17 18 18 primary key (did, hash) 19 - ); 19 + ); 20 + 21 + create index if not exists operations_created on operations (created_at);