An easy-to-host PDS on the ATProtocol, MacOS. Grandma-approved.

feat(db): add db module with pool factory, migration runner, and unit tests

authored by malpercio.dev and committed by

Tangled 032781fe 922a952b

+211
+211
crates/relay/src/db/mod.rs
··· 1 + // pattern: Imperative Shell 2 + use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; 3 + use sqlx::SqlitePool; 4 + use std::str::FromStr; 5 + 6 + /// Errors from database pool creation or migration execution. 7 + #[derive(Debug, thiserror::Error)] 8 + pub enum DbError { 9 + #[error("failed to open database pool: {0}")] 10 + Pool(#[from] sqlx::Error), 11 + /// Errors in migration infrastructure (bootstrap table, transaction control). 12 + /// Distinct from `Migration` so operators know there is no version 0 to look for. 13 + #[error("failed to initialize migration infrastructure: {0}")] 14 + Setup(sqlx::Error), 15 + #[error("migration v{version} failed: {source}")] 16 + Migration { version: u32, source: sqlx::Error }, 17 + } 18 + 19 + struct Migration { 20 + version: u32, 21 + sql: &'static str, 22 + } 23 + 24 + static MIGRATIONS: &[Migration] = &[Migration { 25 + version: 1, 26 + sql: include_str!("migrations/V001__init.sql"), 27 + }]; 28 + 29 + /// Open a WAL-mode SQLite connection pool with a maximum of 1 connection. 30 + /// 31 + /// Accepts any sqlx URL string (e.g. `"sqlite:relay.db"`, `"sqlite::memory:"`). 32 + /// `create_if_missing` is enabled so the file is created on first run. 33 + /// WAL journal mode is set via `SqliteConnectOptions` — not a raw PRAGMA — so 34 + /// sqlx tracks the mode across the connection lifecycle. 35 + pub async fn open_pool(url: &str) -> Result<SqlitePool, DbError> { 36 + let opts = SqliteConnectOptions::from_str(url)? 37 + .create_if_missing(true) 38 + .journal_mode(SqliteJournalMode::Wal); 39 + 40 + SqlitePoolOptions::new() 41 + .max_connections(1) 42 + .connect_with(opts) 43 + .await 44 + .map_err(DbError::Pool) 45 + } 46 + 47 + /// Apply any pending migrations from `MIGRATIONS` to the given pool. 48 + /// 49 + /// Creates `schema_migrations` if it does not exist, reads which versions 50 + /// are already recorded, then applies all pending migrations in a single 51 + /// transaction and records each applied version. 52 + pub async fn run_migrations(pool: &SqlitePool) -> Result<(), DbError> { 53 + // Bootstrap the tracking table before any migration SQL runs. 54 + sqlx::query( 55 + "CREATE TABLE IF NOT EXISTS schema_migrations ( 56 + version INTEGER PRIMARY KEY, 57 + applied_at TEXT NOT NULL 58 + ) WITHOUT ROWID", 59 + ) 60 + .execute(pool) 61 + .await 62 + .map_err(DbError::Setup)?; 63 + 64 + // Fetch already-applied versions. 65 + let applied: Vec<(i32,)> = sqlx::query_as("SELECT version FROM schema_migrations") 66 + .fetch_all(pool) 67 + .await 68 + .map_err(DbError::Setup)?; 69 + let applied_set: std::collections::HashSet<u32> = 70 + applied.into_iter().map(|(v,)| v as u32).collect(); 71 + 72 + // Collect pending migrations in order. 73 + let pending: Vec<&Migration> = MIGRATIONS 74 + .iter() 75 + .filter(|m| !applied_set.contains(&m.version)) 76 + .collect(); 77 + 78 + if pending.is_empty() { 79 + return Ok(()); 80 + } 81 + 82 + // Apply all pending migrations in one transaction. 83 + let mut tx = pool.begin().await.map_err(DbError::Setup)?; 84 + 85 + for migration in pending { 86 + // Use raw_sql (not query) so multi-statement SQL files execute fully. 87 + sqlx::raw_sql(migration.sql) 88 + .execute(&mut *tx) 89 + .await 90 + .map_err(|e| DbError::Migration { 91 + version: migration.version, 92 + source: e, 93 + })?; 94 + 95 + sqlx::query( 96 + "INSERT INTO schema_migrations (version, applied_at) VALUES (?, datetime('now'))", 97 + ) 98 + .bind(migration.version as i32) 99 + .execute(&mut *tx) 100 + .await 101 + .map_err(|e| DbError::Migration { 102 + version: migration.version, 103 + source: e, 104 + })?; 105 + } 106 + 107 + tx.commit().await.map_err(DbError::Setup)?; 108 + 109 + Ok(()) 110 + } 111 + 112 + #[cfg(test)] 113 + mod tests { 114 + use super::*; 115 + 116 + /// Open a fresh in-memory pool for each test. 117 + /// Uses "sqlite::memory:" — no files created on disk. 118 + async fn in_memory_pool() -> SqlitePool { 119 + open_pool("sqlite::memory:") 120 + .await 121 + .expect("failed to open in-memory pool") 122 + } 123 + 124 + #[tokio::test] 125 + async fn select_one_succeeds() { 126 + let pool = in_memory_pool().await; 127 + let (n,): (i64,) = sqlx::query_as("SELECT 1").fetch_one(&pool).await.unwrap(); 128 + assert_eq!(n, 1); 129 + } 130 + 131 + #[tokio::test] 132 + async fn migrations_apply_on_first_run() { 133 + let pool = in_memory_pool().await; 134 + run_migrations(&pool).await.unwrap(); 135 + 136 + let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM schema_migrations") 137 + .fetch_one(&pool) 138 + .await 139 + .unwrap(); 140 + assert_eq!(count, 1); 141 + } 142 + 143 + /// MM-72.AC2.1: Running migrations twice leaves only one row in schema_migrations. 144 + #[tokio::test] 145 + async fn migrations_are_idempotent() { 146 + let pool = in_memory_pool().await; 147 + run_migrations(&pool).await.unwrap(); 148 + run_migrations(&pool).await.unwrap(); // second call — must be a no-op 149 + 150 + let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM schema_migrations") 151 + .fetch_one(&pool) 152 + .await 153 + .unwrap(); 154 + assert_eq!( 155 + count, 1, 156 + "second run must not insert a duplicate migration row" 157 + ); 158 + } 159 + 160 + /// MM-72.AC2.2: schema_migrations records version=1 with a non-null applied_at. 161 + #[tokio::test] 162 + async fn schema_migrations_records_version_and_timestamp() { 163 + let pool = in_memory_pool().await; 164 + run_migrations(&pool).await.unwrap(); 165 + 166 + let (version, applied_at): (i64, String) = 167 + sqlx::query_as("SELECT version, applied_at FROM schema_migrations WHERE version = 1") 168 + .fetch_one(&pool) 169 + .await 170 + .unwrap(); 171 + 172 + assert_eq!(version, 1); 173 + assert!(!applied_at.is_empty(), "applied_at must be non-empty"); 174 + } 175 + 176 + #[tokio::test] 177 + async fn server_metadata_table_exists_and_accepts_inserts() { 178 + let pool = in_memory_pool().await; 179 + run_migrations(&pool).await.unwrap(); 180 + 181 + sqlx::query("INSERT INTO server_metadata (key, value) VALUES ('test_key', 'test_value')") 182 + .execute(&pool) 183 + .await 184 + .unwrap(); 185 + 186 + let (value,): (String,) = 187 + sqlx::query_as("SELECT value FROM server_metadata WHERE key = 'test_key'") 188 + .fetch_one(&pool) 189 + .await 190 + .unwrap(); 191 + assert_eq!(value, "test_value"); 192 + } 193 + 194 + /// MM-72.AC4.1: WAL mode requires a real file — use tempfile here, not :memory:. 195 + /// In-memory SQLite reports journal_mode = "memory", not "wal". 196 + #[tokio::test] 197 + async fn wal_mode_enabled_on_file_pool() { 198 + let dir = tempfile::tempdir().unwrap(); 199 + let db_path = dir.path().join("test_wal.db"); 200 + let url = format!("sqlite:{}", db_path.display()); 201 + 202 + let pool = open_pool(&url).await.unwrap(); 203 + 204 + let (mode,): (String,) = sqlx::query_as("PRAGMA journal_mode") 205 + .fetch_one(&pool) 206 + .await 207 + .unwrap(); 208 + 209 + assert_eq!(mode, "wal", "pool must use WAL journal mode"); 210 + } 211 + }