this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use tracing::{error, warn};
11#[derive(Deserialize)]
12pub struct GetSubjectStatusParams {
13 pub did: Option<String>,
14 pub uri: Option<String>,
15 pub blob: Option<String>,
16}
17#[derive(Serialize)]
18pub struct SubjectStatus {
19 pub subject: serde_json::Value,
20 pub takedown: Option<StatusAttr>,
21 pub deactivated: Option<StatusAttr>,
22}
23#[derive(Serialize)]
24#[serde(rename_all = "camelCase")]
25pub struct StatusAttr {
26 pub applied: bool,
27 pub r#ref: Option<String>,
28}
29pub async fn get_subject_status(
30 State(state): State<AppState>,
31 headers: axum::http::HeaderMap,
32 Query(params): Query<GetSubjectStatusParams>,
33) -> Response {
34 let auth_header = headers.get("Authorization");
35 if auth_header.is_none() {
36 return (
37 StatusCode::UNAUTHORIZED,
38 Json(json!({"error": "AuthenticationRequired"})),
39 )
40 .into_response();
41 }
42 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
43 return (
44 StatusCode::BAD_REQUEST,
45 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
46 )
47 .into_response();
48 }
49 if let Some(did) = ¶ms.did {
50 let user = sqlx::query!(
51 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
52 did
53 )
54 .fetch_optional(&state.db)
55 .await;
56 match user {
57 Ok(Some(row)) => {
58 let deactivated = row.deactivated_at.map(|_| StatusAttr {
59 applied: true,
60 r#ref: None,
61 });
62 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
63 applied: true,
64 r#ref: Some(r.clone()),
65 });
66 return (
67 StatusCode::OK,
68 Json(SubjectStatus {
69 subject: json!({
70 "$type": "com.atproto.admin.defs#repoRef",
71 "did": row.did
72 }),
73 takedown,
74 deactivated,
75 }),
76 )
77 .into_response();
78 }
79 Ok(None) => {
80 return (
81 StatusCode::NOT_FOUND,
82 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
83 )
84 .into_response();
85 }
86 Err(e) => {
87 error!("DB error in get_subject_status: {:?}", e);
88 return (
89 StatusCode::INTERNAL_SERVER_ERROR,
90 Json(json!({"error": "InternalError"})),
91 )
92 .into_response();
93 }
94 }
95 }
96 if let Some(uri) = ¶ms.uri {
97 let record = sqlx::query!(
98 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
99 uri
100 )
101 .fetch_optional(&state.db)
102 .await;
103 match record {
104 Ok(Some(row)) => {
105 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
106 applied: true,
107 r#ref: Some(r.clone()),
108 });
109 return (
110 StatusCode::OK,
111 Json(SubjectStatus {
112 subject: json!({
113 "$type": "com.atproto.repo.strongRef",
114 "uri": uri,
115 "cid": uri
116 }),
117 takedown,
118 deactivated: None,
119 }),
120 )
121 .into_response();
122 }
123 Ok(None) => {
124 return (
125 StatusCode::NOT_FOUND,
126 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
127 )
128 .into_response();
129 }
130 Err(e) => {
131 error!("DB error in get_subject_status: {:?}", e);
132 return (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError"})),
135 )
136 .into_response();
137 }
138 }
139 }
140 if let Some(blob_cid) = ¶ms.blob {
141 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid)
142 .fetch_optional(&state.db)
143 .await;
144 match blob {
145 Ok(Some(row)) => {
146 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
147 applied: true,
148 r#ref: Some(r.clone()),
149 });
150 return (
151 StatusCode::OK,
152 Json(SubjectStatus {
153 subject: json!({
154 "$type": "com.atproto.admin.defs#repoBlobRef",
155 "did": "",
156 "cid": row.cid
157 }),
158 takedown,
159 deactivated: None,
160 }),
161 )
162 .into_response();
163 }
164 Ok(None) => {
165 return (
166 StatusCode::NOT_FOUND,
167 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
168 )
169 .into_response();
170 }
171 Err(e) => {
172 error!("DB error in get_subject_status: {:?}", e);
173 return (
174 StatusCode::INTERNAL_SERVER_ERROR,
175 Json(json!({"error": "InternalError"})),
176 )
177 .into_response();
178 }
179 }
180 }
181 (
182 StatusCode::BAD_REQUEST,
183 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
184 )
185 .into_response()
186}
187#[derive(Deserialize)]
188#[serde(rename_all = "camelCase")]
189pub struct UpdateSubjectStatusInput {
190 pub subject: serde_json::Value,
191 pub takedown: Option<StatusAttrInput>,
192 pub deactivated: Option<StatusAttrInput>,
193}
194#[derive(Deserialize)]
195pub struct StatusAttrInput {
196 pub apply: bool,
197 pub r#ref: Option<String>,
198}
199pub async fn update_subject_status(
200 State(state): State<AppState>,
201 headers: axum::http::HeaderMap,
202 Json(input): Json<UpdateSubjectStatusInput>,
203) -> Response {
204 let auth_header = headers.get("Authorization");
205 if auth_header.is_none() {
206 return (
207 StatusCode::UNAUTHORIZED,
208 Json(json!({"error": "AuthenticationRequired"})),
209 )
210 .into_response();
211 }
212 let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
213 match subject_type {
214 Some("com.atproto.admin.defs#repoRef") => {
215 let did = input.subject.get("did").and_then(|d| d.as_str());
216 if let Some(did) = did {
217 let mut tx = match state.db.begin().await {
218 Ok(tx) => tx,
219 Err(e) => {
220 error!("Failed to begin transaction: {:?}", e);
221 return (
222 StatusCode::INTERNAL_SERVER_ERROR,
223 Json(json!({"error": "InternalError"})),
224 )
225 .into_response();
226 }
227 };
228 if let Some(takedown) = &input.takedown {
229 let takedown_ref = if takedown.apply {
230 takedown.r#ref.clone()
231 } else {
232 None
233 };
234 if let Err(e) = sqlx::query!(
235 "UPDATE users SET takedown_ref = $1 WHERE did = $2",
236 takedown_ref,
237 did
238 )
239 .execute(&mut *tx)
240 .await
241 {
242 error!("Failed to update user takedown status for {}: {:?}", did, e);
243 return (
244 StatusCode::INTERNAL_SERVER_ERROR,
245 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
246 )
247 .into_response();
248 }
249 }
250 if let Some(deactivated) = &input.deactivated {
251 let result = if deactivated.apply {
252 sqlx::query!(
253 "UPDATE users SET deactivated_at = NOW() WHERE did = $1",
254 did
255 )
256 .execute(&mut *tx)
257 .await
258 } else {
259 sqlx::query!(
260 "UPDATE users SET deactivated_at = NULL WHERE did = $1",
261 did
262 )
263 .execute(&mut *tx)
264 .await
265 };
266 if let Err(e) = result {
267 error!("Failed to update user deactivation status for {}: {:?}", did, e);
268 return (
269 StatusCode::INTERNAL_SERVER_ERROR,
270 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})),
271 )
272 .into_response();
273 }
274 }
275 if let Err(e) = tx.commit().await {
276 error!("Failed to commit transaction: {:?}", e);
277 return (
278 StatusCode::INTERNAL_SERVER_ERROR,
279 Json(json!({"error": "InternalError"})),
280 )
281 .into_response();
282 }
283 if let Some(takedown) = &input.takedown {
284 let status = if takedown.apply { Some("takendown") } else { None };
285 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !takedown.apply, status).await {
286 warn!("Failed to sequence account event for takedown: {}", e);
287 }
288 }
289 if let Some(deactivated) = &input.deactivated {
290 let status = if deactivated.apply { Some("deactivated") } else { None };
291 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !deactivated.apply, status).await {
292 warn!("Failed to sequence account event for deactivation: {}", e);
293 }
294 }
295 if let Ok(Some(handle)) = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
296 .fetch_optional(&state.db)
297 .await
298 {
299 let _ = state.cache.delete(&format!("handle:{}", handle)).await;
300 }
301 return (
302 StatusCode::OK,
303 Json(json!({
304 "subject": input.subject,
305 "takedown": input.takedown.as_ref().map(|t| json!({
306 "applied": t.apply,
307 "ref": t.r#ref
308 })),
309 "deactivated": input.deactivated.as_ref().map(|d| json!({
310 "applied": d.apply
311 }))
312 })),
313 )
314 .into_response();
315 }
316 }
317 Some("com.atproto.repo.strongRef") => {
318 let uri = input.subject.get("uri").and_then(|u| u.as_str());
319 if let Some(uri) = uri {
320 if let Some(takedown) = &input.takedown {
321 let takedown_ref = if takedown.apply {
322 takedown.r#ref.clone()
323 } else {
324 None
325 };
326 if let Err(e) = sqlx::query!(
327 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
328 takedown_ref,
329 uri
330 )
331 .execute(&state.db)
332 .await
333 {
334 error!("Failed to update record takedown status for {}: {:?}", uri, e);
335 return (
336 StatusCode::INTERNAL_SERVER_ERROR,
337 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
338 )
339 .into_response();
340 }
341 }
342 return (
343 StatusCode::OK,
344 Json(json!({
345 "subject": input.subject,
346 "takedown": input.takedown.as_ref().map(|t| json!({
347 "applied": t.apply,
348 "ref": t.r#ref
349 }))
350 })),
351 )
352 .into_response();
353 }
354 }
355 Some("com.atproto.admin.defs#repoBlobRef") => {
356 let cid = input.subject.get("cid").and_then(|c| c.as_str());
357 if let Some(cid) = cid {
358 if let Some(takedown) = &input.takedown {
359 let takedown_ref = if takedown.apply {
360 takedown.r#ref.clone()
361 } else {
362 None
363 };
364 if let Err(e) = sqlx::query!(
365 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
366 takedown_ref,
367 cid
368 )
369 .execute(&state.db)
370 .await
371 {
372 error!("Failed to update blob takedown status for {}: {:?}", cid, e);
373 return (
374 StatusCode::INTERNAL_SERVER_ERROR,
375 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})),
376 )
377 .into_response();
378 }
379 }
380 return (
381 StatusCode::OK,
382 Json(json!({
383 "subject": input.subject,
384 "takedown": input.takedown.as_ref().map(|t| json!({
385 "applied": t.apply,
386 "ref": t.r#ref
387 }))
388 })),
389 )
390 .into_response();
391 }
392 }
393 _ => {}
394 }
395 (
396 StatusCode::BAD_REQUEST,
397 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
398 )
399 .into_response()
400}