this repo has no description

feature: supercell is a configurable feed generator

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+1988 -7
+5 -6
.gitignore
··· 13 13 # MSVC Windows builds of rustc generate these, which store debugging information 14 14 *.pdb 15 15 16 - # RustRover 17 - # JetBrains specific template is maintained in a separate JetBrains.gitignore that can 18 - # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore 19 - # and can be added to the global gitignore or merged into this file. For a more nuclear 20 - # option (not recommended) you can uncomment the following to ignore the entire idea folder. 21 - #.idea/ 16 + # Project specific files 17 + config.yml 18 + development.db 19 + jetstream_zstd_dictionary 20 +
+39
Cargo.toml
··· 1 + [package] 2 + name = "supercell" 3 + version = "0.1.0" 4 + edition = "2021" 5 + 6 + [profile.release] 7 + lto = true 8 + strip = true 9 + 10 + [dependencies] 11 + anyhow = "1.0.88" 12 + async-trait = "0.1.82" 13 + axum-extra = { version = "0.9.4", features = ["query"] } 14 + axum = { version = "0.7.5", features = ["http2", "macros"] } 15 + base64 = "0.22.1" 16 + chrono-tz = "0.10.0" 17 + chrono = { version = "0.4.38", default-features = false, features = ["std", "alloc", "now"] } 18 + ecdsa = { version = "0.16.9", features = ["std", "signing", "verifying"] } 19 + futures-util = { version = "0.3.31", features = ["sink"] } 20 + headers = "0.4.0" 21 + http = "1.1.0" 22 + k256 = { version = "0.13.4", features = ["ecdsa"] } 23 + multibase = "0.9.1" 24 + p256 = { version = "0.13.2", features = ["ecdsa"] } 25 + serde_json_path = "0.7.1" 26 + serde_json = { version = "1.0.132", features = ["alloc"] } 27 + serde = { version = "1.0.214", features = ["alloc", "derive"] } 28 + serde_yaml = "0.9.34" 29 + sqlx-cli = { version = "0.8.2", features = ["sqlite"] } 30 + sqlx = { version = "0.8.2", features = ["chrono", "sqlite"] } 31 + thiserror = "1.0.63" 32 + tokio-util = { version = "0.7.12", features = ["net", "rt", "tracing"] } 33 + tokio = { version = "1.41.0", features = ["bytes", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] } 34 + tokio-websockets = { version = "0.10.1", features = ["client", "native-tls", "rand", "ring"] } 35 + tower-http = { version = "0.5.2", features = ["cors", "fs", "timeout", "trace", "tracing"] } 36 + tower = { version = "0.5.1", features = ["limit", "timeout", "tokio", "tracing"] } 37 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono", "json"] } 38 + tracing = { version = "0.1.40", features = ["async-await", "log", "valuable"] } 39 + zstd = "0.13.2"
+45
Dockerfile
# syntax=docker/dockerfile:1.4
FROM rust:1-bookworm AS build

# sqlx-cli matches the sqlx version in Cargo.toml; sccache speeds up rebuilds
# via the cache mount below.
RUN cargo install sqlx-cli@0.8.2 --no-default-features --features sqlite
RUN cargo install sccache --version ^0.8
ENV RUSTC_WRAPPER=sccache SCCACHE_DIR=/sccache

RUN mkdir -p /app/
WORKDIR /app/

# GIT_HASH is baked into the binary at compile time (see `version()` in
# src/config.rs, which reads option_env!("GIT_HASH")).
ARG GIT_HASH
ENV GIT_HASH=$GIT_HASH

RUN --mount=type=bind,source=src,target=src \
    --mount=type=bind,source=migrations,target=migrations \
    --mount=type=bind,source=Cargo.toml,target=Cargo.toml \
    --mount=type=bind,source=Cargo.lock,target=Cargo.lock \
    --mount=type=cache,target=/app/target/ \
    --mount=type=cache,target=$SCCACHE_DIR,sharing=locked \
    --mount=type=cache,target=/usr/local/cargo/registry/ \
    <<EOF
set -e
cargo build --locked --release --bin supercell --target-dir .
EOF

FROM debian:bookworm-slim

