tangled
alpha
login
or
join now
ligo.at
/
core
6
fork
atom
decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
6
fork
atom
overview
issues
2
pulls
pipelines
close connection after reading json
nauta.one
4 weeks ago
25f4e2c0
7bbcc337
+15
-12
2 changed files
expand all
collapse all
unified
split
src
atproto
oauth.py
oauth.py
+10
-10
src/atproto/oauth.py
···
26
# Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server.
27
# Returns "state" id HTTP response on success, without checking HTTP response status
28
async def send_par_auth_request(
0
29
authserver_url: str,
30
authserver_meta: dict[str, str],
31
login_hint: str | None,
···
70
71
# IMPORTANT: Pushed Authorization Request URL is untrusted input, SSRF mitigations are needed
72
assert is_safe_url(par_url)
73
-
async with hardened_http.get_session() as session:
74
-
resp = await session.post(
75
-
par_url,
76
-
headers={
77
-
"Content-Type": "application/x-www-form-urlencoded",
78
-
"DPoP": dpop_proof,
79
-
},
80
-
data=par_body,
81
-
)
82
-
respjson = await resp.json()
83
84
# Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
85
if resp.status == 400 and respjson["error"] == "use_dpop_nonce":
···
26
# Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server.
27
# Returns "state" id HTTP response on success, without checking HTTP response status
28
async def send_par_auth_request(
29
+
hardened_client: ClientSession,
30
authserver_url: str,
31
authserver_meta: dict[str, str],
32
login_hint: str | None,
···
71
72
# IMPORTANT: Pushed Authorization Request URL is untrusted input, SSRF mitigations are needed
73
assert is_safe_url(par_url)
74
+
resp = await hardened_client.post(
75
+
par_url,
76
+
headers={
77
+
"Content-Type": "application/x-www-form-urlencoded",
78
+
"DPoP": dpop_proof,
79
+
},
80
+
data=par_body,
81
+
)
82
+
respjson = await resp.json()
0
83
84
# Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
85
if resp.status == 400 and respjson["error"] == "use_dpop_nonce":
+5
-2
src/oauth.py
···
29
save_auth_session,
30
)
31
from src.db import KV, get_db
32
-
from src.security import is_safe_url
33
34
oauth = Blueprint("oauth", __name__, url_prefix="/oauth")
35
···
100
101
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
102
0
103
pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request(
0
104
authserver_url,
105
authserver_meta,
106
login_hint,
···
119
120
respjson: dict[str, str] = await resp.json()
121
par_request_uri: str = respjson["request_uri"]
122
-
current_app.logger.debug(f"saving oauth_auth_request to DB state={state}")
123
0
124
oauth_request = OAuthAuthRequest(
125
state,
126
authserver_meta["issuer"],
···
29
save_auth_session,
30
)
31
from src.db import KV, get_db
32
+
from src.security import hardened_http, is_safe_url
33
34
oauth = Blueprint("oauth", __name__, url_prefix="/oauth")
35
···
100
101
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
102
103
+
client = hardened_http.get_session()
104
pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request(
105
+
client,
106
authserver_url,
107
authserver_meta,
108
login_hint,
···
121
122
respjson: dict[str, str] = await resp.json()
123
par_request_uri: str = respjson["request_uri"]
124
+
await client.close()
125
126
+
current_app.logger.debug(f"saving oauth_auth_request to DB state={state}")
127
oauth_request = OAuthAuthRequest(
128
state,
129
authserver_meta["issuer"],