# ligo.at: a decentralized and customizable links page built on top of atproto.
# Tags: atproto, link-in-bio, python, uv
# Snapshot at commit 1b7c0778f94de667d002f803fdb1fcc64b2efd23 (329 lines, 8.7 kB).
import asyncio
import json

from aiohttp.client import ClientResponse, ClientSession
from flask import Flask, g, session, redirect, render_template, request, url_for
from flask_htmx import HTMX, make_response as htmx_reponse
from typing import Any

from .atproto import (
    PdsUrl,
    get_record,
    is_valid_did,
    resolve_did_from_handle,
    resolve_pds_from_did,
)
from .atproto.oauth import pds_authed_req
from .atproto.types import OAuthSession
from .auth import get_auth_session, save_auth_session
from .db import KV, close_db_connection, get_db, init_db
from .oauth import oauth

app = Flask(__name__)
_ = app.config.from_prefixed_env()
app.register_blueprint(oauth)
htmx = HTMX()
htmx.init_app(app)
init_db(app)

# Namespace prefix shared by every record collection this app reads or writes.
SCHEMA = "at.ligo"


@app.before_request
async def load_user_to_context():
    """Store the result of ``get_auth_session`` on ``g.user`` for this request."""
    g.user = get_auth_session(session)


def get_user() -> OAuthSession | None:
    """Return the value stored on ``g.user`` by ``load_user_to_context``."""
    return g.user


@app.teardown_appcontext
async def app_teardown(exception: BaseException | None):
    """Hand the teardown exception (if any) to ``close_db_connection``."""
    close_db_connection(exception)


@app.get("/")
def page_home():
    """Render the landing page."""
    return render_template("index.html")


@app.get("/<string:atid>")
async def page_profile(atid: str):
    """Render the public links page for a handle (``@name``) or a DID.

    A ``?reload`` query parameter (any value) forces the lookups to bypass
    their cached values; once the fresh data has been loaded the client is
    redirected to the same path without the parameter.

    Responds with 400 for an identifier that is neither ``@handle`` nor a
    valid DID, and with 404 when the DID, the PDS or the links record cannot
    be found, or when the DID is blocked.
    """
    reload = request.args.get("reload") is not None

    # NOTE(review): this handler passes a db connection to KV while every
    # other call site in this module passes `app` -- confirm KV accepts both.
    db = get_db(app)
    didkv = KV(db, app.logger, "did_from_handle")
    pdskv = KV(db, app.logger, "pds_from_did")

    async with ClientSession() as client:
        if atid.startswith("@"):
            handle = atid[1:].lower()
            did = await resolve_did_from_handle(client, handle, kv=didkv, reload=reload)
            if did is None:
                return render_template("error.html", message="did not found"), 404
        elif is_valid_did(atid):
            handle = None
            did = atid
        else:
            return render_template("error.html", message="invalid did or handle"), 400

        if _is_did_blocked(did):
            return render_template("error.html", message="profile not found"), 404

        pds = await resolve_pds_from_did(client, did=did, kv=pdskv, reload=reload)
        if pds is None:
            return render_template("error.html", message="pds not found"), 404

        # Profile and links are independent records, so fetch them concurrently.
        (profile, _from_bluesky), links = await asyncio.gather(
            load_profile(client, pds, did, reload=reload),
            load_links(client, pds, did, reload=reload),
        )
        if links is None:
            return render_template("error.html", message="profile not found"), 404

    if reload:
        # remove the ?reload parameter
        return redirect(request.path)

    # load_profile returns None when no profile record exists at all; fall
    # back to an empty profile instead of failing on the item assignment below.
    if profile is None:
        profile = {}
    profile["handle"] = handle
    athref = f"at://{did}/at.ligo.actor.links/self"
    return render_template(
        "profile.html",
        profile=profile,
        links=links,
        athref=athref,
    )


@app.get("/login")
def page_login():
    """Render the login form, or redirect to the editor when a user is set."""
    if get_user() is not None:
        return redirect("/editor")
    return render_template("login.html")


