this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use serde::{Deserialize, Serialize}; 9use serde_json::json; 10use tracing::error; 11 12#[derive(Deserialize)] 13pub struct GetSubjectStatusParams { 14 pub did: Option<String>, 15 pub uri: Option<String>, 16 pub blob: Option<String>, 17} 18 19#[derive(Serialize)] 20pub struct SubjectStatus { 21 pub subject: serde_json::Value, 22 pub takedown: Option<StatusAttr>, 23 pub deactivated: Option<StatusAttr>, 24} 25 26#[derive(Serialize)] 27#[serde(rename_all = "camelCase")] 28pub struct StatusAttr { 29 pub applied: bool, 30 pub r#ref: Option<String>, 31} 32 33pub async fn get_subject_status( 34 State(state): State<AppState>, 35 headers: axum::http::HeaderMap, 36 Query(params): Query<GetSubjectStatusParams>, 37) -> Response { 38 let auth_header = headers.get("Authorization"); 39 if auth_header.is_none() { 40 return ( 41 StatusCode::UNAUTHORIZED, 42 Json(json!({"error": "AuthenticationRequired"})), 43 ) 44 .into_response(); 45 } 46 47 if params.did.is_none() && params.uri.is_none() && params.blob.is_none() { 48 return ( 49 StatusCode::BAD_REQUEST, 50 Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})), 51 ) 52 .into_response(); 53 } 54 55 if let Some(did) = &params.did { 56 let user = sqlx::query!( 57 "SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1", 58 did 59 ) 60 .fetch_optional(&state.db) 61 .await; 62 63 match user { 64 Ok(Some(row)) => { 65 let deactivated = row.deactivated_at.map(|_| StatusAttr { 66 applied: true, 67 r#ref: None, 68 }); 69 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 70 applied: true, 71 r#ref: Some(r.clone()), 72 }); 73 74 return ( 75 StatusCode::OK, 76 Json(SubjectStatus { 77 subject: json!({ 78 "$type": "com.atproto.admin.defs#repoRef", 79 "did": row.did 80 }), 81 takedown, 82 deactivated, 83 }), 84 ) 85 .into_response(); 86 } 87 Ok(None) => { 88 return ( 89 StatusCode::NOT_FOUND, 90 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 91 ) 92 .into_response(); 93 } 94 Err(e) => { 95 error!("DB error in get_subject_status: {:?}", e); 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError"})), 99 ) 100 .into_response(); 101 } 102 } 103 } 104 105 if let Some(uri) = &params.uri { 106 let record = sqlx::query!( 107 "SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1", 108 uri 109 ) 110 .fetch_optional(&state.db) 111 .await; 112 113 match record { 114 Ok(Some(row)) => { 115 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 116 applied: true, 117 r#ref: Some(r.clone()), 118 }); 119 120 return ( 121 StatusCode::OK, 122 Json(SubjectStatus { 123 subject: json!({ 124 "$type": "com.atproto.repo.strongRef", 125 "uri": uri, 126 "cid": uri 127 }), 128 takedown, 129 deactivated: None, 130 }), 131 ) 132 .into_response(); 133 } 134 Ok(None) => { 135 return ( 136 StatusCode::NOT_FOUND, 137 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 138 ) 139 .into_response(); 140 } 141 Err(e) => { 142 error!("DB error in get_subject_status: {:?}", e); 143 return ( 144 StatusCode::INTERNAL_SERVER_ERROR, 145 Json(json!({"error": "InternalError"})), 146 ) 147 .into_response(); 148 } 149 } 150 } 151 152 if let Some(blob_cid) = &params.blob { 153 let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid) 154 .fetch_optional(&state.db) 155 .await; 156 157 match blob { 158 Ok(Some(row)) => { 159 let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr { 160 applied: true, 161 r#ref: Some(r.clone()), 162 }); 163 164 return ( 165 StatusCode::OK, 166 Json(SubjectStatus { 167 subject: json!({ 168 "$type": "com.atproto.admin.defs#repoBlobRef", 169 "did": "", 170 "cid": row.cid 171 }), 172 takedown, 173 deactivated: None, 174 }), 175 ) 176 .into_response(); 177 } 178 Ok(None) => { 179 return ( 180 StatusCode::NOT_FOUND, 181 Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})), 182 ) 183 .into_response(); 184 } 185 Err(e) => { 186 error!("DB error in get_subject_status: {:?}", e); 187 return ( 188 StatusCode::INTERNAL_SERVER_ERROR, 189 Json(json!({"error": "InternalError"})), 190 ) 191 .into_response(); 192 } 193 } 194 } 195 196 ( 197 StatusCode::BAD_REQUEST, 198 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 199 ) 200 .into_response() 201} 202 203#[derive(Deserialize)] 204#[serde(rename_all = "camelCase")] 205pub struct UpdateSubjectStatusInput { 206 pub subject: serde_json::Value, 207 pub takedown: Option<StatusAttrInput>, 208 pub deactivated: Option<StatusAttrInput>, 209} 210 211#[derive(Deserialize)] 212pub struct StatusAttrInput { 213 pub apply: bool, 214 pub r#ref: Option<String>, 215} 216 217pub async fn update_subject_status( 218 State(state): State<AppState>, 219 headers: axum::http::HeaderMap, 220 Json(input): Json<UpdateSubjectStatusInput>, 221) -> Response { 222 let auth_header = headers.get("Authorization"); 223 if auth_header.is_none() { 224 return ( 225 StatusCode::UNAUTHORIZED, 226 Json(json!({"error": "AuthenticationRequired"})), 227 ) 228 .into_response(); 229 } 230 231 let subject_type = input.subject.get("$type").and_then(|t| t.as_str()); 232 233 match subject_type { 234 Some("com.atproto.admin.defs#repoRef") => { 235 let did = input.subject.get("did").and_then(|d| d.as_str()); 236 if let Some(did) = did { 237 if let Some(takedown) = &input.takedown { 238 let takedown_ref = if takedown.apply { 239 takedown.r#ref.clone() 240 } else { 241 None 242 }; 243 let _ = sqlx::query!( 244 "UPDATE users SET takedown_ref = $1 WHERE did = $2", 245 takedown_ref, 246 did 247 ) 248 .execute(&state.db) 249 .await; 250 } 251 252 if let Some(deactivated) = &input.deactivated { 253 if deactivated.apply { 254 let _ = sqlx::query!( 255 "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 256 did 257 ) 258 .execute(&state.db) 259 .await; 260 } else { 261 let _ = sqlx::query!( 262 "UPDATE users SET deactivated_at = NULL WHERE did = $1", 263 did 264 ) 265 .execute(&state.db) 266 .await; 267 } 268 } 269 270 return ( 271 StatusCode::OK, 272 Json(json!({ 273 "subject": input.subject, 274 "takedown": input.takedown.as_ref().map(|t| json!({ 275 "applied": t.apply, 276 "ref": t.r#ref 277 })), 278 "deactivated": input.deactivated.as_ref().map(|d| json!({ 279 "applied": d.apply 280 })) 281 })), 282 ) 283 .into_response(); 284 } 285 } 286 Some("com.atproto.repo.strongRef") => { 287 let uri = input.subject.get("uri").and_then(|u| u.as_str()); 288 if let Some(uri) = uri { 289 if let Some(takedown) = &input.takedown { 290 let takedown_ref = if takedown.apply { 291 takedown.r#ref.clone() 292 } else { 293 None 294 }; 295 let _ = sqlx::query!( 296 "UPDATE records SET takedown_ref = $1 WHERE record_cid = $2", 297 takedown_ref, 298 uri 299 ) 300 .execute(&state.db) 301 .await; 302 } 303 304 return ( 305 StatusCode::OK, 306 Json(json!({ 307 "subject": input.subject, 308 "takedown": input.takedown.as_ref().map(|t| json!({ 309 "applied": t.apply, 310 "ref": t.r#ref 311 })) 312 })), 313 ) 314 .into_response(); 315 } 316 } 317 Some("com.atproto.admin.defs#repoBlobRef") => { 318 let cid = input.subject.get("cid").and_then(|c| c.as_str()); 319 if let Some(cid) = cid { 320 if let Some(takedown) = &input.takedown { 321 let takedown_ref = if takedown.apply { 322 takedown.r#ref.clone() 323 } else { 324 None 325 }; 326 let _ = sqlx::query!( 327 "UPDATE blobs SET takedown_ref = $1 WHERE cid = $2", 328 takedown_ref, 329 cid 330 ) 331 .execute(&state.db) 332 .await; 333 } 334 335 return ( 336 StatusCode::OK, 337 Json(json!({ 338 "subject": input.subject, 339 "takedown": input.takedown.as_ref().map(|t| json!({ 340 "applied": t.apply, 341 "ref": t.r#ref 342 })) 343 })), 344 ) 345 .into_response(); 346 } 347 } 348 _ => {} 349 } 350 351 ( 352 StatusCode::BAD_REQUEST, 353 Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})), 354 ) 355 .into_response() 356}