tangled
alpha
login
or
join now
ligo.at
/
core
6
fork
atom
decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
6
fork
atom
overview
issues
2
pulls
pipelines
cleanup dead code
nauta.one
5 months ago
7db508fc
f3619979
+143
-286
8 changed files
expand all
collapse all
unified
split
pyproject.toml
src
atproto
__init__.py
atproto_identity.py
atproto_oauth.py
validator.py
oauth.py
security.py
uv.lock
+1
-1
pyproject.toml
···
9
9
"dnspython>=2.8.0",
10
10
"flask[dotenv]>=3.1.2",
11
11
"requests>=2.32",
12
12
-
"requests-hardened>=1.0.0b3",
12
12
+
"requests-hardened>=1.2.0",
13
13
]
+15
-4
src/atproto/__init__.py
···
1
1
from dns.resolver import resolve as resolve_dns
2
2
+
from re import match as regex_match
2
3
from typing import Any
3
4
import requests
4
5
5
5
-
from .atproto_oauth import is_valid_authserver_meta
6
6
-
from .atproto_security import is_safe_url
7
7
-
from .atproto_identity import is_valid_did, is_valid_handle
6
6
+
from .validator import is_valid_authserver_meta
7
7
+
from ..security import is_safe_url
8
8
9
9
PLC_DIRECTORY = "https://plc.directory"
10
10
+
HANDLE_REGEX = r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$"
11
11
+
DID_REGEX = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$"
12
12
+
10
13
11
14
AuthserverUrl = str
12
15
PdsUrl = str
···
15
18
authservers: dict[PdsUrl, AuthserverUrl] = {}
16
19
dids: dict[str, DID] = {}
17
20
pdss: dict[DID, PdsUrl] = {}
21
21
+
22
22
+
23
23
+
def is_valid_handle(handle: str) -> bool:
24
24
+
return regex_match(HANDLE_REGEX, handle) is not None
25
25
+
26
26
+
27
27
+
def is_valid_did(did: str) -> bool:
28
28
+
return regex_match(DID_REGEX, did) is not None
18
29
19
30
20
31
def resolve_identity(query: str) -> tuple[str, str, dict[str, Any]] | None:
···
148
159
return authserver_url
149
160
150
161
151
151
-
def resolve_authserver_meta(authserver_url: str) -> dict[str, str] | None:
162
162
+
def fetch_authserver_meta(authserver_url: str) -> dict[str, str] | None:
152
163
"""Returns metadata from the authserver"""
153
164
assert is_safe_url(authserver_url)
154
165
endpoint = f"{authserver_url}/.well-known/oauth-authorization-server"
-134
src/atproto/atproto_identity.py
···
1
1
-
import re
2
2
-
import sys
3
3
-
import requests
4
4
-
import dns.resolver
5
5
-
from typing import Optional, Tuple
6
6
-
7
7
-
from .atproto_security import hardened_http
8
8
-
9
9
-
HANDLE_REGEX = r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$"
10
10
-
DID_REGEX = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$"
11
11
-
12
12
-
13
13
-
def is_valid_handle(handle: str) -> bool:
14
14
-
return re.match(HANDLE_REGEX, handle) is not None
15
15
-
16
16
-
17
17
-
def is_valid_did(did: str) -> bool:
18
18
-
return re.match(DID_REGEX, did) is not None
19
19
-
20
20
-
21
21
-
def handle_from_doc(doc: dict) -> Optional[str]:
22
22
-
for aka in doc.get("alsoKnownAs", []):
23
23
-
if aka.startswith("at://"):
24
24
-
handle = aka[5:]
25
25
-
if is_valid_handle(handle):
26
26
-
return handle
27
27
-
return None
28
28
-
29
29
-
30
30
-
# resolves an identity (handle or DID) to a DID, handle, and DID document. verifies handle bi-directionally.
31
31
-
def resolve_identity(atid: str) -> Tuple[str, str, dict]:
32
32
-
if is_valid_handle(atid):
33
33
-
handle = atid
34
34
-
did = resolve_handle(handle)
35
35
-
if not did:
36
36
-
raise Exception("Failed to resolve handle: " + handle)
37
37
-
doc = resolve_did(did)
38
38
-
if not doc:
39
39
-
raise Exception("Failed to resolve DID: " + did)
40
40
-
doc_handle = handle_from_doc(doc)
41
41
-
if not doc_handle or doc_handle != handle:
42
42
-
raise Exception("Handle did not match DID: " + handle)
43
43
-
return did, handle, doc
44
44
-
if is_valid_did(atid):
45
45
-
did = atid
46
46
-
doc = resolve_did(did)
47
47
-
if not doc:
48
48
-
raise Exception("Failed to resolve DID: " + did)
49
49
-
handle = handle_from_doc(doc)
50
50
-
if not handle:
51
51
-
raise Exception("Handle did not match DID: " + handle)
52
52
-
if resolve_handle(handle) != did:
53
53
-
raise Exception("Handle did not match DID: " + handle)
54
54
-
return did, handle, doc
55
55
-
56
56
-
raise Exception("identifier not a handle or DID: " + atid)
57
57
-
58
58
-
59
59
-
def resolve_handle(handle: str) -> Optional[str]:
60
60
-
# first try TXT record
61
61
-
try:
62
62
-
for record in dns.resolver.resolve(f"_atproto.{handle}", "TXT"):
63
63
-
val = record.to_text().replace('"', "")
64
64
-
if val.startswith("did="):
65
65
-
val = val[4:]
66
66
-
if is_valid_did(val):
67
67
-
return val
68
68
-
except Exception:
69
69
-
pass
70
70
-
71
71
-
# then try HTTP well-known
72
72
-
# IMPORTANT: 'handle' domain is untrusted user input. SSRF mitigations are necessary
73
73
-
try:
74
74
-
with hardened_http.get_session() as sess:
75
75
-
resp = sess.get(f"https://{handle}/.well-known/atproto-did")
76
76
-
except Exception:
77
77
-
return None
78
78
-
79
79
-
if resp.status_code != 200:
80
80
-
return None
81
81
-
did = resp.text.split()[0]
82
82
-
if is_valid_did(did):
83
83
-
return did
84
84
-
return None
85
85
-
86
86
-
87
87
-
def resolve_did(did: str) -> Optional[dict]:
88
88
-
if did.startswith("did:plc:"):
89
89
-
# NOTE: 'did' is untrusted input, but has been validated by regex by this point
90
90
-
resp = requests.get(f"https://plc.directory/{did}")
91
91
-
if resp.status_code != 200:
92
92
-
return None
93
93
-
return resp.json()
94
94
-
95
95
-
if did.startswith("did:web:"):
96
96
-
domain = did[8:]
97
97
-
# IMPORTANT: domain is untrusted input. SSRF mitigations are necessary
98
98
-
# "handle" validation works to check that domain is a simple hostname
99
99
-
assert is_valid_handle(domain)
100
100
-
try:
101
101
-
with hardened_http.get_session() as sess:
102
102
-
resp = sess.get(f"https://{domain}/.well-known/did.json")
103
103
-
except requests.exceptions.ConnectionError:
104
104
-
return None
105
105
-
if resp.status_code != 200:
106
106
-
return None
107
107
-
return resp.json()
108
108
-
raise ValueError("unsupported DID type")
109
109
-
110
110
-
111
111
-
def pds_endpoint(doc: dict) -> str:
112
112
-
for svc in doc["service"]:
113
113
-
if svc["id"] == "#atproto_pds":
114
114
-
return svc["serviceEndpoint"]
115
115
-
raise Exception("PDS endpoint not found in DID document")
116
116
-
117
117
-
118
118
-
if __name__ == "__main__":
119
119
-
assert is_valid_did("did:web:example.com")
120
120
-
assert is_valid_did("did:plc:abc123")
121
121
-
assert is_valid_did("") is False
122
122
-
assert is_valid_did("did:asdfasdf") is False
123
123
-
handle = sys.argv[1]
124
124
-
if not is_valid_handle(handle):
125
125
-
print("invalid handle!")
126
126
-
sys.exit(-1)
127
127
-
assert handle is not None
128
128
-
did = resolve_handle(handle)
129
129
-
print(f"DID: {did}")
130
130
-
assert did is not None
131
131
-
doc = resolve_did(did)
132
132
-
print(doc)
133
133
-
resolve_identity(handle)
134
134
-
resolve_identity(did)
+87
-142
src/atproto/atproto_oauth.py
···
1
1
import sqlite3
2
2
-
from urllib.parse import urlparse
3
2
from typing import Any
4
3
import time
5
4
import json
···
9
8
from authlib.oauth2.rfc7636 import create_s256_code_challenge
10
9
from requests import Response
11
10
12
12
-
from ..types import OAuthAuthRequest, OAuthSession
13
13
-
14
14
-
from .atproto_security import is_safe_url, hardened_http
15
15
-
16
16
-
17
17
-
# Checks an Authorization Server metadata response against atproto OAuth requirements
18
18
-
def is_valid_authserver_meta(obj: dict[str, Any] | None, url: str) -> bool:
19
19
-
if obj is None:
20
20
-
return False
21
21
-
fetch_url = urlparse(url)
22
22
-
issuer_url = urlparse(obj["issuer"])
23
23
-
assert issuer_url.hostname == fetch_url.hostname
24
24
-
assert issuer_url.scheme == "https"
25
25
-
assert issuer_url.port is None
26
26
-
assert issuer_url.path in ["", "/"]
27
27
-
assert issuer_url.params == ""
28
28
-
assert issuer_url.fragment == ""
29
29
-
30
30
-
assert "code" in obj["response_types_supported"]
31
31
-
assert "authorization_code" in obj["grant_types_supported"]
32
32
-
assert "refresh_token" in obj["grant_types_supported"]
33
33
-
assert "S256" in obj["code_challenge_methods_supported"]
34
34
-
assert "none" in obj["token_endpoint_auth_methods_supported"]
35
35
-
assert "private_key_jwt" in obj["token_endpoint_auth_methods_supported"]
36
36
-
assert "ES256" in obj["token_endpoint_auth_signing_alg_values_supported"]
37
37
-
assert "atproto" in obj["scopes_supported"]
38
38
-
assert obj["authorization_response_iss_parameter_supported"] is True
39
39
-
assert obj["pushed_authorization_request_endpoint"] is not None
40
40
-
assert obj["require_pushed_authorization_requests"] is True
41
41
-
assert "ES256" in obj["dpop_signing_alg_values_supported"]
42
42
-
if "require_request_uri_registration" in obj:
43
43
-
assert obj["require_request_uri_registration"] is True
44
44
-
assert obj["client_id_metadata_document_supported"] is True
11
11
+
from . import fetch_authserver_meta
45
12
46
46
-
return True
13
13
+
from ..types import OAuthAuthRequest, OAuthSession
47
14
48
48
-
49
49
-
# Takes a Resource Server (PDS) URL, and tries to resolve it to an Authorization Server host/origin
50
50
-
def resolve_pds_authserver(url: str) -> str:
51
51
-
# IMPORTANT: PDS endpoint URL is untrusted input, SSRF mitigations are needed
52
52
-
assert is_safe_url(url)
53
53
-
with hardened_http.get_session() as sess:
54
54
-
resp = sess.get(f"{url}/.well-known/oauth-protected-resource")
55
55
-
resp.raise_for_status()
56
56
-
# Additionally check that status is exactly 200 (not just 2xx)
57
57
-
assert resp.status_code == 200
58
58
-
authserver_url = resp.json()["authorization_servers"][0]
59
59
-
return authserver_url
60
60
-
61
61
-
62
62
-
# Does an HTTP GET for Authorization Server (entryway) metadata, verify the contents, and return the metadata as a dict
63
63
-
# DEPRECATED: use atproto2.resolve_authserver_meta
64
64
-
def fetch_authserver_meta(url: str) -> dict[str, Any]:
65
65
-
# IMPORTANT: Authorization Server URL is untrusted input, SSRF mitigations are needed
66
66
-
assert is_safe_url(url)
67
67
-
with hardened_http.get_session() as sess:
68
68
-
resp = sess.get(f"{url}/.well-known/oauth-authorization-server")
69
69
-
resp.raise_for_status()
70
70
-
71
71
-
authserver_meta = resp.json()
72
72
-
# print("Auth Server Metadata: " + json.dumps(authserver_meta, indent=2))
73
73
-
assert is_valid_authserver_meta(authserver_meta, url)
74
74
-
return authserver_meta
75
75
-
76
76
-
77
77
-
def client_assertion_jwt(
78
78
-
client_id: str,
79
79
-
authserver_url: str,
80
80
-
client_secret_jwk: Key,
81
81
-
) -> str:
82
82
-
client_assertion = jwt.encode(
83
83
-
{"alg": "ES256", "kid": client_secret_jwk["kid"]},
84
84
-
{
85
85
-
"iss": client_id,
86
86
-
"sub": client_id,
87
87
-
"aud": authserver_url,
88
88
-
"jti": generate_token(),
89
89
-
"iat": int(time.time()),
90
90
-
},
91
91
-
client_secret_jwk,
92
92
-
).decode("utf-8")
93
93
-
return client_assertion
94
94
-
95
95
-
96
96
-
def authserver_dpop_jwt(
97
97
-
method: str,
98
98
-
url: str,
99
99
-
nonce: str,
100
100
-
dpop_private_jwk: Key,
101
101
-
) -> str:
102
102
-
dpop_pub_jwk = json.loads(dpop_private_jwk.as_json(is_private=False))
103
103
-
body = {
104
104
-
"jti": generate_token(),
105
105
-
"htm": method,
106
106
-
"htu": url,
107
107
-
"iat": int(time.time()),
108
108
-
"exp": int(time.time()) + 30,
109
109
-
}
110
110
-
if nonce:
111
111
-
body["nonce"] = nonce
112
112
-
dpop_proof = jwt.encode(
113
113
-
{"typ": "dpop+jwt", "alg": "ES256", "jwk": dpop_pub_jwk},
114
114
-
body,
115
115
-
dpop_private_jwk,
116
116
-
).decode("utf-8")
117
117
-
return dpop_proof
15
15
+
from ..security import is_safe_url, hardened_http
118
16
119
17
120
18
# Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server.
···
138
36
code_challenge_method = "S256"
139
37
140
38
# Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
141
141
-
client_assertion = client_assertion_jwt(
39
39
+
client_assertion = _client_assertion_jwt(
142
40
client_id, authserver_url, client_secret_jwk
143
41
)
144
42
145
43
# Create DPoP header JWT; we don't have a server Nonce yet
146
44
dpop_authserver_nonce = ""
147
147
-
dpop_proof = authserver_dpop_jwt(
45
45
+
dpop_proof = _authserver_dpop_jwt(
148
46
"POST", par_url, dpop_authserver_nonce, dpop_private_jwk
149
47
)
150
48
···
178
76
if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
179
77
dpop_authserver_nonce = resp.headers["DPoP-Nonce"]
180
78
print(f"retrying with new auth server DPoP nonce: {dpop_authserver_nonce}")
181
181
-
dpop_proof = authserver_dpop_jwt(
79
79
+
dpop_proof = _authserver_dpop_jwt(
182
80
"POST", par_url, dpop_authserver_nonce, dpop_private_jwk
183
81
)
184
82
with hardened_http.get_session() as sess:
···
206
104
207
105
# Re-fetch server metadata
208
106
authserver_meta = fetch_authserver_meta(authserver_url)
107
107
+
if not authserver_meta:
108
108
+
raise Exception("missing authserver meta")
209
109
210
110
# Construct auth token request fields
211
111
client_id = f"{app_url}oauth/metadata"
212
112
redirect_uri = f"{app_url}oauth/callback"
213
113
214
114
# Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
215
215
-
client_assertion = client_assertion_jwt(
115
115
+
client_assertion = _client_assertion_jwt(
216
116
client_id, authserver_url, client_secret_jwk
217
117
)
218
118
···
230
130
token_url = authserver_meta["token_endpoint"]
231
131
dpop_private_jwk = JsonWebKey.import_key(json.loads(auth_request.dpop_private_jwk))
232
132
dpop_authserver_nonce = auth_request.dpop_authserver_nonce
233
233
-
dpop_proof = authserver_dpop_jwt(
133
133
+
dpop_proof = _authserver_dpop_jwt(
234
134
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
235
135
)
236
136
···
244
144
dpop_authserver_nonce = resp.headers["DPoP-Nonce"]
245
145
print(f"retrying with new auth server DPoP nonce: {dpop_authserver_nonce}")
246
146
# print(server_nonce)
247
247
-
dpop_proof = authserver_dpop_jwt(
147
147
+
dpop_proof = _authserver_dpop_jwt(
248
148
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
249
149
)
250
150
with hardened_http.get_session() as sess:
···
270
170
271
171
# Re-fetch server metadata
272
172
authserver_meta = fetch_authserver_meta(authserver_url)
173
173
+
if not authserver_meta:
174
174
+
raise Exception("missing authserver meta")
273
175
274
176
# Construct token request fields
275
177
client_id = f"{app_url}oauth/metadata"
276
178
277
179
# Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
278
278
-
client_assertion = client_assertion_jwt(
180
180
+
client_assertion = _client_assertion_jwt(
279
181
client_id, authserver_url, client_secret_jwk
280
182
)
281
183
···
291
193
token_url = authserver_meta["token_endpoint"]
292
194
dpop_private_jwk = JsonWebKey.import_key(json.loads(user.dpop_private_jwk))
293
195
dpop_authserver_nonce = user.dpop_authserver_nonce
294
294
-
dpop_proof = authserver_dpop_jwt(
196
196
+
dpop_proof = _authserver_dpop_jwt(
295
197
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
296
198
)
297
199
···
305
207
dpop_authserver_nonce = resp.headers["DPoP-Nonce"]
306
208
print(f"retrying with new auth server DPoP nonce: {dpop_authserver_nonce}")
307
209
# print(server_nonce)
308
308
-
dpop_proof = authserver_dpop_jwt(
210
210
+
dpop_proof = _authserver_dpop_jwt(
309
211
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
310
212
)
311
213
with hardened_http.get_session() as sess:
···
320
222
return token_body, dpop_authserver_nonce
321
223
322
224
323
323
-
def pds_dpop_jwt(
324
324
-
method: str,
325
325
-
url: str,
326
326
-
access_token: str | None,
327
327
-
nonce: str | None,
328
328
-
dpop_private_jwk: Key,
329
329
-
) -> str:
330
330
-
dpop_pub_jwk = json.loads(dpop_private_jwk.as_json(is_private=False))
331
331
-
body = {
332
332
-
"iat": int(time.time()),
333
333
-
"exp": int(time.time()) + 10,
334
334
-
"jti": generate_token(),
335
335
-
"htm": method,
336
336
-
"htu": url,
337
337
-
# PKCE S256 is same as DPoP ath hashing
338
338
-
"ath": create_s256_code_challenge(access_token),
339
339
-
}
340
340
-
if nonce:
341
341
-
body["nonce"] = nonce
342
342
-
dpop_proof = jwt.encode(
343
343
-
{"typ": "dpop+jwt", "alg": "ES256", "jwk": dpop_pub_jwk},
344
344
-
body,
345
345
-
dpop_private_jwk,
346
346
-
).decode("utf-8")
347
347
-
return dpop_proof
348
348
-
349
349
-
350
225
# Helper to demonstrate making a request (HTTP GET or POST) to the user's PDS ("Resource Server" in OAuth terminology) using DPoP and access token.
351
226
# This method returns a 'requests' reponse, without checking status code.
352
227
def pds_authed_req(
···
364
239
365
240
# Might need to retry request with a new nonce.
366
241
for i in range(2):
367
367
-
dpop_jwt = pds_dpop_jwt(
242
242
+
dpop_jwt = _pds_dpop_jwt(
368
243
"POST",
369
244
url,
370
245
access_token,
···
403
278
break
404
279
405
280
return response
281
281
+
282
282
+
283
283
+
def _client_assertion_jwt(
284
284
+
client_id: str,
285
285
+
authserver_url: str,
286
286
+
client_secret_jwk: Key,
287
287
+
) -> str:
288
288
+
client_assertion = jwt.encode(
289
289
+
{"alg": "ES256", "kid": client_secret_jwk["kid"]},
290
290
+
{
291
291
+
"iss": client_id,
292
292
+
"sub": client_id,
293
293
+
"aud": authserver_url,
294
294
+
"jti": generate_token(),
295
295
+
"iat": int(time.time()),
296
296
+
},
297
297
+
client_secret_jwk,
298
298
+
).decode("utf-8")
299
299
+
return client_assertion
300
300
+
301
301
+
302
302
+
def _authserver_dpop_jwt(
303
303
+
method: str,
304
304
+
url: str,
305
305
+
nonce: str,
306
306
+
dpop_private_jwk: Key,
307
307
+
) -> str:
308
308
+
dpop_pub_jwk = json.loads(dpop_private_jwk.as_json(is_private=False))
309
309
+
body = {
310
310
+
"jti": generate_token(),
311
311
+
"htm": method,
312
312
+
"htu": url,
313
313
+
"iat": int(time.time()),
314
314
+
"exp": int(time.time()) + 30,
315
315
+
}
316
316
+
if nonce:
317
317
+
body["nonce"] = nonce
318
318
+
dpop_proof = jwt.encode(
319
319
+
{"typ": "dpop+jwt", "alg": "ES256", "jwk": dpop_pub_jwk},
320
320
+
body,
321
321
+
dpop_private_jwk,
322
322
+
).decode("utf-8")
323
323
+
return dpop_proof
324
324
+
325
325
+
326
326
+
def _pds_dpop_jwt(
327
327
+
method: str,
328
328
+
url: str,
329
329
+
access_token: str | None,
330
330
+
nonce: str | None,
331
331
+
dpop_private_jwk: Key,
332
332
+
) -> str:
333
333
+
dpop_pub_jwk = json.loads(dpop_private_jwk.as_json(is_private=False))
334
334
+
body = {
335
335
+
"iat": int(time.time()),
336
336
+
"exp": int(time.time()) + 10,
337
337
+
"jti": generate_token(),
338
338
+
"htm": method,
339
339
+
"htu": url,
340
340
+
# PKCE S256 is same as DPoP ath hashing
341
341
+
"ath": create_s256_code_challenge(access_token),
342
342
+
}
343
343
+
if nonce:
344
344
+
body["nonce"] = nonce
345
345
+
dpop_proof = jwt.encode(
346
346
+
{"typ": "dpop+jwt", "alg": "ES256", "jwk": dpop_pub_jwk},
347
347
+
body,
348
348
+
dpop_private_jwk,
349
349
+
).decode("utf-8")
350
350
+
return dpop_proof
src/atproto/atproto_security.py
src/security.py
+34
src/atproto/validator.py
···
1
1
+
from typing import Any
2
2
+
from urllib.parse import urlparse
3
3
+
4
4
+
5
5
+
# Checks an Authorization Server metadata response against atproto OAuth requirements
6
6
+
def is_valid_authserver_meta(obj: dict[str, Any] | None, url: str) -> bool:
7
7
+
if obj is None:
8
8
+
return False
9
9
+
fetch_url = urlparse(url)
10
10
+
issuer_url = urlparse(obj["issuer"])
11
11
+
assert issuer_url.hostname == fetch_url.hostname
12
12
+
assert issuer_url.scheme == "https"
13
13
+
assert issuer_url.port is None
14
14
+
assert issuer_url.path in ["", "/"]
15
15
+
assert issuer_url.params == ""
16
16
+
assert issuer_url.fragment == ""
17
17
+
18
18
+
assert "code" in obj["response_types_supported"]
19
19
+
assert "authorization_code" in obj["grant_types_supported"]
20
20
+
assert "refresh_token" in obj["grant_types_supported"]
21
21
+
assert "S256" in obj["code_challenge_methods_supported"]
22
22
+
assert "none" in obj["token_endpoint_auth_methods_supported"]
23
23
+
assert "private_key_jwt" in obj["token_endpoint_auth_methods_supported"]
24
24
+
assert "ES256" in obj["token_endpoint_auth_signing_alg_values_supported"]
25
25
+
assert "atproto" in obj["scopes_supported"]
26
26
+
assert obj["authorization_response_iss_parameter_supported"] is True
27
27
+
assert obj["pushed_authorization_request_endpoint"] is not None
28
28
+
assert obj["require_pushed_authorization_requests"] is True
29
29
+
assert "ES256" in obj["dpop_signing_alg_values_supported"]
30
30
+
if "require_request_uri_registration" in obj:
31
31
+
assert obj["require_request_uri_registration"] is True
32
32
+
assert obj["client_id_metadata_document_supported"] is True
33
33
+
34
34
+
return True
+5
-4
src/oauth.py
···
4
4
5
5
import json
6
6
7
7
-
from .atproto.atproto_identity import is_valid_did, is_valid_handle
8
7
from .atproto.atproto_oauth import initial_token_request, send_par_auth_request
9
9
-
from .atproto.atproto_security import is_safe_url
10
8
from .atproto import (
9
9
+
is_valid_did,
10
10
+
is_valid_handle,
11
11
pds_endpoint_from_doc,
12
12
resolve_authserver_from_pds,
13
13
-
resolve_authserver_meta,
13
13
+
fetch_authserver_meta,
14
14
resolve_identity,
15
15
)
16
16
+
from .security import is_safe_url
16
17
from .types import OAuthAuthRequest
17
18
from .db import get_db
18
19
···
50
51
51
52
current_app.logger.debug(f"Authserver: {authserver_url}")
52
53
assert is_safe_url(authserver_url)
53
53
-
authserver_meta = resolve_authserver_meta(authserver_url)
54
54
+
authserver_meta = fetch_authserver_meta(authserver_url)
54
55
if not authserver_meta:
55
56
return "no authserver meta", 404
56
57
+1
-1
uv.lock
···
243
243
{ name = "dnspython", specifier = ">=2.8.0" },
244
244
{ name = "flask", extras = ["dotenv"], specifier = ">=3.1.2" },
245
245
{ name = "requests", specifier = ">=2.32" },
246
246
-
{ name = "requests-hardened", specifier = ">=1.0.0b3" },
246
246
+
{ name = "requests-hardened", specifier = ">=1.2.0" },
247
247
]
248
248
249
249
[[package]]