@app.post("/login")
def auth_login():
    """Start the OAuth flow for the submitted ``username`` form field.

    A single leading ``@`` is dropped. An empty or missing username sends the
    client back to the login page.
    """
    # removeprefix is safe on an empty string, unlike indexing username[0].
    username = request.form.get("username", "").strip().removeprefix("@")
    if not username:
        return redirect(url_for("page_login"), 303)
    return redirect(url_for("oauth.oauth_start", username=username), 303)


@app.route("/auth/logout")
def auth_logout():
    """Clear the Flask session and redirect to the home page."""
    session.clear()
    return redirect("/", 303)


@app.get("/editor")
async def page_editor():
    """Render the editor with the current user's profile and links.

    Redirects to the login page when no user is set for the request.
    """
    user = get_user()
    if user is None:
        return redirect("/login")

    did: str = user.did
    pds: str = user.pds_url
    handle: str | None = user.handle

    async with ClientSession() as client:
        (profile, from_bluesky), links = await asyncio.gather(
            load_profile(client, pds, did),
            load_links(client, pds, did),
        )

    return render_template(
        "editor.html",
        handle=handle,
        profile=profile,
        profile_from_bluesky=from_bluesky,
        # The template receives the links as a JSON string; no record -> "[]".
        links=json.dumps(links or []),
    )


@app.post("/editor/profile")
async def post_editor_profile():
    """Write the submitted display name and description as the profile record.

    Redirects to the login page when no user is set, and back to the editor
    when ``displayName`` is empty. On a successful write the record is also
    stored in the ``profile_from_did`` KV.
    """
    user = get_user()
    if user is None:
        return redirect("/login", 303)

    display_name = request.form.get("displayName")
    description = request.form.get("description", "")
    if not display_name:
        return redirect("/editor", 303)

    record = {
        "$type": f"{SCHEMA}.actor.profile",
        "displayName": display_name,
        "description": description,
    }

    success = await put_record(
        user=user,
        pds=user.pds_url,
        repo=user.did,
        collection=f"{SCHEMA}.actor.profile",
        rkey="self",
        record=record,
    )

    if success:
        kv = KV(app, app.logger, "profile_from_did")
        kv.set(user.did, json.dumps(record))

    # flask-htmx: the extension object is truthy for HTMX requests
    # (see the flask-htmx documentation); answer those with a partial.
    if htmx:
        return htmx_reponse(
            render_template("_editor_profile.html", profile=record),
            reswap="outerHTML",
        )

    return redirect(url_for("page_editor"), 303)


@app.post("/editor/links")
async def post_editor_links():
    """Write the submitted link rows as the links record.

    Rows are read from the parallel ``link-href``, ``link-title``,
    ``link-subtitle`` and ``link-background-color`` form lists. On a
    successful write the record is also stored in the ``links_from_did`` KV.
    Redirects to the login page when no user is set.
    """
    user = get_user()
    if user is None:
        return redirect("/login", 303)

    links: list[dict[str, str]] = []
    hrefs = request.form.getlist("link-href")
    titles = request.form.getlist("link-title")
    subtitles = request.form.getlist("link-subtitle")
    backgrounds = request.form.getlist("link-background-color")
    for href, title, subtitle, background in zip(hrefs, titles, subtitles, backgrounds):
        # NOTE(review): the first incomplete row stops processing, so every
        # later row is dropped as well -- confirm `break` (not `continue`)
        # is intended.
        if not href or not title or not background:
            break
        link: dict[str, str] = {
            "href": href,
            "title": title,
            "backgroundColor": background,
        }
        # The subtitle is optional and only stored when non-empty.
        if subtitle:
            link["subtitle"] = subtitle
        links.append(link)

    record = {
        "$type": f"{SCHEMA}.actor.links",
        "links": links,
    }

    success = await put_record(
        user=user,
        pds=user.pds_url,
        repo=user.did,
        collection=f"{SCHEMA}.actor.links",
        rkey="self",
        record=record,
    )

    if success:
        kv = KV(app, app.logger, "links_from_did")
        kv.set(user.did, json.dumps(record))

    # flask-htmx: the extension object is truthy for HTMX requests.
    if htmx:
        return htmx_reponse(
            render_template("_editor_links.html", links=record["links"]),
            reswap="outerHTML",
        )

    return redirect(url_for("page_editor"), 303)


