this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use tracing::{error, warn};
11
12#[derive(Deserialize)]
13pub struct GetSubjectStatusParams {
14 pub did: Option<String>,
15 pub uri: Option<String>,
16 pub blob: Option<String>,
17}
18
19#[derive(Serialize)]
20pub struct SubjectStatus {
21 pub subject: serde_json::Value,
22 pub takedown: Option<StatusAttr>,
23 pub deactivated: Option<StatusAttr>,
24}
25
26#[derive(Serialize)]
27#[serde(rename_all = "camelCase")]
28pub struct StatusAttr {
29 pub applied: bool,
30 pub r#ref: Option<String>,
31}
32
33pub async fn get_subject_status(
34 State(state): State<AppState>,
35 headers: axum::http::HeaderMap,
36 Query(params): Query<GetSubjectStatusParams>,
37) -> Response {
38 let auth_header = headers.get("Authorization");
39 if auth_header.is_none() {
40 return (
41 StatusCode::UNAUTHORIZED,
42 Json(json!({"error": "AuthenticationRequired"})),
43 )
44 .into_response();
45 }
46 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
47 return (
48 StatusCode::BAD_REQUEST,
49 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
50 )
51 .into_response();
52 }
53 if let Some(did) = ¶ms.did {
54 let user = sqlx::query!(
55 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
56 did
57 )
58 .fetch_optional(&state.db)
59 .await;
60 match user {
61 Ok(Some(row)) => {
62 let deactivated = row.deactivated_at.map(|_| StatusAttr {
63 applied: true,
64 r#ref: None,
65 });
66 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
67 applied: true,
68 r#ref: Some(r.clone()),
69 });
70 return (
71 StatusCode::OK,
72 Json(SubjectStatus {
73 subject: json!({
74 "$type": "com.atproto.admin.defs#repoRef",
75 "did": row.did
76 }),
77 takedown,
78 deactivated,
79 }),
80 )
81 .into_response();
82 }
83 Ok(None) => {
84 return (
85 StatusCode::NOT_FOUND,
86 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
87 )
88 .into_response();
89 }
90 Err(e) => {
91 error!("DB error in get_subject_status: {:?}", e);
92 return (
93 StatusCode::INTERNAL_SERVER_ERROR,
94 Json(json!({"error": "InternalError"})),
95 )
96 .into_response();
97 }
98 }
99 }
100 if let Some(uri) = ¶ms.uri {
101 let record = sqlx::query!(
102 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
103 uri
104 )
105 .fetch_optional(&state.db)
106 .await;
107 match record {
108 Ok(Some(row)) => {
109 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
110 applied: true,
111 r#ref: Some(r.clone()),
112 });
113 return (
114 StatusCode::OK,
115 Json(SubjectStatus {
116 subject: json!({
117 "$type": "com.atproto.repo.strongRef",
118 "uri": uri,
119 "cid": uri
120 }),
121 takedown,
122 deactivated: None,
123 }),
124 )
125 .into_response();
126 }
127 Ok(None) => {
128 return (
129 StatusCode::NOT_FOUND,
130 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
131 )
132 .into_response();
133 }
134 Err(e) => {
135 error!("DB error in get_subject_status: {:?}", e);
136 return (
137 StatusCode::INTERNAL_SERVER_ERROR,
138 Json(json!({"error": "InternalError"})),
139 )
140 .into_response();
141 }
142 }
143 }
144 if let Some(blob_cid) = ¶ms.blob {
145 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid)
146 .fetch_optional(&state.db)
147 .await;
148 match blob {
149 Ok(Some(row)) => {
150 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
151 applied: true,
152 r#ref: Some(r.clone()),
153 });
154 return (
155 StatusCode::OK,
156 Json(SubjectStatus {
157 subject: json!({
158 "$type": "com.atproto.admin.defs#repoBlobRef",
159 "did": "",
160 "cid": row.cid
161 }),
162 takedown,
163 deactivated: None,
164 }),
165 )
166 .into_response();
167 }
168 Ok(None) => {
169 return (
170 StatusCode::NOT_FOUND,
171 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
172 )
173 .into_response();
174 }
175 Err(e) => {
176 error!("DB error in get_subject_status: {:?}", e);
177 return (
178 StatusCode::INTERNAL_SERVER_ERROR,
179 Json(json!({"error": "InternalError"})),
180 )
181 .into_response();
182 }
183 }
184 }
185 (
186 StatusCode::BAD_REQUEST,
187 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
188 )
189 .into_response()
190}
191
192#[derive(Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub struct UpdateSubjectStatusInput {
195 pub subject: serde_json::Value,
196 pub takedown: Option<StatusAttrInput>,
197 pub deactivated: Option<StatusAttrInput>,
198}
199
200#[derive(Deserialize)]
201pub struct StatusAttrInput {
202 pub apply: bool,
203 pub r#ref: Option<String>,
204}
205
206pub async fn update_subject_status(
207 State(state): State<AppState>,
208 headers: axum::http::HeaderMap,
209 Json(input): Json<UpdateSubjectStatusInput>,
210) -> Response {
211 let auth_header = headers.get("Authorization");
212 if auth_header.is_none() {
213 return (
214 StatusCode::UNAUTHORIZED,
215 Json(json!({"error": "AuthenticationRequired"})),
216 )
217 .into_response();
218 }
219 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
220 match subject_type {
221 Some("com.atproto.admin.defs#repoRef") => {
222 let did = input.subject.get("did").and_then(|d| d.as_str());
223 if let Some(did) = did {
224 let mut tx = match state.db.begin().await {
225 Ok(tx) => tx,
226 Err(e) => {
227 error!("Failed to begin transaction: {:?}", e);
228 return (
229 StatusCode::INTERNAL_SERVER_ERROR,
230 Json(json!({"error": "InternalError"})),
231 )
232 .into_response();
233 }
234 };
235 if let Some(takedown) = &input.takedown {
236 let takedown_ref = if takedown.apply {
237 takedown.r#ref.clone()
238 } else {
239 None
240 };
241 if let Err(e) = sqlx::query!(
242 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
243 takedown_ref,
244 did
245 )
246 .execute(&mut *tx)
247 .await
248 {
249 error!("Failed to update user takedown status for {}: {:?}", did, e);
250 return (
251 StatusCode::INTERNAL_SERVER_ERROR,
252 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
253 )
254 .into_response();
255 }
256 }
257 if let Some(deactivated) = &input.deactivated {
258 let result = if deactivated.apply {
259 sqlx::query!(
260 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
261 did
262 )
263 .execute(&mut *tx)
264 .await
265 } else {
266 sqlx::query!(
267 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
268 did
269 )
270 .execute(&mut *tx)
271 .await
272 };
273 if let Err(e) = result {
274 error!("Failed to update user deactivation status for {}: {:?}", did, e);
275 return (
276 StatusCode::INTERNAL_SERVER_ERROR,
277 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})),
278 )
279 .into_response();
280 }
281 }
282 if let Err(e) = tx.commit().await {
283 error!("Failed to commit transaction: {:?}", e);
284 return (
285 StatusCode::INTERNAL_SERVER_ERROR,
286 Json(json!({"error": "InternalError"})),
287 )
288 .into_response();
289 }
290 if let Some(takedown) = &input.takedown {
291 let status = if takedown.apply { Some("takendown") } else { None };
292 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !takedown.apply, status).await {
293 warn!("Failed to sequence account event for takedown: {}", e);
294 }
295 }
296 if let Some(deactivated) = &input.deactivated {
297 let status = if deactivated.apply { Some("deactivated") } else { None };
298 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !deactivated.apply, status).await {
299 warn!("Failed to sequence account event for deactivation: {}", e);
300 }
301 }
302 if let Ok(Some(handle)) = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
303 .fetch_optional(&state.db)
304 .await
305 {
306 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
307 }
308 return (
309 StatusCode::OK,
310 Json(json!({
311 "subject": input.subject,
312 "takedown": input.takedown.as_ref().map(|t| json!({
313 "applied": t.apply,
314 "ref": t.r#ref
315 })),
316 "deactivated": input.deactivated.as_ref().map(|d| json!({
317 "applied": d.apply
318 }))
319 })),
320 )
321 .into_response();
322 }
323 }
324 Some("com.atproto.repo.strongRef") => {
325 let uri = input.subject.get("uri").and_then(|u| u.as_str());
326 if let Some(uri) = uri {
327 if let Some(takedown) = &input.takedown {
328 let takedown_ref = if takedown.apply {
329 takedown.r#ref.clone()
330 } else {
331 None
332 };
333 if let Err(e) = sqlx::query!(
334 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
335 takedown_ref,
336 uri
337 )
338 .execute(&state.db)
339 .await
340 {
341 error!("Failed to update record takedown status for {}: {:?}", uri, e);
342 return (
343 StatusCode::INTERNAL_SERVER_ERROR,
344 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
345 )
346 .into_response();
347 }
348 }
349 return (
350 StatusCode::OK,
351 Json(json!({
352 "subject": input.subject,
353 "takedown": input.takedown.as_ref().map(|t| json!({
354 "applied": t.apply,
355 "ref": t.r#ref
356 }))
357 })),
358 )
359 .into_response();
360 }
361 }
362 Some("com.atproto.admin.defs#repoBlobRef") => {
363 let cid = input.subject.get("cid").and_then(|c| c.as_str());
364 if let Some(cid) = cid {
365 if let Some(takedown) = &input.takedown {
366 let takedown_ref = if takedown.apply {
367 takedown.r#ref.clone()
368 } else {
369 None
370 };
371 if let Err(e) = sqlx::query!(
372 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
373 takedown_ref,
374 cid
375 )
376 .execute(&state.db)
377 .await
378 {
379 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
380 return (
381 StatusCode::INTERNAL_SERVER_ERROR,
382 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
383 )
384 .into_response();
385 }
386 }
387 return (
388 StatusCode::OK,
389 Json(json!({
390 "subject": input.subject,
391 "takedown": input.takedown.as_ref().map(|t| json!({
392 "applied": t.apply,
393 "ref": t.r#ref
394 }))
395 })),
396 )
397 .into_response();
398 }
399 }
400 _ => {}
401 }
402 (
403 StatusCode::BAD_REQUEST,
404 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
405 )
406 .into_response()
407}