from flask import Flask, render_template_string, request, redirect, session, jsonify, url_for import secrets import hashlib import base64 import urllib.parse import requests import json from datetime import datetime, timedelta, timezone import jwt from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.backends import default_backend import dns.resolver app = Flask(__name__) app.secret_key = secrets.token_hex(32) # In-memory storage identities = {} oauth_requests = {} oauth_sessions = {} # Generate RSA key pair for signing private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) public_key = private_key.public_key() # OAuth client configuration CLIENT_ID = "https://oauth-py.smokesignal.tools/client-metadata.json" REDIRECT_URI = "https://oauth-py.smokesignal.tools/login/callback" def get_jwk(): """Convert public key to JWK format""" public_numbers = public_key.public_numbers() def int_to_base64(n): hex_n = hex(n)[2:] if len(hex_n) % 2: hex_n = '0' + hex_n return base64.urlsafe_b64encode(bytes.fromhex(hex_n)).rstrip(b'=').decode('ascii') return { "kty": "RSA", "use": "sig", "alg": "RS256", "kid": "key1", "n": int_to_base64(public_numbers.n), "e": int_to_base64(public_numbers.e) } def create_dpop_proof(htm, htu, ath=None, nonce=None): """Create a DPoP proof JWT""" jti = secrets.token_urlsafe(16) # Subtract 10 seconds to account for clock skew between client and server iat = int(datetime.now(timezone.utc).timestamp()) - 10 header = { "typ": "dpop+jwt", "alg": "RS256", "jwk": get_jwk() } payload = { "jti": jti, "htm": htm, "htu": htu, "iat": iat } if ath: payload["ath"] = ath if nonce: payload["nonce"] = nonce token = jwt.encode( payload, private_key, algorithm="RS256", headers=header ) return token def generate_pkce_verifier(): """Generate PKCE code verifier""" return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode('ascii') def generate_pkce_challenge(verifier): """Generate PKCE code challenge from verifier""" digest = hashlib.sha256(verifier.encode('ascii')).digest() return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii') def resolve_handle_to_did(handle): """Resolve ATProtocol handle to DID using HTTP then DNS""" # Try HTTPS well-known first try: response = requests.get(f"https://{handle}/.well-known/atproto-did", timeout=5) if response.status_code == 200: did = response.text.strip() if did.startswith('did:'): return did except Exception: pass # Try DNS resolution as fallback try: answers = dns.resolver.resolve(f"_atproto.{handle}", 'TXT') for rdata in answers: # TXT records return a tuple of byte strings for txt_string in rdata.strings: txt_value = txt_string.decode('utf-8') if isinstance(txt_string, bytes) else txt_string if txt_value.startswith('did='): did = txt_value.removeprefix('did=') if did.startswith('did:'): return did except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, Exception): pass return None @app.route('/') def home(): """Home page displaying current time""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") html = """ OAuth Masterclass

Welcome to OAuth Masterclass

Current time: {{ time }}

Login

{% if session.get('handle') %}

Logged in as: {{ session['handle'] }}

Go to protected page

{% endif %} """ return render_template_string(html, time=current_time) @app.route('/login', methods=['GET']) def login_get(): """Login page with form""" html = """ Login

Login with ATProtocol

