this repo has no description
1use crate::api::ApiError;
2use crate::plc::{signing_key_to_did_key, validate_plc_operation, PlcClient};
3use crate::state::AppState;
4use axum::{
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8 Json,
9};
10use k256::ecdsa::SigningKey;
11use serde::Deserialize;
12use serde_json::{json, Value};
13use tracing::{error, info, warn};
14
15#[derive(Debug, Deserialize)]
16pub struct SubmitPlcOperationInput {
17 pub operation: Value,
18}
19
20pub async fn submit_plc_operation(
21 State(state): State<AppState>,
22 headers: axum::http::HeaderMap,
23 Json(input): Json<SubmitPlcOperationInput>,
24) -> Response {
25 let bearer = match crate::auth::extract_bearer_token_from_header(
26 headers.get("Authorization").and_then(|h| h.to_str().ok()),
27 ) {
28 Some(t) => t,
29 None => return ApiError::AuthenticationRequired.into_response(),
30 };
31
32 let auth_user = match crate::auth::validate_bearer_token(&state.db, &bearer).await {
33 Ok(user) => user,
34 Err(e) => return ApiError::from(e).into_response(),
35 };
36
37 let did = &auth_user.did;
38
39 if let Err(e) = validate_plc_operation(&input.operation) {
40 return ApiError::InvalidRequest(format!("Invalid operation: {}", e)).into_response();
41 }
42
43 let op = &input.operation;
44 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
45 let public_url = format!("https://{}", hostname);
46
47 let user = match sqlx::query!("SELECT id, handle FROM users WHERE did = $1", did)
48 .fetch_optional(&state.db)
49 .await
50 {
51 Ok(Some(row)) => row,
52 _ => {
53 return (
54 StatusCode::NOT_FOUND,
55 Json(json!({"error": "AccountNotFound"})),
56 )
57 .into_response();
58 }
59 };
60
61 let key_row = match sqlx::query!(
62 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
63 user.id
64 )
65 .fetch_optional(&state.db)
66 .await
67 {
68 Ok(Some(row)) => row,
69 _ => {
70 return (
71 StatusCode::INTERNAL_SERVER_ERROR,
72 Json(json!({"error": "InternalError", "message": "User signing key not found"})),
73 )
74 .into_response();
75 }
76 };
77
78 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
79 {
80 Ok(k) => k,
81 Err(e) => {
82 error!("Failed to decrypt user key: {}", e);
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError"})),
86 )
87 .into_response();
88 }
89 };
90
91 let signing_key = match SigningKey::from_slice(&key_bytes) {
92 Ok(k) => k,
93 Err(e) => {
94 error!("Failed to create signing key: {:?}", e);
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError"})),
98 )
99 .into_response();
100 }
101 };
102
103 let user_did_key = signing_key_to_did_key(&signing_key);
104
105 if let Some(rotation_keys) = op.get("rotationKeys").and_then(|v| v.as_array()) {
106 let server_rotation_key =
107 std::env::var("PLC_ROTATION_KEY").unwrap_or_else(|_| user_did_key.clone());
108
109 let has_server_key = rotation_keys
110 .iter()
111 .any(|k| k.as_str() == Some(&server_rotation_key));
112
113 if !has_server_key {
114 return (
115 StatusCode::BAD_REQUEST,
116 Json(json!({
117 "error": "InvalidRequest",
118 "message": "Rotation keys do not include server's rotation key"
119 })),
120 )
121 .into_response();
122 }
123 }
124
125 if let Some(services) = op.get("services").and_then(|v| v.as_object()) {
126 if let Some(pds) = services.get("atproto_pds").and_then(|v| v.as_object()) {
127 let service_type = pds.get("type").and_then(|v| v.as_str());
128 let endpoint = pds.get("endpoint").and_then(|v| v.as_str());
129
130 if service_type != Some("AtprotoPersonalDataServer") {
131 return (
132 StatusCode::BAD_REQUEST,
133 Json(json!({
134 "error": "InvalidRequest",
135 "message": "Incorrect type on atproto_pds service"
136 })),
137 )
138 .into_response();
139 }
140
141 if endpoint != Some(&public_url) {
142 return (
143 StatusCode::BAD_REQUEST,
144 Json(json!({
145 "error": "InvalidRequest",
146 "message": "Incorrect endpoint on atproto_pds service"
147 })),
148 )
149 .into_response();
150 }
151 }
152 }
153
154 if let Some(verification_methods) = op.get("verificationMethods").and_then(|v| v.as_object()) {
155 if let Some(atproto_key) = verification_methods.get("atproto").and_then(|v| v.as_str()) {
156 if atproto_key != user_did_key {
157 return (
158 StatusCode::BAD_REQUEST,
159 Json(json!({
160 "error": "InvalidRequest",
161 "message": "Incorrect signing key in verificationMethods"
162 })),
163 )
164 .into_response();
165 }
166 }
167 }
168
169 if let Some(also_known_as) = op.get("alsoKnownAs").and_then(|v| v.as_array()) {
170 let expected_handle = format!("at://{}", user.handle);
171 let first_aka = also_known_as.first().and_then(|v| v.as_str());
172
173 if first_aka != Some(&expected_handle) {
174 return (
175 StatusCode::BAD_REQUEST,
176 Json(json!({
177 "error": "InvalidRequest",
178 "message": "Incorrect handle in alsoKnownAs"
179 })),
180 )
181 .into_response();
182 }
183 }
184
185 let plc_client = PlcClient::new(None);
186 if let Err(e) = plc_client.send_operation(did, &input.operation).await {
187 error!("Failed to submit PLC operation: {:?}", e);
188 return (
189 StatusCode::BAD_GATEWAY,
190 Json(json!({
191 "error": "UpstreamError",
192 "message": format!("Failed to submit to PLC directory: {}", e)
193 })),
194 )
195 .into_response();
196 }
197
198 if let Err(e) = sqlx::query!(
199 "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity')",
200 did
201 )
202 .execute(&state.db)
203 .await
204 {
205 warn!("Failed to sequence identity event: {:?}", e);
206 }
207
208 info!("Submitted PLC operation for user {}", did);
209
210 (StatusCode::OK, Json(json!({}))).into_response()
211}