RUN set -x \
    && apt-get update \
    && apt-get install ca-certificates -y \
    && rm -rf /var/lib/apt/lists/*

RUN groupadd -g 1508 -r supercell && useradd -u 1509 -r -g supercell -d /var/lib/supercell -m supercell

ENV RUST_LOG=info
ENV RUST_BACKTRACE=full

COPY --from=build /app/release/supercell /var/lib/supercell/

RUN chown -R supercell:supercell /var/lib/supercell

WORKDIR /var/lib/supercell

USER supercell
# Exec form without a `sh -c` wrapper: the binary itself is PID 1 and receives
# SIGTERM directly on `docker stop`, so the application's graceful-shutdown
# signal handler (SIGTERM/Ctrl+C handling in src/bin/supercell.rs) actually
# runs instead of the container being hard-killed after the stop timeout.
ENTRYPOINT ["/var/lib/supercell/supercell"]
+101 -1
README.md
··· 1 - # supercell 1 + # supercell 2 + 3 + > A supercell is a thunderstorm characterized by the presence of a mesocyclone, a deep, persistently rotating updraft. 4 + 5 + Supercell is a lightweight and configurable atproto feed generator. 6 + 7 + # Configuration 8 + 9 + The following environment variables are used: 10 + 11 + * `HTTP_PORT` - The port to listen on for HTTP requests. 12 + * `EXTERNAL_BASE` - The hostname of the feed generator. 13 + * `DATABASE_URL` - The URL of the database to use. 14 + * `JETSTREAM_HOSTNAME` - The hostname of the JetStream server to consume events from. 15 + * `ZSTD_DICTIONARY` - The path to the ZSTD dictionary to use. 16 + * `CONSUMER_TASK_ENABLE` - Whether or not to enable the consumer tasks. 17 + * `FEEDS` - The path to the feeds configuration file. 18 + * `RUST_LOG` - Logging configuration. Defaults to `supercell=debug,info` 19 + 20 + The feed configuration file is a YAML file that contains the feeds to serve and how to match events to the feed. It supports a variable number of matchers with different rules. Matching is done in order and uses json path plus the matcher implementation. 21 + 22 + ```yaml 23 + feeds: 24 + - uri: "at://did:plc:4acsffvbo4niovge362ptijz/app.bsky.feed.generator/3la5azib4xe2c" 25 + name: "Smoke Signal Support" 26 + description: "The Smoke Signal Support feed." 
27 + allow: ["did:plc:cbkjy5n7bk3ax2wplmtjofq2"] 28 + deny: "at://did:plc:4acsffvbo4niovge362ptijz/app.bsky.feed.post/3la5bsyzj3j23" 29 + matchers: 30 + - path: "$.did" 31 + value: "did:plc:tgudj2fjm77pzkuawquqhsxm" 32 + type: equal 33 + - path: "$.commit.record.text" 34 + values: ["smoke", "signal"] 35 + type: sequence 36 + - path: "$.commit.record.facets[*].features[?(@['$type'] == 'app.bsky.richtext.facet#tag')].tag" 37 + values: ["smoke", "signal"] 38 + type: sequence 39 + - path: "$.commit.record.reply.parent.uri" 40 + value: "at://did:plc:tgudj2fjm77pzkuawquqhsxm/app.bsky.feed.post/" 41 + type: prefix 42 + - path: "$.commit.record.reply.root.uri" 43 + value: "at://did:plc:tgudj2fjm77pzkuawquqhsxm/app.bsky.feed.post/" 44 + type: prefix 45 + - path: "$.commit.record.facets[*].features[?(@['$type'] == 'app.bsky.richtext.facet#link')].uri" 46 + value: "https://smokesignal.events/" 47 + type: prefix 48 + - path: "$.commit.record.facets[*].features[?(@['$type'] == 'app.bsky.richtext.facet#mention')].did" 49 + value: "did:plc:tgudj2fjm77pzkuawquqhsxm" 50 + type: equal 51 + - path: "$.commit.record.embed.external.uri" 52 + value: "https://smokesignal.events/" 53 + type: prefix 54 + - path: "$.commit.record.embed.record.uri" 55 + value: "at://did:plc:tgudj2fjm77pzkuawquqhsxm/" 56 + type: prefix 57 + ``` 58 + 59 + The `equal` matcher performs an exact string match matched paths. 60 + 61 + The `prefix` matcher performs a prefix string match on matched paths. Given the value "foo bar baz", the following prefixes would match: "foo", "foo ", etc. 62 + 63 + The `sequence` matcher performs a sequence string match on matched paths. This is used to match a list of values in order making flexible ordered matching without needing regex or complex reverse lookups. 64 + 65 + Consider the example string "The quick brown fox jumps over the lazy dog". 
The following sequences would match: 66 + 67 + * "the" "quick" 68 + * "brown" 69 + * "brow" "fox" "lazy" "dog" 70 + * "the" "dog" 71 + 72 + JSONPath is a query language for JSON. When used with matchers, JSONPath will use all nodes as inputs and each matcher will match against any of the values. 73 + 74 + For example, the following json would match the `equal` matcher with both `$.text` and `$.tags.*`: 75 + 76 + ```json 77 + { 78 + "text": "foo", 79 + "tags": ["foo", "bar"], 80 + } 81 + ``` 82 + 83 + The site [https://jsonpath.com/](https://jsonpath.com/) is a great resource for testing JSONPath queries. 84 + 85 + See the `config.example.yml` file for additional examples. 86 + 87 + # TODO 88 + 89 + * use i64, it's fine 90 + * look up keys on startup 91 + * possible scoring function for queries 92 + * add likes 93 + * support deletes 94 + * document how to register a feed 95 + 96 + # License 97 + 98 + This project is open source under the MIT license. 99 + 100 + Copyright (c) 2023 Astrenox Cooperative. All Rights Reserved. 101 +
+19
config.example.yml
# Example feed configuration.
#
# NOTE: the top-level key must be `feeds` (plural) — it is deserialized into
# the `Feeds { feeds: Vec<Feed> }` struct in src/config.rs, and the README
# example uses `feeds` as well. The previous `feed:` key failed to parse.
feeds:
  - uri: "at://did:plc:4acsffvbo4niovge362ptijz/xyz/abc1234"
    name: "Smoke Signal Support"
    description: "The Smoke Signal Support feed."
    allow: ["did:plc:cbkjy5n7bk3ax2wplmtjofq2", "did:plc:fjr24tyxkpi3xqenws7anfmj", "did:plc:4acsffvbo4niovge362ptijz"]
    deny: "at://did:plc:4acsffvbo4niovge362ptijz/app.bsky.feed.post/denied"
    matchers:
      - path: "$.did"
        value: "did:plc:tgudj2fjm77pzkuawquqhsxm"
        type: equal
      - path: "$.commit.record.text"
        values: ["smoke", "signal"]
        type: sequence
      - path: "$.commit.record.reply.parent.uri"
        value: "at://did:plc:tgudj2fjm77pzkuawquqhsxm/app.bsky.feed.post/"
        type: prefix
      - path: "$.commit.record.reply.root.uri"
        value: "at://did:plc:tgudj2fjm77pzkuawquqhsxm/app.bsky.feed.post/"
        type: prefix
+41
create-release.sh
#!/bin/sh
# Cut a release: update the changelog, bump the crate version, commit, tag,
# publish a GitHub release, and build/push the Docker image.
#
# Usage: create-release.sh <image> <old-version> <new-version>

# Abort on the first failure and on unset variables: without this, a failed
# `cargo build` or `sed` still tagged and pushed a broken release.
set -eu

if [ "$#" -ne 3 ]; then
    echo "usage: $0 <image> <old-version> <new-version>" >&2
    exit 1
fi

IMAGE=$1
OLD_VERSION=$2
VERSION=$3

git checkout main
git pull

git-cliff --tag="${VERSION}" --strip=header "${OLD_VERSION}".. > .tmp.release_info
git-cliff -o --tag="${VERSION}" --strip=header

# Bump the crate version in place; Cargo.lock is refreshed by the build below.
sed -i -e "s/^version = \"${OLD_VERSION}\"/version = \"${VERSION}\"/" Cargo.toml

cargo build

git add Cargo.lock Cargo.toml CHANGELOG.md
git commit -m "release: ${VERSION}" -s
git tag -a "${VERSION}" -F .tmp.release_info

git push
git push --tags

gh release create --verify-tag -F .tmp.release_info -t "${VERSION}" "${VERSION}"

git pull
git checkout "${VERSION}"

docker build --progress=plain -t "${IMAGE}:${VERSION}" .
docker tag "${IMAGE}:${VERSION}" "${IMAGE}:latest"
docker push "${IMAGE}:${VERSION}"
docker push "${IMAGE}:latest"

# ssh supercell-host "sudo docker pull ${IMAGE}:latest"
# ssh supercell-host "sudo docker pull ${IMAGE}:${VERSION}"

git checkout main
+15
dev-server.sh
#!/bin/sh
# Run a local development server against a local sqlite database.

# Stop immediately if the migration (or anything else) fails, rather than
# starting the server against a missing or half-migrated database.
set -e

export HTTP_PORT=4050
export EXTERNAL_BASE=feeds.smokesignal.events
export DATABASE_URL=sqlite://development.db
export JETSTREAM_HOSTNAME=jetstream1.us-east.bsky.network
# Quote command substitutions so paths containing spaces survive intact.
export ZSTD_DICTIONARY="$(pwd)/jetstream_zstd_dictionary"
export CONSUMER_TASK_ENABLE=true
export FEEDS="$(pwd)/config.yml"

touch development.db
sqlx migrate run --database-url sqlite://development.db

RUST_BACKTRACE=1 RUST_LOG=debug RUST_LIB_BACKTRACE=1 cargo run --bin supercell
+15
etc/docker-compose.yml
# Compose file for the production deployment, run via etc/supercell.service.
# The top-level `version` key is obsolete in the Compose specification
# (Compose v2 ignores it with a warning), so it is omitted.
services:
  supercell:
    labels: [ "com.centurylinklabs.watchtower.scope=supercell" ]
    image: supercell:latest
    restart: unless-stopped
    ports:
      - "127.0.0.1:4050:4050"
    volumes:
      - type: bind
        source: /var/lib/supercell/database.db
        target: /var/lib/supercell/database.db
    env_file:
      - /var/lib/supercell/production.env
+14
etc/release.sh
#!/bin/sh
# Back up the production env file and sqlite database, then restart the
# service so it picks up the freshly pulled image.
#
# Usage: release.sh [version]
# NOTE(review): $1 was accepted as VERSION but never used by the original
# script; it is kept (optional) for call-site compatibility.

set -eu

VERSION=${1:-unknown}
NOW=$(date +%s)
# Use a dedicated variable instead of clobbering $HOME — other tools invoked
# from this shell (git, sqlite3 dotfiles, etc.) read $HOME.
BASE=/var/lib/supercell

# The original script assumed the backups directory already existed.
mkdir -p "${BASE}/backups"

cp "${BASE}/production.env" "${BASE}/backups/${NOW}-production.env"

# `.backup` takes a consistent snapshot even while the service holds the
# database open, unlike a plain `cp` of the db file.
sqlite3 "${BASE}/database.db" ".backup '${BASE}/backups/${NOW}-database.db'"

chown supercell:supercell "${BASE}/backups/"*

systemctl restart supercell
+16
etc/supercell.service
··· 1 + [Unit] 2 + Description=Supercell 3 + Documentation=https://github.com/astrenoxcoop/supercell 4 + Requires=docker.service 5 + After=docker.service 6 + 7 + [Service] 8 + Type=oneshot 9 + RemainAfterExit=yes 10 + WorkingDirectory=/var/lib/supercell 11 + ExecStart=/usr/bin/docker compose --file /var/lib/supercell/docker-compose.yml up --detach 12 + ExecStop=/usr/bin/docker compose --file /var/lib/supercell/docker-compose.yml stop 13 + 14 + [Install] 15 + WantedBy=default.target 16 +
+5
migrations/20241103180245_init.down.sql
-- Revert 20241103180245_init.
--
-- Drops every table created by the matching up migration, in reverse order
-- of creation. The original down script omitted `verification_method_cache`,
-- which the up migration creates, so running down then up failed with
-- "table already exists".

DROP TABLE verification_method_cache;
DROP TABLE consumer_control;
DROP TABLE feed_content;
+27
migrations/20241103180245_init.up.sql
··· 1 + -- Add up migration script here 2 + 3 + CREATE TABLE feed_content ( 4 + feed_id TEXT NOT NULL, 5 + uri TEXT NOT NULL, 6 + indexed_at INTEGER NOT NULL, 7 + indexed_at_more INTEGER NOT NULL, 8 + cid TEXT NOT NULL, 9 + updated_at DATETIME NOT NULL DEFAULT (datetime('now')), 10 + PRIMARY KEY (feed_id, uri) 11 + ); 12 + 13 + CREATE INDEX feed_content_idx_feed ON feed_content(feed_id, indexed_at DESC, indexed_at_more DESC, cid DESC); 14 + 15 + CREATE TABLE consumer_control ( 16 + source TEXT NOT NULL, 17 + time_us VARCHAR NOT NULL, 18 + updated_at DATETIME NOT NULL DEFAULT (datetime('now')), 19 + PRIMARY KEY (source) 20 + ); 21 + 22 + CREATE TABLE verification_method_cache ( 23 + did TEXT NOT NULL, 24 + multikey TEXT NOT NULL, 25 + updated_at DATETIME NOT NULL DEFAULT (datetime('now')), 26 + PRIMARY KEY (did) 27 + );
+131
src/bin/supercell.rs
··· 1 + use anyhow::Result; 2 + use sqlx::SqlitePool; 3 + use std::collections::HashMap; 4 + use std::collections::HashSet; 5 + use std::env; 6 + use tokio::net::TcpListener; 7 + use tokio::signal; 8 + use tokio_util::{sync::CancellationToken, task::TaskTracker}; 9 + use tracing_subscriber::prelude::*; 10 + 11 + use supercell::consumer::ConsumerTask; 12 + use supercell::consumer::ConsumerTaskConfig; 13 + use supercell::http::context::WebContext; 14 + use supercell::http::server::build_router; 15 + 16 + #[tokio::main] 17 + async fn main() -> Result<()> { 18 + tracing_subscriber::registry() 19 + .with(tracing_subscriber::EnvFilter::new( 20 + std::env::var("RUST_LOG").unwrap_or_else(|_| "supercell=debug,info".into()), 21 + )) 22 + .with(tracing_subscriber::fmt::layer().pretty()) 23 + .init(); 24 + 25 + let version = supercell::config::version()?; 26 + 27 + env::args().for_each(|arg| { 28 + if arg == "--version" { 29 + println!("{}", version); 30 + std::process::exit(0); 31 + } 32 + }); 33 + 34 + let config = supercell::config::Config::new()?; 35 + 36 + let pool = SqlitePool::connect(&config.database_url).await?; 37 + sqlx::migrate!().run(&pool).await?; 38 + 39 + let feeds: HashMap<String, (String, HashSet<String>)> = config 40 + .feeds 41 + .feeds 42 + .iter() 43 + .map(|feed| (feed.uri.clone(), (feed.deny.clone(), feed.allow.clone()))) 44 + .collect(); 45 + 46 + let web_context = WebContext::new(pool.clone(), config.external_base.as_str(), feeds); 47 + 48 + let app = build_router(web_context.clone()); 49 + 50 + let tracker = TaskTracker::new(); 51 + let token = CancellationToken::new(); 52 + 53 + { 54 + let tracker = tracker.clone(); 55 + let inner_token = token.clone(); 56 + 57 + let ctrl_c = async { 58 + signal::ctrl_c() 59 + .await 60 + .expect("failed to install Ctrl+C handler"); 61 + }; 62 + 63 + let terminate = async { 64 + signal::unix::signal(signal::unix::SignalKind::terminate()) 65 + .expect("failed to install signal handler") 66 + .recv() 67 + .await; 68 + 
}; 69 + 70 + tokio::spawn(async move { 71 + tokio::select! { 72 + () = inner_token.cancelled() => { }, 73 + _ = terminate => {}, 74 + _ = ctrl_c => {}, 75 + } 76 + 77 + tracker.close(); 78 + inner_token.cancel(); 79 + }); 80 + } 81 + 82 + { 83 + let inner_config = config.clone(); 84 + let task_enable = *inner_config.consumer_task_enable.as_ref(); 85 + if task_enable { 86 + let consumer_task_config = ConsumerTaskConfig { 87 + zstd_dictionary_location: inner_config.zstd_dictionary.clone(), 88 + jetstream_hostname: inner_config.jetstream_hostname.clone(), 89 + feeds: inner_config.feeds.clone(), 90 + }; 91 + let task = ConsumerTask::new(pool.clone(), consumer_task_config, token.clone())?; 92 + let inner_token = token.clone(); 93 + tracker.spawn(async move { 94 + if let Err(err) = task.run_background().await { 95 + tracing::warn!(error = ?err, "consumer task error"); 96 + } 97 + inner_token.cancel(); 98 + }); 99 + } 100 + } 101 + 102 + { 103 + let inner_config = config.clone(); 104 + let http_port = *inner_config.http_port.as_ref(); 105 + let inner_token = token.clone(); 106 + tracker.spawn(async move { 107 + let listener = TcpListener::bind(&format!("0.0.0.0:{}", http_port)) 108 + .await 109 + .unwrap(); 110 + 111 + let shutdown_token = inner_token.clone(); 112 + let result = axum::serve(listener, app) 113 + .with_graceful_shutdown(async move { 114 + tokio::select! { 115 + () = shutdown_token.cancelled() => { } 116 + } 117 + tracing::info!("axum graceful shutdown complete"); 118 + }) 119 + .await; 120 + if let Err(err) = result { 121 + tracing::error!("axum task failed: {}", err); 122 + } 123 + 124 + inner_token.cancel(); 125 + }); 126 + } 127 + 128 + tracker.wait().await; 129 + 130 + Ok(()) 131 + }
+187
src/config.rs
··· 1 + use std::collections::HashSet; 2 + 3 + use anyhow::{anyhow, Result}; 4 + use serde::Deserialize; 5 + 6 + #[derive(Clone, Deserialize)] 7 + pub struct Feeds { 8 + pub feeds: Vec<Feed>, 9 + } 10 + 11 + #[derive(Clone, Deserialize)] 12 + pub struct Feed { 13 + pub uri: String, 14 + pub name: String, 15 + pub description: String, 16 + pub allow: HashSet<String>, 17 + pub deny: String, 18 + pub matchers: Vec<Matcher>, 19 + } 20 + 21 + #[derive(Clone, Deserialize)] 22 + #[serde(tag = "type")] 23 + pub enum Matcher { 24 + #[serde(rename = "equal")] 25 + Equal { path: String, value: String }, 26 + 27 + #[serde(rename = "prefix")] 28 + Prefix { path: String, value: String }, 29 + 30 + #[serde(rename = "sequence")] 31 + Sequence { path: String, values: Vec<String> }, 32 + } 33 + 34 + #[derive(Clone)] 35 + pub struct HttpPort(u16); 36 + 37 + #[derive(Clone)] 38 + pub struct CertificateBundles(Vec<String>); 39 + 40 + #[derive(Clone)] 41 + pub struct TaskEnable(bool); 42 + 43 + #[derive(Clone)] 44 + pub struct Config { 45 + pub version: String, 46 + pub http_port: HttpPort, 47 + pub external_base: String, 48 + pub database_url: String, 49 + pub certificate_bundles: CertificateBundles, 50 + pub consumer_task_enable: TaskEnable, 51 + pub user_agent: String, 52 + pub zstd_dictionary: String, 53 + pub jetstream_hostname: String, 54 + pub feeds: Feeds, 55 + } 56 + 57 + impl Config { 58 + pub fn new() -> Result<Self> { 59 + let http_port: HttpPort = default_env("HTTP_PORT", "4040").try_into()?; 60 + let external_base = require_env("EXTERNAL_BASE")?; 61 + 62 + let database_url = default_env("DATABASE_URL", "sqlite://development.db"); 63 + 64 + let certificate_bundles: CertificateBundles = 65 + optional_env("CERTIFICATE_BUNDLES").try_into()?; 66 + 67 + let jetstream_hostname = require_env("JETSTREAM_HOSTNAME")?; 68 + let zstd_dictionary = require_env("ZSTD_DICTIONARY")?; 69 + 70 + let consumer_task_enable: TaskEnable = 71 + default_env("CONSUMER_TASK_ENABLE", 
"false").try_into()?; 72 + 73 + let default_user_agent = format!( 74 + "supercell ({}; +https://github.com/astrenoxcoop/supercell)", 75 + version()? 76 + ); 77 + 78 + let user_agent = default_env("USER_AGENT", &default_user_agent); 79 + 80 + let feeds: Feeds = require_env("FEEDS")?.try_into()?; 81 + 82 + Ok(Self { 83 + version: version()?, 84 + http_port, 85 + external_base, 86 + database_url, 87 + certificate_bundles, 88 + consumer_task_enable, 89 + user_agent, 90 + jetstream_hostname, 91 + zstd_dictionary, 92 + feeds, 93 + }) 94 + } 95 + } 96 + 97 + fn require_env(name: &str) -> Result<String> { 98 + std::env::var(name) 99 + .map_err(|err| anyhow::Error::new(err).context(anyhow!("{} must be set", name))) 100 + } 101 + 102 + fn optional_env(name: &str) -> String { 103 + std::env::var(name).unwrap_or("".to_string()) 104 + } 105 + 106 + fn default_env(name: &str, default_value: &str) -> String { 107 + std::env::var(name).unwrap_or(default_value.to_string()) 108 + } 109 + 110 + pub fn version() -> Result<String> { 111 + option_env!("GIT_HASH") 112 + .or(option_env!("CARGO_PKG_VERSION")) 113 + .map(|val| val.to_string()) 114 + .ok_or(anyhow!("one of GIT_HASH or CARGO_PKG_VERSION must be set")) 115 + } 116 + 117 + impl TryFrom<String> for HttpPort { 118 + type Error = anyhow::Error; 119 + fn try_from(value: String) -> Result<Self, Self::Error> { 120 + if value.is_empty() { 121 + Ok(Self(80)) 122 + } else { 123 + value.parse::<u16>().map(Self).map_err(|err| { 124 + anyhow::Error::new(err).context(anyhow!("parsing PORT into u16 failed")) 125 + }) 126 + } 127 + } 128 + } 129 + 130 + impl AsRef<u16> for HttpPort { 131 + fn as_ref(&self) -> &u16 { 132 + &self.0 133 + } 134 + } 135 + 136 + impl TryFrom<String> for CertificateBundles { 137 + type Error = anyhow::Error; 138 + fn try_from(value: String) -> Result<Self, Self::Error> { 139 + Ok(Self( 140 + value 141 + .split(';') 142 + .filter_map(|s| { 143 + if s.is_empty() { 144 + None 145 + } else { 146 + Some(s.to_string()) 
147 + } 148 + }) 149 + .collect::<Vec<String>>(), 150 + )) 151 + } 152 + } 153 + 154 + impl AsRef<Vec<String>> for CertificateBundles { 155 + fn as_ref(&self) -> &Vec<String> { 156 + &self.0 157 + } 158 + } 159 + 160 + impl AsRef<bool> for TaskEnable { 161 + fn as_ref(&self) -> &bool { 162 + &self.0 163 + } 164 + } 165 + 166 + impl TryFrom<String> for TaskEnable { 167 + type Error = anyhow::Error; 168 + fn try_from(value: String) -> Result<Self, Self::Error> { 169 + let value = value.parse::<bool>().map_err(|err| { 170 + anyhow::Error::new(err).context(anyhow!("parsing task enable into bool failed")) 171 + })?; 172 + Ok(Self(value)) 173 + } 174 + } 175 + 176 + impl TryFrom<String> for Feeds { 177 + type Error = anyhow::Error; 178 + fn try_from(value: String) -> Result<Self, Self::Error> { 179 + let content = std::fs::read(value).map_err(|err| { 180 + anyhow::Error::new(err).context(anyhow!("reading feed config file failed")) 181 + })?; 182 + 183 + serde_yaml::from_slice(&content).map_err(|err| { 184 + anyhow::Error::new(err).context(anyhow!("parsing feeds into Feeds failed")) 185 + }) 186 + } 187 + }
+287
src/consumer.rs
··· 1 + use std::str::FromStr; 2 + 3 + use anyhow::{Context, Result}; 4 + use futures_util::SinkExt; 5 + use futures_util::StreamExt; 6 + use http::Uri; 7 + use tokio::time::{sleep, Instant}; 8 + use tokio_util::sync::CancellationToken; 9 + use tokio_websockets::{ClientBuilder, Message}; 10 + 11 + use crate::config; 12 + use crate::matcher::FeedMatchers; 13 + use crate::storage; 14 + use crate::storage::consumer_control_get; 15 + use crate::storage::consumer_control_insert; 16 + use crate::storage::feed_content_insert; 17 + use crate::storage::StoragePool; 18 + 19 + const MAX_MESSAGE_SIZE: usize = 25000; 20 + 21 + #[derive(Clone)] 22 + pub struct ConsumerTaskConfig { 23 + pub zstd_dictionary_location: String, 24 + pub jetstream_hostname: String, 25 + pub feeds: config::Feeds, 26 + } 27 + 28 + pub struct ConsumerTask { 29 + cancellation_token: CancellationToken, 30 + pool: StoragePool, 31 + config: ConsumerTaskConfig, 32 + feed_matchers: FeedMatchers, 33 + } 34 + 35 + impl ConsumerTask { 36 + pub fn new( 37 + pool: StoragePool, 38 + config: ConsumerTaskConfig, 39 + cancellation_token: CancellationToken, 40 + ) -> Result<Self> { 41 + let feed_matchers = FeedMatchers::from_config(&config.feeds)?; 42 + 43 + Ok(Self { 44 + pool, 45 + cancellation_token, 46 + config, 47 + feed_matchers, 48 + }) 49 + } 50 + 51 + pub async fn run_background(&self) -> Result<()> { 52 + tracing::debug!("ConsumerTask started"); 53 + 54 + let last_time_us = 55 + consumer_control_get(&self.pool, &self.config.jetstream_hostname).await?; 56 + 57 + // mkdir -p data/ && curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary 58 + let data: Vec<u8> = std::fs::read(self.config.zstd_dictionary_location.clone()) 59 + .context("unable to load zstd dictionary")?; 60 + 61 + let uri = Uri::from_str(&format!( 62 + "wss://{}/subscribe?compress=true&requireHello=true", 63 + self.config.jetstream_hostname 64 + )) 65 + .context("invalid jetstream 
URL")?; 66 + 67 + let (mut client, _) = ClientBuilder::from_uri(uri) 68 + .connect() 69 + .await 70 + .map_err(|err| anyhow::Error::new(err).context("cannot connect to jetstream"))?; 71 + 72 + let update = model::SubscriberSourcedMessage::Update { 73 + wanted_collections: vec!["app.bsky.feed.post".to_string()], 74 + wanted_dids: vec![], 75 + max_message_size_bytes: MAX_MESSAGE_SIZE as u64, 76 + cursor: last_time_us, 77 + }; 78 + let serialized_update = serde_json::to_string(&update) 79 + .map_err(|err| anyhow::Error::msg(err).context("cannot serialize update"))?; 80 + 81 + client 82 + .send(Message::text(serialized_update)) 83 + .await 84 + .map_err(|err| anyhow::Error::msg(err).context("cannot send update"))?; 85 + 86 + let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&data) 87 + .map_err(|err| anyhow::Error::msg(err).context("cannot create decompressor"))?; 88 + 89 + let interval = std::time::Duration::from_secs(120); 90 + let sleeper = sleep(interval); 91 + tokio::pin!(sleeper); 92 + 93 + let mut time_usec = 0u64; 94 + 95 + loop { 96 + tokio::select! 
{ 97 + () = self.cancellation_token.cancelled() => { 98 + break; 99 + }, 100 + () = &mut sleeper => { 101 + consumer_control_insert(&self.pool, &self.config.jetstream_hostname, &time_usec.to_string()).await?; 102 + sleeper.as_mut().reset(Instant::now() + interval); 103 + }, 104 + item = client.next() => { 105 + if item.is_none() { 106 + tracing::warn!("jetstream connection closed"); 107 + break; 108 + } 109 + let item = item.unwrap(); 110 + 111 + if let Err(err) = item { 112 + tracing::error!(error = ?err, "error processing jetstream message"); 113 + continue; 114 + } 115 + let item = item.unwrap(); 116 + 117 + if !item.is_binary() { 118 + tracing::warn!("message from jetstream is not binary"); 119 + continue; 120 + } 121 + let payload = item.into_payload(); 122 + 123 + let decoded = decompressor.decompress(&payload, MAX_MESSAGE_SIZE + 1); 124 + if let Err(err) = decoded { 125 + let length = payload.len(); 126 + tracing::error!(error = ?err, length = ?length, "error processing jetstream message"); 127 + continue; 128 + } 129 + let decoded = decoded.unwrap(); 130 + 131 + let event = serde_json::from_slice::<model::Event>(&decoded); 132 + if let Err(err) = event { 133 + tracing::error!(error = ?err, "error processing jetstream message"); 134 + 135 + #[cfg(debug_assertions)] 136 + { 137 + println!("{:?}", std::str::from_utf8(&decoded)); 138 + } 139 + 140 + continue; 141 + } 142 + let event = event.unwrap(); 143 + 144 + time_usec = std::cmp::max(time_usec, event.time_us); 145 + 146 + if event.clone().kind != "commit" { 147 + continue; 148 + } 149 + 150 + let event_value = serde_json::to_value(event.clone()); 151 + if let Err(err) = event_value { 152 + tracing::error!(error = ?err, "error processing jetstream message"); 153 + continue; 154 + } 155 + let event_value = event_value.unwrap(); 156 + 157 + for feed_matcher in self.feed_matchers.0.iter() { 158 + if feed_matcher.matches(&event_value) { 159 + tracing::debug!(feed_id = ?feed_matcher.feed, "matched event"); 160 + 
if let Some((uri, cid)) = model::to_post_strong_ref(&event) { 161 + let feed_content = storage::model::FeedContent::new(feed_matcher.feed.clone(), uri, event.clone().time_us, cid); 162 + feed_content_insert(&self.pool, &feed_content).await?; 163 + } 164 + } 165 + } 166 + } 167 + } 168 + } 169 + 170 + tracing::debug!("ConsumerTask stopped"); 171 + 172 + Ok(()) 173 + } 174 + } 175 + 176 + pub(crate) mod model { 177 + 178 + use std::collections::HashMap; 179 + 180 + use serde::{Deserialize, Serialize}; 181 + 182 + #[derive(Debug, Clone, Serialize, Deserialize)] 183 + #[serde(tag = "type", content = "payload")] 184 + pub(crate) enum SubscriberSourcedMessage { 185 + #[serde(rename = "options_update")] 186 + Update { 187 + #[serde(rename = "wantedCollections")] 188 + wanted_collections: Vec<String>, 189 + 190 + #[serde(rename = "wantedDids", skip_serializing_if = "Vec::is_empty", default)] 191 + wanted_dids: Vec<String>, 192 + 193 + #[serde(rename = "maxMessageSizeBytes")] 194 + max_message_size_bytes: u64, 195 + 196 + #[serde(skip_serializing_if = "Option::is_none")] 197 + cursor: Option<u64>, 198 + }, 199 + } 200 + 201 + #[derive(Debug, Clone, Serialize, Deserialize)] 202 + pub(crate) struct Facet { 203 + pub(crate) features: Vec<HashMap<String, String>>, 204 + } 205 + 206 + #[derive(Debug, Clone, Serialize, Deserialize)] 207 + pub(crate) struct StrongRef { 208 + pub(crate) uri: String, 209 + } 210 + 211 + #[derive(Debug, Clone, Serialize, Deserialize)] 212 + pub(crate) struct Reply { 213 + pub(crate) root: Option<StrongRef>, 214 + pub(crate) parent: Option<StrongRef>, 215 + } 216 + 217 + #[derive(Debug, Clone, Serialize, Deserialize)] 218 + #[serde(tag = "$type")] 219 + pub(crate) enum Record { 220 + #[serde(rename = "app.bsky.feed.post")] 221 + Post { 222 + text: String, 223 + 224 + facets: Option<Vec<Facet>>, 225 + 226 + reply: Option<Reply>, 227 + 228 + #[serde(flatten)] 229 + extra: HashMap<String, serde_json::Value>, 230 + }, 231 + 232 + #[serde(untagged)] 233 + 
Other { 234 + #[serde(flatten)] 235 + extra: HashMap<String, serde_json::Value>, 236 + }, 237 + } 238 + 239 + #[derive(Debug, Clone, Serialize, Deserialize)] 240 + #[serde(tag = "operation")] 241 + pub(crate) enum CommitOp { 242 + #[serde(rename = "create")] 243 + Create { 244 + rev: String, 245 + collection: String, 246 + rkey: String, 247 + record: Record, 248 + cid: String, 249 + }, 250 + #[serde(rename = "update")] 251 + Update { 252 + rev: String, 253 + collection: String, 254 + rkey: String, 255 + record: Record, 256 + cid: String, 257 + }, 258 + #[serde(rename = "delete")] 259 + Delete { 260 + rev: String, 261 + collection: String, 262 + rkey: String, 263 + }, 264 + } 265 + 266 + #[derive(Debug, Clone, Serialize, Deserialize)] 267 + pub(crate) struct Event { 268 + pub(crate) did: String, 269 + pub(crate) kind: String, 270 + pub(crate) time_us: u64, 271 + pub(crate) commit: Option<CommitOp>, 272 + } 273 + 274 + pub(crate) fn to_post_strong_ref(event: &Event) -> Option<(String, String)> { 275 + if let Some(CommitOp::Create { 276 + collection, 277 + rkey, 278 + cid, 279 + .. 280 + }) = &event.commit 281 + { 282 + let uri = format!("at://{}/{}/{}", event.did, collection, rkey); 283 + return Some((uri, cid.clone())); 284 + } 285 + None 286 + } 287 + }
+43
src/crypto.rs
··· 1 + use anyhow::{anyhow, Result}; 2 + use serde::Deserialize; 3 + 4 + #[derive(Deserialize)] 5 + pub struct JwtClaims { 6 + pub iss: String, 7 + pub aud: String, 8 + pub iat: i32, 9 + pub exp: i32, 10 + pub lxm: String, 11 + } 12 + 13 + #[derive(Deserialize)] 14 + pub struct JwtHeader { 15 + pub typ: String, 16 + pub alg: String, 17 + } 18 + 19 + pub(crate) fn validate(multibase_key: &str, signature: &[u8], content: &str) -> Result<()> { 20 + let (_, decoded_multibase_key) = multibase::decode(multibase_key)?; 21 + match &decoded_multibase_key[..2] { 22 + // secp256k1 23 + [0xe7, 0x01] => { 24 + let signature = ecdsa::Signature::from_slice(signature)?; 25 + let verifying_key = 26 + k256::ecdsa::VerifyingKey::from_sec1_bytes(&decoded_multibase_key[2..])?; 27 + ecdsa::signature::Verifier::verify(&verifying_key, content.as_bytes(), &signature)?; 28 + Ok(()) 29 + } 30 + // p256 31 + [0x80, 0x24] => { 32 + let signature = ecdsa::Signature::from_slice(signature)?; 33 + let verifying_key = 34 + p256::ecdsa::VerifyingKey::from_sec1_bytes(&decoded_multibase_key[2..])?; 35 + ecdsa::signature::Verifier::verify(&verifying_key, content.as_bytes(), &signature)?; 36 + Ok(()) 37 + } 38 + _ => Err(anyhow!( 39 + "invalid multibase: {:?}", 40 + &decoded_multibase_key[..2] 41 + )), 42 + } 43 + }
+25
src/errors.rs
··· 1 + use axum::{ 2 + http::StatusCode, 3 + response::{IntoResponse, Response}, 4 + }; 5 + 6 + #[derive(Debug)] 7 + pub struct SupercellError(pub anyhow::Error); 8 + 9 + impl<E> From<E> for SupercellError 10 + where 11 + E: Into<anyhow::Error>, 12 + { 13 + fn from(err: E) -> Self { 14 + Self(err.into()) 15 + } 16 + } 17 + 18 + impl IntoResponse for SupercellError { 19 + fn into_response(self) -> Response { 20 + { 21 + tracing::error!(error = ?self.0, "internal server error"); 22 + (StatusCode::INTERNAL_SERVER_ERROR).into_response() 23 + } 24 + } 25 + }
+49
src/http/context.rs
··· 1 + use axum::extract::FromRef; 2 + use std::{ 3 + collections::{HashMap, HashSet}, 4 + ops::Deref, 5 + sync::Arc, 6 + }; 7 + 8 + use crate::storage::StoragePool; 9 + 10 + #[derive(Clone, Debug)] 11 + pub(crate) struct FeedControl { 12 + pub(crate) deny: String, 13 + pub(crate) allowed: HashSet<String>, 14 + } 15 + 16 + pub struct InnerWebContext { 17 + pub(crate) pool: StoragePool, 18 + pub(crate) external_base: String, 19 + pub(crate) feeds: HashMap<String, FeedControl>, 20 + } 21 + 22 + #[derive(Clone, FromRef)] 23 + pub struct WebContext(pub(crate) Arc<InnerWebContext>); 24 + 25 + impl Deref for WebContext { 26 + type Target = InnerWebContext; 27 + 28 + fn deref(&self) -> &Self::Target { 29 + &self.0 30 + } 31 + } 32 + 33 + impl WebContext { 34 + pub fn new( 35 + pool: StoragePool, 36 + external_base: &str, 37 + feeds: HashMap<String, (String, HashSet<String>)>, 38 + ) -> Self { 39 + let feeds = feeds 40 + .into_iter() 41 + .map(|(uri, (deny, allowed))| (uri, FeedControl { deny, allowed })) 42 + .collect(); 43 + Self(Arc::new(InnerWebContext { 44 + pool, 45 + external_base: external_base.to_string(), 46 + feeds, 47 + })) 48 + } 49 + }
+226
src/http/handle_get_feed_skeleton.rs
··· 1 + use anyhow::{anyhow, Context, Result}; 2 + use axum::{extract::State, response::IntoResponse, Json}; 3 + use axum_extra::extract::Query; 4 + use base64::{engine::general_purpose, Engine as _}; 5 + use chrono::Utc; 6 + use http::{HeaderMap, StatusCode}; 7 + use serde::{Deserialize, Serialize}; 8 + use serde_json::json; 9 + 10 + use crate::errors::SupercellError; 11 + use crate::storage::feed_content_paginate; 12 + use crate::storage::{verification_method_get, StoragePool}; 13 + 14 + use crate::crypto::{validate, JwtClaims, JwtHeader}; 15 + 16 + use super::context::WebContext; 17 + 18 + #[derive(Deserialize, Default)] 19 + pub struct FeedParams { 20 + pub feed: Option<String>, 21 + pub limit: Option<u16>, 22 + pub cursor: Option<String>, 23 + } 24 + 25 + #[derive(Serialize)] 26 + pub struct FeedItemView { 27 + pub post: String, 28 + } 29 + 30 + #[derive(Serialize)] 31 + pub struct FeedItemsView { 32 + #[serde(skip_serializing_if = "Option::is_none")] 33 + pub cursor: Option<String>, 34 + pub feed: Vec<FeedItemView>, 35 + } 36 + 37 + pub async fn handle_get_feed_skeleton( 38 + State(web_context): State<WebContext>, 39 + Query(feed_params): Query<FeedParams>, 40 + headers: HeaderMap, 41 + ) -> Result<impl IntoResponse, SupercellError> { 42 + if feed_params.feed.is_none() { 43 + return Err(anyhow!("feed parameter is required").into()); 44 + } 45 + let feed_uri = feed_params.feed.unwrap(); 46 + 47 + let feed_control = web_context.feeds.get(&feed_uri); 48 + if feed_control.is_none() { 49 + return Ok(( 50 + StatusCode::BAD_REQUEST, 51 + Json(json!({ 52 + "error": "UnknownFeed", 53 + "message": "unknown feed", 54 + })), 55 + ) 56 + .into_response()); 57 + } 58 + 59 + let feed_control = feed_control.unwrap(); 60 + 61 + let authorization = headers.get("Authorization").and_then(|value| { 62 + value 63 + .to_str() 64 + .map(|inner_value| inner_value.to_string()) 65 + .ok() 66 + }); 67 + 68 + let did = did_from_jwt(&web_context.pool, &web_context.external_base, 
authorization).await; 69 + 70 + if let Err(err) = did { 71 + tracing::error!(error = ?err, "failed to validate JWT"); 72 + return Ok(Json(FeedItemsView { 73 + cursor: None, 74 + feed: vec![FeedItemView { 75 + post: feed_control.deny.clone(), 76 + }], 77 + }) 78 + .into_response()); 79 + } 80 + 81 + let did = did.unwrap(); 82 + 83 + if !feed_control.allowed.contains(&did) { 84 + return Ok(Json(FeedItemsView { 85 + cursor: None, 86 + feed: vec![FeedItemView { 87 + post: feed_control.deny.clone(), 88 + }], 89 + }) 90 + .into_response()); 91 + } 92 + 93 + let parsed_cursor = parse_cursor(feed_params.cursor); 94 + let feed_items = feed_content_paginate( 95 + &web_context.pool, 96 + &feed_uri, 97 + feed_params.limit, 98 + parsed_cursor, 99 + ) 100 + .await?; 101 + 102 + let cursor = feed_items 103 + .iter() 104 + .last() 105 + .map(|last_feed_item| format!("{},{}", last_feed_item.time_us(), last_feed_item.cid)); 106 + 107 + let feed_item_views = feed_items 108 + .iter() 109 + .map(|feed_item| FeedItemView { 110 + post: feed_item.uri.clone(), 111 + }) 112 + .collect::<Vec<_>>(); 113 + 114 + Ok(Json(FeedItemsView { 115 + cursor, 116 + feed: feed_item_views, 117 + }) 118 + .into_response()) 119 + } 120 + 121 + pub fn split_token(token: &str) -> Result<[&str; 3]> { 122 + let mut components = token.split('.'); 123 + let header = components.next().ok_or(anyhow!("missing header"))?; 124 + let claims = components.next().ok_or(anyhow!("missing claims"))?; 125 + let signature = components.next().ok_or(anyhow!("missing signature"))?; 126 + 127 + if components.next().is_some() { 128 + return Err(anyhow!("invalid token")); 129 + } 130 + 131 + Ok([header, claims, signature]) 132 + } 133 + 134 + async fn did_from_jwt( 135 + pool: &StoragePool, 136 + external_base: &str, 137 + authorization: Option<String>, 138 + ) -> Result<String> { 139 + let jwt = authorization 140 + .and_then(|value| { 141 + value 142 + .strip_prefix("Bearer ") 143 + .map(|inner_value| inner_value.to_string()) 144 + 
}) 145 + .ok_or(anyhow!("missing authorization"))?; 146 + let [header_part, claims_part, signature_part] = split_token(&jwt)?; 147 + 148 + let header: JwtHeader = { 149 + let content = general_purpose::URL_SAFE_NO_PAD 150 + .decode(header_part) 151 + .context("unable to base64 decode content")?; 152 + serde_json::from_slice(&content).context("unable to deserialize object")? 153 + }; 154 + let claims: JwtClaims = { 155 + let content = general_purpose::URL_SAFE_NO_PAD 156 + .decode(claims_part) 157 + .context("unable to base64 decode content")?; 158 + serde_json::from_slice(&content).context("unable to deserialize object")? 159 + }; 160 + 161 + let now = Utc::now(); 162 + let now = now.timestamp() as i32; 163 + 164 + if header.alg != "ES256K" { 165 + return Err(anyhow!("unsupported algorithm")); 166 + } 167 + if claims.lxm != "app.bsky.feed.getFeedSkeleton" { 168 + return Err(anyhow!("invalid resource")); 169 + } 170 + if claims.aud != format!("did:web:{}", external_base) { 171 + return Err(anyhow!("invalid audience")); 172 + } 173 + if claims.exp < now { 174 + return Err(anyhow!("token expired")); 175 + } 176 + if claims.iat > now { 177 + return Err(anyhow!("token issued in the future")); 178 + } 179 + 180 + let multibase = verification_method_get(pool, &claims.iss).await?; 181 + if multibase.is_none() { 182 + return Err(anyhow!("verification method not found")); 183 + } 184 + let multibase = multibase.unwrap(); 185 + 186 + let signature = general_purpose::URL_SAFE_NO_PAD 187 + .decode(signature_part) 188 + .context("invalid signature")?; 189 + let signature: &[u8] = &signature; 190 + 191 + let content = format!("{}.{}", header_part, claims_part); 192 + 193 + validate(&multibase, signature, &content)?; 194 + 195 + Ok(claims.iss) 196 + } 197 + 198 + fn parse_cursor(value: Option<String>) -> Option<(u64, u32, u32, String)> { 199 + let value = value.as_ref()?; 200 + 201 + let parts = value.split(",").collect::<Vec<&str>>(); 202 + if parts.len() != 2 { 203 + return 
None; 204 + } 205 + 206 + let time_us = parts[0].parse::<u64>(); 207 + if time_us.is_err() { 208 + return None; 209 + } 210 + let time_us = time_us.unwrap(); 211 + 212 + let time_us_bytes = time_us.to_be_bytes(); 213 + let indexed_at = u32::from_be_bytes([ 214 + time_us_bytes[0], 215 + time_us_bytes[1], 216 + time_us_bytes[2], 217 + time_us_bytes[3], 218 + ]); 219 + let indexed_at_more = u32::from_be_bytes([ 220 + time_us_bytes[4], 221 + time_us_bytes[5], 222 + time_us_bytes[6], 223 + time_us_bytes[7], 224 + ]); 225 + Some((time_us, indexed_at, indexed_at_more, parts[1].to_string())) 226 + }
+9
src/http/handle_index.rs
··· 1 + use anyhow::Result; 2 + use axum::{response::IntoResponse, Json}; 3 + use serde_json::json; 4 + 5 + use crate::errors::SupercellError; 6 + 7 + pub async fn handle_index() -> Result<impl IntoResponse, SupercellError> { 8 + Ok(Json(json!({"ok": true})).into_response()) 9 + }
+24
src/http/handle_well_known.rs
··· 1 + use anyhow::Result; 2 + use axum::{extract::State, response::IntoResponse, Json}; 3 + use serde_json::json; 4 + 5 + use crate::errors::SupercellError; 6 + 7 + use super::context::WebContext; 8 + 9 + pub async fn handle_well_known( 10 + State(web_context): State<WebContext>, 11 + ) -> Result<impl IntoResponse, SupercellError> { 12 + Ok(Json(json!({ 13 + "@context": ["https://www.w3.org/ns/did/v1"], 14 + "id": format!("did:web:{}", web_context.external_base), 15 + "service": [ 16 + { 17 + "id": "#bsky_fg", 18 + "type": "BskyFeedGenerator", 19 + "serviceEndpoint": format!("https://{}", web_context.external_base), 20 + } 21 + ] 22 + })) 23 + .into_response()) 24 + }
+5
src/http/mod.rs
//! HTTP layer: shared request state, route handlers, and router setup.
pub mod context;
pub mod handle_get_feed_skeleton;
pub mod handle_index;
pub mod handle_well_known;
pub mod server;
+34
src/http/server.rs
··· 1 + use super::{ 2 + context::WebContext, handle_get_feed_skeleton::handle_get_feed_skeleton, 3 + handle_index::handle_index, handle_well_known::handle_well_known, 4 + }; 5 + use axum::{http::HeaderValue, routing::get, Router}; 6 + use http::{ 7 + header::{ACCEPT, ACCEPT_LANGUAGE}, 8 + Method, 9 + }; 10 + use std::time::Duration; 11 + use tower_http::cors::CorsLayer; 12 + use tower_http::timeout::TimeoutLayer; 13 + use tower_http::trace::TraceLayer; 14 + 15 + pub fn build_router(web_context: WebContext) -> Router { 16 + Router::new() 17 + .route("/", get(handle_index)) 18 + .route("/.well-known/did.json", get(handle_well_known)) 19 + .route( 20 + "/xrpc/app.bsky.feed.getFeedSkeleton", 21 + get(handle_get_feed_skeleton), 22 + ) 23 + .layer(( 24 + TraceLayer::new_for_http(), 25 + TimeoutLayer::new(Duration::from_secs(10)), 26 + )) 27 + .layer( 28 + CorsLayer::new() 29 + .allow_origin(web_context.external_base.parse::<HeaderValue>().unwrap()) 30 + .allow_methods([Method::GET]) 31 + .allow_headers([ACCEPT_LANGUAGE, ACCEPT]), 32 + ) 33 + .with_state(web_context.clone()) 34 + }
+7
src/lib.rs
//! supercell: a configurable Bluesky (AT Protocol) feed generator.
pub mod config;
pub mod consumer;
pub mod crypto;
pub mod errors;
pub mod http;
pub mod matcher;
pub mod storage;
+353
src/matcher.rs
··· 1 + use anyhow::{Context, Result}; 2 + use serde_json_path::JsonPath; 3 + 4 + use crate::config; 5 + 6 + pub trait Matcher: Sync + Send { 7 + fn matches(&self, value: &serde_json::Value) -> bool; 8 + } 9 + 10 + pub struct FeedMatcher { 11 + pub(crate) feed: String, 12 + matchers: Vec<Box<dyn Matcher>>, 13 + } 14 + 15 + pub(crate) struct FeedMatchers(pub(crate) Vec<FeedMatcher>); 16 + 17 + impl FeedMatchers { 18 + pub(crate) fn from_config(config_feeds: &config::Feeds) -> Result<Self> { 19 + let mut feed_matchers = vec![]; 20 + 21 + for config_feed in config_feeds.feeds.iter() { 22 + let feed = config_feed.uri.clone(); 23 + 24 + let mut matchers = vec![]; 25 + 26 + for config_feed_matcher in config_feed.matchers.iter() { 27 + match config_feed_matcher { 28 + config::Matcher::Equal { path, value } => { 29 + matchers 30 + .push(Box::new(EqualsMatcher::new(value, path)?) as Box<dyn Matcher>); 31 + } 32 + config::Matcher::Prefix { path, value } => { 33 + matchers 34 + .push(Box::new(PrefixMatcher::new(value, path)?) as Box<dyn Matcher>); 35 + } 36 + config::Matcher::Sequence { path, values } => { 37 + matchers.push(Box::new(SequenceMatcher::new(values, path)?) 
as Box<dyn Matcher>); 38 + } 39 + } 40 + } 41 + 42 + feed_matchers.push(FeedMatcher { feed, matchers }); 43 + } 44 + 45 + Ok(Self(feed_matchers)) 46 + } 47 + } 48 + 49 + impl FeedMatcher { 50 + pub(crate) fn matches(&self, value: &serde_json::Value) -> bool { 51 + self.matchers.iter().any(|matcher| matcher.matches(value)) 52 + } 53 + } 54 + 55 + pub struct EqualsMatcher { 56 + expected: String, 57 + path: JsonPath, 58 + } 59 + 60 + impl EqualsMatcher { 61 + pub fn new(expected: &str, path: &str) -> Result<Self> { 62 + let path = JsonPath::parse(path).context("cannot parse path")?; 63 + Ok(Self { 64 + expected: expected.to_string(), 65 + path, 66 + }) 67 + } 68 + } 69 + 70 + impl Matcher for EqualsMatcher { 71 + fn matches(&self, value: &serde_json::Value) -> bool { 72 + let nodes = self.path.query(value).all(); 73 + 74 + let string_nodes = nodes 75 + .iter() 76 + .filter_map(|value| { 77 + if let serde_json::Value::String(actual) = value { 78 + Some(actual.to_lowercase().clone()) 79 + } else { 80 + None 81 + } 82 + }) 83 + .collect::<Vec<String>>(); 84 + 85 + string_nodes.iter().any(|value| value == &self.expected) 86 + } 87 + } 88 + 89 + pub struct PrefixMatcher { 90 + prefix: String, 91 + path: JsonPath, 92 + } 93 + 94 + impl PrefixMatcher { 95 + pub(crate) fn new(prefix: &str, path: &str) -> Result<Self> { 96 + let path = JsonPath::parse(path).context("cannot parse path")?; 97 + Ok(Self { 98 + prefix: prefix.to_string(), 99 + path, 100 + }) 101 + } 102 + } 103 + 104 + impl Matcher for PrefixMatcher { 105 + fn matches(&self, value: &serde_json::Value) -> bool { 106 + let nodes = self.path.query(value).all(); 107 + 108 + let string_nodes = nodes 109 + .iter() 110 + .filter_map(|value| { 111 + if let serde_json::Value::String(actual) = value { 112 + Some(actual.to_lowercase().clone()) 113 + } else { 114 + None 115 + } 116 + }) 117 + .collect::<Vec<String>>(); 118 + 119 + string_nodes 120 + .iter() 121 + .any(|value| value.starts_with(&self.prefix)) 122 + } 123 + } 
124 + 125 + pub struct SequenceMatcher { 126 + expected: Vec<String>, 127 + path: JsonPath, 128 + } 129 + 130 + impl SequenceMatcher { 131 + pub(crate) fn new(expected: &[String], path: &str) -> Result<Self> { 132 + let path = JsonPath::parse(path).context("cannot parse path")?; 133 + Ok(Self { 134 + expected: expected.to_owned(), 135 + path, 136 + }) 137 + } 138 + } 139 + 140 + impl Matcher for SequenceMatcher { 141 + fn matches(&self, value: &serde_json::Value) -> bool { 142 + let nodes = self.path.query(value).all(); 143 + 144 + let string_nodes = nodes 145 + .iter() 146 + .filter_map(|value| { 147 + if let serde_json::Value::String(actual) = value { 148 + Some(actual.to_lowercase().clone()) 149 + } else { 150 + None 151 + } 152 + }) 153 + .collect::<Vec<String>>(); 154 + 155 + for string_node in string_nodes { 156 + let mut last_found: i32 = -1; 157 + 158 + let mut found_index = 0; 159 + for (index, expected) in self.expected.iter().enumerate() { 160 + if let Some(current_found) = string_node.find(expected) { 161 + if (current_found as i32) > last_found { 162 + last_found = current_found as i32; 163 + found_index = index; 164 + } else { 165 + last_found = -1; 166 + break; 167 + } 168 + } else { 169 + last_found = -1; 170 + break; 171 + } 172 + } 173 + 174 + if last_found != -1 && found_index == self.expected.len() - 1 { 175 + return true; 176 + } 177 + } 178 + 179 + false 180 + } 181 + } 182 + 183 + #[cfg(test)] 184 + mod tests { 185 + use super::*; 186 + 187 + #[test] 188 + fn equals_matcher() { 189 + let raw_json = r#"{ 190 + "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 191 + "time_us": 1730491093829414, 192 + "kind": "commit", 193 + "commit": { 194 + "rev": "3l7vxhiuibq2u", 195 + "operation": "create", 196 + "collection": "app.bsky.feed.post", 197 + "rkey": "3l7vxhiu4kq2u", 198 + "record": { 199 + "$type": "app.bsky.feed.post", 200 + "createdAt": "2024-11-01T19:58:12.980Z", 201 + "langs": ["en", "es"], 202 + "text": "hey dnd question, what does a 45 on a 
stealth check look like" 203 + }, 204 + "cid": "bafyreide7jpu67vvkn4p2iznph6frbwv6vamt7yg5duppqjqggz4sdfik4" 205 + } 206 + }"#; 207 + 208 + let value: serde_json::Value = serde_json::from_str(raw_json).expect("json is valid"); 209 + 210 + let tests = vec![ 211 + ("$.did", "did:plc:tgudj2fjm77pzkuawquqhsxm", true), 212 + ("$.commit.record['$type']", "app.bsky.feed.post", true), 213 + ("$.commit.record.langs.*", "en", true), 214 + ( 215 + "$.commit.record.text", 216 + "hey dnd question, what does a 45 on a stealth check look like", 217 + true, 218 + ), 219 + ("$.did", "did:plc:tgudj2fjm77pzkuawquqhsxn", false), 220 + ("$.commit.record.notreal", "value", false), 221 + ]; 222 + 223 + for (path, expected, result) in tests { 224 + let matcher = EqualsMatcher::new(expected, path).expect("matcher is valid"); 225 + assert_eq!(matcher.matches(&value), result); 226 + } 227 + } 228 + 229 + #[test] 230 + fn prefix_matcher() { 231 + let raw_json = r#"{ 232 + "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 233 + "time_us": 1730491093829414, 234 + "kind": "commit", 235 + "commit": { 236 + "rev": "3l7vxhiuibq2u", 237 + "operation": "create", 238 + "collection": "app.bsky.feed.post", 239 + "rkey": "3l7vxhiu4kq2u", 240 + "record": { 241 + "$type": "app.bsky.feed.post", 242 + "createdAt": "2024-11-01T19:58:12.980Z", 243 + "langs": ["en"], 244 + "text": "hey dnd question, what does a 45 on a stealth check look like", 245 + "facets": [ 246 + { 247 + "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": "dungeonsanddragons"}], 248 + "index": { "byteEnd": 1, "byteStart": 0 } 249 + }, 250 + { 251 + "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": "gaming"}], 252 + "index": { "byteEnd": 1, "byteStart": 0 } 253 + } 254 + ] 255 + }, 256 + "cid": "bafyreide7jpu67vvkn4p2iznph6frbwv6vamt7yg5duppqjqggz4sdfik4" 257 + } 258 + }"#; 259 + 260 + let value: serde_json::Value = serde_json::from_str(raw_json).expect("json is valid"); 261 + 262 + let tests = vec![ 263 + 
("$.commit.record['$type']", "app.bsky.", true), 264 + ("$.commit.record.langs.*", "e", true), 265 + ("$.commit.record.text", "hey dnd question", true), 266 + ("$.commit.record.facets[*].features[?(@['$type'] == 'app.bsky.richtext.facet#tag')].tag", "dungeons", true), 267 + ("$.commit.record.notreal", "value", false), 268 + ("$.commit.record['$type']", "com.bsky.", false), 269 + ]; 270 + 271 + for (path, prefix, result) in tests { 272 + let matcher = PrefixMatcher::new(prefix, path).expect("matcher is valid"); 273 + assert_eq!(matcher.matches(&value), result); 274 + } 275 + } 276 + 277 + #[test] 278 + fn sequence_matcher() { 279 + let raw_json = r#"{ 280 + "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 281 + "time_us": 1730491093829414, 282 + "kind": "commit", 283 + "commit": { 284 + "rev": "3l7vxhiuibq2u", 285 + "operation": "create", 286 + "collection": "app.bsky.feed.post", 287 + "rkey": "3l7vxhiu4kq2u", 288 + "record": { 289 + "$type": "app.bsky.feed.post", 290 + "createdAt": "2024-11-01T19:58:12.980Z", 291 + "langs": ["en"], 292 + "text": "hey dnd question, what does a 45 on a stealth check look like", 293 + "facets": [ 294 + { 295 + "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": "dungeonsanddragons"}], 296 + "index": { "byteEnd": 1, "byteStart": 0 } 297 + }, 298 + { 299 + "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": "gaming"}], 300 + "index": { "byteEnd": 1, "byteStart": 0 } 301 + } 302 + ] 303 + }, 304 + "cid": "bafyreide7jpu67vvkn4p2iznph6frbwv6vamt7yg5duppqjqggz4sdfik4" 305 + } 306 + }"#; 307 + 308 + let value: serde_json::Value = serde_json::from_str(raw_json).expect("json is valid"); 309 + 310 + let tests = vec![ 311 + ( 312 + "$.commit.record.text", 313 + vec!["hey".into(), "dnd".into(), "question".into()], 314 + true, 315 + ), 316 + ( 317 + "$.commit.record.facets[*].features[?(@['$type'] == 'app.bsky.richtext.facet#tag')].tag", 318 + vec!["dungeons".into(), "and".into(), "dragons".into()], 319 + true, 320 + ), 321 + ( 322 + 
"$.commit.record.text", 323 + vec!["hey".into(), "question".into(), "dnd".into()], 324 + false, 325 + ), 326 + ( 327 + "$.commit.record.operation", 328 + vec!["hey".into(), "dnd".into(), "question".into()], 329 + false, 330 + ), 331 + ( 332 + "$.commit.record.text", 333 + vec!["hey".into(), "nick".into()], 334 + false, 335 + ), 336 + ]; 337 + 338 + for (path, values, result) in tests { 339 + let matcher = SequenceMatcher::new(&values, path).expect("matcher is valid"); 340 + assert_eq!(matcher.matches(&value), result); 341 + } 342 + } 343 + 344 + #[test] 345 + fn sequence_matcher_edge_case_1() { 346 + let raw_json = r#"{"text": "Stellwerkstörung. Und Signalstörung. Und der Alternativzug ist auch ausgefallen. Und überhaupt."}"#; 347 + let value: serde_json::Value = serde_json::from_str(raw_json).expect("json is valid"); 348 + let matcher = 349 + SequenceMatcher::new(&vec!["smoke".to_string(), "signal".to_string()], "$.text") 350 + .expect("matcher is valid"); 351 + assert_eq!(matcher.matches(&value), false); 352 + } 353 + }
+266
src/storage.rs
··· 1 + use anyhow::{Context, Result}; 2 + use chrono::prelude::*; 3 + use sqlx::{Pool, Sqlite}; 4 + 5 + use model::FeedContent; 6 + 7 + pub type StoragePool = Pool<Sqlite>; 8 + 9 + pub mod model { 10 + use serde::Serialize; 11 + use sqlx::prelude::FromRow; 12 + 13 + #[derive(Clone, FromRow, Serialize)] 14 + pub struct FeedContent { 15 + pub feed_id: String, 16 + pub uri: String, 17 + pub indexed_at: u32, 18 + pub indexed_at_more: u32, 19 + pub cid: String, 20 + } 21 + 22 + impl FeedContent { 23 + pub fn new(feed_id: String, uri: String, time_us: u64, cid: String) -> Self { 24 + // Are their better ways to do this? Probably. 25 + let time_us_bytes = time_us.to_be_bytes(); 26 + let indexed_at = u32::from_be_bytes([ 27 + time_us_bytes[0], 28 + time_us_bytes[1], 29 + time_us_bytes[2], 30 + time_us_bytes[3], 31 + ]); 32 + let indexed_at_more = u32::from_be_bytes([ 33 + time_us_bytes[4], 34 + time_us_bytes[5], 35 + time_us_bytes[6], 36 + time_us_bytes[7], 37 + ]); 38 + 39 + Self { 40 + feed_id, 41 + uri, 42 + indexed_at, 43 + indexed_at_more, 44 + cid, 45 + } 46 + } 47 + pub fn time_us(&self) -> u64 { 48 + let indexed_at_bytes = self.indexed_at.to_be_bytes(); 49 + let indexed_at_more_bytes = self.indexed_at_more.to_be_bytes(); 50 + u64::from_be_bytes([ 51 + indexed_at_bytes[0], 52 + indexed_at_bytes[1], 53 + indexed_at_bytes[2], 54 + indexed_at_bytes[3], 55 + indexed_at_more_bytes[0], 56 + indexed_at_more_bytes[1], 57 + indexed_at_more_bytes[2], 58 + indexed_at_more_bytes[3], 59 + ]) 60 + } 61 + } 62 + } 63 + 64 + pub async fn feed_content_insert( 65 + pool: &StoragePool, 66 + feed_content: &model::FeedContent, 67 + ) -> Result<()> { 68 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 69 + 70 + let now = Utc::now(); 71 + sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, indexed_at_more, cid, updated_at) VALUES (?, ?, ?, ?, ?, ?)") 72 + .bind(&feed_content.feed_id) 73 + .bind(&feed_content.uri) 74 + 
.bind(feed_content.indexed_at) 75 + .bind(feed_content.indexed_at_more) 76 + .bind(&feed_content.cid) 77 + .bind(now) 78 + .execute(tx.as_mut()) 79 + .await.context("failed to insert feed content record")?; 80 + 81 + tx.commit().await.context("failed to commit transaction") 82 + } 83 + 84 + pub async fn feed_content_paginate( 85 + pool: &StoragePool, 86 + feed_uri: &str, 87 + limit: Option<u16>, 88 + cursor: Option<(u64, u32, u32, String)>, 89 + ) -> Result<Vec<FeedContent>> { 90 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 91 + 92 + let limit = limit.unwrap_or(20).clamp(1, 100); 93 + 94 + let results = if let Some((_time_us, indexed_at, indexed_at_more, cid)) = cursor { 95 + let query = "SELECT * FROM feed_content WHERE feed_id = ? AND (indexed_at, indexed_at_more, cid) < (?, ?, ?) ORDER BY indexed_at DESC, indexed_at_more DESC, cid DESC LIMIT ?"; 96 + 97 + sqlx::query_as::<_, FeedContent>(query) 98 + .bind(feed_uri) 99 + .bind(indexed_at) 100 + .bind(indexed_at_more) 101 + .bind(cid) 102 + .bind(limit) 103 + .fetch_all(tx.as_mut()) 104 + .await? 105 + } else { 106 + let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC, indexed_at_more DESC, cid DESC LIMIT ?"; 107 + 108 + sqlx::query_as::<_, FeedContent>(query) 109 + .bind(feed_uri) 110 + .bind(limit) 111 + .fetch_all(tx.as_mut()) 112 + .await? 
113 + }; 114 + 115 + tx.commit().await.context("failed to commit transaction")?; 116 + 117 + Ok(results) 118 + } 119 + 120 + pub async fn consumer_control_insert( 121 + pool: &StoragePool, 122 + source: &str, 123 + time_us: &str, 124 + ) -> Result<()> { 125 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 126 + 127 + let now = Utc::now(); 128 + sqlx::query( 129 + "INSERT OR REPLACE INTO consumer_control (source, time_us, updated_at) VALUES (?, ?, ?)", 130 + ) 131 + .bind(source) 132 + .bind(time_us) 133 + .bind(now) 134 + .execute(tx.as_mut()) 135 + .await?; 136 + 137 + tx.commit().await.context("failed to commit transaction") 138 + } 139 + 140 + pub async fn consumer_control_get(pool: &StoragePool, source: &str) -> Result<Option<u64>> { 141 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 142 + 143 + let result = 144 + sqlx::query_scalar::<_, String>("SELECT time_us FROM consumer_control WHERE source = ?") 145 + .bind(source) 146 + .fetch_optional(tx.as_mut()) 147 + .await 148 + .context("failed to select consumer control record")?; 149 + 150 + tx.commit().await.context("failed to commit transaction")?; 151 + 152 + Ok(result.and_then(|value| value.parse::<u64>().ok())) 153 + } 154 + 155 + pub async fn verifcation_method_insert( 156 + pool: &StoragePool, 157 + did: &str, 158 + multikey: &str, 159 + ) -> Result<()> { 160 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 161 + 162 + let now = Utc::now(); 163 + sqlx::query( 164 + "INSERT OR REPLACE INTO verification_method_cache (did, multikey, updated_at) VALUES (?, ?, ?)", 165 + ) 166 + .bind(did) 167 + .bind(multikey) 168 + .bind(now) 169 + .execute(tx.as_mut()) 170 + .await.context("failed to update verification method cache")?; 171 + 172 + tx.commit().await.context("failed to commit transaction") 173 + } 174 + 175 + pub async fn verification_method_get(pool: &StoragePool, did: &str) -> Result<Option<String>> { 176 + let mut tx = 
pool.begin().await.context("failed to begin transaction")?; 177 + 178 + let result = sqlx::query_scalar::<_, String>( 179 + "SELECT multikey FROM verification_method_cache WHERE did = ?", 180 + ) 181 + .bind(did) 182 + .fetch_optional(tx.as_mut()) 183 + .await 184 + .context("failed to select verification method cache record")?; 185 + tx.commit().await.context("failed to commit transaction")?; 186 + Ok(result) 187 + } 188 + 189 + pub async fn feed_content_truncate(pool: &StoragePool, feed_id: &str) -> Result<()> { 190 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 191 + 192 + let result = sqlx::query_scalar::<_, DateTime<Utc>>("SELECT updated_at FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC, indexed_at_more DESC LIMIT 1 OFFSET 501") 193 + .bind(feed_id) 194 + .fetch_optional(tx.as_mut()) 195 + .await.context("failed select feed content mark record")?; 196 + 197 + if let Some(updated_at) = result { 198 + sqlx::query("DELETE FROM feed_content WHERE feed_id = ? 
AND updated_at < ?") 199 + .bind(feed_id) 200 + .bind(updated_at) 201 + .execute(tx.as_mut()) 202 + .await 203 + .context("failed to delete feed content beyond mark")?; 204 + } 205 + 206 + tx.commit().await.context("failed to commit transaction") 207 + } 208 + 209 + #[cfg(test)] 210 + mod tests { 211 + use sqlx::SqlitePool; 212 + 213 + #[sqlx::test] 214 + async fn record_feed_content(pool: SqlitePool) -> sqlx::Result<()> { 215 + let record = super::model::FeedContent::new( 216 + "feed".to_string(), 217 + "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n".to_string(), 218 + 1730673934229172_u64, 219 + "bafyreih74qdc6zskq7yarqi3xm634vnubf4g3ac5ieegbvakprxpjnsj74".to_string(), 220 + ); 221 + super::feed_content_insert(&pool, &record) 222 + .await 223 + .expect("failed to insert record"); 224 + 225 + let records = super::feed_content_paginate(&pool, "feed", None, None) 226 + .await 227 + .expect("failed to paginate records"); 228 + 229 + assert_eq!(records.len(), 1); 230 + assert_eq!(records[0].feed_id, "feed"); 231 + assert_eq!( 232 + records[0].uri, 233 + "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 234 + ); 235 + assert_eq!(records[0].time_us(), 1730673934229172_u64); 236 + 237 + Ok(()) 238 + } 239 + 240 + #[sqlx::test] 241 + async fn consumer_control(pool: SqlitePool) -> sqlx::Result<()> { 242 + super::consumer_control_insert(&pool, "foo", "1730673934229172") 243 + .await 244 + .expect("failed to insert record"); 245 + 246 + assert_eq!( 247 + super::consumer_control_get(&pool, "foo") 248 + .await 249 + .expect("failed to get record"), 250 + Some(1730673934229172_u64) 251 + ); 252 + 253 + super::consumer_control_insert(&pool, "foo", "1730673934229173") 254 + .await 255 + .expect("failed to insert record"); 256 + 257 + assert_eq!( 258 + super::consumer_control_get(&pool, "foo") 259 + .await 260 + .expect("failed to get record"), 261 + Some(1730673934229173_u64) 262 + ); 263 + 264 + Ok(()) 265 + } 266 + }