this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation};
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{Value, json};
14use tracing::{error, info, warn};
15
16#[derive(Debug, Deserialize)]
17pub struct SubmitPlcOperationInput {
18 pub operation: Value,
19}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 let bearer = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => return ApiError::AuthenticationRequired.into_response(),
31 };
32 let auth_user =
33 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
34 Ok(user) => user,
35 Err(e) => return ApiError::from(e).into_response(),
36 };
37 if let Err(e) = crate::auth::scope_check::check_identity_scope(
38 auth_user.is_oauth,
39 auth_user.scope.as_deref(),
40 crate::oauth::scopes::IdentityAttr::Wildcard,
41 ) {
42 return e;
43 }
44 let did = &auth_user.did;
45 if let Err(e) = validate_plc_operation(&input.operation) {
46 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
47 }
48 let op = &input.operation;
49 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
50 let public_url = format!("https://{}", hostname);
51 let user = match sqlx::query!(
52 "SELECT id, handle, deactivated_at FROM users WHERE did = $1",
53 did
54 )
55 .fetch_optional(&state.db)
56 .await
57 {
58 Ok(Some(row)) => row,
59 _ => {
60 return (
61 StatusCode::NOT_FOUND,
62 Json(json!({"error": "AccountNotFound"})),
63 )
64 .into_response();
65 }
66 };
67 let is_migration = user.deactivated_at.is_some();
68 let key_row = match sqlx::query!(
69 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
70 user.id
71 )
72 .fetch_optional(&state.db)
73 .await
74 {
75 Ok(Some(row)) => row,
76 _ => {
77 return (
78 StatusCode::INTERNAL_SERVER_ERROR,
79 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
80 )
81 .into_response();
82 }
83 };
84 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
85 {
86 Ok(k) => k,
87 Err(e) => {
88 error!("Failed to decrypt user key: {}", e);
89 return (
90 StatusCode::INTERNAL_SERVER_ERROR,
91 Json(json!({"error": "InternalError"})),
92 )
93 .into_response();
94 }
95 };
96 let signing_key = match SigningKey::from_slice(&key_bytes) {
97 Ok(k) => k,
98 Err(e) => {
99 error!("Failed to create signing key: {:?}", e);
100 return (
101 StatusCode::INTERNAL_SERVER_ERROR,
102 Json(json!({"error": "InternalError"})),
103 )
104 .into_response();
105 }
106 };
107 let user_did_key = signing_key_to_did_key(&signing_key);
108 if !is_migration && let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array())
109 {
110 let server_rotation_key =
111 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
112 let has_server_key = rotation_keys
113 .iter()
114 .any(|k| k.as_str() == Some(&server_rotation_key));
115 if !has_server_key {
116 return (
117 StatusCode::BAD_REQUEST,
118 Json(json!({
119 "error": "InvalidRequest",
120 "message": "Rotation keys do not include server's rotation key"
121 })),
122 )
123 .into_response();
124 }
125 }
126 if let Some(services) = op.get("services").and_then(|v| v.as_object())
127 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object())
128 {
129 let service_type = pds.get("type").and_then(|v| v.as_str());
130 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
131 if service_type != Some("AtprotoPersonalDataServer") {
132 return (
133 StatusCode::BAD_REQUEST,
134 Json(json!({
135 "error": "InvalidRequest",
136 "message": "Incorrect type on atproto_pds service"
137 })),
138 )
139 .into_response();
140 }
141 if endpoint != Some(&public_url) {
142 return (
143 StatusCode::BAD_REQUEST,
144 Json(json!({
145 "error": "InvalidRequest",
146 "message": "Incorrect endpoint on atproto_pds service"
147 })),
148 )
149 .into_response();
150 }
151 }
152 if !is_migration {
153 if let Some(verification_methods) =
154 op.get("verificationMethods").and_then(|v| v.as_object())
155 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
156 && atproto_key != user_did_key
157 {
158 return (
159 StatusCode::BAD_REQUEST,
160 Json(json!({
161 "error": "InvalidRequest",
162 "message": "Incorrect signing key in verificationMethods"
163 })),
164 )
165 .into_response();
166 }
167 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
168 let expected_handle = format!("at://{}", user.handle);
169 let first_aka = also_known_as.first().and_then(|v| v.as_str());
170 if first_aka != Some(&expected_handle) {
171 return (
172 StatusCode::BAD_REQUEST,
173 Json(json!({
174 "error": "InvalidRequest",
175 "message": "Incorrect handle in alsoKnownAs"
176 })),
177 )
178 .into_response();
179 }
180 }
181 }
182 let plc_client = PlcClient::new(None);
183 let operation_clone = input.operation.clone();
184 let did_clone = did.clone();
185 let result: Result<(), CircuitBreakerError<PlcError>> =
186 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
187 plc_client
188 .send_operation(&did_clone, &operation_clone)
189 .await
190 })
191 .await;
192 match result {
193 Ok(()) => {}
194 Err(CircuitBreakerError::CircuitOpen(e)) => {
195 warn!("PLC directory circuit breaker open: {}", e);
196 return (
197 StatusCode::SERVICE_UNAVAILABLE,
198 Json(json!({
199 "error": "ServiceUnavailable",
200 "message": "PLC directory service temporarily unavailable"
201 })),
202 )
203 .into_response();
204 }
205 Err(CircuitBreakerError::OperationFailed(e)) => {
206 error!("Failed to submit PLC operation: {:?}", e);
207 return (
208 StatusCode::BAD_GATEWAY,
209 Json(json!({
210 "error": "UpstreamError",
211 "message": format!("Failed to submit to PLC directory: {}", e)
212 })),
213 )
214 .into_response();
215 }
216 }
217 match sqlx::query!(
218 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
219 did
220 )
221 .fetch_one(&state.db)
222 .await
223 {
224 Ok(row) => {
225 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
226 .execute(&state.db)
227 .await
228 {
229 warn!("Failed to notify identity event: {:?}", e);
230 }
231 }
232 Err(e) => {
233 warn!("Failed to sequence identity event: {:?}", e);
234 }
235 }
236 info!("Submitted PLC operation for user {}", did);
237 (StatusCode::OK, Json(json!({}))).into_response()
238}