this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::BearerAuthAdmin;
3use crate::state::AppState;
4use crate::types::Did;
5use axum::{
6 Json,
7 extract::{Query, State},
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use tracing::{error, warn};
14
15#[derive(Deserialize)]
16pub struct GetSubjectStatusParams {
17 pub did: Option<String>,
18 pub uri: Option<String>,
19 pub blob: Option<String>,
20}
21
22#[derive(Serialize)]
23pub struct SubjectStatus {
24 pub subject: serde_json::Value,
25 pub takedown: Option<StatusAttr>,
26 pub deactivated: Option<StatusAttr>,
27}
28
29#[derive(Serialize)]
30#[serde(rename_all = "camelCase")]
31pub struct StatusAttr {
32 pub applied: bool,
33 pub r#ref: Option<String>,
34}
35
36pub async fn get_subject_status(
37 State(state): State<AppState>,
38 _auth: BearerAuthAdmin,
39 Query(params): Query<GetSubjectStatusParams>,
40) -> Response {
41 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
42 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response();
43 }
44 if let Some(did) = ¶ms.did {
45 let user = sqlx::query!(
46 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
47 did
48 )
49 .fetch_optional(&state.db)
50 .await;
51 match user {
52 Ok(Some(row)) => {
53 let deactivated = row.deactivated_at.map(|_| StatusAttr {
54 applied: true,
55 r#ref: None,
56 });
57 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
58 applied: true,
59 r#ref: Some(r.clone()),
60 });
61 return (
62 StatusCode::OK,
63 Json(SubjectStatus {
64 subject: json!({
65 "$type": "com.atproto.admin.defs#repoRef",
66 "did": row.did
67 }),
68 takedown,
69 deactivated,
70 }),
71 )
72 .into_response();
73 }
74 Ok(None) => {
75 return ApiError::SubjectNotFound.into_response();
76 }
77 Err(e) => {
78 error!("DB error in get_subject_status: {:?}", e);
79 return ApiError::InternalError(None).into_response();
80 }
81 }
82 }
83 if let Some(uri) = ¶ms.uri {
84 let record = sqlx::query!(
85 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
86 uri
87 )
88 .fetch_optional(&state.db)
89 .await;
90 match record {
91 Ok(Some(row)) => {
92 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
93 applied: true,
94 r#ref: Some(r.clone()),
95 });
96 return (
97 StatusCode::OK,
98 Json(SubjectStatus {
99 subject: json!({
100 "$type": "com.atproto.repo.strongRef",
101 "uri": uri,
102 "cid": uri
103 }),
104 takedown,
105 deactivated: None,
106 }),
107 )
108 .into_response();
109 }
110 Ok(None) => {
111 return ApiError::RecordNotFound.into_response();
112 }
113 Err(e) => {
114 error!("DB error in get_subject_status: {:?}", e);
115 return ApiError::InternalError(None).into_response();
116 }
117 }
118 }
119 if let Some(blob_cid) = ¶ms.blob {
120 let did = match ¶ms.did {
121 Some(d) => d,
122 None => {
123 return ApiError::InvalidRequest("Must provide a did to request blob state".into())
124 .into_response();
125 }
126 };
127 let blob = sqlx::query!(
128 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
129 blob_cid
130 )
131 .fetch_optional(&state.db)
132 .await;
133 match blob {
134 Ok(Some(row)) => {
135 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
136 applied: true,
137 r#ref: Some(r.clone()),
138 });
139 return (
140 StatusCode::OK,
141 Json(SubjectStatus {
142 subject: json!({
143 "$type": "com.atproto.admin.defs#repoBlobRef",
144 "did": did,
145 "cid": row.cid
146 }),
147 takedown,
148 deactivated: None,
149 }),
150 )
151 .into_response();
152 }
153 Ok(None) => {
154 return ApiError::BlobNotFound(None).into_response();
155 }
156 Err(e) => {
157 error!("DB error in get_subject_status: {:?}", e);
158 return ApiError::InternalError(None).into_response();
159 }
160 }
161 }
162 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
163}
164
165#[derive(Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub struct UpdateSubjectStatusInput {
168 pub subject: serde_json::Value,
169 pub takedown: Option<StatusAttrInput>,
170 pub deactivated: Option<StatusAttrInput>,
171}
172
173#[derive(Deserialize)]
174pub struct StatusAttrInput {
175 pub applied: bool,
176 pub r#ref: Option<String>,
177}
178
179pub async fn update_subject_status(
180 State(state): State<AppState>,
181 _auth: BearerAuthAdmin,
182 Json(input): Json<UpdateSubjectStatusInput>,
183) -> Response {
184 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
185 match subject_type {
186 Some("com.atproto.admin.defs#repoRef") => {
187 let did_str = input.subject.get("did").and_then(|d| d.as_str());
188 if let Some(did_str) = did_str {
189 let did = Did::new_unchecked(did_str);
190 let mut tx = match state.db.begin().await {
191 Ok(tx) => tx,
192 Err(e) => {
193 error!("Failed to begin transaction: {:?}", e);
194 return ApiError::InternalError(None).into_response();
195 }
196 };
197 if let Some(takedown) = &input.takedown {
198 let takedown_ref = if takedown.applied {
199 takedown.r#ref.clone()
200 } else {
201 None
202 };
203 if let Err(e) = sqlx::query!(
204 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
205 takedown_ref,
206 did.as_str()
207 )
208 .execute(&mut *tx)
209 .await
210 {
211 error!("Failed to update user takedown status for {}: {:?}", did, e);
212 return ApiError::InternalError(Some(
213 "Failed to update takedown status".into(),
214 ))
215 .into_response();
216 }
217 }
218 if let Some(deactivated) = &input.deactivated {
219 let result = if deactivated.applied {
220 sqlx::query!(
221 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
222 did.as_str()
223 )
224 .execute(&mut *tx)
225 .await
226 } else {
227 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did.as_str())
228 .execute(&mut *tx)
229 .await
230 };
231 if let Err(e) = result {
232 error!(
233 "Failed to update user deactivation status for {}: {:?}",
234 did, e
235 );
236 return ApiError::InternalError(Some(
237 "Failed to update deactivation status".into(),
238 ))
239 .into_response();
240 }
241 }
242 if let Err(e) = tx.commit().await {
243 error!("Failed to commit transaction: {:?}", e);
244 return ApiError::InternalError(None).into_response();
245 }
246 if let Some(takedown) = &input.takedown {
247 let status = if takedown.applied {
248 Some("takendown")
249 } else {
250 None
251 };
252 if let Err(e) = crate::api::repo::record::sequence_account_event(
253 &state,
254 &did,
255 !takedown.applied,
256 status,
257 )
258 .await
259 {
260 warn!("Failed to sequence account event for takedown: {}", e);
261 }
262 }
263 if let Some(deactivated) = &input.deactivated {
264 let status = if deactivated.applied {
265 Some("deactivated")
266 } else {
267 None
268 };
269 if let Err(e) = crate::api::repo::record::sequence_account_event(
270 &state,
271 &did,
272 !deactivated.applied,
273 status,
274 )
275 .await
276 {
277 warn!("Failed to sequence account event for deactivation: {}", e);
278 }
279 }
280 if let Ok(Some(handle)) =
281 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str())
282 .fetch_optional(&state.db)
283 .await
284 {
285 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
286 }
287 return (
288 StatusCode::OK,
289 Json(json!({
290 "subject": input.subject,
291 "takedown": input.takedown.as_ref().map(|t| json!({
292 "applied": t.applied,
293 "ref": t.r#ref
294 })),
295 "deactivated": input.deactivated.as_ref().map(|d| json!({
296 "applied": d.applied
297 }))
298 })),
299 )
300 .into_response();
301 }
302 }
303 Some("com.atproto.repo.strongRef") => {
304 let uri = input.subject.get("uri").and_then(|u| u.as_str());
305 if let Some(uri) = uri {
306 if let Some(takedown) = &input.takedown {
307 let takedown_ref = if takedown.applied {
308 takedown.r#ref.clone()
309 } else {
310 None
311 };
312 if let Err(e) = sqlx::query!(
313 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
314 takedown_ref,
315 uri
316 )
317 .execute(&state.db)
318 .await
319 {
320 error!(
321 "Failed to update record takedown status for {}: {:?}",
322 uri, e
323 );
324 return ApiError::InternalError(Some(
325 "Failed to update takedown status".into(),
326 ))
327 .into_response();
328 }
329 }
330 return (
331 StatusCode::OK,
332 Json(json!({
333 "subject": input.subject,
334 "takedown": input.takedown.as_ref().map(|t| json!({
335 "applied": t.applied,
336 "ref": t.r#ref
337 }))
338 })),
339 )
340 .into_response();
341 }
342 }
343 Some("com.atproto.admin.defs#repoBlobRef") => {
344 let cid = input.subject.get("cid").and_then(|c| c.as_str());
345 if let Some(cid) = cid {
346 if let Some(takedown) = &input.takedown {
347 let takedown_ref = if takedown.applied {
348 takedown.r#ref.clone()
349 } else {
350 None
351 };
352 if let Err(e) = sqlx::query!(
353 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
354 takedown_ref,
355 cid
356 )
357 .execute(&state.db)
358 .await
359 {
360 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
361 return ApiError::InternalError(Some(
362 "Failed to update takedown status".into(),
363 ))
364 .into_response();
365 }
366 }
367 return (
368 StatusCode::OK,
369 Json(json!({
370 "subject": input.subject,
371 "takedown": input.takedown.as_ref().map(|t| json!({
372 "applied": t.applied,
373 "ref": t.r#ref
374 }))
375 })),
376 )
377 .into_response();
378 }
379 }
380 _ => {}
381 }
382 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
383}