this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use serde::{Deserialize, Serialize}; 9use serde_json::json; 10use tracing::{error, warn}; 11#[derive(Deserialize)] 12pub struct GetSubjectStatusParams { 13 pub did: Option<String>, 14 pub uri: Option<String>, 15 pub blob: Option<String>, 16} 17#[derive(Serialize)] 18pub struct SubjectStatus { 19 pub subject: serde_json::Value, 20 pub takedown: Option<StatusAttr>, 21 pub deactivated: Option<StatusAttr>, 22} 23#[derive(Serialize)] 24#[serde(rename_all = "camelCase")] 25pub struct StatusAttr { 26 pub applied: bool, 27 pub r#ref: Option<String>, 28} 29pub async fn get_subject_status( 30 State(state): State<AppState>, 31 headers: axum::http::HeaderMap, 32 Query(params): Query<GetSubjectStatusParams>, 33) -> Response { 34 let auth_header = headers.get("Authorization"); 35 if auth_header.is_none() { 36 return ( 37 StatusCode::UNAUTHORIZED, 38 Json(json!({"error": "AuthenticationRequired"})), 39 ) 40 .into_response(); 41 } 42 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 43 return ( 44 StatusCode::BAD_REQUEST, 45 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 46 ) 47 .into_response(); 48 } 49 if let Some(did) = &params.did { 50 let user = sqlx::query!( 51 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 52 did 53 ) 54 .fetch_optional(&state.db) 55 .await; 56 match user { 57 Ok(Some(row)) => { 58 let deactivated = row.deactivated_at.map(|_| StatusAttr { 59 applied: true, 60 r#ref: None, 61 }); 62 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 63 applied: true, 64 r#ref: Some(r.clone()), 65 }); 66 return ( 67 StatusCode::OK, 68 Json(SubjectStatus { 69 subject: json!({ 70 "$type": "com.atproto.admin.defs#repoRef", 71 "did": row.did 72 }), 73 takedown, 74 deactivated, 75 }), 76 ) 77 .into_response(); 78 } 79 Ok(None) => { 80 return ( 81 StatusCode::NOT_FOUND, 82 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 83 ) 84 .into_response(); 85 } 86 Err(e) => { 87 error!("DB error in get_subject_status: {:?}", e); 88 return ( 89 StatusCode::INTERNAL_SERVER_ERROR, 90 Json(json!({"error": "InternalError"})), 91 ) 92 .into_response(); 93 } 94 } 95 } 96 if let Some(uri) = &params.uri { 97 let record = sqlx::query!( 98 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 99 uri 100 ) 101 .fetch_optional(&state.db) 102 .await; 103 match record { 104 Ok(Some(row)) => { 105 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 106 applied: true, 107 r#ref: Some(r.clone()), 108 }); 109 return ( 110 StatusCode::OK, 111 Json(SubjectStatus { 112 subject: json!({ 113 "$type": "com.atproto.repo.strongRef", 114 "uri": uri, 115 "cid": uri 116 }), 117 takedown, 118 deactivated: None, 119 }), 120 ) 121 .into_response(); 122 } 123 Ok(None) => { 124 return ( 125 StatusCode::NOT_FOUND, 126 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 127 ) 128 .into_response(); 129 } 130 Err(e) => { 131 error!("DB error in get_subject_status: {:?}", e); 132 return ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError"})), 135 ) 136 .into_response(); 137 } 138 } 139 } 140 if let Some(blob_cid) = &params.blob { 141 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid) 142 .fetch_optional(&state.db) 143 .await; 144 match blob { 145 Ok(Some(row)) => { 146 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 147 applied: true, 148 r#ref: Some(r.clone()), 149 }); 150 return ( 151 StatusCode::OK, 152 Json(SubjectStatus { 153 subject: json!({ 154 "$type": "com.atproto.admin.defs#repoBlobRef", 155 "did": "", 156 "cid": row.cid 157 }), 158 takedown, 159 deactivated: None, 160 }), 161 ) 162 .into_response(); 163 } 164 Ok(None) => { 165 return ( 166 StatusCode::NOT_FOUND, 167 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 168 ) 169 .into_response(); 170 } 171 Err(e) => { 172 error!("DB error in get_subject_status: {:?}", e); 173 return ( 174 StatusCode::INTERNAL_SERVER_ERROR, 175 Json(json!({"error": "InternalError"})), 176 ) 177 .into_response(); 178 } 179 } 180 } 181 ( 182 StatusCode::BAD_REQUEST, 183 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 184 ) 185 .into_response() 186} 187#[derive(Deserialize)] 188#[serde(rename_all = "camelCase")] 189pub struct UpdateSubjectStatusInput { 190 pub subject: serde_json::Value, 191 pub takedown: Option<StatusAttrInput>, 192 pub deactivated: Option<StatusAttrInput>, 193} 194#[derive(Deserialize)] 195pub struct StatusAttrInput { 196 pub apply: bool, 197 pub r#ref: Option<String>, 198} 199pub async fn update_subject_status( 200 State(state): State<AppState>, 201 headers: axum::http::HeaderMap, 202 Json(input): Json<UpdateSubjectStatusInput>, 203) -> Response { 204 let auth_header = headers.get("Authorization"); 205 if auth_header.is_none() { 206 return ( 207 StatusCode::UNAUTHORIZED, 208 Json(json!({"error": "AuthenticationRequired"})), 209 ) 210 .into_response(); 211 } 212 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 213 match subject_type { 214 Some("com.atproto.admin.defs#repoRef") => { 215 let did = input.subject.get("did").and_then(|d| d.as_str()); 216 if let Some(did) = did { 217 let mut tx = match state.db.begin().await { 218 Ok(tx) => tx, 219 Err(e) => { 220 error!("Failed to begin transaction: {:?}", e); 221 return ( 222 StatusCode::INTERNAL_SERVER_ERROR, 223 Json(json!({"error": "InternalError"})), 224 ) 225 .into_response(); 226 } 227 }; 228 if let Some(takedown) = &input.takedown { 229 let takedown_ref = if takedown.apply { 230 takedown.r#ref.clone() 231 } else { 232 None 233 }; 234 if let Err(e) = sqlx::query!( 235 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 236 takedown_ref, 237 did 238 ) 239 .execute(&mut *tx) 240 .await 241 { 242 error!("Failed to update user takedown status for {}: {:?}", did, e); 243 return ( 244 StatusCode::INTERNAL_SERVER_ERROR, 245 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 246 ) 247 .into_response(); 248 } 249 } 250 if let Some(deactivated) = &input.deactivated { 251 let result = if deactivated.apply { 252 sqlx::query!( 253 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 254 did 255 ) 256 .execute(&mut *tx) 257 .await 258 } else { 259 sqlx::query!( 260 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 261 did 262 ) 263 .execute(&mut *tx) 264 .await 265 }; 266 if let Err(e) = result { 267 error!("Failed to update user deactivation status for {}: {:?}", did, e); 268 return ( 269 StatusCode::INTERNAL_SERVER_ERROR, 270 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})), 271 ) 272 .into_response(); 273 } 274 } 275 if let Err(e) = tx.commit().await { 276 error!("Failed to commit transaction: {:?}", e); 277 return ( 278 StatusCode::INTERNAL_SERVER_ERROR, 279 Json(json!({"error": "InternalError"})), 280 ) 281 .into_response(); 282 } 283 if let Some(takedown) = &input.takedown { 284 let status = if takedown.apply { Some("takendown") } else { None }; 285 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !takedown.apply, status).await { 286 warn!("Failed to sequence account event for takedown: {}", e); 287 } 288 } 289 if let Some(deactivated) = &input.deactivated { 290 let status = if deactivated.apply { Some("deactivated") } else { None }; 291 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !deactivated.apply, status).await { 292 warn!("Failed to sequence account event for deactivation: {}", e); 293 } 294 } 295 if let Ok(Some(handle)) = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 296 .fetch_optional(&state.db) 297 .await 298 { 299 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 300 } 301 return ( 302 StatusCode::OK, 303 Json(json!({ 304 "subject": input.subject, 305 "takedown": input.takedown.as_ref().map(|t| json!({ 306 "applied": t.apply, 307 "ref": t.r#ref 308 })), 309 "deactivated": input.deactivated.as_ref().map(|d| json!({ 310 "applied": d.apply 311 })) 312 })), 313 ) 314 .into_response(); 315 } 316 } 317 Some("com.atproto.repo.strongRef") => { 318 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 319 if let Some(uri) = uri { 320 if let Some(takedown) = &input.takedown { 321 let takedown_ref = if takedown.apply { 322 takedown.r#ref.clone() 323 } else { 324 None 325 }; 326 if let Err(e) = sqlx::query!( 327 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 328 takedown_ref, 329 uri 330 ) 331 .execute(&state.db) 332 .await 333 { 334 error!("Failed to update record takedown status for {}: {:?}", uri, e); 335 return ( 336 StatusCode::INTERNAL_SERVER_ERROR, 337 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 338 ) 339 .into_response(); 340 } 341 } 342 return ( 343 StatusCode::OK, 344 Json(json!({ 345 "subject": input.subject, 346 "takedown": input.takedown.as_ref().map(|t| json!({ 347 "applied": t.apply, 348 "ref": t.r#ref 349 })) 350 })), 351 ) 352 .into_response(); 353 } 354 } 355 Some("com.atproto.admin.defs#repoBlobRef") => { 356 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 357 if let Some(cid) = cid { 358 if let Some(takedown) = &input.takedown { 359 let takedown_ref = if takedown.apply { 360 takedown.r#ref.clone() 361 } else { 362 None 363 }; 364 if let Err(e) = sqlx::query!( 365 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 366 takedown_ref, 367 cid 368 ) 369 .execute(&state.db) 370 .await 371 { 372 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 373 return ( 374 StatusCode::INTERNAL_SERVER_ERROR, 375 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 376 ) 377 .into_response(); 378 } 379 } 380 return ( 381 StatusCode::OK, 382 Json(json!({ 383 "subject": input.subject, 384 "takedown": input.takedown.as_ref().map(|t| json!({ 385 "applied": t.apply, 386 "ref": t.r#ref 387 })) 388 })), 389 ) 390 .into_response(); 391 } 392 } 393 _ => {} 394 } 395 ( 396 StatusCode::BAD_REQUEST, 397 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 398 ) 399 .into_response() 400}