this repo has no description
1use crate::api::{ApiError, EmptyResponse};
2use crate::circuit_breaker::with_circuit_breaker;
3use crate::plc::{PlcClient, signing_key_to_did_key, validate_plc_operation};
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 response::{IntoResponse, Response},
9};
10use k256::ecdsa::SigningKey;
11use serde::Deserialize;
12use serde_json::Value;
13use tracing::{error, info, warn};
14
15#[derive(Debug, Deserialize)]
16pub struct SubmitPlcOperationInput {
17 pub operation: Value,
18}
19
20pub async fn submit_plc_operation(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 Json(input): Json<SubmitPlcOperationInput>,
24) -> Response {
25 let bearer = match crate::auth::extract_bearer_token_from_header(
26 headers.get("Authorization").and_then(|h| h.to_str().ok()),
27 ) {
28 Some(t) => t,
29 None => {
30 return ApiError::AuthenticationRequired.into_response();
31 }
32 };
33 let auth_user =
34 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &bearer).await {
35 Ok(user) => user,
36 Err(e) => {
37 return ApiError::from(e).into_response();
38 }
39 };
40 if let Err(e) = crate::auth::scope_check::check_identity_scope(
41 auth_user.is_oauth,
42 auth_user.scope.as_deref(),
43 crate::oauth::scopes::IdentityAttr::Wildcard,
44 ) {
45 return e;
46 }
47 let did = &auth_user.did;
48 if did.starts_with("did:web:") {
49 return ApiError::InvalidRequest(
50 "PLC operations are only valid for did:plc identities".into(),
51 )
52 .into_response();
53 }
54 if let Err(e) = validate_plc_operation(&input.operation) {
55 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
56 }
57 let op = &input.operation;
58 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
59 let public_url = format!("https://{}", hostname);
60 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
61 .fetch_optional(&state.db)
62 .await
63 {
64 Ok(Some(row)) => row,
65 _ => {
66 return ApiError::AccountNotFound.into_response();
67 }
68 };
69 let key_row = match sqlx::query!(
70 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
71 user.id
72 )
73 .fetch_optional(&state.db)
74 .await
75 {
76 Ok(Some(row)) => row,
77 _ => {
78 return ApiError::InternalError(Some("User signing key not found".into()))
79 .into_response();
80 }
81 };
82 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
83 {
84 Ok(k) => k,
85 Err(e) => {
86 error!("Failed to decrypt user key: {}", e);
87 return ApiError::InternalError(None).into_response();
88 }
89 };
90 let signing_key = match SigningKey::from_slice(&key_bytes) {
91 Ok(k) => k,
92 Err(e) => {
93 error!("Failed to create signing key: {:?}", e);
94 return ApiError::InternalError(None).into_response();
95 }
96 };
97 let user_did_key = signing_key_to_did_key(&signing_key);
98 let server_rotation_key =
99 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
100 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
101 let has_server_key = rotation_keys
102 .iter()
103 .any(|k| k.as_str() == Some(&server_rotation_key));
104 if !has_server_key {
105 return ApiError::InvalidRequest(
106 "Rotation keys do not include server's rotation key".into(),
107 )
108 .into_response();
109 }
110 }
111 if let Some(services) = op.get("services").and_then(|v| v.as_object())
112 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object())
113 {
114 let service_type = pds.get("type").and_then(|v| v.as_str());
115 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
116 if service_type != Some("AtprotoPersonalDataServer") {
117 return ApiError::InvalidRequest("Incorrect type on atproto_pds service".into())
118 .into_response();
119 }
120 if endpoint != Some(&public_url) {
121 return ApiError::InvalidRequest("Incorrect endpoint on atproto_pds service".into())
122 .into_response();
123 }
124 }
125 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object())
126 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
127 && atproto_key != user_did_key
128 {
129 return ApiError::InvalidRequest("Incorrect signing key in verificationMethods".into())
130 .into_response();
131 }
132 if let Some(also_known_as) = (!user.handle.is_empty())
133 .then(|| op.get("alsoKnownAs").and_then(|v| v.as_array()))
134 .flatten()
135 {
136 let expected_handle = format!("at://{}", user.handle);
137 let first_aka = also_known_as.first().and_then(|v| v.as_str());
138 if first_aka != Some(&expected_handle) {
139 return ApiError::InvalidRequest("Incorrect handle in alsoKnownAs".into())
140 .into_response();
141 }
142 }
143 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
144 let operation_clone = input.operation.clone();
145 let did_clone = did.clone();
146 if let Err(e) = with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
147 plc_client
148 .send_operation(&did_clone, &operation_clone)
149 .await
150 })
151 .await
152 {
153 return ApiError::from(e).into_response();
154 }
155 match sqlx::query!(
156 "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq",
157 did,
158 user.handle
159 )
160 .fetch_one(&state.db)
161 .await
162 {
163 Ok(row) => {
164 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
165 .execute(&state.db)
166 .await
167 {
168 warn!("Failed to notify identity event: {:?}", e);
169 }
170 }
171 Err(e) => {
172 warn!("Failed to sequence identity event: {:?}", e);
173 }
174 }
175 let _ = state.cache.delete(&format!("handle:{}", user.handle)).await;
176 if state.did_resolver.refresh_did(did).await.is_none() {
177 warn!(did = %did, "Failed to refresh DID cache after PLC update");
178 }
179 info!(did = %did, "PLC operation submitted successfully");
180 EmptyResponse::ok().into_response()
181}