this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::BearerAuthAdmin; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::{error, warn}; 13 14#[derive(Deserialize)] 15pub struct GetSubjectStatusParams { 16 pub did: Option<String>, 17 pub uri: Option<String>, 18 pub blob: Option<String>, 19} 20 21#[derive(Serialize)] 22pub struct SubjectStatus { 23 pub subject: serde_json::Value, 24 pub takedown: Option<StatusAttr>, 25 pub deactivated: Option<StatusAttr>, 26} 27 28#[derive(Serialize)] 29#[serde(rename_all = "camelCase")] 30pub struct StatusAttr { 31 pub applied: bool, 32 pub r#ref: Option<String>, 33} 34 35pub async fn get_subject_status( 36 State(state): State<AppState>, 37 _auth: BearerAuthAdmin, 38 Query(params): Query<GetSubjectStatusParams>, 39) -> Response { 40 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 41 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response(); 42 } 43 if let Some(did) = &params.did { 44 let user = sqlx::query!( 45 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 46 did 47 ) 48 .fetch_optional(&state.db) 49 .await; 50 match user { 51 Ok(Some(row)) => { 52 let deactivated = row.deactivated_at.map(|_| StatusAttr { 53 applied: true, 54 r#ref: None, 55 }); 56 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 57 applied: true, 58 r#ref: Some(r.clone()), 59 }); 60 return ( 61 StatusCode::OK, 62 Json(SubjectStatus { 63 subject: json!({ 64 "$type": "com.atproto.admin.defs#repoRef", 65 "did": row.did 66 }), 67 takedown, 68 deactivated, 69 }), 70 ) 71 .into_response(); 72 } 73 Ok(None) => { 74 return ApiError::SubjectNotFound.into_response(); 75 } 76 Err(e) => { 77 error!("DB error in get_subject_status: {:?}", e); 78 return ApiError::InternalError(None).into_response(); 79 } 80 } 81 } 82 if let Some(uri) = &params.uri { 83 let record = sqlx::query!( 84 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 85 uri 86 ) 87 .fetch_optional(&state.db) 88 .await; 89 match record { 90 Ok(Some(row)) => { 91 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 92 applied: true, 93 r#ref: Some(r.clone()), 94 }); 95 return ( 96 StatusCode::OK, 97 Json(SubjectStatus { 98 subject: json!({ 99 "$type": "com.atproto.repo.strongRef", 100 "uri": uri, 101 "cid": uri 102 }), 103 takedown, 104 deactivated: None, 105 }), 106 ) 107 .into_response(); 108 } 109 Ok(None) => { 110 return ApiError::RecordNotFound.into_response(); 111 } 112 Err(e) => { 113 error!("DB error in get_subject_status: {:?}", e); 114 return ApiError::InternalError(None).into_response(); 115 } 116 } 117 } 118 if let Some(blob_cid) = &params.blob { 119 let did = match &params.did { 120 Some(d) => d, 121 None => { 122 return ApiError::InvalidRequest("Must provide a did to request blob state".into()) 123 .into_response(); 124 } 125 }; 126 let blob = sqlx::query!( 127 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 128 blob_cid 129 ) 130 .fetch_optional(&state.db) 131 .await; 132 match blob { 133 Ok(Some(row)) => { 134 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 135 applied: true, 136 r#ref: Some(r.clone()), 137 }); 138 return ( 139 StatusCode::OK, 140 Json(SubjectStatus { 141 subject: json!({ 142 "$type": "com.atproto.admin.defs#repoBlobRef", 143 "did": did, 144 "cid": row.cid 145 }), 146 takedown, 147 deactivated: None, 148 }), 149 ) 150 .into_response(); 151 } 152 Ok(None) => { 153 return ApiError::BlobNotFound(None).into_response(); 154 } 155 Err(e) => { 156 error!("DB error in get_subject_status: {:?}", e); 157 return ApiError::InternalError(None).into_response(); 158 } 159 } 160 } 161 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 162} 163 164#[derive(Deserialize)] 165#[serde(rename_all = "camelCase")] 166pub struct UpdateSubjectStatusInput { 167 pub subject: serde_json::Value, 168 pub takedown: Option<StatusAttrInput>, 169 pub deactivated: Option<StatusAttrInput>, 170} 171 172#[derive(Deserialize)] 173pub struct StatusAttrInput { 174 pub applied: bool, 175 pub r#ref: Option<String>, 176} 177 178pub async fn update_subject_status( 179 State(state): State<AppState>, 180 _auth: BearerAuthAdmin, 181 Json(input): Json<UpdateSubjectStatusInput>, 182) -> Response { 183 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 184 match subject_type { 185 Some("com.atproto.admin.defs#repoRef") => { 186 let did = input.subject.get("did").and_then(|d| d.as_str()); 187 if let Some(did) = did { 188 let mut tx = match state.db.begin().await { 189 Ok(tx) => tx, 190 Err(e) => { 191 error!("Failed to begin transaction: {:?}", e); 192 return ApiError::InternalError(None).into_response(); 193 } 194 }; 195 if let Some(takedown) = &input.takedown { 196 let takedown_ref = if takedown.applied { 197 takedown.r#ref.clone() 198 } else { 199 None 200 }; 201 if let Err(e) = sqlx::query!( 202 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 203 takedown_ref, 204 did 205 ) 206 .execute(&mut *tx) 207 .await 208 { 209 error!("Failed to update user takedown status for {}: {:?}", did, e); 210 return ApiError::InternalError(Some( 211 "Failed to update takedown status".into(), 212 )) 213 .into_response(); 214 } 215 } 216 if let Some(deactivated) = &input.deactivated { 217 let result = if deactivated.applied { 218 sqlx::query!( 219 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 220 did 221 ) 222 .execute(&mut *tx) 223 .await 224 } else { 225 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 226 .execute(&mut *tx) 227 .await 228 }; 229 if let Err(e) = result { 230 error!( 231 "Failed to update user deactivation status for {}: {:?}", 232 did, e 233 ); 234 return ApiError::InternalError(Some( 235 "Failed to update deactivation status".into(), 236 )) 237 .into_response(); 238 } 239 } 240 if let Err(e) = tx.commit().await { 241 error!("Failed to commit transaction: {:?}", e); 242 return ApiError::InternalError(None).into_response(); 243 } 244 if let Some(takedown) = &input.takedown { 245 let status = if takedown.applied { 246 Some("takendown") 247 } else { 248 None 249 }; 250 if let Err(e) = crate::api::repo::record::sequence_account_event( 251 &state, 252 did, 253 !takedown.applied, 254 status, 255 ) 256 .await 257 { 258 warn!("Failed to sequence account event for takedown: {}", e); 259 } 260 } 261 if let Some(deactivated) = &input.deactivated { 262 let status = if deactivated.applied { 263 Some("deactivated") 264 } else { 265 None 266 }; 267 if let Err(e) = crate::api::repo::record::sequence_account_event( 268 &state, 269 did, 270 !deactivated.applied, 271 status, 272 ) 273 .await 274 { 275 warn!("Failed to sequence account event for deactivation: {}", e); 276 } 277 } 278 if let Ok(Some(handle)) = 279 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 280 .fetch_optional(&state.db) 281 .await 282 { 283 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 284 } 285 return ( 286 StatusCode::OK, 287 Json(json!({ 288 "subject": input.subject, 289 "takedown": input.takedown.as_ref().map(|t| json!({ 290 "applied": t.applied, 291 "ref": t.r#ref 292 })), 293 "deactivated": input.deactivated.as_ref().map(|d| json!({ 294 "applied": d.applied 295 })) 296 })), 297 ) 298 .into_response(); 299 } 300 } 301 Some("com.atproto.repo.strongRef") => { 302 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 303 if let Some(uri) = uri { 304 if let Some(takedown) = &input.takedown { 305 let takedown_ref = if takedown.applied { 306 takedown.r#ref.clone() 307 } else { 308 None 309 }; 310 if let Err(e) = sqlx::query!( 311 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 312 takedown_ref, 313 uri 314 ) 315 .execute(&state.db) 316 .await 317 { 318 error!( 319 "Failed to update record takedown status for {}: {:?}", 320 uri, e 321 ); 322 return ApiError::InternalError(Some( 323 "Failed to update takedown status".into(), 324 )) 325 .into_response(); 326 } 327 } 328 return ( 329 StatusCode::OK, 330 Json(json!({ 331 "subject": input.subject, 332 "takedown": input.takedown.as_ref().map(|t| json!({ 333 "applied": t.applied, 334 "ref": t.r#ref 335 })) 336 })), 337 ) 338 .into_response(); 339 } 340 } 341 Some("com.atproto.admin.defs#repoBlobRef") => { 342 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 343 if let Some(cid) = cid { 344 if let Some(takedown) = &input.takedown { 345 let takedown_ref = if takedown.applied { 346 takedown.r#ref.clone() 347 } else { 348 None 349 }; 350 if let Err(e) = sqlx::query!( 351 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 352 takedown_ref, 353 cid 354 ) 355 .execute(&state.db) 356 .await 357 { 358 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 359 return ApiError::InternalError(Some( 360 "Failed to update takedown status".into(), 361 )) 362 .into_response(); 363 } 364 } 365 return ( 366 StatusCode::OK, 367 Json(json!({ 368 "subject": input.subject, 369 "takedown": input.takedown.as_ref().map(|t| json!({ 370 "applied": t.applied, 371 "ref": t.r#ref 372 })) 373 })), 374 ) 375 .into_response(); 376 } 377 } 378 _ => {} 379 } 380 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 381}