this repo has no description
1use crate::api::error::ApiError; 2use crate::auth::BearerAuthAdmin; 3use crate::state::AppState; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::{error, warn}; 13 14#[derive(Deserialize)] 15pub struct GetSubjectStatusParams { 16 pub did: Option<String>, 17 pub uri: Option<String>, 18 pub blob: Option<String>, 19} 20 21#[derive(Serialize)] 22pub struct SubjectStatus { 23 pub subject: serde_json::Value, 24 pub takedown: Option<StatusAttr>, 25 pub deactivated: Option<StatusAttr>, 26} 27 28#[derive(Serialize)] 29#[serde(rename_all = "camelCase")] 30pub struct StatusAttr { 31 pub applied: bool, 32 pub r#ref: Option<String>, 33} 34 35pub async fn get_subject_status( 36 State(state): State<AppState>, 37 _auth: BearerAuthAdmin, 38 Query(params): Query<GetSubjectStatusParams>, 39) -> Response { 40 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 41 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response(); 42 } 43 if let Some(did) = &params.did { 44 let user = sqlx::query!( 45 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 46 did 47 ) 48 .fetch_optional(&state.db) 49 .await; 50 match user { 51 Ok(Some(row)) => { 52 let deactivated = row.deactivated_at.map(|_| StatusAttr { 53 applied: true, 54 r#ref: None, 55 }); 56 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 57 applied: true, 58 r#ref: Some(r.clone()), 59 }); 60 return ( 61 StatusCode::OK, 62 Json(SubjectStatus { 63 subject: json!({ 64 "$type": "com.atproto.admin.defs#repoRef", 65 "did": row.did 66 }), 67 takedown, 68 deactivated, 69 }), 70 ) 71 .into_response(); 72 } 73 Ok(None) => { 74 return ApiError::SubjectNotFound.into_response(); 75 } 76 Err(e) => { 77 error!("DB error in get_subject_status: {:?}", e); 78 return ApiError::InternalError(None).into_response(); 79 } 80 } 81 } 82 if let Some(uri) = &params.uri { 83 let record = sqlx::query!( 84 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 85 uri 86 ) 87 .fetch_optional(&state.db) 88 .await; 89 match record { 90 Ok(Some(row)) => { 91 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 92 applied: true, 93 r#ref: Some(r.clone()), 94 }); 95 return ( 96 StatusCode::OK, 97 Json(SubjectStatus { 98 subject: json!({ 99 "$type": "com.atproto.repo.strongRef", 100 "uri": uri, 101 "cid": uri 102 }), 103 takedown, 104 deactivated: None, 105 }), 106 ) 107 .into_response(); 108 } 109 Ok(None) => { 110 return ApiError::RecordNotFound.into_response(); 111 } 112 Err(e) => { 113 error!("DB error in get_subject_status: {:?}", e); 114 return ApiError::InternalError(None).into_response(); 115 } 116 } 117 } 118 if let Some(blob_cid) = &params.blob { 119 let did = match &params.did { 120 Some(d) => d, 121 None => { 122 return ApiError::InvalidRequest( 123 "Must provide a did to request blob state".into(), 124 ) 125 .into_response(); 126 } 127 }; 128 let blob = sqlx::query!( 129 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 130 blob_cid 131 ) 132 .fetch_optional(&state.db) 133 .await; 134 match blob { 135 Ok(Some(row)) => { 136 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 137 applied: true, 138 r#ref: Some(r.clone()), 139 }); 140 return ( 141 StatusCode::OK, 142 Json(SubjectStatus { 143 subject: json!({ 144 "$type": "com.atproto.admin.defs#repoBlobRef", 145 "did": did, 146 "cid": row.cid 147 }), 148 takedown, 149 deactivated: None, 150 }), 151 ) 152 .into_response(); 153 } 154 Ok(None) => { 155 return ApiError::BlobNotFound(None).into_response(); 156 } 157 Err(e) => { 158 error!("DB error in get_subject_status: {:?}", e); 159 return ApiError::InternalError(None).into_response(); 160 } 161 } 162 } 163 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 164} 165 166#[derive(Deserialize)] 167#[serde(rename_all = "camelCase")] 168pub struct UpdateSubjectStatusInput { 169 pub subject: serde_json::Value, 170 pub takedown: Option<StatusAttrInput>, 171 pub deactivated: Option<StatusAttrInput>, 172} 173 174#[derive(Deserialize)] 175pub struct StatusAttrInput { 176 pub applied: bool, 177 pub r#ref: Option<String>, 178} 179 180pub async fn update_subject_status( 181 State(state): State<AppState>, 182 _auth: BearerAuthAdmin, 183 Json(input): Json<UpdateSubjectStatusInput>, 184) -> Response { 185 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 186 match subject_type { 187 Some("com.atproto.admin.defs#repoRef") => { 188 let did = input.subject.get("did").and_then(|d| d.as_str()); 189 if let Some(did) = did { 190 let mut tx = match state.db.begin().await { 191 Ok(tx) => tx, 192 Err(e) => { 193 error!("Failed to begin transaction: {:?}", e); 194 return ApiError::InternalError(None).into_response(); 195 } 196 }; 197 if let Some(takedown) = &input.takedown { 198 let takedown_ref = if takedown.applied { 199 takedown.r#ref.clone() 200 } else { 201 None 202 }; 203 if let Err(e) = sqlx::query!( 204 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 205 takedown_ref, 206 did 207 ) 208 .execute(&mut *tx) 209 .await 210 { 211 error!("Failed to update user takedown status for {}: {:?}", did, e); 212 return ApiError::InternalError(Some( 213 "Failed to update takedown status".into(), 214 )) 215 .into_response(); 216 } 217 } 218 if let Some(deactivated) = &input.deactivated { 219 let result = if deactivated.applied { 220 sqlx::query!( 221 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 222 did 223 ) 224 .execute(&mut *tx) 225 .await 226 } else { 227 sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did) 228 .execute(&mut *tx) 229 .await 230 }; 231 if let Err(e) = result { 232 error!( 233 "Failed to update user deactivation status for {}: {:?}", 234 did, e 235 ); 236 return ApiError::InternalError(Some( 237 "Failed to update deactivation status".into(), 238 )) 239 .into_response(); 240 } 241 } 242 if let Err(e) = tx.commit().await { 243 error!("Failed to commit transaction: {:?}", e); 244 return ApiError::InternalError(None).into_response(); 245 } 246 if let Some(takedown) = &input.takedown { 247 let status = if takedown.applied { 248 Some("takendown") 249 } else { 250 None 251 }; 252 if let Err(e) = crate::api::repo::record::sequence_account_event( 253 &state, 254 did, 255 !takedown.applied, 256 status, 257 ) 258 .await 259 { 260 warn!("Failed to sequence account event for takedown: {}", e); 261 } 262 } 263 if let Some(deactivated) = &input.deactivated { 264 let status = if deactivated.applied { 265 Some("deactivated") 266 } else { 267 None 268 }; 269 if let Err(e) = crate::api::repo::record::sequence_account_event( 270 &state, 271 did, 272 !deactivated.applied, 273 status, 274 ) 275 .await 276 { 277 warn!("Failed to sequence account event for deactivation: {}", e); 278 } 279 } 280 if let Ok(Some(handle)) = 281 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 282 .fetch_optional(&state.db) 283 .await 284 { 285 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 286 } 287 return ( 288 StatusCode::OK, 289 Json(json!({ 290 "subject": input.subject, 291 "takedown": input.takedown.as_ref().map(|t| json!({ 292 "applied": t.applied, 293 "ref": t.r#ref 294 })), 295 "deactivated": input.deactivated.as_ref().map(|d| json!({ 296 "applied": d.applied 297 })) 298 })), 299 ) 300 .into_response(); 301 } 302 } 303 Some("com.atproto.repo.strongRef") => { 304 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 305 if let Some(uri) = uri { 306 if let Some(takedown) = &input.takedown { 307 let takedown_ref = if takedown.applied { 308 takedown.r#ref.clone() 309 } else { 310 None 311 }; 312 if let Err(e) = sqlx::query!( 313 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 314 takedown_ref, 315 uri 316 ) 317 .execute(&state.db) 318 .await 319 { 320 error!( 321 "Failed to update record takedown status for {}: {:?}", 322 uri, e 323 ); 324 return ApiError::InternalError(Some( 325 "Failed to update takedown status".into(), 326 )) 327 .into_response(); 328 } 329 } 330 return ( 331 StatusCode::OK, 332 Json(json!({ 333 "subject": input.subject, 334 "takedown": input.takedown.as_ref().map(|t| json!({ 335 "applied": t.applied, 336 "ref": t.r#ref 337 })) 338 })), 339 ) 340 .into_response(); 341 } 342 } 343 Some("com.atproto.admin.defs#repoBlobRef") => { 344 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 345 if let Some(cid) = cid { 346 if let Some(takedown) = &input.takedown { 347 let takedown_ref = if takedown.applied { 348 takedown.r#ref.clone() 349 } else { 350 None 351 }; 352 if let Err(e) = sqlx::query!( 353 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 354 takedown_ref, 355 cid 356 ) 357 .execute(&state.db) 358 .await 359 { 360 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 361 return ApiError::InternalError(Some( 362 "Failed to update takedown status".into(), 363 )) 364 .into_response(); 365 } 366 } 367 return ( 368 StatusCode::OK, 369 Json(json!({ 370 "subject": input.subject, 371 "takedown": input.takedown.as_ref().map(|t| json!({ 372 "applied": t.applied, 373 "ref": t.r#ref 374 })) 375 })), 376 ) 377 .into_response(); 378 } 379 } 380 _ => {} 381 } 382 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 383}