from flask import Flask, render_template_string, request, redirect, session, jsonify, url_for
import secrets
import hashlib
import base64
import urllib.parse
import requests
import json
from datetime import datetime, timedelta, timezone
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import dns.resolver
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)
# In-memory storage
identities = {}
oauth_requests = {}
oauth_sessions = {}
# Generate RSA key pair for signing
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key = private_key.public_key()
# OAuth client configuration
CLIENT_ID = "https://oauth-py.smokesignal.tools/client-metadata.json"
REDIRECT_URI = "https://oauth-py.smokesignal.tools/login/callback"
def get_jwk():
"""Convert public key to JWK format"""
public_numbers = public_key.public_numbers()
def int_to_base64(n):
hex_n = hex(n)[2:]
if len(hex_n) % 2:
hex_n = '0' + hex_n
return base64.urlsafe_b64encode(bytes.fromhex(hex_n)).rstrip(b'=').decode('ascii')
return {
"kty": "RSA",
"use": "sig",
"alg": "RS256",
"kid": "key1",
"n": int_to_base64(public_numbers.n),
"e": int_to_base64(public_numbers.e)
}
def create_dpop_proof(htm, htu, ath=None, nonce=None):
"""Create a DPoP proof JWT"""
jti = secrets.token_urlsafe(16)
# Subtract 10 seconds to account for clock skew between client and server
iat = int(datetime.now(timezone.utc).timestamp()) - 10
header = {
"typ": "dpop+jwt",
"alg": "RS256",
"jwk": get_jwk()
}
payload = {
"jti": jti,
"htm": htm,
"htu": htu,
"iat": iat
}
if ath:
payload["ath"] = ath
if nonce:
payload["nonce"] = nonce
token = jwt.encode(
payload,
private_key,
algorithm="RS256",
headers=header
)
return token
def generate_pkce_verifier():
"""Generate PKCE code verifier"""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=').decode('ascii')
def generate_pkce_challenge(verifier):
"""Generate PKCE code challenge from verifier"""
digest = hashlib.sha256(verifier.encode('ascii')).digest()
return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
def resolve_handle_to_did(handle):
"""Resolve ATProtocol handle to DID using HTTP then DNS"""
# Try HTTPS well-known first
try:
response = requests.get(f"https://{handle}/.well-known/atproto-did", timeout=5)
if response.status_code == 200:
did = response.text.strip()
if did.startswith('did:'):
return did
except Exception:
pass
# Try DNS resolution as fallback
try:
answers = dns.resolver.resolve(f"_atproto.{handle}", 'TXT')
for rdata in answers:
# TXT records return a tuple of byte strings
for txt_string in rdata.strings:
txt_value = txt_string.decode('utf-8') if isinstance(txt_string, bytes) else txt_string
if txt_value.startswith('did='):
did = txt_value.removeprefix('did=')
if did.startswith('did:'):
return did
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, Exception):
pass
return None
@app.route('/')
def home():
"""Home page displaying current time"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
html = """
OAuth Masterclass
Welcome to OAuth Masterclass
Current time: {{ time }}
Login
{% if session.get('handle') %}
Logged in as: {{ session['handle'] }}
Go to protected page
{% endif %}
"""
return render_template_string(html, time=current_time)
@app.route('/login', methods=['GET'])
def login_get():
"""Login page with form"""
html = """
Login
Login with ATProtocol
"""
return render_template_string(html)
@app.route('/login', methods=['POST'])
def login_post():
"""Initiate ATProtocol OAuth flow"""
handle = request.form.get('handle')
if not handle:
return "Handle required", 400
# Resolve handle to DID
if handle.startswith('did:'):
did = handle
else:
did = resolve_handle_to_did(handle)
if not did:
return "Could not resolve handle", 400
# Get DID document
if did.startswith('did:plc:'):
plc_url = f"https://plc.directory/{did}"
response = requests.get(plc_url)
did_doc = response.json()
elif did.startswith('did:web:'):
# Simplified did:web resolution
domain = did.replace('did:web:', '')
response = requests.get(f"https://{domain}/.well-known/did.json")
did_doc = response.json()
else:
return "Unsupported DID method", 400
# Find PDS endpoint
pds_endpoint = None
for service in did_doc.get('service', []):
if service.get('type') == 'AtprotoPersonalDataServer':
pds_endpoint = service.get('serviceEndpoint')
break
if not pds_endpoint:
return "No PDS found for user", 400
# Get authorization server metadata
response = requests.get(f"{pds_endpoint}/.well-known/oauth-authorization-server")
auth_metadata = response.json()
# Check if PKCE is required
pkce_required = 'S256' in auth_metadata.get('code_challenge_methods_supported', [])
# Generate state and PKCE
state = secrets.token_urlsafe(32)
verifier = generate_pkce_verifier()
challenge = generate_pkce_challenge(verifier)
# Store OAuth request
oauth_requests[state] = {
'handle': handle,
'did': did,
'pds_endpoint': pds_endpoint,
'auth_metadata': auth_metadata,
'verifier': verifier,
'created_at': datetime.now(timezone.utc)
}
# Build authorization URL
auth_params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'state': state,
'scope': 'atproto repo:garden.lexicon.oauth-masterclass.now',
}
if pkce_required:
auth_params['code_challenge'] = challenge
auth_params['code_challenge_method'] = 'S256'
auth_url = f"{auth_metadata['authorization_endpoint']}?{urllib.parse.urlencode(auth_params)}"
return redirect(auth_url)
@app.route('/login/callback')
def login_callback():
"""OAuth callback handler"""
code = request.args.get('code')
state = request.args.get('state')
error = request.args.get('error')
if error:
return f"OAuth error: {error}", 400
if not code or not state:
return "Missing code or state", 400
# Retrieve OAuth request
oauth_req = oauth_requests.get(state)
if not oauth_req:
return "Invalid state", 400
# Exchange code for tokens
auth_metadata = oauth_req['auth_metadata']
# Create DPoP proof for token request
dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint'])
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID,
}
if oauth_req.get('verifier'):
token_data['code_verifier'] = oauth_req['verifier']
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'DPoP': dpop_proof
}
response = requests.post(
auth_metadata['token_endpoint'],
data=token_data,
headers=headers
)
# Handle DPoP nonce requirement
if response.status_code == 400:
error_data = response.json()
if error_data.get('error') == 'use_dpop_nonce':
# Get nonce from response header
dpop_nonce = response.headers.get('DPoP-Nonce')
if dpop_nonce:
# Retry with nonce
dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint'], nonce=dpop_nonce)
headers['DPoP'] = dpop_proof
response = requests.post(
auth_metadata['token_endpoint'],
data=token_data,
headers=headers
)
if response.status_code != 200:
return f"Token exchange failed: {response.text}", 400
tokens = response.json()
# Store session
session_id = secrets.token_urlsafe(32)
oauth_sessions[session_id] = {
'handle': oauth_req['handle'],
'did': oauth_req['did'],
'pds_endpoint': oauth_req['pds_endpoint'],
'access_token': tokens['access_token'],
'refresh_token': tokens.get('refresh_token'),
'expires_at': datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600)),
'auth_metadata': auth_metadata
}
# Store in Flask session
session['session_id'] = session_id
session['handle'] = oauth_req['handle']
# Clean up OAuth request
del oauth_requests[state]
return redirect('/')
@app.route('/client-metadata.json')
def client_metadata():
"""Serve OAuth client metadata"""
metadata = {
"client_id": CLIENT_ID,
"client_name": "OAuth Masterclass App",
"client_uri": "https://oauth-py.smokesignal.tools",
"redirect_uris": [REDIRECT_URI],
"scope": "atproto repo:garden.lexicon.oauth-masterclass.now",
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"token_endpoint_auth_method": "none",
"application_type": "web",
"dpop_bound_access_tokens": True,
"jwks": {
"keys": [get_jwk()]
}
}
return jsonify(metadata)
def refresh_token_if_needed(session_data):
"""Refresh access token if expired"""
if datetime.now(timezone.utc) >= session_data['expires_at'] - timedelta(minutes=5):
if not session_data.get('refresh_token'):
return False
auth_metadata = session_data['auth_metadata']
dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint'])
token_data = {
'grant_type': 'refresh_token',
'refresh_token': session_data['refresh_token'],
'client_id': CLIENT_ID,
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'DPoP': dpop_proof
}
response = requests.post(
auth_metadata['token_endpoint'],
data=token_data,
headers=headers
)
# Handle DPoP nonce requirement
if response.status_code == 400:
error_data = response.json()
if error_data.get('error') == 'use_dpop_nonce':
# Get nonce from response header
dpop_nonce = response.headers.get('DPoP-Nonce')
if dpop_nonce:
# Retry with nonce
dpop_proof = create_dpop_proof("POST", auth_metadata['token_endpoint'], nonce=dpop_nonce)
headers['DPoP'] = dpop_proof
response = requests.post(
auth_metadata['token_endpoint'],
data=token_data,
headers=headers
)
if response.status_code == 200:
tokens = response.json()
session_data['access_token'] = tokens['access_token']
if 'refresh_token' in tokens:
session_data['refresh_token'] = tokens['refresh_token']
session_data['expires_at'] = datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600))
return True
return False
return True
@app.route('/now', methods=['GET'])
def now_get():
"""Protected page for authenticated users only"""
session_id = session.get('session_id')
if not session_id or session_id not in oauth_sessions:
return redirect('/login')
session_data = oauth_sessions[session_id]
# Refresh token if needed
if not refresh_token_if_needed(session_data):
session.clear()
return redirect('/login')
html = """
Now
Protected Page
Hello, {{ handle }}!
Home
"""
return render_template_string(html, handle=session_data['handle'])
@app.route('/now', methods=['POST'])
def now_post():
"""Submit ATProtocol record via XRPC"""
session_id = session.get('session_id')
if not session_id or session_id not in oauth_sessions:
return redirect('/login')
session_data = oauth_sessions[session_id]
message = request.form.get('message')
if not message:
return "Message required", 400
# Refresh token if needed
if not refresh_token_if_needed(session_data):
session.clear()
return redirect('/login')
# Prepare record
record = {
"$type": "garden.lexicon.oauth-masterclass.now",
"now": message,
"createdAt": datetime.now(timezone.utc).isoformat() + "Z"
}
# Prepare XRPC request
xrpc_url = f"{session_data['pds_endpoint']}/xrpc/com.atproto.repo.createRecord"
# Create DPoP proof with access token hash
access_token_hash = base64.urlsafe_b64encode(
hashlib.sha256(session_data['access_token'].encode()).digest()
).rstrip(b'=').decode('ascii')
dpop_proof = create_dpop_proof("POST", xrpc_url, access_token_hash)
headers = {
'Authorization': f"DPoP {session_data['access_token']}",
'DPoP': dpop_proof,
'Content-Type': 'application/json'
}
body = {
'repo': session_data['did'],
'collection': 'garden.lexicon.oauth-masterclass.now',
'record': record,
'validate': False
}
response = requests.post(xrpc_url, json=body, headers=headers)
# Handle DPoP nonce requirement
if response.status_code == 400 or response.status_code == 401:
try:
error_data = response.json()
if error_data.get('error') == 'use_dpop_nonce':
# Get nonce from response header
dpop_nonce = response.headers.get('DPoP-Nonce')
if dpop_nonce:
# Retry with nonce
dpop_proof = create_dpop_proof("POST", xrpc_url, access_token_hash, dpop_nonce)
headers['DPoP'] = dpop_proof
response = requests.post(xrpc_url, json=body, headers=headers)
except:
pass
if response.status_code == 200:
result = response.json()
return f"Record created successfully! URI: {result.get('uri', 'unknown')}"
else:
return f"Failed to create record: {response.text}", 400
if __name__ == '__main__':
app.run(debug=True, port=5000)