decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv
at 1b7c0778f94de667d002f803fdb1fcc64b2efd23 239 lines 7.8 kB view raw
1from aiohttp.client import ClientSession 2from authlib.jose import JsonWebKey, Key 3from flask import Blueprint, current_app, jsonify, redirect, request, session, url_for 4from urllib.parse import urlencode 5 6import json 7 8from .auth import ( 9 delete_auth_request, 10 get_auth_request, 11 save_auth_request, 12 save_auth_session, 13) 14from .db import KV, get_db 15from .atproto import ( 16 is_valid_did, 17 is_valid_handle, 18 pds_endpoint_from_doc, 19 resolve_authserver_from_pds, 20 fetch_authserver_meta, 21 resolve_identity, 22) 23from .atproto.oauth import initial_token_request, send_par_auth_request 24from .atproto.types import OAuthAuthRequest, OAuthSession 25from .security import is_safe_url 26 27oauth = Blueprint("oauth", __name__, url_prefix="/oauth") 28 29 30@oauth.get("/start") 31async def oauth_start(): 32 # Identity 33 username = request.args.get("username") or request.args.get("authserver") 34 if not username: 35 return redirect(url_for("page_login"), 303) 36 37 db = get_db(current_app) 38 pdskv = KV(db, current_app.logger, "authserver_from_pds") 39 40 client = ClientSession() 41 42 if is_valid_handle(username) or is_valid_did(username): 43 login_hint = username 44 kv = KV(db, current_app.logger, "did_from_handle") 45 identity = await resolve_identity(client, username, didkv=kv) 46 if identity is None: 47 return "couldnt resolve identity", 500 48 did, handle, doc = identity 49 pds_url = pds_endpoint_from_doc(doc) 50 if not pds_url: 51 return "pds not found", 404 52 current_app.logger.debug(f"account PDS: {pds_url}") 53 authserver_url = await resolve_authserver_from_pds(client, pds_url, pdskv) 54 if not authserver_url: 55 return "authserver not found", 404 56 57 elif username.startswith("https://") and is_safe_url(username): 58 did, handle, pds_url = None, None, None 59 login_hint = None 60 authserver_url = ( 61 await resolve_authserver_from_pds(client, username, pdskv) or username 62 ) 63 64 else: 65 return "not a valid handle, did or auth server", 400 66 67 current_app.logger.debug(f"Authserver: {authserver_url}") 68 assert is_safe_url(authserver_url) 69 authserver_meta = await fetch_authserver_meta(client, authserver_url) 70 if not authserver_meta: 71 return "no authserver meta", 404 72 73 await client.close() 74 75 # Auth 76 dpop_private_jwk: Key = JsonWebKey.generate_key("EC", "P-256", is_private=True) 77 scope = "atproto transition:generic" 78 79 host = request.host 80 metadata_endpoint = url_for("oauth.oauth_metadata") 81 client_id = f"https://{host}{metadata_endpoint}" 82 callback_endpoint = url_for("oauth.oauth_callback") 83 redirect_uri = f"https://{host}{callback_endpoint}" 84 85 current_app.logger.debug(f"client_id {client_id}") 86 current_app.logger.debug(f"redirect_uri {redirect_uri}") 87 88 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 89 90 pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request( 91 authserver_url, 92 authserver_meta, 93 login_hint, 94 client_id, 95 redirect_uri, 96 scope, 97 CLIENT_SECRET_JWK, 98 dpop_private_jwk, 99 ) 100 101 if resp.status == 400: 102 current_app.logger.warning("PAR request returned error 400") 103 current_app.logger.warning(await resp.text()) 104 return redirect(url_for("page_login"), 303) 105 _ = resp.raise_for_status() 106 107 respjson: dict[str, str] = await resp.json() 108 par_request_uri: str = respjson["request_uri"] 109 current_app.logger.debug(f"saving oauth_auth_request to DB state={state}") 110 111 oauth_request = OAuthAuthRequest( 112 state, 113 authserver_meta["issuer"], 114 did, 115 handle, 116 pds_url, 117 pkce_verifier, 118 scope, 119 dpop_authserver_nonce, 120 dpop_private_jwk.as_json(is_private=True), 121 ) 122 save_auth_request(session, oauth_request) 123 124 auth_endpoint = authserver_meta["authorization_endpoint"] 125 assert is_safe_url(auth_endpoint) 126 qparam = urlencode({"client_id": client_id, "request_uri": par_request_uri}) 127 return redirect(f"{auth_endpoint}?{qparam}") 128 129 130@oauth.get("/callback") 131async def oauth_callback(): 132 state = request.args["state"] 133 authserver_iss = request.args["iss"] 134 authorization_code = request.args["code"] 135 136 auth_request = get_auth_request(session) 137 if not auth_request: 138 return redirect(url_for("page_login"), 303) 139 140 current_app.logger.debug(f"Deleting auth request for state={state}") 141 delete_auth_request(session) 142 143 assert auth_request.authserver_iss == authserver_iss 144 assert auth_request.state == state 145 146 client = ClientSession() 147 148 app_url = request.url_root.replace("http://", "https://") 149 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 150 tokens, dpop_authserver_nonce = await initial_token_request( 151 client, 152 auth_request, 153 authorization_code, 154 app_url, 155 CLIENT_SECRET_JWK, 156 ) 157 158 row = auth_request 159 160 db = get_db(current_app) 161 didkv = KV(db, current_app.logger, "did_from_handle") 162 authserverkv = KV(db, current_app.logger, "authserver_from_pds") 163 164 if row.did: 165 # If we started with an account identifier, this is simple 166 did, handle, pds_url = row.did, row.handle, row.pds_url 167 assert tokens.sub == did 168 else: 169 did = tokens.sub 170 assert is_valid_did(did) 171 identity = await resolve_identity(client, did, didkv=didkv) 172 if not identity: 173 return "could not resolve identity", 500 174 did, handle, did_doc = identity 175 pds_url = pds_endpoint_from_doc(did_doc) 176 if not pds_url: 177 return "could not resolve pds", 500 178 authserver_url = await resolve_authserver_from_pds( 179 client, 180 pds_url, 181 authserverkv, 182 ) 183 assert authserver_url == authserver_iss 184 185 await client.close() 186 187 assert row.scope == tokens.scope 188 assert pds_url is not None 189 190 current_app.logger.debug("storing user oauth session") 191 oauth_session = OAuthSession( 192 did, 193 handle, 194 pds_url, 195 authserver_iss, 196 tokens.access_token, 197 tokens.refresh_token, 198 dpop_authserver_nonce, 199 None, 200 auth_request.dpop_private_jwk, 201 ) 202 save_auth_session(session, oauth_session) 203 204 return redirect(url_for("page_login")) 205 206 207@oauth.get("/metadata") 208def oauth_metadata(): 209 host = request.host 210 callback_endpoint = url_for("oauth.oauth_callback") 211 metadata_endpoint = url_for("oauth.oauth_metadata") 212 jwks_endpoint = url_for("oauth.oauth_jwks") 213 return jsonify( 214 { 215 "client_id": f"https://{host}{metadata_endpoint}", 216 "grant_types": ["authorization_code", "refresh_token"], 217 "scope": "atproto transition:generic", 218 "response_types": ["code"], 219 "redirect_uris": [ 220 f"https://{host}{callback_endpoint}", 221 ], 222 "dpop_bound_access_tokens": True, 223 "token_endpoint_auth_method": "private_key_jwt", 224 "token_endpoint_auth_signing_alg": "ES256", 225 "jwks_uri": f"https://{host}{jwks_endpoint}", 226 # optional 227 "client_name": "ligo.at", 228 "client_uri": f"https://{host}", 229 "logo_uri": f"https://{host}{url_for('static', filename='favicon-48.png')}", 230 "tos_uri": f"https://{host}{url_for('page_terms')}", 231 } 232 ) 233 234 235@oauth.get("/jwks") 236def oauth_jwks(): 237 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 238 CLIENT_PUB_JWK = json.loads(CLIENT_SECRET_JWK.as_json(is_private=False)) 239 return jsonify({"keys": [CLIENT_PUB_JWK]})