decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv

store auth_server list in config db

+72 -39
+9 -5
src/atproto/__init__.py
··· 2 2 from os import getenv 3 3 from re import match as regex_match 4 4 from typing import Any, TypeGuard 5 + from urllib.parse import urljoin 5 6 6 7 from aiodns import DNSResolver 7 8 from aiodns import error as dns_error ··· 96 97 didkv: KV[Handle, DID], 97 98 pdskv: KV[DID, PdsUrl], 98 99 ) -> tuple[DID, Handle, PdsUrl] | None: 99 - base = "https://slingshot.microcosm.blue" 100 - url = f"{base}/xrpc/com.bad-example.identity.resolveMiniDoc?identifier={query}" 100 + url = urljoin( 101 + "https://slingshot.microcosm.blue", 102 + f"/xrpc/com.bad-example.identity.resolveMiniDoc?identifier={query}", 103 + ) 101 104 response = await client.get(url) 102 105 if not response.ok: 103 106 return None ··· 241 244 return authserver_url 242 245 243 246 assert is_safe_url(pds_url) 244 - endpoint = f"{pds_url}/.well-known/oauth-protected-resource" 247 + endpoint = urljoin(pds_url, "/.well-known/oauth-protected-resource") 245 248 response = await client.get(endpoint) 246 249 if response.status != 200: 247 250 return None ··· 258 261 """Returns metadata from the authserver""" 259 262 260 263 assert is_safe_url(authserver_url) 261 - endpoint = f"{authserver_url}/.well-known/oauth-authorization-server" 264 + endpoint = urljoin(authserver_url, "/.well-known/oauth-authorization-server") 262 265 response = await client.get(endpoint) 263 266 if not response.ok: 264 267 return None ··· 278 281 """Retrieve record from PDS. Verifies type is the same as collection name.""" 279 282 280 283 params = {"repo": repo, "collection": collection, "rkey": record} 281 - response = await client.get(f"{pds}/xrpc/com.atproto.repo.getRecord", params=params) 284 + url = urljoin(pds, "/xrpc/com.atproto.repo.getRecord") 285 + response = await client.get(url, params=params) 282 286 if not response.ok: 283 287 return None 284 288 parsed = await response.json()
+26
src/config.py
··· 1 + from sqlite3 import Connection 2 + from typing import NamedTuple 3 + 4 + from flask import Flask 5 + 6 + from src.db import get_db 7 + 8 + 9 + class AuthServer(NamedTuple): 10 + name: str 11 + url: str 12 + 13 + 14 + class Config: 15 + db: Connection 16 + 17 + def __init__(self, app: Flask): 18 + self.db = get_db(app, name="config") 19 + 20 + def auth_servers(self) -> list[AuthServer]: 21 + raw = ( 22 + self.db.cursor() 23 + .execute("select name, url from pdss order by relevance desc") 24 + .fetchall() 25 + ) 26 + return [AuthServer(*r) for r in raw]
+7
src/config.sql
··· 1 + create table if not exists pdss ( 2 + name text not null unique, 3 + url text not null unique, 4 + relevance integer not null 5 + ) strict; 6 + 7 + create index if not exists pdss_by_relevance on pdss(relevance desc);
+18 -12
src/db.py
··· 1 1 import sqlite3 2 2 from logging import Logger 3 3 from sqlite3 import Connection 4 - from typing import Generic, cast, override 4 + from typing import Generic, Literal, cast, override 5 5 6 6 from flask import Flask, g 7 7 ··· 15 15 prefix: str 16 16 17 17 def __init__(self, app: Connection | Flask, logger: Logger, prefix: str): 18 - self.db = app if isinstance(app, Connection) else get_db(app) 18 + self.db = app if isinstance(app, Connection) else get_db(app, name="keyval") 19 19 self.logger = logger 20 20 self.prefix = prefix 21 21 ··· 42 42 self.db.commit() 43 43 44 44 45 - def get_db(app: Flask) -> sqlite3.Connection: 46 - db: sqlite3.Connection | None = g.get("db", None) 45 + type DatabaseName = Literal["config"] | Literal["keyval"] 46 + 47 + 48 + def get_db(app: Flask, name: DatabaseName) -> sqlite3.Connection: 49 + global_key = f"{name}_db" 50 + db: sqlite3.Connection | None = g.get(global_key, None) 47 51 if db is None: 48 - db_path: str = app.config.get("DATABASE_URL", "ligoat.db") 49 - db = g.db = sqlite3.connect(db_path, check_same_thread=False) 52 + db_path: str = app.config[f"{name.upper()}_DB_URL"] 53 + db = sqlite3.connect(db_path, check_same_thread=False) 54 + setattr(g, global_key, db) 50 55 # return rows as dict-like objects 51 56 db.row_factory = sqlite3.Row 52 57 return db 53 58 54 59 55 60 def close_db_connection(_exception: BaseException | None): 56 - db: sqlite3.Connection | None = g.get("db", None) 57 - if db is not None: 58 - db.close() 61 + for name in ["keyval", "config"]: 62 + db: sqlite3.Connection | None = g.pop(f"{name}_db", None) 63 + if db is not None: 64 + db.close() 59 65 60 66 61 - def init_db(app: Flask): 67 + def init_db(app: Flask, name: DatabaseName) -> None: 62 68 with app.app_context(): 63 - db = get_db(app) 64 - with app.open_resource("schema.sql", mode="r") as schema: 69 + db = get_db(app, name) 70 + with app.open_resource(f"{name}.sql", mode="r") as schema: 65 71 _ = db.cursor().executescript(schema.read()) 66 72 db.commit()
+8 -20
src/main.py
··· 20 20 refresh_auth_session, 21 21 save_auth_session, 22 22 ) 23 + from src.config import AuthServer, Config 23 24 from src.db import KV, close_db_connection, get_db, init_db 24 25 from src.oauth import oauth 25 26 ··· 28 29 app.register_blueprint(oauth) 29 30 htmx = HTMX() 30 31 htmx.init_app(app) 31 - init_db(app) 32 + init_db(app, name="config") 33 + init_db(app, name="keyval") 32 34 33 35 34 36 @app.before_request ··· 58 60 async def page_profile(atid: str): 59 61 reload = request.args.get("reload") is not None 60 62 61 - db = get_db(app) 63 + db = get_db(app, name="keyval") 62 64 didkv = KV[Handle, DID](db, app.logger, "did_from_handle") 63 65 pdskv = KV[DID, PdsUrl](db, app.logger, "pds_from_did") 64 66 ··· 103 105 ) 104 106 105 107 106 - class AuthServer(NamedTuple): 107 - name: str 108 - url: str 109 - 110 - 111 - auth_servers: list[AuthServer] = [ 112 - AuthServer("Bluesky", "https://bsky.social"), 113 - AuthServer("Blacksky", "https://blacksky.app"), 114 - AuthServer("Northsky", "https://northsky.social"), 115 - AuthServer("tangled.org", "https://tngl.sh"), 116 - AuthServer("Witchraft Systems", "https://pds.witchcraft.systems"), 117 - AuthServer("selfhosted.social", "https://selfhosted.social"), 118 - ] 119 - 120 - if app.debug: 121 - auth_servers.append(AuthServer("pds.rip", "https://pds.rip")) 122 - 123 - 124 108 @app.get("/login") 125 109 async def page_login(): 126 110 if await get_user() is not None: 127 111 return redirect("/editor") 112 + config = Config(app) 113 + auth_servers = config.auth_servers() 114 + if app.debug: 115 + auth_servers.append(AuthServer("pds.rip", "https://pds.rip")) 128 116 return render_template("login.html", auth_servers=auth_servers) 129 117 130 118
+2 -2
src/oauth.py
··· 43 43 if not username: 44 44 return redirect(url_for("page_login"), 303) 45 45 46 - db = get_db(current_app) 46 + db = get_db(current_app, name="keyval") 47 47 didkv = KV[Handle, DID](db, current_app.logger, "did_from_handle") 48 48 pdskv = KV[DID, PdsUrl](db, current_app.logger, "pds_from_did") 49 49 authserverkv = KV[PdsUrl, AuthserverUrl]( ··· 177 177 178 178 row = auth_request 179 179 180 - db = get_db(current_app) 180 + db = get_db(current_app, name="keyval") 181 181 didkv = KV(db, current_app.logger, "did_from_handle") 182 182 authserverkv = KV(db, current_app.logger, "authserver_from_pds") 183 183
src/schema.sql src/keyval.sql
+2
src/templates/login.html
··· 28 28 <input type="submit" value="continue" /> 29 29 </form> 30 30 31 + {% if auth_servers %} 31 32 <div class="authservers"> 32 33 <span class="faded caption">If you're unsure you can log in with...</span> 33 34 <form action="{{ url_for('auth_login') }}" method="post"> ··· 36 37 {% endfor %} 37 38 </form> 38 39 </div> 40 + {% endif %} 39 41 40 42 <footer> 41 43 <p>