this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::BearerAuthAdmin; 3use crate::state::AppState; 4use crate::types::Did; 5use axum::{ 6 Json, 7 extract::{Query, State}, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, warn}; 14 15#[derive(Deserialize)] 16pub struct GetSubjectStatusParams { 17 pub did: Option<String>, 18 pub uri: Option<String>, 19 pub blob: Option<String>, 20} 21 22#[derive(Serialize)] 23pub struct SubjectStatus { 24 pub subject: serde_json::Value, 25 pub takedown: Option<StatusAttr>, 26 pub deactivated: Option<StatusAttr>, 27} 28 29#[derive(Serialize)] 30#[serde(rename_all = "camelCase")] 31pub struct StatusAttr { 32 pub applied: bool, 33 pub r#ref: Option<String>, 34} 35 36pub async fn get_subject_status( 37 State(state): State<AppState>, 38 _auth: BearerAuthAdmin, 39 Query(params): Query<GetSubjectStatusParams>, 40) -> Response { 41 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 42 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response(); 43 } 44 if let Some(did) = &params.did { 45 let user = sqlx::query!( 46 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 47 did 48 ) 49 .fetch_optional(&state.db) 50 .await; 51 match user { 52 Ok(Some(row)) => { 53 let deactivated = row.deactivated_at.map(|_| StatusAttr { 54 applied: true, 55 r#ref: None, 56 }); 57 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 58 applied: true, 59 r#ref: Some(r.clone()), 60 }); 61 return ( 62 StatusCode::OK, 63 Json(SubjectStatus { 64 subject: json!({ 65 "$type": "com.atproto.admin.defs#repoRef", 66 "did": row.did 67 }), 68 takedown, 69 deactivated, 70 }), 71 ) 72 .into_response(); 73 } 74 Ok(None) => { 75 return ApiError::SubjectNotFound.into_response(); 76 } 77 Err(e) => { 78 error!("DB error in get_subject_status: {:?}", e); 79 return ApiError::InternalError(None).into_response(); 80 } 81 } 82 } 83 if let Some(uri) = &params.uri { 84 let record = sqlx::query!( 85 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 86 uri 87 ) 88 .fetch_optional(&state.db) 89 .await; 90 match record { 91 Ok(Some(row)) => { 92 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 93 applied: true, 94 r#ref: Some(r.clone()), 95 }); 96 return ( 97 StatusCode::OK, 98 Json(SubjectStatus { 99 subject: json!({ 100 "$type": "com.atproto.repo.strongRef", 101 "uri": uri, 102 "cid": uri 103 }), 104 takedown, 105 deactivated: None, 106 }), 107 ) 108 .into_response(); 109 } 110 Ok(None) => { 111 return ApiError::RecordNotFound.into_response(); 112 } 113 Err(e) => { 114 error!("DB error in get_subject_status: {:?}", e); 115 return ApiError::InternalError(None).into_response(); 116 } 117 } 118 } 119 if let Some(blob_cid) = &params.blob { 120 let did = match &params.did { 121 Some(d) => d, 122 None => { 123 return ApiError::InvalidRequest("Must provide a did to request blob state".into()) 124 .into_response(); 125 } 126 }; 127 let blob = sqlx::query!( 128 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 129 blob_cid 130 ) 131 .fetch_optional(&state.db) 132 .await; 133 match blob { 134 Ok(Some(row)) => { 135 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 136 applied: true, 137 r#ref: Some(r.clone()), 138 }); 139 return ( 140 StatusCode::OK, 141 Json(SubjectStatus { 142 subject: json!({ 143 "$type": "com.atproto.admin.defs#repoBlobRef", 144 "did": did, 145 "cid": row.cid 146 }), 147 takedown, 148 deactivated: None, 149 }), 150 ) 151 .into_response(); 152 } 153 Ok(None) => { 154 return ApiError::BlobNotFound(None).into_response(); 155 } 156 Err(e) => { 157 error!("DB error in get_subject_status: {:?}", e); 158 return ApiError::InternalError(None).into_response(); 159 } 160 } 161 } 162 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 163} 164 165#[derive(Deserialize)] 166#[serde(rename_all = "camelCase")] 167pub struct UpdateSubjectStatusInput { 168 pub subject: serde_json::Value, 169 pub takedown: Option<StatusAttrInput>, 170 pub deactivated: Option<StatusAttrInput>, 171} 172 173#[derive(Deserialize)] 174pub struct StatusAttrInput { 175 pub applied: bool, 176 pub r#ref: Option<String>, 177} 178 179pub async fn update_subject_status( 180 State(state): State<AppState>, 181 _auth: BearerAuthAdmin, 182 Json(input): Json<UpdateSubjectStatusInput>, 183) -> Response { 184 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 185 match subject_type { 186 Some("com.atproto.admin.defs#repoRef") => { 187 let did_str = input.subject.get("did").and_then(|d| d.as_str()); 188 if let Some(did_str) = did_str { 189 let did = Did::new_unchecked(did_str); 190 let mut tx = match state.db.begin().await { 191 Ok(tx) => tx, 192 Err(e) => { 193 error!("Failed to begin transaction: {:?}", e); 194 return ApiError::InternalError(None).into_response(); 195 } 196 }; 197 if let Some(takedown) = &input.takedown { 198 let takedown_ref = if takedown.applied { 199 takedown.r#ref.clone() 200 } else { 201 None 202 }; 203 if let Err(e) = sqlx::query!( 204 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 205 takedown_ref, 206 did.as_str() 207 ) 208 .execute(&mut *tx) 209 .await 210 { 211 error!("Failed to update user takedown status for {}: {:?}", did, e); 212 return ApiError::InternalError(Some( 213 "Failed to update takedown status".into(), 214 )) 215 .into_response(); 216 } 217 } 218 if let Some(deactivated) = &input.deactivated { 219 let result = if deactivated.applied { 220 sqlx::query!( 221 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 222 did.as_str() 223 ) 224 .execute(&mut *tx) 225 .await 226 } else { 227 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did.as_str()) 228 .execute(&mut *tx) 229 .await 230 }; 231 if let Err(e) = result { 232 error!( 233 "Failed to update user deactivation status for {}: {:?}", 234 did, e 235 ); 236 return ApiError::InternalError(Some( 237 "Failed to update deactivation status".into(), 238 )) 239 .into_response(); 240 } 241 } 242 if let Err(e) = tx.commit().await { 243 error!("Failed to commit transaction: {:?}", e); 244 return ApiError::InternalError(None).into_response(); 245 } 246 if let Some(takedown) = &input.takedown { 247 let status = if takedown.applied { 248 Some("takendown") 249 } else { 250 None 251 }; 252 if let Err(e) = crate::api::repo::record::sequence_account_event( 253 &state, 254 &did, 255 !takedown.applied, 256 status, 257 ) 258 .await 259 { 260 warn!("Failed to sequence account event for takedown: {}", e); 261 } 262 } 263 if let Some(deactivated) = &input.deactivated { 264 let status = if deactivated.applied { 265 Some("deactivated") 266 } else { 267 None 268 }; 269 if let Err(e) = crate::api::repo::record::sequence_account_event( 270 &state, 271 &did, 272 !deactivated.applied, 273 status, 274 ) 275 .await 276 { 277 warn!("Failed to sequence account event for deactivation: {}", e); 278 } 279 } 280 if let Ok(Some(handle)) = 281 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 282 .fetch_optional(&state.db) 283 .await 284 { 285 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 286 } 287 return ( 288 StatusCode::OK, 289 Json(json!({ 290 "subject": input.subject, 291 "takedown": input.takedown.as_ref().map(|t| json!({ 292 "applied": t.applied, 293 "ref": t.r#ref 294 })), 295 "deactivated": input.deactivated.as_ref().map(|d| json!({ 296 "applied": d.applied 297 })) 298 })), 299 ) 300 .into_response(); 301 } 302 } 303 Some("com.atproto.repo.strongRef") => { 304 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 305 if let Some(uri) = uri { 306 if let Some(takedown) = &input.takedown { 307 let takedown_ref = if takedown.applied { 308 takedown.r#ref.clone() 309 } else { 310 None 311 }; 312 if let Err(e) = sqlx::query!( 313 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 314 takedown_ref, 315 uri 316 ) 317 .execute(&state.db) 318 .await 319 { 320 error!( 321 "Failed to update record takedown status for {}: {:?}", 322 uri, e 323 ); 324 return ApiError::InternalError(Some( 325 "Failed to update takedown status".into(), 326 )) 327 .into_response(); 328 } 329 } 330 return ( 331 StatusCode::OK, 332 Json(json!({ 333 "subject": input.subject, 334 "takedown": input.takedown.as_ref().map(|t| json!({ 335 "applied": t.applied, 336 "ref": t.r#ref 337 })) 338 })), 339 ) 340 .into_response(); 341 } 342 } 343 Some("com.atproto.admin.defs#repoBlobRef") => { 344 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 345 if let Some(cid) = cid { 346 if let Some(takedown) = &input.takedown { 347 let takedown_ref = if takedown.applied { 348 takedown.r#ref.clone() 349 } else { 350 None 351 }; 352 if let Err(e) = sqlx::query!( 353 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 354 takedown_ref, 355 cid 356 ) 357 .execute(&state.db) 358 .await 359 { 360 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 361 return ApiError::InternalError(Some( 362 "Failed to update takedown status".into(), 363 )) 364 .into_response(); 365 } 366 } 367 return ( 368 StatusCode::OK, 369 Json(json!({ 370 "subject": input.subject, 371 "takedown": input.takedown.as_ref().map(|t| json!({ 372 "applied": t.applied, 373 "ref": t.r#ref 374 })) 375 })), 376 ) 377 .into_response(); 378 } 379 } 380 _ => {} 381 } 382 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 383}