···1use anyhow::Result;
2+use atproto_tap::TapClient;
3use axum::{
4 extract::{Path, Query},
5 response::{IntoResponse, Redirect},
···17 pagination::{Pagination, PaginationView},
18 },
19 select_template,
20+ storage::identity_profile::{get_all_dids, handle_list, handle_nuke},
21};
2223pub(crate) async fn handle_admin_handles(
···109 Ok(Redirect::to("/admin/handles").into_response())
110 }
111}
112+113+/// Sync all known DIDs to TAP for event streaming
114+pub(crate) async fn handle_admin_tap_sync_all(
115+ admin_ctx: AdminRequestContext,
116+ HxRequest(hx_request): HxRequest,
117+) -> Result<impl IntoResponse, WebError> {
118+ let error_template = select_template!(false, false, admin_ctx.language);
119+120+ // Check if TAP is enabled
121+ if !admin_ctx.web_context.config.enable_tap {
122+ return contextual_error!(
123+ admin_ctx.web_context,
124+ admin_ctx.language,
125+ error_template,
126+ template_context! {
127+ message => "TAP is not enabled in the configuration."
128+ },
129+ "TAP is not enabled"
130+ );
131+ }
132+133+ // Get all DIDs from the database
134+ let dids = match get_all_dids(&admin_ctx.web_context.pool).await {
135+ Ok(dids) => dids,
136+ Err(err) => {
137+ return contextual_error!(
138+ admin_ctx.web_context,
139+ admin_ctx.language,
140+ error_template,
141+ template_context! {},
142+ err
143+ );
144+ }
145+ };
146+147+ if dids.is_empty() {
148+ tracing::info!("No DIDs to sync to TAP");
149+ if hx_request {
150+ let hx_redirect = HxRedirect::from("/admin/identity_profiles");
151+ return Ok((StatusCode::OK, hx_redirect, "").into_response());
152+ } else {
153+ return Ok(Redirect::to("/admin/identity_profiles").into_response());
154+ }
155+ }
156+157+ // Create TAP client and sync DIDs
158+ let tap_client = TapClient::new(
159+ &admin_ctx.web_context.config.tap_hostname,
160+ admin_ctx.web_context.config.tap_password.clone(),
161+ );
162+163+ // Convert Vec<String> to Vec<&str> for add_repos
164+ let did_refs: Vec<&str> = dids.iter().map(|s| s.as_str()).collect();
165+166+ match tap_client.add_repos(&did_refs).await {
167+ Ok(()) => {
168+ tracing::info!(count = dids.len(), "Successfully synced all DIDs to TAP");
169+ }
170+ Err(err) => {
171+ tracing::error!(error = %err, "Failed to sync DIDs to TAP");
172+ return contextual_error!(
173+ admin_ctx.web_context,
174+ admin_ctx.language,
175+ error_template,
176+ template_context! {
177+ message => format!("Failed to sync DIDs to TAP: {}", err)
178+ },
179+ err
180+ );
181+ }
182+ }
183+184+ if hx_request {
185+ let hx_redirect = HxRedirect::from("/admin/identity_profiles");
186+ Ok((StatusCode::OK, hx_redirect, "").into_response())
187+ } else {
188+ Ok(Redirect::to("/admin/identity_profiles").into_response())
189+ }
190+}
+19
src/http/handle_oauth_aip_callback.rs
···10use anyhow::Result;
11use atproto_client::errors::SimpleError;
12use atproto_identity::resolve::IdentityResolver;
013use axum::{
14 extract::State,
15 response::{IntoResponse, Redirect},
···212 {
213 // Log the error but don't fail the authentication flow
214 tracing::warn!(?err, "Failed to send confirmation email to new user");
000000000000000000215 }
216217 // Import Bluesky profile for new users
···10use anyhow::Result;
11use atproto_client::errors::SimpleError;
12use atproto_identity::resolve::IdentityResolver;
13+use atproto_tap::TapClient;
14use axum::{
15 extract::State,
16 response::{IntoResponse, Redirect},
···213 {
214 // Log the error but don't fail the authentication flow
215 tracing::warn!(?err, "Failed to send confirmation email to new user");
216+ }
217+218+ // Register new user's DID with TAP for event streaming
219+ if is_new_user && web_context.config.enable_tap {
220+ let tap_client = TapClient::new(
221+ &web_context.config.tap_hostname,
222+ web_context.config.tap_password.clone(),
223+ );
224+ if let Err(err) = tap_client.add_repos(&[&document.id]).await {
225+ // Log the error but don't fail the authentication flow
226+ tracing::warn!(
227+ did = %document.id,
228+ error = %err,
229+ "Failed to register DID with TAP"
230+ );
231+ } else {
232+ tracing::info!(did = %document.id, "Registered new user DID with TAP");
233+ }
234 }
235236 // Import Bluesky profile for new users
···447 ))
448}
4490000000000450/// Get an identity profile by DID
451pub async fn identity_profile_get(
452 pool: &StoragePool,
···447 ))
448}
449450+/// Get all DIDs from identity profiles
451+pub async fn get_all_dids(pool: &StoragePool) -> Result<Vec<String>, StorageError> {
452+ let dids = sqlx::query_scalar::<_, String>("SELECT did FROM identity_profiles ORDER BY did")
453+ .fetch_all(pool)
454+ .await
455+ .map_err(StorageError::UnableToExecuteQuery)?;
456+457+ Ok(dids)
458+}
459+460/// Get an identity profile by DID
461pub async fn identity_profile_get(
462 pool: &StoragePool,