this repo has no description
1from flask import Flask, render_template_string, request, redirect, session, jsonify, url_for
2import secrets
3import hashlib
4import base64
5import urllib.parse
6import requests
7import json
8from datetime import datetime, timedelta, timezone
9import jwt
10from cryptography.hazmat.primitives import serialization
11from cryptography.hazmat.primitives.asymmetric import rsa
12from cryptography.hazmat.backends import default_backend
13import dns.resolver
14
# Flask application object. The secret key (which Flask uses to sign the
# session cookie) is generated fresh on every process start, so cookies issued
# by a previous run of this process will no longer validate.
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)

# In-memory storage
# These dicts live only inside this process; their contents are lost on restart.
identities = {}  # not referenced by any code visible in this file
oauth_requests = {}  # OAuth `state` value -> pending authorization request data
oauth_sessions = {}  # server-side session id -> tokens and account data

# Generate RSA key pair for signing
# A new 2048-bit key is created at import time. It signs the DPoP proofs built
# by create_dpop_proof(), and its public half is exported by get_jwk().
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
    backend=default_backend()
)
public_key = private_key.public_key()

# OAuth client configuration
# CLIENT_ID is the URL of this client's metadata document (served by the
# /client-metadata.json route); REDIRECT_URI is the /login/callback route.
CLIENT_ID = "https://oauth-py.smokesignal.tools/client-metadata.json"
REDIRECT_URI = "https://oauth-py.smokesignal.tools/login/callback"
34
def get_jwk():
    """Return the module-level RSA public key as a JWK dictionary.

    The modulus (``n``) and exponent (``e``) are encoded as unpadded
    base64url strings of their big-endian byte representation.
    """
    numbers = public_key.public_numbers()

    def b64url_uint(value):
        # Render as hex, left-pad to a whole number of bytes, then base64url
        # encode without the trailing '=' padding.
        digits = format(value, 'x')
        digits = '0' * (len(digits) % 2) + digits
        encoded = base64.urlsafe_b64encode(bytes.fromhex(digits))
        return encoded.decode('ascii').rstrip('=')

    jwk = {"kty": "RSA", "use": "sig", "alg": "RS256", "kid": "key1"}
    jwk["n"] = b64url_uint(numbers.n)
    jwk["e"] = b64url_uint(numbers.e)
    return jwk
53
def create_dpop_proof(htm, htu, ath=None, nonce=None):
    """Build and sign a DPoP proof JWT for a single HTTP request.

    Args:
        htm: HTTP method of the request the proof accompanies.
        htu: Target URL of that request.
        ath: Optional access-token hash claim; omitted when falsy.
        nonce: Optional server-supplied nonce claim; omitted when falsy.

    Returns:
        The RS256-signed JWT produced by ``jwt.encode``.
    """
    token_id = secrets.token_urlsafe(16)
    # Backdate by 10 seconds to tolerate clock skew between client and server.
    issued_at = int(datetime.now(timezone.utc).timestamp()) - 10

    claims = {"jti": token_id, "htm": htm, "htu": htu, "iat": issued_at}
    # Optional claims are only present when a truthy value was supplied.
    for claim_name, claim_value in (("ath", ath), ("nonce", nonce)):
        if claim_value:
            claims[claim_name] = claim_value

    jose_header = {"typ": "dpop+jwt", "alg": "RS256", "jwk": get_jwk()}
    return jwt.encode(claims, private_key, algorithm="RS256", headers=jose_header)
87
def generate_pkce_verifier():
    """Return a fresh PKCE code verifier.

    The verifier is 32 random bytes encoded as unpadded base64url text
    (43 ASCII characters).
    """
    random_bytes = secrets.token_bytes(32)
    encoded = base64.urlsafe_b64encode(random_bytes).decode('ascii')
    return encoded.rstrip('=')
