this repo has no description
1use crate::api::error::ApiError;
2use crate::auth::BearerAuthAdmin;
3use crate::state::AppState;
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use tracing::{error, warn};
13
14#[derive(Deserialize)]
15pub struct GetSubjectStatusParams {
16 pub did: Option<String>,
17 pub uri: Option<String>,
18 pub blob: Option<String>,
19}
20
21#[derive(Serialize)]
22pub struct SubjectStatus {
23 pub subject: serde_json::Value,
24 pub takedown: Option<StatusAttr>,
25 pub deactivated: Option<StatusAttr>,
26}
27
28#[derive(Serialize)]
29#[serde(rename_all = "camelCase")]
30pub struct StatusAttr {
31 pub applied: bool,
32 pub r#ref: Option<String>,
33}
34
35pub async fn get_subject_status(
36 State(state): State<AppState>,
37 _auth: BearerAuthAdmin,
38 Query(params): Query<GetSubjectStatusParams>,
39) -> Response {
40 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
41 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response();
42 }
43 if let Some(did) = ¶ms.did {
44 let user = sqlx::query!(
45 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
46 did
47 )
48 .fetch_optional(&state.db)
49 .await;
50 match user {
51 Ok(Some(row)) => {
52 let deactivated = row.deactivated_at.map(|_| StatusAttr {
53 applied: true,
54 r#ref: None,
55 });
56 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
57 applied: true,
58 r#ref: Some(r.clone()),
59 });
60 return (
61 StatusCode::OK,
62 Json(SubjectStatus {
63 subject: json!({
64 "$type": "com.atproto.admin.defs#repoRef",
65 "did": row.did
66 }),
67 takedown,
68 deactivated,
69 }),
70 )
71 .into_response();
72 }
73 Ok(None) => {
74 return ApiError::SubjectNotFound.into_response();
75 }
76 Err(e) => {
77 error!("DB error in get_subject_status: {:?}", e);
78 return ApiError::InternalError(None).into_response();
79 }
80 }
81 }
82 if let Some(uri) = ¶ms.uri {
83 let record = sqlx::query!(
84 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
85 uri
86 )
87 .fetch_optional(&state.db)
88 .await;
89 match record {
90 Ok(Some(row)) => {
91 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
92 applied: true,
93 r#ref: Some(r.clone()),
94 });
95 return (
96 StatusCode::OK,
97 Json(SubjectStatus {
98 subject: json!({
99 "$type": "com.atproto.repo.strongRef",
100 "uri": uri,
101 "cid": uri
102 }),
103 takedown,
104 deactivated: None,
105 }),
106 )
107 .into_response();
108 }
109 Ok(None) => {
110 return ApiError::RecordNotFound.into_response();
111 }
112 Err(e) => {
113 error!("DB error in get_subject_status: {:?}", e);
114 return ApiError::InternalError(None).into_response();
115 }
116 }
117 }
118 if let Some(blob_cid) = ¶ms.blob {
119 let did = match ¶ms.did {
120 Some(d) => d,
121 None => {
122 return ApiError::InvalidRequest("Must provide a did to request blob state".into())
123 .into_response();
124 }
125 };
126 let blob = sqlx::query!(
127 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1",
128 blob_cid
129 )
130 .fetch_optional(&state.db)
131 .await;
132 match blob {
133 Ok(Some(row)) => {
134 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
135 applied: true,
136 r#ref: Some(r.clone()),
137 });
138 return (
139 StatusCode::OK,
140 Json(SubjectStatus {
141 subject: json!({
142 "$type": "com.atproto.admin.defs#repoBlobRef",
143 "did": did,
144 "cid": row.cid
145 }),
146 takedown,
147 deactivated: None,
148 }),
149 )
150 .into_response();
151 }
152 Ok(None) => {
153 return ApiError::BlobNotFound(None).into_response();
154 }
155 Err(e) => {
156 error!("DB error in get_subject_status: {:?}", e);
157 return ApiError::InternalError(None).into_response();
158 }
159 }
160 }
161 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
162}
163
164#[derive(Deserialize)]
165#[serde(rename_all = "camelCase")]
166pub struct UpdateSubjectStatusInput {
167 pub subject: serde_json::Value,
168 pub takedown: Option<StatusAttrInput>,
169 pub deactivated: Option<StatusAttrInput>,
170}
171
172#[derive(Deserialize)]
173pub struct StatusAttrInput {
174 pub applied: bool,
175 pub r#ref: Option<String>,
176}
177
178pub async fn update_subject_status(
179 State(state): State<AppState>,
180 _auth: BearerAuthAdmin,
181 Json(input): Json<UpdateSubjectStatusInput>,
182) -> Response {
183 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
184 match subject_type {
185 Some("com.atproto.admin.defs#repoRef") => {
186 let did = input.subject.get("did").and_then(|d| d.as_str());
187 if let Some(did) = did {
188 let mut tx = match state.db.begin().await {
189 Ok(tx) => tx,
190 Err(e) => {
191 error!("Failed to begin transaction: {:?}", e);
192 return ApiError::InternalError(None).into_response();
193 }
194 };
195 if let Some(takedown) = &input.takedown {
196 let takedown_ref = if takedown.applied {
197 takedown.r#ref.clone()
198 } else {
199 None
200 };
201 if let Err(e) = sqlx::query!(
202 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
203 takedown_ref,
204 did
205 )
206 .execute(&mut *tx)
207 .await
208 {
209 error!("Failed to update user takedown status for {}: {:?}", did, e);
210 return ApiError::InternalError(Some(
211 "Failed to update takedown status".into(),
212 ))
213 .into_response();
214 }
215 }
216 if let Some(deactivated) = &input.deactivated {
217 let result = if deactivated.applied {
218 sqlx::query!(
219 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
220 did
221 )
222 .execute(&mut *tx)
223 .await
224 } else {
225 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
226 .execute(&mut *tx)
227 .await
228 };
229 if let Err(e) = result {
230 error!(
231 "Failed to update user deactivation status for {}: {:?}",
232 did, e
233 );
234 return ApiError::InternalError(Some(
235 "Failed to update deactivation status".into(),
236 ))
237 .into_response();
238 }
239 }
240 if let Err(e) = tx.commit().await {
241 error!("Failed to commit transaction: {:?}", e);
242 return ApiError::InternalError(None).into_response();
243 }
244 if let Some(takedown) = &input.takedown {
245 let status = if takedown.applied {
246 Some("takendown")
247 } else {
248 None
249 };
250 if let Err(e) = crate::api::repo::record::sequence_account_event(
251 &state,
252 did,
253 !takedown.applied,
254 status,
255 )
256 .await
257 {
258 warn!("Failed to sequence account event for takedown: {}", e);
259 }
260 }
261 if let Some(deactivated) = &input.deactivated {
262 let status = if deactivated.applied {
263 Some("deactivated")
264 } else {
265 None
266 };
267 if let Err(e) = crate::api::repo::record::sequence_account_event(
268 &state,
269 did,
270 !deactivated.applied,
271 status,
272 )
273 .await
274 {
275 warn!("Failed to sequence account event for deactivation: {}", e);
276 }
277 }
278 if let Ok(Some(handle)) =
279 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
280 .fetch_optional(&state.db)
281 .await
282 {
283 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
284 }
285 return (
286 StatusCode::OK,
287 Json(json!({
288 "subject": input.subject,
289 "takedown": input.takedown.as_ref().map(|t| json!({
290 "applied": t.applied,
291 "ref": t.r#ref
292 })),
293 "deactivated": input.deactivated.as_ref().map(|d| json!({
294 "applied": d.applied
295 }))
296 })),
297 )
298 .into_response();
299 }
300 }
301 Some("com.atproto.repo.strongRef") => {
302 let uri = input.subject.get("uri").and_then(|u| u.as_str());
303 if let Some(uri) = uri {
304 if let Some(takedown) = &input.takedown {
305 let takedown_ref = if takedown.applied {
306 takedown.r#ref.clone()
307 } else {
308 None
309 };
310 if let Err(e) = sqlx::query!(
311 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
312 takedown_ref,
313 uri
314 )
315 .execute(&state.db)
316 .await
317 {
318 error!(
319 "Failed to update record takedown status for {}: {:?}",
320 uri, e
321 );
322 return ApiError::InternalError(Some(
323 "Failed to update takedown status".into(),
324 ))
325 .into_response();
326 }
327 }
328 return (
329 StatusCode::OK,
330 Json(json!({
331 "subject": input.subject,
332 "takedown": input.takedown.as_ref().map(|t| json!({
333 "applied": t.applied,
334 "ref": t.r#ref
335 }))
336 })),
337 )
338 .into_response();
339 }
340 }
341 Some("com.atproto.admin.defs#repoBlobRef") => {
342 let cid = input.subject.get("cid").and_then(|c| c.as_str());
343 if let Some(cid) = cid {
344 if let Some(takedown) = &input.takedown {
345 let takedown_ref = if takedown.applied {
346 takedown.r#ref.clone()
347 } else {
348 None
349 };
350 if let Err(e) = sqlx::query!(
351 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
352 takedown_ref,
353 cid
354 )
355 .execute(&state.db)
356 .await
357 {
358 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
359 return ApiError::InternalError(Some(
360 "Failed to update takedown status".into(),
361 ))
362 .into_response();
363 }
364 }
365 return (
366 StatusCode::OK,
367 Json(json!({
368 "subject": input.subject,
369 "takedown": input.takedown.as_ref().map(|t| json!({
370 "applied": t.applied,
371 "ref": t.r#ref
372 }))
373 })),
374 )
375 .into_response();
376 }
377 }
378 _ => {}
379 }
380 ApiError::InvalidRequest("Invalid subject type".into()).into_response()
381}