""" return render_template_string(html) @app.route('/login', methods=['POST']) def login_post(): """Initiate ATProtocol OAuth flow""" handle = request.form.get('handle') if not handle: return "Handle required", 400 # Resolve handle to DID if handle.startswith('did:'): did = handle else: did = resolve_handle_to_did(handle) if not did: return "Could not resolve handle", 400 # Get DID document if did.startswith('did:plc:'): plc_url = f"https://plc.directory/{did}" response = requests.get(plc_url) did_doc = response.json() elif did.startswith('did:web:'): # Simplified did:web resolution domain = did.replace('did:web:', '') response = requests.get(f"https://{domain}/.well-known/did.json") did_doc = response.json() else: return "Unsupported DID method", 400 # Find PDS endpoint pds_endpoint = None for service in did_doc.get('service', []): if service.get('type') == 'AtprotoPersonalDataServer': pds_endpoint = service.get('serviceEndpoint') break if not pds_endpoint: return "No PDS found for user", 400 # Get authorization server metadata response = requests.get(f"{pds_endpoint}/.well-known/oauth-authorization-server") auth_metadata = response.json() # Check if PKCE is required pkce_required = 'S256' in auth_metadata.get('code_challenge_methods_supported', []) # Generate state and PKCE state = secrets.token_urlsafe(32) verifier = generate_pkce_verifier() challenge = generate_pkce_challenge(verifier) # Store OAuth request oauth_requests[state] = { 'handle': handle, 'did': did, 'pds_endpoint': pds_endpoint, 'auth_metadata': auth_metadata, 'verifier': verifier, 'created_at': datetime.now(timezone.utc) } # Build authorization URL auth_params = { 'response_type': 'code', 'client_id': CLIENT_ID, 'redirect_uri': REDIRECT_URI, 'state': state, 'scope': 'atproto repo:garden.lexicon.oauth-masterclass.now', } if pkce_required: auth_params['code_challenge'] = challenge auth_params['code_challenge_method'] = 'S256' auth_url = f"{auth_metadata['authorization_endpoint']}?{urllib.parse.urlencode(auth_params)}" return redirect(auth_url) @app.route('/login/callback') def login_callback(): """OAuth callback handler""" code = request.args.get('code') state = request.args.get('state') error = request.args.get('error') if error: return f"OAuth error: {error}", 400 if not code or not state: return "Missing code or state", 400 # Retrieve OAuth request oauth_req = oauth_requests.get(state) if not oauth_req: return "Invalid state", 400 # Exchange code for tokens auth_metadata = oauth_req['auth_metadata'] # Create DPoP proof for token request dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint']) token_data = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': REDIRECT_URI, 'client_id': CLIENT_ID, } if oauth_req.get('verifier'): token_data['code_verifier'] = oauth_req['verifier'] headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'DPoP': dpop_proof } response = requests.post( auth_metadata['token_endpoint'], data=token_data, headers=headers ) # Handle DPoP nonce requirement if response.status_code == 400: error_data = response.json() if error_data.get('error') == 'use_dpop_nonce': # Get nonce from response header dpop_nonce = response.headers.get('DPoP-Nonce') if dpop_nonce: # Retry with nonce dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint'], nonce=dpop_nonce) headers['DPoP'] = dpop_proof response = requests.post( auth_metadata['token_endpoint'], data=token_data, headers=headers ) if response.status_code != 200: return f"Token exchange failed: {response.text}", 400 tokens = response.json() # Store session session_id = secrets.token_urlsafe(32) oauth_sessions[session_id] = { 'handle': oauth_req['handle'], 'did': oauth_req['did'], 'pds_endpoint': oauth_req['pds_endpoint'], 'access_token': tokens['access_token'], 'refresh_token': tokens.get('refresh_token'), 'expires_at': datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600)), 'auth_metadata': auth_metadata } # Store in Flask session session['session_id'] = session_id session['handle'] = oauth_req['handle'] # Clean up OAuth request del oauth_requests[state] return redirect('/') @app.route('/client-metadata.json') def client_metadata(): """Serve OAuth client metadata""" metadata = { "client_id": CLIENT_ID, "client_name": "OAuth Masterclass App", "client_uri": "https://oauth-py.smokesignal.tools", "redirect_uris": [REDIRECT_URI], "scope": "atproto repo:garden.lexicon.oauth-masterclass.now", "grant_types": ["authorization_code", "refresh_token"], "response_types": ["code"], "token_endpoint_auth_method": "none", "application_type": "web", "dpop_bound_access_tokens": True, "jwks": { "keys": [get_jwk()] } } return jsonify(metadata) def refresh_token_if_needed(session_data): """Refresh access token if expired""" if datetime.now(timezone.utc) >= session_data['expires_at'] - timedelta(minutes=5): if not session_data.get('refresh_token'): return False auth_metadata = session_data['auth_metadata'] dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint']) token_data = { 'grant_type': 'refresh_token', 'refresh_token': session_data['refresh_token'], 'client_id': CLIENT_ID, } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'DPoP': dpop_proof } response = requests.post( auth_metadata['token_endpoint'], data=token_data, headers=headers ) # Handle DPoP nonce requirement if response.status_code == 400: error_data = response.json() if error_data.get('error') == 'use_dpop_nonce': # Get nonce from response header dpop_nonce = response.headers.get('DPoP-Nonce') if dpop_nonce: # Retry with nonce dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint'], nonce=dpop_nonce) headers['DPoP'] = dpop_proof response = requests.post( auth_metadata['token_endpoint'], data=token_data, headers=headers ) if response.status_code == 200: tokens = response.json() session_data['access_token'] = tokens['access_token'] if 'refresh_token' in tokens: session_data['refresh_token'] = tokens['refresh_token'] session_data['expires_at'] = datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600)) return True return False return True @app.route('/now', methods=['GET']) def now_get(): """Protected page for authenticated users only""" session_id = session.get('session_id') if not session_id or session_id not in oauth_sessions: return redirect('/login') session_data = oauth_sessions[session_id] # Refresh token if needed if not refresh_token_if_needed(session_data): session.clear() return redirect('/login') html = """ Now

Protected Page

Hello, {{ handle }}!

Home

""" return render_template_string(html, handle=session_data['handle']) @app.route('/now', methods=['POST']) def now_post(): """Submit ATProtocol record via XRPC""" session_id = session.get('session_id') if not session_id or session_id not in oauth_sessions: return redirect('/login') session_data = oauth_sessions[session_id] message = request.form.get('message') if not message: return "Message required", 400 # Refresh token if needed if not refresh_token_if_needed(session_data): session.clear() return redirect('/login') # Prepare record record = { "$type": "garden.lexicon.oauth-masterclass.now", "now": message, "createdAt": datetime.now(timezone.utc).isoformat() + "Z" } # Prepare XRPC request xrpc_url = f"{session_data['pds_endpoint']}/xrpc/com.atproto.repo.createRecord" # Create DPoP proof with access token hash access_token_hash = base64.urlsafe_b64encode( hashlib.sha256(session_data['access_token'].encode()).digest() ).rstrip(b'=').decode('ascii') dpop_proof = create_dpop_proof("POST", xrpc_url, access_token_hash) headers = { 'Authorization': f"DPoP {session_data['access_token']}", 'DPoP': dpop_proof, 'Content-Type': 'application/json' } body = { 'repo': session_data['did'], 'collection': 'garden.lexicon.oauth-masterclass.now', 'record': record, 'validate': False } response = requests.post(xrpc_url, json=body, headers=headers) # Handle DPoP nonce requirement if response.status_code == 400 or response.status_code == 401: try: error_data = response.json() if error_data.get('error') == 'use_dpop_nonce': # Get nonce from response header dpop_nonce = response.headers.get('DPoP-Nonce') if dpop_nonce: # Retry with nonce dpop_proof = create_dpop_proof("POST", xrpc_url, access_token_hash, dpop_nonce) headers['DPoP'] = dpop_proof response = requests.post(xrpc_url, json=body, headers=headers) except: pass if response.status_code == 200: result = response.json() return f"Record created successfully! URI: {result.get('uri', 'unknown')}" else: return f"Failed to create record: {response.text}", 400 if __name__ == '__main__': app.run(debug=True, port=5000)