···01from urllib.parse import urlparse
2from typing import Any
3import time
···349350# Helper to demonstrate making a request (HTTP GET or POST) to the user's PDS ("Resource Server" in OAuth terminology) using DPoP and access token.
351# This method returns a 'requests' reponse, without checking status code.
352-def pds_authed_req(method: str, url: str, user: dict, db: Any, body=None) -> Any:
000000353 dpop_private_jwk = JsonWebKey.import_key(json.loads(user["dpop_private_jwk"]))
354 dpop_pds_nonce = user["dpop_pds_nonce"]
355 access_token = user["access_token"]
00356357 # Might need to retry request with a new nonce.
358 for i in range(2):
···365 )
366367 with hardened_http.get_session() as sess:
368- resp = sess.post(
369 url,
370 headers={
371 "Authorization": f"DPoP {access_token}",
···376377 # If we got a new server-provided DPoP nonce, store it in database and retry.
378 # NOTE: the type of error might also be communicated in the `WWW-Authenticate` HTTP response header.
379- if resp.status_code in [400, 401] and resp.json()["error"] == "use_dpop_nonce":
000380 # print(resp.headers)
381- dpop_pds_nonce = resp.headers["DPoP-Nonce"]
382 print(f"retrying with new PDS DPoP nonce: {dpop_pds_nonce}")
383 # update session database with new nonce
384 cur = db.cursor()
385- cur.execute(
386 "UPDATE oauth_session SET dpop_pds_nonce = ? WHERE did = ?;",
387 [dpop_pds_nonce, user["did"]],
388 )
···391 continue
392 break
393394- return resp
···1+import sqlite3
2from urllib.parse import urlparse
3from typing import Any
4import time
···350351# Helper to demonstrate making a request (HTTP GET or POST) to the user's PDS ("Resource Server" in OAuth terminology) using DPoP and access token.
352# This method returns a 'requests' reponse, without checking status code.
353+def pds_authed_req(
354+ method: str,
355+ url: str,
356+ user: dict[str, str],
357+ db: sqlite3.Connection,
358+ body: dict[str, Any] | None = None,
359+) -> Response | None:
360 dpop_private_jwk = JsonWebKey.import_key(json.loads(user["dpop_private_jwk"]))
361 dpop_pds_nonce = user["dpop_pds_nonce"]
362 access_token = user["access_token"]
363+364+ response: Response | None = None
365366 # Might need to retry request with a new nonce.
367 for i in range(2):
···374 )
375376 with hardened_http.get_session() as sess:
377+ response = sess.post(
378 url,
379 headers={
380 "Authorization": f"DPoP {access_token}",
···385386 # If we got a new server-provided DPoP nonce, store it in database and retry.
387 # NOTE: the type of error might also be communicated in the `WWW-Authenticate` HTTP response header.
388+ if (
389+ response.status_code in [400, 401]
390+ and response.json()["error"] == "use_dpop_nonce"
391+ ):
392 # print(resp.headers)
393+ dpop_pds_nonce = response.headers["DPoP-Nonce"]
394 print(f"retrying with new PDS DPoP nonce: {dpop_pds_nonce}")
395 # update session database with new nonce
396 cur = db.cursor()
397+ _ = cur.execute(
398 "UPDATE oauth_session SET dpop_pds_nonce = ? WHERE did = ?;",
399 [dpop_pds_nonce, user["did"]],
400 )
···403 continue
404 break
405406+ return response