this repo has no description
at main 14 kB view raw
1use crate::api::error::ApiError; 2use crate::auth::BearerAuthAdmin; 3use crate::state::AppState; 4use crate::types::Did; 5use axum::{ 6 Json, 7 extract::{Query, State}, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use tracing::{error, warn}; 14 15#[derive(Deserialize)] 16pub struct GetSubjectStatusParams { 17 pub did: Option<String>, 18 pub uri: Option<String>, 19 pub blob: Option<String>, 20} 21 22#[derive(Serialize)] 23pub struct SubjectStatus { 24 pub subject: serde_json::Value, 25 pub takedown: Option<StatusAttr>, 26 pub deactivated: Option<StatusAttr>, 27} 28 29#[derive(Serialize)] 30#[serde(rename_all = "camelCase")] 31pub struct StatusAttr { 32 pub applied: bool, 33 pub r#ref: Option<String>, 34} 35 36pub async fn get_subject_status( 37 State(state): State<AppState>, 38 _auth: BearerAuthAdmin, 39 Query(params): Query<GetSubjectStatusParams>, 40) -> Response { 41 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 42 return ApiError::InvalidRequest("Must provide did, uri, or blob".into()).into_response(); 43 } 44 if let Some(did) = &params.did { 45 let user = sqlx::query!( 46 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 47 did 48 ) 49 .fetch_optional(&state.db) 50 .await; 51 match user { 52 Ok(Some(row)) => { 53 let deactivated = row.deactivated_at.map(|_| StatusAttr { 54 applied: true, 55 r#ref: None, 56 }); 57 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 58 applied: true, 59 r#ref: Some(r.clone()), 60 }); 61 return ( 62 StatusCode::OK, 63 Json(SubjectStatus { 64 subject: json!({ 65 "$type": "com.atproto.admin.defs#repoRef", 66 "did": row.did 67 }), 68 takedown, 69 deactivated, 70 }), 71 ) 72 .into_response(); 73 } 74 Ok(None) => { 75 return ApiError::SubjectNotFound.into_response(); 76 } 77 Err(e) => { 78 error!("DB error in get_subject_status: {:?}", e); 79 return ApiError::InternalError(None).into_response(); 80 } 81 } 82 } 83 if let Some(uri) = &params.uri { 84 let record = sqlx::query!( 85 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 86 uri 87 ) 88 .fetch_optional(&state.db) 89 .await; 90 match record { 91 Ok(Some(row)) => { 92 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 93 applied: true, 94 r#ref: Some(r.clone()), 95 }); 96 return ( 97 StatusCode::OK, 98 Json(SubjectStatus { 99 subject: json!({ 100 "$type": "com.atproto.repo.strongRef", 101 "uri": uri, 102 "cid": uri 103 }), 104 takedown, 105 deactivated: None, 106 }), 107 ) 108 .into_response(); 109 } 110 Ok(None) => { 111 return ApiError::RecordNotFound.into_response(); 112 } 113 Err(e) => { 114 error!("DB error in get_subject_status: {:?}", e); 115 return ApiError::InternalError(None).into_response(); 116 } 117 } 118 } 119 if let Some(blob_cid) = &params.blob { 120 let did = match &params.did { 121 Some(d) => d, 122 None => { 123 return ApiError::InvalidRequest("Must provide a did to request blob state".into()) 124 .into_response(); 125 } 126 }; 127 let blob = sqlx::query!( 128 "SELECT cid, takedown_ref FROM blobs WHERE cid = $1", 129 blob_cid 130 ) 131 .fetch_optional(&state.db) 132 .await; 133 match blob { 134 Ok(Some(row)) => { 135 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 136 applied: true, 137 r#ref: Some(r.clone()), 138 }); 139 return ( 140 StatusCode::OK, 141 Json(SubjectStatus { 142 subject: json!({ 143 "$type": "com.atproto.admin.defs#repoBlobRef", 144 "did": did, 145 "cid": row.cid 146 }), 147 takedown, 148 deactivated: None, 149 }), 150 ) 151 .into_response(); 152 } 153 Ok(None) => { 154 return ApiError::BlobNotFound(None).into_response(); 155 } 156 Err(e) => { 157 error!("DB error in get_subject_status: {:?}", e); 158 return ApiError::InternalError(None).into_response(); 159 } 160 } 161 } 162 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 163} 164 165#[derive(Deserialize)] 166#[serde(rename_all = "camelCase")] 167pub struct UpdateSubjectStatusInput { 168 pub subject: serde_json::Value, 169 pub takedown: Option<StatusAttrInput>, 170 pub deactivated: Option<StatusAttrInput>, 171} 172 173#[derive(Deserialize)] 174pub struct StatusAttrInput { 175 pub applied: bool, 176 pub r#ref: Option<String>, 177} 178 179pub async fn update_subject_status( 180 State(state): State<AppState>, 181 _auth: BearerAuthAdmin, 182 Json(input): Json<UpdateSubjectStatusInput>, 183) -> Response { 184 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 185 match subject_type { 186 Some("com.atproto.admin.defs#repoRef") => { 187 let did_str = input.subject.get("did").and_then(|d| d.as_str()); 188 if let Some(did_str) = did_str { 189 let did = Did::new_unchecked(did_str); 190 let mut tx = match state.db.begin().await { 191 Ok(tx) => tx, 192 Err(e) => { 193 error!("Failed to begin transaction: {:?}", e); 194 return ApiError::InternalError(None).into_response(); 195 } 196 }; 197 if let Some(takedown) = &input.takedown { 198 let takedown_ref = if takedown.applied { 199 takedown.r#ref.clone() 200 } else { 201 None 202 }; 203 if let Err(e) = sqlx::query!( 204 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 205 takedown_ref, 206 did.as_str() 207 ) 208 .execute(&mut *tx) 209 .await 210 { 211 error!("Failed to update user takedown status for {}: {:?}", did, e); 212 return ApiError::InternalError(Some( 213 "Failed to update takedown status".into(), 214 )) 215 .into_response(); 216 } 217 } 218 if let Some(deactivated) = &input.deactivated { 219 let result = if deactivated.applied { 220 sqlx::query!( 221 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 222 did.as_str() 223 ) 224 .execute(&mut *tx) 225 .await 226 } else { 227 sqlx::query!( 228 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 229 did.as_str() 230 ) 231 .execute(&mut *tx) 232 .await 233 }; 234 if let Err(e) = result { 235 error!( 236 "Failed to update user deactivation status for {}: {:?}", 237 did, e 238 ); 239 return ApiError::InternalError(Some( 240 "Failed to update deactivation status".into(), 241 )) 242 .into_response(); 243 } 244 } 245 if let Err(e) = tx.commit().await { 246 error!("Failed to commit transaction: {:?}", e); 247 return ApiError::InternalError(None).into_response(); 248 } 249 if let Some(takedown) = &input.takedown { 250 let status = if takedown.applied { 251 Some("takendown") 252 } else { 253 None 254 }; 255 if let Err(e) = crate::api::repo::record::sequence_account_event( 256 &state, 257 &did, 258 !takedown.applied, 259 status, 260 ) 261 .await 262 { 263 warn!("Failed to sequence account event for takedown: {}", e); 264 } 265 } 266 if let Some(deactivated) = &input.deactivated { 267 let status = if deactivated.applied { 268 Some("deactivated") 269 } else { 270 None 271 }; 272 if let Err(e) = crate::api::repo::record::sequence_account_event( 273 &state, 274 &did, 275 !deactivated.applied, 276 status, 277 ) 278 .await 279 { 280 warn!("Failed to sequence account event for deactivation: {}", e); 281 } 282 } 283 if let Ok(Some(handle)) = 284 sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did.as_str()) 285 .fetch_optional(&state.db) 286 .await 287 { 288 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 289 } 290 return ( 291 StatusCode::OK, 292 Json(json!({ 293 "subject": input.subject, 294 "takedown": input.takedown.as_ref().map(|t| json!({ 295 "applied": t.applied, 296 "ref": t.r#ref 297 })), 298 "deactivated": input.deactivated.as_ref().map(|d| json!({ 299 "applied": d.applied 300 })) 301 })), 302 ) 303 .into_response(); 304 } 305 } 306 Some("com.atproto.repo.strongRef") => { 307 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 308 if let Some(uri) = uri { 309 if let Some(takedown) = &input.takedown { 310 let takedown_ref = if takedown.applied { 311 takedown.r#ref.clone() 312 } else { 313 None 314 }; 315 if let Err(e) = sqlx::query!( 316 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 317 takedown_ref, 318 uri 319 ) 320 .execute(&state.db) 321 .await 322 { 323 error!( 324 "Failed to update record takedown status for {}: {:?}", 325 uri, e 326 ); 327 return ApiError::InternalError(Some( 328 "Failed to update takedown status".into(), 329 )) 330 .into_response(); 331 } 332 } 333 return ( 334 StatusCode::OK, 335 Json(json!({ 336 "subject": input.subject, 337 "takedown": input.takedown.as_ref().map(|t| json!({ 338 "applied": t.applied, 339 "ref": t.r#ref 340 })) 341 })), 342 ) 343 .into_response(); 344 } 345 } 346 Some("com.atproto.admin.defs#repoBlobRef") => { 347 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 348 if let Some(cid) = cid { 349 if let Some(takedown) = &input.takedown { 350 let takedown_ref = if takedown.applied { 351 takedown.r#ref.clone() 352 } else { 353 None 354 }; 355 if let Err(e) = sqlx::query!( 356 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 357 takedown_ref, 358 cid 359 ) 360 .execute(&state.db) 361 .await 362 { 363 error!("Failed to update blob takedown status for {}: {:?}", cid, e); 364 return ApiError::InternalError(Some( 365 "Failed to update takedown status".into(), 366 )) 367 .into_response(); 368 } 369 } 370 return ( 371 StatusCode::OK, 372 Json(json!({ 373 "subject": input.subject, 374 "takedown": input.takedown.as_ref().map(|t| json!({ 375 "applied": t.applied, 376 "ref": t.r#ref 377 })) 378 })), 379 ) 380 .into_response(); 381 } 382 } 383 _ => {} 384 } 385 ApiError::InvalidRequest("Invalid subject type".into()).into_response() 386}