tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
#account and #identity indexing
mia.omg.lol
1 year ago
7c6baa29
e8d9e58c
+199
-2
9 changed files
expand all
collapse all
unified
split
consumer
src
firehose
types.rs
indexer
db.rs
mod.rs
migrations
2025-01-26-171756_actors
down.sql
up.sql
parakeet-db
src
lib.rs
models.rs
schema.rs
types.rs
+11
consumer/src/firehose/types.rs
···
59
59
}
60
60
}
61
61
62
62
+
impl Into<parakeet_db::types::ActorStatus> for AtpAccountStatus {
63
63
+
fn into(self) -> parakeet_db::types::ActorStatus {
64
64
+
match self {
65
65
+
AtpAccountStatus::Takendown => parakeet_db::types::ActorStatus::Takendown,
66
66
+
AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended,
67
67
+
AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted,
68
68
+
AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated,
69
69
+
}
70
70
+
}
71
71
+
}
72
72
+
62
73
#[derive(Debug, Deserialize)]
63
74
pub struct AtpAccountEvent {
64
75
pub seq: u64,
+27
consumer/src/indexer/db.rs
···
1
1
+
use chrono::prelude::*;
2
2
+
use diesel::prelude::*;
3
3
+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4
4
+
use parakeet_db::{models, schema, types};
5
5
+
6
6
+
pub async fn upsert_actor(
7
7
+
conn: &mut AsyncPgConnection,
8
8
+
did: &str,
9
9
+
handle: Option<Option<String>>,
10
10
+
status: Option<types::ActorStatus>,
11
11
+
time: DateTime<Utc>,
12
12
+
) -> QueryResult<usize> {
13
13
+
let data = models::NewActor {
14
14
+
did,
15
15
+
handle,
16
16
+
status,
17
17
+
last_indexed: Some(time.naive_utc()),
18
18
+
};
19
19
+
20
20
+
diesel::insert_into(schema::actors::table)
21
21
+
.values(&data)
22
22
+
.on_conflict(schema::actors::did)
23
23
+
.do_update()
24
24
+
.set(&data)
25
25
+
.execute(conn)
26
26
+
.await
27
27
+
}
+25
-2
consumer/src/indexer/mod.rs
···
4
4
use diesel_async::AsyncPgConnection;
5
5
use futures::StreamExt;
6
6
use ipld_core::cid::Cid;
7
7
+
use parakeet_db::types::ActorStatus;
7
8
use std::collections::HashMap;
8
9
use tokio::sync::mpsc::Receiver;
9
9
-
use tracing::Instrument;
10
10
+
use tracing::{instrument, Instrument};
10
11
12
12
+
mod db;
11
13
mod types;
12
14
13
15
pub async fn relay_indexer(
···
44
46
Ok(())
45
47
}
46
48
47
47
-
async fn index_identity(conn: &mut AsyncPgConnection, identity: AtpIdentityEvent) -> eyre::Result<()> {
49
49
+
#[instrument(skip_all, fields(seq = identity.seq, repo=identity.did))]
50
50
+
async fn index_identity(
51
51
+
conn: &mut AsyncPgConnection,
52
52
+
identity: AtpIdentityEvent,
53
53
+
) -> eyre::Result<()> {
54
54
+
db::upsert_actor(
55
55
+
conn,
56
56
+
&identity.did,
57
57
+
Some(identity.handle),
58
58
+
None,
59
59
+
identity.time,
60
60
+
)
61
61
+
.await?;
62
62
+
48
63
Ok(())
49
64
}
50
65
66
66
+
#[instrument(skip_all, fields(seq = account.seq, repo=account.did))]
51
67
async fn index_account(conn: &mut AsyncPgConnection, account: AtpAccountEvent) -> eyre::Result<()> {
68
68
+
let status = account
69
69
+
.status
70
70
+
.map(|status| status.into())
71
71
+
.unwrap_or(ActorStatus::Active);
72
72
+
73
73
+
db::upsert_actor(conn, &account.did, None, Some(status), account.time).await?;
74
74
+
52
75
Ok(())
53
76
}
54
77
+1
migrations/2025-01-26-171756_actors/down.sql
···
1
1
+
drop table actors;
+15
migrations/2025-01-26-171756_actors/up.sql
···
1
1
+
create table actors
2
2
+
(
3
3
+
did text primary key,
4
4
+
handle text,
5
5
+
6
6
+
-- active / takendown / suspended / deleted / deactivated
7
7
+
status text not null default 'active',
8
8
+
-- synced / dirty / processing
9
9
+
sync_state text not null default 'dirty',
10
10
+
11
11
+
repo_rev text,
12
12
+
repo_cid text,
13
13
+
14
14
+
last_indexed timestamp
15
15
+
);
+1
parakeet-db/src/lib.rs
···
1
1
pub mod models;
2
2
pub mod schema;
3
3
+
pub mod types;
+27
parakeet-db/src/models.rs
···
1
1
+
use crate::types::*;
2
2
+
use chrono::NaiveDateTime;
3
3
+
use diesel::prelude::*;
4
4
+
5
5
+
#[derive(Debug, Queryable, Selectable, Identifiable)]
6
6
+
#[diesel(table_name = crate::schema::actors)]
7
7
+
#[diesel(primary_key(did))]
8
8
+
#[diesel(check_for_backend(diesel::pg::Pg))]
9
9
+
pub struct Actor {
10
10
+
pub did: String,
11
11
+
pub handle: Option<String>,
12
12
+
pub status: ActorStatus,
13
13
+
pub sync_state: ActorSyncState,
14
14
+
pub repo_rev: Option<String>,
15
15
+
pub repo_cid: Option<String>,
16
16
+
pub last_indexed: Option<NaiveDateTime>,
17
17
+
}
18
18
+
19
19
+
#[derive(Insertable, AsChangeset)]
20
20
+
#[diesel(table_name = crate::schema::actors)]
21
21
+
#[diesel(check_for_backend(diesel::pg::Pg))]
22
22
+
pub struct NewActor<'a> {
23
23
+
pub did: &'a str,
24
24
+
pub handle: Option<Option<String>>,
25
25
+
pub status: Option<ActorStatus>,
26
26
+
pub last_indexed: Option<NaiveDateTime>,
27
27
+
}
+11
parakeet-db/src/schema.rs
···
1
1
// @generated automatically by Diesel CLI.
2
2
3
3
+
diesel::table! {
4
4
+
actors (did) {
5
5
+
did -> Text,
6
6
+
handle -> Nullable<Text>,
7
7
+
status -> Text,
8
8
+
sync_state -> Text,
9
9
+
repo_rev -> Nullable<Text>,
10
10
+
repo_cid -> Nullable<Text>,
11
11
+
last_indexed -> Nullable<Timestamp>,
12
12
+
}
13
13
+
}
+81
parakeet-db/src/types.rs
···
1
1
+
use diesel::backend::Backend;
2
2
+
use diesel::deserialize::FromSql;
3
3
+
use diesel::pg::Pg;
4
4
+
use diesel::serialize::{Output, ToSql};
5
5
+
use diesel::{AsExpression, FromSqlRow};
6
6
+
7
7
+
#[derive(Debug, AsExpression, FromSqlRow)]
8
8
+
#[diesel(sql_type = diesel::sql_types::Text)]
9
9
+
pub enum ActorStatus {
10
10
+
Active,
11
11
+
Takendown,
12
12
+
Suspended,
13
13
+
Deleted,
14
14
+
Deactivated,
15
15
+
}
16
16
+
17
17
+
impl<DB> FromSql<diesel::sql_types::Text, DB> for ActorStatus
18
18
+
where
19
19
+
DB: Backend,
20
20
+
String: FromSql<diesel::sql_types::Text, DB>,
21
21
+
{
22
22
+
fn from_sql(bytes: DB::RawValue<'_>) -> diesel::deserialize::Result<Self> {
23
23
+
match String::from_sql(bytes)?.as_str() {
24
24
+
"active" => Ok(ActorStatus::Active),
25
25
+
"takendown" => Ok(ActorStatus::Takendown),
26
26
+
"suspended" => Ok(ActorStatus::Suspended),
27
27
+
"deleted" => Ok(ActorStatus::Deleted),
28
28
+
"deactivated" => Ok(ActorStatus::Deactivated),
29
29
+
x => Err(format!("Unrecognized variant {}", x).into()),
30
30
+
}
31
31
+
}
32
32
+
}
33
33
+
34
34
+
impl ToSql<diesel::sql_types::Text, Pg> for ActorStatus {
35
35
+
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result {
36
36
+
let val = match self {
37
37
+
ActorStatus::Active => "active",
38
38
+
ActorStatus::Takendown => "takendown",
39
39
+
ActorStatus::Suspended => "suspended",
40
40
+
ActorStatus::Deleted => "deleted",
41
41
+
ActorStatus::Deactivated => "deactivated",
42
42
+
};
43
43
+
44
44
+
<str as ToSql<diesel::sql_types::Text, Pg>>::to_sql(val, out)
45
45
+
}
46
46
+
}
47
47
+
48
48
+
#[derive(Debug, AsExpression, FromSqlRow)]
49
49
+
#[diesel(sql_type = diesel::sql_types::Text)]
50
50
+
pub enum ActorSyncState {
51
51
+
Synced,
52
52
+
Dirty,
53
53
+
Processing,
54
54
+
}
55
55
+
56
56
+
impl<DB> FromSql<diesel::sql_types::Text, DB> for ActorSyncState
57
57
+
where
58
58
+
DB: Backend,
59
59
+
String: FromSql<diesel::sql_types::Text, DB>,
60
60
+
{
61
61
+
fn from_sql(bytes: DB::RawValue<'_>) -> diesel::deserialize::Result<Self> {
62
62
+
match String::from_sql(bytes)?.as_str() {
63
63
+
"synced" => Ok(ActorSyncState::Synced),
64
64
+
"dirty" => Ok(ActorSyncState::Dirty),
65
65
+
"processing" => Ok(ActorSyncState::Processing),
66
66
+
x => Err(format!("Unrecognized variant {}", x).into()),
67
67
+
}
68
68
+
}
69
69
+
}
70
70
+
71
71
+
impl ToSql<diesel::sql_types::Text, Pg> for ActorSyncState {
72
72
+
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result {
73
73
+
let val = match self {
74
74
+
ActorSyncState::Synced => "synced",
75
75
+
ActorSyncState::Dirty => "dirty",
76
76
+
ActorSyncState::Processing => "processing",
77
77
+
};
78
78
+
79
79
+
<str as ToSql<diesel::sql_types::Text, Pg>>::to_sql(val, out)
80
80
+
}
81
81
+
}