this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation};
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{Value, json};
14use tracing::{error, info, warn};
15
16#[derive(Debug, Deserialize)]
17pub struct SubmitPlcOperationInput {
18 pub operation: Value,
19}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 let bearer = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => {
31 return ApiError::AuthenticationRequired.into_response();
32 }
33 };
34 let auth_user =
35 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
36 Ok(user) => user,
37 Err(e) => {
38 return ApiError::from(e).into_response();
39 }
40 };
41 if let Err(e) = crate::auth::scope_check::check_identity_scope(
42 auth_user.is_oauth,
43 auth_user.scope.as_deref(),
44 crate::oauth::scopes::IdentityAttr::Wildcard,
45 ) {
46 return e;
47 }
48 let did = &auth_user.did;
49 if did.starts_with("did:web:") {
50 return ApiError::InvalidRequest(
51 "PLC operations are only valid for did:plc identities".into(),
52 )
53 .into_response();
54 }
55 if let Err(e) = validate_plc_operation(&input.operation) {
56 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
57 }
58 let op = &input.operation;
59 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
60 let public_url = format!("https://{}", hostname);
61 let user = match sqlx::query!(
62 "SELECT id, handle FROM users WHERE did = $1",
63 did
64 )
65 .fetch_optional(&state.db)
66 .await
67 {
68 Ok(Some(row)) => row,
69 _ => {
70 return (
71 StatusCode::NOT_FOUND,
72 Json(json!({"error": "AccountNotFound"})),
73 )
74 .into_response();
75 }
76 };
77 let key_row = match sqlx::query!(
78 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
79 user.id
80 )
81 .fetch_optional(&state.db)
82 .await
83 {
84 Ok(Some(row)) => row,
85 _ => {
86 return (
87 StatusCode::INTERNAL_SERVER_ERROR,
88 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
89 )
90 .into_response();
91 }
92 };
93 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
94 {
95 Ok(k) => k,
96 Err(e) => {
97 error!("Failed to decrypt user key: {}", e);
98 return (
99 StatusCode::INTERNAL_SERVER_ERROR,
100 Json(json!({"error": "InternalError"})),
101 )
102 .into_response();
103 }
104 };
105 let signing_key = match SigningKey::from_slice(&key_bytes) {
106 Ok(k) => k,
107 Err(e) => {
108 error!("Failed to create signing key: {:?}", e);
109 return (
110 StatusCode::INTERNAL_SERVER_ERROR,
111 Json(json!({"error": "InternalError"})),
112 )
113 .into_response();
114 }
115 };
116 let user_did_key = signing_key_to_did_key(&signing_key);
117 let server_rotation_key =
118 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
119 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
120 let has_server_key = rotation_keys
121 .iter()
122 .any(|k| k.as_str() == Some(&server_rotation_key));
123 if !has_server_key {
124 return (
125 StatusCode::BAD_REQUEST,
126 Json(json!({
127 "error": "InvalidRequest",
128 "message": "Rotation keys do not include server's rotation key"
129 })),
130 )
131 .into_response();
132 }
133 }
134 if let Some(services) = op.get("services").and_then(|v| v.as_object())
135 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object())
136 {
137 let service_type = pds.get("type").and_then(|v| v.as_str());
138 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
139 if service_type != Some("AtprotoPersonalDataServer") {
140 return (
141 StatusCode::BAD_REQUEST,
142 Json(json!({
143 "error": "InvalidRequest",
144 "message": "Incorrect type on atproto_pds service"
145 })),
146 )
147 .into_response();
148 }
149 if endpoint != Some(&public_url) {
150 return (
151 StatusCode::BAD_REQUEST,
152 Json(json!({
153 "error": "InvalidRequest",
154 "message": "Incorrect endpoint on atproto_pds service"
155 })),
156 )
157 .into_response();
158 }
159 }
160 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object())
161 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
162 && atproto_key != user_did_key
163 {
164 return (
165 StatusCode::BAD_REQUEST,
166 Json(json!({
167 "error": "InvalidRequest",
168 "message": "Incorrect signing key in verificationMethods"
169 })),
170 )
171 .into_response();
172 }
173 if !user.handle.is_empty() {
174 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
175 let expected_handle = format!("at://{}", user.handle);
176 let first_aka = also_known_as.first().and_then(|v| v.as_str());
177 if first_aka != Some(&expected_handle) {
178 return (
179 StatusCode::BAD_REQUEST,
180 Json(json!({
181 "error": "InvalidRequest",
182 "message": "Incorrect handle in alsoKnownAs"
183 })),
184 )
185 .into_response();
186 }
187 }
188 }
189 let plc_client = PlcClient::new(None);
190 let operation_clone = input.operation.clone();
191 let did_clone = did.clone();
192 let result: Result<(), CircuitBreakerError<PlcError>> =
193 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
194 plc_client
195 .send_operation(&did_clone, &operation_clone)
196 .await
197 })
198 .await;
199 match result {
200 Ok(()) => {}
201 Err(CircuitBreakerError::CircuitOpen(e)) => {
202 warn!("PLC directory circuit breaker open: {}", e);
203 return (
204 StatusCode::SERVICE_UNAVAILABLE,
205 Json(json!({
206 "error": "ServiceUnavailable",
207 "message": "PLC directory service temporarily unavailable"
208 })),
209 )
210 .into_response();
211 }
212 Err(CircuitBreakerError::OperationFailed(e)) => {
213 error!("PLC operation failed: {:?}", e);
214 return (
215 StatusCode::BAD_GATEWAY,
216 Json(json!({
217 "error": "UpstreamError",
218 "message": format!("Failed to submit to PLC directory: {}", e)
219 })),
220 )
221 .into_response();
222 }
223 }
224 match sqlx::query!(
225 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
226 did
227 )
228 .fetch_one(&state.db)
229 .await
230 {
231 Ok(row) => {
232 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
233 .execute(&state.db)
234 .await
235 {
236 warn!("Failed to notify identity event: {:?}", e);
237 }
238 }
239 Err(e) => {
240 warn!("Failed to sequence identity event: {:?}", e);
241 }
242 }
243 let _ = state.cache.delete(&format!("handle:{}", user.handle)).await;
244 if state.did_resolver.refresh_did(did).await.is_none() {
245 warn!(did = %did, "Failed to refresh DID cache after PLC update");
246 }
247 info!(did = %did, "PLC operation submitted successfully");
248 (StatusCode::OK, Json(json!({}))).into_response()
249}