decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv
at main 258 lines 8.3 kB view raw
1import json 2from datetime import datetime, timedelta, timezone 3from urllib.parse import urlencode 4 5from aiohttp.client import ClientSession 6from authlib.jose import JsonWebKey, Key 7from flask import Blueprint, current_app, jsonify, redirect, request, session, url_for 8 9from src.atproto import ( 10 fetch_authserver_meta, 11 is_valid_did, 12 is_valid_handle, 13 resolve_authserver_from_pds, 14 resolve_identity, 15) 16from src.atproto.oauth import initial_token_request, send_par_auth_request 17from src.atproto.types import ( 18 DID, 19 AuthserverUrl, 20 Handle, 21 OAuthAuthRequest, 22 OAuthSession, 23 PdsUrl, 24) 25from src.auth import ( 26 delete_auth_request, 27 get_auth_request, 28 save_auth_request, 29 save_auth_session, 30) 31from src.db import KV, get_db 32from src.security import hardened_http, is_safe_url 33 34oauth = Blueprint("oauth", __name__, url_prefix="/oauth") 35 36OAUTH_SCOPE = "atproto include:at.ligo.authFull" 37 38 39@oauth.get("/start") 40async def oauth_start(): 41 # Identity 42 username = request.args.get("username_or_authserver") 43 if not username: 44 return redirect(url_for("page_login"), 303) 45 46 db = get_db(current_app) 47 didkv = KV[Handle, DID](db, current_app.logger, "did_from_handle") 48 pdskv = KV[DID, PdsUrl](db, current_app.logger, "pds_from_did") 49 authserverkv = KV[PdsUrl, AuthserverUrl]( 50 db, 51 current_app.logger, 52 "authserver_from_pds", 53 ) 54 55 client = ClientSession() 56 57 if is_valid_handle(username) or is_valid_did(username): 58 login_hint = username 59 identity = await resolve_identity(client, username, didkv=didkv, pdskv=pdskv) 60 if identity is None: 61 return "couldnt resolve identity", 500 62 did, handle, pds_url = identity 63 current_app.logger.debug(f"account PDS: {pds_url}") 64 authserver_url = await resolve_authserver_from_pds( 65 client, pds_url, authserverkv 66 ) 67 if not authserver_url: 68 return "authserver not found", 404 69 70 elif username.startswith("https://") and is_safe_url(username): 71 did, handle, pds_url = None, None, None 72 login_hint = None 73 authserver_url = ( 74 await resolve_authserver_from_pds(client, PdsUrl(username), authserverkv) 75 or username 76 ) 77 78 else: 79 return "not a valid handle, did or auth server", 400 80 81 current_app.logger.debug(f"Authserver: {authserver_url}") 82 assert is_safe_url(authserver_url) 83 authserver_meta = await fetch_authserver_meta(client, authserver_url) 84 if not authserver_meta: 85 return "no authserver meta", 404 86 87 await client.close() 88 89 # Auth 90 dpop_private_jwk: Key = JsonWebKey.generate_key("EC", "P-256", is_private=True) 91 92 host = request.host 93 metadata_endpoint = url_for("oauth.oauth_metadata") 94 client_id = f"https://{host}{metadata_endpoint}" 95 callback_endpoint = url_for("oauth.oauth_callback") 96 redirect_uri = f"https://{host}{callback_endpoint}" 97 98 current_app.logger.debug(f"client_id {client_id}") 99 current_app.logger.debug(f"redirect_uri {redirect_uri}") 100 101 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 102 103 client = hardened_http.get_session() 104 pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request( 105 client, 106 authserver_url, 107 authserver_meta, 108 login_hint, 109 client_id, 110 redirect_uri, 111 OAUTH_SCOPE, 112 CLIENT_SECRET_JWK, 113 dpop_private_jwk, 114 ) 115 116 if resp.status == 400: 117 current_app.logger.warning("PAR request returned error 400") 118 current_app.logger.warning(await resp.text()) 119 return redirect(url_for("page_login"), 303) 120 _ = resp.raise_for_status() 121 122 respjson: dict[str, str] = await resp.json() 123 par_request_uri: str = respjson["request_uri"] 124 await client.close() 125 126 current_app.logger.debug(f"saving oauth_auth_request to DB state={state}") 127 oauth_request = OAuthAuthRequest( 128 state, 129 authserver_meta["issuer"], 130 did, 131 handle, 132 pds_url, 133 pkce_verifier, 134 OAUTH_SCOPE, 135 dpop_authserver_nonce, 136 dpop_private_jwk.as_json(is_private=True), 137 ) 138 save_auth_request(session, oauth_request) 139 140 auth_endpoint = authserver_meta["authorization_endpoint"] 141 assert is_safe_url(auth_endpoint) 142 qparam = urlencode({"client_id": client_id, "request_uri": par_request_uri}) 143 return redirect(f"{auth_endpoint}?{qparam}") 144 145 146@oauth.get("/callback") 147async def oauth_callback(): 148 state = request.args["state"] 149 authserver_iss = request.args["iss"] 150 if "code" not in request.args: 151 message = f"{request.args['error']}: {request.args['error_description']}" 152 current_app.logger.debug(message) 153 return redirect(url_for("page_login")) 154 authorization_code = request.args["code"] 155 156 auth_request = get_auth_request(session) 157 if not auth_request: 158 return redirect(url_for("page_login"), 303) 159 160 current_app.logger.debug(f"Deleting auth request for state={state}") 161 delete_auth_request(session) 162 163 assert auth_request.authserver_iss == authserver_iss 164 assert auth_request.state == state 165 166 client = ClientSession() 167 168 app_url = request.url_root.replace("http://", "https://") 169 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 170 tokens, dpop_authserver_nonce = await initial_token_request( 171 client, 172 auth_request, 173 authorization_code, 174 app_url, 175 CLIENT_SECRET_JWK, 176 ) 177 178 row = auth_request 179 180 db = get_db(current_app) 181 didkv = KV(db, current_app.logger, "did_from_handle") 182 authserverkv = KV(db, current_app.logger, "authserver_from_pds") 183 184 if row.did: 185 # If we started with an account identifier, this is simple 186 did, handle, pds_url = row.did, row.handle, row.pds_url 187 assert tokens.sub == did 188 else: 189 did = tokens.sub 190 assert is_valid_did(did) 191 identity = await resolve_identity(client, did, didkv=didkv) 192 if not identity: 193 return "could not resolve identity", 500 194 did, handle, pds_url = identity 195 authserver_url = await resolve_authserver_from_pds( 196 client, 197 pds_url, 198 authserverkv, 199 ) 200 assert authserver_url == authserver_iss 201 202 await client.close() 203 204 assert pds_url is not None 205 206 current_app.logger.debug("storing user oauth session") 207 now = datetime.now(timezone.utc) 208 expires_at = now + timedelta(seconds=tokens.expires_in or 300) 209 oauth_session = OAuthSession( 210 did, 211 handle, 212 pds_url, 213 authserver_iss, 214 tokens.access_token, 215 tokens.refresh_token, 216 int(expires_at.timestamp()), 217 dpop_authserver_nonce, 218 None, 219 auth_request.dpop_private_jwk, 220 ) 221 save_auth_session(session, oauth_session) 222 223 return redirect(url_for("page_login")) 224 225 226@oauth.get("/metadata") 227def oauth_metadata(): 228 host = request.host 229 callback_endpoint = url_for("oauth.oauth_callback") 230 metadata_endpoint = url_for("oauth.oauth_metadata") 231 jwks_endpoint = url_for("oauth.oauth_jwks") 232 return jsonify( 233 { 234 "client_id": f"https://{host}{metadata_endpoint}", 235 "grant_types": ["authorization_code", "refresh_token"], 236 "scope": OAUTH_SCOPE, 237 "response_types": ["code"], 238 "redirect_uris": [ 239 f"https://{host}{callback_endpoint}", 240 ], 241 "dpop_bound_access_tokens": True, 242 "token_endpoint_auth_method": "private_key_jwt", 243 "token_endpoint_auth_signing_alg": "ES256", 244 "jwks_uri": f"https://{host}{jwks_endpoint}", 245 # optional 246 "client_name": "ligo.at", 247 "client_uri": f"https://{host}", 248 "logo_uri": f"https://{host}{url_for('static', filename='favicon-48.png')}", 249 "tos_uri": f"https://{host}{url_for('page_terms')}", 250 } 251 ) 252 253 254@oauth.get("/jwks") 255def oauth_jwks(): 256 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 257 CLIENT_PUB_JWK = json.loads(CLIENT_SECRET_JWK.as_json(is_private=False)) 258 return jsonify({"keys": [CLIENT_PUB_JWK]})