this repo has no description
1use crate::auth::BearerAuthAdmin; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use serde::{Deserialize, Serialize}; 10use serde_json::json; 11use tracing::{error, warn}; 12 13#[derive(Deserialize)] 14pub struct GetSubjectStatusParams { 15 pub did: Option<String>, 16 pub uri: Option<String>, 17 pub blob: Option<String>, 18} 19 20#[derive(Serialize)] 21pub struct SubjectStatus { 22 pub subject: serde_json::Value, 23 pub takedown: Option<StatusAttr>, 24 pub deactivated: Option<StatusAttr>, 25} 26 27#[derive(Serialize)] 28#[serde(rename_all = "camelCase")] 29pub struct StatusAttr { 30 pub applied: bool, 31 pub r#ref: Option<String>, 32} 33 34pub async fn get_subject_status( 35 State(state): State<AppState>, 36 _auth: BearerAuthAdmin, 37 Query(params): Query<GetSubjectStatusParams>, 38) -> Response { 39 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 40 return ( 41 StatusCode::BAD_REQUEST, 42 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 43 ) 44 .into_response(); 45 } 46 if let Some(did) = &params.did { 47 let user = sqlx::query!( 48 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 49 did 50 ) 51 .fetch_optional(&state.db) 52 .await; 53 match user { 54 Ok(Some(row)) => { 55 let deactivated = row.deactivated_at.map(|_| StatusAttr { 56 applied: true, 57 r#ref: None, 58 }); 59 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 60 applied: true, 61 r#ref: Some(r.clone()), 62 }); 63 return ( 64 StatusCode::OK, 65 Json(SubjectStatus { 66 subject: json!({ 67 "$type": "com.atproto.admin.defs#repoRef", 68 "did": row.did 69 }), 70 takedown, 71 deactivated, 72 }), 73 ) 74 .into_response(); 75 } 76 Ok(None) => { 77 return ( 78 StatusCode::NOT_FOUND, 79 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 80 ) 81 .into_response(); 82 } 83 Err(e) => { 84 error!("DB error in get_subject_status: {:?}", e); 85 return ( 86 StatusCode::INTERNAL_SERVER_ERROR, 87 Json(json!({"error": "InternalError"})), 88 ) 89 .into_response(); 90 } 91 } 92 } 93 if let Some(uri) = &params.uri { 94 let record = sqlx::query!( 95 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 96 uri 97 ) 98 .fetch_optional(&state.db) 99 .await; 100 match record { 101 Ok(Some(row)) => { 102 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 103 applied: true, 104 r#ref: Some(r.clone()), 105 }); 106 return ( 107 StatusCode::OK, 108 Json(SubjectStatus { 109 subject: json!({ 110 "$type": "com.atproto.repo.strongRef", 111 "uri": uri, 112 "cid": uri 113 }), 114 takedown, 115 deactivated: None, 116 }), 117 ) 118 .into_response(); 119 } 120 Ok(None) => { 121 return ( 122 StatusCode::NOT_FOUND, 123 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 124 ) 125 .into_response(); 126 } 127 Err(e) => { 128 error!("DB error in get_subject_status: {:?}", e); 129 return ( 130 StatusCode::INTERNAL_SERVER_ERROR, 131 Json(json!({"error": "InternalError"})), 132 ) 133 .into_response(); 134 } 135 } 136 } 137 if let Some(blob_cid) = &params.blob { 138 let blob = sqlx::query!( 139 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 140 blob_cid 141 ) 142 .fetch_optional(&state.db) 143 .await; 144 match blob { 145 Ok(Some(row)) => { 146 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 147 applied: true, 148 r#ref: Some(r.clone()), 149 }); 150 return ( 151 StatusCode::OK, 152 Json(SubjectStatus { 153 subject: json!({ 154 "$type": "com.atproto.admin.defs#repoBlobRef", 155 "did": "", 156 "cid": row.cid 157 }), 158 takedown, 159 deactivated: None, 160 }), 161 ) 162 .into_response(); 163 } 164 Ok(None) => { 165 return ( 166 StatusCode::NOT_FOUND, 167 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 168 ) 169 .into_response(); 170 } 171 Err(e) => { 172 error!("DB error in get_subject_status: {:?}", e); 173 return ( 174 StatusCode::INTERNAL_SERVER_ERROR, 175 Json(json!({"error": "InternalError"})), 176 ) 177 .into_response(); 178 } 179 } 180 } 181 ( 182 StatusCode::BAD_REQUEST, 183 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 184 ) 185 .into_response() 186} 187 188#[derive(Deserialize)] 189#[serde(rename_all = "camelCase")] 190pub struct UpdateSubjectStatusInput { 191 pub subject: serde_json::Value, 192 pub takedown: Option<StatusAttrInput>, 193 pub deactivated: Option<StatusAttrInput>, 194} 195 196#[derive(Deserialize)] 197pub struct StatusAttrInput { 198 pub apply: bool, 199 pub r#ref: Option<String>, 200} 201 202pub async fn update_subject_status( 203 State(state): State<AppState>, 204 _auth: BearerAuthAdmin, 205 Json(input): Json<UpdateSubjectStatusInput>, 206) -> Response { 207 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 208 match subject_type { 209 Some("com.atproto.admin.defs#repoRef") => { 210 let did = input.subject.get("did").and_then(|d| d.as_str()); 211 if let Some(did) = did { 212 let mut tx = match state.db.begin().await { 213 Ok(tx) => tx, 214 Err(e) => { 215 error!("Failed to begin transaction: {:?}", e); 216 return ( 217 StatusCode::INTERNAL_SERVER_ERROR, 218 Json(json!({"error": "InternalError"})), 219 ) 220 .into_response(); 221 } 222 }; 223 if let Some(takedown) = &input.takedown { 224 let takedown_ref = if takedown.apply { 225 takedown.r#ref.clone() 226 } else { 227 None 228 }; 229 if let Err(e) = sqlx::query!( 230 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 231 takedown_ref, 232 did 233 ) 234 .execute(&mut *tx) 235 .await 236 { 237 error!("Failed to update user takedown status for {}: {:?}", did, e); 238 return ( 239 StatusCode::INTERNAL_SERVER_ERROR, 240 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 241 ) 242 .into_response(); 243 } 244 } 245 if let Some(deactivated) = &input.deactivated { 246 let result = if deactivated.apply { 247 sqlx::query!( 248 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 249 did 250 ) 251 .execute(&mut *tx) 252 .await 253 } else { 254 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 255 .execute(&mut *tx) 256 .await 257 }; 258 if let Err(e) = result { 259 error!( 260 "Failed to update user deactivation status for {}: {:?}", 261 did, e 262 ); 263 return ( 264 StatusCode::INTERNAL_SERVER_ERROR, 265 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})), 266 ) 267 .into_response(); 268 } 269 } 270 if let Err(e) = tx.commit().await { 271 error!("Failed to commit transaction: {:?}", e); 272 return ( 273 StatusCode::INTERNAL_SERVER_ERROR, 274 Json(json!({"error": "InternalError"})), 275 ) 276 .into_response(); 277 } 278 if let Some(takedown) = &input.takedown { 279 let status = if takedown.apply { 280 Some("takendown") 281 } else { 282 None 283 }; 284 if let Err(e) = crate::api::repo::record::sequence_account_event( 285 &state, 286 did, 287 !takedown.apply, 288 status, 289 ) 290 .await 291 { 292 warn!("Failed to sequence account event for takedown: {}", e); 293 } 294 } 295 if let Some(deactivated) = &input.deactivated { 296 let status = if deactivated.apply { 297 Some("deactivated") 298 } else { 299 None 300 }; 301 if let Err(e) = crate::api::repo::record::sequence_account_event( 302 &state, 303 did, 304 !deactivated.apply, 305 status, 306 ) 307 .await 308 { 309 warn!("Failed to sequence account event for deactivation: {}", e); 310 } 311 } 312 if let Ok(Some(handle)) = 313 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 314 .fetch_optional(&state.db) 315 .await 316 { 317 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 318 } 319 return ( 320 StatusCode::OK, 321 Json(json!({ 322 "subject": input.subject, 323 "takedown": input.takedown.as_ref().map(|t| json!({ 324 "applied": t.apply, 325 "ref": t.r#ref 326 })), 327 "deactivated": input.deactivated.as_ref().map(|d| json!({ 328 "applied": d.apply 329 })) 330 })), 331 ) 332 .into_response(); 333 } 334 } 335 Some("com.atproto.repo.strongRef") => { 336 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 337 if let Some(uri) = uri { 338 if let Some(takedown) = &input.takedown { 339 let takedown_ref = if takedown.apply { 340 takedown.r#ref.clone() 341 } else { 342 None 343 }; 344 if let Err(e) = sqlx::query!( 345 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 346 takedown_ref, 347 uri 348 ) 349 .execute(&state.db) 350 .await 351 { 352 error!( 353 "Failed to update record takedown status for {}: {:?}", 354 uri, e 355 ); 356 return ( 357 StatusCode::INTERNAL_SERVER_ERROR, 358 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 359 ) 360 .into_response(); 361 } 362 } 363 return ( 364 StatusCode::OK, 365 Json(json!({ 366 "subject": input.subject, 367 "takedown": input.takedown.as_ref().map(|t| json!({ 368 "applied": t.apply, 369 "ref": t.r#ref 370 })) 371 })), 372 ) 373 .into_response(); 374 } 375 } 376 Some("com.atproto.admin.defs#repoBlobRef") => { 377 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 378 if let Some(cid) = cid { 379 if let Some(takedown) = &input.takedown { 380 let takedown_ref = if takedown.apply { 381 takedown.r#ref.clone() 382 } else { 383 None 384 }; 385 if let Err(e) = sqlx::query!( 386 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 387 takedown_ref, 388 cid 389 ) 390 .execute(&state.db) 391 .await 392 { 393 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 394 return ( 395 StatusCode::INTERNAL_SERVER_ERROR, 396 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 397 ) 398 .into_response(); 399 } 400 } 401 return ( 402 StatusCode::OK, 403 Json(json!({ 404 "subject": input.subject, 405 "takedown": input.takedown.as_ref().map(|t| json!({ 406 "applied": t.apply, 407 "ref": t.r#ref 408 })) 409 })), 410 ) 411 .into_response(); 412 } 413 } 414 _ => {} 415 } 416 ( 417 StatusCode::BAD_REQUEST, 418 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 419 ) 420 .into_response() 421}