decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
1from aiodns import DNSResolver, error as dns_error
2from aiohttp.client import ClientSession
3from asyncio import tasks
4from os import getenv
5from re import match as regex_match
6from typing import Any
7
8from .kv import KV, nokv
9from .validator import is_valid_authserver_meta
10from ..security import is_safe_url
11
# Base URL of the PLC directory queried for did:plc documents. It can be
# overridden with the PLC_DIRECTORY_URL environment variable; an unset or
# empty variable falls back to the public directory.
PLC_DIRECTORY = getenv("PLC_DIRECTORY_URL") or "https://plc.directory"
# Both patterns are anchored with \Z rather than $: with re.match, $ also
# matches just before a trailing newline, which would let values such as
# "example.com\n" through validation and into URLs and cache keys.
HANDLE_REGEX = r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\Z"
DID_REGEX = r"^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]\Z"
15
16
# Plain-string aliases; they only make the signatures below say which kind
# of string is expected and add no runtime validation.
type AuthserverUrl = str  # URL of an authorization server
type PdsUrl = str  # URL of a PDS endpoint
type DID = str  # identifier of the form "did:<method>:<id>"
20
21
def is_valid_handle(handle: str) -> bool:
    """Tell whether ``handle`` matches ``HANDLE_REGEX``."""
    # A match object is always truthy, so bool() is the same as "is not None".
    return bool(regex_match(HANDLE_REGEX, handle))
24
25
def is_valid_did(did: str) -> bool:
    """Tell whether ``did`` matches ``DID_REGEX``."""
    # A match object is always truthy, so bool() is the same as "is not None".
    return bool(regex_match(DID_REGEX, did))
28
29
async def resolve_identity(
    client: ClientSession,
    query: str,
    didkv: KV = nokv,
) -> tuple[str, str, dict[str, Any]] | None:
    """Resolve a handle or DID to a ``(did, handle, document)`` triple.

    The handle and the DID must point at each other: starting from a handle,
    the DID document has to name that same handle; starting from a DID, the
    handle found in its document has to resolve back to that DID. Returns
    None when ``query`` is neither a valid handle nor a valid DID, when any
    lookup step yields nothing, or when the two directions disagree.
    """

    if is_valid_handle(query):
        claimed_handle = query.lower()
        resolved_did = await resolve_did_from_handle(client, claimed_handle, didkv)
        if not resolved_did:
            return None
        document = await resolve_doc_from_did(client, resolved_did)
        if not document:
            return None
        # A missing handle (None) can never equal the non-empty claimed one,
        # so one comparison covers both "absent" and "different".
        if handle_from_doc(document) != claimed_handle:
            return None
        return (resolved_did, claimed_handle, document)

    if not is_valid_did(query):
        return None

    document = await resolve_doc_from_did(client, query)
    if not document:
        return None
    listed_handle = handle_from_doc(document)
    if not listed_handle:
        return None
    round_trip_did = await resolve_did_from_handle(client, listed_handle, didkv)
    if round_trip_did != query:
        return None
    return (query, listed_handle, document)
63
64
65def handle_from_doc(doc: dict[str, list[str]]) -> str | None:
66 """Return all possible handles inside the DID document."""
67
68 for aka in doc.get("alsoKnownAs", []):
69 if aka.startswith("at://"):
70 handle = aka[5:].lower()
71 if is_valid_handle(handle):
72 return handle
73 return None
74
75
async def resolve_did_from_handle(
    client: ClientSession,
    handle: str,
    kv: KV = nokv,
    reload: bool = False,
) -> str | None:
    """Look up the DID that a handle points to.

    An invalid handle yields None. A value already stored in ``kv`` for the
    handle is returned as-is unless ``reload`` is set. Otherwise DNS is asked
    first and the .well-known endpoint only when DNS gives nothing; a result
    that passes ``is_valid_did`` is stored in ``kv`` and returned, anything
    else yields None.
    """

    if not is_valid_handle(handle):
        return None

    cached = kv.get(handle)
    if not reload and cached is not None:
        return cached

    resolved = await _resolve_did_from_handle_dns(handle)
    if resolved is None:
        resolved = await _resolve_did_from_handle_wk(client, handle)

    if resolved is None or not is_valid_did(resolved):
        return None

    kv.set(handle, value=resolved)
    return resolved
