this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation};
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{Value, json};
14use tracing::{error, info, warn};
15
16#[derive(Debug, Deserialize)]
17pub struct SubmitPlcOperationInput {
18 pub operation: Value,
19}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 info!("[MIGRATION] submitPlcOperation called");
27 let bearer = match crate::auth::extract_bearer_token_from_header(
28 headers.get("Authorization").and_then(|h| h.to_str().ok()),
29 ) {
30 Some(t) => t,
31 None => {
32 info!("[MIGRATION] submitPlcOperation: No bearer token");
33 return ApiError::AuthenticationRequired.into_response();
34 }
35 };
36 let auth_user =
37 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
38 Ok(user) => user,
39 Err(e) => {
40 info!("[MIGRATION] submitPlcOperation: Auth failed: {:?}", e);
41 return ApiError::from(e).into_response();
42 }
43 };
44 info!(
45 "[MIGRATION] submitPlcOperation: Authenticated user did={}",
46 auth_user.did
47 );
48 if let Err(e) = crate::auth::scope_check::check_identity_scope(
49 auth_user.is_oauth,
50 auth_user.scope.as_deref(),
51 crate::oauth::scopes::IdentityAttr::Wildcard,
52 ) {
53 info!("[MIGRATION] submitPlcOperation: Scope check failed");
54 return e;
55 }
56 let did = &auth_user.did;
57 if did.starts_with("did:web:") {
58 return ApiError::InvalidRequest(
59 "PLC operations are only valid for did:plc identities".into(),
60 )
61 .into_response();
62 }
63 if let Err(e) = validate_plc_operation(&input.operation) {
64 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
65 }
66 let op = &input.operation;
67 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
68 let public_url = format!("https://{}", hostname);
69 let user = match sqlx::query!(
70 "SELECT id, handle, deactivated_at FROM users WHERE did = $1",
71 did
72 )
73 .fetch_optional(&state.db)
74 .await
75 {
76 Ok(Some(row)) => row,
77 _ => {
78 return (
79 StatusCode::NOT_FOUND,
80 Json(json!({"error": "AccountNotFound"})),
81 )
82 .into_response();
83 }
84 };
85 let is_migration = user.deactivated_at.is_some();
86 let key_row = match sqlx::query!(
87 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
88 user.id
89 )
90 .fetch_optional(&state.db)
91 .await
92 {
93 Ok(Some(row)) => row,
94 _ => {
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
98 )
99 .into_response();
100 }
101 };
102 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
103 {
104 Ok(k) => k,
105 Err(e) => {
106 error!("Failed to decrypt user key: {}", e);
107 return (
108 StatusCode::INTERNAL_SERVER_ERROR,
109 Json(json!({"error": "InternalError"})),
110 )
111 .into_response();
112 }
113 };
114 let signing_key = match SigningKey::from_slice(&key_bytes) {
115 Ok(k) => k,
116 Err(e) => {
117 error!("Failed to create signing key: {:?}", e);
118 return (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 Json(json!({"error": "InternalError"})),
121 )
122 .into_response();
123 }
124 };
125 let user_did_key = signing_key_to_did_key(&signing_key);
126 if !is_migration && let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array())
127 {
128 let server_rotation_key =
129 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
130 let has_server_key = rotation_keys
131 .iter()
132 .any(|k| k.as_str() == Some(&server_rotation_key));
133 if !has_server_key {
134 return (
135 StatusCode::BAD_REQUEST,
136 Json(json!({
137 "error": "InvalidRequest",
138 "message": "Rotation keys do not include server's rotation key"
139 })),
140 )
141 .into_response();
142 }
143 }
144 if let Some(services) = op.get("services").and_then(|v| v.as_object())
145 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object())
146 {
147 let service_type = pds.get("type").and_then(|v| v.as_str());
148 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
149 if service_type != Some("AtprotoPersonalDataServer") {
150 return (
151 StatusCode::BAD_REQUEST,
152 Json(json!({
153 "error": "InvalidRequest",
154 "message": "Incorrect type on atproto_pds service"
155 })),
156 )
157 .into_response();
158 }
159 if endpoint != Some(&public_url) {
160 return (
161 StatusCode::BAD_REQUEST,
162 Json(json!({
163 "error": "InvalidRequest",
164 "message": "Incorrect endpoint on atproto_pds service"
165 })),
166 )
167 .into_response();
168 }
169 }
170 if !is_migration {
171 if let Some(verification_methods) =
172 op.get("verificationMethods").and_then(|v| v.as_object())
173 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
174 && atproto_key != user_did_key
175 {
176 return (
177 StatusCode::BAD_REQUEST,
178 Json(json!({
179 "error": "InvalidRequest",
180 "message": "Incorrect signing key in verificationMethods"
181 })),
182 )
183 .into_response();
184 }
185 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
186 let expected_handle = format!("at://{}", user.handle);
187 let first_aka = also_known_as.first().and_then(|v| v.as_str());
188 if first_aka != Some(&expected_handle) {
189 return (
190 StatusCode::BAD_REQUEST,
191 Json(json!({
192 "error": "InvalidRequest",
193 "message": "Incorrect handle in alsoKnownAs"
194 })),
195 )
196 .into_response();
197 }
198 }
199 }
200 let plc_client = PlcClient::new(None);
201 let operation_clone = input.operation.clone();
202 let did_clone = did.clone();
203 info!(
204 "[MIGRATION] submitPlcOperation: Sending operation to PLC directory for did={}",
205 did
206 );
207 let plc_start = std::time::Instant::now();
208 let result: Result<(), CircuitBreakerError<PlcError>> =
209 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
210 plc_client
211 .send_operation(&did_clone, &operation_clone)
212 .await
213 })
214 .await;
215 match result {
216 Ok(()) => {
217 info!(
218 "[MIGRATION] submitPlcOperation: PLC directory accepted operation in {:?}",
219 plc_start.elapsed()
220 );
221 }
222 Err(CircuitBreakerError::CircuitOpen(e)) => {
223 warn!(
224 "[MIGRATION] submitPlcOperation: PLC directory circuit breaker open: {}",
225 e
226 );
227 return (
228 StatusCode::SERVICE_UNAVAILABLE,
229 Json(json!({
230 "error": "ServiceUnavailable",
231 "message": "PLC directory service temporarily unavailable"
232 })),
233 )
234 .into_response();
235 }
236 Err(CircuitBreakerError::OperationFailed(e)) => {
237 error!(
238 "[MIGRATION] submitPlcOperation: PLC operation failed: {:?}",
239 e
240 );
241 return (
242 StatusCode::BAD_GATEWAY,
243 Json(json!({
244 "error": "UpstreamError",
245 "message": format!("Failed to submit to PLC directory: {}", e)
246 })),
247 )
248 .into_response();
249 }
250 }
251 info!(
252 "[MIGRATION] submitPlcOperation: Sequencing identity event for did={}",
253 did
254 );
255 match sqlx::query!(
256 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
257 did
258 )
259 .fetch_one(&state.db)
260 .await
261 {
262 Ok(row) => {
263 info!(
264 "[MIGRATION] submitPlcOperation: Identity event sequenced with seq={}",
265 row.seq
266 );
267 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
268 .execute(&state.db)
269 .await
270 {
271 warn!(
272 "[MIGRATION] submitPlcOperation: Failed to notify identity event: {:?}",
273 e
274 );
275 }
276 }
277 Err(e) => {
278 warn!(
279 "[MIGRATION] submitPlcOperation: Failed to sequence identity event: {:?}",
280 e
281 );
282 }
283 }
284 info!("[MIGRATION] submitPlcOperation: SUCCESS for did={}", did);
285 (StatusCode::OK, Json(json!({}))).into_response()
286}