this repo has no description
1use crate::auth::BearerAuthAdmin;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use tracing::{error, warn};
12
13#[derive(Deserialize)]
14pub struct GetSubjectStatusParams {
15 pub did: Option<String>,
16 pub uri: Option<String>,
17 pub blob: Option<String>,
18}
19
20#[derive(Serialize)]
21pub struct SubjectStatus {
22 pub subject: serde_json::Value,
23 pub takedown: Option<StatusAttr>,
24 pub deactivated: Option<StatusAttr>,
25}
26
27#[derive(Serialize)]
28#[serde(rename_all = "camelCase")]
29pub struct StatusAttr {
30 pub applied: bool,
31 pub r#ref: Option<String>,
32}
33
34pub async fn get_subject_status(
35 State(state): State<AppState>,
36 _auth: BearerAuthAdmin,
37 Query(params): Query<GetSubjectStatusParams>,
38) -> Response {
39 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
40 return (
41 StatusCode::BAD_REQUEST,
42 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
43 )
44 .into_response();
45 }
46 if let Some(did) = ¶ms.did {
47 let user = sqlx::query!(
48 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
49 did
50 )
51 .fetch_optional(&state.db)
52 .await;
53 match user {
54 Ok(Some(row)) => {
55 let deactivated = row.deactivated_at.map(|_| StatusAttr {
56 applied: true,
57 r#ref: None,
58 });
59 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
60 applied: true,
61 r#ref: Some(r.clone()),
62 });
63 return (
64 StatusCode::OK,
65 Json(SubjectStatus {
66 subject: json!({
67 "$type": "com.atproto.admin.defs#repoRef",
68 "did": row.did
69 }),
70 takedown,
71 deactivated,
72 }),
73 )
74 .into_response();
75 }
76 Ok(None) => {
77 return (
78 StatusCode::NOT_FOUND,
79 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
80 )
81 .into_response();
82 }
83 Err(e) => {
84 error!("DB error in get_subject_status: {:?}", e);
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError"})),
88 )
89 .into_response();
90 }
91 }
92 }
93 if let Some(uri) = ¶ms.uri {
94 let record = sqlx::query!(
95 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
96 uri
97 )
98 .fetch_optional(&state.db)
99 .await;
100 match record {
101 Ok(Some(row)) => {
102 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
103 applied: true,
104 r#ref: Some(r.clone()),
105 });
106 return (
107 StatusCode::OK,
108 Json(SubjectStatus {
109 subject: json!({
110 "$type": "com.atproto.repo.strongRef",
111 "uri": uri,
112 "cid": uri
113 }),
114 takedown,
115 deactivated: None,
116 }),
117 )
118 .into_response();
119 }
120 Ok(None) => {
121 return (
122 StatusCode::NOT_FOUND,
123 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
124 )
125 .into_response();
126 }
127 Err(e) => {
128 error!("DB error in get_subject_status: {:?}", e);
129 return (
130 StatusCode::INTERNAL_SERVER_ERROR,
131 Json(json!({"error": "InternalError"})),
132 )
133 .into_response();
134 }
135 }
136 }
137 if let Some(blob_cid) = ¶ms.blob {
138 let did = match ¶ms.did {
139 Some(d) => d,
140 None => {
141 return (
142 StatusCode::BAD_REQUEST,
143 Json(json!({"error": "InvalidRequest", "message": "Must provide a did to request blob state"})),
144 )
145 .into_response();
146 }
147 };
148 let blob = sqlx::query!(
149 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
150 blob_cid
151 )
152 .fetch_optional(&state.db)
153 .await;
154 match blob {
155 Ok(Some(row)) => {
156 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
157 applied: true,
158 r#ref: Some(r.clone()),
159 });
160 return (
161 StatusCode::OK,
162 Json(SubjectStatus {
163 subject: json!({
164 "$type": "com.atproto.admin.defs#repoBlobRef",
165 "did": did,
166 "cid": row.cid
167 }),
168 takedown,
169 deactivated: None,
170 }),
171 )
172 .into_response();
173 }
174 Ok(None) => {
175 return (
176 StatusCode::NOT_FOUND,
177 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
178 )
179 .into_response();
180 }
181 Err(e) => {
182 error!("DB error in get_subject_status: {:?}", e);
183 return (
184 StatusCode::INTERNAL_SERVER_ERROR,
185 Json(json!({"error": "InternalError"})),
186 )
187 .into_response();
188 }
189 }
190 }
191 (
192 StatusCode::BAD_REQUEST,
193 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
194 )
195 .into_response()
196}
197
198#[derive(Deserialize)]
199#[serde(rename_all = "camelCase")]
200pub struct UpdateSubjectStatusInput {
201 pub subject: serde_json::Value,
202 pub takedown: Option<StatusAttrInput>,
203 pub deactivated: Option<StatusAttrInput>,
204}
205
206#[derive(Deserialize)]
207pub struct StatusAttrInput {
208 pub applied: bool,
209 pub r#ref: Option<String>,
210}
211
212pub async fn update_subject_status(
213 State(state): State<AppState>,
214 _auth: BearerAuthAdmin,
215 Json(input): Json<UpdateSubjectStatusInput>,
216) -> Response {
217 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
218 match subject_type {
219 Some("com.atproto.admin.defs#repoRef") => {
220 let did = input.subject.get("did").and_then(|d| d.as_str());
221 if let Some(did) = did {
222 let mut tx = match state.db.begin().await {
223 Ok(tx) => tx,
224 Err(e) => {
225 error!("Failed to begin transaction: {:?}", e);
226 return (
227 StatusCode::INTERNAL_SERVER_ERROR,
228 Json(json!({"error": "InternalError"})),
229 )
230 .into_response();
231 }
232 };
233 if let Some(takedown) = &input.takedown {
234 let takedown_ref = if takedown.applied {
235 takedown.r#ref.clone()
236 } else {
237 None
238 };
239 if let Err(e) = sqlx::query!(
240 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
241 takedown_ref,
242 did
243 )
244 .execute(&mut *tx)
245 .await
246 {
247 error!("Failed to update user takedown status for {}: {:?}", did, e);
248 return (
249 StatusCode::INTERNAL_SERVER_ERROR,
250 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
251 )
252 .into_response();
253 }
254 }
255 if let Some(deactivated) = &input.deactivated {
256 let result = if deactivated.applied {
257 sqlx::query!(
258 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
259 did
260 )
261 .execute(&mut *tx)
262 .await
263 } else {
264 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
265 .execute(&mut *tx)
266 .await
267 };
268 if let Err(e) = result {
269 error!(
270 "Failed to update user deactivation status for {}: {:?}",
271 did, e
272 );
273 return (
274 StatusCode::INTERNAL_SERVER_ERROR,
275 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})),
276 )
277 .into_response();
278 }
279 }
280 if let Err(e) = tx.commit().await {
281 error!("Failed to commit transaction: {:?}", e);
282 return (
283 StatusCode::INTERNAL_SERVER_ERROR,
284 Json(json!({"error": "InternalError"})),
285 )
286 .into_response();
287 }
288 if let Some(takedown) = &input.takedown {
289 let status = if takedown.applied {
290 Some("takendown")
291 } else {
292 None
293 };
294 if let Err(e) = crate::api::repo::record::sequence_account_event(
295 &state,
296 did,
297 !takedown.applied,
298 status,
299 )
300 .await
301 {
302 warn!("Failed to sequence account event for takedown: {}", e);
303 }
304 }
305 if let Some(deactivated) = &input.deactivated {
306 let status = if deactivated.applied {
307 Some("deactivated")
308 } else {
309 None
310 };
311 if let Err(e) = crate::api::repo::record::sequence_account_event(
312 &state,
313 did,
314 !deactivated.applied,
315 status,
316 )
317 .await
318 {
319 warn!("Failed to sequence account event for deactivation: {}", e);
320 }
321 }
322 if let Ok(Some(handle)) =
323 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
324 .fetch_optional(&state.db)
325 .await
326 {
327 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
328 }
329 return (
330 StatusCode::OK,
331 Json(json!({
332 "subject": input.subject,
333 "takedown": input.takedown.as_ref().map(|t| json!({
334 "applied": t.applied,
335 "ref": t.r#ref
336 })),
337 "deactivated": input.deactivated.as_ref().map(|d| json!({
338 "applied": d.applied
339 }))
340 })),
341 )
342 .into_response();
343 }
344 }
345 Some("com.atproto.repo.strongRef") => {
346 let uri = input.subject.get("uri").and_then(|u| u.as_str());
347 if let Some(uri) = uri {
348 if let Some(takedown) = &input.takedown {
349 let takedown_ref = if takedown.applied {
350 takedown.r#ref.clone()
351 } else {
352 None
353 };
354 if let Err(e) = sqlx::query!(
355 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
356 takedown_ref,
357 uri
358 )
359 .execute(&state.db)
360 .await
361 {
362 error!(
363 "Failed to update record takedown status for {}: {:?}",
364 uri, e
365 );
366 return (
367 StatusCode::INTERNAL_SERVER_ERROR,
368 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
369 )
370 .into_response();
371 }
372 }
373 return (
374 StatusCode::OK,
375 Json(json!({
376 "subject": input.subject,
377 "takedown": input.takedown.as_ref().map(|t| json!({
378 "applied": t.applied,
379 "ref": t.r#ref
380 }))
381 })),
382 )
383 .into_response();
384 }
385 }
386 Some("com.atproto.admin.defs#repoBlobRef") => {
387 let cid = input.subject.get("cid").and_then(|c| c.as_str());
388 if let Some(cid) = cid {
389 if let Some(takedown) = &input.takedown {
390 let takedown_ref = if takedown.applied {
391 takedown.r#ref.clone()
392 } else {
393 None
394 };
395 if let Err(e) = sqlx::query!(
396 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
397 takedown_ref,
398 cid
399 )
400 .execute(&state.db)
401 .await
402 {
403 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
404 return (
405 StatusCode::INTERNAL_SERVER_ERROR,
406 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
407 )
408 .into_response();
409 }
410 }
411 return (
412 StatusCode::OK,
413 Json(json!({
414 "subject": input.subject,
415 "takedown": input.takedown.as_ref().map(|t| json!({
416 "applied": t.applied,
417 "ref": t.r#ref
418 }))
419 })),
420 )
421 .into_response();
422 }
423 }
424 _ => {}
425 }
426 (
427 StatusCode::BAD_REQUEST,
428 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
429 )
430 .into_response()
431}