# This repo has no description.
# Branch: main (507 lines, 16 kB)
from flask import Flask, render_template_string, request, redirect, session, jsonify, url_for
import secrets
import hashlib
import base64
import urllib.parse
import re
import requests
import json
from datetime import datetime, timedelta, timezone
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import dns.exception
import dns.resolver

app = Flask(__name__)
# NOTE: a fresh secret is generated on every start, so Flask session cookies
# issued by a previous process become invalid after a restart.
app.secret_key = secrets.token_hex(32)

# In-memory storage
identities = {}
oauth_requests = {}
oauth_sessions = {}

# Generate RSA key pair for signing
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
    backend=default_backend()
)
public_key = private_key.public_key()

# OAuth client configuration
CLIENT_ID = "https://oauth-py.smokesignal.tools/client-metadata.json"
REDIRECT_URI = "https://oauth-py.smokesignal.tools/login/callback"

# Syntax of a handle: two or more DNS labels, the last one starting with a
# letter. Used to reject input that would otherwise be spliced unchecked into
# a URL and a DNS query name.
_HANDLE_RE = re.compile(
    r"(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
    r"[a-zA-Z](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?"
)
_MAX_HANDLE_LENGTH = 253


def get_jwk():
    """Return the module's RSA public key as a JWK dictionary.

    Returns:
        dict: JWK with ``kty``, ``use``, ``alg``, ``kid`` and the
        base64url-encoded (unpadded) modulus ``n`` and exponent ``e``.
    """
    public_numbers = public_key.public_numbers()

    def int_to_base64(n):
        # Big-endian, minimal-length byte string (at least one byte), then
        # unpadded URL-safe base64.
        raw = n.to_bytes(max(1, (n.bit_length() + 7) // 8), "big")
        return base64.urlsafe_b64encode(raw).rstrip(b'=').decode('ascii')

    return {
        "kty": "RSA",
        "use": "sig",
        "alg": "RS256",
        "kid": "key1",
        "n": int_to_base64(public_numbers.n),
        "e": int_to_base64(public_numbers.e)
    }


def create_dpop_proof(htm, htu, ath=None, nonce=None):
    """Create a DPoP proof JWT signed with the module's private key.

    Args:
        htm: HTTP method of the request the proof is bound to (e.g. "POST").
        htu: Target URL of the request. Any query string or fragment is
            removed before it is placed in the proof.
        ath: Optional base64url SHA-256 hash of the access token.
        nonce: Optional server-provided DPoP nonce.

    Returns:
        The encoded JWT as returned by ``jwt.encode``.
    """
    jti = secrets.token_urlsafe(16)
    # Subtract 10 seconds to account for clock skew between client and server
    iat = int(datetime.now(timezone.utc).timestamp()) - 10

    # The "htu" claim must not carry query or fragment parts (RFC 9449).
    htu = htu.split('#', 1)[0].split('?', 1)[0]

    header = {
        "typ": "dpop+jwt",
        "alg": "RS256",
        "jwk": get_jwk()
    }

    payload = {
        "jti": jti,
        "htm": htm,
        "htu": htu,
        "iat": iat
    }

    if ath:
        payload["ath"] = ath

    if nonce:
        payload["nonce"] = nonce

    return jwt.encode(
        payload,
        private_key,
        algorithm="RS256",
        headers=header
    )


def generate_pkce_verifier():
    """Generate a PKCE code verifier (unpadded base64url of 32 random bytes)."""
    return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode('ascii')


def generate_pkce_challenge(verifier):
    """Return the S256 PKCE code challenge for ``verifier``.

    The challenge is the unpadded base64url encoding of the SHA-256 digest of
    the ASCII-encoded verifier.
    """
    digest = hashlib.sha256(verifier.encode('ascii')).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')


def resolve_handle_to_did(handle):
    """Resolve an ATProtocol handle to a DID using HTTPS, then DNS.

    The handle is normalized (surrounding whitespace and a leading ``@``
    removed, lower-cased) and checked against the handle syntax before it is
    used to build a URL or DNS name. Both lookups are best effort: network
    and DNS failures are treated as "not found".

    Args:
        handle: User-supplied handle such as ``user.bsky.social``.

    Returns:
        The DID string (starting with ``did:``), or ``None`` if the handle is
        empty, syntactically invalid, or could not be resolved.
    """
    if not handle:
        return None

    handle = handle.strip().removeprefix('@').lower()
    if len(handle) > _MAX_HANDLE_LENGTH or not _HANDLE_RE.fullmatch(handle):
        return None

    # Try HTTPS well-known first
    try:
        response = requests.get(f"https://{handle}/.well-known/atproto-did", timeout=5)
    except (requests.RequestException, ValueError):
        response = None

    if response is not None and response.status_code == 200:
        did = response.text.strip()
        if did.startswith('did:'):
            return did

    # Try DNS resolution as fallback
    try:
        answers = dns.resolver.resolve(f"_atproto.{handle}", 'TXT')
    except (dns.exception.DNSException, OSError):
        return None

    for rdata in answers:
        # TXT records return a tuple of byte strings
        for txt_string in rdata.strings:
            if isinstance(txt_string, bytes):
                # Undecodable bytes must not abort the scan of other records.
                txt_value = txt_string.decode('utf-8', errors='replace')
            else:
                txt_value = txt_string
            if txt_value.startswith('did='):
                did = txt_value.removeprefix('did=')
                if did.startswith('did:'):
                    return did

    return None


@app.route('/')
def home():
    """Home page displaying the current time and, if logged in, the handle."""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    html = """
    <!DOCTYPE html>
    <html>
    <head><title>OAuth Masterclass</title></head>
    <body>
        <h1>Welcome to OAuth Masterclass</h1>
        <p>Current time: {{ time }}</p>
        <p><a href="/login">Login</a></p>
        {% if session.get('handle') %}
        <p>Logged in as: {{ session['handle'] }}</p>
        <p><a href="/now">Go to protected page</a></p>
        {% endif %}
    </body>
    </html>
    """
    return render_template_string(html, time=current_time)

@app.route('/login', methods=['GET'])
def login_get():
    """Login page with a form asking for the user's handle."""
    html = """
    <!DOCTYPE html>
    <html>
    <head><title>Login</title></head>
    <body>
        <h1>Login with ATProtocol</h1>
        <form method="POST" action="/login">
            <label for="handle">Handle:</label>
            <input type="text" id="handle" name="handle" placeholder="user.bsky.social" required>
            <button type="submit">Login</button>
        </form>
    </body>
    </html>
    """
    return render_template_string(html)


# Timeout (seconds) applied to every outbound HTTP request made below, so a
# slow or unresponsive remote server cannot hang a worker indefinitely.
HTTP_TIMEOUT = 10


def _plain_text(message, status=200):
    """Build a text/plain response tuple.

    Used for any message that embeds request parameters or remote server
    output, so the browser never interprets that text as HTML.
    """
    return message, status, {'Content-Type': 'text/plain; charset=utf-8'}


def _fetch_json(url):
    """GET ``url`` and return the decoded JSON object, or None on any failure."""
    try:
        response = requests.get(url, timeout=HTTP_TIMEOUT)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError):
        return None
    return data if isinstance(data, dict) else None


def _wants_dpop_nonce(response):
    """Return True if ``response`` is a ``use_dpop_nonce`` error."""
    if response.status_code not in (400, 401):
        return False
    # Resource servers may signal the error via WWW-Authenticate (RFC 9449).
    if 'use_dpop_nonce' in response.headers.get('WWW-Authenticate', ''):
        return True
    try:
        error_data = response.json()
    except ValueError:
        return False
    return isinstance(error_data, dict) and error_data.get('error') == 'use_dpop_nonce'


def _post_with_dpop(url, headers, ath=None, data=None, json_body=None):
    """POST to ``url`` with a DPoP proof, retrying once if a nonce is demanded.

    Args:
        url: Target URL; also used as the proof's ``htu``.
        headers: Base request headers; a ``DPoP`` header is added to a copy.
        ath: Optional access-token hash to embed in the proof.
        data: Optional form body.
        json_body: Optional JSON body.

    Returns:
        The final ``requests.Response``.

    Raises:
        requests.RequestException: If the request cannot be completed.
    """
    request_headers = dict(headers)
    request_headers['DPoP'] = create_dpop_proof("POST", url, ath)
    response = requests.post(
        url, data=data, json=json_body, headers=request_headers, timeout=HTTP_TIMEOUT
    )

    if _wants_dpop_nonce(response):
        dpop_nonce = response.headers.get('DPoP-Nonce')
        if dpop_nonce:
            # Retry with nonce
            request_headers['DPoP'] = create_dpop_proof("POST", url, ath, dpop_nonce)
            response = requests.post(
                url, data=data, json=json_body, headers=request_headers, timeout=HTTP_TIMEOUT
            )

    return response


@app.route('/login', methods=['POST'])
def login_post():
    """Initiate the ATProtocol OAuth flow for the submitted handle or DID.

    Resolves the identity to a DID document, locates the PDS, fetches the
    authorization server metadata, stores the pending request under a random
    ``state`` and redirects the browser to the authorization endpoint.
    """
    handle = (request.form.get('handle') or '').strip()
    if not handle:
        return "Handle required", 400

    # Resolve handle to DID
    if handle.startswith('did:'):
        did = handle
    else:
        did = resolve_handle_to_did(handle)
        if not did:
            return "Could not resolve handle", 400

    # Get DID document
    if did.startswith('did:plc:'):
        did_doc_url = f"https://plc.directory/{urllib.parse.quote(did, safe=':')}"
    elif did.startswith('did:web:'):
        # Simplified did:web resolution
        domain = did.removeprefix('did:web:')
        did_doc_url = f"https://{domain}/.well-known/did.json"
    else:
        return "Unsupported DID method", 400

    did_doc = _fetch_json(did_doc_url)
    if did_doc is None:
        return "Could not fetch DID document", 400

    # Find PDS endpoint
    pds_endpoint = None
    for service in did_doc.get('service') or []:
        if isinstance(service, dict) and service.get('type') == 'AtprotoPersonalDataServer':
            pds_endpoint = service.get('serviceEndpoint')
            break

    if not pds_endpoint or not isinstance(pds_endpoint, str):
        return "No PDS found for user", 400
    # Avoid a double slash when paths are appended to the endpoint.
    pds_endpoint = pds_endpoint.rstrip('/')

    # Get authorization server metadata
    auth_metadata = _fetch_json(f"{pds_endpoint}/.well-known/oauth-authorization-server")
    if (
        auth_metadata is None
        or not auth_metadata.get('authorization_endpoint')
        or not auth_metadata.get('token_endpoint')
    ):
        return "Could not fetch authorization server metadata", 400

    # Check if PKCE is required
    pkce_required = 'S256' in (auth_metadata.get('code_challenge_methods_supported') or [])

    # Generate state and PKCE. The verifier is only kept when a challenge is
    # actually sent, so the token request never carries a code_verifier the
    # authorization server has no challenge for.
    state = secrets.token_urlsafe(32)
    verifier = generate_pkce_verifier() if pkce_required else None

    # Store OAuth request
    oauth_requests[state] = {
        'handle': handle,
        'did': did,
        'pds_endpoint': pds_endpoint,
        'auth_metadata': auth_metadata,
        'verifier': verifier,
        'created_at': datetime.now(timezone.utc)
    }

    # Build authorization URL
    auth_params = {
        'response_type': 'code',
        'client_id': CLIENT_ID,
        'redirect_uri': REDIRECT_URI,
        'state': state,
        'scope': 'atproto repo:garden.lexicon.oauth-masterclass.now',
    }

    if verifier:
        auth_params['code_challenge'] = generate_pkce_challenge(verifier)
        auth_params['code_challenge_method'] = 'S256'

    auth_url = f"{auth_metadata['authorization_endpoint']}?{urllib.parse.urlencode(auth_params)}"

    return redirect(auth_url)


@app.route('/login/callback')
def login_callback():
    """OAuth callback handler: exchange the authorization code for tokens.

    The pending request identified by ``state`` is consumed exactly once,
    whether or not the exchange succeeds. On success a server-side session is
    created and its id stored in the Flask session.
    """
    code = request.args.get('code')
    state = request.args.get('state')
    error = request.args.get('error')

    if error:
        # The flow is over; drop the pending request so it cannot accumulate.
        if state:
            oauth_requests.pop(state, None)
        return _plain_text(f"OAuth error: {error}", 400)

    if not code or not state:
        return "Missing code or state", 400

    # Retrieve OAuth request (single use)
    oauth_req = oauth_requests.pop(state, None)
    if not oauth_req:
        return "Invalid state", 400

    # Exchange code for tokens
    auth_metadata = oauth_req['auth_metadata']

    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': REDIRECT_URI,
        'client_id': CLIENT_ID,
    }

    if oauth_req.get('verifier'):
        token_data['code_verifier'] = oauth_req['verifier']

    try:
        response = _post_with_dpop(
            auth_metadata['token_endpoint'],
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            data=token_data,
        )
    except requests.RequestException:
        return "Token exchange failed: could not reach authorization server", 502

    if response.status_code != 200:
        return _plain_text(f"Token exchange failed: {response.text}", 400)

    try:
        tokens = response.json()
    except ValueError:
        tokens = None
    if not isinstance(tokens, dict) or not tokens.get('access_token'):
        return "Token exchange failed: malformed token response", 400

    # The tokens must belong to the account the flow was started for.
    if tokens.get('sub') != oauth_req['did']:
        return "Token exchange failed: account mismatch", 400

    # Store session
    session_id = secrets.token_urlsafe(32)
    oauth_sessions[session_id] = {
        'handle': oauth_req['handle'],
        'did': oauth_req['did'],
        'pds_endpoint': oauth_req['pds_endpoint'],
        'access_token': tokens['access_token'],
        'refresh_token': tokens.get('refresh_token'),
        'expires_at': datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600)),
        'auth_metadata': auth_metadata
    }

    # Store in Flask session
    session['session_id'] = session_id
    session['handle'] = oauth_req['handle']

    return redirect('/')


