···66use crate::db::DbPool;
77use async_trait::async_trait;
88use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent};
99+use chrono::{DateTime, Utc};
1010+use serde::Deserialize;
1111+use serde_json::Value;
912use tokio_util::sync::CancellationToken;
10131114/// Default Jetstream endpoint (Bluesky's public instance).
···1417/// Collections we're interested in indexing.
1518pub const MALFESTIO_COLLECTIONS: &[&str] = &["app.malfestio.deck", "app.malfestio.card", "app.malfestio.note"];
16192020+/// Deck record structure matching the Lexicon schema.
2121+#[derive(Debug, Deserialize)]
2222+#[serde(rename_all = "camelCase")]
2323+pub struct DeckRecord {
2424+ pub title: String,
2525+ #[serde(default)]
2626+ pub description: Option<String>,
2727+ #[serde(default)]
2828+ pub tags: Vec<String>,
2929+ #[serde(default)]
3030+ pub card_refs: Vec<String>,
3131+ #[serde(default)]
3232+ pub source_refs: Vec<String>,
3333+ #[serde(default)]
3434+ pub license: Option<String>,
3535+ pub created_at: String,
3636+}
3737+3838+/// Card record structure matching the Lexicon schema.
3939+#[derive(Debug, Deserialize)]
4040+#[serde(rename_all = "camelCase")]
4141+pub struct CardRecord {
4242+ pub deck_ref: String,
4343+ pub front: String,
4444+ pub back: String,
4545+ #[serde(default)]
4646+ pub card_type: Option<String>,
4747+ #[serde(default)]
4848+ pub hints: Vec<String>,
4949+ pub created_at: String,
5050+}
5151+5252+/// Note record structure matching the Lexicon schema.
5353+#[derive(Debug, Deserialize)]
5454+#[serde(rename_all = "camelCase")]
5555+pub struct NoteRecord {
5656+ pub title: String,
5757+ pub body: String,
5858+ #[serde(default)]
5959+ pub tags: Vec<String>,
6060+ #[serde(default)]
6161+ pub visibility: Option<String>,
6262+ pub created_at: String,
6363+}
6464+6565+/// Parse a datetime string from record into chrono DateTime.
6666+fn parse_record_datetime(dt_str: &str) -> DateTime<Utc> {
6767+ DateTime::parse_from_rfc3339(dt_str)
6868+ .map(|dt| dt.with_timezone(&Utc))
6969+ .unwrap_or_else(|_| Utc::now())
7070+}
7171+1772/// Event handler for Malfestio records from Jetstream.
1873pub struct MalfestioEventHandler {
1974 pool: DbPool,
···2681 Self { pool, handler_id: "malfestio-indexer".to_string() }
2782 }
28832929- /// Index a record into the database.
3030- async fn index_record(
8484+ /// Index a deck record with full content.
8585+ async fn index_deck(
8686+ &self, did: &str, rkey: &str, rev: &str, record: &Value,
8787+ ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8888+ let at_uri = format!("at://{}/app.malfestio.deck/{}", did, rkey);
8989+ let deck: DeckRecord = serde_json::from_value(record.clone())?;
9090+ let created_at = parse_record_datetime(&deck.created_at);
9191+9292+ let client = self.pool.get().await?;
9393+ client
9494+ .execute(
9595+ "INSERT INTO indexed_decks (at_uri, did, rkey, title, description, tags, card_refs, source_refs, license, record_created_at, indexed_at)
9696+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
9797+ ON CONFLICT (at_uri) DO UPDATE SET
9898+ title = $4, description = $5, tags = $6, card_refs = $7, source_refs = $8,
9999+ license = $9, record_created_at = $10, indexed_at = NOW(), deleted_at = NULL",
100100+ &[
101101+ &at_uri,
102102+ &did,
103103+ &rkey,
104104+ &deck.title,
105105+ &deck.description,
106106+ &deck.tags,
107107+ &deck.card_refs,
108108+ &deck.source_refs,
109109+ &deck.license,
110110+ &created_at,
111111+ ],
112112+ )
113113+ .await?;
114114+115115+ self.update_repo_state(did, rev).await?;
116116+117117+ tracing::debug!("Indexed deck: {}", at_uri);
118118+ Ok(())
119119+ }
120120+121121+ /// Index a card record with full content.
122122+ async fn index_card(
123123+ &self, did: &str, rkey: &str, rev: &str, record: &Value,
124124+ ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
125125+ let at_uri = format!("at://{}/app.malfestio.card/{}", did, rkey);
126126+ let card: CardRecord = serde_json::from_value(record.clone())?;
127127+ let created_at = parse_record_datetime(&card.created_at);
128128+ let card_type = card.card_type.unwrap_or_else(|| "basic".to_string());
129129+130130+ let client = self.pool.get().await?;
131131+132132+ client
133133+ .execute(
134134+ "INSERT INTO indexed_cards (at_uri, did, rkey, deck_ref, front, back, card_type, hints, record_created_at, indexed_at)
135135+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
136136+ ON CONFLICT (at_uri) DO UPDATE SET
137137+ deck_ref = $4, front = $5, back = $6, card_type = $7, hints = $8,
138138+ record_created_at = $9, indexed_at = NOW(), deleted_at = NULL",
139139+ &[
140140+ &at_uri,
141141+ &did,
142142+ &rkey,
143143+ &card.deck_ref,
144144+ &card.front,
145145+ &card.back,
146146+ &card_type,
147147+ &card.hints,
148148+ &created_at,
149149+ ],
150150+ )
151151+ .await?;
152152+153153+ self.update_repo_state(did, rev).await?;
154154+155155+ tracing::debug!("Indexed card: {}", at_uri);
156156+ Ok(())
157157+ }
158158+159159+ /// Index a note record with full content.
160160+ async fn index_note(
161161+ &self, did: &str, rkey: &str, rev: &str, record: &Value,
162162+ ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
163163+ let at_uri = format!("at://{}/app.malfestio.note/{}", did, rkey);
164164+ let note: NoteRecord = serde_json::from_value(record.clone())?;
165165+ let created_at = parse_record_datetime(¬e.created_at);
166166+ let visibility = note.visibility.unwrap_or_else(|| "public".to_string());
167167+168168+ let client = self.pool.get().await?;
169169+170170+ client
171171+ .execute(
172172+ "INSERT INTO indexed_notes (at_uri, did, rkey, title, body, tags, visibility, record_created_at, indexed_at)
173173+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
174174+ ON CONFLICT (at_uri) DO UPDATE SET
175175+ title = $4, body = $5, tags = $6, visibility = $7,
176176+ record_created_at = $8, indexed_at = NOW(), deleted_at = NULL",
177177+ &[
178178+ &at_uri,
179179+ &did,
180180+ &rkey,
181181+ ¬e.title,
182182+ ¬e.body,
183183+ ¬e.tags,
184184+ &visibility,
185185+ &created_at,
186186+ ],
187187+ )
188188+ .await?;
189189+190190+ self.update_repo_state(did, rev).await?;
191191+192192+ tracing::debug!("Indexed note: {}", at_uri);
193193+ Ok(())
194194+ }
195195+196196+ /// Handle deletion of a record (soft delete by setting deleted_at).
197197+ async fn handle_delete(
31198 &self, did: &str, collection: &str, rkey: &str,
32199 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
33200 let at_uri = format!("at://{}/{}/{}", did, collection, rkey);
201201+ let client = self.pool.get().await?;
342023535- let client = self.pool.get().await?;
203203+ let table = match collection {
204204+ "app.malfestio.deck" => "indexed_decks",
205205+ "app.malfestio.card" => "indexed_cards",
206206+ "app.malfestio.note" => "indexed_notes",
207207+ _ => return Ok(()),
208208+ };
209209+210210+ let query = format!(
211211+ "UPDATE {} SET deleted_at = NOW() WHERE at_uri = $1 AND deleted_at IS NULL",
212212+ table
213213+ );
214214+ client.execute(&query, &[&at_uri]).await?;
362153737- // Upsert into indexed_records table
216216+ tracing::info!("Soft-deleted record: {}", at_uri);
217217+ Ok(())
218218+ }
219219+220220+ /// Update the repo sync state with the latest processed revision.
221221+ async fn update_repo_state(&self, did: &str, rev: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
222222+ let client = self.pool.get().await?;
38223 client
39224 .execute(
4040- "INSERT INTO indexed_records (at_uri, did, collection, rkey, indexed_at)
4141- VALUES ($1, $2, $3, $4, NOW())
4242- ON CONFLICT (at_uri) DO UPDATE SET indexed_at = NOW()",
4343- &[&at_uri, &did, &collection, &rkey],
225225+ "INSERT INTO repo_sync_state (did, latest_rev, indexed_at)
226226+ VALUES ($1, $2, NOW())
227227+ ON CONFLICT (did) DO UPDATE SET latest_rev = $2, indexed_at = NOW()
228228+ WHERE repo_sync_state.latest_rev < $2 OR repo_sync_state.latest_rev IS NULL",
229229+ &[&did, &rev],
44230 )
45231 .await?;
4646-4747- tracing::debug!("Indexed record: {}", at_uri);
48232 Ok(())
49233 }
5023451235 /// Update cursor position in database for reconnection.
5252- #[allow(dead_code)]
53236 pub async fn save_cursor(&self, cursor_us: i64) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
54237 let client = self.pool.get().await?;
55238 client
···8526886269 async fn handle_event(&self, event: JetstreamEvent) -> Result<(), anyhow::Error> {
87270 match event {
8888- JetstreamEvent::Commit { did, commit, .. } => {
271271+ JetstreamEvent::Commit { did, time_us, commit, .. } => {
89272 let collection = &commit.collection;
9090-9191- // Only process our collections
92273 if !MALFESTIO_COLLECTIONS.iter().any(|c| collection == *c) {
93274 return Ok(());
94275 }
9527696277 let rkey = &commit.rkey;
278278+ let rev = &commit.rev;
279279+ let operation = &commit.operation;
972809898- tracing::info!("Received {} event: did={}, rkey={}", collection, did, rkey);
281281+ tracing::info!(
282282+ "Received {} {} event: did={}, rkey={}",
283283+ collection,
284284+ operation,
285285+ did,
286286+ rkey
287287+ );
288288+289289+ match operation.as_str() {
290290+ "create" | "update" => {
291291+ let result = match collection.as_str() {
292292+ "app.malfestio.deck" => self.index_deck(&did, rkey, rev, &commit.record).await,
293293+ "app.malfestio.card" => self.index_card(&did, rkey, rev, &commit.record).await,
294294+ "app.malfestio.note" => self.index_note(&did, rkey, rev, &commit.record).await,
295295+ _ => Ok(()),
296296+ };
99297100100- // Index the record
101101- if let Err(e) = self.index_record(&did, collection, rkey).await {
102102- tracing::warn!("Failed to index record: {}", e);
298298+ if let Err(e) = result {
299299+ tracing::warn!("Failed to index record: {}", e);
300300+ }
301301+ }
302302+ "delete" => {
303303+ if let Err(e) = self.handle_delete(&did, collection, rkey).await {
304304+ tracing::warn!("Failed to handle delete: {}", e);
305305+ }
306306+ }
307307+ _ => tracing::debug!("Unknown operation type: {}", operation),
308308+ }
309309+310310+ if let Err(e) = self.save_cursor(time_us as i64).await {
311311+ tracing::warn!("Failed to save cursor: {}", e);
103312 }
104313 }
105105- JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } | JetstreamEvent::Delete { .. } => {
106106- // Ignore identity, account, and delete events
314314+ JetstreamEvent::Delete { did, commit, .. } => {
315315+ let collection = &commit.collection;
316316+317317+ if MALFESTIO_COLLECTIONS.iter().any(|c| collection == *c) {
318318+ let rkey = &commit.rkey;
319319+ tracing::info!(
320320+ "Received delete event: did={}, collection={}, rkey={}",
321321+ did,
322322+ collection,
323323+ rkey
324324+ );
325325+326326+ if let Err(e) = self.handle_delete(&did, collection, rkey).await {
327327+ tracing::warn!("Failed to handle delete: {}", e);
328328+ }
329329+ }
107330 }
331331+ JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => (),
108332 }
109333 Ok(())
110334 }
···139363140364 let handler = MalfestioEventHandler::new(pool);
141365142142- // Build consumer config
143366 let task_config = ConsumerTaskConfig {
144367 user_agent: "malfestio-indexer/0.1.0".to_string(),
145368 compression: config.compress,
···177400178401#[cfg(test)]
179402mod tests {
403403+ use chrono::Datelike;
404404+180405 use super::*;
181406182407 #[test]
···192417 assert!(MALFESTIO_COLLECTIONS.contains(&"app.malfestio.deck"));
193418 assert!(MALFESTIO_COLLECTIONS.contains(&"app.malfestio.card"));
194419 assert!(MALFESTIO_COLLECTIONS.contains(&"app.malfestio.note"));
420420+ }
421421+422422+ #[test]
423423+ fn test_parse_deck_record() {
424424+ let json = serde_json::json!({
425425+ "title": "Test Deck",
426426+ "description": "A test deck",
427427+ "tags": ["rust", "learning"],
428428+ "cardRefs": ["at://did:plc:abc/app.malfestio.card/123"],
429429+ "sourceRefs": [],
430430+ "license": "CC-BY-4.0",
431431+ "createdAt": "2024-01-01T00:00:00Z"
432432+ });
433433+434434+ let deck: DeckRecord = serde_json::from_value(json).unwrap();
435435+ assert_eq!(deck.title, "Test Deck");
436436+ assert_eq!(deck.description, Some("A test deck".to_string()));
437437+ assert_eq!(deck.tags, vec!["rust", "learning"]);
438438+ assert_eq!(deck.card_refs.len(), 1);
439439+ assert_eq!(deck.license, Some("CC-BY-4.0".to_string()));
440440+ }
441441+442442+ #[test]
443443+ fn test_parse_card_record() {
444444+ let json = serde_json::json!({
445445+ "deckRef": "at://did:plc:abc/app.malfestio.deck/123",
446446+ "front": "What is Rust?",
447447+ "back": "A systems programming language",
448448+ "cardType": "basic",
449449+ "hints": ["Think about memory safety"],
450450+ "createdAt": "2024-01-01T00:00:00Z"
451451+ });
452452+453453+ let card: CardRecord = serde_json::from_value(json).unwrap();
454454+ assert_eq!(card.deck_ref, "at://did:plc:abc/app.malfestio.deck/123");
455455+ assert_eq!(card.front, "What is Rust?");
456456+ assert_eq!(card.back, "A systems programming language");
457457+ assert_eq!(card.card_type, Some("basic".to_string()));
458458+ assert_eq!(card.hints, vec!["Think about memory safety"]);
459459+ }
460460+461461+ #[test]
462462+ fn test_parse_note_record() {
463463+ let json = serde_json::json!({
464464+ "title": "Study Notes",
465465+ "body": "# Chapter 1\n\nSome content here.",
466466+ "tags": ["chapter1"],
467467+ "visibility": "public",
468468+ "createdAt": "2024-01-01T00:00:00Z"
469469+ });
470470+471471+ let note: NoteRecord = serde_json::from_value(json).unwrap();
472472+ assert_eq!(note.title, "Study Notes");
473473+ assert_eq!(note.body, "# Chapter 1\n\nSome content here.");
474474+ assert_eq!(note.tags, vec!["chapter1"]);
475475+ assert_eq!(note.visibility, Some("public".to_string()));
476476+ }
477477+478478+ #[test]
479479+ fn test_parse_record_datetime() {
480480+ let dt = parse_record_datetime("2024-01-15T10:30:00Z");
481481+ assert_eq!(dt.year(), 2024);
482482+ assert_eq!(dt.month(), 1);
483483+ assert_eq!(dt.day(), 15);
484484+ }
485485+486486+ #[test]
487487+ fn test_parse_record_datetime_invalid() {
488488+ let dt = parse_record_datetime("invalid");
489489+ assert!(dt.year() >= 2024);
195490 }
196491}
+60-8
docs/at-notes.md
···48484949## Firehose / Jetstream
50505151+### Overview
5252+5353+The AT Protocol provides two main options for consuming real-time repository events:
5454+5555+1. **Raw Firehose** (`com.atproto.sync.subscribeRepos`) - Full-fidelity, CBOR-encoded, cryptographically signed
5656+2. **Jetstream** - Simplified JSON format, lower bandwidth, easier to consume
5757+5158### Raw Firehose
52595360- **WebSocket**: Subscribe to `com.atproto.sync.subscribeRepos` from a Relay
5454-- **CBOR Decoding**: Parse incoming events
5555-- **Cursor Management**: Track position for reconnection
6161+- **CBOR Decoding**: Parse CAR files containing MST blocks
6262+- **Cryptographic Verification**: Validate commit signatures against DID signing keys
6363+- **Cursor Management**: Track `seq` position for reliable reconnection
56645757-### Jetstream (Recommended)
6565+**Event Types:**
58665959-Bluesky's simplified JSON firehose:
6767+- `#commit` - Repository changes (record create/update/delete)
6868+- `#identity` - DID/handle updates
6969+- `#account` - Account status changes (active, deactivated, etc.)
60706161-- JSON format (no CBOR decoding)
6262-- Reduced bandwidth (zstd compression)
6363-- Collection/repo filtering at source
6464-- Simpler reconnection with cursors
7171+### Jetstream (Simplified)
7272+7373+Bluesky's simplified JSON firehose - ideal for indexing and discovery:
7474+7575+- **JSON format**: No CBOR decoding required
7676+- **zstd compression**: Reduced bandwidth (enable with `compress=true`)
7777+- **Collection filtering**: Subscribe to specific NSIDs
7878+- **DID filtering**: Watch specific accounts
7979+- **Cursor-based reconnection**: Microsecond timestamps
8080+8181+**Public Endpoints:**
8282+8383+- `wss://jetstream1.us-east.bsky.network/subscribe`
8484+- `wss://jetstream2.us-west.bsky.network/subscribe`
8585+8686+**Tradeoffs:**
8787+8888+- ⚠️ Events are NOT cryptographically signed (trust the Jetstream operator)
8989+- ⚠️ Not self-authenticating data
9090+- ✅ Much simpler to implement
9191+- ✅ Lower bandwidth and compute requirements
9292+9393+### Reliable Synchronization
9494+9595+**Cursor Tracking:**
9696+9797+- Store cursor position (microsecond timestamp) per endpoint
9898+- Resume from last processed cursor on reconnect
9999+- Handle gaps by fetching missing commits via `getRepo` if needed
100100+101101+**Per-Repo Revision Tracking:**
102102+103103+- Track latest `rev` (TID) for each DID
104104+- Compare incoming `rev` against stored value to detect gaps
105105+- Use `since` field to detect out-of-order events
106106+107107+**Deletion Handling:**
108108+109109+- Handle `operation: "delete"` in commit events
110110+- Mark records as deleted (soft or hard delete)
111111+112112+**Best Practices:**
113113+114114+- Process events sequentially per-DID (partition by DID)
115115+- Ignore events with `rev` ≤ stored latest rev
116116+- Validate records against Lexicon schema before indexing
6511766118## Well-Known Endpoints
67119
+3-3
docs/todo.md
···38383939**Firehose Enhancement:**
40404141-- [ ] Upgrade firehose consumer to store full record content (not just metadata)
4242-- [ ] Add `indexed_decks`, `indexed_cards`, `indexed_notes` tables for remote content
4343-- [ ] Track latest processed revision per repo; handle deletions
4141+- [x] Upgrade firehose consumer to store full record content (not just metadata)
4242+- [x] Add `indexed_decks`, `indexed_cards`, `indexed_notes` tables for remote content
4343+- [x] Track latest processed revision per repo; handle deletions
44444545**Search & Discovery:**
4646
+75
migrations/009_2025_12_31_indexed_records.sql
···11+-- Migration: Indexed tables for AT Protocol Firehose/Jetstream consumption
22+33+CREATE TABLE repo_sync_state (
44+ did TEXT PRIMARY KEY,
55+ latest_rev TEXT NOT NULL, -- TID of last processed commit
66+ indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
77+);
88+99+CREATE INDEX idx_repo_sync_state_indexed_at ON repo_sync_state(indexed_at);
1010+1111+-- Indexed decks from remote users
1212+CREATE TABLE indexed_decks (
1313+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
1414+ at_uri TEXT NOT NULL UNIQUE,
1515+ did TEXT NOT NULL,
1616+ rkey TEXT NOT NULL,
1717+ -- Full record content (denormalized for query performance)
1818+ title TEXT NOT NULL,
1919+ description TEXT,
2020+ tags TEXT[] DEFAULT '{}',
2121+ card_refs TEXT[] DEFAULT '{}', -- AT-URIs to cards in this deck
2222+ source_refs TEXT[] DEFAULT '{}', -- AT-URIs to source materials
2323+ license TEXT,
2424+ record_created_at TIMESTAMPTZ NOT NULL, -- createdAt from the record
2525+ indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
2626+ deleted_at TIMESTAMPTZ -- Soft delete for tombstones
2727+);
2828+2929+CREATE INDEX idx_indexed_decks_did ON indexed_decks(did);
3030+CREATE INDEX idx_indexed_decks_at_uri ON indexed_decks(at_uri);
3131+CREATE INDEX idx_indexed_decks_indexed_at ON indexed_decks(indexed_at);
3232+CREATE INDEX idx_indexed_decks_tags ON indexed_decks USING GIN(tags);
3333+CREATE INDEX idx_indexed_decks_deleted ON indexed_decks(deleted_at) WHERE deleted_at IS NULL;
3434+3535+-- Indexed cards from remote users
3636+CREATE TABLE indexed_cards (
3737+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
3838+ at_uri TEXT NOT NULL UNIQUE,
3939+ did TEXT NOT NULL,
4040+ rkey TEXT NOT NULL,
4141+ deck_ref TEXT NOT NULL, -- AT-URI to parent deck
4242+ front TEXT NOT NULL,
4343+ back TEXT NOT NULL,
4444+ card_type TEXT DEFAULT 'basic',
4545+ hints TEXT[] DEFAULT '{}',
4646+ record_created_at TIMESTAMPTZ NOT NULL,
4747+ indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
4848+ deleted_at TIMESTAMPTZ
4949+);
5050+5151+CREATE INDEX idx_indexed_cards_did ON indexed_cards(did);
5252+CREATE INDEX idx_indexed_cards_at_uri ON indexed_cards(at_uri);
5353+CREATE INDEX idx_indexed_cards_deck_ref ON indexed_cards(deck_ref);
5454+CREATE INDEX idx_indexed_cards_deleted ON indexed_cards(deleted_at) WHERE deleted_at IS NULL;
5555+5656+-- Indexed notes from remote users
5757+CREATE TABLE indexed_notes (
5858+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
5959+ at_uri TEXT NOT NULL UNIQUE,
6060+ did TEXT NOT NULL,
6161+ rkey TEXT NOT NULL,
6262+ title TEXT NOT NULL,
6363+ body TEXT NOT NULL,
6464+ tags TEXT[] DEFAULT '{}',
6565+ visibility TEXT DEFAULT 'public',
6666+ record_created_at TIMESTAMPTZ NOT NULL,
6767+ indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
6868+ deleted_at TIMESTAMPTZ
6969+);
7070+7171+CREATE INDEX idx_indexed_notes_did ON indexed_notes(did);
7272+CREATE INDEX idx_indexed_notes_at_uri ON indexed_notes(at_uri);
7373+CREATE INDEX idx_indexed_notes_tags ON indexed_notes USING GIN(tags);
7474+CREATE INDEX idx_indexed_notes_visibility ON indexed_notes(visibility);
7575+CREATE INDEX idx_indexed_notes_deleted ON indexed_notes(deleted_at) WHERE deleted_at IS NULL;