tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
feat(consumer): profiles
mia.omg.lol
1 year ago
99124579
285d97f4
+219
-4
10 changed files
expand all
collapse all
unified
split
consumer
src
indexer
db.rs
mod.rs
records.rs
types.rs
main.rs
utils.rs
migrations
2025-01-30-204801_profiles
down.sql
up.sql
parakeet-db
src
models.rs
schema.rs
+34
consumer/src/indexer/db.rs
···
1
1
use super::records;
2
2
+
use crate::utils::{blob_ref, strongref_to_parts};
2
3
use chrono::prelude::*;
3
4
use diesel::prelude::*;
4
5
use diesel::sql_types::{Array, Text};
···
127
128
.execute(conn)
128
129
.await
129
130
}
131
131
+
132
132
+
pub async fn upsert_profile(
133
133
+
conn: &mut AsyncPgConnection,
134
134
+
repo: &str,
135
135
+
cid: Cid,
136
136
+
rec: records::AppBskyActorProfile,
137
137
+
) -> QueryResult<usize> {
138
138
+
let (pinned_uri, pinned_cid) = strongref_to_parts(rec.pinned_post.as_ref());
139
139
+
let (joined_sp_uri, joined_sp_cid) = strongref_to_parts(rec.joined_via_starter_pack.as_ref());
140
140
+
141
141
+
let data = models::UpsertProfile {
142
142
+
did: repo,
143
143
+
cid: cid.to_string(),
144
144
+
avatar_cid: blob_ref(rec.avatar),
145
145
+
banner_cid: blob_ref(rec.banner),
146
146
+
display_name: rec.display_name,
147
147
+
description: rec.description,
148
148
+
pinned_uri,
149
149
+
pinned_cid,
150
150
+
joined_sp_uri,
151
151
+
joined_sp_cid,
152
152
+
created_at: rec.created_at.map(|val| val.naive_utc()),
153
153
+
indexed_at: Utc::now().naive_utc(),
154
154
+
};
155
155
+
156
156
+
diesel::insert_into(schema::profiles::table)
157
157
+
.values(&data)
158
158
+
.on_conflict(schema::profiles::did)
159
159
+
.do_update()
160
160
+
.set(&data)
161
161
+
.execute(conn)
162
162
+
.await
163
163
+
}
+16
-4
consumer/src/indexer/mod.rs
···
10
10
use tracing::{instrument, Instrument};
11
11
12
12
mod db;
13
13
-
mod records;
13
13
+
pub mod records;
14
14
mod types;
15
15
16
16
pub async fn relay_indexer(
···
133
133
op: &CommitOp,
134
134
blocks: &HashMap<Cid, Vec<u8>>,
135
135
) -> diesel::QueryResult<()> {
136
136
-
let Some((collection_raw, _)) = op.path.split_once("/") else {
136
136
+
let Some((collection_raw, rkey)) = op.path.split_once("/") else {
137
137
tracing::warn!("op contained invalid path {}", op.path);
138
138
return Ok(());
139
139
};
···
151
151
return Ok(());
152
152
}
153
153
154
154
-
let Some(block) = op.cid.and_then(|cid| blocks.get(&cid)) else {
155
155
-
tracing::error!("Missing Cid or the block was not found");
154
154
+
let Some(cid) = op.cid else {
155
155
+
tracing::error!("Missing cid for {} op", op.action);
156
156
+
return Ok(());
157
157
+
};
158
158
+
159
159
+
let Some(block) = blocks.get(&cid) else {
160
160
+
tracing::error!("Block was not found for cid");
156
161
return Ok(());
157
162
};
158
163
···
165
170
};
166
171
167
172
match decoded {
173
173
+
RecordTypes::AppBskyActorProfile(record) => {
174
174
+
if rkey != "self" {
175
175
+
return Ok(());
176
176
+
}
177
177
+
178
178
+
db::upsert_profile(conn, repo, cid, record).await?;
179
179
+
}
168
180
RecordTypes::AppBskyGraphBlock(record) => {
169
181
db::insert_block(conn, repo, &full_path, record).await?;
170
182
}
+35
consumer/src/indexer/records.rs
···
1
1
+
use crate::utils;
1
2
use chrono::{DateTime, Utc};
3
3
+
use ipld_core::cid::Cid;
2
4
use serde::{Deserialize, Serialize};
5
5
+
6
6
+
#[derive(Debug, Deserialize, Serialize)]
7
7
+
pub struct StrongRef {
8
8
+
#[serde(
9
9
+
deserialize_with = "utils::cid_from_string",
10
10
+
serialize_with = "utils::cid_as_str"
11
11
+
)]
12
12
+
pub cid: Cid,
13
13
+
pub uri: String,
14
14
+
}
15
15
+
16
16
+
#[derive(Debug, Deserialize, Serialize)]
17
17
+
#[serde(tag = "$type")]
18
18
+
#[serde(rename = "blob")]
19
19
+
#[serde(rename_all = "camelCase")]
20
20
+
pub struct Blob {
21
21
+
pub mime_type: String,
22
22
+
#[serde(serialize_with = "utils::cid_as_link")]
23
23
+
pub r#ref: Cid,
24
24
+
pub size: i32,
25
25
+
}
26
26
+
27
27
+
#[derive(Debug, Deserialize, Serialize)]
28
28
+
#[serde(rename_all = "camelCase")]
29
29
+
pub struct AppBskyActorProfile {
30
30
+
pub display_name: Option<String>,
31
31
+
pub description: Option<String>,
32
32
+
pub avatar: Option<Blob>,
33
33
+
pub banner: Option<Blob>,
34
34
+
pub joined_via_starter_pack: Option<StrongRef>,
35
35
+
pub pinned_post: Option<StrongRef>,
36
36
+
pub created_at: Option<DateTime<Utc>>,
37
37
+
}
3
38
4
39
#[derive(Debug, Deserialize, Serialize)]
5
40
#[serde(rename_all = "camelCase")]
+5
consumer/src/indexer/types.rs
···
4
4
#[derive(Debug, Deserialize, Serialize)]
5
5
#[serde(tag = "$type")]
6
6
pub enum RecordTypes {
7
7
+
#[serde(rename = "app.bsky.actor.profile")]
8
8
+
AppBskyActorProfile(records::AppBskyActorProfile),
7
9
#[serde(rename = "app.bsky.graph.block")]
8
10
AppBskyGraphBlock(records::AppBskyGraphBlock),
9
11
#[serde(rename = "app.bsky.graph.follow")]
···
12
14
13
15
#[derive(Debug, PartialOrd, PartialEq)]
14
16
pub enum CollectionType {
17
17
+
BskyProfile,
15
18
BskyBlock,
16
19
BskyFollow,
17
20
Unsupported,
···
20
23
impl CollectionType {
21
24
pub(crate) fn from_str(input: &str) -> CollectionType {
22
25
match input {
26
26
+
"app.bsky.actor.profile" => CollectionType::BskyProfile,
23
27
"app.bsky.graph.block" => CollectionType::BskyBlock,
24
28
"app.bsky.graph.follow" => CollectionType::BskyFollow,
25
29
_ => CollectionType::Unsupported,
···
28
32
29
33
pub fn can_update(&self) -> bool {
30
34
match self {
35
35
+
CollectionType::BskyProfile => true,
31
36
CollectionType::BskyBlock => false,
32
37
CollectionType::BskyFollow => false,
33
38
CollectionType::Unsupported => false,
+1
consumer/src/main.rs
···
6
6
mod config;
7
7
mod firehose;
8
8
mod indexer;
9
9
+
mod utils;
9
10
10
11
#[tokio::main]
11
12
async fn main() -> eyre::Result<()> {
+39
consumer/src/utils.rs
···
1
1
+
use ipld_core::cid::Cid;
2
2
+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
3
+
4
4
+
pub fn cid_from_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Cid, D::Error> {
5
5
+
let str = String::deserialize(deserializer)?;
6
6
+
7
7
+
Cid::try_from(str).map_err(serde::de::Error::custom)
8
8
+
}
9
9
+
10
10
+
pub fn cid_as_str<S>(inp: &Cid, serializer: S) -> Result<S::Ok, S::Error>
11
11
+
where
12
12
+
S: Serializer,
13
13
+
{
14
14
+
inp.to_string().serialize(serializer)
15
15
+
}
16
16
+
17
17
+
#[derive(Debug, Deserialize, Serialize)]
18
18
+
pub struct LinkRef {
19
19
+
#[serde(rename = "$link")]
20
20
+
link: String,
21
21
+
}
22
22
+
23
23
+
pub fn cid_as_link<S>(inp: &Cid, serializer: S) -> Result<S::Ok, S::Error>
24
24
+
where
25
25
+
S: Serializer,
26
26
+
{
27
27
+
LinkRef {
28
28
+
link: inp.to_string(),
29
29
+
}
30
30
+
.serialize(serializer)
31
31
+
}
32
32
+
33
33
+
pub fn blob_ref(blob: Option<crate::indexer::records::Blob>) -> Option<String> {
34
34
+
blob.map(|blob| blob.r#ref.to_string())
35
35
+
}
36
36
+
37
37
+
pub fn strongref_to_parts(strongref: Option<&crate::indexer::records::StrongRef>) -> (Option<String>, Option<String>) {
38
38
+
strongref.map(|sr| (sr.uri.clone(), sr.cid.to_string())).unzip()
39
39
+
}
+1
migrations/2025-01-30-204801_profiles/down.sql
···
1
1
+
-- Down migration: removes the profiles table.
drop table profiles;
+20
migrations/2025-01-30-204801_profiles/up.sql
···
1
1
+
-- One row per actor, keyed by the actor's DID.
CREATE TABLE profiles
(
    did           TEXT PRIMARY KEY REFERENCES actors (did),
    cid           TEXT      NOT NULL,

    -- CIDs of the avatar / banner blobs, when present.
    avatar_cid    TEXT,
    banner_cid    TEXT,

    display_name  TEXT,
    description   TEXT,

    -- Pinned post reference, split into URI and CID.
    pinned_uri    TEXT,
    pinned_cid    TEXT,

    -- Starter pack reference, split into URI and CID.
    joined_sp_uri TEXT,
    joined_sp_cid TEXT,

    created_at    TIMESTAMP NOT NULL DEFAULT now(),
    indexed_at    TIMESTAMP NOT NULL DEFAULT now()
);
+49
parakeet-db/src/models.rs
···
55
55
pub followers: i32,
56
56
pub following: i32,
57
57
}
58
58
+
59
59
+
#[derive(Debug, Queryable, Selectable, Identifiable)]
60
60
+
#[diesel(table_name = crate::schema::profiles)]
61
61
+
#[diesel(primary_key(did))]
62
62
+
#[diesel(check_for_backend(diesel::pg::Pg))]
63
63
+
pub struct Profile {
64
64
+
pub did: String,
65
65
+
pub cid: String,
66
66
+
67
67
+
pub avatar_cid: Option<String>,
68
68
+
pub banner_cid: Option<String>,
69
69
+
70
70
+
pub display_name: Option<String>,
71
71
+
pub description: Option<String>,
72
72
+
73
73
+
pub pinned_uri: Option<String>,
74
74
+
pub pinned_cid: Option<String>,
75
75
+
76
76
+
pub joined_sp_uri: Option<String>,
77
77
+
pub joined_sp_cid: Option<String>,
78
78
+
79
79
+
pub created_at: NaiveDateTime,
80
80
+
pub indexed_at: NaiveDateTime,
81
81
+
}
82
82
+
83
83
+
#[derive(Insertable, AsChangeset)]
84
84
+
#[diesel(table_name = crate::schema::profiles)]
85
85
+
#[diesel(check_for_backend(diesel::pg::Pg))]
86
86
+
#[diesel(treat_none_as_null = true)]
87
87
+
pub struct UpsertProfile<'a> {
88
88
+
pub did: &'a str,
89
89
+
pub cid: String,
90
90
+
91
91
+
pub avatar_cid: Option<String>,
92
92
+
pub banner_cid: Option<String>,
93
93
+
94
94
+
pub display_name: Option<String>,
95
95
+
pub description: Option<String>,
96
96
+
97
97
+
pub pinned_uri: Option<String>,
98
98
+
pub pinned_cid: Option<String>,
99
99
+
100
100
+
pub joined_sp_uri: Option<String>,
101
101
+
pub joined_sp_cid: Option<String>,
102
102
+
103
103
+
#[diesel(treat_none_as_null = false)]
104
104
+
pub created_at: Option<NaiveDateTime>,
105
105
+
pub indexed_at: NaiveDateTime,
106
106
+
}
+19
parakeet-db/src/schema.rs
···
38
38
}
39
39
}
40
40
41
41
+
// Schema of the `profiles` table; `did` is the primary key.
diesel::table! {
    profiles (did) {
        did -> Text,
        cid -> Text,
        avatar_cid -> Nullable<Text>,
        banner_cid -> Nullable<Text>,
        display_name -> Nullable<Text>,
        description -> Nullable<Text>,
        pinned_uri -> Nullable<Text>,
        pinned_cid -> Nullable<Text>,
        joined_sp_uri -> Nullable<Text>,
        joined_sp_cid -> Nullable<Text>,
        created_at -> Timestamp,
        indexed_at -> Timestamp,
    }
}
57
57
+
41
58
diesel::joinable!(blocks -> actors (did));
42
59
diesel::joinable!(follows -> actors (did));
60
60
+
diesel::joinable!(profiles -> actors (did));
43
61
44
62
diesel::allow_tables_to_appear_in_same_query!(
45
63
actors,
46
64
blocks,
47
65
follow_stats,
48
66
follows,
67
67
+
profiles,
49
68
);