this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError};
3use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient, PlcError};
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9 Json,
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{json, Value};
14use tracing::{error, info, warn};
15
16#[derive(Debug, Deserialize)]
17pub struct SubmitPlcOperationInput {
18 pub operation: Value,
19}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 let bearer = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => return ApiError::AuthenticationRequired.into_response(),
31 };
32
33 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
34 Ok(user) => user,
35 Err(e) => return ApiError::from(e).into_response(),
36 };
37
38 let did = &auth_user.did;
39
40 if let Err(e) = validate_plc_operation(&input.operation) {
41 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
42 }
43
44 let op = &input.operation;
45 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
46 let public_url = format!("https://{}", hostname);
47
48 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
49 .fetch_optional(&state.db)
50 .await
51 {
52 Ok(Some(row)) => row,
53 _ => {
54 return (
55 StatusCode::NOT_FOUND,
56 Json(json!({"error": "AccountNotFound"})),
57 )
58 .into_response();
59 }
60 };
61
62 let key_row = match sqlx::query!(
63 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
64 user.id
65 )
66 .fetch_optional(&state.db)
67 .await
68 {
69 Ok(Some(row)) => row,
70 _ => {
71 return (
72 StatusCode::INTERNAL_SERVER_ERROR,
73 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
74 )
75 .into_response();
76 }
77 };
78
79 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
80 {
81 Ok(k) => k,
82 Err(e) => {
83 error!("Failed to decrypt user key: {}", e);
84 return (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError"})),
87 )
88 .into_response();
89 }
90 };
91
92 let signing_key = match SigningKey::from_slice(&key_bytes) {
93 Ok(k) => k,
94 Err(e) => {
95 error!("Failed to create signing key: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 };
103
104 let user_did_key = signing_key_to_did_key(&signing_key);
105
106 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
107 let server_rotation_key =
108 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
109
110 let has_server_key = rotation_keys
111 .iter()
112 .any(|k| k.as_str() == Some(&server_rotation_key));
113
114 if !has_server_key {
115 return (
116 StatusCode::BAD_REQUEST,
117 Json(json!({
118 "error": "InvalidRequest",
119 "message": "Rotation keys do not include server's rotation key"
120 })),
121 )
122 .into_response();
123 }
124 }
125
126 if let Some(services) = op.get("services").and_then(|v| v.as_object()) {
127 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) {
128 let service_type = pds.get("type").and_then(|v| v.as_str());
129 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
130
131 if service_type != Some("AtprotoPersonalDataServer") {
132 return (
133 StatusCode::BAD_REQUEST,
134 Json(json!({
135 "error": "InvalidRequest",
136 "message": "Incorrect type on atproto_pds service"
137 })),
138 )
139 .into_response();
140 }
141
142 if endpoint != Some(&public_url) {
143 return (
144 StatusCode::BAD_REQUEST,
145 Json(json!({
146 "error": "InvalidRequest",
147 "message": "Incorrect endpoint on atproto_pds service"
148 })),
149 )
150 .into_response();
151 }
152 }
153 }
154
155 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) {
156 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) {
157 if atproto_key != user_did_key {
158 return (
159 StatusCode::BAD_REQUEST,
160 Json(json!({
161 "error": "InvalidRequest",
162 "message": "Incorrect signing key in verificationMethods"
163 })),
164 )
165 .into_response();
166 }
167 }
168 }
169
170 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
171 let expected_handle = format!("at://{}", user.handle);
172 let first_aka = also_known_as.first().and_then(|v| v.as_str());
173
174 if first_aka != Some(&expected_handle) {
175 return (
176 StatusCode::BAD_REQUEST,
177 Json(json!({
178 "error": "InvalidRequest",
179 "message": "Incorrect handle in alsoKnownAs"
180 })),
181 )
182 .into_response();
183 }
184 }
185
186 let plc_client = PlcClient::new(None);
187 let operation_clone = input.operation.clone();
188 let did_clone = did.clone();
189 let result: Result<(), CircuitBreakerError<PlcError>> = with_circuit_breaker(
190 &state.circuit_breakers.plc_directory,
191 || async { plc_client.send_operation(&did_clone, &operation_clone).await },
192 )
193 .await;
194
195 match result {
196 Ok(()) => {}
197 Err(CircuitBreakerError::CircuitOpen(e)) => {
198 warn!("PLC directory circuit breaker open: {}", e);
199 return (
200 StatusCode::SERVICE_UNAVAILABLE,
201 Json(json!({
202 "error": "ServiceUnavailable",
203 "message": "PLC directory service temporarily unavailable"
204 })),
205 )
206 .into_response();
207 }
208 Err(CircuitBreakerError::OperationFailed(e)) => {
209 error!("Failed to submit PLC operation: {:?}", e);
210 return (
211 StatusCode::BAD_GATEWAY,
212 Json(json!({
213 "error": "UpstreamError",
214 "message": format!("Failed to submit to PLC directory: {}", e)
215 })),
216 )
217 .into_response();
218 }
219 }
220
221 match sqlx::query!(
222 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
223 did
224 )
225 .fetch_one(&state.db)
226 .await
227 {
228 Ok(row) => {
229 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
230 .execute(&state.db)
231 .await
232 {
233 warn!("Failed to notify identity event: {:?}", e);
234 }
235 }
236 Err(e) => {
237 warn!("Failed to sequence identity event: {:?}", e);
238 }
239 }
240
241 info!("Submitted PLC operation for user {}", did);
242
243 (StatusCode::OK, Json(json!({}))).into_response()
244}