100
101
async def _resolve_did_from_handle_wk(client: ClientSession, handle: str) -> str | None:
    """Resolve the DID for a given handle via .well-known.

    Requests ``https://{handle}/.well-known/atproto-did`` and returns the
    response body with surrounding whitespace removed when the response is
    OK, None otherwise. The returned text is not validated here.
    """

    url = f"https://{handle}/.well-known/atproto-did"
    # The context manager releases the connection on every path, including
    # the non-OK one where the body is never read.
    async with client.get(url) as response:
        if not response.ok:
            return None
        # Plain-text files are commonly served with a trailing newline, which
        # must not become part of the DID that is returned and cached.
        return (await response.text()).strip()
110
111
async def _resolve_did_from_handle_dns(handle: str) -> str | None:
    """Look up the DID for a handle in its ``_atproto`` DNS TXT records.

    Returns the value of the first record of the form ``did=<DID>`` whose
    DID passes ``is_valid_did``. Returns None when the query fails with a
    DNS error or no record qualifies.
    """

    marker = "did="

    async with DNSResolver() as resolver:
        try:
            records = await resolver.query(f"_atproto.{handle}", "TXT")
        except dns_error.DNSError:
            return None

    for record in records:
        # Drop any double quotes wrapped around the record text.
        text = str(record.text).strip('"')
        if not text.startswith(marker):
            continue
        candidate = text[len(marker):]
        if is_valid_did(candidate):
            return candidate

    return None
129
130
131def pds_endpoint_from_doc(doc: dict[str, list[dict[str, str]]]) -> str | None:
132 """Returns the PDS endpoint from the DID document."""
133
134 for service in doc.get("service", []):
135 if service.get("id") == "#atproto_pds":
136 return service.get("serviceEndpoint")
137 return None
138
139
async def resolve_pds_from_did(
    client: ClientSession,
    did: DID,
    kv: KV = nokv,
    reload: bool = False,
) -> PdsUrl | None:
    """Return the PDS endpoint for a DID.

    A value already stored in ``kv`` for the DID is returned as-is unless
    ``reload`` is set. Otherwise the DID document is fetched and its
    ``#atproto_pds`` service endpoint is stored in ``kv`` and returned.
    Returns None when the document cannot be resolved or lists no such
    service.
    """

    pds = kv.get(did)
    if pds is not None and not reload:
        return pds

    doc = await resolve_doc_from_did(client, did)
    if doc is None:
        return None
    # Select the service by its id rather than assuming it is listed first;
    # this also avoids KeyError/IndexError on documents without services.
    pds = pds_endpoint_from_doc(doc)
    if pds is None:
        return None
    kv.set(did, value=pds)
    return pds
158
159
async def resolve_doc_from_did(
    client: ClientSession,
    did: DID,
) -> dict[str, Any] | None:
    """Return the DID document for ``did``.

    ``did:plc:`` identifiers are looked up at ``PLC_DIRECTORY``; the parsed
    JSON body is returned when the response is OK, None otherwise. Any other
    DID method except ``did:web:`` yields None.

    Raises:
        NotImplementedError: for ``did:web:`` identifiers, which are not
            supported yet.
    """

    if did.startswith("did:plc:"):
        # The context manager releases the connection on every path,
        # including the non-OK one where the body is never read.
        async with client.get(f"{PLC_DIRECTORY}/{did}") as response:
            if not response.ok:
                return None
            return await response.json()

    if did.startswith("did:web:"):
        # TODO: resolve did:web
        # NotImplementedError is still an Exception, so existing handlers
        # keep working while the failure is easier to tell apart.
        raise NotImplementedError("resolve did:web")

    return None
