this repo has no description
1use crate::api::ApiError;
2use crate::circuit_breaker::{CircuitBreakerError, with_circuit_breaker};
3use crate::plc::{PlcClient, PlcError, signing_key_to_did_key, validate_plc_operation};
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::{Value, json};
14use tracing::{error, info, warn};
15
16#[derive(Debug, Deserialize)]
17pub struct SubmitPlcOperationInput {
18 pub operation: Value,
19}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 let bearer = match crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok()),
28 ) {
29 Some(t) => t,
30 None => return ApiError::AuthenticationRequired.into_response(),
31 };
32 let auth_user =
33 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
34 Ok(user) => user,
35 Err(e) => return ApiError::from(e).into_response(),
36 };
37 if let Err(e) = crate::auth::scope_check::check_identity_scope(
38 auth_user.is_oauth,
39 auth_user.scope.as_deref(),
40 crate::oauth::scopes::IdentityAttr::Wildcard,
41 ) {
42 return e;
43 }
44 let did = &auth_user.did;
45 if did.starts_with("did:web:") {
46 return ApiError::InvalidRequest(
47 "PLC operations are only valid for did:plc identities".into(),
48 )
49 .into_response();
50 }
51 if let Err(e) = validate_plc_operation(&input.operation) {
52 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
53 }
54 let op = &input.operation;
55 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
56 let public_url = format!("https://{}", hostname);
57 let user = match sqlx::query!(
58 "SELECT id, handle, deactivated_at FROM users WHERE did = $1",
59 did
60 )
61 .fetch_optional(&state.db)
62 .await
63 {
64 Ok(Some(row)) => row,
65 _ => {
66 return (
67 StatusCode::NOT_FOUND,
68 Json(json!({"error": "AccountNotFound"})),
69 )
70 .into_response();
71 }
72 };
73 let is_migration = user.deactivated_at.is_some();
74 let key_row = match sqlx::query!(
75 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
76 user.id
77 )
78 .fetch_optional(&state.db)
79 .await
80 {
81 Ok(Some(row)) => row,
82 _ => {
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
86 )
87 .into_response();
88 }
89 };
90 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
91 {
92 Ok(k) => k,
93 Err(e) => {
94 error!("Failed to decrypt user key: {}", e);
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError"})),
98 )
99 .into_response();
100 }
101 };
102 let signing_key = match SigningKey::from_slice(&key_bytes) {
103 Ok(k) => k,
104 Err(e) => {
105 error!("Failed to create signing key: {:?}", e);
106 return (
107 StatusCode::INTERNAL_SERVER_ERROR,
108 Json(json!({"error": "InternalError"})),
109 )
110 .into_response();
111 }
112 };
113 let user_did_key = signing_key_to_did_key(&signing_key);
114 if !is_migration && let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array())
115 {
116 let server_rotation_key =
117 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
118 let has_server_key = rotation_keys
119 .iter()
120 .any(|k| k.as_str() == Some(&server_rotation_key));
121 if !has_server_key {
122 return (
123 StatusCode::BAD_REQUEST,
124 Json(json!({
125 "error": "InvalidRequest",
126 "message": "Rotation keys do not include server's rotation key"
127 })),
128 )
129 .into_response();
130 }
131 }
132 if let Some(services) = op.get("services").and_then(|v| v.as_object())
133 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object())
134 {
135 let service_type = pds.get("type").and_then(|v| v.as_str());
136 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
137 if service_type != Some("AtprotoPersonalDataServer") {
138 return (
139 StatusCode::BAD_REQUEST,
140 Json(json!({
141 "error": "InvalidRequest",
142 "message": "Incorrect type on atproto_pds service"
143 })),
144 )
145 .into_response();
146 }
147 if endpoint != Some(&public_url) {
148 return (
149 StatusCode::BAD_REQUEST,
150 Json(json!({
151 "error": "InvalidRequest",
152 "message": "Incorrect endpoint on atproto_pds service"
153 })),
154 )
155 .into_response();
156 }
157 }
158 if !is_migration {
159 if let Some(verification_methods) =
160 op.get("verificationMethods").and_then(|v| v.as_object())
161 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
162 && atproto_key != user_did_key
163 {
164 return (
165 StatusCode::BAD_REQUEST,
166 Json(json!({
167 "error": "InvalidRequest",
168 "message": "Incorrect signing key in verificationMethods"
169 })),
170 )
171 .into_response();
172 }
173 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
174 let expected_handle = format!("at://{}", user.handle);
175 let first_aka = also_known_as.first().and_then(|v| v.as_str());
176 if first_aka != Some(&expected_handle) {
177 return (
178 StatusCode::BAD_REQUEST,
179 Json(json!({
180 "error": "InvalidRequest",
181 "message": "Incorrect handle in alsoKnownAs"
182 })),
183 )
184 .into_response();
185 }
186 }
187 }
188 let plc_client = PlcClient::new(None);
189 let operation_clone = input.operation.clone();
190 let did_clone = did.clone();
191 let result: Result<(), CircuitBreakerError<PlcError>> =
192 with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
193 plc_client
194 .send_operation(&did_clone, &operation_clone)
195 .await
196 })
197 .await;
198 match result {
199 Ok(()) => {}
200 Err(CircuitBreakerError::CircuitOpen(e)) => {
201 warn!("PLC directory circuit breaker open: {}", e);
202 return (
203 StatusCode::SERVICE_UNAVAILABLE,
204 Json(json!({
205 "error": "ServiceUnavailable",
206 "message": "PLC directory service temporarily unavailable"
207 })),
208 )
209 .into_response();
210 }
211 Err(CircuitBreakerError::OperationFailed(e)) => {
212 error!("Failed to submit PLC operation: {:?}", e);
213 return (
214 StatusCode::BAD_GATEWAY,
215 Json(json!({
216 "error": "UpstreamError",
217 "message": format!("Failed to submit to PLC directory: {}", e)
218 })),
219 )
220 .into_response();
221 }
222 }
223 match sqlx::query!(
224 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
225 did
226 )
227 .fetch_one(&state.db)
228 .await
229 {
230 Ok(row) => {
231 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
232 .execute(&state.db)
233 .await
234 {
235 warn!("Failed to notify identity event: {:?}", e);
236 }
237 }
238 Err(e) => {
239 warn!("Failed to sequence identity event: {:?}", e);
240 }
241 }
242 info!("Submitted PLC operation for user {}", did);
243 (StatusCode::OK, Json(json!({}))).into_response()
244}