···11+[project]
22+name = "ligo-at"
33+version = "0.0.1"
44+description = "Decentralized links page on top of AT Protocol"
55+readme = "readme.md"
66+requires-python = ">=3.13"
77+dependencies = [
88+ "atproto>=0.0.62",
99+ "authlib>=1.3",
1010+ "dnspython>=2.8.0",
1111+ "flask[dotenv]>=3.1.2",
1212+ "requests>=2.32",
1313+ "requests-hardened>=1.0.0b3",
1414+]
+163
src/atproto2/__init__.py
···11+from dns.resolver import resolve as resolve_dns
22+from typing import Any
33+import requests
44+55+from .atproto_oauth import is_valid_authserver_meta
66+from .atproto_security import is_safe_url
77+from .atproto_identity import is_valid_did, is_valid_handle
88+99+PLC_DIRECTORY = "https://plc.directory"
1010+1111+AuthserverUrl = str
1212+PdsUrl = str
1313+DID = str
1414+1515+authservers: dict[PdsUrl, AuthserverUrl] = {}
1616+dids: dict[str, DID] = {}
1717+pdss: dict[DID, PdsUrl] = {}
1818+1919+2020+def resolve_identity(query: str) -> tuple[str, str, dict[str, Any]] | None:
2121+ """Resolves an identity to a DID, handle and DID document, verifies handles bi directionally."""
2222+2323+ if is_valid_handle(query):
2424+ handle = query
2525+ did = resolve_did_from_handle(handle)
2626+ if not did:
2727+ return None
2828+ doc = resolve_doc_from_did(did)
2929+ if not doc:
3030+ return None
3131+ handles = handles_from_doc(doc)
3232+ if not handles or handle not in handles:
3333+ return None
3434+ return (did, handle, doc)
3535+3636+ if is_valid_did(query):
3737+ # TODO: resolve did identity
3838+ return None
3939+4040+ return None
4141+4242+4343+def handles_from_doc(doc: dict[str, list[str]]) -> list[str]:
4444+ """Return all possible handles inside the DID document."""
4545+ handles: list[str] = []
4646+ for aka in doc.get("alsoKnownAs", []):
4747+ if aka.startswith("at://"):
4848+ handle = aka[5:]
4949+ if is_valid_handle(handle):
5050+ handles.append(handle)
5151+ return handles
5252+5353+5454+def handle_from_doc(doc: dict[str, list[str]]) -> str | None:
5555+ """Return the first handle inside the DID document."""
5656+ handles = handles_from_doc(doc)
5757+ try:
5858+ return handles[0]
5959+ except IndexError:
6060+ return None
6161+6262+6363+def resolve_did_from_handle(handle: str, reload: bool = False) -> str | None:
6464+ """Returns the DID for a given handle"""
6565+6666+ if handle in dids and not reload:
6767+ print(f"returning cached did for {handle}")
6868+ return dids[handle]
6969+7070+ answer = resolve_dns(f"_atproto.{handle}", "TXT")
7171+ for record in answer:
7272+ value = str(record).replace('"', "")
7373+ if value.startswith("did="):
7474+ did = value[4:]
7575+ if is_valid_did(did):
7676+ return did
7777+7878+ return None
7979+8080+8181+def pds_endpoint_from_doc(doc: dict[str, list[dict[str, str]]]) -> str | None:
8282+ """Returns the PDS endpoint from the DID document."""
8383+8484+ for service in doc.get("service", []):
8585+ if service.get("id") == "#atproto_pds":
8686+ return service.get("serviceEndpoint")
8787+ return None
8888+8989+9090+def resolve_pds_from_did(did: DID, reload: bool = False) -> PdsUrl | None:
9191+ if did in pdss and not reload:
9292+ print(f"returning cached pds for {did}")
9393+ return pdss[did]
9494+9595+ doc = resolve_doc_from_did(did)
9696+ if doc is None:
9797+ return None
9898+ pds = doc["service"][0]["serviceEndpoint"]
9999+ pdss[did] = pds
100100+ print(f"caching pds {pds} for {did}")
101101+ return pds
102102+103103+104104+def resolve_doc_from_did(
105105+ did: DID,
106106+ directory: str = PLC_DIRECTORY,
107107+) -> dict[str, Any] | None:
108108+ if did.startswith("did:plc:"):
109109+ response = requests.get(f"{directory}/{did}")
110110+ if response.ok:
111111+ return response.json()
112112+ return None
113113+114114+ if did.startswith("did:web:"):
115115+ # TODO: resolve did:web
116116+ return None
117117+118118+ return None
119119+120120+121121+def resolve_authserver_from_pds(
122122+ pds_url: PdsUrl,
123123+ reload: bool = False,
124124+) -> AuthserverUrl | None:
125125+ """Returns the authserver URL for the PDS."""
126126+127127+ if pds_url in authservers and not reload:
128128+ print(f"returning cached authserver for PDS {pds_url}")
129129+ return authservers[pds_url]
130130+131131+ assert is_safe_url(pds_url)
132132+ endpoint = f"{pds_url}/.well-known/oauth-protected-resource"
133133+ response = requests.get(endpoint)
134134+ if response.status_code != 200:
135135+ return None
136136+ parsed: dict[str, list[str]] = response.json()
137137+ authserver_url = parsed["authorization_servers"][0]
138138+ print(f"caching authserver {authserver_url} for PDS {pds_url}")
139139+ authservers[pds_url] = authserver_url
140140+ return authserver_url
141141+142142+143143+def resolve_authserver_meta(authserver_url: str) -> dict[str, str] | None:
144144+ """Returns metadata from the authserver"""
145145+ assert is_safe_url(authserver_url)
146146+ endpoint = f"{authserver_url}/.well-known/oauth-authorization-server"
147147+ meta = http_get_json(endpoint)
148148+ assert is_valid_authserver_meta(meta, authserver_url)
149149+ return meta
150150+151151+152152+def http_get_json(url: str) -> Any | None:
153153+ response = requests.get(url)
154154+ if response.ok:
155155+ return response.json()
156156+ return None
157157+158158+159159+def http_get(url: str) -> str | None:
160160+ response = requests.get(url)
161161+ if response.ok:
162162+ return response.text
163163+ return None
+134
src/atproto2/atproto_identity.py
···11+import re
22+import sys
33+import requests
44+import dns.resolver
55+from typing import Optional, Tuple
66+77+from .atproto_security import hardened_http
88+99+HANDLE_REGEX = r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$"
1010+DID_REGEX = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$"
1111+1212+1313+def is_valid_handle(handle: str) -> bool:
1414+ return re.match(HANDLE_REGEX, handle) is not None
1515+1616+1717+def is_valid_did(did: str) -> bool:
1818+ return re.match(DID_REGEX, did) is not None
1919+2020+2121+def handle_from_doc(doc: dict) -> Optional[str]:
2222+ for aka in doc.get("alsoKnownAs", []):
2323+ if aka.startswith("at://"):
2424+ handle = aka[5:]
2525+ if is_valid_handle(handle):
2626+ return handle
2727+ return None
2828+2929+3030+# resolves an identity (handle or DID) to a DID, handle, and DID document. verifies handle bi-directionally.
3131+def resolve_identity(atid: str) -> Tuple[str, str, dict]:
3232+ if is_valid_handle(atid):
3333+ handle = atid
3434+ did = resolve_handle(handle)
3535+ if not did:
3636+ raise Exception("Failed to resolve handle: " + handle)
3737+ doc = resolve_did(did)
3838+ if not doc:
3939+ raise Exception("Failed to resolve DID: " + did)
4040+ doc_handle = handle_from_doc(doc)
4141+ if not doc_handle or doc_handle != handle:
4242+ raise Exception("Handle did not match DID: " + handle)
4343+ return did, handle, doc
4444+ if is_valid_did(atid):
4545+ did = atid
4646+ doc = resolve_did(did)
4747+ if not doc:
4848+ raise Exception("Failed to resolve DID: " + did)
4949+ handle = handle_from_doc(doc)
5050+ if not handle:
5151+ raise Exception("Handle did not match DID: " + handle)
5252+ if resolve_handle(handle) != did:
5353+ raise Exception("Handle did not match DID: " + handle)
5454+ return did, handle, doc
5555+5656+ raise Exception("identifier not a handle or DID: " + atid)
5757+5858+5959+def resolve_handle(handle: str) -> Optional[str]:
6060+ # first try TXT record
6161+ try:
6262+ for record in dns.resolver.resolve(f"_atproto.{handle}", "TXT"):
6363+ val = record.to_text().replace('"', "")
6464+ if val.startswith("did="):
6565+ val = val[4:]
6666+ if is_valid_did(val):
6767+ return val
6868+ except Exception:
6969+ pass
7070+7171+ # then try HTTP well-known
7272+ # IMPORTANT: 'handle' domain is untrusted user input. SSRF mitigations are necessary
7373+ try:
7474+ with hardened_http.get_session() as sess:
7575+ resp = sess.get(f"https://{handle}/.well-known/atproto-did")
7676+ except Exception:
7777+ return None
7878+7979+ if resp.status_code != 200:
8080+ return None
8181+ did = resp.text.split()[0]
8282+ if is_valid_did(did):
8383+ return did
8484+ return None
8585+8686+8787+def resolve_did(did: str) -> Optional[dict]:
8888+ if did.startswith("did:plc:"):
8989+ # NOTE: 'did' is untrusted input, but has been validated by regex by this point
9090+ resp = requests.get(f"https://plc.directory/{did}")
9191+ if resp.status_code != 200:
9292+ return None
9393+ return resp.json()
9494+9595+ if did.startswith("did:web:"):
9696+ domain = did[8:]
9797+ # IMPORTANT: domain is untrusted input. SSRF mitigations are necessary
9898+ # "handle" validation works to check that domain is a simple hostname
9999+ assert is_valid_handle(domain)
100100+ try:
101101+ with hardened_http.get_session() as sess:
102102+ resp = sess.get(f"https://{domain}/.well-known/did.json")
103103+ except requests.exceptions.ConnectionError:
104104+ return None
105105+ if resp.status_code != 200:
106106+ return None
107107+ return resp.json()
108108+ raise ValueError("unsupported DID type")
109109+110110+111111+def pds_endpoint(doc: dict) -> str:
112112+ for svc in doc["service"]:
113113+ if svc["id"] == "#atproto_pds":
114114+ return svc["serviceEndpoint"]
115115+ raise Exception("PDS endpoint not found in DID document")
116116+117117+118118+if __name__ == "__main__":
119119+ assert is_valid_did("did:web:example.com")
120120+ assert is_valid_did("did:plc:abc123")
121121+ assert is_valid_did("") is False
122122+ assert is_valid_did("did:asdfasdf") is False
123123+ handle = sys.argv[1]
124124+ if not is_valid_handle(handle):
125125+ print("invalid handle!")
126126+ sys.exit(-1)
127127+ assert handle is not None
128128+ did = resolve_handle(handle)
129129+ print(f"DID: {did}")
130130+ assert did is not None
131131+ doc = resolve_did(did)
132132+ print(doc)
133133+ resolve_identity(handle)
134134+ resolve_identity(did)
+394
src/atproto2/atproto_oauth.py
···11+from urllib.parse import urlparse
22+from typing import Any
33+import time
44+import json
55+from authlib.jose import JsonWebKey, Key
66+from authlib.common.security import generate_token
77+from authlib.jose import jwt
88+from authlib.oauth2.rfc7636 import create_s256_code_challenge
99+from requests import Response
1010+1111+from .atproto_security import is_safe_url, hardened_http
1212+1313+1414+# Checks an Authorization Server metadata response against atproto OAuth requirements
1515+def is_valid_authserver_meta(obj: dict[str, Any] | None, url: str) -> bool:
1616+ if obj is None:
1717+ return False
1818+ fetch_url = urlparse(url)
1919+ issuer_url = urlparse(obj["issuer"])
2020+ assert issuer_url.hostname == fetch_url.hostname
2121+ assert issuer_url.scheme == "https"
2222+ assert issuer_url.port is None
2323+ assert issuer_url.path in ["", "/"]
2424+ assert issuer_url.params == ""
2525+ assert issuer_url.fragment == ""
2626+2727+ assert "code" in obj["response_types_supported"]
2828+ assert "authorization_code" in obj["grant_types_supported"]
2929+ assert "refresh_token" in obj["grant_types_supported"]
3030+ assert "S256" in obj["code_challenge_methods_supported"]
3131+ assert "none" in obj["token_endpoint_auth_methods_supported"]
3232+ assert "private_key_jwt" in obj["token_endpoint_auth_methods_supported"]
3333+ assert "ES256" in obj["token_endpoint_auth_signing_alg_values_supported"]
3434+ assert "atproto" in obj["scopes_supported"]
3535+ assert obj["authorization_response_iss_parameter_supported"] is True
3636+ assert obj["pushed_authorization_request_endpoint"] is not None
3737+ assert obj["require_pushed_authorization_requests"] is True
3838+ assert "ES256" in obj["dpop_signing_alg_values_supported"]
3939+ if "require_request_uri_registration" in obj:
4040+ assert obj["require_request_uri_registration"] is True
4141+ assert obj["client_id_metadata_document_supported"] is True
4242+4343+ return True
4444+4545+4646+# Takes a Resource Server (PDS) URL, and tries to resolve it to an Authorization Server host/origin
4747+def resolve_pds_authserver(url: str) -> str:
4848+ # IMPORTANT: PDS endpoint URL is untrusted input, SSRF mitigations are needed
4949+ assert is_safe_url(url)
5050+ with hardened_http.get_session() as sess:
5151+ resp = sess.get(f"{url}/.well-known/oauth-protected-resource")
5252+ resp.raise_for_status()
5353+ # Additionally check that status is exactly 200 (not just 2xx)
5454+ assert resp.status_code == 200
5555+ authserver_url = resp.json()["authorization_servers"][0]
5656+ return authserver_url
5757+5858+5959+# Does an HTTP GET for Authorization Server (entryway) metadata, verify the contents, and return the metadata as a dict
6060+# DEPRECATED: use atproto2.resolve_authserver_meta
6161+def fetch_authserver_meta(url: str) -> dict[str, Any]:
6262+ # IMPORTANT: Authorization Server URL is untrusted input, SSRF mitigations are needed
6363+ assert is_safe_url(url)
6464+ with hardened_http.get_session() as sess:
6565+ resp = sess.get(f"{url}/.well-known/oauth-authorization-server")
6666+ resp.raise_for_status()
6767+6868+ authserver_meta = resp.json()
6969+ # print("Auth Server Metadata: " + json.dumps(authserver_meta, indent=2))
7070+ assert is_valid_authserver_meta(authserver_meta, url)
7171+ return authserver_meta
7272+7373+7474+def client_assertion_jwt(
7575+ client_id: str,
7676+ authserver_url: str,
7777+ client_secret_jwk: Key,
7878+) -> str:
7979+ client_assertion = jwt.encode(
8080+ {"alg": "ES256", "kid": client_secret_jwk["kid"]},
8181+ {
8282+ "iss": client_id,
8383+ "sub": client_id,
8484+ "aud": authserver_url,
8585+ "jti": generate_token(),
8686+ "iat": int(time.time()),
8787+ },
8888+ client_secret_jwk,
8989+ ).decode("utf-8")
9090+ return client_assertion
9191+9292+9393+def authserver_dpop_jwt(
9494+ method: str,
9595+ url: str,
9696+ nonce: str,
9797+ dpop_private_jwk: Key,
9898+) -> str:
9999+ dpop_pub_jwk = json.loads(dpop_private_jwk.as_json(is_private=False))
100100+ body = {
101101+ "jti": generate_token(),
102102+ "htm": method,
103103+ "htu": url,
104104+ "iat": int(time.time()),
105105+ "exp": int(time.time()) + 30,
106106+ }
107107+ if nonce:
108108+ body["nonce"] = nonce
109109+ dpop_proof = jwt.encode(
110110+ {"typ": "dpop+jwt", "alg": "ES256", "jwk": dpop_pub_jwk},
111111+ body,
112112+ dpop_private_jwk,
113113+ ).decode("utf-8")
114114+ return dpop_proof
115115+116116+117117+# Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server.
118118+# Returns "state" id HTTP response on success, without checking HTTP response status
119119+def send_par_auth_request(
120120+ authserver_url: str,
121121+ authserver_meta: dict[str, str],
122122+ login_hint: str,
123123+ client_id: str,
124124+ redirect_uri: str,
125125+ scope: str,
126126+ client_secret_jwk: Key,
127127+ dpop_private_jwk: Key,
128128+) -> tuple[str, str, str, Response]:
129129+ par_url = authserver_meta["pushed_authorization_request_endpoint"]
130130+ state = generate_token()
131131+ pkce_verifier = generate_token(48)
132132+133133+ # Generate PKCE code_challenge, and use it for PAR request
134134+ code_challenge: str = create_s256_code_challenge(pkce_verifier)
135135+ code_challenge_method = "S256"
136136+137137+ # Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
138138+ client_assertion = client_assertion_jwt(
139139+ client_id, authserver_url, client_secret_jwk
140140+ )
141141+142142+ # Create DPoP header JWT; we don't have a server Nonce yet
143143+ dpop_authserver_nonce = ""
144144+ dpop_proof = authserver_dpop_jwt(
145145+ "POST", par_url, dpop_authserver_nonce, dpop_private_jwk
146146+ )
147147+148148+ par_body: dict[str, str] = {
149149+ "response_type": "code",
150150+ "code_challenge": code_challenge,
151151+ "code_challenge_method": code_challenge_method,
152152+ "client_id": client_id,
153153+ "state": state,
154154+ "redirect_uri": redirect_uri,
155155+ "scope": scope,
156156+ "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
157157+ "client_assertion": client_assertion,
158158+ }
159159+ if login_hint:
160160+ par_body["login_hint"] = login_hint
161161+ # print(par_body)
162162+163163+ # IMPORTANT: Pushed Authorization Request URL is untrusted input, SSRF mitigations are needed
164164+ assert is_safe_url(par_url)
165165+ with hardened_http.get_session() as sess:
166166+ resp = sess.post(
167167+ par_url,
168168+ headers={
169169+ "Content-Type": "application/x-www-form-urlencoded",
170170+ "DPoP": dpop_proof,
171171+ },
172172+ data=par_body,
173173+ )
174174+175175+ # Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
176176+ if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
177177+ dpop_authserver_nonce = resp.headers["DPoP-Nonce"]
178178+ print(f"retrying with new auth server DPoP nonce: {dpop_authserver_nonce}")
179179+ dpop_proof = authserver_dpop_jwt(
180180+ "POST", par_url, dpop_authserver_nonce, dpop_private_jwk
181181+ )
182182+ with hardened_http.get_session() as sess:
183183+ resp = sess.post(
184184+ par_url,
185185+ headers={
186186+ "Content-Type": "application/x-www-form-urlencoded",
187187+ "DPoP": dpop_proof,
188188+ },
189189+ data=par_body,
190190+ )
191191+192192+ return pkce_verifier, state, dpop_authserver_nonce, resp
193193+194194+195195+# Completes the auth flow by sending an initial auth token request.
196196+# Returns token response (dict) and DPoP nonce (str)
197197+def initial_token_request(
198198+ auth_request: dict[str, str],
199199+ code: str,
200200+ app_url: str,
201201+ client_secret_jwk: Key,
202202+) -> tuple[dict[str, str], str]:
203203+ authserver_url = auth_request["authserver_iss"]
204204+205205+ # Re-fetch server metadata
206206+ authserver_meta = fetch_authserver_meta(authserver_url)
207207+208208+ # Construct auth token request fields
209209+ client_id = f"{app_url}oauth/metadata"
210210+ redirect_uri = f"{app_url}oauth/callback"
211211+212212+ # Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
213213+ client_assertion = client_assertion_jwt(
214214+ client_id, authserver_url, client_secret_jwk
215215+ )
216216+217217+ params = {
218218+ "client_id": client_id,
219219+ "redirect_uri": redirect_uri,
220220+ "grant_type": "authorization_code",
221221+ "code": code,
222222+ "code_verifier": auth_request["pkce_verifier"],
223223+ "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
224224+ "client_assertion": client_assertion,
225225+ }
226226+227227+ # Create DPoP header JWT, using the existing DPoP signing key for this account/session
228228+ token_url = authserver_meta["token_endpoint"]
229229+ dpop_private_jwk = JsonWebKey.import_key(
230230+ json.loads(auth_request["dpop_private_jwk"])
231231+ )
232232+ dpop_authserver_nonce = auth_request["dpop_authserver_nonce"]
233233+ dpop_proof = authserver_dpop_jwt(
234234+ "POST", token_url, dpop_authserver_nonce, dpop_private_jwk
235235+ )
236236+237237+ # IMPORTANT: Token URL is untrusted input, SSRF mitigations are needed
238238+ assert is_safe_url(token_url)
239239+ with hardened_http.get_session() as sess:
240240+ resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
241241+242242+ # Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
243243+ if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
244244+ dpop_authserver_nonce = resp.headers["DPoP-Nonce"]
245245+ print(f"retrying with new auth server DPoP nonce: {dpop_authserver_nonce}")
246246+ # print(server_nonce)
247247+ dpop_proof = authserver_dpop_jwt(
248248+ "POST", token_url, dpop_authserver_nonce, dpop_private_jwk
249249+ )
250250+ with hardened_http.get_session() as sess:
251251+ resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
252252+253253+ token_body = resp.json()
254254+ print(token_body)
255255+256256+ resp.raise_for_status()
257257+258258+ # IMPORTANT: the 'sub' field must be verified against the original request by code calling this function.
259259+260260+ return token_body, dpop_authserver_nonce
261261+262262+263263+# Returns token response (dict) and DPoP nonce (str)
264264+def refresh_token_request(
265265+ user: dict,
266266+ app_url: str,
267267+ client_secret_jwk: Key,
268268+) -> tuple[dict[str, str], str]:
269269+ authserver_url = user["authserver_iss"]
270270+271271+ # Re-fetch server metadata
272272+ authserver_meta = fetch_authserver_meta(authserver_url)
273273+274274+ # Construct token request fields
275275+ client_id = f"{app_url}oauth/metadata"
276276+277277+ # Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
278278+ client_assertion = client_assertion_jwt(
279279+ client_id, authserver_url, client_secret_jwk
280280+ )
281281+282282+ params = {
283283+ "client_id": client_id,
284284+ "grant_type": "refresh_token",
285285+ "refresh_token": user["refresh_token"],
286286+ "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
287287+ "client_assertion": client_assertion,
288288+ }
289289+290290+ # Create DPoP header JWT, using the existing DPoP signing key for this account/session
291291+ token_url = authserver_meta["token_endpoint"]
292292+ dpop_private_jwk = JsonWebKey.import_key(json.loads(user["dpop_private_jwk"]))
293293+ dpop_authserver_nonce = user["dpop_authserver_nonce"]
294294+ dpop_proof = authserver_dpop_jwt(
295295+ "POST", token_url, dpop_authserver_nonce, dpop_private_jwk
296296+ )
297297+298298+ # IMPORTANT: Token URL is untrusted input, SSRF mitigations are needed
299299+ assert is_safe_url(token_url)
300300+ with hardened_http.get_session() as sess:
301301+ resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
302302+303303+ # Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
304304+ if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
305305+ dpop_authserver_nonce = resp.headers["DPoP-Nonce"]
306306+ print(f"retrying with new auth server DPoP nonce: {dpop_authserver_nonce}")
307307+ # print(server_nonce)
308308+ dpop_proof = authserver_dpop_jwt(
309309+ "POST", token_url, dpop_authserver_nonce, dpop_private_jwk
310310+ )
311311+ with hardened_http.get_session() as sess:
312312+ resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
313313+314314+ if resp.status_code not in [200, 201]:
315315+ print(f"Token Refresh Error: {resp.json()}")
316316+317317+ resp.raise_for_status()
318318+ token_body = resp.json()
319319+320320+ return token_body, dpop_authserver_nonce
321321+322322+323323+def pds_dpop_jwt(
324324+ method: str,
325325+ url: str,
326326+ access_token: str,
327327+ nonce: str,
328328+ dpop_private_jwk: Key,
329329+) -> str:
330330+ dpop_pub_jwk = json.loads(dpop_private_jwk.as_json(is_private=False))
331331+ body = {
332332+ "iat": int(time.time()),
333333+ "exp": int(time.time()) + 10,
334334+ "jti": generate_token(),
335335+ "htm": method,
336336+ "htu": url,
337337+ # PKCE S256 is same as DPoP ath hashing
338338+ "ath": create_s256_code_challenge(access_token),
339339+ }
340340+ if nonce:
341341+ body["nonce"] = nonce
342342+ dpop_proof = jwt.encode(
343343+ {"typ": "dpop+jwt", "alg": "ES256", "jwk": dpop_pub_jwk},
344344+ body,
345345+ dpop_private_jwk,
346346+ ).decode("utf-8")
347347+ return dpop_proof
348348+349349+350350+# Helper to demonstrate making a request (HTTP GET or POST) to the user's PDS ("Resource Server" in OAuth terminology) using DPoP and access token.
351351+# This method returns a 'requests' reponse, without checking status code.
352352+def pds_authed_req(method: str, url: str, user: dict, db: Any, body=None) -> Any:
353353+ dpop_private_jwk = JsonWebKey.import_key(json.loads(user["dpop_private_jwk"]))
354354+ dpop_pds_nonce = user["dpop_pds_nonce"]
355355+ access_token = user["access_token"]
356356+357357+ # Might need to retry request with a new nonce.
358358+ for i in range(2):
359359+ dpop_jwt = pds_dpop_jwt(
360360+ "POST",
361361+ url,
362362+ access_token,
363363+ dpop_pds_nonce,
364364+ dpop_private_jwk,
365365+ )
366366+367367+ with hardened_http.get_session() as sess:
368368+ resp = sess.post(
369369+ url,
370370+ headers={
371371+ "Authorization": f"DPoP {access_token}",
372372+ "DPoP": dpop_jwt,
373373+ },
374374+ json=body,
375375+ )
376376+377377+ # If we got a new server-provided DPoP nonce, store it in database and retry.
378378+ # NOTE: the type of error might also be communicated in the `WWW-Authenticate` HTTP response header.
379379+ if resp.status_code in [400, 401] and resp.json()["error"] == "use_dpop_nonce":
380380+ # print(resp.headers)
381381+ dpop_pds_nonce = resp.headers["DPoP-Nonce"]
382382+ print(f"retrying with new PDS DPoP nonce: {dpop_pds_nonce}")
383383+ # update session database with new nonce
384384+ cur = db.cursor()
385385+ cur.execute(
386386+ "UPDATE oauth_session SET dpop_pds_nonce = ? WHERE did = ?;",
387387+ [dpop_pds_nonce, user["did"]],
388388+ )
389389+ db.commit()
390390+ cur.close()
391391+ continue
392392+ break
393393+394394+ return resp
+41
src/atproto2/atproto_security.py
···11+from urllib.parse import urlparse
22+import requests_hardened
33+44+55+# this is a crude/partial filter that looks at HTTPS URLs and checks if they seem "safe" for server-side requests (SSRF). This is only a partial mitigation, the actual HTTP client also needs to prevent other attacks and behaviors.
66+# this isn't a fully complete or secure implementation
77+def is_safe_url(url: str) -> bool:
88+ parts = urlparse(url)
99+ if not (
1010+ parts.scheme == "https"
1111+ and parts.hostname is not None
1212+ and parts.hostname == parts.netloc
1313+ and parts.username is None
1414+ and parts.password is None
1515+ and parts.port is None
1616+ ):
1717+ return False
1818+1919+ segments = parts.hostname.split(".")
2020+ if not (
2121+ len(segments) >= 2
2222+ and segments[-1] not in ["local", "arpa", "internal", "localhost"]
2323+ ):
2424+ return False
2525+2626+ if segments[-1].isdigit():
2727+ return False
2828+2929+ return True
3030+3131+3232+# configures a "hardened" requests wrapper
3333+hardened_http = requests_hardened.Manager(
3434+ requests_hardened.Config(
3535+ default_timeout=(2, 10),
3636+ never_redirect=True,
3737+ ip_filter_enable=True,
3838+ ip_filter_allow_loopback_ips=False,
3939+ user_agent_override="AtprotoCookbookOAuthFlaskDemo",
4040+ )
4141+)
+6-48
src/main.py
···66from urllib import request as http_request
77import json
8899+from .atproto2 import resolve_did_from_handle, resolve_pds_from_did
1010+from .oauth import oauth
1111+912app = Flask(__name__)
1013_ = app.config.from_prefixed_env()
1414+app.register_blueprint(oauth)
11151216pdss: dict[str, str] = {}
1317dids: dict[str, str] = {}
1418links: dict[str, list[dict[str, str]]] = {}
1519profiles: dict[str, tuple[str, str]] = {}
16201717-PLC_DIRECTORY = "https://plc.directory"
1821SCHEMA = "one.nauta"
19222023···187190 return profile, from_bluesky
188191189192190190-def resolve_pds_from_did(did: str, reload: bool = False) -> str | None:
191191- if did in pdss and not reload:
192192- app.logger.debug(f"returning cached pds for {did}")
193193- return pdss[did]
194194-195195- response = http_get(f"{PLC_DIRECTORY}/{did}")
196196- if response is None:
197197- return None
198198- parsed = json.loads(response)
199199- pds = parsed["service"][0]["serviceEndpoint"]
200200- pdss[did] = pds
201201- app.logger.debug(f"caching pds {pds} for {did}")
202202- return pds
203203-204204-205205-def resolve_did_from_handle(handle: str, reload: bool = False) -> str | None:
206206- if handle in dids and not reload:
207207- app.logger.debug(f"returning cached did for {handle}")
208208- return dids[handle]
209209-210210- response = http_get(f"https://dns.google/resolve?name=_atproto.{handle}&type=TXT")
211211- if response is None:
212212- return None
213213- parsed = json.loads(response)
214214- answers = parsed["Answer"]
215215- if len(answers) < 1:
216216- return handle
217217- data: str = answers[0]["data"]
218218- if not data.startswith("did="):
219219- return handle
220220- did = data[4:]
221221- dids[handle] = did
222222- app.logger.debug(f"caching did {did} for {handle}")
223223- return did
224224-225225-226193def get_record(pds: str, repo: str, collection: str, record: str) -> str | None:
227194 response = http_get(
228195 f"{pds}/xrpc/com.atproto.repo.getRecord?repo={repo}&collection={collection}&rkey={record}"
···263230@app.post("/auth/login")
264231def auth_login():
265232 handle = request.form.get("handle")
266266- password = request.form.get("password")
267267- if not handle or not password:
233233+ if not handle:
268234 return redirect("/login", 303)
269235 if handle.startswith("@"):
270236 handle = handle[1:]
271271- session_string: str | None
272272- try:
273273- client = Client()
274274- _ = client.login(handle, password)
275275- session_string = client.export_session_string()
276276- except AtProtocolError:
277277- return redirect("/login", 303)
278278- session["session"] = session_string
279279- return redirect("/editor", code=303)
237237+ return redirect(app.url_for("oauth.oauth_start", username=handle))