tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
start on commit processing
mia.omg.lol
1 year ago
e8d9e58c
f2c50b6a
+143
-9
4 changed files
expand all
collapse all
unified
split
Cargo.lock
consumer
Cargo.toml
src
indexer
mod.rs
types.rs
+54
-5
Cargo.lock
···
83
]
84
85
[[package]]
0
0
0
0
0
0
86
name = "async-trait"
87
version = "0.1.85"
88
source = "registry+https://github.com/rust-lang/crates.io-index"
···
251
"multihash",
252
"serde",
253
"serde_bytes",
254
-
"unsigned-varint",
255
]
256
257
[[package]]
···
312
"figment",
313
"futures",
314
"ipld-core",
0
315
"parakeet-db",
316
"serde",
317
"serde_bytes",
···
839
]
840
841
[[package]]
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
842
name = "is_terminal_polyfill"
843
version = "1.70.1"
844
source = "registry+https://github.com/rust-lang/crates.io-index"
···
949
dependencies = [
950
"core2",
951
"serde",
952
-
"unsigned-varint",
953
]
954
955
[[package]]
···
1095
"eyre",
1096
"serde",
1097
"serde_json",
1098
-
"thiserror",
1099
"walkdir",
1100
]
1101
···
1571
1572
[[package]]
1573
name = "thiserror"
0
0
0
0
0
0
0
0
0
1574
version = "2.0.11"
1575
source = "registry+https://github.com/rust-lang/crates.io-index"
1576
checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc"
1577
dependencies = [
1578
-
"thiserror-impl",
0
0
0
0
0
0
0
0
0
0
0
1579
]
1580
1581
[[package]]
···
1812
"native-tls",
1813
"rand",
1814
"sha1",
1815
-
"thiserror",
1816
"utf-8",
1817
]
1818
···
1857
version = "0.1.3"
1858
source = "registry+https://github.com/rust-lang/crates.io-index"
1859
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
0
0
0
0
0
0
1860
1861
[[package]]
1862
name = "unsigned-varint"
···
83
]
84
85
[[package]]
86
+
name = "anyhow"
87
+
version = "1.0.95"
88
+
source = "registry+https://github.com/rust-lang/crates.io-index"
89
+
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
90
+
91
+
[[package]]
92
name = "async-trait"
93
version = "0.1.85"
94
source = "registry+https://github.com/rust-lang/crates.io-index"
···
257
"multihash",
258
"serde",
259
"serde_bytes",
260
+
"unsigned-varint 0.8.0",
261
]
262
263
[[package]]
···
318
"figment",
319
"futures",
320
"ipld-core",
321
+
"iroh-car",
322
"parakeet-db",
323
"serde",
324
"serde_bytes",
···
846
]
847
848
[[package]]
849
+
name = "iroh-car"
850
+
version = "0.5.1"
851
+
source = "registry+https://github.com/rust-lang/crates.io-index"
852
+
checksum = "cb7f8cd4cb9aa083fba8b52e921764252d0b4dcb1cd6d120b809dbfe1106e81a"
853
+
dependencies = [
854
+
"anyhow",
855
+
"cid",
856
+
"futures",
857
+
"serde",
858
+
"serde_ipld_dagcbor",
859
+
"thiserror 1.0.69",
860
+
"tokio",
861
+
"unsigned-varint 0.7.2",
862
+
]
863
+
864
+
[[package]]
865
name = "is_terminal_polyfill"
866
version = "1.70.1"
867
source = "registry+https://github.com/rust-lang/crates.io-index"
···
972
dependencies = [
973
"core2",
974
"serde",
975
+
"unsigned-varint 0.8.0",
976
]
977
978
[[package]]
···
1118
"eyre",
1119
"serde",
1120
"serde_json",
1121
+
"thiserror 2.0.11",
1122
"walkdir",
1123
]
1124
···
1594
1595
[[package]]
1596
name = "thiserror"
1597
+
version = "1.0.69"
1598
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1599
+
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
1600
+
dependencies = [
1601
+
"thiserror-impl 1.0.69",
1602
+
]
1603
+
1604
+
[[package]]
1605
+
name = "thiserror"
1606
version = "2.0.11"
1607
source = "registry+https://github.com/rust-lang/crates.io-index"
1608
checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc"
1609
dependencies = [
1610
+
"thiserror-impl 2.0.11",
1611
+
]
1612
+
1613
+
[[package]]
1614
+
name = "thiserror-impl"
1615
+
version = "1.0.69"
1616
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1617
+
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
1618
+
dependencies = [
1619
+
"proc-macro2",
1620
+
"quote",
1621
+
"syn",
1622
]
1623
1624
[[package]]
···
1855
"native-tls",
1856
"rand",
1857
"sha1",
1858
+
"thiserror 2.0.11",
1859
"utf-8",
1860
]
1861
···
1900
version = "0.1.3"
1901
source = "registry+https://github.com/rust-lang/crates.io-index"
1902
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
1903
+
1904
+
[[package]]
1905
+
name = "unsigned-varint"
1906
+
version = "0.7.2"
1907
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1908
+
checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105"
1909
1910
[[package]]
1911
name = "unsigned-varint"
+1
consumer/Cargo.toml
···
12
figment = { version = "0.10.19", features = ["env", "toml"] }
13
futures = "0.3.31"
14
ipld-core = "0.4.1"
0
15
parakeet-db = { path = "../parakeet-db" }
16
serde = { version = "1.0.217", features = ["derive"] }
17
serde_bytes = "0.11"
···
12
figment = { version = "0.10.19", features = ["env", "toml"] }
13
futures = "0.3.31"
14
ipld-core = "0.4.1"
15
+
iroh-car = "0.5.1"
16
parakeet-db = { path = "../parakeet-db" }
17
serde = { version = "1.0.217", features = ["derive"] }
18
serde_bytes = "0.11"
+64
-4
consumer/src/indexer/mod.rs
···
0
0
0
1
use diesel_async::AsyncPgConnection;
2
-
use diesel_async::pooled_connection::deadpool::Pool;
3
-
use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, FirehoseEvent};
0
4
use tokio::sync::mpsc::Receiver;
0
5
6
-
pub async fn relay_indexer(pool: Pool<AsyncPgConnection>, mut rx: Receiver<FirehoseEvent>) -> eyre::Result<()> {
0
0
0
0
0
7
let mut conn = pool.get().await?;
8
9
while let Some(event) = rx.recv().await {
10
let res = match event {
11
FirehoseEvent::Identity(identity) => index_identity(&mut conn, identity).await,
12
FirehoseEvent::Account(account) => index_account(&mut conn, account).await,
13
-
FirehoseEvent::Commit(commit) => index_commit(&mut conn, commit).await,
0
0
0
0
0
0
0
0
14
FirehoseEvent::Label(_) => {
15
// We handle all labels through direct connections to labelers
16
tracing::warn!("got #labels from the relay");
···
35
}
36
37
async fn index_commit(conn: &mut AsyncPgConnection, commit: AtpCommitEvent) -> eyre::Result<()> {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
38
Ok(())
39
}
···
1
+
use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, FirehoseEvent};
2
+
use crate::indexer::types::{CollectionType, RecordTypes};
3
+
use diesel_async::pooled_connection::deadpool::Pool;
4
use diesel_async::AsyncPgConnection;
5
+
use futures::StreamExt;
6
+
use ipld_core::cid::Cid;
7
+
use std::collections::HashMap;
8
use tokio::sync::mpsc::Receiver;
9
+
use tracing::Instrument;
10
11
+
mod types;
12
+
13
+
pub async fn relay_indexer(
14
+
pool: Pool<AsyncPgConnection>,
15
+
mut rx: Receiver<FirehoseEvent>,
16
+
) -> eyre::Result<()> {
17
let mut conn = pool.get().await?;
18
19
while let Some(event) = rx.recv().await {
20
let res = match event {
21
FirehoseEvent::Identity(identity) => index_identity(&mut conn, identity).await,
22
FirehoseEvent::Account(account) => index_account(&mut conn, account).await,
23
+
FirehoseEvent::Commit(commit) => {
24
+
let span = tracing::info_span!(
25
+
"commit",
26
+
seq = commit.seq,
27
+
repo = commit.repo,
28
+
rev = commit.rev
29
+
);
30
+
index_commit(&mut conn, commit).instrument(span).await
31
+
}
32
FirehoseEvent::Label(_) => {
33
// We handle all labels through direct connections to labelers
34
tracing::warn!("got #labels from the relay");
···
53
}
54
55
async fn index_commit(conn: &mut AsyncPgConnection, commit: AtpCommitEvent) -> eyre::Result<()> {
56
+
// turn the car slice into a map of cid:block
57
+
let car_reader = iroh_car::CarReader::new(commit.blocks.as_slice()).await?;
58
+
let blocks = car_reader
59
+
.stream()
60
+
.filter_map(|car| async move { car.ok() })
61
+
.collect::<HashMap<Cid, Vec<u8>>>()
62
+
.await;
63
+
64
+
for op in &commit.ops {
65
+
let Some((collection_raw, rkey)) = op.path.split_once("/") else {
66
+
tracing::warn!("op contained invalid path {}", op.path);
67
+
continue;
68
+
};
69
+
70
+
let collection = CollectionType::from_str(collection_raw);
71
+
if collection == CollectionType::Unsupported {
72
+
tracing::debug!("{} {collection_raw} is unsupported", op.action);
73
+
continue;
74
+
}
75
+
76
+
if op.action == "create" || op.action == "update" {
77
+
if let Some(block) = op.cid.and_then(|cid| blocks.get(&cid)) {
78
+
let decoded: RecordTypes = match serde_ipld_dagcbor::from_slice(block) {
79
+
Ok(decoded) => decoded,
80
+
Err(err) => {
81
+
tracing::error!("Failed to decode record: {err}");
82
+
continue;
83
+
}
84
+
};
85
+
86
+
dbg!(decoded);
87
+
} else {
88
+
tracing::error!("Missing Cid or the block was not found");
89
+
continue;
90
+
}
91
+
} else if op.action == "delete" {
92
+
//
93
+
} else {
94
+
tracing::warn!("op contained invalid action {}", op.action);
95
+
}
96
+
}
97
+
98
Ok(())
99
}
+24
consumer/src/indexer/types.rs
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
use serde::{Deserialize, Serialize};
2
+
3
+
#[derive(Debug, Deserialize, Serialize)]
4
+
#[serde(tag = "$type")]
5
+
pub enum RecordTypes {}
6
+
7
+
#[derive(Debug, PartialOrd, PartialEq)]
8
+
pub enum CollectionType {
9
+
Unsupported,
10
+
}
11
+
12
+
impl CollectionType {
13
+
pub(crate) fn from_str(input: &str) -> CollectionType {
14
+
match input {
15
+
_ => CollectionType::Unsupported,
16
+
}
17
+
}
18
+
19
+
pub fn can_update(&self) -> bool {
20
+
match self {
21
+
CollectionType::Unsupported => false,
22
+
}
23
+
}
24
+
}