decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
1from aiohttp.client import ClientSession
2from authlib.jose import JsonWebKey, Key
3from flask import Blueprint, current_app, jsonify, redirect, request, session, url_for
4from urllib.parse import urlencode
5
6import json
7
8from .auth import (
9 delete_auth_request,
10 get_auth_request,
11 save_auth_request,
12 save_auth_session,
13)
14from .db import KV, get_db
15from .atproto import (
16 is_valid_did,
17 is_valid_handle,
18 pds_endpoint_from_doc,
19 resolve_authserver_from_pds,
20 fetch_authserver_meta,
21 resolve_identity,
22)
23from .atproto.oauth import initial_token_request, send_par_auth_request
24from .atproto.types import OAuthAuthRequest, OAuthSession
25from .security import is_safe_url
26
# Blueprint that groups the OAuth routes in this module; every route
# registered on it is served under the "/oauth" URL prefix.
oauth = Blueprint("oauth", __name__, url_prefix="/oauth")
28
29
@oauth.get("/start")
async def oauth_start():
    """Start the OAuth login flow for a handle, DID or auth server URL.

    Reads ``username`` (falling back to ``authserver``) from the query
    string, resolves the authorization server and its metadata, sends a
    pushed authorization request (PAR), stores the pending request in the
    session and redirects the browser to the authorization endpoint.

    Returns:
        A redirect to the authorization endpoint on success; a redirect to
        the login page when no identifier was supplied or the PAR request
        answers 400; otherwise a ``(message, status)`` tuple describing
        the failure.
    """
    # Identity
    username = request.args.get("username") or request.args.get("authserver")
    if not username:
        return redirect(url_for("page_login"), 303)

    db = get_db(current_app)
    pdskv = KV(db, current_app.logger, "authserver_from_pds")

    # The context manager closes the HTTP session on every exit path,
    # including the early error returns and exceptions raised by resolvers.
    async with ClientSession() as client:
        if is_valid_handle(username) or is_valid_did(username):
            login_hint = username
            kv = KV(db, current_app.logger, "did_from_handle")
            identity = await resolve_identity(client, username, didkv=kv)
            if identity is None:
                return "couldnt resolve identity", 500
            did, handle, doc = identity
            pds_url = pds_endpoint_from_doc(doc)
            if not pds_url:
                return "pds not found", 404
            current_app.logger.debug(f"account PDS: {pds_url}")
            authserver_url = await resolve_authserver_from_pds(
                client, pds_url, pdskv
            )
            if not authserver_url:
                return "authserver not found", 404

        elif username.startswith("https://") and is_safe_url(username):
            # Started from a server URL: the account is not known yet.
            did, handle, pds_url = None, None, None
            login_hint = None
            authserver_url = (
                await resolve_authserver_from_pds(client, username, pdskv)
                or username
            )

        else:
            return "not a valid handle, did or auth server", 400

        current_app.logger.debug(f"Authserver: {authserver_url}")
        # Explicit check instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently drop this validation.
        if not is_safe_url(authserver_url):
            current_app.logger.warning(f"unsafe authserver url: {authserver_url}")
            return "unsafe authserver url", 500
        authserver_meta = await fetch_authserver_meta(client, authserver_url)
        if not authserver_meta:
            return "no authserver meta", 404

    # Auth
    dpop_private_jwk: Key = JsonWebKey.generate_key("EC", "P-256", is_private=True)
    scope = "atproto transition:generic"

    host = request.host
    metadata_endpoint = url_for("oauth.oauth_metadata")
    client_id = f"https://{host}{metadata_endpoint}"
    callback_endpoint = url_for("oauth.oauth_callback")
    redirect_uri = f"https://{host}{callback_endpoint}"

    current_app.logger.debug(f"client_id {client_id}")
    current_app.logger.debug(f"redirect_uri {redirect_uri}")

    client_secret_jwk = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])

    pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request(
        authserver_url,
        authserver_meta,
        login_hint,
        client_id,
        redirect_uri,
        scope,
        client_secret_jwk,
        dpop_private_jwk,
    )

    if resp.status == 400:
        current_app.logger.warning("PAR request returned error 400")
        current_app.logger.warning(await resp.text())
        return redirect(url_for("page_login"), 303)
    _ = resp.raise_for_status()

    respjson: dict[str, str] = await resp.json()
    par_request_uri: str = respjson["request_uri"]
    current_app.logger.debug(f"saving oauth_auth_request to DB state={state}")

    oauth_request = OAuthAuthRequest(
        state,
        authserver_meta["issuer"],
        did,
        handle,
        pds_url,
        pkce_verifier,
        scope,
        dpop_authserver_nonce,
        dpop_private_jwk.as_json(is_private=True),
    )
    save_auth_request(session, oauth_request)

    auth_endpoint = authserver_meta["authorization_endpoint"]
    # Same reasoning as above: never rely on `assert` to vet a redirect target.
    if not is_safe_url(auth_endpoint):
        current_app.logger.warning(f"unsafe authorization endpoint: {auth_endpoint}")
        return "unsafe authorization endpoint", 500
    qparam = urlencode({"client_id": client_id, "request_uri": par_request_uri})
    return redirect(f"{auth_endpoint}?{qparam}")
