this repo has no description
1use crate::api::{ApiError, EmptyResponse};
2use crate::auth::BearerAuthAllowDeactivated;
3use crate::circuit_breaker::with_circuit_breaker;
4use crate::plc::{PlcClient, signing_key_to_did_key, validate_plc_operation};
5use crate::state::AppState;
6use axum::{
7 Json,
8 extract::State,
9 response::{IntoResponse, Response},
10};
11use k256::ecdsa::SigningKey;
12use serde::Deserialize;
13use serde_json::Value;
14use tracing::{error, info, warn};
15
16#[derive(Debug, Deserialize)]
17pub struct SubmitPlcOperationInput {
18 pub operation: Value,
19}
20
21pub async fn submit_plc_operation(
22 State(state): State<AppState>,
23 auth: BearerAuthAllowDeactivated,
24 Json(input): Json<SubmitPlcOperationInput>,
25) -> Response {
26 let auth_user = auth.0;
27 if let Err(e) = crate::auth::scope_check::check_identity_scope(
28 auth_user.is_oauth,
29 auth_user.scope.as_deref(),
30 crate::oauth::scopes::IdentityAttr::Wildcard,
31 ) {
32 return e;
33 }
34 let did = &auth_user.did;
35 if did.starts_with("did:web:") {
36 return ApiError::InvalidRequest(
37 "PLC operations are only valid for did:plc identities".into(),
38 )
39 .into_response();
40 }
41 if let Err(e) = validate_plc_operation(&input.operation) {
42 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
43 }
44 let op = &input.operation;
45 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
46 let public_url = format!("https://{}", hostname);
47 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
48 .fetch_optional(&state.db)
49 .await
50 {
51 Ok(Some(row)) => row,
52 _ => {
53 return ApiError::AccountNotFound.into_response();
54 }
55 };
56 let key_row = match sqlx::query!(
57 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
58 user.id
59 )
60 .fetch_optional(&state.db)
61 .await
62 {
63 Ok(Some(row)) => row,
64 _ => {
65 return ApiError::InternalError(Some("User signing key not found".into()))
66 .into_response();
67 }
68 };
69 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
70 {
71 Ok(k) => k,
72 Err(e) => {
73 error!("Failed to decrypt user key: {}", e);
74 return ApiError::InternalError(None).into_response();
75 }
76 };
77 let signing_key = match SigningKey::from_slice(&key_bytes) {
78 Ok(k) => k,
79 Err(e) => {
80 error!("Failed to create signing key: {:?}", e);
81 return ApiError::InternalError(None).into_response();
82 }
83 };
84 let user_did_key = signing_key_to_did_key(&signing_key);
85 let server_rotation_key =
86 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
87 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
88 let has_server_key = rotation_keys
89 .iter()
90 .any(|k| k.as_str() == Some(&server_rotation_key));
91 if !has_server_key {
92 return ApiError::InvalidRequest(
93 "Rotation keys do not include server's rotation key".into(),
94 )
95 .into_response();
96 }
97 }
98 if let Some(services) = op.get("services").and_then(|v| v.as_object())
99 && let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object())
100 {
101 let service_type = pds.get("type").and_then(|v| v.as_str());
102 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
103 if service_type != Some("AtprotoPersonalDataServer") {
104 return ApiError::InvalidRequest("Incorrect type on atproto_pds service".into())
105 .into_response();
106 }
107 if endpoint != Some(&public_url) {
108 return ApiError::InvalidRequest("Incorrect endpoint on atproto_pds service".into())
109 .into_response();
110 }
111 }
112 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object())
113 && let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str())
114 && atproto_key != user_did_key
115 {
116 return ApiError::InvalidRequest("Incorrect signing key in verificationMethods".into())
117 .into_response();
118 }
119 if let Some(also_known_as) = (!user.handle.is_empty())
120 .then(|| op.get("alsoKnownAs").and_then(|v| v.as_array()))
121 .flatten()
122 {
123 let expected_handle = format!("at://{}", user.handle);
124 let first_aka = also_known_as.first().and_then(|v| v.as_str());
125 if first_aka != Some(&expected_handle) {
126 return ApiError::InvalidRequest("Incorrect handle in alsoKnownAs".into())
127 .into_response();
128 }
129 }
130 let plc_client = PlcClient::with_cache(None, Some(state.cache.clone()));
131 let operation_clone = input.operation.clone();
132 let did_clone = did.clone();
133 if let Err(e) = with_circuit_breaker(&state.circuit_breakers.plc_directory, || async {
134 plc_client
135 .send_operation(&did_clone, &operation_clone)
136 .await
137 })
138 .await
139 {
140 return ApiError::from(e).into_response();
141 }
142 match sqlx::query!(
143 "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq",
144 did,
145 user.handle
146 )
147 .fetch_one(&state.db)
148 .await
149 {
150 Ok(row) => {
151 if let Err(e) = sqlx::query(&format!("NOTIFY repo_updates, '{}'", row.seq))
152 .execute(&state.db)
153 .await
154 {
155 warn!("Failed to notify identity event: {:?}", e);
156 }
157 }
158 Err(e) => {
159 warn!("Failed to sequence identity event: {:?}", e);
160 }
161 }
162 let _ = state.cache.delete(&format!("handle:{}", user.handle)).await;
163 if state.did_resolver.refresh_did(did).await.is_none() {
164 warn!(did = %did, "Failed to refresh DID cache after PLC update");
165 }
166 info!(did = %did, "PLC operation submitted successfully");
167 EmptyResponse::ok().into_response()
168}