this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use tracing::error;
11
12#[derive(Deserialize)]
13pub struct GetSubjectStatusParams {
14 pub did: Option<String>,
15 pub uri: Option<String>,
16 pub blob: Option<String>,
17}
18
19#[derive(Serialize)]
20pub struct SubjectStatus {
21 pub subject: serde_json::Value,
22 pub takedown: Option<StatusAttr>,
23 pub deactivated: Option<StatusAttr>,
24}
25
26#[derive(Serialize)]
27#[serde(rename_all = "camelCase")]
28pub struct StatusAttr {
29 pub applied: bool,
30 pub r#ref: Option<String>,
31}
32
33pub async fn get_subject_status(
34 State(state): State<AppState>,
35 headers: axum::http::HeaderMap,
36 Query(params): Query<GetSubjectStatusParams>,
37) -> Response {
38 let auth_header = headers.get("Authorization");
39 if auth_header.is_none() {
40 return (
41 StatusCode::UNAUTHORIZED,
42 Json(json!({"error": "AuthenticationRequired"})),
43 )
44 .into_response();
45 }
46
47 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
48 return (
49 StatusCode::BAD_REQUEST,
50 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
51 )
52 .into_response();
53 }
54
55 if let Some(did) = ¶ms.did {
56 let user = sqlx::query!(
57 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
58 did
59 )
60 .fetch_optional(&state.db)
61 .await;
62
63 match user {
64 Ok(Some(row)) => {
65 let deactivated = row.deactivated_at.map(|_| StatusAttr {
66 applied: true,
67 r#ref: None,
68 });
69 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
70 applied: true,
71 r#ref: Some(r.clone()),
72 });
73
74 return (
75 StatusCode::OK,
76 Json(SubjectStatus {
77 subject: json!({
78 "$type": "com.atproto.admin.defs#repoRef",
79 "did": row.did
80 }),
81 takedown,
82 deactivated,
83 }),
84 )
85 .into_response();
86 }
87 Ok(None) => {
88 return (
89 StatusCode::NOT_FOUND,
90 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
91 )
92 .into_response();
93 }
94 Err(e) => {
95 error!("DB error in get_subject_status: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 }
103 }
104
105 if let Some(uri) = ¶ms.uri {
106 let record = sqlx::query!(
107 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
108 uri
109 )
110 .fetch_optional(&state.db)
111 .await;
112
113 match record {
114 Ok(Some(row)) => {
115 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
116 applied: true,
117 r#ref: Some(r.clone()),
118 });
119
120 return (
121 StatusCode::OK,
122 Json(SubjectStatus {
123 subject: json!({
124 "$type": "com.atproto.repo.strongRef",
125 "uri": uri,
126 "cid": uri
127 }),
128 takedown,
129 deactivated: None,
130 }),
131 )
132 .into_response();
133 }
134 Ok(None) => {
135 return (
136 StatusCode::NOT_FOUND,
137 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
138 )
139 .into_response();
140 }
141 Err(e) => {
142 error!("DB error in get_subject_status: {:?}", e);
143 return (
144 StatusCode::INTERNAL_SERVER_ERROR,
145 Json(json!({"error": "InternalError"})),
146 )
147 .into_response();
148 }
149 }
150 }
151
152 if let Some(blob_cid) = ¶ms.blob {
153 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid)
154 .fetch_optional(&state.db)
155 .await;
156
157 match blob {
158 Ok(Some(row)) => {
159 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
160 applied: true,
161 r#ref: Some(r.clone()),
162 });
163
164 return (
165 StatusCode::OK,
166 Json(SubjectStatus {
167 subject: json!({
168 "$type": "com.atproto.admin.defs#repoBlobRef",
169 "did": "",
170 "cid": row.cid
171 }),
172 takedown,
173 deactivated: None,
174 }),
175 )
176 .into_response();
177 }
178 Ok(None) => {
179 return (
180 StatusCode::NOT_FOUND,
181 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
182 )
183 .into_response();
184 }
185 Err(e) => {
186 error!("DB error in get_subject_status: {:?}", e);
187 return (
188 StatusCode::INTERNAL_SERVER_ERROR,
189 Json(json!({"error": "InternalError"})),
190 )
191 .into_response();
192 }
193 }
194 }
195
196 (
197 StatusCode::BAD_REQUEST,
198 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
199 )
200 .into_response()
201}
202
203#[derive(Deserialize)]
204#[serde(rename_all = "camelCase")]
205pub struct UpdateSubjectStatusInput {
206 pub subject: serde_json::Value,
207 pub takedown: Option<StatusAttrInput>,
208 pub deactivated: Option<StatusAttrInput>,
209}
210
211#[derive(Deserialize)]
212pub struct StatusAttrInput {
213 pub apply: bool,
214 pub r#ref: Option<String>,
215}
216
217pub async fn update_subject_status(
218 State(state): State<AppState>,
219 headers: axum::http::HeaderMap,
220 Json(input): Json<UpdateSubjectStatusInput>,
221) -> Response {
222 let auth_header = headers.get("Authorization");
223 if auth_header.is_none() {
224 return (
225 StatusCode::UNAUTHORIZED,
226 Json(json!({"error": "AuthenticationRequired"})),
227 )
228 .into_response();
229 }
230
231 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
232
233 match subject_type {
234 Some("com.atproto.admin.defs#repoRef") => {
235 let did = input.subject.get("did").and_then(|d| d.as_str());
236 if let Some(did) = did {
237 let mut tx = match state.db.begin().await {
238 Ok(tx) => tx,
239 Err(e) => {
240 error!("Failed to begin transaction: {:?}", e);
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 Json(json!({"error": "InternalError"})),
244 )
245 .into_response();
246 }
247 };
248
249 if let Some(takedown) = &input.takedown {
250 let takedown_ref = if takedown.apply {
251 takedown.r#ref.clone()
252 } else {
253 None
254 };
255 if let Err(e) = sqlx::query!(
256 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
257 takedown_ref,
258 did
259 )
260 .execute(&mut *tx)
261 .await
262 {
263 error!("Failed to update user takedown status for {}: {:?}", did, e);
264 return (
265 StatusCode::INTERNAL_SERVER_ERROR,
266 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
267 )
268 .into_response();
269 }
270 }
271
272 if let Some(deactivated) = &input.deactivated {
273 let result = if deactivated.apply {
274 sqlx::query!(
275 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
276 did
277 )
278 .execute(&mut *tx)
279 .await
280 } else {
281 sqlx::query!(
282 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
283 did
284 )
285 .execute(&mut *tx)
286 .await
287 };
288
289 if let Err(e) = result {
290 error!("Failed to update user deactivation status for {}: {:?}", did, e);
291 return (
292 StatusCode::INTERNAL_SERVER_ERROR,
293 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})),
294 )
295 .into_response();
296 }
297 }
298
299 if let Err(e) = tx.commit().await {
300 error!("Failed to commit transaction: {:?}", e);
301 return (
302 StatusCode::INTERNAL_SERVER_ERROR,
303 Json(json!({"error": "InternalError"})),
304 )
305 .into_response();
306 }
307
308 if let Ok(Some(handle)) = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
309 .fetch_optional(&state.db)
310 .await
311 {
312 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
313 }
314
315 return (
316 StatusCode::OK,
317 Json(json!({
318 "subject": input.subject,
319 "takedown": input.takedown.as_ref().map(|t| json!({
320 "applied": t.apply,
321 "ref": t.r#ref
322 })),
323 "deactivated": input.deactivated.as_ref().map(|d| json!({
324 "applied": d.apply
325 }))
326 })),
327 )
328 .into_response();
329 }
330 }
331 Some("com.atproto.repo.strongRef") => {
332 let uri = input.subject.get("uri").and_then(|u| u.as_str());
333 if let Some(uri) = uri {
334 if let Some(takedown) = &input.takedown {
335 let takedown_ref = if takedown.apply {
336 takedown.r#ref.clone()
337 } else {
338 None
339 };
340 if let Err(e) = sqlx::query!(
341 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
342 takedown_ref,
343 uri
344 )
345 .execute(&state.db)
346 .await
347 {
348 error!("Failed to update record takedown status for {}: {:?}", uri, e);
349 return (
350 StatusCode::INTERNAL_SERVER_ERROR,
351 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
352 )
353 .into_response();
354 }
355 }
356
357 return (
358 StatusCode::OK,
359 Json(json!({
360 "subject": input.subject,
361 "takedown": input.takedown.as_ref().map(|t| json!({
362 "applied": t.apply,
363 "ref": t.r#ref
364 }))
365 })),
366 )
367 .into_response();
368 }
369 }
370 Some("com.atproto.admin.defs#repoBlobRef") => {
371 let cid = input.subject.get("cid").and_then(|c| c.as_str());
372 if let Some(cid) = cid {
373 if let Some(takedown) = &input.takedown {
374 let takedown_ref = if takedown.apply {
375 takedown.r#ref.clone()
376 } else {
377 None
378 };
379 if let Err(e) = sqlx::query!(
380 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
381 takedown_ref,
382 cid
383 )
384 .execute(&state.db)
385 .await
386 {
387 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
388 return (
389 StatusCode::INTERNAL_SERVER_ERROR,
390 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
391 )
392 .into_response();
393 }
394 }
395
396 return (
397 StatusCode::OK,
398 Json(json!({
399 "subject": input.subject,
400 "takedown": input.takedown.as_ref().map(|t| json!({
401 "applied": t.apply,
402 "ref": t.r#ref
403 }))
404 })),
405 )
406 .into_response();
407 }
408 }
409 _ => {}
410 }
411
412 (
413 StatusCode::BAD_REQUEST,
414 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
415 )
416 .into_response()
417}