@app.route('/client-metadata.json')
def client_metadata():
    """Serve the OAuth client metadata document as JSON."""
    metadata = {
        "client_id": CLIENT_ID,
        "client_name": "OAuth Masterclass App",
        "client_uri": "https://oauth-py.smokesignal.tools",
        "redirect_uris": [REDIRECT_URI],
        "scope": "atproto repo:garden.lexicon.oauth-masterclass.now",
        "grant_types": ["authorization_code", "refresh_token"],
        "response_types": ["code"],
        "token_endpoint_auth_method": "none",
        "application_type": "web",
        "dpop_bound_access_tokens": True,
        "jwks": {
            "keys": [get_jwk()]
        }
    }
    return jsonify(metadata)


def refresh_token_if_needed(session_data):
    """Refresh the access token if it expires within five minutes.

    Args:
        session_data: Session dict with ``expires_at``, ``refresh_token``,
            ``access_token`` and ``auth_metadata``; updated in place when the
            refresh succeeds.

    Returns:
        True if the access token is still valid or was refreshed, False if a
        refresh was needed but could not be performed.
    """
    if datetime.now(timezone.utc) < session_data['expires_at'] - timedelta(minutes=5):
        return True

    if not session_data.get('refresh_token'):
        return False

    token_data = {
        'grant_type': 'refresh_token',
        'refresh_token': session_data['refresh_token'],
        'client_id': CLIENT_ID,
    }

    try:
        response = _post_with_dpop(
            session_data['auth_metadata']['token_endpoint'],
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            data=token_data,
        )
    except requests.RequestException:
        return False

    if response.status_code != 200:
        return False

    try:
        tokens = response.json()
    except ValueError:
        return False
    if not isinstance(tokens, dict) or not tokens.get('access_token'):
        return False

    session_data['access_token'] = tokens['access_token']
    if 'refresh_token' in tokens:
        session_data['refresh_token'] = tokens['refresh_token']
    session_data['expires_at'] = datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600))
    return True


