this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError};
3use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient, PlcError};
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9 Json,
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{json, Value};
14use tracing::{error, info, warn};
15
/// JSON request body accepted by `submit_plc_operation`.
#[derive(Debug, Deserialize)]
pub struct SubmitPlcOperationInput {
    /// The PLC operation as free-form JSON. The handler passes it to
    /// `validate_plc_operation` and then inspects individual fields
    /// (`rotationKeys`, `services`, `verificationMethods`, `alsoKnownAs`)
    /// before forwarding it to the PLC client.
    pub operation: Value,
}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 let bearer = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => return ApiError::AuthenticationRequired.into_response(),
31 };
32 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
33 Ok(user) => user,
34 Err(e) => return ApiError::from(e).into_response(),
35 };
36 let did = &auth_user.did;
37 if let Err(e) = validate_plc_operation(&input.operation) {
38 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
39 }
40 let op = &input.operation;
41 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
42 let public_url = format!("https://{}", hostname);
43 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
44 .fetch_optional(&state.db)
45 .await
46 {
47 Ok(Some(row)) => row,
48 _ => {
49 return (
50 StatusCode::NOT_FOUND,
51 Json(json!({"error": "AccountNotFound"})),
52 )
53 .into_response();
54 }
55 };
56 let key_row = match sqlx::query!(
57 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
58 user.id
59 )
60 .fetch_optional(&state.db)
61 .await
62 {
63 Ok(Some(row)) => row,
64 _ => {
65 return (
66 StatusCode::INTERNAL_SERVER_ERROR,
67 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
68 )
69 .into_response();
70 }
71 };
72 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
73 {
74 Ok(k) => k,
75 Err(e) => {
76 error!("Failed to decrypt user key: {}", e);
77 return (
78 StatusCode::INTERNAL_SERVER_ERROR,
79 Json(json!({"error": "InternalError"})),
80 )
81 .into_response();
82 }
83 };
84 let signing_key = match SigningKey::from_slice(&key_bytes) {
85 Ok(k) => k,
86 Err(e) => {
87 error!("Failed to create signing key: {:?}", e);
88 return (
89 StatusCode::INTERNAL_SERVER_ERROR,
90 Json(json!({"error": "InternalError"})),
91 )
92 .into_response();
93 }
94 };
95 let user_did_key = signing_key_to_did_key(&signing_key);
96 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
97 let server_rotation_key =
98 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
99 let has_server_key = rotation_keys
100 .iter()
101 .any(|k| k.as_str() == Some(&server_rotation_key));
102 if !has_server_key {
103 return (
104 StatusCode::BAD_REQUEST,
105 Json(json!({
106 "error": "InvalidRequest",
107 "message": "Rotation keys do not include server's rotation key"
108 })),
109 )
110 .into_response();
111 }
112 }
113 if let Some(services) = op.get("services").and_then(|v| v.as_object()) {
114 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) {
115 let service_type = pds.get("type").and_then(|v| v.as_str());
116 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
117 if service_type != Some("AtprotoPersonalDataServer") {
118 return (
119 StatusCode::BAD_REQUEST,
120 Json(json!({
121 "error": "InvalidRequest",
122 "message": "Incorrect type on atproto_pds service"
123 })),
124 )
125 .into_response();
126 }
127 if endpoint != Some(&public_url) {
128 return (
129 StatusCode::BAD_REQUEST,
130 Json(json!({
131 "error": "InvalidRequest",
132 "message": "Incorrect endpoint on atproto_pds service"
133 })),
134 )
135 .into_response();
136 }
137 }
138 }
139 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) {
140 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) {
141 if atproto_key != user_did_key {
142 return (
143 StatusCode::BAD_REQUEST,
144 Json(json!({
145 "error": "InvalidRequest",
146 "message": "Incorrect signing key in verificationMethods"
147 })),
148 )
149 .into_response();
150 }
151 }
152 }
153 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
154 let expected_handle = format!("at://{}", user.handle);
155 let first_aka = also_known_as.first().and_then(|v| v.as_str());
156 if first_aka != Some(&expected_handle) {
157 return (
158 StatusCode::BAD_REQUEST,
159 Json(json!({
160 "error": "InvalidRequest",
161 "message": "Incorrect handle in alsoKnownAs"
162 })),
163 )
164 .into_response();
165 }
166 }
167 let plc_client = PlcClient::new(None);
168 let operation_clone = input.operation.clone();
169 let did_clone = did.clone();
170 let result: Result<(), CircuitBreakerError<PlcError>> = with_circuit_breaker(
171 &state.circuit_breakers.plc_directory,
172 || async { plc_client.send_operation(&did_clone, &operation_clone).await },
173 )
174 .await;
175 match result {
176 Ok(()) => {}
177 Err(CircuitBreakerError::CircuitOpen(e)) => {
178 warn!("PLC directory circuit breaker open: {}", e);
179 return (
180 StatusCode::SERVICE_UNAVAILABLE,
181 Json(json!({
182 "error": "ServiceUnavailable",
183 "message": "PLC directory service temporarily unavailable"
184 })),
185 )
186 .into_response();
187 }
188 Err(CircuitBreakerError::OperationFailed(e)) => {
189 error!("Failed to submit PLC operation: {:?}", e);
190 return (
191 StatusCode::BAD_GATEWAY,
192 Json(json!({
193 "error": "UpstreamError",
194 "message": format!("Failed to submit to PLC directory: {}", e)
195 })),
196 )
197 .into_response();
198 }
199 }
200 match sqlx::query!(
201 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
202 did
203 )
204 .fetch_one(&state.db)
205 .await
206 {
207 Ok(row) => {
208 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
209 .execute(&state.db)
210 .await
211 {
212 warn!("Failed to notify identity event: {:?}", e);
213 }
214 }
215 Err(e) => {
216 warn!("Failed to sequence identity event: {:?}", e);
217 }
218 }
219 info!("Submitted PLC operation for user {}", did);
220 (StatusCode::OK, Json(json!({}))).into_response()
221}