A PLC Mirror written in Rust

store operations

+80 -3
src/db.rs

This is a binary file and will not be displayed.

+73 -3
src/import.rs
··· 1 1 use crate::types::PlcOperationType; 2 2 use crate::utils::*; 3 3 use chrono::{DateTime, Utc}; 4 - use deadpool_postgres::Pool; 4 + use deadpool_postgres::{Object, Pool}; 5 5 use ipld_core::cid::Cid; 6 6 use reqwest::Client; 7 7 use serde::{Deserialize, Serialize}; 8 - use slog::{Logger, error, debug}; 8 + use slog::{Logger, debug, error}; 9 9 use std::env::var; 10 10 use std::time::{Duration, Instant}; 11 + use tokio::sync::OnceCell; 12 + use tokio_postgres::Statement; 13 + use tokio_postgres::types::Type; 11 14 12 15 // PLC rate limit is 100req/min - we aim a little lower 13 16 const SLEEP_TIME_F: f32 = (60. / 90.) * 1000.; 14 17 const SLEEP_TIME: Duration = Duration::from_millis(SLEEP_TIME_F as u64); 18 + 19 + static INSERT_DID_STMT: OnceCell<Statement> = OnceCell::const_new(); 20 + static INSERT_OP_STMT: OnceCell<Statement> = OnceCell::const_new(); 15 21 16 22 pub async fn importer(log: Logger, pool: Pool) { 17 23 let plc_main = var("PLC_UPSTREAM").unwrap_or("https://plc.directory".to_string()); ··· 35 41 ); 36 42 37 43 // todo: report errors 38 - // todo: DB inserts 44 + for entry in &entries { 45 + if let Err(e) = insert_did_and_op(&conn, entry).await { 46 + error!(log, "failed to insert operation for did {}: {e}", entry.did); 47 + } 48 + } 39 49 40 50 let new_after = entries.last().unwrap().created_at; 41 51 after = Some(new_after.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)); ··· 79 89 }; 80 90 81 91 (oks, errors) 92 + } 93 + 94 + async fn insert_did_and_op(conn: &Object, entry: &PlcExportEntry) -> eyre::Result<()> { 95 + let insert_did_stmt = INSERT_DID_STMT 96 + .get_or_init(|| init_insert_did_stmt(conn)) 97 + .await; 98 + let insert_op_stmt = INSERT_OP_STMT 99 + .get_or_init(|| init_insert_op_stmt(conn)) 100 + .await; 101 + 102 + let operation = serde_json::to_value(&entry.operation)?; 103 + 104 + let (prev, sig) = match &entry.operation { 105 + PlcOperationType::Create(op) => (op.prev.map(|v| v.to_string()), &op.sig), 106 + PlcOperationType::Tombstone(op) => (op.prev.map(|v| v.to_string()), &op.sig), 107 + PlcOperationType::Operation(op) => (op.prev.map(|v| v.to_string()), &op.sig), 108 + }; 109 + 110 + conn.execute(insert_did_stmt, &[&entry.did]).await?; 111 + conn.execute( 112 + insert_op_stmt, 113 + &[ 114 + &entry.did, 115 + &entry.cid.to_string(), 116 + &prev, 117 + &sig, 118 + &entry.nullified, 119 + &operation, 120 + &entry.created_at, 121 + ], 122 + ) 123 + .await?; 124 + 125 + Ok(()) 126 + } 127 + 128 + async fn init_insert_did_stmt(conn: &Object) -> Statement { 129 + conn.prepare_typed( 130 + "INSERT INTO dids (did) VALUES ($1) ON CONFLICT DO NOTHING", 131 + &[Type::TEXT], 132 + ) 133 + .await 134 + .unwrap() 135 + } 136 + 137 + async fn init_insert_op_stmt(conn: &Object) -> Statement { 138 + conn.prepare_typed( 139 + include_str!("sql/insert_op.sql"), 140 + &[ 141 + Type::TEXT, 142 + Type::TEXT, 143 + Type::TEXT, 144 + Type::TEXT, 145 + Type::BOOL, 146 + Type::JSONB, 147 + Type::TIMESTAMPTZ, 148 + ], 149 + ) 150 + .await 151 + .unwrap() 82 152 } 83 153 84 154 #[derive(Debug, Deserialize, Serialize)]
+1
src/lib.rs
··· 6 6 use std::str::FromStr; 7 7 use tokio_postgres::{Config, NoTls}; 8 8 9 + mod db; 9 10 pub mod import; 10 11 mod types; 11 12 mod utils;
+6
src/sql/insert_op.sql
··· 1 + INSERT INTO operations (did, hash, prev, sig, nullified, operation, created_at) 2 + VALUES ($1, $2, $3, $4, $5, $6, $7) 3 + ON CONFLICT (did, hash) DO UPDATE SET sig=excluded.sig, 4 + nullified=excluded.nullified, 5 + operation=excluded.operation, 6 + created_at=excluded.created_at