// this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use serde::{Deserialize, Serialize}; 9use serde_json::json; 10use tracing::{error, warn}; 11 12#[derive(Deserialize)] 13pub struct GetSubjectStatusParams { 14 pub did: Option<String>, 15 pub uri: Option<String>, 16 pub blob: Option<String>, 17} 18 19#[derive(Serialize)] 20pub struct SubjectStatus { 21 pub subject: serde_json::Value, 22 pub takedown: Option<StatusAttr>, 23 pub deactivated: Option<StatusAttr>, 24} 25 26#[derive(Serialize)] 27#[serde(rename_all = "camelCase")] 28pub struct StatusAttr { 29 pub applied: bool, 30 pub r#ref: Option<String>, 31} 32 33pub async fn get_subject_status( 34 State(state): State<AppState>, 35 headers: axum::http::HeaderMap, 36 Query(params): Query<GetSubjectStatusParams>, 37) -> Response { 38 let auth_header = headers.get("Authorization"); 39 if auth_header.is_none() { 40 return ( 41 StatusCode::UNAUTHORIZED, 42 Json(json!({"error": "AuthenticationRequired"})), 43 ) 44 .into_response(); 45 } 46 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 47 return ( 48 StatusCode::BAD_REQUEST, 49 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 50 ) 51 .into_response(); 52 } 53 if let Some(did) = &params.did { 54 let user = sqlx::query!( 55 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 56 did 57 ) 58 .fetch_optional(&state.db) 59 .await; 60 match user { 61 Ok(Some(row)) => { 62 let deactivated = row.deactivated_at.map(|_| StatusAttr { 63 applied: true, 64 r#ref: None, 65 }); 66 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 67 applied: true, 68 r#ref: Some(r.clone()), 69 }); 70 return ( 71 StatusCode::OK, 72 Json(SubjectStatus { 73 subject: json!({ 74 "$type": "com.atproto.admin.defs#repoRef", 75 "did": row.did 76 }), 77 takedown, 78 deactivated, 79 }), 80 ) 81 .into_response(); 82 } 83 Ok(None) => { 84 
return ( 85 StatusCode::NOT_FOUND, 86 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 87 ) 88 .into_response(); 89 } 90 Err(e) => { 91 error!("DB error in get_subject_status: {:?}", e); 92 return ( 93 StatusCode::INTERNAL_SERVER_ERROR, 94 Json(json!({"error": "InternalError"})), 95 ) 96 .into_response(); 97 } 98 } 99 } 100 if let Some(uri) = &params.uri { 101 let record = sqlx::query!( 102 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 103 uri 104 ) 105 .fetch_optional(&state.db) 106 .await; 107 match record { 108 Ok(Some(row)) => { 109 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 110 applied: true, 111 r#ref: Some(r.clone()), 112 }); 113 return ( 114 StatusCode::OK, 115 Json(SubjectStatus { 116 subject: json!({ 117 "$type": "com.atproto.repo.strongRef", 118 "uri": uri, 119 "cid": uri 120 }), 121 takedown, 122 deactivated: None, 123 }), 124 ) 125 .into_response(); 126 } 127 Ok(None) => { 128 return ( 129 StatusCode::NOT_FOUND, 130 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 131 ) 132 .into_response(); 133 } 134 Err(e) => { 135 error!("DB error in get_subject_status: {:?}", e); 136 return ( 137 StatusCode::INTERNAL_SERVER_ERROR, 138 Json(json!({"error": "InternalError"})), 139 ) 140 .into_response(); 141 } 142 } 143 } 144 if let Some(blob_cid) = &params.blob { 145 let blob = sqlx::query!( 146 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 147 blob_cid 148 ) 149 .fetch_optional(&state.db) 150 .await; 151 match blob { 152 Ok(Some(row)) => { 153 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 154 applied: true, 155 r#ref: Some(r.clone()), 156 }); 157 return ( 158 StatusCode::OK, 159 Json(SubjectStatus { 160 subject: json!({ 161 "$type": "com.atproto.admin.defs#repoBlobRef", 162 "did": "", 163 "cid": row.cid 164 }), 165 takedown, 166 deactivated: None, 167 }), 168 ) 169 .into_response(); 170 } 171 Ok(None) => { 172 return ( 173 StatusCode::NOT_FOUND, 
174 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 175 ) 176 .into_response(); 177 } 178 Err(e) => { 179 error!("DB error in get_subject_status: {:?}", e); 180 return ( 181 StatusCode::INTERNAL_SERVER_ERROR, 182 Json(json!({"error": "InternalError"})), 183 ) 184 .into_response(); 185 } 186 } 187 } 188 ( 189 StatusCode::BAD_REQUEST, 190 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 191 ) 192 .into_response() 193} 194 195#[derive(Deserialize)] 196#[serde(rename_all = "camelCase")] 197pub struct UpdateSubjectStatusInput { 198 pub subject: serde_json::Value, 199 pub takedown: Option<StatusAttrInput>, 200 pub deactivated: Option<StatusAttrInput>, 201} 202 203#[derive(Deserialize)] 204pub struct StatusAttrInput { 205 pub apply: bool, 206 pub r#ref: Option<String>, 207} 208 209pub async fn update_subject_status( 210 State(state): State<AppState>, 211 headers: axum::http::HeaderMap, 212 Json(input): Json<UpdateSubjectStatusInput>, 213) -> Response { 214 let auth_header = headers.get("Authorization"); 215 if auth_header.is_none() { 216 return ( 217 StatusCode::UNAUTHORIZED, 218 Json(json!({"error": "AuthenticationRequired"})), 219 ) 220 .into_response(); 221 } 222 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 223 match subject_type { 224 Some("com.atproto.admin.defs#repoRef") => { 225 let did = input.subject.get("did").and_then(|d| d.as_str()); 226 if let Some(did) = did { 227 let mut tx = match state.db.begin().await { 228 Ok(tx) => tx, 229 Err(e) => { 230 error!("Failed to begin transaction: {:?}", e); 231 return ( 232 StatusCode::INTERNAL_SERVER_ERROR, 233 Json(json!({"error": "InternalError"})), 234 ) 235 .into_response(); 236 } 237 }; 238 if let Some(takedown) = &input.takedown { 239 let takedown_ref = if takedown.apply { 240 takedown.r#ref.clone() 241 } else { 242 None 243 }; 244 if let Err(e) = sqlx::query!( 245 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 246 takedown_ref, 
247 did 248 ) 249 .execute(&mut *tx) 250 .await 251 { 252 error!("Failed to update user takedown status for {}: {:?}", did, e); 253 return ( 254 StatusCode::INTERNAL_SERVER_ERROR, 255 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 256 ) 257 .into_response(); 258 } 259 } 260 if let Some(deactivated) = &input.deactivated { 261 let result = if deactivated.apply { 262 sqlx::query!( 263 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 264 did 265 ) 266 .execute(&mut *tx) 267 .await 268 } else { 269 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 270 .execute(&mut *tx) 271 .await 272 }; 273 if let Err(e) = result { 274 error!( 275 "Failed to update user deactivation status for {}: {:?}", 276 did, e 277 ); 278 return ( 279 StatusCode::INTERNAL_SERVER_ERROR, 280 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})), 281 ) 282 .into_response(); 283 } 284 } 285 if let Err(e) = tx.commit().await { 286 error!("Failed to commit transaction: {:?}", e); 287 return ( 288 StatusCode::INTERNAL_SERVER_ERROR, 289 Json(json!({"error": "InternalError"})), 290 ) 291 .into_response(); 292 } 293 if let Some(takedown) = &input.takedown { 294 let status = if takedown.apply { 295 Some("takendown") 296 } else { 297 None 298 }; 299 if let Err(e) = crate::api::repo::record::sequence_account_event( 300 &state, 301 did, 302 !takedown.apply, 303 status, 304 ) 305 .await 306 { 307 warn!("Failed to sequence account event for takedown: {}", e); 308 } 309 } 310 if let Some(deactivated) = &input.deactivated { 311 let status = if deactivated.apply { 312 Some("deactivated") 313 } else { 314 None 315 }; 316 if let Err(e) = crate::api::repo::record::sequence_account_event( 317 &state, 318 did, 319 !deactivated.apply, 320 status, 321 ) 322 .await 323 { 324 warn!("Failed to sequence account event for deactivation: {}", e); 325 } 326 } 327 if let Ok(Some(handle)) = 328 sqlx::query_scalar!("SELECT 
handle FROM users WHERE did = $1", did) 329 .fetch_optional(&state.db) 330 .await 331 { 332 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 333 } 334 return ( 335 StatusCode::OK, 336 Json(json!({ 337 "subject": input.subject, 338 "takedown": input.takedown.as_ref().map(|t| json!({ 339 "applied": t.apply, 340 "ref": t.r#ref 341 })), 342 "deactivated": input.deactivated.as_ref().map(|d| json!({ 343 "applied": d.apply 344 })) 345 })), 346 ) 347 .into_response(); 348 } 349 } 350 Some("com.atproto.repo.strongRef") => { 351 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 352 if let Some(uri) = uri { 353 if let Some(takedown) = &input.takedown { 354 let takedown_ref = if takedown.apply { 355 takedown.r#ref.clone() 356 } else { 357 None 358 }; 359 if let Err(e) = sqlx::query!( 360 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 361 takedown_ref, 362 uri 363 ) 364 .execute(&state.db) 365 .await 366 { 367 error!( 368 "Failed to update record takedown status for {}: {:?}", 369 uri, e 370 ); 371 return ( 372 StatusCode::INTERNAL_SERVER_ERROR, 373 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 374 ) 375 .into_response(); 376 } 377 } 378 return ( 379 StatusCode::OK, 380 Json(json!({ 381 "subject": input.subject, 382 "takedown": input.takedown.as_ref().map(|t| json!({ 383 "applied": t.apply, 384 "ref": t.r#ref 385 })) 386 })), 387 ) 388 .into_response(); 389 } 390 } 391 Some("com.atproto.admin.defs#repoBlobRef") => { 392 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 393 if let Some(cid) = cid { 394 if let Some(takedown) = &input.takedown { 395 let takedown_ref = if takedown.apply { 396 takedown.r#ref.clone() 397 } else { 398 None 399 }; 400 if let Err(e) = sqlx::query!( 401 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 402 takedown_ref, 403 cid 404 ) 405 .execute(&state.db) 406 .await 407 { 408 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 409 return 
( 410 StatusCode::INTERNAL_SERVER_ERROR, 411 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 412 ) 413 .into_response(); 414 } 415 } 416 return ( 417 StatusCode::OK, 418 Json(json!({ 419 "subject": input.subject, 420 "takedown": input.takedown.as_ref().map(|t| json!({ 421 "applied": t.apply, 422 "ref": t.r#ref 423 })) 424 })), 425 ) 426 .into_response(); 427 } 428 } 429 _ => {} 430 } 431 ( 432 StatusCode::BAD_REQUEST, 433 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 434 ) 435 .into_response() 436}