···001use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, FirehoseEvent};
2use tokio::sync::mpsc::Receiver;
34-pub async fn relay_indexer(mut rx: Receiver<FirehoseEvent>) -> eyre::Result<()> {
005 while let Some(event) = rx.recv().await {
6 let res = match event {
7- FirehoseEvent::Identity(identity) => index_identity(identity).await,
8- FirehoseEvent::Account(account) => index_account(account).await,
9- FirehoseEvent::Commit(commit) => index_commit(commit).await,
10 FirehoseEvent::Label(_) => {
11 // We handle all labels through direct connections to labelers
12 tracing::warn!("got #labels from the relay");
···22 Ok(())
23}
2425-async fn index_identity(identity: AtpIdentityEvent) -> eyre::Result<()> {
26 Ok(())
27}
2829-async fn index_account(account: AtpAccountEvent) -> eyre::Result<()> {
30 Ok(())
31}
3233-async fn index_commit(commit: AtpCommitEvent) -> eyre::Result<()> {
34 Ok(())
35}
···1+use diesel_async::AsyncPgConnection;
2+use diesel_async::pooled_connection::deadpool::Pool;
3use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, FirehoseEvent};
4use tokio::sync::mpsc::Receiver;
56+pub async fn relay_indexer(pool: Pool<AsyncPgConnection>, mut rx: Receiver<FirehoseEvent>) -> eyre::Result<()> {
7+ let mut conn = pool.get().await?;
8+9 while let Some(event) = rx.recv().await {
10 let res = match event {
11+ FirehoseEvent::Identity(identity) => index_identity(&mut conn, identity).await,
12+ FirehoseEvent::Account(account) => index_account(&mut conn, account).await,
13+ FirehoseEvent::Commit(commit) => index_commit(&mut conn, commit).await,
14 FirehoseEvent::Label(_) => {
15 // We handle all labels through direct connections to labelers
16 tracing::warn!("got #labels from the relay");
···26 Ok(())
27}
2829+async fn index_identity(conn: &mut AsyncPgConnection, identity: AtpIdentityEvent) -> eyre::Result<()> {
30 Ok(())
31}
3233+async fn index_account(conn: &mut AsyncPgConnection, account: AtpAccountEvent) -> eyre::Result<()> {
34 Ok(())
35}
3637+async fn index_commit(conn: &mut AsyncPgConnection, commit: AtpCommitEvent) -> eyre::Result<()> {
38 Ok(())
39}
+7-1
consumer/src/main.rs
···0001use tokio::sync::mpsc::Sender;
23mod config;
···1011 let conf = config::load_config()?;
1200013 let (tx, rx) = tokio::sync::mpsc::channel::<firehose::FirehoseEvent>(64);
1415 let relay_firehose = firehose::FirehoseConsumer::new_relay(&conf.relay_source, None).await?;
1617 let firehose_handle = tokio::spawn(relay_consumer(relay_firehose, tx));
18- let indexer_handle = tokio::spawn(indexer::relay_indexer(rx));
1920 let (firehose_res, indexer_res) = tokio::try_join!{
21 firehose_handle,
···1+use diesel_async::AsyncPgConnection;
2+use diesel_async::pooled_connection::AsyncDieselConnectionManager;
3+use diesel_async::pooled_connection::deadpool::Pool;
4use tokio::sync::mpsc::Sender;
56mod config;
···1314 let conf = config::load_config()?;
1516+ let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url);
17+ let pool = Pool::builder(db_mgr).build()?;
18+19 let (tx, rx) = tokio::sync::mpsc::channel::<firehose::FirehoseEvent>(64);
2021 let relay_firehose = firehose::FirehoseConsumer::new_relay(&conf.relay_source, None).await?;
2223 let firehose_handle = tokio::spawn(relay_consumer(relay_firehose, tx));
24+ let indexer_handle = tokio::spawn(indexer::relay_indexer(pool.clone(), rx));
2526 let (firehose_res, indexer_res) = tokio::try_join!{
27 firehose_handle,
+9
diesel.toml
···000000000
···1+# For documentation on how to configure this file,
2+# see https://diesel.rs/guides/configuring-diesel-cli
3+4+[print_schema]
5+file = "parakeet-db/src/schema.rs"
6+custom_type_derives = ["diesel::query_builder::QueryId"]
7+8+[migrations_directory]
9+dir = "migrations"
···1+-- This file was automatically created by Diesel to setup helper functions
2+-- and other internal bookkeeping. This file is safe to edit, any future
3+-- changes will be added to existing projects as new migrations.
4+5+DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
6+DROP FUNCTION IF EXISTS diesel_set_updated_at();
···1+-- This file was automatically created by Diesel to setup helper functions
2+-- and other internal bookkeeping. This file is safe to edit, any future
3+-- changes will be added to existing projects as new migrations.
4+5+6+7+8+-- Sets up a trigger for the given table to automatically set a column called
9+-- `updated_at` whenever the row is modified (unless `updated_at` was included
10+-- in the modified columns)
11+--
12+-- # Example
13+--
14+-- ```sql
15+-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
16+--
17+-- SELECT diesel_manage_updated_at('users');
18+-- ```
19+CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
20+BEGIN
21+ EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
22+ FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
23+END;
24+$$ LANGUAGE plpgsql;
25+26+CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
27+BEGIN
28+ IF (
29+ NEW IS DISTINCT FROM OLD AND
30+ NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
31+ ) THEN
32+ NEW.updated_at := current_timestamp;
33+ END IF;
34+ RETURN NEW;
35+END;
36+$$ LANGUAGE plpgsql;
+9
parakeet-db/Cargo.toml
···000000000
···1+[package]
2+name = "parakeet-db"
3+version = "0.1.0"
4+edition = "2021"
5+6+[dependencies]
7+chrono = { version = "0.4.39", features = ["serde"] }
8+diesel = { version = "2.2.6", features = ["chrono", "serde_json"] }
9+serde_json = "1.0.134"