this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{with_circuit_breaker, CircuitBreakerError};
3use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient, PlcError};
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9 Json,
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{json, Value};
14use tracing::{error, info, warn};
15#[derive(Debug, Deserialize)]
16pub struct SubmitPlcOperationInput {
17 pub operation: Value,
18}
19pub async fn submit_plc_operation(
20 State(state): State<AppState>,
21 headers: axum::http::HeaderMap,
22 Json(input): Json<SubmitPlcOperationInput>,
23) -> Response {
24 let bearer = match crate::auth::extract_bearer_token_from_header(
25 headers.get("Authorization").and_then(|h| h.to_str().ok()),
26 ) {
27 Some(t) => t,
28 None => return ApiError::AuthenticationRequired.into_response(),
29 };
30 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
31 Ok(user) => user,
32 Err(e) => return ApiError::from(e).into_response(),
33 };
34 let did = &auth_user.did;
35 if let Err(e) = validate_plc_operation(&input.operation) {
36 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
37 }
38 let op = &input.operation;
39 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
40 let public_url = format!("https://{}", hostname);
41 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
42 .fetch_optional(&state.db)
43 .await
44 {
45 Ok(Some(row)) => row,
46 _ => {
47 return (
48 StatusCode::NOT_FOUND,
49 Json(json!({"error": "AccountNotFound"})),
50 )
51 .into_response();
52 }
53 };
54 let key_row = match sqlx::query!(
55 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
56 user.id
57 )
58 .fetch_optional(&state.db)
59 .await
60 {
61 Ok(Some(row)) => row,
62 _ => {
63 return (
64 StatusCode::INTERNAL_SERVER_ERROR,
65 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
66 )
67 .into_response();
68 }
69 };
70 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
71 {
72 Ok(k) => k,
73 Err(e) => {
74 error!("Failed to decrypt user key: {}", e);
75 return (
76 StatusCode::INTERNAL_SERVER_ERROR,
77 Json(json!({"error": "InternalError"})),
78 )
79 .into_response();
80 }
81 };
82 let signing_key = match SigningKey::from_slice(&key_bytes) {
83 Ok(k) => k,
84 Err(e) => {
85 error!("Failed to create signing key: {:?}", e);
86 return (
87 StatusCode::INTERNAL_SERVER_ERROR,
88 Json(json!({"error": "InternalError"})),
89 )
90 .into_response();
91 }
92 };
93 let user_did_key = signing_key_to_did_key(&signing_key);
94 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
95 let server_rotation_key =
96 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
97 let has_server_key = rotation_keys
98 .iter()
99 .any(|k| k.as_str() == Some(&server_rotation_key));
100 if !has_server_key {
101 return (
102 StatusCode::BAD_REQUEST,
103 Json(json!({
104 "error": "InvalidRequest",
105 "message": "Rotation keys do not include server's rotation key"
106 })),
107 )
108 .into_response();
109 }
110 }
111 if let Some(services) = op.get("services").and_then(|v| v.as_object()) {
112 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) {
113 let service_type = pds.get("type").and_then(|v| v.as_str());
114 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
115 if service_type != Some("AtprotoPersonalDataServer") {
116 return (
117 StatusCode::BAD_REQUEST,
118 Json(json!({
119 "error": "InvalidRequest",
120 "message": "Incorrect type on atproto_pds service"
121 })),
122 )
123 .into_response();
124 }
125 if endpoint != Some(&public_url) {
126 return (
127 StatusCode::BAD_REQUEST,
128 Json(json!({
129 "error": "InvalidRequest",
130 "message": "Incorrect endpoint on atproto_pds service"
131 })),
132 )
133 .into_response();
134 }
135 }
136 }
137 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) {
138 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) {
139 if atproto_key != user_did_key {
140 return (
141 StatusCode::BAD_REQUEST,
142 Json(json!({
143 "error": "InvalidRequest",
144 "message": "Incorrect signing key in verificationMethods"
145 })),
146 )
147 .into_response();
148 }
149 }
150 }
151 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
152 let expected_handle = format!("at://{}", user.handle);
153 let first_aka = also_known_as.first().and_then(|v| v.as_str());
154 if first_aka != Some(&expected_handle) {
155 return (
156 StatusCode::BAD_REQUEST,
157 Json(json!({
158 "error": "InvalidRequest",
159 "message": "Incorrect handle in alsoKnownAs"
160 })),
161 )
162 .into_response();
163 }
164 }
165 let plc_client = PlcClient::new(None);
166 let operation_clone = input.operation.clone();
167 let did_clone = did.clone();
168 let result: Result<(), CircuitBreakerError<PlcError>> = with_circuit_breaker(
169 &state.circuit_breakers.plc_directory,
170 || async { plc_client.send_operation(&did_clone, &operation_clone).await },
171 )
172 .await;
173 match result {
174 Ok(()) => {}
175 Err(CircuitBreakerError::CircuitOpen(e)) => {
176 warn!("PLC directory circuit breaker open: {}", e);
177 return (
178 StatusCode::SERVICE_UNAVAILABLE,
179 Json(json!({
180 "error": "ServiceUnavailable",
181 "message": "PLC directory service temporarily unavailable"
182 })),
183 )
184 .into_response();
185 }
186 Err(CircuitBreakerError::OperationFailed(e)) => {
187 error!("Failed to submit PLC operation: {:?}", e);
188 return (
189 StatusCode::BAD_GATEWAY,
190 Json(json!({
191 "error": "UpstreamError",
192 "message": format!("Failed to submit to PLC directory: {}", e)
193 })),
194 )
195 .into_response();
196 }
197 }
198 match sqlx::query!(
199 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
200 did
201 )
202 .fetch_one(&state.db)
203 .await
204 {
205 Ok(row) => {
206 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
207 .execute(&state.db)
208 .await
209 {
210 warn!("Failed to notify identity event: {:?}", e);
211 }
212 }
213 Err(e) => {
214 warn!("Failed to sequence identity event: {:?}", e);
215 }
216 }
217 info!("Submitted PLC operation for user {}", did);
218 (StatusCode::OK, Json(json!({}))).into_response()
219}