decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
1import json
2from datetime import datetime, timedelta, timezone
3from urllib.parse import urlencode
4
5from aiohttp.client import ClientSession
6from authlib.jose import JsonWebKey, Key
7from flask import Blueprint, current_app, jsonify, redirect, request, session, url_for
8
9from src.atproto import (
10 fetch_authserver_meta,
11 is_valid_did,
12 is_valid_handle,
13 resolve_authserver_from_pds,
14 resolve_identity,
15)
16from src.atproto.oauth import initial_token_request, send_par_auth_request
17from src.atproto.types import (
18 DID,
19 AuthserverUrl,
20 Handle,
21 OAuthAuthRequest,
22 OAuthSession,
23 PdsUrl,
24)
25from src.auth import (
26 delete_auth_request,
27 get_auth_request,
28 save_auth_request,
29 save_auth_session,
30)
31from src.db import KV, get_db
32from src.security import hardened_http, is_safe_url
33
34oauth = Blueprint("oauth", __name__, url_prefix="/oauth")
35
36OAUTH_SCOPE = "atproto include:at.ligo.authFull"
37
38
39@oauth.get("/start")
40async def oauth_start():
41 # Identity
42 username = request.args.get("username_or_authserver")
43 if not username:
44 return redirect(url_for("page_login"), 303)
45
46 db = get_db(current_app)
47 didkv = KV[Handle, DID](db, current_app.logger, "did_from_handle")
48 pdskv = KV[DID, PdsUrl](db, current_app.logger, "pds_from_did")
49 authserverkv = KV[PdsUrl, AuthserverUrl](
50 db,
51 current_app.logger,
52 "authserver_from_pds",
53 )
54
55 client = ClientSession()
56
57 if is_valid_handle(username) or is_valid_did(username):
58 login_hint = username
59 identity = await resolve_identity(client, username, didkv=didkv, pdskv=pdskv)
60 if identity is None:
61 return "couldnt resolve identity", 500
62 did, handle, pds_url = identity
63 current_app.logger.debug(f"account PDS: {pds_url}")
64 authserver_url = await resolve_authserver_from_pds(
65 client, pds_url, authserverkv
66 )
67 if not authserver_url:
68 return "authserver not found", 404
69
70 elif username.startswith("https://") and is_safe_url(username):
71 did, handle, pds_url = None, None, None
72 login_hint = None
73 authserver_url = (
74 await resolve_authserver_from_pds(client, PdsUrl(username), authserverkv)
75 or username
76 )
77
78 else:
79 return "not a valid handle, did or auth server", 400
80
81 current_app.logger.debug(f"Authserver: {authserver_url}")
82 assert is_safe_url(authserver_url)
83 authserver_meta = await fetch_authserver_meta(client, authserver_url)
84 if not authserver_meta:
85 return "no authserver meta", 404
86
87 await client.close()
88
89 # Auth
90 dpop_private_jwk: Key = JsonWebKey.generate_key("EC", "P-256", is_private=True)
91
92 host = request.host
93 metadata_endpoint = url_for("oauth.oauth_metadata")
94 client_id = f"https://{host}{metadata_endpoint}"
95 callback_endpoint = url_for("oauth.oauth_callback")
96 redirect_uri = f"https://{host}{callback_endpoint}"
97
98 current_app.logger.debug(f"client_id {client_id}")
99 current_app.logger.debug(f"redirect_uri {redirect_uri}")
100
101 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
102
103 client = hardened_http.get_session()
104 pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request(
105 client,
106 authserver_url,
107 authserver_meta,
108 login_hint,
109 client_id,
110 redirect_uri,
111 OAUTH_SCOPE,
112 CLIENT_SECRET_JWK,
113 dpop_private_jwk,
114 )
115
116 if resp.status == 400:
117 current_app.logger.warning("PAR request returned error 400")
118 current_app.logger.warning(await resp.text())
119 return redirect(url_for("page_login"), 303)
120 _ = resp.raise_for_status()
121
122 respjson: dict[str, str] = await resp.json()
123 par_request_uri: str = respjson["request_uri"]
124 await client.close()
125
126 current_app.logger.debug(f"saving oauth_auth_request to DB state={state}")
127 oauth_request = OAuthAuthRequest(
128 state,
129 authserver_meta["issuer"],
130 did,
131 handle,
132 pds_url,
133 pkce_verifier,
134 OAUTH_SCOPE,
135 dpop_authserver_nonce,
136 dpop_private_jwk.as_json(is_private=True),
137 )
138 save_auth_request(session, oauth_request)
139
140 auth_endpoint = authserver_meta["authorization_endpoint"]
141 assert is_safe_url(auth_endpoint)
142 qparam = urlencode({"client_id": client_id, "request_uri": par_request_uri})
143 return redirect(f"{auth_endpoint}?{qparam}")
144
145
146@oauth.get("/callback")
147async def oauth_callback():
148 state = request.args["state"]
149 authserver_iss = request.args["iss"]
150 if "code" not in request.args:
151 message = f"{request.args['error']}: {request.args['error_description']}"
152 current_app.logger.debug(message)
153 return redirect(url_for("page_login"))
154 authorization_code = request.args["code"]
155
156 auth_request = get_auth_request(session)
157 if not auth_request:
158 return redirect(url_for("page_login"), 303)
159
160 current_app.logger.debug(f"Deleting auth request for state={state}")
161 delete_auth_request(session)
162
163 assert auth_request.authserver_iss == authserver_iss
164 assert auth_request.state == state
165
166 client = ClientSession()
167
168 app_url = request.url_root.replace("http://", "https://")
169 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
170 tokens, dpop_authserver_nonce = await initial_token_request(
171 client,
172 auth_request,
173 authorization_code,
174 app_url,
175 CLIENT_SECRET_JWK,
176 )
177
178 row = auth_request
179
180 db = get_db(current_app)
181 didkv = KV(db, current_app.logger, "did_from_handle")
182 authserverkv = KV(db, current_app.logger, "authserver_from_pds")
183
184 if row.did:
185 # If we started with an account identifier, this is simple
186 did, handle, pds_url = row.did, row.handle, row.pds_url
187 assert tokens.sub == did
188 else:
189 did = tokens.sub
190 assert is_valid_did(did)
191 identity = await resolve_identity(client, did, didkv=didkv)
192 if not identity:
193 return "could not resolve identity", 500
194 did, handle, pds_url = identity
195 authserver_url = await resolve_authserver_from_pds(
196 client,
197 pds_url,
198 authserverkv,
199 )
200 assert authserver_url == authserver_iss
201
202 await client.close()
203
204 assert pds_url is not None
205
206 current_app.logger.debug("storing user oauth session")
207 now = datetime.now(timezone.utc)
208 expires_at = now + timedelta(seconds=tokens.expires_in or 300)
209 oauth_session = OAuthSession(
210 did,
211 handle,
212 pds_url,
213 authserver_iss,
214 tokens.access_token,
215 tokens.refresh_token,
216 int(expires_at.timestamp()),
217 dpop_authserver_nonce,
218 None,
219 auth_request.dpop_private_jwk,
220 )
221 save_auth_session(session, oauth_session)
222
223 return redirect(url_for("page_login"))
224
225
226@oauth.get("/metadata")
227def oauth_metadata():
228 host = request.host
229 callback_endpoint = url_for("oauth.oauth_callback")
230 metadata_endpoint = url_for("oauth.oauth_metadata")
231 jwks_endpoint = url_for("oauth.oauth_jwks")
232 return jsonify(
233 {
234 "client_id": f"https://{host}{metadata_endpoint}",
235 "grant_types": ["authorization_code", "refresh_token"],
236 "scope": OAUTH_SCOPE,
237 "response_types": ["code"],
238 "redirect_uris": [
239 f"https://{host}{callback_endpoint}",
240 ],
241 "dpop_bound_access_tokens": True,
242 "token_endpoint_auth_method": "private_key_jwt",
243 "token_endpoint_auth_signing_alg": "ES256",
244 "jwks_uri": f"https://{host}{jwks_endpoint}",
245 # optional
246 "client_name": "ligo.at",
247 "client_uri": f"https://{host}",
248 "logo_uri": f"https://{host}{url_for('static', filename='favicon-48.png')}",
249 "tos_uri": f"https://{host}{url_for('page_terms')}",
250 }
251 )
252
253
254@oauth.get("/jwks")
255def oauth_jwks():
256 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
257 CLIENT_PUB_JWK = json.loads(CLIENT_SECRET_JWK.as_json(is_private=False))
258 return jsonify({"keys": [CLIENT_PUB_JWK]})