91
def generate_pkce_challenge(verifier):
    """Return the S256 PKCE code challenge for *verifier*.

    The challenge is the SHA-256 digest of the ASCII-encoded verifier,
    rendered as unpadded base64url text.
    """
    hasher = hashlib.sha256()
    hasher.update(verifier.encode('ascii'))
    encoded = base64.urlsafe_b64encode(hasher.digest())
    return encoded.decode('ascii').rstrip('=')
96
def _is_valid_handle(handle):
    """Return True if *handle* is a syntactically valid hostname-style handle."""
    if not isinstance(handle, str) or not 0 < len(handle) <= 253:
        return False
    labels = handle.split('.')
    if len(labels) < 2:
        return False
    for label in labels:
        if not 0 < len(label) <= 63:
            return False
        if label[0] == '-' or label[-1] == '-':
            return False
        if not all(ch.isascii() and (ch.isalnum() or ch == '-') for ch in label):
            return False
    # The final (top-level) label must start with a letter.
    return not labels[-1][0].isdigit()


def resolve_handle_to_did(handle):
    """Resolve an ATProtocol handle to a DID using HTTPS, then DNS.

    The handle is validated first because it is interpolated into a URL and a
    DNS name; anything that is not a plain multi-label hostname (for example a
    value containing ``/``, ``@`` or ``?``) is rejected without any network
    request being made.

    Args:
        handle: The handle to resolve, e.g. ``user.bsky.social``.

    Returns:
        The DID string (starting with ``did:``) or ``None`` when the handle is
        invalid or neither lookup yields a DID.
    """
    if not _is_valid_handle(handle):
        return None

    # Try HTTPS well-known first
    try:
        response = requests.get(f"https://{handle}/.well-known/atproto-did", timeout=5)
    except requests.RequestException:
        # Best effort: fall through to the DNS lookup.
        response = None

    if response is not None and response.status_code == 200:
        did = response.text.strip()
        if did.startswith('did:'):
            return did

    # Try DNS resolution as fallback
    try:
        answers = dns.resolver.resolve(f"_atproto.{handle}", 'TXT')
    except dns.exception.DNSException:
        # Covers NXDOMAIN, NoAnswer, timeouts and other resolver failures.
        return None

    for rdata in answers:
        # TXT records return a tuple of byte strings
        for txt_string in rdata.strings:
            if isinstance(txt_string, bytes):
                # Undecodable bytes cannot form a valid DID; replace them
                # rather than raise.
                txt_value = txt_string.decode('utf-8', errors='replace')
            else:
                txt_value = txt_string
            if txt_value.startswith('did='):
                did = txt_value.removeprefix('did=')
                if did.startswith('did:'):
                    return did

    return None
124
@app.route('/')
def home():
    """Render the landing page.

    Shows the server's current local time and a login link; when the Flask
    session holds a ``handle``, also shows it and a link to the protected page.
    """
    page = """
    <!DOCTYPE html>
    <html>
    <head><title>OAuth Masterclass</title></head>
    <body>
    <h1>Welcome to OAuth Masterclass</h1>
    <p>Current time: {{ time }}</p>
    <p><a href="/login">Login</a></p>
    {% if session.get('handle') %}
    <p>Logged in as: {{ session['handle'] }}</p>
    <p><a href="/now">Go to protected page</a></p>
    {% endif %}
    </body>
    </html>
    """
    timestamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    return render_template_string(page, time=timestamp)
145
@app.route('/login', methods=['GET'])
def login_get():
    """Render the login form, which POSTs the entered handle to /login."""
    form_page = """
    <!DOCTYPE html>
    <html>
    <head><title>Login</title></head>
    <body>
    <h1>Login with ATProtocol</h1>
    <form method="POST" action="/login">
    <label for="handle">Handle:</label>
    <input type="text" id="handle" name="handle" placeholder="user.bsky.social" required>
    <button type="submit">Login</button>
    </form>
    </body>
    </html>
    """
    rendered = render_template_string(form_page)
    return rendered
