tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
feat(lexica): move StrongRef and Blob into lexica
mia.omg.lol
6 months ago
d19a50fd
a3349d80
+77
-76
9 changed files
expand all
collapse all
unified
split
Cargo.lock
consumer
src
backfill
mod.rs
db
copy.rs
record.rs
indexer
records.rs
utils.rs
lexica
Cargo.toml
src
lib.rs
utils.rs
+1
Cargo.lock
···
2276
version = "0.1.0"
2277
dependencies = [
2278
"chrono",
0
2279
"serde",
2280
"serde_json",
2281
]
···
2276
version = "0.1.0"
2277
dependencies = [
2278
"chrono",
2279
+
"cid",
2280
"serde",
2281
"serde_json",
2282
]
+3
-12
consumer/src/backfill/mod.rs
···
6
use deadpool_postgres::{Object, Pool, Transaction};
7
use did_resolver::Resolver;
8
use ipld_core::cid::Cid;
0
9
use metrics::counter;
10
use parakeet_db::types::{ActorStatus, ActorSyncState};
11
use redis::aio::MultiplexedConnection;
···
267
268
#[derive(Debug, Default)]
269
struct CopyStore {
270
-
likes: Vec<(
271
-
String,
272
-
records::StrongRef,
273
-
Option<records::StrongRef>,
274
-
DateTime<Utc>,
275
-
)>,
276
posts: Vec<(String, Cid, records::AppBskyFeedPost)>,
277
-
reposts: Vec<(
278
-
String,
279
-
records::StrongRef,
280
-
Option<records::StrongRef>,
281
-
DateTime<Utc>,
282
-
)>,
283
blocks: Vec<(String, String, DateTime<Utc>)>,
284
follows: Vec<(String, String, DateTime<Utc>)>,
285
list_items: Vec<(String, records::AppBskyGraphListItem)>,
···
6
use deadpool_postgres::{Object, Pool, Transaction};
7
use did_resolver::Resolver;
8
use ipld_core::cid::Cid;
9
+
use lexica::StrongRef;
10
use metrics::counter;
11
use parakeet_db::types::{ActorStatus, ActorSyncState};
12
use redis::aio::MultiplexedConnection;
···
268
269
#[derive(Debug, Default)]
270
struct CopyStore {
271
+
likes: Vec<(String, StrongRef, Option<StrongRef>, DateTime<Utc>)>,
0
0
0
0
0
272
posts: Vec<(String, Cid, records::AppBskyFeedPost)>,
273
+
reposts: Vec<(String, StrongRef, Option<StrongRef>, DateTime<Utc>)>,
0
0
0
0
0
274
blocks: Vec<(String, String, DateTime<Utc>)>,
275
follows: Vec<(String, String, DateTime<Utc>)>,
276
list_items: Vec<(String, records::AppBskyGraphListItem)>,
+3
-2
consumer/src/db/copy.rs
···
7
use ipld_core::cid::Cid;
8
use tokio_postgres::binary_copy::BinaryCopyInWriter;
9
use tokio_postgres::types::Type;
0
10
11
// StrongRefs are used in both likes and reposts
12
const STRONGREF_TYPES: &[Type] = &[
···
20
];
21
type StrongRefRow = (
22
String,
23
-
records::StrongRef,
24
-
Option<records::StrongRef>,
25
DateTime<Utc>,
26
);
27
···
7
use ipld_core::cid::Cid;
8
use tokio_postgres::binary_copy::BinaryCopyInWriter;
9
use tokio_postgres::types::Type;
10
+
use lexica::StrongRef;
11
12
// StrongRefs are used in both likes and reposts
13
const STRONGREF_TYPES: &[Type] = &[
···
21
];
22
type StrongRefRow = (
23
String,
24
+
StrongRef,
25
+
Option<StrongRef>,
26
DateTime<Utc>,
27
);
28
+5
-5
consumer/src/db/record.rs
···
371
let stmt = conn.prepare("INSERT INTO post_embed_images (post_uri, seq, cid, mime_type, alt, width, height) VALUES ($1, $2, $3, $4, $5, $6, $7)").await?;
372
373
for (idx, image) in embed.images.iter().enumerate() {
374
-
let cid = image.image.r#ref.to_string();
375
let width = image.aspect_ratio.as_ref().map(|v| v.width);
376
let height = image.aspect_ratio.as_ref().map(|v| v.height);
377
···
398
post: &str,
399
embed: AppBskyEmbedVideo,
400
) -> PgExecResult {
401
-
let cid = embed.video.r#ref.to_string();
402
let width = embed.aspect_ratio.as_ref().map(|v| v.width);
403
let height = embed.aspect_ratio.as_ref().map(|v| v.height);
404
···
411
let stmt = conn.prepare_cached("INSERT INTO post_embed_video_captions (post_uri, cid, mime_type, language) VALUES ($1, $2, $3, $4)").await?;
412
413
for caption in captions {
414
-
let cid = caption.file.r#ref.to_string();
415
conn.execute(
416
&stmt,
417
&[&post, &cid, &caption.file.mime_type, &caption.lang],
···
429
embed: AppBskyEmbedExternal,
430
) -> PgExecResult {
431
let thumb_mime = embed.external.thumb.as_ref().map(|v| v.mime_type.clone());
432
-
let thumb_cid = embed.external.thumb.as_ref().map(|v| v.r#ref.to_string());
433
434
conn.execute(
435
"INSERT INTO post_embed_ext (post_uri, uri, title, description, thumb_mime_type, thumb_cid) VALUES ($1, $2, $3, $4, $5, $6)",
···
634
let record = serde_json::to_value(&rec).unwrap();
635
let thumb = rec.embed.as_ref().and_then(|v| v.external.thumb.clone());
636
let thumb_mime = thumb.as_ref().map(|v| v.mime_type.clone());
637
-
let thumb_cid = thumb.as_ref().map(|v| v.r#ref.to_string());
638
639
conn.execute(
640
include_str!("sql/status_upsert.sql"),
···
371
let stmt = conn.prepare("INSERT INTO post_embed_images (post_uri, seq, cid, mime_type, alt, width, height) VALUES ($1, $2, $3, $4, $5, $6, $7)").await?;
372
373
for (idx, image) in embed.images.iter().enumerate() {
374
+
let cid = image.image.cid.to_string();
375
let width = image.aspect_ratio.as_ref().map(|v| v.width);
376
let height = image.aspect_ratio.as_ref().map(|v| v.height);
377
···
398
post: &str,
399
embed: AppBskyEmbedVideo,
400
) -> PgExecResult {
401
+
let cid = embed.video.cid.to_string();
402
let width = embed.aspect_ratio.as_ref().map(|v| v.width);
403
let height = embed.aspect_ratio.as_ref().map(|v| v.height);
404
···
411
let stmt = conn.prepare_cached("INSERT INTO post_embed_video_captions (post_uri, cid, mime_type, language) VALUES ($1, $2, $3, $4)").await?;
412
413
for caption in captions {
414
+
let cid = caption.file.cid.to_string();
415
conn.execute(
416
&stmt,
417
&[&post, &cid, &caption.file.mime_type, &caption.lang],
···
429
embed: AppBskyEmbedExternal,
430
) -> PgExecResult {
431
let thumb_mime = embed.external.thumb.as_ref().map(|v| v.mime_type.clone());
432
+
let thumb_cid = embed.external.thumb.as_ref().map(|v| v.cid.to_string());
433
434
conn.execute(
435
"INSERT INTO post_embed_ext (post_uri, uri, title, description, thumb_mime_type, thumb_cid) VALUES ($1, $2, $3, $4, $5, $6)",
···
634
let record = serde_json::to_value(&rec).unwrap();
635
let thumb = rec.embed.as_ref().and_then(|v| v.external.thumb.clone());
636
let thumb_mime = thumb.as_ref().map(|v| v.mime_type.clone());
637
+
let thumb_cid = thumb.as_ref().map(|v| v.cid.to_string());
638
639
conn.execute(
640
include_str!("sql/status_upsert.sql"),
+1
-22
consumer/src/indexer/records.rs
···
1
use crate::utils;
2
use chrono::{DateTime, Utc};
3
-
use ipld_core::cid::Cid;
4
use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions, Status};
5
use lexica::app_bsky::embed::AspectRatio;
6
use lexica::app_bsky::labeler::LabelerPolicy;
7
use lexica::app_bsky::richtext::FacetMain;
8
use lexica::com_atproto::label::SelfLabels;
9
use lexica::com_atproto::moderation::{ReasonType, SubjectType};
0
10
use serde::{Deserialize, Serialize};
11
use serde_with::serde_as;
12
-
13
-
#[derive(Clone, Debug, Deserialize, Serialize)]
14
-
pub struct StrongRef {
15
-
#[serde(
16
-
deserialize_with = "utils::cid_from_string",
17
-
serialize_with = "utils::cid_as_str"
18
-
)]
19
-
pub cid: Cid,
20
-
pub uri: String,
21
-
}
22
-
23
-
#[derive(Clone, Debug, Deserialize, Serialize)]
24
-
#[serde(tag = "$type")]
25
-
#[serde(rename = "blob")]
26
-
#[serde(rename_all = "camelCase")]
27
-
pub struct Blob {
28
-
pub mime_type: String,
29
-
#[serde(serialize_with = "utils::cid_as_link")]
30
-
pub r#ref: Cid,
31
-
pub size: i32,
32
-
}
33
34
#[derive(Debug, Deserialize, Serialize)]
35
#[serde(rename_all = "camelCase")]
···
1
use crate::utils;
2
use chrono::{DateTime, Utc};
0
3
use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions, Status};
4
use lexica::app_bsky::embed::AspectRatio;
5
use lexica::app_bsky::labeler::LabelerPolicy;
6
use lexica::app_bsky::richtext::FacetMain;
7
use lexica::com_atproto::label::SelfLabels;
8
use lexica::com_atproto::moderation::{ReasonType, SubjectType};
9
+
use lexica::{Blob, StrongRef};
10
use serde::{Deserialize, Serialize};
11
use serde_with::serde_as;
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
12
13
#[derive(Debug, Deserialize, Serialize)]
14
#[serde(rename_all = "camelCase")]
+5
-34
consumer/src/utils.rs
···
1
-
use ipld_core::cid::Cid;
2
-
use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
4
// see https://deer.social/profile/did:plc:63y3oh7iakdueqhlj6trojbq/post/3ltuv4skhqs2h
5
pub fn safe_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> {
···
8
Ok(str.replace('\u{0000}', ""))
9
}
10
11
-
pub fn cid_from_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Cid, D::Error> {
12
-
let str = String::deserialize(deserializer)?;
13
-
14
-
Cid::try_from(str).map_err(serde::de::Error::custom)
15
-
}
16
-
17
-
pub fn cid_as_str<S>(inp: &Cid, serializer: S) -> Result<S::Ok, S::Error>
18
-
where
19
-
S: Serializer,
20
-
{
21
-
inp.to_string().serialize(serializer)
22
-
}
23
-
24
-
#[derive(Debug, Deserialize, Serialize)]
25
-
pub struct LinkRef {
26
-
#[serde(rename = "$link")]
27
-
link: String,
28
-
}
29
-
30
-
pub fn cid_as_link<S>(inp: &Cid, serializer: S) -> Result<S::Ok, S::Error>
31
-
where
32
-
S: Serializer,
33
-
{
34
-
LinkRef {
35
-
link: inp.to_string(),
36
-
}
37
-
.serialize(serializer)
38
-
}
39
-
40
-
pub fn blob_ref(blob: Option<crate::indexer::records::Blob>) -> Option<String> {
41
-
blob.map(|blob| blob.r#ref.to_string())
42
}
43
44
pub fn strongref_to_parts(
45
-
strongref: Option<&crate::indexer::records::StrongRef>,
46
) -> (Option<String>, Option<String>) {
47
strongref
48
.map(|sr| (sr.uri.clone(), sr.cid.to_string()))
···
1
+
use serde::{Deserialize, Deserializer};
2
+
use lexica::{Blob, StrongRef};
3
4
// see https://deer.social/profile/did:plc:63y3oh7iakdueqhlj6trojbq/post/3ltuv4skhqs2h
5
pub fn safe_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> {
···
8
Ok(str.replace('\u{0000}', ""))
9
}
10
11
+
pub fn blob_ref(blob: Option<Blob>) -> Option<String> {
12
+
blob.map(|blob| blob.cid.to_string())
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
13
}
14
15
pub fn strongref_to_parts(
16
+
strongref: Option<&StrongRef>,
17
) -> (Option<String>, Option<String>) {
18
strongref
19
.map(|sr| (sr.uri.clone(), sr.cid.to_string()))
+1
lexica/Cargo.toml
···
5
6
[dependencies]
7
chrono = { version = "0.4.39", features = ["serde"] }
0
8
serde = { version = "1.0.216", features = ["derive"] }
9
serde_json = "1.0.134"
···
5
6
[dependencies]
7
chrono = { version = "0.4.39", features = ["serde"] }
8
+
cid = { version = "0.11", features = ["serde"] }
9
serde = { version = "1.0.216", features = ["derive"] }
10
serde_json = "1.0.134"
+27
-1
lexica/src/lib.rs
···
1
-
use serde::Serialize;
0
0
0
2
3
pub mod app_bsky;
4
pub mod com_atproto;
0
5
6
#[derive(Clone, Debug, Serialize)]
7
pub struct JsonBytes {
8
#[serde(rename = "$bytes")]
9
pub bytes: String,
10
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
use cid::Cid;
2
+
use serde::{Deserialize, Serialize};
3
+
4
+
pub use utils::LinkRef;
5
6
pub mod app_bsky;
7
pub mod com_atproto;
8
+
mod utils;
9
10
#[derive(Clone, Debug, Serialize)]
11
pub struct JsonBytes {
12
#[serde(rename = "$bytes")]
13
pub bytes: String,
14
}
15
+
16
+
#[derive(Clone, Debug, Deserialize, Serialize)]
17
+
pub struct StrongRef {
18
+
#[serde(
19
+
deserialize_with = "utils::cid_from_string",
20
+
serialize_with = "utils::cid_as_str"
21
+
)]
22
+
pub cid: Cid,
23
+
pub uri: String,
24
+
}
25
+
26
+
#[derive(Clone, Debug, Deserialize, Serialize)]
27
+
#[serde(tag = "$type")]
28
+
#[serde(rename = "blob")]
29
+
#[serde(rename_all = "camelCase")]
30
+
pub struct Blob {
31
+
pub mime_type: String,
32
+
#[serde(rename = "ref")]
33
+
#[serde(serialize_with = "utils::cid_as_link")]
34
+
pub cid: Cid,
35
+
pub size: i32,
36
+
}
+31
lexica/src/utils.rs
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
use cid::Cid;
2
+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
+
4
+
pub fn cid_from_string<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Cid, D::Error> {
5
+
let str = String::deserialize(deserializer)?;
6
+
7
+
Cid::try_from(str).map_err(serde::de::Error::custom)
8
+
}
9
+
10
+
pub fn cid_as_str<S>(inp: &Cid, serializer: S) -> Result<S::Ok, S::Error>
11
+
where
12
+
S: Serializer,
13
+
{
14
+
inp.to_string().serialize(serializer)
15
+
}
16
+
17
+
#[derive(Debug, Deserialize, Serialize)]
18
+
pub struct LinkRef {
19
+
#[serde(rename = "$link")]
20
+
link: String,
21
+
}
22
+
23
+
pub fn cid_as_link<S>(inp: &Cid, serializer: S) -> Result<S::Ok, S::Error>
24
+
where
25
+
S: Serializer,
26
+
{
27
+
LinkRef {
28
+
link: inp.to_string(),
29
+
}
30
+
.serialize(serializer)
31
+
}