128
129
@oauth.get("/callback")
async def oauth_callback():
    """Finish the OAuth flow: validate the callback and store the session.

    Reads ``state``, ``iss`` and ``code`` from the query string, loads the
    pending auth request from the session (and deletes it), exchanges the
    authorization code for tokens, verifies that the tokens belong to the
    expected account and authorization server, then saves the resulting
    OAuth session.

    Returns:
        A redirect to the login page on success or when no pending auth
        request exists; otherwise a ``(message, status)`` tuple describing
        which validation or resolution step failed.
    """
    state = request.args["state"]
    authserver_iss = request.args["iss"]
    authorization_code = request.args["code"]

    auth_request = get_auth_request(session)
    if not auth_request:
        return redirect(url_for("page_login"), 303)

    # The pending request is single-use: drop it before any validation so a
    # failed callback cannot be replayed against the same stored request.
    current_app.logger.debug(f"Deleting auth request for state={state}")
    delete_auth_request(session)

    # These checks were `assert` statements; asserts are stripped under
    # `python -O`, which would disable the validation entirely.
    if auth_request.authserver_iss != authserver_iss:
        current_app.logger.warning("oauth callback: issuer mismatch")
        return "issuer mismatch", 400
    if auth_request.state != state:
        current_app.logger.warning("oauth callback: state mismatch")
        return "state mismatch", 400

    app_url = request.url_root.replace("http://", "https://")
    client_secret_jwk = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])

    # The context manager closes the HTTP session on every exit path,
    # including early error returns and exceptions from the token request.
    async with ClientSession() as client:
        tokens, dpop_authserver_nonce = await initial_token_request(
            client,
            auth_request,
            authorization_code,
            app_url,
            client_secret_jwk,
        )

        db = get_db(current_app)
        didkv = KV(db, current_app.logger, "did_from_handle")
        authserverkv = KV(db, current_app.logger, "authserver_from_pds")

        if auth_request.did:
            # If we started with an account identifier, this is simple
            did = auth_request.did
            handle = auth_request.handle
            pds_url = auth_request.pds_url
            if tokens.sub != did:
                current_app.logger.warning("oauth callback: token subject mismatch")
                return "token subject mismatch", 403
        else:
            # Started from an auth server URL: learn the account from the
            # token subject and confirm its PDS points at this auth server.
            did = tokens.sub
            if not is_valid_did(did):
                return "invalid token subject", 403
            identity = await resolve_identity(client, did, didkv=didkv)
            if not identity:
                return "could not resolve identity", 500
            did, handle, did_doc = identity
            pds_url = pds_endpoint_from_doc(did_doc)
            if not pds_url:
                return "could not resolve pds", 500
            authserver_url = await resolve_authserver_from_pds(
                client,
                pds_url,
                authserverkv,
            )
            if authserver_url != authserver_iss:
                current_app.logger.warning("oauth callback: authserver mismatch")
                return "authserver mismatch", 403

    if auth_request.scope != tokens.scope:
        current_app.logger.warning("oauth callback: scope mismatch")
        return "scope mismatch", 403
    if pds_url is None:
        return "could not resolve pds", 500

    current_app.logger.debug("storing user oauth session")
    oauth_session = OAuthSession(
        did,
        handle,
        pds_url,
        authserver_iss,
        tokens.access_token,
        tokens.refresh_token,
        dpop_authserver_nonce,
        None,
        auth_request.dpop_private_jwk,
    )
    save_auth_session(session, oauth_session)

    return redirect(url_for("page_login"))
205
206
@oauth.get("/metadata")
def oauth_metadata():
    """Return the OAuth client metadata document as a JSON response.

    Every URL in the document is built from the host of the incoming
    request with an ``https://`` scheme.
    """
    origin = f"https://{request.host}"

    # Absolute URLs for the endpoints this client advertises.
    redirect_url = origin + url_for("oauth.oauth_callback")
    client_id_url = origin + url_for("oauth.oauth_metadata")
    jwks_url = origin + url_for("oauth.oauth_jwks")

    document = {
        "client_id": client_id_url,
        "grant_types": ["authorization_code", "refresh_token"],
        "scope": "atproto transition:generic",
        "response_types": ["code"],
        "redirect_uris": [redirect_url],
        "dpop_bound_access_tokens": True,
        "token_endpoint_auth_method": "private_key_jwt",
        "token_endpoint_auth_signing_alg": "ES256",
        "jwks_uri": jwks_url,
    }

    # optional
    document["client_name"] = "ligo.at"
    document["client_uri"] = origin
    document["logo_uri"] = origin + url_for("static", filename="favicon-48.png")
    document["tos_uri"] = origin + url_for("page_terms")

    return jsonify(document)
233
234
@oauth.get("/jwks")
def oauth_jwks():
    """Return a JSON key set holding the client's public key.

    The key is imported from the ``CLIENT_SECRET_JWK`` app config entry and
    serialised with ``is_private=False`` before being returned.
    """
    signing_key = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
    public_jwk_text = signing_key.as_json(is_private=False)
    public_jwk = json.loads(public_jwk_text)
    return jsonify(keys=[public_jwk])
239 return jsonify({"keys": [CLIENT_PUB_JWK]})