164
def _fetch_json(url, timeout=5):
    """GET *url* and return the decoded JSON object, or None on any failure."""
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return None
        document = response.json()
    except (requests.RequestException, ValueError):
        # Network failure, invalid URL, or a body that is not JSON.
        return None
    return document if isinstance(document, dict) else None


def _is_wellformed_did(did):
    """Return True if *did* uses only characters permitted by DID syntax."""
    return all(ch.isascii() and (ch.isalnum() or ch in '._:%-') for ch in did)


@app.route('/login', methods=['POST'])
def login_post():
    """Initiate the ATProtocol OAuth flow for the submitted handle or DID.

    Steps: resolve the handle to a DID (unless a DID was entered), fetch the
    DID document, locate the ``AtprotoPersonalDataServer`` service endpoint,
    fetch that server's OAuth authorization-server metadata, record the
    pending request in ``oauth_requests`` keyed by a random ``state``, and
    redirect the browser to the authorization endpoint.

    Returns:
        A redirect to the authorization endpoint on success; a plain message
        with status 400 for bad user input, or 502 when an upstream document
        cannot be fetched or is unusable.
    """
    handle = request.form.get('handle')
    if not handle:
        return "Handle required", 400

    # Resolve handle to DID
    if handle.startswith('did:'):
        did = handle
    else:
        did = resolve_handle_to_did(handle)
        if not did:
            return "Could not resolve handle", 400

    # The DID is interpolated into URLs below, so reject anything containing
    # characters outside the DID grammar (e.g. '/', '?', '#', '@').
    if not _is_wellformed_did(did):
        return "Invalid DID", 400

    # Get DID document
    if did.startswith('did:plc:'):
        did_doc_url = f"https://plc.directory/{did}"
    elif did.startswith('did:web:'):
        # Simplified did:web resolution
        domain = did.removeprefix('did:web:')
        did_doc_url = f"https://{domain}/.well-known/did.json"
    else:
        return "Unsupported DID method", 400

    did_doc = _fetch_json(did_doc_url)
    if did_doc is None:
        return "Could not fetch DID document", 502

    # Find PDS endpoint
    pds_endpoint = next(
        (
            service.get('serviceEndpoint')
            for service in (did_doc.get('service') or [])
            if isinstance(service, dict)
            and service.get('type') == 'AtprotoPersonalDataServer'
        ),
        None,
    )
    if not pds_endpoint:
        return "No PDS found for user", 400

    # Get authorization server metadata
    auth_metadata = _fetch_json(f"{pds_endpoint}/.well-known/oauth-authorization-server")
    if auth_metadata is None:
        return "Could not fetch authorization server metadata", 502

    authorization_endpoint = auth_metadata.get('authorization_endpoint')
    if not authorization_endpoint:
        return "Authorization server metadata has no authorization endpoint", 502

    # Generate state and PKCE
    state = secrets.token_urlsafe(32)
    verifier = generate_pkce_verifier()
    challenge = generate_pkce_challenge(verifier)

    # Store OAuth request
    oauth_requests[state] = {
        'handle': handle,
        'did': did,
        'pds_endpoint': pds_endpoint,
        'auth_metadata': auth_metadata,
        'verifier': verifier,
        'created_at': datetime.now(timezone.utc)
    }

    # Build authorization URL. The PKCE challenge is always sent: the callback
    # always presents the stored code_verifier, so the challenge must have been
    # registered with the authorization request for the two to match.
    auth_params = {
        'response_type': 'code',
        'client_id': CLIENT_ID,
        'redirect_uri': REDIRECT_URI,
        'state': state,
        'scope': 'atproto repo:garden.lexicon.oauth-masterclass.now',
        'code_challenge': challenge,
        'code_challenge_method': 'S256',
    }

    auth_url = f"{authorization_endpoint}?{urllib.parse.urlencode(auth_params)}"

    return redirect(auth_url)
