decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv
at main 389 lines 11 kB view raw
1import asyncio 2import json 3from typing import Any, NamedTuple, cast 4 5from aiohttp.client import ClientSession 6from flask import Flask, g, redirect, render_template, request, session, url_for 7from flask_htmx import HTMX 8from flask_htmx import make_response as htmx_response 9 10from src.atproto import ( 11 get_record, 12 is_valid_did, 13 resolve_did_from_handle, 14 resolve_pds_from_did, 15) 16from src.atproto.oauth import pds_authed_req 17from src.atproto.types import DID, Handle, OAuthSession, PdsUrl 18from src.auth import ( 19 get_auth_session, 20 refresh_auth_session, 21 save_auth_session, 22) 23from src.db import KV, close_db_connection, get_db, init_db 24from src.oauth import oauth 25 26app = Flask(__name__) 27_ = app.config.from_prefixed_env() 28app.register_blueprint(oauth) 29htmx = HTMX() 30htmx.init_app(app) 31init_db(app) 32 33 34@app.before_request 35async def load_user_to_context(): 36 g.user = get_auth_session(session) 37 38 39async def get_user() -> OAuthSession | None: 40 user = cast(OAuthSession | None, g.user) 41 if user is not None and user.is_expired(): 42 async with ClientSession() as client: 43 user = await refresh_auth_session(session, client, user) 44 return user 45 46 47@app.teardown_appcontext 48async def app_teardown(exception: BaseException | None): 49 close_db_connection(exception) 50 51 52@app.get("/") 53def page_home(): 54 return render_template("index.html") 55 56 57@app.get("/<string:atid>") 58async def page_profile(atid: str): 59 reload = request.args.get("reload") is not None 60 61 db = get_db(app) 62 didkv = KV[Handle, DID](db, app.logger, "did_from_handle") 63 pdskv = KV[DID, PdsUrl](db, app.logger, "pds_from_did") 64 65 async with ClientSession() as client: 66 if atid.startswith("@"): 67 handle = atid[1:].lower() 68 did = await resolve_did_from_handle(client, handle, kv=didkv, reload=reload) 69 if did is None: 70 return render_template("error.html", message="did not found"), 404 71 elif is_valid_did(atid): 72 handle = None 73 did = atid 74 else: 75 return render_template("error.html", message="invalid did or handle"), 400 76 77 if _is_did_blocked(did): 78 return render_template("error.html", message="profile not found"), 404 79 80 pds = await resolve_pds_from_did(client, did=did, kv=pdskv, reload=reload) 81 if pds is None: 82 return render_template("error.html", message="pds not found"), 404 83 (profile, _), link_sections = await asyncio.gather( 84 load_profile(client, pds, did, reload=reload), 85 load_links(client, pds, did, reload=reload), 86 ) 87 if profile is None or link_sections is None: 88 return render_template("error.html", message="profile not found"), 404 89 90 if reload: 91 # remove the ?reload parameter 92 return redirect(request.path) 93 94 if handle: 95 profile["handle"] = handle 96 athref = f"at://{did}/at.ligo.actor.links/self" 97 return render_template( 98 "profile.html", 99 profile=profile, 100 links=link_sections[0].links, 101 sections=link_sections, 102 athref=athref, 103 ) 104 105 106class AuthServer(NamedTuple): 107 name: str 108 url: str 109 110 111auth_servers: list[AuthServer] = [ 112 AuthServer("Bluesky", "https://bsky.social"), 113 AuthServer("Blacksky", "https://blacksky.app"), 114 AuthServer("Northsky", "https://northsky.social"), 115 AuthServer("tangled.org", "https://tngl.sh"), 116 AuthServer("Witchraft Systems", "https://pds.witchcraft.systems"), 117 AuthServer("selfhosted.social", "https://selfhosted.social"), 118] 119 120if app.debug: 121 auth_servers.append(AuthServer("pds.rip", "https://pds.rip")) 122 123 124@app.get("/login") 125async def page_login(): 126 if await get_user() is not None: 127 return redirect("/editor") 128 return render_template("login.html", auth_servers=auth_servers) 129 130 131@app.post("/login") 132def auth_login(): 133 value = request.form.get("username") or request.form.get("authserver") 134 if value and value[0] == "@": 135 value = value[1:] 136 if not value: 137 return redirect(url_for("page_login"), 303) 138 return redirect(url_for("oauth.oauth_start", username_or_authserver=value), 303) 139 140 141@app.route("/auth/logout") 142def auth_logout(): 143 session.clear() 144 return redirect(url_for("page_login"), 303) 145 146 147@app.get("/editor") 148async def page_editor(): 149 user = await get_user() 150 if user is None: 151 return redirect("/login", 302) 152 153 did: str = user.did 154 pds: str = user.pds_url 155 handle: str | None = user.handle 156 157 async with ClientSession() as client: 158 (profile, from_bluesky), link_sections = await asyncio.gather( 159 load_profile(client, pds, did), 160 load_links(client, pds, did), 161 ) 162 163 links = [] 164 if link_sections: 165 links = link_sections[0].links 166 167 return render_template( 168 "editor.html", 169 handle=handle, 170 profile=profile, 171 profile_from_bluesky=from_bluesky, 172 links=json.dumps(links), 173 ) 174 175 176@app.post("/editor/profile") 177async def post_editor_profile(): 178 user = await get_user() 179 if user is None: 180 url = url_for("auth_logout") 181 return htmx_response(redirect=url) if htmx else redirect(url, 303) 182 183 display_name = request.form.get("displayName") 184 description = request.form.get("description", "") 185 if not display_name: 186 return redirect("/editor", 303) 187 188 record = { 189 "$type": "at.ligo.actor.profile", 190 "displayName": display_name, 191 "description": description, 192 } 193 194 success = await put_record( 195 user=user, 196 pds=user.pds_url, 197 repo=user.did, 198 collection="at.ligo.actor.profile", 199 rkey="self", 200 record=record, 201 ) 202 203 if success: 204 kv = KV(app, app.logger, "profile_from_did") 205 kv.set(user.did, json.dumps(record)) 206 else: 207 app.logger.warning("log out user for now") 208 url = url_for("auth_logout") 209 return htmx_response(redirect=url) if htmx else redirect(url, 303) 210 211 if htmx: 212 return htmx_response( 213 render_template("_editor_profile.html", profile=record), 214 reswap="outerHTML", 215 ) 216 217 return redirect(url_for("page_editor"), 303) 218 219 220@app.post("/editor/links") 221async def post_editor_links(): 222 user = await get_user() 223 if user is None: 224 url = url_for("auth_logout") 225 return htmx_response(redirect=url) if htmx else redirect(url, 303) 226 227 links: list[dict[str, str]] = [] 228 hrefs = request.form.getlist("link-href") 229 titles = request.form.getlist("link-title") 230 subtitles = request.form.getlist("link-subtitle") 231 backgrounds = request.form.getlist("link-background-color") 232 for href, title, subtitle, background in zip(hrefs, titles, subtitles, backgrounds): 233 if not href or not title or not background: 234 break 235 link: dict[str, str] = { 236 "href": href, 237 "title": title, 238 "backgroundColor": background, 239 } 240 if subtitle: 241 link["subtitle"] = subtitle 242 links.append(link) 243 244 record = { 245 "$type": "at.ligo.actor.links", 246 "sections": [ 247 { 248 "title": "", 249 "links": links, 250 } 251 ], 252 } 253 254 success = await put_record( 255 user=user, 256 pds=user.pds_url, 257 repo=user.did, 258 collection="at.ligo.actor.links", 259 rkey="self", 260 record=record, 261 ) 262 263 if success: 264 kv = KV(app, app.logger, "links_from_did") 265 kv.set(user.did, json.dumps(record)) 266 else: 267 app.logger.warning("log out user for now") 268 url = url_for("auth_logout") 269 return htmx_response(redirect=url) if htmx else redirect(url, 303) 270 271 if htmx: 272 return htmx_response( 273 render_template("_editor_links.html", links=links), 274 reswap="outerHTML", 275 ) 276 277 return redirect(url_for("page_editor"), 303) 278 279 280@app.get("/terms") 281def page_terms(): 282 return render_template("terms.html") 283 284 285class LinkSection(NamedTuple): 286 title: str 287 links: list[dict[str, str]] 288 289 290async def load_links( 291 client: ClientSession, 292 pds: str, 293 did: str, 294 reload: bool = False, 295) -> list[LinkSection] | None: 296 kv = KV(app, app.logger, "links_from_did") 297 record_json = kv.get(did) 298 299 if record_json is not None and not reload: 300 parsed = json.loads(record_json) 301 return _links_or_sections(parsed) 302 303 record = await get_record(client, pds, did, "at.ligo.actor.links", "self") 304 if record is None: 305 return None 306 307 kv.set(did, value=json.dumps(record)) 308 return _links_or_sections(record) 309 310 311def _links_or_sections(raw: dict[str, Any]) -> list[LinkSection] | None: 312 if "sections" in raw: 313 return list(map(lambda s: LinkSection(**s), raw["sections"])) 314 elif "links" in raw: 315 return [LinkSection("", raw["links"])] 316 else: 317 return None 318 319 320async def load_profile( 321 client: ClientSession, 322 pds: str, 323 did: str, 324 fallback_with_bluesky: bool = True, 325 reload: bool = False, 326) -> tuple[dict[str, str] | None, bool]: 327 kv = KV(app, app.logger, "profile_from_did") 328 record_json = kv.get(did) 329 330 if record_json is not None and not reload: 331 return json.loads(record_json), False 332 333 (record, bsky_record) = await asyncio.gather( 334 get_record(client, pds, did, "at.ligo.actor.profile", "self"), 335 get_record(client, pds, did, "app.bsky.actor.profile", "self"), 336 ) 337 338 from_bluesky = False 339 if record is None and fallback_with_bluesky: 340 record = bsky_record 341 from_bluesky = True 342 343 if record is not None: 344 kv.set(did, value=json.dumps(record)) 345 346 return record, from_bluesky 347 348 349# TODO: move to .atproto 350async def put_record( 351 user: OAuthSession, 352 pds: PdsUrl, 353 repo: str, 354 collection: str, 355 rkey: str, 356 record: dict[str, Any], 357) -> bool: 358 """Writes the record onto the users PDS. Returns bool for success.""" 359 360 endpoint = f"{pds}/xrpc/com.atproto.repo.putRecord" 361 body = { 362 "repo": repo, 363 "collection": collection, 364 "rkey": rkey, 365 "record": record, 366 } 367 368 def update_dpop_pds_nonce(nonce: str): 369 session_ = user._replace(dpop_pds_nonce=nonce) 370 save_auth_session(session, session_) 371 372 response = await pds_authed_req( 373 method="POST", 374 url=endpoint, 375 body=body, 376 user=user, 377 update_dpop_pds_nonce=update_dpop_pds_nonce, 378 ) 379 380 if not response.ok: 381 app.logger.warning(f"put_record failed with status {response.status}") 382 app.logger.warning(await response.text()) 383 384 return response.ok 385 386 387def _is_did_blocked(did: str) -> bool: 388 kv = KV(app, app.logger, "blockeddids") 389 return kv.get(did) is not None