177
178
async def resolve_authserver_from_pds(
    client: ClientSession,
    pds_url: PdsUrl,
    kv: KV = nokv,
    reload: bool = False,
) -> AuthserverUrl | None:
    """Returns the authserver URL for the PDS.

    A value already stored in ``kv`` for ``pds_url`` is returned as-is unless
    ``reload`` is set. Otherwise the PDS's
    ``/.well-known/oauth-protected-resource`` document is fetched and the
    first entry of its ``authorization_servers`` list is stored in ``kv``
    and returned. Returns None on a non-200 response or when the document
    does not contain such an entry.

    Raises:
        AssertionError: when ``is_safe_url`` rejects ``pds_url``.
    """

    authserver_url = kv.get(pds_url)
    if authserver_url is not None and not reload:
        return authserver_url

    # An explicit check instead of `assert`: assert statements are removed
    # under `python -O`, which would silently drop the URL safety check. The
    # exception type is kept so callers see the same failure as before.
    if not is_safe_url(pds_url):
        raise AssertionError(f"unsafe PDS URL: {pds_url!r}")

    endpoint = f"{pds_url}/.well-known/oauth-protected-resource"
    # The context manager releases the connection on every path.
    async with client.get(endpoint) as response:
        if response.status != 200:
            return None
        parsed = await response.json()

    # The body is remote JSON; treat a missing or malformed list as "no
    # authserver" rather than failing with KeyError/IndexError.
    servers = parsed.get("authorization_servers") if isinstance(parsed, dict) else None
    if not isinstance(servers, list) or not servers or not isinstance(servers[0], str):
        return None

    authserver_url = servers[0]
    kv.set(pds_url, value=authserver_url)
    return authserver_url
200
201
async def fetch_authserver_meta(
    client: ClientSession,
    authserver_url: str,
) -> dict[str, str] | None:
    """Returns metadata from the authserver.

    Fetches ``/.well-known/oauth-authorization-server`` below
    ``authserver_url`` and returns the parsed JSON body once it has passed
    ``is_valid_authserver_meta``. Returns None when the response is not OK.

    Raises:
        AssertionError: when ``is_safe_url`` rejects ``authserver_url`` or
            the fetched metadata fails ``is_valid_authserver_meta``.
    """

    # Explicit checks instead of `assert`: assert statements are removed
    # under `python -O`, which would silently skip both validations. The
    # exception type is kept so callers see the same failure as before.
    if not is_safe_url(authserver_url):
        raise AssertionError(f"unsafe authserver URL: {authserver_url!r}")

    endpoint = f"{authserver_url}/.well-known/oauth-authorization-server"
    # The context manager releases the connection on every path.
    async with client.get(endpoint) as response:
        if not response.ok:
            return None
        meta: dict[str, Any] = await response.json()

    if not is_valid_authserver_meta(meta, authserver_url):
        raise AssertionError(f"invalid authserver metadata from {authserver_url!r}")
    return meta
216
217
async def get_record(
    client: ClientSession,
    pds: str,
    repo: str,
    collection: str,
    record: str,
    type: str | None = None,
) -> dict[str, Any] | None:
    """Retrieve a record from the PDS and check its ``$type``.

    Calls ``com.atproto.repo.getRecord`` on ``pds`` with ``repo``,
    ``collection`` and ``record`` (sent as ``rkey``). The record's ``$type``
    must equal ``type`` when given, otherwise ``collection``. On success the
    record value is returned with ``$type`` removed. Returns None when the
    response is not OK, the body has no record value, or the type is missing
    or different.
    """

    params = {"repo": repo, "collection": collection, "rkey": record}
    # The context manager releases the connection on every path.
    async with client.get(f"{pds}/xrpc/com.atproto.repo.getRecord", params=params) as response:
        if not response.ok:
            return None
        parsed = await response.json()

    # The body is remote JSON; a missing or malformed "value" means there is
    # no usable record rather than a reason to fail with KeyError.
    value = parsed.get("value") if isinstance(parsed, dict) else None
    if not isinstance(value, dict):
        return None

    expected_type = type or collection
    record_type = value.pop("$type", None)
    if record_type is None or record_type != expected_type:
        return None

    return value