this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use serde::{Deserialize, Serialize}; 9use serde_json::json; 10use tracing::{error, warn}; 11 12#[derive(Deserialize)] 13pub struct GetSubjectStatusParams { 14 pub did: Option<String>, 15 pub uri: Option<String>, 16 pub blob: Option<String>, 17} 18 19#[derive(Serialize)] 20pub struct SubjectStatus { 21 pub subject: serde_json::Value, 22 pub takedown: Option<StatusAttr>, 23 pub deactivated: Option<StatusAttr>, 24} 25 26#[derive(Serialize)] 27#[serde(rename_all = "camelCase")] 28pub struct StatusAttr { 29 pub applied: bool, 30 pub r#ref: Option<String>, 31} 32 33pub async fn get_subject_status( 34 State(state): State<AppState>, 35 headers: axum::http::HeaderMap, 36 Query(params): Query<GetSubjectStatusParams>, 37) -> Response { 38 let auth_header = headers.get("Authorization"); 39 if auth_header.is_none() { 40 return ( 41 StatusCode::UNAUTHORIZED, 42 Json(json!({"error": "AuthenticationRequired"})), 43 ) 44 .into_response(); 45 } 46 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 47 return ( 48 StatusCode::BAD_REQUEST, 49 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 50 ) 51 .into_response(); 52 } 53 if let Some(did) = &params.did { 54 let user = sqlx::query!( 55 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 56 did 57 ) 58 .fetch_optional(&state.db) 59 .await; 60 match user { 61 Ok(Some(row)) => { 62 let deactivated = row.deactivated_at.map(|_| StatusAttr { 63 applied: true, 64 r#ref: None, 65 }); 66 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 67 applied: true, 68 r#ref: Some(r.clone()), 69 }); 70 return ( 71 StatusCode::OK, 72 Json(SubjectStatus { 73 subject: json!({ 74 "$type": "com.atproto.admin.defs#repoRef", 75 "did": row.did 76 }), 77 takedown, 78 deactivated, 79 }), 80 ) 81 .into_response(); 82 } 83 Ok(None) => { 84 return ( 85 StatusCode::NOT_FOUND, 86 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 87 ) 88 .into_response(); 89 } 90 Err(e) => { 91 error!("DB error in get_subject_status: {:?}", e); 92 return ( 93 StatusCode::INTERNAL_SERVER_ERROR, 94 Json(json!({"error": "InternalError"})), 95 ) 96 .into_response(); 97 } 98 } 99 } 100 if let Some(uri) = &params.uri { 101 let record = sqlx::query!( 102 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 103 uri 104 ) 105 .fetch_optional(&state.db) 106 .await; 107 match record { 108 Ok(Some(row)) => { 109 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 110 applied: true, 111 r#ref: Some(r.clone()), 112 }); 113 return ( 114 StatusCode::OK, 115 Json(SubjectStatus { 116 subject: json!({ 117 "$type": "com.atproto.repo.strongRef", 118 "uri": uri, 119 "cid": uri 120 }), 121 takedown, 122 deactivated: None, 123 }), 124 ) 125 .into_response(); 126 } 127 Ok(None) => { 128 return ( 129 StatusCode::NOT_FOUND, 130 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 131 ) 132 .into_response(); 133 } 134 Err(e) => { 135 error!("DB error in get_subject_status: {:?}", e); 136 return ( 137 StatusCode::INTERNAL_SERVER_ERROR, 138 Json(json!({"error": "InternalError"})), 139 ) 140 .into_response(); 141 } 142 } 143 } 144 if let Some(blob_cid) = &params.blob { 145 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid) 146 .fetch_optional(&state.db) 147 .await; 148 match blob { 149 Ok(Some(row)) => { 150 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 151 applied: true, 152 r#ref: Some(r.clone()), 153 }); 154 return ( 155 StatusCode::OK, 156 Json(SubjectStatus { 157 subject: json!({ 158 "$type": "com.atproto.admin.defs#repoBlobRef", 159 "did": "", 160 "cid": row.cid 161 }), 162 takedown, 163 deactivated: None, 164 }), 165 ) 166 .into_response(); 167 } 168 Ok(None) => { 169 return ( 170 StatusCode::NOT_FOUND, 171 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 172 ) 173 .into_response(); 174 } 175 Err(e) => { 176 error!("DB error in get_subject_status: {:?}", e); 177 return ( 178 StatusCode::INTERNAL_SERVER_ERROR, 179 Json(json!({"error": "InternalError"})), 180 ) 181 .into_response(); 182 } 183 } 184 } 185 ( 186 StatusCode::BAD_REQUEST, 187 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 188 ) 189 .into_response() 190} 191 192#[derive(Deserialize)] 193#[serde(rename_all = "camelCase")] 194pub struct UpdateSubjectStatusInput { 195 pub subject: serde_json::Value, 196 pub takedown: Option<StatusAttrInput>, 197 pub deactivated: Option<StatusAttrInput>, 198} 199 200#[derive(Deserialize)] 201pub struct StatusAttrInput { 202 pub apply: bool, 203 pub r#ref: Option<String>, 204} 205 206pub async fn update_subject_status( 207 State(state): State<AppState>, 208 headers: axum::http::HeaderMap, 209 Json(input): Json<UpdateSubjectStatusInput>, 210) -> Response { 211 let auth_header = headers.get("Authorization"); 212 if auth_header.is_none() { 213 return ( 214 StatusCode::UNAUTHORIZED, 215 Json(json!({"error": "AuthenticationRequired"})), 216 ) 217 .into_response(); 218 } 219 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 220 match subject_type { 221 Some("com.atproto.admin.defs#repoRef") => { 222 let did = input.subject.get("did").and_then(|d| d.as_str()); 223 if let Some(did) = did { 224 let mut tx = match state.db.begin().await { 225 Ok(tx) => tx, 226 Err(e) => { 227 error!("Failed to begin transaction: {:?}", e); 228 return ( 229 StatusCode::INTERNAL_SERVER_ERROR, 230 Json(json!({"error": "InternalError"})), 231 ) 232 .into_response(); 233 } 234 }; 235 if let Some(takedown) = &input.takedown { 236 let takedown_ref = if takedown.apply { 237 takedown.r#ref.clone() 238 } else { 239 None 240 }; 241 if let Err(e) = sqlx::query!( 242 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 243 takedown_ref, 244 did 245 ) 246 .execute(&mut *tx) 247 .await 248 { 249 error!("Failed to update user takedown status for {}: {:?}", did, e); 250 return ( 251 StatusCode::INTERNAL_SERVER_ERROR, 252 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 253 ) 254 .into_response(); 255 } 256 } 257 if let Some(deactivated) = &input.deactivated { 258 let result = if deactivated.apply { 259 sqlx::query!( 260 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 261 did 262 ) 263 .execute(&mut *tx) 264 .await 265 } else { 266 sqlx::query!( 267 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 268 did 269 ) 270 .execute(&mut *tx) 271 .await 272 }; 273 if let Err(e) = result { 274 error!("Failed to update user deactivation status for {}: {:?}", did, e); 275 return ( 276 StatusCode::INTERNAL_SERVER_ERROR, 277 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})), 278 ) 279 .into_response(); 280 } 281 } 282 if let Err(e) = tx.commit().await { 283 error!("Failed to commit transaction: {:?}", e); 284 return ( 285 StatusCode::INTERNAL_SERVER_ERROR, 286 Json(json!({"error": "InternalError"})), 287 ) 288 .into_response(); 289 } 290 if let Some(takedown) = &input.takedown { 291 let status = if takedown.apply { Some("takendown") } else { None }; 292 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !takedown.apply, status).await { 293 warn!("Failed to sequence account event for takedown: {}", e); 294 } 295 } 296 if let Some(deactivated) = &input.deactivated { 297 let status = if deactivated.apply { Some("deactivated") } else { None }; 298 if let Err(e) = crate::api::repo::record::sequence_account_event(&state, did, !deactivated.apply, status).await { 299 warn!("Failed to sequence account event for deactivation: {}", e); 300 } 301 } 302 if let Ok(Some(handle)) = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 303 .fetch_optional(&state.db) 304 .await 305 { 306 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 307 } 308 return ( 309 StatusCode::OK, 310 Json(json!({ 311 "subject": input.subject, 312 "takedown": input.takedown.as_ref().map(|t| json!({ 313 "applied": t.apply, 314 "ref": t.r#ref 315 })), 316 "deactivated": input.deactivated.as_ref().map(|d| json!({ 317 "applied": d.apply 318 })) 319 })), 320 ) 321 .into_response(); 322 } 323 } 324 Some("com.atproto.repo.strongRef") => { 325 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 326 if let Some(uri) = uri { 327 if let Some(takedown) = &input.takedown { 328 let takedown_ref = if takedown.apply { 329 takedown.r#ref.clone() 330 } else { 331 None 332 }; 333 if let Err(e) = sqlx::query!( 334 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 335 takedown_ref, 336 uri 337 ) 338 .execute(&state.db) 339 .await 340 { 341 error!("Failed to update record takedown status for {}: {:?}", uri, e); 342 return ( 343 StatusCode::INTERNAL_SERVER_ERROR, 344 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 345 ) 346 .into_response(); 347 } 348 } 349 return ( 350 StatusCode::OK, 351 Json(json!({ 352 "subject": input.subject, 353 "takedown": input.takedown.as_ref().map(|t| json!({ 354 "applied": t.apply, 355 "ref": t.r#ref 356 })) 357 })), 358 ) 359 .into_response(); 360 } 361 } 362 Some("com.atproto.admin.defs#repoBlobRef") => { 363 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 364 if let Some(cid) = cid { 365 if let Some(takedown) = &input.takedown { 366 let takedown_ref = if takedown.apply { 367 takedown.r#ref.clone() 368 } else { 369 None 370 }; 371 if let Err(e) = sqlx::query!( 372 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 373 takedown_ref, 374 cid 375 ) 376 .execute(&state.db) 377 .await 378 { 379 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 380 return ( 381 StatusCode::INTERNAL_SERVER_ERROR, 382 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 383 ) 384 .into_response(); 385 } 386 } 387 return ( 388 StatusCode::OK, 389 Json(json!({ 390 "subject": input.subject, 391 "takedown": input.takedown.as_ref().map(|t| json!({ 392 "applied": t.apply, 393 "ref": t.r#ref 394 })) 395 })), 396 ) 397 .into_response(); 398 } 399 } 400 _ => {} 401 } 402 ( 403 StatusCode::BAD_REQUEST, 404 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 405 ) 406 .into_response() 407}