A PLC Mirror written in Rust
1use crate::types::{PlcEntry, PlcOperationType};
2use deadpool_postgres::{Object, Pool};
3use reqwest::Client;
4use slog::{Logger, debug, error, info};
5use std::env::var;
6use std::time::{Duration, Instant};
7use tokio::sync::OnceCell;
8use tokio_postgres::Statement;
9use tokio_postgres::types::Type;
10
// The upstream PLC directory allows 100 requests per minute; we pace ourselves
// at 90 requests per minute to stay safely below that limit.

/// Minimum time budget for one import iteration, in (fractional) milliseconds.
const SLEEP_TIME_F: f32 = (60.0_f32 / 90.0_f32) * 1000.0_f32;

/// [`SLEEP_TIME_F`] truncated to whole milliseconds, as a [`Duration`].
const SLEEP_TIME: Duration = Duration::from_millis(SLEEP_TIME_F as u64);
14
15static INSERT_DID_STMT: OnceCell<Statement> = OnceCell::const_new();
16static INSERT_OP_STMT: OnceCell<Statement> = OnceCell::const_new();
17
18pub async fn importer(log: Logger, pool: Pool, start_after: Option<String>) {
19 let plc_main = var("PLC_UPSTREAM").unwrap_or("https://plc.directory".to_string());
20 let user_agent = var("PLC_USER_AGENT").unwrap_or("parakeet-plc".to_string());
21
22 let mut after = start_after;
23
24 if let Some(after) = &after {
25 info!(log, "starting import at {after}");
26 }
27
28 let client = Client::builder().user_agent(user_agent).build().unwrap();
29 let conn = pool.get().await.unwrap();
30
31 loop {
32 let start = Instant::now();
33
34 match upstream_export(&client, &plc_main, after.clone()).await {
35 Ok((entries, errors)) => {
36 debug!(
37 log,
38 "exported {} entries, {} failed",
39 entries.len(),
40 errors.len()
41 );
42
43 if !errors.is_empty() {
44 error!(log, "{} entries failed to decode", errors.len());
45 }
46
47 for entry in &entries {
48 if let Err(e) = insert_did_and_op(&conn, entry).await {
49 error!(log, "failed to insert operation for did {}: {e}", entry.did);
50 }
51 }
52
53 if let Some(new_after) = entries.last() {
54 let new_after = new_after.created_at;
55 after = Some(new_after.to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
56 }
57 }
58 Err(err) => error!(log, "failed to export PLC directory: {err}"),
59 }
60
61 let end = start.elapsed();
62 tokio::time::sleep(SLEEP_TIME.saturating_sub(end)).await;
63 }
64}
65
66async fn upstream_export(
67 client: &Client,
68 plc: &str,
69 after: Option<String>,
70) -> eyre::Result<(Vec<PlcEntry>, Vec<serde_json::Error>)> {
71 let mut uri = format!("{plc}/export?count=1000");
72 if let Some(after) = after {
73 uri += format!("&after={}", after).as_str();
74 }
75
76 let export_res = client.get(uri).send().await?;
77
78 let export = export_res.text().await?;
79 let (exports, errors) = export
80 .lines()
81 .map(serde_json::from_str)
82 .fold((Vec::new(), Vec::new()), fold_exports);
83
84 Ok((exports, errors))
85}
86
/// Fold step that sorts a `Result` into one of two accumulating vectors.
///
/// Takes the accumulator `(successes, failures)` by value, pushes the payload
/// of `current` onto the matching vector, and hands the accumulator back, so
/// it can be passed directly to `Iterator::fold`.
fn fold_exports<A, B>(acc: (Vec<A>, Vec<B>), current: Result<A, B>) -> (Vec<A>, Vec<B>) {
    let (mut decoded, mut failures) = acc;

    match current {
        Err(failure) => failures.push(failure),
        Ok(value) => decoded.push(value),
    }

    (decoded, failures)
}
98
99async fn insert_did_and_op(conn: &Object, entry: &PlcEntry) -> eyre::Result<()> {
100 let insert_did_stmt = INSERT_DID_STMT
101 .get_or_init(|| init_insert_did_stmt(conn))
102 .await;
103 let insert_op_stmt = INSERT_OP_STMT
104 .get_or_init(|| init_insert_op_stmt(conn))
105 .await;
106
107 let operation = serde_json::to_value(&entry.operation)?;
108
109 let (prev, sig) = match &entry.operation {
110 PlcOperationType::Create(op) => (op.prev.map(|v| v.to_string()), &op.sig),
111 PlcOperationType::Tombstone(op) => (op.prev.map(|v| v.to_string()), &op.sig),
112 PlcOperationType::Operation(op) => (op.prev.map(|v| v.to_string()), &op.sig),
113 };
114
115 conn.execute(insert_did_stmt, &[&entry.did]).await?;
116 conn.execute(
117 insert_op_stmt,
118 &[
119 &entry.did,
120 &entry.cid.to_string(),
121 &prev,
122 &sig,
123 &entry.nullified,
124 &operation,
125 &entry.created_at,
126 ],
127 )
128 .await?;
129
130 Ok(())
131}
132
133async fn init_insert_did_stmt(conn: &Object) -> Statement {
134 conn.prepare_typed(
135 "INSERT INTO dids (did) VALUES ($1) ON CONFLICT DO NOTHING",
136 &[Type::TEXT],
137 )
138 .await
139 .unwrap()
140}
141
142async fn init_insert_op_stmt(conn: &Object) -> Statement {
143 conn.prepare_typed(
144 include_str!("sql/insert_op.sql"),
145 &[
146 Type::TEXT,
147 Type::TEXT,
148 Type::TEXT,
149 Type::TEXT,
150 Type::BOOL,
151 Type::JSONB,
152 Type::TIMESTAMPTZ,
153 ],
154 )
155 .await
156 .unwrap()
157}