tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
start on the relay consumer and indexer
mia.omg.lol
1 year ago
23d526f7
72532422
+75
-1
2 changed files
expand all
collapse all
unified
split
consumer
src
indexer
mod.rs
main.rs
+35
consumer/src/indexer/mod.rs
···
1
1
+
use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, FirehoseEvent};
2
2
+
use tokio::sync::mpsc::Receiver;
3
3
+
4
4
+
pub async fn relay_indexer(mut rx: Receiver<FirehoseEvent>) -> eyre::Result<()> {
5
5
+
while let Some(event) = rx.recv().await {
6
6
+
let res = match event {
7
7
+
FirehoseEvent::Identity(identity) => index_identity(identity).await,
8
8
+
FirehoseEvent::Account(account) => index_account(account).await,
9
9
+
FirehoseEvent::Commit(commit) => index_commit(commit).await,
10
10
+
FirehoseEvent::Label(_) => {
11
11
+
// We handle all labels through direct connections to labelers
12
12
+
tracing::warn!("got #labels from the relay");
13
13
+
Ok(())
14
14
+
}
15
15
+
};
16
16
+
17
17
+
if let Err(e) = res {
18
18
+
tracing::error!("Indexing error: {e}");
19
19
+
}
20
20
+
}
21
21
+
22
22
+
Ok(())
23
23
+
}
24
24
+
25
25
+
async fn index_identity(identity: AtpIdentityEvent) -> eyre::Result<()> {
26
26
+
Ok(())
27
27
+
}
28
28
+
29
29
+
async fn index_account(account: AtpAccountEvent) -> eyre::Result<()> {
30
30
+
Ok(())
31
31
+
}
32
32
+
33
33
+
async fn index_commit(commit: AtpCommitEvent) -> eyre::Result<()> {
34
34
+
Ok(())
35
35
+
}
+40
-1
consumer/src/main.rs
···
1
1
+
use tokio::sync::mpsc::Sender;
2
2
+
1
3
mod config;
2
4
mod firehose;
5
5
+
mod indexer;
3
6
4
7
#[tokio::main]
5
5
-
async fn main() -> eyre::Result<()> {
8
8
+
async fn main() -> eyre::Result<()> {
6
9
tracing_subscriber::fmt::init();
7
10
8
11
let conf = config::load_config()?;
12
12
+
13
13
+
let (tx, rx) = tokio::sync::mpsc::channel::<firehose::FirehoseEvent>(64);
14
14
+
15
15
+
let relay_firehose = firehose::FirehoseConsumer::new_relay(&conf.relay_source, None).await?;
16
16
+
17
17
+
let firehose_handle = tokio::spawn(relay_consumer(relay_firehose, tx));
18
18
+
let indexer_handle = tokio::spawn(indexer::relay_indexer(rx));
19
19
+
20
20
+
let (firehose_res, indexer_res) = tokio::try_join!{
21
21
+
firehose_handle,
22
22
+
indexer_handle,
23
23
+
}?;
24
24
+
25
25
+
firehose_res.and(indexer_res)
26
26
+
}
27
27
+
28
28
+
async fn relay_consumer(
29
29
+
mut consumer: firehose::FirehoseConsumer,
30
30
+
tx: Sender<firehose::FirehoseEvent>,
31
31
+
) -> eyre::Result<()> {
32
32
+
loop {
33
33
+
let event = consumer.drive().await?;
34
34
+
match event {
35
35
+
firehose::FirehoseOutput::Close => break,
36
36
+
firehose::FirehoseOutput::Continue => continue,
37
37
+
firehose::FirehoseOutput::Error(err) => {
38
38
+
tracing::error!("Firehose sent an error, exiting: {err:?}");
39
39
+
break;
40
40
+
}
41
41
+
firehose::FirehoseOutput::Event(event) => {
42
42
+
if let Err(e) = tx.send(event).await {
43
43
+
tracing::error!("Error sending event: {e}");
44
44
+
}
45
45
+
}
46
46
+
}
47
47
+
}
9
48
10
49
Ok(())
11
50
}