···11+-- Add table for identity webhook endpoints
22+CREATE TABLE identity_webhooks (
33+ did TEXT NOT NULL,
44+ service TEXT NOT NULL,
55+ created_at TIMESTAMP WITH TIME ZONE NOT NULL,
66+ enabled BOOLEAN NOT NULL DEFAULT true,
77+ errored_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
88+ error TEXT DEFAULT NULL,
99+ PRIMARY KEY (did, service)
1010+);
+49-2
src/bin/smokesignal.rs
···11use anyhow::Result;
22-use atproto_identity::key::identify_key;
22+use atproto_identity::key::{identify_key, to_public};
33use atproto_identity::resolve::{IdentityResolver, InnerIdentityResolver, create_resolver};
44use atproto_jetstream::{CancellationToken, Consumer as JetstreamConsumer, ConsumerTaskConfig};
55use atproto_oauth_axum::state::OAuthClientConfig;
66use smokesignal::atproto::lexicon::community::lexicon::calendar::event::NSID;
77use smokesignal::consumer::Consumer;
88use smokesignal::processor::ContentFetcher;
99+use smokesignal::service::{ServiceDID, ServiceKey, build_service_document};
910use smokesignal::storage::content::{CachedContentStorage, ContentStorage, FilesystemStorage};
1011use smokesignal::{
1112 http::{
···3334use std::{collections::HashMap, env, str::FromStr, sync::Arc};
3435use tokio::net::TcpListener;
3536use tokio::signal;
3737+use tokio::sync::mpsc;
3638use tokio_util::task::TaskTracker;
3739use tracing_subscriber::prelude::*;
3840use unic_langid::LanguageIdentifier;
···100102101103 // Initialize the DNS resolver with configuration from the app config
102104 let dns_resolver = create_resolver(config.dns_nameservers.as_ref());
105105+106106+ let service_did = ServiceDID(format!("did:web:{}", &config.external_base));
107107+108108+ let service_key = ServiceKey(config.service_key.as_ref().clone());
109109+ let public_service_key = to_public(config.service_key.as_ref())
110110+ .map(|public_key_data| public_key_data.to_string())
111111+ .expect("public service key");
112112+113113+ let service_document = build_service_document(&config.external_base, &public_service_key);
103114104115 // Initialize OAuth and identity resolution components
105116 let oauth_storage = PostgresOAuthRequestStorage::new_arc(pool.clone());
106117 let document_storage = PostgresDidDocumentStorage::new_arc(pool.clone());
107118108119 // Create a key provider populated with signing keys from OAuth backend config
109109- let key_provider_keys =
120120+ let mut key_provider_keys =
110121 if let OAuthBackendConfig::ATProtocol { signing_keys } = &config.oauth_backend {
111122 signing_keys
112123 .as_ref()
···122133 } else {
123134 HashMap::new() // Empty for AIP backend
124135 };
136136+ key_provider_keys.insert(public_service_key, service_key.0.clone());
125137 let key_provider = Arc::new(SimpleKeyProvider::new(key_provider_keys));
126138127139 // Create OAuth client config (only for AT Protocol backend)
···157169 plc_hostname: config.plc_hostname.clone(),
158170 }));
159171172172+ // Create webhook channel if webhooks are enabled
173173+ let webhook_sender = if config.enable_webhooks && config.enable_task_webhooks {
174174+ let (sender, receiver) = mpsc::channel(100);
175175+ Some((sender, receiver))
176176+ } else {
177177+ None
178178+ };
179179+160180 let content_storage: Arc<dyn ContentStorage> = if config.content_storage.starts_with("s3://") {
161181 #[cfg(feature = "s3")]
162182 {
···194214 supported_languages,
195215 locales,
196216 content_storage.clone(),
217217+ webhook_sender.as_ref().map(|(sender, _)| sender.clone()),
218218+ service_did,
219219+ service_document,
220220+ service_key,
197221 );
198222199223 let app = build_router(web_context.clone());
···371395 tracker.spawn(async move {
372396 if let Err(err) = cleanup_task.run().await {
373397 tracing::error!("OAuth requests cleanup task failed: {}", err);
398398+ }
399399+ inner_token.cancel();
400400+ });
401401+ }
402402+403403+ // Spawn webhook processor task if enabled
404404+ if let Some((_, receiver)) = webhook_sender {
405405+ use axum::extract::FromRef;
406406+ use smokesignal::task_webhooks::WebhookProcessor;
407407+408408+ let webhook_processor = WebhookProcessor::new(
409409+ pool.clone(),
410410+ document_storage.clone(),
411411+ ServiceDID::from_ref(&web_context),
412412+ ServiceKey::from_ref(&web_context),
413413+ receiver,
414414+ token.clone(),
415415+ );
416416+417417+ let inner_token = token.clone();
418418+ tracker.spawn(async move {
419419+ if let Err(err) = webhook_processor.run().await {
420420+ tracing::error!("Webhook processor task failed: {}", err);
374421 }
375422 inner_token.cancel();
376423 });
···3333pub mod handle_set_language;
3434pub mod handle_settings;
3535pub mod handle_view_event;
3636+pub mod handle_wellknown;
3637pub mod handle_xrpc_get_event;
3738pub mod import_utils;
3839pub mod location_edit_status;
···66pub mod http;
77pub mod i18n;
88pub mod key_provider;
99+pub mod processor;
910pub mod refresh_tokens_errors;
1111+pub mod service;
1012pub mod storage;
1111-1212-pub mod processor;
1313pub mod task_identity_refresh;
1414pub mod task_oauth_requests_cleanup;
1515pub mod task_refresh_tokens;
1616+pub mod task_webhooks;
1717+pub mod webhooks;