241
def _post_token_request(token_endpoint, token_data, timeout=10):
    """POST a form-encoded token request with a DPoP proof, retrying once if the server demands a nonce."""
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'DPoP': create_dpop_proof("POST", token_endpoint),
    }
    response = requests.post(
        token_endpoint, data=token_data, headers=headers, timeout=timeout
    )

    # Handle DPoP nonce requirement
    if response.status_code == 400:
        try:
            error_code = response.json().get('error')
        except (ValueError, AttributeError):
            # Body was not JSON, or not a JSON object.
            error_code = None
        dpop_nonce = response.headers.get('DPoP-Nonce')
        if error_code == 'use_dpop_nonce' and dpop_nonce:
            # Retry with nonce
            headers['DPoP'] = create_dpop_proof("POST", token_endpoint, nonce=dpop_nonce)
            response = requests.post(
                token_endpoint, data=token_data, headers=headers, timeout=timeout
            )

    return response


@app.route('/login/callback')
def login_callback():
    """Handle the OAuth redirect: exchange the code for tokens and log in.

    Reads ``code``, ``state`` and ``error`` from the query string, looks up
    (and consumes) the pending request stored under ``state``, exchanges the
    code at the token endpoint with a DPoP proof, and on success stores the
    tokens in ``oauth_sessions`` and the session id/handle in the Flask
    session.

    Returns:
        A redirect to ``/`` on success; otherwise a message with status 400
        (bad request / rejected exchange) or 502 (upstream unusable).
    """
    # Messages that echo request or upstream data are sent as text/plain so a
    # browser never interprets them as HTML.
    plain_text = {'Content-Type': 'text/plain; charset=utf-8'}

    code = request.args.get('code')
    state = request.args.get('state')
    error = request.args.get('error')

    if error:
        if state:
            # The authorization attempt is over; drop its pending entry.
            oauth_requests.pop(state, None)
        return f"OAuth error: {error}", 400, plain_text

    if not code or not state:
        return "Missing code or state", 400

    # Retrieve OAuth request. It is removed immediately so each state value
    # can be redeemed at most once, whether or not the exchange succeeds.
    oauth_req = oauth_requests.pop(state, None)
    if not oauth_req:
        return "Invalid state", 400

    # Exchange code for tokens
    auth_metadata = oauth_req['auth_metadata']
    token_endpoint = auth_metadata.get('token_endpoint')
    if not token_endpoint:
        return "Authorization server metadata has no token endpoint", 502

    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': REDIRECT_URI,
        'client_id': CLIENT_ID,
    }

    if oauth_req.get('verifier'):
        token_data['code_verifier'] = oauth_req['verifier']

    try:
        response = _post_token_request(token_endpoint, token_data)
    except requests.RequestException:
        return "Token exchange failed: could not reach authorization server", 502

    if response.status_code != 200:
        return f"Token exchange failed: {response.text}", 400, plain_text

    try:
        tokens = response.json()
        access_token = tokens['access_token']
    except (ValueError, KeyError, TypeError):
        return "Token exchange failed: malformed token response", 502

    # The tokens must belong to the account this login was started for.
    if tokens.get('sub') != oauth_req['did']:
        return "Token exchange failed: account mismatch", 400

    # Store session
    session_id = secrets.token_urlsafe(32)
    oauth_sessions[session_id] = {
        'handle': oauth_req['handle'],
        'did': oauth_req['did'],
        'pds_endpoint': oauth_req['pds_endpoint'],
        'access_token': access_token,
        'refresh_token': tokens.get('refresh_token'),
        'expires_at': datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600)),
        'auth_metadata': auth_metadata
    }

    # Store in Flask session
    session['session_id'] = session_id
    session['handle'] = oauth_req['handle']

    return redirect('/')
