···11+DO $$
22+BEGIN
33+ IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'email_confirmed') THEN
44+ ALTER TABLE users RENAME COLUMN email_confirmed TO email_verified;
55+ END IF;
66+END $$;
···11+DO $$
22+BEGIN
33+ IF EXISTS (SELECT 1 FROM pg_type WHERE typname = 'notification_channel') THEN
44+ ALTER TYPE notification_channel RENAME TO comms_channel;
55+ END IF;
66+ IF EXISTS (SELECT 1 FROM pg_type WHERE typname = 'notification_status') THEN
77+ ALTER TYPE notification_status RENAME TO comms_status;
88+ END IF;
99+ IF EXISTS (SELECT 1 FROM pg_type WHERE typname = 'notification_type') THEN
1010+ ALTER TYPE notification_type RENAME TO comms_type;
1111+ END IF;
1212+ IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = 'notification_queue') THEN
1313+ ALTER TABLE notification_queue RENAME TO comms_queue;
1414+ END IF;
1515+ IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'comms_queue' AND column_name = 'notification_type') THEN
1616+ ALTER TABLE comms_queue RENAME COLUMN notification_type TO comms_type;
1717+ END IF;
1818+ IF EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_notification_queue_status_scheduled') THEN
1919+ ALTER INDEX idx_notification_queue_status_scheduled RENAME TO idx_comms_queue_status_scheduled;
2020+ END IF;
2121+ IF EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_notification_queue_user_id') THEN
2222+ ALTER INDEX idx_notification_queue_user_id RENAME TO idx_comms_queue_user_id;
2323+ END IF;
2424+ IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'preferred_notification_channel') THEN
2525+ ALTER TABLE users RENAME COLUMN preferred_notification_channel TO preferred_comms_channel;
2626+ END IF;
2727+END $$;
+3-3
src/api/admin/account/email.rs
···8787 .subject
8888 .clone()
8989 .unwrap_or_else(|| format!("Message from {}", hostname));
9090- let notification = crate::notifications::NewNotification::email(
9090+ let item = crate::comms::NewComms::email(
9191 user_id,
9292- crate::notifications::NotificationType::AdminEmail,
9292+ crate::comms::CommsType::AdminEmail,
9393 email,
9494 subject,
9595 content.to_string(),
9696 );
9797- let result = crate::notifications::enqueue_notification(&state.db, notification).await;
9797+ let result = crate::comms::enqueue_comms(&state.db, item).await;
9898 match result {
9999 Ok(_) => {
100100 tracing::info!("Admin email queued for {} ({})", handle, recipient_did);
···6868 let record = match sqlx::query!(
6969 r#"
7070 SELECT code, pending_identifier, expires_at FROM channel_verifications
7171- WHERE user_id = $1 AND channel = $2::notification_channel
7171+ WHERE user_id = $1 AND channel = $2::comms_channel
7272 "#,
7373 user_id,
7474 channel_str as _
···163163 }
164164165165 if let Err(e) = sqlx::query!(
166166- "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::notification_channel",
166166+ "DELETE FROM channel_verifications WHERE user_id = $1 AND channel = $2::comms_channel",
167167 user_id,
168168 channel_str as _
169169 )
···55pub mod circuit_breaker;
66pub mod config;
77pub mod crawlers;
88+pub mod handle;
89pub mod image;
910pub mod metrics;
1010-pub mod notifications;
1111+pub mod comms;
1112pub mod oauth;
1213pub mod plc;
1314pub mod rate_limit;
+13-15
src/main.rs
···11+use bspds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
12use bspds::crawlers::{Crawlers, start_crawlers_service};
22-use bspds::notifications::{
33- DiscordSender, EmailSender, NotificationService, SignalSender, TelegramSender,
44-};
53use bspds::state::AppState;
64use std::net::SocketAddr;
75use std::process::ExitCode;
···68666967 let (shutdown_tx, shutdown_rx) = watch::channel(false);
70687171- let mut notification_service = NotificationService::new(pool);
6969+ let mut comms_service = CommsService::new(pool);
72707371 if let Some(email_sender) = EmailSender::from_env() {
7474- info!("Email notifications enabled");
7575- notification_service = notification_service.register_sender(email_sender);
7272+ info!("Email comms enabled");
7373+ comms_service = comms_service.register_sender(email_sender);
7674 } else {
7777- warn!("Email notifications disabled (MAIL_FROM_ADDRESS not set)");
7575+ warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)");
7876 }
79778078 if let Some(discord_sender) = DiscordSender::from_env() {
8181- info!("Discord notifications enabled");
8282- notification_service = notification_service.register_sender(discord_sender);
7979+ info!("Discord comms enabled");
8080+ comms_service = comms_service.register_sender(discord_sender);
8381 }
84828583 if let Some(telegram_sender) = TelegramSender::from_env() {
8686- info!("Telegram notifications enabled");
8787- notification_service = notification_service.register_sender(telegram_sender);
8484+ info!("Telegram comms enabled");
8585+ comms_service = comms_service.register_sender(telegram_sender);
8886 }
89879088 if let Some(signal_sender) = SignalSender::from_env() {
9191- info!("Signal notifications enabled");
9292- notification_service = notification_service.register_sender(signal_sender);
8989+ info!("Signal comms enabled");
9090+ comms_service = comms_service.register_sender(signal_sender);
9391 }
94929595- let notification_handle = tokio::spawn(notification_service.run(shutdown_rx.clone()));
9393+ let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone()));
96949795 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() {
9896 let crawlers = Arc::new(
···122120 .with_graceful_shutdown(shutdown_signal(shutdown_tx))
123121 .await;
124122125125- notification_handle.await.ok();
123123+ comms_handle.await.ok();
126124127125 if let Some(handle) = crawlers_handle {
128126 handle.await.ok();
+4-4
src/metrics.rs
···5454 "Total number of S3/blob storage operations"
5555 );
5656 metrics::describe_gauge!(
5757- "bspds_notification_queue_size",
5858- "Current size of the notification queue"
5757+ "bspds_comms_queue_size",
5858+ "Current size of the comms queue"
5959 );
6060 metrics::describe_counter!(
6161 "bspds_rate_limit_rejections_total",
···167167 .increment(1);
168168}
169169170170-pub fn set_notification_queue_size(size: usize) {
171171- gauge!("bspds_notification_queue_size").set(size as f64);
170170+pub fn set_comms_queue_size(size: usize) {
171171+ gauge!("bspds_comms_queue_size").set(size as f64);
172172}
173173174174pub fn record_rate_limit_rejection(limiter: &str) {
···11mod common;
22use common::{base_url, client, create_account_and_login, get_db_connection_string};
33-use bspds::notifications::{NewNotification, NotificationType, enqueue_notification};
33+use bspds::comms::{NewComms, CommsType, enqueue_comms};
44use serde_json::{Value, json};
55use sqlx::PgPool;
66···2626 .expect("User not found");
27272828 for i in 0..3 {
2929- let notification = NewNotification::email(
2929+ let comms = NewComms::email(
3030 user_id,
3131- NotificationType::Welcome,
3131+ CommsType::Welcome,
3232 "test@example.com".to_string(),
3333 format!("Subject {}", i),
3434 format!("Body {}", i),
3535 );
3636- enqueue_notification(&pool, notification).await.expect("Failed to enqueue");
3636+ enqueue_comms(&pool, comms).await.expect("Failed to enqueue");
3737 }
38383939 let resp = client
+2-2
tests/admin_email.rs
···3939 .await
4040 .expect("User not found");
4141 let notification = sqlx::query!(
4242- "SELECT subject, body, notification_type as \"notification_type: String\" FROM notification_queue WHERE user_id = $1 AND notification_type = 'admin_email' ORDER BY created_at DESC LIMIT 1",
4242+ "SELECT subject, body, comms_type as \"comms_type: String\" FROM comms_queue WHERE user_id = $1 AND comms_type = 'admin_email' ORDER BY created_at DESC LIMIT 1",
4343 user.id
4444 )
4545 .fetch_one(&pool)
···7878 .await
7979 .expect("User not found");
8080 let notification = sqlx::query!(
8181- "SELECT subject FROM notification_queue WHERE user_id = $1 AND notification_type = 'admin_email' AND body = 'Email without subject' LIMIT 1",
8181+ "SELECT subject FROM comms_queue WHERE user_id = $1 AND comms_type = 'admin_email' AND body = 'Email without subject' LIMIT 1",
8282 user.id
8383 )
8484 .fetch_one(&pool)
+29-30
tests/notifications.rs
···11mod common;
22-use bspds::notifications::{
33- NewNotification, NotificationChannel, NotificationStatus, NotificationType,
44- enqueue_notification, enqueue_welcome,
22+use bspds::comms::{
33+ CommsChannel, CommsStatus, CommsType, NewComms, enqueue_comms, enqueue_welcome,
54};
65use sqlx::PgPool;
76···1514}
16151716#[tokio::test]
1818-async fn test_enqueue_notification() {
1717+async fn test_enqueue_comms() {
1918 let pool = get_pool().await;
2019 let (_, did) = common::create_account_and_login(&common::client()).await;
2120 let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
2221 .fetch_one(&pool)
2322 .await
2423 .expect("User not found");
2525- let notification = NewNotification::email(
2424+ let item = NewComms::email(
2625 user_id,
2727- NotificationType::Welcome,
2626+ CommsType::Welcome,
2827 "test@example.com".to_string(),
2928 "Test Subject".to_string(),
3029 "Test body".to_string(),
3130 );
3232- let notification_id = enqueue_notification(&pool, notification)
3131+ let comms_id = enqueue_comms(&pool, item)
3332 .await
3434- .expect("Failed to enqueue notification");
3333+ .expect("Failed to enqueue comms");
3534 let row = sqlx::query!(
3635 r#"
3736 SELECT
3837 id, user_id, recipient, subject, body,
3939- channel as "channel: NotificationChannel",
4040- notification_type as "notification_type: NotificationType",
4141- status as "status: NotificationStatus"
4242- FROM notification_queue
3838+ channel as "channel: CommsChannel",
3939+ comms_type as "comms_type: CommsType",
4040+ status as "status: CommsStatus"
4141+ FROM comms_queue
4342 WHERE id = $1
4443 "#,
4545- notification_id
4444+ comms_id
4645 )
4746 .fetch_one(&pool)
4847 .await
4949- .expect("Notification not found");
4848+ .expect("Comms not found");
5049 assert_eq!(row.user_id, user_id);
5150 assert_eq!(row.recipient, "test@example.com");
5251 assert_eq!(row.subject.as_deref(), Some("Test Subject"));
5352 assert_eq!(row.body, "Test body");
5454- assert_eq!(row.channel, NotificationChannel::Email);
5555- assert_eq!(row.notification_type, NotificationType::Welcome);
5656- assert_eq!(row.status, NotificationStatus::Pending);
5353+ assert_eq!(row.channel, CommsChannel::Email);
5454+ assert_eq!(row.comms_type, CommsType::Welcome);
5555+ assert_eq!(row.status, CommsStatus::Pending);
5756}
58575958#[tokio::test]
···6463 .fetch_one(&pool)
6564 .await
6665 .expect("User not found");
6767- let notification_id = enqueue_welcome(&pool, user_row.id, "example.com")
6666+ let comms_id = enqueue_welcome(&pool, user_row.id, "example.com")
6867 .await
6969- .expect("Failed to enqueue welcome notification");
6868+ .expect("Failed to enqueue welcome comms");
7069 let row = sqlx::query!(
7170 r#"
7271 SELECT
7372 recipient, subject, body,
7474- notification_type as "notification_type: NotificationType"
7575- FROM notification_queue
7373+ comms_type as "comms_type: CommsType"
7474+ FROM comms_queue
7675 WHERE id = $1
7776 "#,
7878- notification_id
7777+ comms_id
7978 )
8079 .fetch_one(&pool)
8180 .await
8282- .expect("Notification not found");
8181+ .expect("Comms not found");
8382 assert_eq!(Some(row.recipient), user_row.email);
8483 assert_eq!(row.subject.as_deref(), Some("Welcome to example.com"));
8584 assert!(row.body.contains(&format!("@{}", user_row.handle)));
8686- assert_eq!(row.notification_type, NotificationType::Welcome);
8585+ assert_eq!(row.comms_type, CommsType::Welcome);
8786}
88878988#[tokio::test]
9090-async fn test_notification_queue_status_index() {
8989+async fn test_comms_queue_status_index() {
9190 let pool = get_pool().await;
9291 let (_, did) = common::create_account_and_login(&common::client()).await;
9392 let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
···9594 .await
9695 .expect("User not found");
9796 let initial_count: i64 = sqlx::query_scalar!(
9898- "SELECT COUNT(*) FROM notification_queue WHERE status = 'pending' AND user_id = $1",
9797+ "SELECT COUNT(*) FROM comms_queue WHERE status = 'pending' AND user_id = $1",
9998 user_id
10099 )
101100 .fetch_one(&pool)
···103102 .expect("Failed to count")
104103 .unwrap_or(0);
105104 for i in 0..5 {
106106- let notification = NewNotification::email(
105105+ let item = NewComms::email(
107106 user_id,
108108- NotificationType::PasswordReset,
107107+ CommsType::PasswordReset,
109108 format!("test{}@example.com", i),
110109 "Test".to_string(),
111110 "Body".to_string(),
112111 );
113113- enqueue_notification(&pool, notification)
112112+ enqueue_comms(&pool, item)
114113 .await
115114 .expect("Failed to enqueue");
116115 }
117116 let final_count: i64 = sqlx::query_scalar!(
118118- "SELECT COUNT(*) FROM notification_queue WHERE status = 'pending' AND user_id = $1",
117117+ "SELECT COUNT(*) FROM comms_queue WHERE status = 'pending' AND user_id = $1",
119118 user_id
120119 )
121120 .fetch_one(&pool)
+9-2
tests/oauth.rs
···22mod helpers;
33use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
44use chrono::Utc;
55-use common::{base_url, client, create_account_and_login, get_db_connection_string};
55+use common::{base_url, client, get_db_connection_string};
66+use helpers::verify_new_account;
67use reqwest::{StatusCode, redirect};
78use serde_json::{Value, json};
89use sha2::{Digest, Sha256};
···124125 assert_eq!(create_res.status(), StatusCode::OK);
125126 let account: Value = create_res.json().await.unwrap();
126127 let user_did = account["did"].as_str().unwrap();
128128+ verify_new_account(&http_client, user_did).await;
127129 let redirect_uri = "https://example.com/oauth/callback";
128130 let mock_client = setup_mock_client_metadata(redirect_uri).await;
129131 let client_id = mock_client.uri();
···261263 assert_eq!(create_res.status(), StatusCode::OK);
262264 let account: Value = create_res.json().await.unwrap();
263265 let user_did = account["did"].as_str().unwrap();
266266+ verify_new_account(&http_client, user_did).await;
264267 let db_url = get_db_connection_string().await;
265268 let pool = sqlx::postgres::PgPoolOptions::new().max_connections(1).connect(&db_url).await.unwrap();
266269 sqlx::query("UPDATE users SET two_factor_enabled = true WHERE did = $1")
···324327 .send().await.unwrap();
325328 let account: Value = create_res.json().await.unwrap();
326329 let user_did = account["did"].as_str().unwrap();
330330+ verify_new_account(&http_client, user_did).await;
327331 let db_url = get_db_connection_string().await;
328332 let pool = sqlx::postgres::PgPoolOptions::new().max_connections(1).connect(&db_url).await.unwrap();
329333 sqlx::query("UPDATE users SET two_factor_enabled = true WHERE did = $1")
···375379 .send().await.unwrap();
376380 let account: Value = create_res.json().await.unwrap();
377381 let user_did = account["did"].as_str().unwrap().to_string();
382382+ verify_new_account(&http_client, &user_did).await;
378383 let redirect_uri = "https://example.com/selector-2fa-callback";
379384 let mock_client = setup_mock_client_metadata(redirect_uri).await;
380385 let client_id = mock_client.uri();
···451456 let handle = format!("state-special-{}", ts);
452457 let email = format!("state-special-{}@example.com", ts);
453458 let password = "state-special-password";
454454- http_client
459459+ let create_res = http_client
455460 .post(format!("{}/xrpc/com.atproto.server.createAccount", url))
456461 .json(&json!({ "handle": handle, "email": email, "password": password }))
457462 .send().await.unwrap();
463463+ let account: Value = create_res.json().await.unwrap();
464464+ verify_new_account(&http_client, account["did"].as_str().unwrap()).await;
458465 let redirect_uri = "https://example.com/state-special-callback";
459466 let mock_client = setup_mock_client_metadata(redirect_uri).await;
460467 let client_id = mock_client.uri();
+13-4
tests/oauth_security.rs
···4545async fn get_oauth_tokens(http_client: &reqwest::Client, url: &str) -> (String, String, String) {
4646 let ts = Utc::now().timestamp_millis();
4747 let handle = format!("sec-test-{}", ts);
4848- http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
4848+ let create_res = http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
4949 .json(&json!({ "handle": handle, "email": format!("{}@example.com", handle), "password": "security-test-password" }))
5050 .send().await.unwrap();
5151+ let account: Value = create_res.json().await.unwrap();
5252+ let did = account["did"].as_str().unwrap();
5353+ verify_new_account(http_client, did).await;
5154 let redirect_uri = "https://example.com/sec-callback";
5255 let mock_client = setup_mock_client_metadata(redirect_uri).await;
5356 let client_id = mock_client.uri();
···129132 assert_eq!(res.status(), StatusCode::BAD_REQUEST, "Missing PKCE challenge should be rejected");
130133 let ts = Utc::now().timestamp_millis();
131134 let handle = format!("pkce-attack-{}", ts);
132132- http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
135135+ let create_res = http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
133136 .json(&json!({ "handle": handle, "email": format!("{}@example.com", handle), "password": "pkce-password" }))
134137 .send().await.unwrap();
138138+ let account: Value = create_res.json().await.unwrap();
139139+ verify_new_account(&http_client, account["did"].as_str().unwrap()).await;
135140 let (_, code_challenge) = generate_pkce();
136141 let (attacker_verifier, _) = generate_pkce();
137142 let par_body: Value = http_client.post(format!("{}/oauth/par", url))
···158163 let http_client = client();
159164 let ts = Utc::now().timestamp_millis();
160165 let handle = format!("replay-{}", ts);
161161- http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
166166+ let create_res = http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
162167 .json(&json!({ "handle": handle, "email": format!("{}@example.com", handle), "password": "replay-password" }))
163168 .send().await.unwrap();
169169+ let account: Value = create_res.json().await.unwrap();
170170+ verify_new_account(&http_client, account["did"].as_str().unwrap()).await;
164171 let redirect_uri = "https://example.com/replay-callback";
165172 let mock_client = setup_mock_client_metadata(redirect_uri).await;
166173 let client_id = mock_client.uri();
···243250 let client_id_b = mock_b.uri();
244251 let ts2 = Utc::now().timestamp_millis();
245252 let handle2 = format!("cross-{}", ts2);
246246- http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
253253+ let create_res2 = http_client.post(format!("{}/xrpc/com.atproto.server.createAccount", url))
247254 .json(&json!({ "handle": handle2, "email": format!("{}@example.com", handle2), "password": "cross-password" }))
248255 .send().await.unwrap();
256256+ let account2: Value = create_res2.json().await.unwrap();
257257+ verify_new_account(&http_client, account2["did"].as_str().unwrap()).await;
249258 let (code_verifier2, code_challenge2) = generate_pkce();
250259 let par_a: Value = http_client.post(format!("{}/oauth/par", url))
251260 .form(&[("response_type", "code"), ("client_id", &client_id_a), ("redirect_uri", redirect_uri_a),
+2-2
tests/password_reset.rs
···373373 .await
374374 .expect("User not found");
375375 let initial_count: i64 = sqlx::query_scalar!(
376376- "SELECT COUNT(*) FROM notification_queue WHERE user_id = $1 AND notification_type = 'password_reset'",
376376+ "SELECT COUNT(*) FROM comms_queue WHERE user_id = $1 AND comms_type = 'password_reset'",
377377 user.id
378378 )
379379 .fetch_one(&pool)
···391391 .expect("Failed to request password reset");
392392 assert_eq!(res.status(), StatusCode::OK);
393393 let final_count: i64 = sqlx::query_scalar!(
394394- "SELECT COUNT(*) FROM notification_queue WHERE user_id = $1 AND notification_type = 'password_reset'",
394394+ "SELECT COUNT(*) FROM comms_queue WHERE user_id = $1 AND comms_type = 'password_reset'",
395395 user.id
396396 )
397397 .fetch_one(&pool)