this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use serde::{Deserialize, Serialize}; 9use serde_json::json; 10use tracing::error; 11 12#[derive(Deserialize)] 13pub struct GetSubjectStatusParams { 14 pub did: Option<String>, 15 pub uri: Option<String>, 16 pub blob: Option<String>, 17} 18 19#[derive(Serialize)] 20pub struct SubjectStatus { 21 pub subject: serde_json::Value, 22 pub takedown: Option<StatusAttr>, 23 pub deactivated: Option<StatusAttr>, 24} 25 26#[derive(Serialize)] 27#[serde(rename_all = "camelCase")] 28pub struct StatusAttr { 29 pub applied: bool, 30 pub r#ref: Option<String>, 31} 32 33pub async fn get_subject_status( 34 State(state): State<AppState>, 35 headers: axum::http::HeaderMap, 36 Query(params): Query<GetSubjectStatusParams>, 37) -> Response { 38 let auth_header = headers.get("Authorization"); 39 if auth_header.is_none() { 40 return ( 41 StatusCode::UNAUTHORIZED, 42 Json(json!({"error": "AuthenticationRequired"})), 43 ) 44 .into_response(); 45 } 46 47 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 48 return ( 49 StatusCode::BAD_REQUEST, 50 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 51 ) 52 .into_response(); 53 } 54 55 if let Some(did) = &params.did { 56 let user = sqlx::query!( 57 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 58 did 59 ) 60 .fetch_optional(&state.db) 61 .await; 62 63 match user { 64 Ok(Some(row)) => { 65 let deactivated = row.deactivated_at.map(|_| StatusAttr { 66 applied: true, 67 r#ref: None, 68 }); 69 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 70 applied: true, 71 r#ref: Some(r.clone()), 72 }); 73 74 return ( 75 StatusCode::OK, 76 Json(SubjectStatus { 77 subject: json!({ 78 "$type": "com.atproto.admin.defs#repoRef", 79 "did": row.did 80 }), 81 takedown, 82 deactivated, 83 }), 84 ) 85 .into_response(); 86 } 87 Ok(None) => { 88 return ( 89 StatusCode::NOT_FOUND, 90 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 91 ) 92 .into_response(); 93 } 94 Err(e) => { 95 error!("DB error in get_subject_status: {:?}", e); 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError"})), 99 ) 100 .into_response(); 101 } 102 } 103 } 104 105 if let Some(uri) = &params.uri { 106 let record = sqlx::query!( 107 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 108 uri 109 ) 110 .fetch_optional(&state.db) 111 .await; 112 113 match record { 114 Ok(Some(row)) => { 115 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 116 applied: true, 117 r#ref: Some(r.clone()), 118 }); 119 120 return ( 121 StatusCode::OK, 122 Json(SubjectStatus { 123 subject: json!({ 124 "$type": "com.atproto.repo.strongRef", 125 "uri": uri, 126 "cid": uri 127 }), 128 takedown, 129 deactivated: None, 130 }), 131 ) 132 .into_response(); 133 } 134 Ok(None) => { 135 return ( 136 StatusCode::NOT_FOUND, 137 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 138 ) 139 .into_response(); 140 } 141 Err(e) => { 142 error!("DB error in get_subject_status: {:?}", e); 143 return ( 144 StatusCode::INTERNAL_SERVER_ERROR, 145 Json(json!({"error": "InternalError"})), 146 ) 147 .into_response(); 148 } 149 } 150 } 151 152 if let Some(blob_cid) = &params.blob { 153 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid) 154 .fetch_optional(&state.db) 155 .await; 156 157 match blob { 158 Ok(Some(row)) => { 159 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 160 applied: true, 161 r#ref: Some(r.clone()), 162 }); 163 164 return ( 165 StatusCode::OK, 166 Json(SubjectStatus { 167 subject: json!({ 168 "$type": "com.atproto.admin.defs#repoBlobRef", 169 "did": "", 170 "cid": row.cid 171 }), 172 takedown, 173 deactivated: None, 174 }), 175 ) 176 .into_response(); 177 } 178 Ok(None) => { 179 return ( 180 StatusCode::NOT_FOUND, 181 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 182 ) 183 .into_response(); 184 } 185 Err(e) => { 186 error!("DB error in get_subject_status: {:?}", e); 187 return ( 188 StatusCode::INTERNAL_SERVER_ERROR, 189 Json(json!({"error": "InternalError"})), 190 ) 191 .into_response(); 192 } 193 } 194 } 195 196 ( 197 StatusCode::BAD_REQUEST, 198 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 199 ) 200 .into_response() 201} 202 203#[derive(Deserialize)] 204#[serde(rename_all = "camelCase")] 205pub struct UpdateSubjectStatusInput { 206 pub subject: serde_json::Value, 207 pub takedown: Option<StatusAttrInput>, 208 pub deactivated: Option<StatusAttrInput>, 209} 210 211#[derive(Deserialize)] 212pub struct StatusAttrInput { 213 pub apply: bool, 214 pub r#ref: Option<String>, 215} 216 217pub async fn update_subject_status( 218 State(state): State<AppState>, 219 headers: axum::http::HeaderMap, 220 Json(input): Json<UpdateSubjectStatusInput>, 221) -> Response { 222 let auth_header = headers.get("Authorization"); 223 if auth_header.is_none() { 224 return ( 225 StatusCode::UNAUTHORIZED, 226 Json(json!({"error": "AuthenticationRequired"})), 227 ) 228 .into_response(); 229 } 230 231 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 232 233 match subject_type { 234 Some("com.atproto.admin.defs#repoRef") => { 235 let did = input.subject.get("did").and_then(|d| d.as_str()); 236 if let Some(did) = did { 237 let mut tx = match state.db.begin().await { 238 Ok(tx) => tx, 239 Err(e) => { 240 error!("Failed to begin transaction: {:?}", e); 241 return ( 242 StatusCode::INTERNAL_SERVER_ERROR, 243 Json(json!({"error": "InternalError"})), 244 ) 245 .into_response(); 246 } 247 }; 248 249 if let Some(takedown) = &input.takedown { 250 let takedown_ref = if takedown.apply { 251 takedown.r#ref.clone() 252 } else { 253 None 254 }; 255 if let Err(e) = sqlx::query!( 256 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 257 takedown_ref, 258 did 259 ) 260 .execute(&mut *tx) 261 .await 262 { 263 error!("Failed to update user takedown status for {}: {:?}", did, e); 264 return ( 265 StatusCode::INTERNAL_SERVER_ERROR, 266 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 267 ) 268 .into_response(); 269 } 270 } 271 272 if let Some(deactivated) = &input.deactivated { 273 let result = if deactivated.apply { 274 sqlx::query!( 275 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 276 did 277 ) 278 .execute(&mut *tx) 279 .await 280 } else { 281 sqlx::query!( 282 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 283 did 284 ) 285 .execute(&mut *tx) 286 .await 287 }; 288 289 if let Err(e) = result { 290 error!("Failed to update user deactivation status for {}: {:?}", did, e); 291 return ( 292 StatusCode::INTERNAL_SERVER_ERROR, 293 Json(json!({"error": "InternalError", "message": "Failed to update deactivation status"})), 294 ) 295 .into_response(); 296 } 297 } 298 299 if let Err(e) = tx.commit().await { 300 error!("Failed to commit transaction: {:?}", e); 301 return ( 302 StatusCode::INTERNAL_SERVER_ERROR, 303 Json(json!({"error": "InternalError"})), 304 ) 305 .into_response(); 306 } 307 308 if let Ok(Some(handle)) = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 309 .fetch_optional(&state.db) 310 .await 311 { 312 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 313 } 314 315 return ( 316 StatusCode::OK, 317 Json(json!({ 318 "subject": input.subject, 319 "takedown": input.takedown.as_ref().map(|t| json!({ 320 "applied": t.apply, 321 "ref": t.r#ref 322 })), 323 "deactivated": input.deactivated.as_ref().map(|d| json!({ 324 "applied": d.apply 325 })) 326 })), 327 ) 328 .into_response(); 329 } 330 } 331 Some("com.atproto.repo.strongRef") => { 332 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 333 if let Some(uri) = uri { 334 if let Some(takedown) = &input.takedown { 335 let takedown_ref = if takedown.apply { 336 takedown.r#ref.clone() 337 } else { 338 None 339 }; 340 if let Err(e) = sqlx::query!( 341 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 342 takedown_ref, 343 uri 344 ) 345 .execute(&state.db) 346 .await 347 { 348 error!("Failed to update record takedown status for {}: {:?}", uri, e); 349 return ( 350 StatusCode::INTERNAL_SERVER_ERROR, 351 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 352 ) 353 .into_response(); 354 } 355 } 356 357 return ( 358 StatusCode::OK, 359 Json(json!({ 360 "subject": input.subject, 361 "takedown": input.takedown.as_ref().map(|t| json!({ 362 "applied": t.apply, 363 "ref": t.r#ref 364 })) 365 })), 366 ) 367 .into_response(); 368 } 369 } 370 Some("com.atproto.admin.defs#repoBlobRef") => { 371 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 372 if let Some(cid) = cid { 373 if let Some(takedown) = &input.takedown { 374 let takedown_ref = if takedown.apply { 375 takedown.r#ref.clone() 376 } else { 377 None 378 }; 379 if let Err(e) = sqlx::query!( 380 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 381 takedown_ref, 382 cid 383 ) 384 .execute(&state.db) 385 .await 386 { 387 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 388 return ( 389 StatusCode::INTERNAL_SERVER_ERROR, 390 Json(json!({"error": "InternalError", "message": "Failed to update takedown status"})), 391 ) 392 .into_response(); 393 } 394 } 395 396 return ( 397 StatusCode::OK, 398 Json(json!({ 399 "subject": input.subject, 400 "takedown": input.takedown.as_ref().map(|t| json!({ 401 "applied": t.apply, 402 "ref": t.r#ref 403 })) 404 })), 405 ) 406 .into_response(); 407 } 408 } 409 _ => {} 410 } 411 412 ( 413 StatusCode::BAD_REQUEST, 414 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 415 ) 416 .into_response() 417}