328
@app.route('/client-metadata.json')
def client_metadata():
    """Serve this client's OAuth metadata document as JSON.

    The document identifies the client by ``CLIENT_ID``, lists
    ``REDIRECT_URI`` as its only redirect URI, and publishes the public
    signing key from ``get_jwk()`` under ``jwks``.
    """
    return jsonify(
        client_id=CLIENT_ID,
        client_name="OAuth Masterclass App",
        client_uri="https://oauth-py.smokesignal.tools",
        redirect_uris=[REDIRECT_URI],
        scope="atproto repo:garden.lexicon.oauth-masterclass.now",
        grant_types=["authorization_code", "refresh_token"],
        response_types=["code"],
        token_endpoint_auth_method="none",
        application_type="web",
        dpop_bound_access_tokens=True,
        jwks={"keys": [get_jwk()]},
    )
348
def refresh_token_if_needed(session_data):
    """Refresh the session's access token when it is within 5 minutes of expiry.

    Args:
        session_data: Session dict with ``expires_at`` (aware datetime),
            ``refresh_token``, ``auth_metadata`` and ``access_token`` keys.
            Updated in place when a refresh succeeds.

    Returns:
        True if the access token is still fresh or was refreshed; False if a
        refresh was needed but there is no refresh token, the token endpoint
        could not be reached, or it did not return a usable token response.
    """
    refresh_at = session_data['expires_at'] - timedelta(minutes=5)
    if datetime.now(timezone.utc) < refresh_at:
        return True

    refresh_token = session_data.get('refresh_token')
    if not refresh_token:
        return False

    token_endpoint = session_data['auth_metadata']['token_endpoint']
    token_data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': CLIENT_ID,
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'DPoP': create_dpop_proof("POST", token_endpoint),
    }

    try:
        response = requests.post(
            token_endpoint, data=token_data, headers=headers, timeout=10
        )

        # Handle DPoP nonce requirement
        if response.status_code == 400:
            try:
                error_code = response.json().get('error')
            except (ValueError, AttributeError):
                # Body was not JSON, or not a JSON object.
                error_code = None
            dpop_nonce = response.headers.get('DPoP-Nonce')
            if error_code == 'use_dpop_nonce' and dpop_nonce:
                # Retry with nonce
                headers['DPoP'] = create_dpop_proof("POST", token_endpoint, nonce=dpop_nonce)
                response = requests.post(
                    token_endpoint, data=token_data, headers=headers, timeout=10
                )
    except requests.RequestException:
        # An unreachable token endpoint is a failed refresh, not a crash.
        return False

    if response.status_code != 200:
        return False

    try:
        tokens = response.json()
        new_access_token = tokens['access_token']
    except (ValueError, KeyError, TypeError):
        return False

    session_data['access_token'] = new_access_token
    if 'refresh_token' in tokens:
        # Keep the old refresh token unless the server issued a new one.
        session_data['refresh_token'] = tokens['refresh_token']
    session_data['expires_at'] = datetime.now(timezone.utc) + timedelta(seconds=tokens.get('expires_in', 3600))
    return True
402
@app.route('/now', methods=['GET'])
def now_get():
    """Render the protected message form for a logged-in user.

    Redirects to ``/login`` when the Flask session has no known session id,
    or (after clearing the Flask session) when the access token cannot be
    refreshed.
    """
    page = """
    <!DOCTYPE html>
    <html>
    <head><title>Now</title></head>
    <body>
    <h1>Protected Page</h1>
    <p>Hello, {{ handle }}!</p>
    <form method="POST" action="/now">
    <label for="message">Message:</label>
    <input type="text" id="message" name="message" required>
    <button type="submit">Submit to ATProtocol</button>
    </form>
    <p><a href="/">Home</a></p>
    </body>
    </html>
    """
    sid = session.get('session_id')
    is_known_session = bool(sid) and sid in oauth_sessions
    if not is_known_session:
        return redirect('/login')

    current = oauth_sessions[sid]

    # Refresh token if needed
    if refresh_token_if_needed(current):
        return render_template_string(page, handle=current['handle'])

    session.clear()
    return redirect('/login')
