this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::BearerAuthAdmin;
3use crate::state::AppState;
4use crate::types::Did;
5use axum::{
6 Json,
7 extract::{Query, State},
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, warn};
14
15#[derive(Deserialize)]
16pub struct GetSubjectStatusParams {
17 pub did: Option<String>,
18 pub uri: Option<String>,
19 pub blob: Option<String>,
20}
21
22#[derive(Serialize)]
23pub struct SubjectStatus {
24 pub subject: serde_json::Value,
25 pub takedown: Option<StatusAttr>,
26 pub deactivated: Option<StatusAttr>,
27}
28
29#[derive(Serialize)]
30#[serde(rename_all = "camelCase")]
31pub struct StatusAttr {
32 pub applied: bool,
33 pub r#ref: Option<String>,
34}
35
36pub async fn get_subject_status(
37 State(state): State<AppState>,
38 _auth: BearerAuthAdmin,
39 Query(params): Query<GetSubjectStatusParams>,
40) -> Response {
41 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
42 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response();
43 }
44 if let Some(did) = ¶ms.did {
45 let user = sqlx::query!(
46 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
47 did
48 )
49 .fetch_optional(&state.db)
50 .await;
51 match user {
52 Ok(Some(row)) => {
53 let deactivated = row.deactivated_at.map(|_| StatusAttr {
54 applied: true,
55 r#ref: None,
56 });
57 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
58 applied: true,
59 r#ref: Some(r.clone()),
60 });
61 return (
62 StatusCode::OK,
63 Json(SubjectStatus {
64 subject: json!({
65 "$type": "com.atproto.admin.defs#repoRef",
66 "did": row.did
67 }),
68 takedown,
69 deactivated,
70 }),
71 )
72 .into_response();
73 }
74 Ok(None) => {
75 return ApiError::SubjectNotFound.into_response();
76 }
77 Err(e) => {
78 error!("DB error in get_subject_status: {:?}", e);
79 return ApiError::InternalError(None).into_response();
80 }
81 }
82 }
83 if let Some(uri) = ¶ms.uri {
84 let record = sqlx::query!(
85 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
86 uri
87 )
88 .fetch_optional(&state.db)
89 .await;
90 match record {
91 Ok(Some(row)) => {
92 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
93 applied: true,
94 r#ref: Some(r.clone()),
95 });
96 return (
97 StatusCode::OK,
98 Json(SubjectStatus {
99 subject: json!({
100 "$type": "com.atproto.repo.strongRef",
101 "uri": uri,
102 "cid": uri
103 }),
104 takedown,
105 deactivated: None,
106 }),
107 )
108 .into_response();
109 }
110 Ok(None) => {
111 return ApiError::RecordNotFound.into_response();
112 }
113 Err(e) => {
114 error!("DB error in get_subject_status: {:?}", e);
115 return ApiError::InternalError(None).into_response();
116 }
117 }
118 }
119 if let Some(blob_cid) = ¶ms.blob {
120 let did = match ¶ms.did {
121 Some(d) => d,
122 None => {
123 return ApiError::InvalidRequest("Must provide a did to request blob state".into())
124 .into_response();
125 }
126 };
127 let blob = sqlx::query!(
128 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
129 blob_cid
130 )
131 .fetch_optional(&state.db)
132 .await;
133 match blob {
134 Ok(Some(row)) => {
135 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
136 applied: true,
137 r#ref: Some(r.clone()),
138 });
139 return (
140 StatusCode::OK,
141 Json(SubjectStatus {
142 subject: json!({
143 "$type": "com.atproto.admin.defs#repoBlobRef",
144 "did": did,
145 "cid": row.cid
146 }),
147 takedown,
148 deactivated: None,
149 }),
150 )
151 .into_response();
152 }
153 Ok(None) => {
154 return ApiError::BlobNotFound(None).into_response();
155 }
156 Err(e) => {
157 error!("DB error in get_subject_status: {:?}", e);
158 return ApiError::InternalError(None).into_response();
159 }
160 }
161 }
162 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
163}
164
165#[derive(Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub struct UpdateSubjectStatusInput {
168 pub subject: serde_json::Value,
169 pub takedown: Option<StatusAttrInput>,
170 pub deactivated: Option<StatusAttrInput>,
171}
172
173#[derive(Deserialize)]
174pub struct StatusAttrInput {
175 pub applied: bool,
176 pub r#ref: Option<String>,
177}
178
179pub async fn update_subject_status(
180 State(state): State<AppState>,
181 _auth: BearerAuthAdmin,
182 Json(input): Json<UpdateSubjectStatusInput>,
183) -> Response {
184 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
185 match subject_type {
186 Some("com.atproto.admin.defs#repoRef") => {
187 let did_str = input.subject.get("did").and_then(|d| d.as_str());
188 if let Some(did_str) = did_str {
189 let did = Did::new_unchecked(did_str);
190 let mut tx = match state.db.begin().await {
191 Ok(tx) => tx,
192 Err(e) => {
193 error!("Failed to begin transaction: {:?}", e);
194 return ApiError::InternalError(None).into_response();
195 }
196 };
197 if let Some(takedown) = &input.takedown {
198 let takedown_ref = if takedown.applied {
199 takedown.r#ref.clone()
200 } else {
201 None
202 };
203 if let Err(e) = sqlx::query!(
204 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
205 takedown_ref,
206 did.as_str()
207 )
208 .execute(&mut *tx)
209 .await
210 {
211 error!("Failed to update user takedown status for {}: {:?}", did, e);
212 return ApiError::InternalError(Some(
213 "Failed to update takedown status".into(),
214 ))
215 .into_response();
216 }
217 }
218 if let Some(deactivated) = &input.deactivated {
219 let result = if deactivated.applied {
220 sqlx::query!(
221 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
222 did.as_str()
223 )
224 .execute(&mut *tx)
225 .await
226 } else {
227 sqlx::query!(
228 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
229 did.as_str()
230 )
231 .execute(&mut *tx)
232 .await
233 };
234 if let Err(e) = result {
235 error!(
236 "Failed to update user deactivation status for {}: {:?}",
237 did, e
238 );
239 return ApiError::InternalError(Some(
240 "Failed to update deactivation status".into(),
241 ))
242 .into_response();
243 }
244 }
245 if let Err(e) = tx.commit().await {
246 error!("Failed to commit transaction: {:?}", e);
247 return ApiError::InternalError(None).into_response();
248 }
249 if let Some(takedown) = &input.takedown {
250 let status = if takedown.applied {
251 Some("takendown")
252 } else {
253 None
254 };
255 if let Err(e) = crate::api::repo::record::sequence_account_event(
256 &state,
257 &did,
258 !takedown.applied,
259 status,
260 )
261 .await
262 {
263 warn!("Failed to sequence account event for takedown: {}", e);
264 }
265 }
266 if let Some(deactivated) = &input.deactivated {
267 let status = if deactivated.applied {
268 Some("deactivated")
269 } else {
270 None
271 };
272 if let Err(e) = crate::api::repo::record::sequence_account_event(
273 &state,
274 &did,
275 !deactivated.applied,
276 status,
277 )
278 .await
279 {
280 warn!("Failed to sequence account event for deactivation: {}", e);
281 }
282 }
283 if let Ok(Some(handle)) =
284 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
285 .fetch_optional(&state.db)
286 .await
287 {
288 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
289 }
290 return (
291 StatusCode::OK,
292 Json(json!({
293 "subject": input.subject,
294 "takedown": input.takedown.as_ref().map(|t| json!({
295 "applied": t.applied,
296 "ref": t.r#ref
297 })),
298 "deactivated": input.deactivated.as_ref().map(|d| json!({
299 "applied": d.applied
300 }))
301 })),
302 )
303 .into_response();
304 }
305 }
306 Some("com.atproto.repo.strongRef") => {
307 let uri = input.subject.get("uri").and_then(|u| u.as_str());
308 if let Some(uri) = uri {
309 if let Some(takedown) = &input.takedown {
310 let takedown_ref = if takedown.applied {
311 takedown.r#ref.clone()
312 } else {
313 None
314 };
315 if let Err(e) = sqlx::query!(
316 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
317 takedown_ref,
318 uri
319 )
320 .execute(&state.db)
321 .await
322 {
323 error!(
324 "Failed to update record takedown status for {}: {:?}",
325 uri, e
326 );
327 return ApiError::InternalError(Some(
328 "Failed to update takedown status".into(),
329 ))
330 .into_response();
331 }
332 }
333 return (
334 StatusCode::OK,
335 Json(json!({
336 "subject": input.subject,
337 "takedown": input.takedown.as_ref().map(|t| json!({
338 "applied": t.applied,
339 "ref": t.r#ref
340 }))
341 })),
342 )
343 .into_response();
344 }
345 }
346 Some("com.atproto.admin.defs#repoBlobRef") => {
347 let cid = input.subject.get("cid").and_then(|c| c.as_str());
348 if let Some(cid) = cid {
349 if let Some(takedown) = &input.takedown {
350 let takedown_ref = if takedown.applied {
351 takedown.r#ref.clone()
352 } else {
353 None
354 };
355 if let Err(e) = sqlx::query!(
356 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
357 takedown_ref,
358 cid
359 )
360 .execute(&state.db)
361 .await
362 {
363 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
364 return ApiError::InternalError(Some(
365 "Failed to update takedown status".into(),
366 ))
367 .into_response();
368 }
369 }
370 return (
371 StatusCode::OK,
372 Json(json!({
373 "subject": input.subject,
374 "takedown": input.takedown.as_ref().map(|t| json!({
375 "applied": t.applied,
376 "ref": t.r#ref
377 }))
378 })),
379 )
380 .into_response();
381 }
382 }
383 _ => {}
384 }
385 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
386}