this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::BearerAuthAdmin;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::{error, warn};
13
14#[derive(Deserialize)]
15pub struct GetSubjectStatusParams {
16 pub did: Option<String>,
17 pub uri: Option<String>,
18 pub blob: Option<String>,
19}
20
21#[derive(Serialize)]
22pub struct SubjectStatus {
23 pub subject: serde_json::Value,
24 pub takedown: Option<StatusAttr>,
25 pub deactivated: Option<StatusAttr>,
26}
27
28#[derive(Serialize)]
29#[serde(rename_all = "camelCase")]
30pub struct StatusAttr {
31 pub applied: bool,
32 pub r#ref: Option<String>,
33}
34
35pub async fn get_subject_status(
36 State(state): State<AppState>,
37 _auth: BearerAuthAdmin,
38 Query(params): Query<GetSubjectStatusParams>,
39) -> Response {
40 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
41 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response();
42 }
43 if let Some(did) = ¶ms.did {
44 let user = sqlx::query!(
45 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
46 did
47 )
48 .fetch_optional(&state.db)
49 .await;
50 match user {
51 Ok(Some(row)) => {
52 let deactivated = row.deactivated_at.map(|_| StatusAttr {
53 applied: true,
54 r#ref: None,
55 });
56 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
57 applied: true,
58 r#ref: Some(r.clone()),
59 });
60 return (
61 StatusCode::OK,
62 Json(SubjectStatus {
63 subject: json!({
64 "$type": "com.atproto.admin.defs#repoRef",
65 "did": row.did
66 }),
67 takedown,
68 deactivated,
69 }),
70 )
71 .into_response();
72 }
73 Ok(None) => {
74 return ApiError::SubjectNotFound.into_response();
75 }
76 Err(e) => {
77 error!("DB error in get_subject_status: {:?}", e);
78 return ApiError::InternalError(None).into_response();
79 }
80 }
81 }
82 if let Some(uri) = ¶ms.uri {
83 let record = sqlx::query!(
84 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
85 uri
86 )
87 .fetch_optional(&state.db)
88 .await;
89 match record {
90 Ok(Some(row)) => {
91 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
92 applied: true,
93 r#ref: Some(r.clone()),
94 });
95 return (
96 StatusCode::OK,
97 Json(SubjectStatus {
98 subject: json!({
99 "$type": "com.atproto.repo.strongRef",
100 "uri": uri,
101 "cid": uri
102 }),
103 takedown,
104 deactivated: None,
105 }),
106 )
107 .into_response();
108 }
109 Ok(None) => {
110 return ApiError::RecordNotFound.into_response();
111 }
112 Err(e) => {
113 error!("DB error in get_subject_status: {:?}", e);
114 return ApiError::InternalError(None).into_response();
115 }
116 }
117 }
118 if let Some(blob_cid) = ¶ms.blob {
119 let did = match ¶ms.did {
120 Some(d) => d,
121 None => {
122 return ApiError::InvalidRequest(
123 "Must provide a did to request blob state".into(),
124 )
125 .into_response();
126 }
127 };
128 let blob = sqlx::query!(
129 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
130 blob_cid
131 )
132 .fetch_optional(&state.db)
133 .await;
134 match blob {
135 Ok(Some(row)) => {
136 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
137 applied: true,
138 r#ref: Some(r.clone()),
139 });
140 return (
141 StatusCode::OK,
142 Json(SubjectStatus {
143 subject: json!({
144 "$type": "com.atproto.admin.defs#repoBlobRef",
145 "did": did,
146 "cid": row.cid
147 }),
148 takedown,
149 deactivated: None,
150 }),
151 )
152 .into_response();
153 }
154 Ok(None) => {
155 return ApiError::BlobNotFound(None).into_response();
156 }
157 Err(e) => {
158 error!("DB error in get_subject_status: {:?}", e);
159 return ApiError::InternalError(None).into_response();
160 }
161 }
162 }
163 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
164}
165
166#[derive(Deserialize)]
167#[serde(rename_all = "camelCase")]
168pub struct UpdateSubjectStatusInput {
169 pub subject: serde_json::Value,
170 pub takedown: Option<StatusAttrInput>,
171 pub deactivated: Option<StatusAttrInput>,
172}
173
174#[derive(Deserialize)]
175pub struct StatusAttrInput {
176 pub applied: bool,
177 pub r#ref: Option<String>,
178}
179
180pub async fn update_subject_status(
181 State(state): State<AppState>,
182 _auth: BearerAuthAdmin,
183 Json(input): Json<UpdateSubjectStatusInput>,
184) -> Response {
185 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
186 match subject_type {
187 Some("com.atproto.admin.defs#repoRef") => {
188 let did = input.subject.get("did").and_then(|d| d.as_str());
189 if let Some(did) = did {
190 let mut tx = match state.db.begin().await {
191 Ok(tx) => tx,
192 Err(e) => {
193 error!("Failed to begin transaction: {:?}", e);
194 return ApiError::InternalError(None).into_response();
195 }
196 };
197 if let Some(takedown) = &input.takedown {
198 let takedown_ref = if takedown.applied {
199 takedown.r#ref.clone()
200 } else {
201 None
202 };
203 if let Err(e) = sqlx::query!(
204 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
205 takedown_ref,
206 did
207 )
208 .execute(&mut *tx)
209 .await
210 {
211 error!("Failed to update user takedown status for {}: {:?}", did, e);
212 return ApiError::InternalError(Some(
213 "Failed to update takedown status".into(),
214 ))
215 .into_response();
216 }
217 }
218 if let Some(deactivated) = &input.deactivated {
219 let result = if deactivated.applied {
220 sqlx::query!(
221 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
222 did
223 )
224 .execute(&mut *tx)
225 .await
226 } else {
227 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
228 .execute(&mut *tx)
229 .await
230 };
231 if let Err(e) = result {
232 error!(
233 "Failed to update user deactivation status for {}: {:?}",
234 did, e
235 );
236 return ApiError::InternalError(Some(
237 "Failed to update deactivation status".into(),
238 ))
239 .into_response();
240 }
241 }
242 if let Err(e) = tx.commit().await {
243 error!("Failed to commit transaction: {:?}", e);
244 return ApiError::InternalError(None).into_response();
245 }
246 if let Some(takedown) = &input.takedown {
247 let status = if takedown.applied {
248 Some("takendown")
249 } else {
250 None
251 };
252 if let Err(e) = crate::api::repo::record::sequence_account_event(
253 &state,
254 did,
255 !takedown.applied,
256 status,
257 )
258 .await
259 {
260 warn!("Failed to sequence account event for takedown: {}", e);
261 }
262 }
263 if let Some(deactivated) = &input.deactivated {
264 let status = if deactivated.applied {
265 Some("deactivated")
266 } else {
267 None
268 };
269 if let Err(e) = crate::api::repo::record::sequence_account_event(
270 &state,
271 did,
272 !deactivated.applied,
273 status,
274 )
275 .await
276 {
277 warn!("Failed to sequence account event for deactivation: {}", e);
278 }
279 }
280 if let Ok(Some(handle)) =
281 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
282 .fetch_optional(&state.db)
283 .await
284 {
285 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
286 }
287 return (
288 StatusCode::OK,
289 Json(json!({
290 "subject": input.subject,
291 "takedown": input.takedown.as_ref().map(|t| json!({
292 "applied": t.applied,
293 "ref": t.r#ref
294 })),
295 "deactivated": input.deactivated.as_ref().map(|d| json!({
296 "applied": d.applied
297 }))
298 })),
299 )
300 .into_response();
301 }
302 }
303 Some("com.atproto.repo.strongRef") => {
304 let uri = input.subject.get("uri").and_then(|u| u.as_str());
305 if let Some(uri) = uri {
306 if let Some(takedown) = &input.takedown {
307 let takedown_ref = if takedown.applied {
308 takedown.r#ref.clone()
309 } else {
310 None
311 };
312 if let Err(e) = sqlx::query!(
313 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
314 takedown_ref,
315 uri
316 )
317 .execute(&state.db)
318 .await
319 {
320 error!(
321 "Failed to update record takedown status for {}: {:?}",
322 uri, e
323 );
324 return ApiError::InternalError(Some(
325 "Failed to update takedown status".into(),
326 ))
327 .into_response();
328 }
329 }
330 return (
331 StatusCode::OK,
332 Json(json!({
333 "subject": input.subject,
334 "takedown": input.takedown.as_ref().map(|t| json!({
335 "applied": t.applied,
336 "ref": t.r#ref
337 }))
338 })),
339 )
340 .into_response();
341 }
342 }
343 Some("com.atproto.admin.defs#repoBlobRef") => {
344 let cid = input.subject.get("cid").and_then(|c| c.as_str());
345 if let Some(cid) = cid {
346 if let Some(takedown) = &input.takedown {
347 let takedown_ref = if takedown.applied {
348 takedown.r#ref.clone()
349 } else {
350 None
351 };
352 if let Err(e) = sqlx::query!(
353 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
354 takedown_ref,
355 cid
356 )
357 .execute(&state.db)
358 .await
359 {
360 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
361 return ApiError::InternalError(Some(
362 "Failed to update takedown status".into(),
363 ))
364 .into_response();
365 }
366 }
367 return (
368 StatusCode::OK,
369 Json(json!({
370 "subject": input.subject,
371 "takedown": input.takedown.as_ref().map(|t| json!({
372 "applied": t.applied,
373 "ref": t.r#ref
374 }))
375 })),
376 )
377 .into_response();
378 }
379 }
380 _ => {}
381 }
382 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
383}