this repo has no description
1use crate::auth::BearerAuthAdmin;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use tracing::{error, warn};
12
13#[derive(Deserialize)]
14pub struct GetSubjectStatusParams {
15 pub did: Option<String>,
16 pub uri: Option<String>,
17 pub blob: Option<String>,
18}
19
20#[derive(Serialize)]
21pub struct SubjectStatus {
22 pub subject: serde_json::Value,
23 pub takedown: Option<StatusAttr>,
24 pub deactivated: Option<StatusAttr>,
25}
26
27#[derive(Serialize)]
28#[serde(rename_all = "camelCase")]
29pub struct StatusAttr {
30 pub applied: bool,
31 pub r#ref: Option<String>,
32}
33
34pub async fn get_subject_status(
35 State(state): State<AppState>,
36 _auth: BearerAuthAdmin,
37 Query(params): Query<GetSubjectStatusParams>,
38) -> Response {
39 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
40 return (
41 StatusCode::BAD_REQUEST,
42 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
43 )
44 .into_response();
45 }
46 if let Some(did) = ¶ms.did {
47 let user = sqlx::query!(
48 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
49 did
50 )
51 .fetch_optional(&state.db)
52 .await;
53 match user {
54 Ok(Some(row)) => {
55 let deactivated = row.deactivated_at.map(|_| StatusAttr {
56 applied: true,
57 r#ref: None,
58 });
59 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
60 applied: true,
61 r#ref: Some(r.clone()),
62 });
63 return (
64 StatusCode::OK,
65 Json(SubjectStatus {
66 subject: json!({
67 "$type": "com.atproto.admin.defs#repoRef",
68 "did": row.did
69 }),
70 takedown,
71 deactivated,
72 }),
73 )
74 .into_response();
75 }
76 Ok(None) => {
77 return (
78 StatusCode::NOT_FOUND,
79 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
80 )
81 .into_response();
82 }
83 Err(e) => {
84 error!("DB error in get_subject_status: {:?}", e);
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError"})),
88 )
89 .into_response();
90 }
91 }
92 }
93 if let Some(uri) = ¶ms.uri {
94 let record = sqlx::query!(
95 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
96 uri
97 )
98 .fetch_optional(&state.db)
99 .await;
100 match record {
101 Ok(Some(row)) => {
102 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
103 applied: true,
104 r#ref: Some(r.clone()),
105 });
106 return (
107 StatusCode::OK,
108 Json(SubjectStatus {
109 subject: json!({
110 "$type": "com.atproto.repo.strongRef",
111 "uri": uri,
112 "cid": uri
113 }),
114 takedown,
115 deactivated: None,
116 }),
117 )
118 .into_response();
119 }
120 Ok(None) => {
121 return (
122 StatusCode::NOT_FOUND,
123 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
124 )
125 .into_response();
126 }
127 Err(e) => {
128 error!("DB error in get_subject_status: {:?}", e);
129 return (
130 StatusCode::INTERNAL_SERVER_ERROR,
131 Json(json!({"error": "InternalError"})),
132 )
133 .into_response();
134 }
135 }
136 }
137 if let Some(blob_cid) = ¶ms.blob {
138 let blob = sqlx::query!(
139 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
140 blob_cid
141 )
142 .fetch_optional(&state.db)
143 .await;
144 match blob {
145 Ok(Some(row)) => {
146 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
147 applied: true,
148 r#ref: Some(r.clone()),
149 });
150 return (
151 StatusCode::OK,
152 Json(SubjectStatus {
153 subject: json!({
154 "$type": "com.atproto.admin.defs#repoBlobRef",
155 "did": "",
156 "cid": row.cid
157 }),
158 takedown,
159 deactivated: None,
160 }),
161 )
162 .into_response();
163 }
164 Ok(None) => {
165 return (
166 StatusCode::NOT_FOUND,
167 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
168 )
169 .into_response();
170 }
171 Err(e) => {
172 error!("DB error in get_subject_status: {:?}", e);
173 return (
174 StatusCode::INTERNAL_SERVER_ERROR,
175 Json(json!({"error": "InternalError"})),
176 )
177 .into_response();
178 }
179 }
180 }
181 (
182 StatusCode::BAD_REQUEST,
183 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
184 )
185 .into_response()
186}
187
188#[derive(Deserialize)]
189#[serde(rename_all = "camelCase")]
190pub struct UpdateSubjectStatusInput {
191 pub subject: serde_json::Value,
192 pub takedown: Option<StatusAttrInput>,
193 pub deactivated: Option<StatusAttrInput>,
194}
195
196#[derive(Deserialize)]
197pub struct StatusAttrInput {
198 pub apply: bool,
199 pub r#ref: Option<String>,
200}
201
202pub async fn update_subject_status(
203 State(state): State<AppState>,
204 _auth: BearerAuthAdmin,
205 Json(input): Json<UpdateSubjectStatusInput>,
206) -> Response {
207 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
208 match subject_type {
209 Some("com.atproto.admin.defs#repoRef") => {
210 let did = input.subject.get("did").and_then(|d| d.as_str());
211 if let Some(did) = did {
212 let mut tx = match state.db.begin().await {
213 Ok(tx) => tx,
214 Err(e) => {
215 error!("Failed to begin transaction: {:?}", e);
216 return (
217 StatusCode::INTERNAL_SERVER_ERROR,
218 Json(json!({"error": "InternalError"})),
219 )
220 .into_response();
221 }
222 };
223 if let Some(takedown) = &input.takedown {
224 let takedown_ref = if takedown.apply {
225 takedown.r#ref.clone()
226 } else {
227 None
228 };
229 if let Err(e) = sqlx::query!(
230 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
231 takedown_ref,
232 did
233 )
234 .execute(&mut *tx)
235 .await
236 {
237 error!("Failed to update user takedown status for {}: {:?}", did, e);
238 return (
239 StatusCode::INTERNAL_SERVER_ERROR,
240 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
241 )
242 .into_response();
243 }
244 }
245 if let Some(deactivated) = &input.deactivated {
246 let result = if deactivated.apply {
247 sqlx::query!(
248 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
249 did
250 )
251 .execute(&mut *tx)
252 .await
253 } else {
254 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
255 .execute(&mut *tx)
256 .await
257 };
258 if let Err(e) = result {
259 error!(
260 "Failed to update user deactivation status for {}: {:?}",
261 did, e
262 );
263 return (
264 StatusCode::INTERNAL_SERVER_ERROR,
265 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})),
266 )
267 .into_response();
268 }
269 }
270 if let Err(e) = tx.commit().await {
271 error!("Failed to commit transaction: {:?}", e);
272 return (
273 StatusCode::INTERNAL_SERVER_ERROR,
274 Json(json!({"error": "InternalError"})),
275 )
276 .into_response();
277 }
278 if let Some(takedown) = &input.takedown {
279 let status = if takedown.apply {
280 Some("takendown")
281 } else {
282 None
283 };
284 if let Err(e) = crate::api::repo::record::sequence_account_event(
285 &state,
286 did,
287 !takedown.apply,
288 status,
289 )
290 .await
291 {
292 warn!("Failed to sequence account event for takedown: {}", e);
293 }
294 }
295 if let Some(deactivated) = &input.deactivated {
296 let status = if deactivated.apply {
297 Some("deactivated")
298 } else {
299 None
300 };
301 if let Err(e) = crate::api::repo::record::sequence_account_event(
302 &state,
303 did,
304 !deactivated.apply,
305 status,
306 )
307 .await
308 {
309 warn!("Failed to sequence account event for deactivation: {}", e);
310 }
311 }
312 if let Ok(Some(handle)) =
313 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
314 .fetch_optional(&state.db)
315 .await
316 {
317 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
318 }
319 return (
320 StatusCode::OK,
321 Json(json!({
322 "subject": input.subject,
323 "takedown": input.takedown.as_ref().map(|t| json!({
324 "applied": t.apply,
325 "ref": t.r#ref
326 })),
327 "deactivated": input.deactivated.as_ref().map(|d| json!({
328 "applied": d.apply
329 }))
330 })),
331 )
332 .into_response();
333 }
334 }
335 Some("com.atproto.repo.strongRef") => {
336 let uri = input.subject.get("uri").and_then(|u| u.as_str());
337 if let Some(uri) = uri {
338 if let Some(takedown) = &input.takedown {
339 let takedown_ref = if takedown.apply {
340 takedown.r#ref.clone()
341 } else {
342 None
343 };
344 if let Err(e) = sqlx::query!(
345 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
346 takedown_ref,
347 uri
348 )
349 .execute(&state.db)
350 .await
351 {
352 error!(
353 "Failed to update record takedown status for {}: {:?}",
354 uri, e
355 );
356 return (
357 StatusCode::INTERNAL_SERVER_ERROR,
358 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
359 )
360 .into_response();
361 }
362 }
363 return (
364 StatusCode::OK,
365 Json(json!({
366 "subject": input.subject,
367 "takedown": input.takedown.as_ref().map(|t| json!({
368 "applied": t.apply,
369 "ref": t.r#ref
370 }))
371 })),
372 )
373 .into_response();
374 }
375 }
376 Some("com.atproto.admin.defs#repoBlobRef") => {
377 let cid = input.subject.get("cid").and_then(|c| c.as_str());
378 if let Some(cid) = cid {
379 if let Some(takedown) = &input.takedown {
380 let takedown_ref = if takedown.apply {
381 takedown.r#ref.clone()
382 } else {
383 None
384 };
385 if let Err(e) = sqlx::query!(
386 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
387 takedown_ref,
388 cid
389 )
390 .execute(&state.db)
391 .await
392 {
393 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
394 return (
395 StatusCode::INTERNAL_SERVER_ERROR,
396 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
397 )
398 .into_response();
399 }
400 }
401 return (
402 StatusCode::OK,
403 Json(json!({
404 "subject": input.subject,
405 "takedown": input.takedown.as_ref().map(|t| json!({
406 "applied": t.apply,
407 "ref": t.r#ref
408 }))
409 })),
410 )
411 .into_response();
412 }
413 }
414 _ => {}
415 }
416 (
417 StatusCode::BAD_REQUEST,
418 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
419 )
420 .into_response()
421}