@app.route('/now', methods=['GET'])
def now_get():
    """Protected page for authenticated users only."""
    session_id = session.get('session_id')
    if not session_id or session_id not in oauth_sessions:
        return redirect('/login')

    session_data = oauth_sessions[session_id]

    # Refresh token if needed
    if not refresh_token_if_needed(session_data):
        # Drop the unusable server-side session along with the cookie state.
        oauth_sessions.pop(session_id, None)
        session.clear()
        return redirect('/login')

    html = """
    <!DOCTYPE html>
    <html>
    <head><title>Now</title></head>
    <body>
        <h1>Protected Page</h1>
        <p>Hello, {{ handle }}!</p>
        <form method="POST" action="/now">
            <label for="message">Message:</label>
            <input type="text" id="message" name="message" required>
            <button type="submit">Submit to ATProtocol</button>
        </form>
        <p><a href="/">Home</a></p>
    </body>
    </html>
    """
    return render_template_string(html, handle=session_data['handle'])


@app.route('/now', methods=['POST'])
def now_post():
    """Submit the posted message as an ATProtocol record via XRPC."""
    session_id = session.get('session_id')
    if not session_id or session_id not in oauth_sessions:
        return redirect('/login')

    session_data = oauth_sessions[session_id]
    message = request.form.get('message')

    if not message:
        return "Message required", 400

    # Refresh token if needed
    if not refresh_token_if_needed(session_data):
        oauth_sessions.pop(session_id, None)
        session.clear()
        return redirect('/login')

    # Prepare record. isoformat() on an aware UTC datetime already ends in
    # "+00:00"; swap that for "Z" instead of appending a second designator.
    created_at = (
        datetime.now(timezone.utc)
        .isoformat(timespec='milliseconds')
        .replace('+00:00', 'Z')
    )
    record = {
        "$type": "garden.lexicon.oauth-masterclass.now",
        "now": message,
        "createdAt": created_at
    }

    # Prepare XRPC request
    xrpc_url = f"{session_data['pds_endpoint']}/xrpc/com.atproto.repo.createRecord"

    # Create DPoP proof with access token hash
    access_token_hash = base64.urlsafe_b64encode(
        hashlib.sha256(session_data['access_token'].encode()).digest()
    ).rstrip(b'=').decode('ascii')

    headers = {
        'Authorization': f"DPoP {session_data['access_token']}",
        'Content-Type': 'application/json'
    }

    body = {
        'repo': session_data['did'],
        'collection': 'garden.lexicon.oauth-masterclass.now',
        'record': record,
        'validate': False
    }

    try:
        response = _post_with_dpop(
            xrpc_url, headers=headers, ath=access_token_hash, json_body=body
        )
    except requests.RequestException:
        return "Failed to create record: could not reach PDS", 502

    if response.status_code != 200:
        return _plain_text(f"Failed to create record: {response.text}", 400)

    try:
        result = response.json()
    except ValueError:
        result = {}
    uri = result.get('uri', 'unknown') if isinstance(result, dict) else 'unknown'
    return _plain_text(f"Record created successfully! URI: {uri}")


if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive Werkzeug debugger;
    # do not run this entry point on a publicly reachable host.
    app.run(debug=True, port=5000)