decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv

move kv around

+62 -33
+20 -8
src/atproto/__init__.py
··· 3 3 from typing import Any 4 4 import httpx 5 5 6 - from ..db import KV 6 + from .kv import KV, nokv 7 7 8 8 from .validator import is_valid_authserver_meta 9 9 from ..security import is_safe_url ··· 26 26 return regex_match(DID_REGEX, did) is not None 27 27 28 28 29 - def resolve_identity(query: str, didkv: KV) -> tuple[str, str, dict[str, Any]] | None: 29 + def resolve_identity( 30 + query: str, 31 + didkv: KV = nokv, 32 + ) -> tuple[str, str, dict[str, Any]] | None: 30 33 """Resolves an identity to a DID, handle and DID document, verifies handles bi directionally.""" 31 34 32 35 if is_valid_handle(query): 33 - handle = query 36 + handle = query.lower() 34 37 did = resolve_did_from_handle(handle, didkv) 35 38 if not did: 36 39 return None ··· 62 65 handles: list[str] = [] 63 66 for aka in doc.get("alsoKnownAs", []): 64 67 if aka.startswith("at://"): 65 - handle = aka[5:] 68 + handle = aka[5:].lower() 66 69 if is_valid_handle(handle): 67 70 handles.append(handle) 68 71 return handles ··· 77 80 return None 78 81 79 82 80 - def resolve_did_from_handle(handle: str, kv: KV, reload: bool = False) -> str | None: 83 + def resolve_did_from_handle( 84 + handle: str, 85 + kv: KV = nokv, 86 + reload: bool = False, 87 + ) -> str | None: 81 88 """Returns the DID for a given handle""" 82 89 83 90 if not is_valid_handle(handle): ··· 114 121 return None 115 122 116 123 117 - def resolve_pds_from_did(did: DID, kv: KV, reload: bool = False) -> PdsUrl | None: 124 + def resolve_pds_from_did( 125 + did: DID, 126 + kv: KV = nokv, 127 + reload: bool = False, 128 + ) -> PdsUrl | None: 118 129 pds = kv.get(did) 119 130 if pds is not None and not reload: 120 131 print(f"returning cached pds for {did}") ··· 150 161 151 162 def resolve_authserver_from_pds( 152 163 pds_url: PdsUrl, 153 - kv: KV, 164 + kv: KV = nokv, 154 165 reload: bool = False, 155 166 ) -> AuthserverUrl | None: 156 167 """Returns the authserver URL for the PDS.""" ··· 189 200 repo: str, 190 201 collection: str, 191 202 record: str, 203 + type: str | None = None, 192 204 ) -> dict[str, Any] | None: 193 205 """Retrieve record from PDS. Verifies type is the same as collection name.""" 194 206 response = httpx.get( ··· 198 210 return None 199 211 parsed = response.json() 200 212 value: dict[str, Any] = parsed["value"] 201 - if value["$type"] != collection: 213 + if value["$type"] != (type or collection): 202 214 return None 203 215 del value["$type"] 204 216 return value
+25
src/atproto/kv.py
··· 1 + from abc import ABC, abstractmethod 2 + from typing import override 3 + 4 + 5 + class KV(ABC): 6 + @abstractmethod 7 + def get(self, key: str) -> str | None: 8 + pass 9 + 10 + @abstractmethod 11 + def set(self, key: str, value: str): 12 + pass 13 + 14 + 15 + class NoKV(KV): 16 + @override 17 + def get(self, key: str) -> str | None: 18 + return None 19 + 20 + @override 21 + def set(self, key: str, value: str): 22 + pass 23 + 24 + 25 + nokv = NoKV()
+4 -13
src/db.py
··· 1 - from abc import ABC, abstractmethod 2 1 from typing import override 3 2 from flask import Flask, g 4 3 5 4 import sqlite3 6 5 from sqlite3 import Connection 7 6 7 + from .atproto.kv import KV as BaseKV 8 8 9 - class KV(ABC): 10 - @abstractmethod 11 - def get(self, key: str) -> str | None: 12 - pass 13 9 14 - @abstractmethod 15 - def set(self, key: str, value: str): 16 - pass 17 - 18 - 19 - class Keyval(KV): 10 + class KV(BaseKV): 20 11 db: Connection 21 12 prefix: str 22 13 23 - def __init__(self, app: Flask, prefix: str): 24 - self.db = get_db(app) 14 + def __init__(self, app: Connection | Flask, prefix: str): 15 + self.db = app if isinstance(app, Connection) else get_db(app) 25 16 self.prefix = prefix 26 17 27 18 @override
+5 -5
src/main.py
··· 10 10 resolve_pds_from_did, 11 11 ) 12 12 from .atproto.oauth import pds_authed_req 13 - from .db import Keyval, close_db_connection, init_db 13 + from .db import KV, close_db_connection, init_db 14 14 from .oauth import get_auth_session, oauth, save_auth_session 15 15 from .types import OAuthSession 16 16 ··· 52 52 @app.get("/@<string:handle>") 53 53 def page_profile_with_handle(handle: str): 54 54 reload = request.args.get("reload") is not None 55 - kv = Keyval(app, "did_from_handle") 55 + kv = KV(app, "did_from_handle") 56 56 did = resolve_did_from_handle(handle, kv, reload=reload) 57 57 if did is None: 58 58 return "did not found", 404 ··· 60 60 61 61 62 62 def page_profile(did: str, reload: bool = False): 63 - kv = Keyval(app, "pds_from_did") 63 + kv = KV(app, "pds_from_did") 64 64 pds = resolve_pds_from_did(did, kv, reload=reload) 65 65 if pds is None: 66 66 return "pds not found", 404 ··· 193 193 194 194 195 195 def load_links(pds: str, did: str, reload: bool = False) -> list[dict[str, str]] | None: 196 - kv = Keyval(app, "links_from_did") 196 + kv = KV(app, "links_from_did") 197 197 links = kv.get(did) 198 198 199 199 if links is not None and not reload: ··· 215 215 did: str, 216 216 reload: bool = False, 217 217 ) -> tuple[tuple[str, str] | None, bool]: 218 - kv = Keyval(app, "profile_from_did") 218 + kv = KV(app, "profile_from_did") 219 219 profile = kv.get(did) 220 220 221 221 if profile is not None and not reload:
+7 -5
src/oauth.py
··· 6 6 7 7 import json 8 8 9 - from .db import Keyval 9 + from .db import KV, get_db 10 10 11 11 from .atproto import ( 12 12 is_valid_did, ··· 30 30 if not username: 31 31 return redirect(url_for("page_login"), 303) 32 32 33 - pdskv = Keyval(current_app, "authserver_from_pds") 33 + db = get_db(current_app) 34 + pdskv = KV(db, "authserver_from_pds") 34 35 35 36 if is_valid_handle(username) or is_valid_did(username): 36 37 login_hint = username 37 - kv = Keyval(current_app, "did_from_handle") 38 + kv = KV(db, "did_from_handle") 38 39 identity = resolve_identity(username, didkv=kv) 39 40 if identity is None: 40 41 return "couldnt resolve identity", 500 ··· 139 140 140 141 row = auth_request 141 142 142 - didkv = Keyval(current_app, "did_from_handle") 143 - authserverkv = Keyval(current_app, "authserver_from_pds") 143 + db = get_db(current_app) 144 + didkv = KV(db, "did_from_handle") 145 + authserverkv = KV(db, "authserver_from_pds") 144 146 145 147 if row.did: 146 148 # If we started with an account identifier, this is simple
-1
src/schema.sql
··· 1 - drop table if exists keyval; 2 1 create table if not exists keyval ( 3 2 prefix text not null, 4 3 key text not null,
+1 -1
src/templates/login.html
··· 19 19 <form action="{{ url_for('auth_login') }}" method="post"> 20 20 <label> 21 21 <span>Handle</span> 22 - <input type="text" name="username" placeholder="username.example.com" required /> 22 + <input type="text" name="username" placeholder="username.example.com" autocapitalize="off" autocomplete="off" spellcheck="false" required /> 23 23 </label> 24 24 <span class="caption"> 25 25 Use your AT Protocol handle to log in.