this repo has no description
1use crate::auth::BearerAuthAdmin; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use serde::{Deserialize, Serialize}; 10use serde_json::json; 11use tracing::{error, warn}; 12 13#[derive(Deserialize)] 14pub struct GetSubjectStatusParams { 15 pub did: Option<String>, 16 pub uri: Option<String>, 17 pub blob: Option<String>, 18} 19 20#[derive(Serialize)] 21pub struct SubjectStatus { 22 pub subject: serde_json::Value, 23 pub takedown: Option<StatusAttr>, 24 pub deactivated: Option<StatusAttr>, 25} 26 27#[derive(Serialize)] 28#[serde(rename_all = "camelCase")] 29pub struct StatusAttr { 30 pub applied: bool, 31 pub r#ref: Option<String>, 32} 33 34pub async fn get_subject_status( 35 State(state): State<AppState>, 36 _auth: BearerAuthAdmin, 37 Query(params): Query<GetSubjectStatusParams>, 38) -> Response { 39 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 40 return ( 41 StatusCode::BAD_REQUEST, 42 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 43 ) 44 .into_response(); 45 } 46 if let Some(did) = &params.did { 47 let user = sqlx::query!( 48 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 49 did 50 ) 51 .fetch_optional(&state.db) 52 .await; 53 match user { 54 Ok(Some(row)) => { 55 let deactivated = row.deactivated_at.map(|_| StatusAttr { 56 applied: true, 57 r#ref: None, 58 }); 59 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 60 applied: true, 61 r#ref: Some(r.clone()), 62 }); 63 return ( 64 StatusCode::OK, 65 Json(SubjectStatus { 66 subject: json!({ 67 "$type": "com.atproto.admin.defs#repoRef", 68 "did": row.did 69 }), 70 takedown, 71 deactivated, 72 }), 73 ) 74 .into_response(); 75 } 76 Ok(None) => { 77 return ( 78 StatusCode::NOT_FOUND, 79 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 80 ) 81 .into_response(); 82 } 83 Err(e) => { 84 error!("DB error in get_subject_status: {:?}", e); 85 return ( 86 StatusCode::INTERNAL_SERVER_ERROR, 87 Json(json!({"error": "InternalError"})), 88 ) 89 .into_response(); 90 } 91 } 92 } 93 if let Some(uri) = &params.uri { 94 let record = sqlx::query!( 95 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 96 uri 97 ) 98 .fetch_optional(&state.db) 99 .await; 100 match record { 101 Ok(Some(row)) => { 102 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 103 applied: true, 104 r#ref: Some(r.clone()), 105 }); 106 return ( 107 StatusCode::OK, 108 Json(SubjectStatus { 109 subject: json!({ 110 "$type": "com.atproto.repo.strongRef", 111 "uri": uri, 112 "cid": uri 113 }), 114 takedown, 115 deactivated: None, 116 }), 117 ) 118 .into_response(); 119 } 120 Ok(None) => { 121 return ( 122 StatusCode::NOT_FOUND, 123 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 124 ) 125 .into_response(); 126 } 127 Err(e) => { 128 error!("DB error in get_subject_status: {:?}", e); 129 return ( 130 StatusCode::INTERNAL_SERVER_ERROR, 131 Json(json!({"error": "InternalError"})), 132 ) 133 .into_response(); 134 } 135 } 136 } 137 if let Some(blob_cid) = &params.blob { 138 let did = match &params.did { 139 Some(d) => d, 140 None => { 141 return ( 142 StatusCode::BAD_REQUEST, 143 Json(json!({"error": "InvalidRequest", "message": "Must provide a did to request blob state"})), 144 ) 145 .into_response(); 146 } 147 }; 148 let blob = sqlx::query!( 149 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 150 blob_cid 151 ) 152 .fetch_optional(&state.db) 153 .await; 154 match blob { 155 Ok(Some(row)) => { 156 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 157 applied: true, 158 r#ref: Some(r.clone()), 159 }); 160 return ( 161 StatusCode::OK, 162 Json(SubjectStatus { 163 subject: json!({ 164 "$type": "com.atproto.admin.defs#repoBlobRef", 165 "did": did, 166 "cid": row.cid 167 }), 168 takedown, 169 deactivated: None, 170 }), 171 ) 172 .into_response(); 173 } 174 Ok(None) => { 175 return ( 176 StatusCode::NOT_FOUND, 177 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 178 ) 179 .into_response(); 180 } 181 Err(e) => { 182 error!("DB error in get_subject_status: {:?}", e); 183 return ( 184 StatusCode::INTERNAL_SERVER_ERROR, 185 Json(json!({"error": "InternalError"})), 186 ) 187 .into_response(); 188 } 189 } 190 } 191 ( 192 StatusCode::BAD_REQUEST, 193 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 194 ) 195 .into_response() 196} 197 198#[derive(Deserialize)] 199#[serde(rename_all = "camelCase")] 200pub struct UpdateSubjectStatusInput { 201 pub subject: serde_json::Value, 202 pub takedown: Option<StatusAttrInput>, 203 pub deactivated: Option<StatusAttrInput>, 204} 205 206#[derive(Deserialize)] 207pub struct StatusAttrInput { 208 pub applied: bool, 209 pub r#ref: Option<String>, 210} 211 212pub async fn update_subject_status( 213 State(state): State<AppState>, 214 _auth: BearerAuthAdmin, 215 Json(input): Json<UpdateSubjectStatusInput>, 216) -> Response { 217 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 218 match subject_type { 219 Some("com.atproto.admin.defs#repoRef") => { 220 let did = input.subject.get("did").and_then(|d| d.as_str()); 221 if let Some(did) = did { 222 let mut tx = match state.db.begin().await { 223 Ok(tx) => tx, 224 Err(e) => { 225 error!("Failed to begin transaction: {:?}", e); 226 return ( 227 StatusCode::INTERNAL_SERVER_ERROR, 228 Json(json!({"error": "InternalError"})), 229 ) 230 .into_response(); 231 } 232 }; 233 if let Some(takedown) = &input.takedown { 234 let takedown_ref = if takedown.applied { 235 takedown.r#ref.clone() 236 } else { 237 None 238 }; 239 if let Err(e) = sqlx::query!( 240 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 241 takedown_ref, 242 did 243 ) 244 .execute(&mut *tx) 245 .await 246 { 247 error!("Failed to update user takedown status for {}: {:?}", did, e); 248 return ( 249 StatusCode::INTERNAL_SERVER_ERROR, 250 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 251 ) 252 .into_response(); 253 } 254 } 255 if let Some(deactivated) = &input.deactivated { 256 let result = if deactivated.applied { 257 sqlx::query!( 258 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 259 did 260 ) 261 .execute(&mut *tx) 262 .await 263 } else { 264 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 265 .execute(&mut *tx) 266 .await 267 }; 268 if let Err(e) = result { 269 error!( 270 "Failed to update user deactivation status for {}: {:?}", 271 did, e 272 ); 273 return ( 274 StatusCode::INTERNAL_SERVER_ERROR, 275 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})), 276 ) 277 .into_response(); 278 } 279 } 280 if let Err(e) = tx.commit().await { 281 error!("Failed to commit transaction: {:?}", e); 282 return ( 283 StatusCode::INTERNAL_SERVER_ERROR, 284 Json(json!({"error": "InternalError"})), 285 ) 286 .into_response(); 287 } 288 if let Some(takedown) = &input.takedown { 289 let status = if takedown.applied { 290 Some("takendown") 291 } else { 292 None 293 }; 294 if let Err(e) = crate::api::repo::record::sequence_account_event( 295 &state, 296 did, 297 !takedown.applied, 298 status, 299 ) 300 .await 301 { 302 warn!("Failed to sequence account event for takedown: {}", e); 303 } 304 } 305 if let Some(deactivated) = &input.deactivated { 306 let status = if deactivated.applied { 307 Some("deactivated") 308 } else { 309 None 310 }; 311 if let Err(e) = crate::api::repo::record::sequence_account_event( 312 &state, 313 did, 314 !deactivated.applied, 315 status, 316 ) 317 .await 318 { 319 warn!("Failed to sequence account event for deactivation: {}", e); 320 } 321 } 322 if let Ok(Some(handle)) = 323 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 324 .fetch_optional(&state.db) 325 .await 326 { 327 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 328 } 329 return ( 330 StatusCode::OK, 331 Json(json!({ 332 "subject": input.subject, 333 "takedown": input.takedown.as_ref().map(|t| json!({ 334 "applied": t.applied, 335 "ref": t.r#ref 336 })), 337 "deactivated": input.deactivated.as_ref().map(|d| json!({ 338 "applied": d.applied 339 })) 340 })), 341 ) 342 .into_response(); 343 } 344 } 345 Some("com.atproto.repo.strongRef") => { 346 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 347 if let Some(uri) = uri { 348 if let Some(takedown) = &input.takedown { 349 let takedown_ref = if takedown.applied { 350 takedown.r#ref.clone() 351 } else { 352 None 353 }; 354 if let Err(e) = sqlx::query!( 355 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 356 takedown_ref, 357 uri 358 ) 359 .execute(&state.db) 360 .await 361 { 362 error!( 363 "Failed to update record takedown status for {}: {:?}", 364 uri, e 365 ); 366 return ( 367 StatusCode::INTERNAL_SERVER_ERROR, 368 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 369 ) 370 .into_response(); 371 } 372 } 373 return ( 374 StatusCode::OK, 375 Json(json!({ 376 "subject": input.subject, 377 "takedown": input.takedown.as_ref().map(|t| json!({ 378 "applied": t.applied, 379 "ref": t.r#ref 380 })) 381 })), 382 ) 383 .into_response(); 384 } 385 } 386 Some("com.atproto.admin.defs#repoBlobRef") => { 387 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 388 if let Some(cid) = cid { 389 if let Some(takedown) = &input.takedown { 390 let takedown_ref = if takedown.applied { 391 takedown.r#ref.clone() 392 } else { 393 None 394 }; 395 if let Err(e) = sqlx::query!( 396 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 397 takedown_ref, 398 cid 399 ) 400 .execute(&state.db) 401 .await 402 { 403 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 404 return ( 405 StatusCode::INTERNAL_SERVER_ERROR, 406 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 407 ) 408 .into_response(); 409 } 410 } 411 return ( 412 StatusCode::OK, 413 Json(json!({ 414 "subject": input.subject, 415 "takedown": input.takedown.as_ref().map(|t| json!({ 416 "applied": t.applied, 417 "ref": t.r#ref 418 })) 419 })), 420 ) 421 .into_response(); 422 } 423 } 424 _ => {} 425 } 426 ( 427 StatusCode::BAD_REQUEST, 428 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 429 ) 430 .into_response() 431}