@app.get("/terms")
def page_terms():
    """Render the terms page."""
    return render_template("terms.html")


async def load_links(
    client: ClientSession,
    pds: str,
    did: str,
    reload: bool = False,
) -> list[dict[str, str]] | None:
    """Return the ``links`` list of the DID's links record.

    The ``links_from_did`` KV is consulted first; with ``reload`` set, or
    when nothing is stored, the record is fetched with ``get_record`` and
    stored in the KV. Returns ``None`` when ``get_record`` yields no record.
    """
    kv = KV(app, app.logger, "links_from_did")
    record_json = kv.get(did)

    if record_json is not None and not reload:
        return json.loads(record_json)["links"]

    record = await get_record(client, pds, did, f"{SCHEMA}.actor.links", "self")
    if record is None:
        return None

    kv.set(did, value=json.dumps(record))
    return record["links"]


async def load_profile(
    client: ClientSession,
    pds: str,
    did: str,
    fallback_with_bluesky: bool = True,
    reload: bool = False,
) -> tuple[dict[str, str] | None, bool]:
    """Return ``(profile_record, from_bluesky)`` for the DID.

    The ``profile_from_did`` KV is consulted first; with ``reload`` set, or
    when nothing is stored, both the ``at.ligo`` and the ``app.bsky`` profile
    records are fetched. The Bluesky record is used only when the ``at.ligo``
    one is missing and ``fallback_with_bluesky`` is true. The record may be
    ``None`` when neither exists.
    """
    kv = KV(app, app.logger, "profile_from_did")
    record_json = kv.get(did)

    # NOTE(review): a record served from the KV is always reported as not
    # from Bluesky, even if it was stored by the fallback branch below.
    if record_json is not None and not reload:
        return json.loads(record_json), False

    (record, bsky_record) = await asyncio.gather(
        get_record(client, pds, did, f"{SCHEMA}.actor.profile", "self"),
        get_record(client, pds, did, "app.bsky.actor.profile", "self"),
    )

    from_bluesky = False
    if record is None and fallback_with_bluesky:
        record = bsky_record
        from_bluesky = True

    if record is not None:
        kv.set(did, value=json.dumps(record))

    return record, from_bluesky


# TODO: move to .atproto
async def put_record(
    user: OAuthSession,
    pds: PdsUrl,
    repo: str,
    collection: str,
    rkey: str,
    record: dict[str, Any],
) -> bool:
    """Writes the record onto the users PDS.

    Sends a POST to ``{pds}/xrpc/com.atproto.repo.putRecord`` through
    ``pds_authed_req`` and logs the status and response body on failure.

    Returns bool for success.
    """

    endpoint = f"{pds}/xrpc/com.atproto.repo.putRecord"
    body = {
        "repo": repo,
        "collection": collection,
        "rkey": rkey,
        "record": record,
    }

    def update_dpop_pds_nonce(nonce: str):
        # Callback for pds_authed_req: save a copy of the session that
        # carries the new nonce.
        session_ = user._replace(dpop_pds_nonce=nonce)
        save_auth_session(session, session_)

    response = await pds_authed_req(
        method="POST",
        url=endpoint,
        body=body,
        user=user,
        update_dpop_pds_nonce=update_dpop_pds_nonce,
    )

    if not response.ok:
        app.logger.warning("put_record failed with status %s", response.status)
        app.logger.warning(await response.text())

    return response.ok


def _is_did_blocked(did: str) -> bool:
    """Return True when the ``blockeddids`` KV holds any value for ``did``."""
    kv = KV(app, app.logger, "blockeddids")
    return kv.get(did) is not None