···5566[dependencies]
77chrono = { version = "0.4.39", features = ["serde"] }
88+deadpool-postgres = "0.14.1"
89dropshot = "0.16.0"
910eyre = "0.6.12"
1011ipld-core = { version = "0.4.1", features = ["serde"] }
···1516slog-scope = { version = "4.4.0" }
1617slog-stdlog = { version = "4.1.1" }
1718tokio = { version = "1.0", features = [ "full" ] }
1919+tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
+3-1
src/import.rs
···11use crate::types::PlcOperationType;
22use crate::utils::*;
33use chrono::{DateTime, Utc};
44+use deadpool_postgres::Pool;
45use ipld_core::cid::Cid;
56use reqwest::Client;
67use serde::{Deserialize, Serialize};
···1213const SLEEP_TIME_F: f32 = (60. / 90.) * 1000.;
1314const SLEEP_TIME: Duration = Duration::from_millis(SLEEP_TIME_F as u64);
14151515-pub async fn importer(log: Logger) {
1616+pub async fn importer(log: Logger, pool: Pool) {
1617 let plc_main = var("PLC_UPSTREAM").unwrap_or("https://plc.directory".to_string());
1718 let user_agent = var("PLC_USER_AGENT").unwrap_or("parakeet-plc".to_string());
18191920 let mut after = var("PLC_START_AFTER").ok();
20212122 let client = Client::builder().user_agent(user_agent).build().unwrap();
2323+ let conn = pool.get().await.unwrap();
22242325 loop {
2426 let start = Instant::now();
+23-2
src/lib.rs
···11+use deadpool_postgres::{Manager, Pool};
12use dropshot::{ApiDescription, ConfigLogging, ConfigLoggingLevel};
33+use eyre::Context;
24use slog::Logger;
55+use std::env::var;
66+use std::str::FromStr;
77+use tokio_postgres::{Config, NoTls};
3849pub mod import;
510mod types;
611mod utils;
712813#[derive(Clone)]
99-pub struct ApiContext {}
1414+pub struct ApiContext {
1515+ pub pool: Pool,
1616+}
10171118pub fn create_logger() -> eyre::Result<Logger> {
1219 let log = ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Info }
···2128 // TODO: api endpoints
22292330 Ok(api)
2424-}3131+}
3232+3333+pub async fn connect_db() -> eyre::Result<Pool> {
3434+ let db_uri = var("PLC_DB_URI").wrap_err("PLC_DB_URI missing")?;
3535+3636+ let cfg = Config::from_str(&db_uri)?;
3737+ let mgr = Manager::from_config(cfg, NoTls, Default::default());
3838+ let pool = Pool::builder(mgr).build()?;
3939+4040+ // run the init script
4141+ let init_conn = pool.get().await?;
4242+ init_conn.simple_query(include_str!("sql/init.sql")).await?;
4343+4444+ Ok(pool)
4545+}
+4-2
src/main.rs
···1212 let _scope = slog_scope::set_global_logger(log.clone());
1313 slog_stdlog::init()?;
14141515- tokio::spawn(plc_mirror::import::importer(log.clone()));
1515+ let pool = plc_mirror::connect_db().await?;
1616+1717+ tokio::spawn(plc_mirror::import::importer(log.clone(), pool.clone()));
16181719 let bind_address = var("PLC_BIND_ADDR")
1820 .unwrap_or(DEFAULT_BIND.to_string())
1921 .parse()?;
20222121- let context = plc_mirror::ApiContext {};
2323+ let context = plc_mirror::ApiContext { pool };
22242325 let server = ServerBuilder::new(api, context, log)
2426 .config(ConfigDropshot {
+19
src/sql/init.sql
···11+create table if not exists dids
22+(
33+ did text primary key
44+);
55+66+create table if not exists operations
77+(
88+ did text not null references dids (did),
99+ hash text not null,
1010+ prev text,
1111+ sig text not null,
1212+ nullified bool not null default false,
1313+1414+ operation jsonb not null,
1515+1616+ created_at timestamptz not null default now(),
1717+1818+ primary key (did, hash)
1919+);