434
@app.route('/now', methods=['POST'])
def now_post():
    """Create a ``garden.lexicon.oauth-masterclass.now`` record via XRPC.

    Requires a logged-in session and a non-empty ``message`` form field.
    Sends ``com.atproto.repo.createRecord`` to the user's PDS with a
    DPoP-bound access token, retrying once if the server demands a DPoP nonce.

    Returns:
        A plain-text success message containing the record URI, a redirect to
        ``/login`` when not (or no longer) authenticated, or a plain-text
        error with status 400/502.
    """
    # Messages that echo upstream data are sent as text/plain so a browser
    # never interprets them as HTML.
    plain_text = {'Content-Type': 'text/plain; charset=utf-8'}

    session_id = session.get('session_id')
    if not session_id or session_id not in oauth_sessions:
        return redirect('/login')

    session_data = oauth_sessions[session_id]
    message = request.form.get('message')

    if not message:
        return "Message required", 400

    # Refresh token if needed
    if not refresh_token_if_needed(session_data):
        session.clear()
        return redirect('/login')

    # isoformat() on an aware UTC datetime already ends in "+00:00"; swap that
    # for "Z" instead of appending a second zone designator.
    created_at = (
        datetime.now(timezone.utc)
        .isoformat(timespec='milliseconds')
        .replace('+00:00', 'Z')
    )

    # Prepare record
    record = {
        "$type": "garden.lexicon.oauth-masterclass.now",
        "now": message,
        "createdAt": created_at
    }

    # Prepare XRPC request
    xrpc_url = f"{session_data['pds_endpoint']}/xrpc/com.atproto.repo.createRecord"
    access_token = session_data['access_token']

    # Access token hash for the DPoP proof's `ath` claim
    access_token_hash = base64.urlsafe_b64encode(
        hashlib.sha256(access_token.encode()).digest()
    ).rstrip(b'=').decode('ascii')

    body = {
        'repo': session_data['did'],
        'collection': 'garden.lexicon.oauth-masterclass.now',
        'record': record,
        'validate': False
    }

    def send(nonce=None):
        # Each attempt needs its own freshly signed proof.
        headers = {
            'Authorization': f"DPoP {access_token}",
            'DPoP': create_dpop_proof("POST", xrpc_url, access_token_hash, nonce),
            'Content-Type': 'application/json'
        }
        return requests.post(xrpc_url, json=body, headers=headers, timeout=10)

    try:
        response = send()

        # Handle DPoP nonce requirement
        if response.status_code in (400, 401):
            try:
                error_code = response.json().get('error')
            except (ValueError, AttributeError):
                # Body was not JSON, or not a JSON object.
                error_code = None
            # The challenge may be signalled in the JSON body or in the
            # WWW-Authenticate header.
            nonce_requested = (
                error_code == 'use_dpop_nonce'
                or 'use_dpop_nonce' in response.headers.get('WWW-Authenticate', '')
            )
            dpop_nonce = response.headers.get('DPoP-Nonce')
            if nonce_requested and dpop_nonce:
                # Retry with nonce
                response = send(dpop_nonce)
    except requests.RequestException:
        return "Failed to create record: could not reach PDS", 502

    if response.status_code != 200:
        return f"Failed to create record: {response.text}", 400, plain_text

    try:
        uri = response.json().get('uri', 'unknown')
    except (ValueError, AttributeError):
        uri = 'unknown'
    return f"Record created successfully! URI: {uri}", 200, plain_text
505
if __name__ == '__main__':
    # Script entry point: start Flask's built-in server on port 5000.
    # NOTE(review): debug=True turns on the interactive debugger and the
    # reloader; this is meant for local development only.
    app.run(port=5000, debug=True)