decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv
at 1b7c0778f94de667d002f803fdb1fcc64b2efd23 238 lines 6.6 kB view raw
1from aiodns import DNSResolver, error as dns_error 2from aiohttp.client import ClientSession 3from asyncio import tasks 4from os import getenv 5from re import match as regex_match 6from typing import Any 7 8from .kv import KV, nokv 9from .validator import is_valid_authserver_meta 10from ..security import is_safe_url 11 12PLC_DIRECTORY = getenv("PLC_DIRECTORY_URL") or "https://plc.directory" 13HANDLE_REGEX = r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$" 14DID_REGEX = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$" 15 16 17type AuthserverUrl = str 18type PdsUrl = str 19type DID = str 20 21 22def is_valid_handle(handle: str) -> bool: 23 return regex_match(HANDLE_REGEX, handle) is not None 24 25 26def is_valid_did(did: str) -> bool: 27 return regex_match(DID_REGEX, did) is not None 28 29 30async def resolve_identity( 31 client: ClientSession, 32 query: str, 33 didkv: KV = nokv, 34) -> tuple[str, str, dict[str, Any]] | None: 35 """Resolves an identity to a DID, handle and DID document, verifies handles bi directionally.""" 36 37 if is_valid_handle(query): 38 handle = query.lower() 39 did = await resolve_did_from_handle(client, handle, didkv) 40 if not did: 41 return None 42 doc = await resolve_doc_from_did(client, did) 43 if not doc: 44 return None 45 doc_handle = handle_from_doc(doc) 46 if not doc_handle or doc_handle != handle: 47 return None 48 return (did, handle, doc) 49 50 if is_valid_did(query): 51 did = query 52 doc = await resolve_doc_from_did(client, did) 53 if not doc: 54 return None 55 handle = handle_from_doc(doc) 56 if not handle: 57 return None 58 if await resolve_did_from_handle(client, handle, didkv) != did: 59 return None 60 return (did, handle, doc) 61 62 return None 63 64 65def handle_from_doc(doc: dict[str, list[str]]) -> str | None: 66 """Return all possible handles inside the DID document.""" 67 68 for aka in doc.get("alsoKnownAs", []): 69 if aka.startswith("at://"): 70 handle = aka[5:].lower() 71 if is_valid_handle(handle): 72 return handle 73 return None 74 75 76async def resolve_did_from_handle( 77 client: ClientSession, 78 handle: str, 79 kv: KV = nokv, 80 reload: bool = False, 81) -> str | None: 82 """Returns the DID for a given handle.""" 83 84 if not is_valid_handle(handle): 85 return None 86 87 did = kv.get(handle) 88 if did is not None and not reload: 89 return did 90 91 did = await _resolve_did_from_handle_dns(handle) 92 if did is None: 93 did = await _resolve_did_from_handle_wk(client, handle) 94 95 if did is not None and is_valid_did(did): 96 kv.set(handle, value=did) 97 return did 98 99 return None 100 101 102async def _resolve_did_from_handle_wk(client: ClientSession, handle: str) -> str | None: 103 """Resolve the DID for a given handle via .well-known""" 104 105 url = f"https://{handle}/.well-known/atproto-did" 106 response = await client.get(url) 107 if response.ok: 108 return await response.text() 109 return None 110 111 112async def _resolve_did_from_handle_dns(handle: str) -> str | None: 113 """Resolve the DID for a given handle via DNS.""" 114 115 async with DNSResolver() as resolver: 116 try: 117 result = await resolver.query(f"_atproto.{handle}", "TXT") 118 except dns_error.DNSError: 119 return None 120 121 for record in result: 122 value = str(record.text).strip('"') 123 if value.startswith("did="): 124 did = value[4:] 125 if is_valid_did(did): 126 return did 127 128 return None 129 130 131def pds_endpoint_from_doc(doc: dict[str, list[dict[str, str]]]) -> str | None: 132 """Returns the PDS endpoint from the DID document.""" 133 134 for service in doc.get("service", []): 135 if service.get("id") == "#atproto_pds": 136 return service.get("serviceEndpoint") 137 return None 138 139 140async def resolve_pds_from_did( 141 client: ClientSession, 142 did: DID, 143 kv: KV = nokv, 144 reload: bool = False, 145) -> PdsUrl | None: 146 pds = kv.get(did) 147 if pds is not None and not reload: 148 return pds 149 150 doc = await resolve_doc_from_did(client, did) 151 if doc is None: 152 return None 153 pds = doc["service"][0]["serviceEndpoint"] 154 if pds is None: 155 return None 156 kv.set(did, value=pds) 157 return pds 158 159 160async def resolve_doc_from_did( 161 client: ClientSession, 162 did: DID, 163) -> dict[str, Any] | None: 164 """Returns the DID document""" 165 166 if did.startswith("did:plc:"): 167 response = await client.get(f"{PLC_DIRECTORY}/{did}") 168 if response.ok: 169 return await response.json() 170 return None 171 172 if did.startswith("did:web:"): 173 # TODO: resolve did:web 174 raise Exception("resolve did:web") 175 176 return None 177 178 179async def resolve_authserver_from_pds( 180 client: ClientSession, 181 pds_url: PdsUrl, 182 kv: KV = nokv, 183 reload: bool = False, 184) -> AuthserverUrl | None: 185 """Returns the authserver URL for the PDS.""" 186 187 authserver_url = kv.get(pds_url) 188 if authserver_url is not None and not reload: 189 return authserver_url 190 191 assert is_safe_url(pds_url) 192 endpoint = f"{pds_url}/.well-known/oauth-protected-resource" 193 response = await client.get(endpoint) 194 if response.status != 200: 195 return None 196 parsed: dict[str, list[str]] = await response.json() 197 authserver_url = parsed["authorization_servers"][0] 198 kv.set(pds_url, value=authserver_url) 199 return authserver_url 200 201 202async def fetch_authserver_meta( 203 client: ClientSession, 204 authserver_url: str, 205) -> dict[str, str] | None: 206 """Returns metadata from the authserver""" 207 208 assert is_safe_url(authserver_url) 209 endpoint = f"{authserver_url}/.well-known/oauth-authorization-server" 210 response = await client.get(endpoint) 211 if not response.ok: 212 return None 213 meta: dict[str, Any] = await response.json() 214 assert is_valid_authserver_meta(meta, authserver_url) 215 return meta 216 217 218async def get_record( 219 client: ClientSession, 220 pds: str, 221 repo: str, 222 collection: str, 223 record: str, 224 type: str | None = None, 225) -> dict[str, Any] | None: 226 """Retrieve record from PDS. Verifies type is the same as collection name.""" 227 228 params = {"repo": repo, "collection": collection, "rkey": record} 229 response = await client.get(f"{pds}/xrpc/com.atproto.repo.getRecord", params=params) 230 if not response.ok: 231 return None 232 parsed = await response.json() 233 value: dict[str, Any] = parsed["value"] 234 if value["$type"] != (type or collection): 235 return None 236 del value["$type"] 237 238 return value