this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use tracing::{error, warn};
11
12#[derive(Deserialize)]
13pub struct GetSubjectStatusParams {
14 pub did: Option<String>,
15 pub uri: Option<String>,
16 pub blob: Option<String>,
17}
18
19#[derive(Serialize)]
20pub struct SubjectStatus {
21 pub subject: serde_json::Value,
22 pub takedown: Option<StatusAttr>,
23 pub deactivated: Option<StatusAttr>,
24}
25
26#[derive(Serialize)]
27#[serde(rename_all = "camelCase")]
28pub struct StatusAttr {
29 pub applied: bool,
30 pub r#ref: Option<String>,
31}
32
33pub async fn get_subject_status(
34 State(state): State<AppState>,
35 headers: axum::http::HeaderMap,
36 Query(params): Query<GetSubjectStatusParams>,
37) -> Response {
38 let auth_header = headers.get("Authorization");
39 if auth_header.is_none() {
40 return (
41 StatusCode::UNAUTHORIZED,
42 Json(json!({"error": "AuthenticationRequired"})),
43 )
44 .into_response();
45 }
46 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
47 return (
48 StatusCode::BAD_REQUEST,
49 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
50 )
51 .into_response();
52 }
53 if let Some(did) = ¶ms.did {
54 let user = sqlx::query!(
55 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
56 did
57 )
58 .fetch_optional(&state.db)
59 .await;
60 match user {
61 Ok(Some(row)) => {
62 let deactivated = row.deactivated_at.map(|_| StatusAttr {
63 applied: true,
64 r#ref: None,
65 });
66 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
67 applied: true,
68 r#ref: Some(r.clone()),
69 });
70 return (
71 StatusCode::OK,
72 Json(SubjectStatus {
73 subject: json!({
74 "$type": "com.atproto.admin.defs#repoRef",
75 "did": row.did
76 }),
77 takedown,
78 deactivated,
79 }),
80 )
81 .into_response();
82 }
83 Ok(None) => {
84 return (
85 StatusCode::NOT_FOUND,
86 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
87 )
88 .into_response();
89 }
90 Err(e) => {
91 error!("DB error in get_subject_status: {:?}", e);
92 return (
93 StatusCode::INTERNAL_SERVER_ERROR,
94 Json(json!({"error": "InternalError"})),
95 )
96 .into_response();
97 }
98 }
99 }
100 if let Some(uri) = ¶ms.uri {
101 let record = sqlx::query!(
102 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
103 uri
104 )
105 .fetch_optional(&state.db)
106 .await;
107 match record {
108 Ok(Some(row)) => {
109 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
110 applied: true,
111 r#ref: Some(r.clone()),
112 });
113 return (
114 StatusCode::OK,
115 Json(SubjectStatus {
116 subject: json!({
117 "$type": "com.atproto.repo.strongRef",
118 "uri": uri,
119 "cid": uri
120 }),
121 takedown,
122 deactivated: None,
123 }),
124 )
125 .into_response();
126 }
127 Ok(None) => {
128 return (
129 StatusCode::NOT_FOUND,
130 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
131 )
132 .into_response();
133 }
134 Err(e) => {
135 error!("DB error in get_subject_status: {:?}", e);
136 return (
137 StatusCode::INTERNAL_SERVER_ERROR,
138 Json(json!({"error": "InternalError"})),
139 )
140 .into_response();
141 }
142 }
143 }
144 if let Some(blob_cid) = ¶ms.blob {
145 let blob = sqlx::query!(
146 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
147 blob_cid
148 )
149 .fetch_optional(&state.db)
150 .await;
151 match blob {
152 Ok(Some(row)) => {
153 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
154 applied: true,
155 r#ref: Some(r.clone()),
156 });
157 return (
158 StatusCode::OK,
159 Json(SubjectStatus {
160 subject: json!({
161 "$type": "com.atproto.admin.defs#repoBlobRef",
162 "did": "",
163 "cid": row.cid
164 }),
165 takedown,
166 deactivated: None,
167 }),
168 )
169 .into_response();
170 }
171 Ok(None) => {
172 return (
173 StatusCode::NOT_FOUND,
174 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
175 )
176 .into_response();
177 }
178 Err(e) => {
179 error!("DB error in get_subject_status: {:?}", e);
180 return (
181 StatusCode::INTERNAL_SERVER_ERROR,
182 Json(json!({"error": "InternalError"})),
183 )
184 .into_response();
185 }
186 }
187 }
188 (
189 StatusCode::BAD_REQUEST,
190 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
191 )
192 .into_response()
193}
194
195#[derive(Deserialize)]
196#[serde(rename_all = "camelCase")]
197pub struct UpdateSubjectStatusInput {
198 pub subject: serde_json::Value,
199 pub takedown: Option<StatusAttrInput>,
200 pub deactivated: Option<StatusAttrInput>,
201}
202
203#[derive(Deserialize)]
204pub struct StatusAttrInput {
205 pub apply: bool,
206 pub r#ref: Option<String>,
207}
208
209pub async fn update_subject_status(
210 State(state): State<AppState>,
211 headers: axum::http::HeaderMap,
212 Json(input): Json<UpdateSubjectStatusInput>,
213) -> Response {
214 let auth_header = headers.get("Authorization");
215 if auth_header.is_none() {
216 return (
217 StatusCode::UNAUTHORIZED,
218 Json(json!({"error": "AuthenticationRequired"})),
219 )
220 .into_response();
221 }
222 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
223 match subject_type {
224 Some("com.atproto.admin.defs#repoRef") => {
225 let did = input.subject.get("did").and_then(|d| d.as_str());
226 if let Some(did) = did {
227 let mut tx = match state.db.begin().await {
228 Ok(tx) => tx,
229 Err(e) => {
230 error!("Failed to begin transaction: {:?}", e);
231 return (
232 StatusCode::INTERNAL_SERVER_ERROR,
233 Json(json!({"error": "InternalError"})),
234 )
235 .into_response();
236 }
237 };
238 if let Some(takedown) = &input.takedown {
239 let takedown_ref = if takedown.apply {
240 takedown.r#ref.clone()
241 } else {
242 None
243 };
244 if let Err(e) = sqlx::query!(
245 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
246 takedown_ref,
247 did
248 )
249 .execute(&mut *tx)
250 .await
251 {
252 error!("Failed to update user takedown status for {}: {:?}", did, e);
253 return (
254 StatusCode::INTERNAL_SERVER_ERROR,
255 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
256 )
257 .into_response();
258 }
259 }
260 if let Some(deactivated) = &input.deactivated {
261 let result = if deactivated.apply {
262 sqlx::query!(
263 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
264 did
265 )
266 .execute(&mut *tx)
267 .await
268 } else {
269 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
270 .execute(&mut *tx)
271 .await
272 };
273 if let Err(e) = result {
274 error!(
275 "Failed to update user deactivation status for {}: {:?}",
276 did, e
277 );
278 return (
279 StatusCode::INTERNAL_SERVER_ERROR,
280 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})),
281 )
282 .into_response();
283 }
284 }
285 if let Err(e) = tx.commit().await {
286 error!("Failed to commit transaction: {:?}", e);
287 return (
288 StatusCode::INTERNAL_SERVER_ERROR,
289 Json(json!({"error": "InternalError"})),
290 )
291 .into_response();
292 }
293 if let Some(takedown) = &input.takedown {
294 let status = if takedown.apply {
295 Some("takendown")
296 } else {
297 None
298 };
299 if let Err(e) = crate::api::repo::record::sequence_account_event(
300 &state,
301 did,
302 !takedown.apply,
303 status,
304 )
305 .await
306 {
307 warn!("Failed to sequence account event for takedown: {}", e);
308 }
309 }
310 if let Some(deactivated) = &input.deactivated {
311 let status = if deactivated.apply {
312 Some("deactivated")
313 } else {
314 None
315 };
316 if let Err(e) = crate::api::repo::record::sequence_account_event(
317 &state,
318 did,
319 !deactivated.apply,
320 status,
321 )
322 .await
323 {
324 warn!("Failed to sequence account event for deactivation: {}", e);
325 }
326 }
327 if let Ok(Some(handle)) =
328 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
329 .fetch_optional(&state.db)
330 .await
331 {
332 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
333 }
334 return (
335 StatusCode::OK,
336 Json(json!({
337 "subject": input.subject,
338 "takedown": input.takedown.as_ref().map(|t| json!({
339 "applied": t.apply,
340 "ref": t.r#ref
341 })),
342 "deactivated": input.deactivated.as_ref().map(|d| json!({
343 "applied": d.apply
344 }))
345 })),
346 )
347 .into_response();
348 }
349 }
350 Some("com.atproto.repo.strongRef") => {
351 let uri = input.subject.get("uri").and_then(|u| u.as_str());
352 if let Some(uri) = uri {
353 if let Some(takedown) = &input.takedown {
354 let takedown_ref = if takedown.apply {
355 takedown.r#ref.clone()
356 } else {
357 None
358 };
359 if let Err(e) = sqlx::query!(
360 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
361 takedown_ref,
362 uri
363 )
364 .execute(&state.db)
365 .await
366 {
367 error!(
368 "Failed to update record takedown status for {}: {:?}",
369 uri, e
370 );
371 return (
372 StatusCode::INTERNAL_SERVER_ERROR,
373 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
374 )
375 .into_response();
376 }
377 }
378 return (
379 StatusCode::OK,
380 Json(json!({
381 "subject": input.subject,
382 "takedown": input.takedown.as_ref().map(|t| json!({
383 "applied": t.apply,
384 "ref": t.r#ref
385 }))
386 })),
387 )
388 .into_response();
389 }
390 }
391 Some("com.atproto.admin.defs#repoBlobRef") => {
392 let cid = input.subject.get("cid").and_then(|c| c.as_str());
393 if let Some(cid) = cid {
394 if let Some(takedown) = &input.takedown {
395 let takedown_ref = if takedown.apply {
396 takedown.r#ref.clone()
397 } else {
398 None
399 };
400 if let Err(e) = sqlx::query!(
401 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
402 takedown_ref,
403 cid
404 )
405 .execute(&state.db)
406 .await
407 {
408 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
409 return (
410 StatusCode::INTERNAL_SERVER_ERROR,
411 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
412 )
413 .into_response();
414 }
415 }
416 return (
417 StatusCode::OK,
418 Json(json!({
419 "subject": input.subject,
420 "takedown": input.takedown.as_ref().map(|t| json!({
421 "applied": t.apply,
422 "ref": t.r#ref
423 }))
424 })),
425 )
426 .into_response();
427 }
428 }
429 _ => {}
430 }
431 (
432 StatusCode::BAD_REQUEST,
433 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
434 )
435 .into_response()
436}