decentralized and customizable links page on top of atproto ligo.at
atproto link-in-bio python uv

store OAuth session in client side cookie

+104 -124
+8 -20
src/atproto/oauth.py
··· 1 - import sqlite3 2 - from typing import Any, NamedTuple 3 import time 4 import json 5 from authlib.jose import JsonWebKey, Key ··· 20 refresh_token: str 21 scope: str 22 sub: str 23 24 25 # Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server. ··· 160 161 resp.raise_for_status() 162 token_body = resp.json() 163 - try: 164 - tokens = OAuthTokens(**token_body) 165 - except TypeError: 166 - raise Exception("invalid token body") 167 168 return tokens, dpop_authserver_nonce 169 ··· 226 227 resp.raise_for_status() 228 token_body = resp.json() 229 - try: 230 - tokens = OAuthTokens(**token_body) 231 - except TypeError: 232 - raise Exception("invalid token body") 233 234 return tokens, dpop_authserver_nonce 235 ··· 240 method: str, 241 url: str, 242 user: OAuthSession, 243 - db: sqlite3.Connection, 244 body: dict[str, Any] | None = None, 245 ) -> Response | None: 246 dpop_private_jwk = JsonWebKey.import_key(json.loads(user.dpop_private_jwk)) ··· 275 response.status_code in [400, 401] 276 and response.json()["error"] == "use_dpop_nonce" 277 ): 278 - # print(resp.headers) 279 dpop_pds_nonce = response.headers["DPoP-Nonce"] 280 print(f"retrying with new PDS DPoP nonce: {dpop_pds_nonce}") 281 - # update session database with new nonce 282 - cur = db.cursor() 283 - _ = cur.execute( 284 - "UPDATE oauth_session SET dpop_pds_nonce = ? WHERE did = ?;", 285 - [dpop_pds_nonce, user.did], 286 - ) 287 - db.commit() 288 - cur.close() 289 continue 290 break 291
··· 1 + from typing import Any, Callable, NamedTuple 2 import time 3 import json 4 from authlib.jose import JsonWebKey, Key ··· 19 refresh_token: str 20 scope: str 21 sub: str 22 + # only for parsing 23 + token_type: str | None 24 + expires_in: int | None 25 26 27 # Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server. ··· 162 163 resp.raise_for_status() 164 token_body = resp.json() 165 + tokens = OAuthTokens(**token_body) 166 167 return tokens, dpop_authserver_nonce 168 ··· 225 226 resp.raise_for_status() 227 token_body = resp.json() 228 + tokens = OAuthTokens(**token_body) 229 230 return tokens, dpop_authserver_nonce 231 ··· 236 method: str, 237 url: str, 238 user: OAuthSession, 239 + update_dpop_pds_nonce: Callable[[str], None], 240 body: dict[str, Any] | None = None, 241 ) -> Response | None: 242 dpop_private_jwk = JsonWebKey.import_key(json.loads(user.dpop_private_jwk)) ··· 271 response.status_code in [400, 401] 272 and response.json()["error"] == "use_dpop_nonce" 273 ): 274 dpop_pds_nonce = response.headers["DPoP-Nonce"] 275 print(f"retrying with new PDS DPoP nonce: {dpop_pds_nonce}") 276 + update_dpop_pds_nonce(dpop_pds_nonce) 277 continue 278 break 279
+20 -29
src/main.py
··· 4 5 from .atproto import PdsUrl, get_record, resolve_did_from_handle, resolve_pds_from_did 6 from .atproto.oauth import pds_authed_req 7 - from .db import close_db_connection, get_db, init_db 8 - from .oauth import oauth 9 from .types import OAuthSession 10 11 app = Flask(__name__) ··· 21 22 @app.before_request 23 def load_user_to_context(): 24 - user: OAuthSession | None = None 25 - did: str | None = session.get("user_did") 26 - if did is not None: 27 - db = get_db(app) 28 - row = db.execute( 29 - "select * from oauth_session where did = ?", 30 - (did,), 31 - ).fetchone() 32 - user = OAuthSession(**row) 33 - g.user = user 34 35 36 def get_user() -> OAuthSession | None: ··· 84 if not username: 85 return redirect(url_for("page_login"), 303) 86 return redirect(url_for("oauth.oauth_start", username=username), 303) 87 88 89 @app.get("/editor") ··· 174 return redirect("/editor", 303) 175 176 177 def load_links(pds: str, did: str, reload: bool = False) -> list[dict[str, str]] | None: 178 if did in links and not reload: 179 app.logger.debug(f"returning cached links for {did}") ··· 228 "rkey": rkey, 229 "record": record, 230 } 231 response = pds_authed_req( 232 method="POST", 233 url=endpoint, 234 body=body, 235 user=user, 236 - db=get_db(app), 237 ) 238 if not response or not response.ok: 239 app.logger.warning("PDS HTTP ERROR") 240 - 241 - 242 - # AUTH 243 - 244 - 245 - @app.route("/auth/logout") 246 - def auth_logout(): 247 - user = get_user() 248 - if user is not None: 249 - db = get_db(app) 250 - cursor = db.cursor() 251 - _ = cursor.execute("delete from oauth_session where did = ?", (user.did,)) 252 - db.commit() 253 - cursor.close() 254 - session.clear() 255 - return redirect("/", 303)
··· 4 5 from .atproto import PdsUrl, get_record, resolve_did_from_handle, resolve_pds_from_did 6 from .atproto.oauth import pds_authed_req 7 + from .db import close_db_connection, init_db 8 + from .oauth import get_auth_session, oauth, save_auth_session 9 from .types import OAuthSession 10 11 app = Flask(__name__) ··· 21 22 @app.before_request 23 def load_user_to_context(): 24 + g.user = get_auth_session(session) 25 26 27 def get_user() -> OAuthSession | None: ··· 75 if not username: 76 return redirect(url_for("page_login"), 303) 77 return redirect(url_for("oauth.oauth_start", username=username), 303) 78 + 79 + 80 + @app.route("/auth/logout") 81 + def auth_logout(): 82 + session.clear() 83 + return redirect("/", 303) 84 85 86 @app.get("/editor") ··· 171 return redirect("/editor", 303) 172 173 174 + @app.get("/terms") 175 + def page_terms(): 176 + return "come back soon" 177 + 178 + 179 def load_links(pds: str, did: str, reload: bool = False) -> list[dict[str, str]] | None: 180 if did in links and not reload: 181 app.logger.debug(f"returning cached links for {did}") ··· 230 "rkey": rkey, 231 "record": record, 232 } 233 + 234 + def update_dpop_pds_nonce(nonce: str): 235 + session_ = user._replace(dpop_pds_nonce=nonce) 236 + save_auth_session(session, session_) 237 + 238 response = pds_authed_req( 239 method="POST", 240 url=endpoint, 241 body=body, 242 user=user, 243 + update_dpop_pds_nonce=update_dpop_pds_nonce, 244 ) 245 if not response or not response.ok: 246 app.logger.warning("PDS HTTP ERROR")
+75 -52
src/oauth.py
··· 1 from authlib.jose import JsonWebKey, Key 2 from flask import Blueprint, current_app, jsonify, redirect, request, session, url_for 3 from urllib.parse import urlencode 4 5 import json ··· 14 ) 15 from .atproto.oauth import initial_token_request, send_par_auth_request 16 from .security import is_safe_url 17 - from .types import OAuthAuthRequest 18 - from .db import get_db 19 20 oauth = Blueprint("oauth", __name__, url_prefix="/oauth") 21 ··· 87 par_request_uri: str = resp.json()["request_uri"] 88 current_app.logger.debug(f"saving oauth_auth_request to DB state={state}") 89 90 - db = get_db(current_app) 91 - cursor = db.cursor() 92 - _ = cursor.execute( 93 - "insert or replace into oauth_auth_requests values (?, ?, ?, ?, ?, ?, ?, ?, ?)", 94 - ( 95 - state, 96 - authserver_meta["issuer"], 97 - did, 98 - handle, 99 - pds_url, 100 - pkce_verifier, 101 - scope, 102 - dpop_authserver_nonce, 103 - dpop_private_jwk.as_json(is_private=True), 104 - ), 105 ) 106 - db.commit() 107 - cursor.close() 108 109 auth_endpoint = authserver_meta["authorization_endpoint"] 110 assert is_safe_url(auth_endpoint) ··· 118 authserver_iss = request.args["iss"] 119 authorization_code = request.args["code"] 120 121 - db = get_db(current_app) 122 - cursor = db.cursor() 123 - 124 - row = cursor.execute( 125 - "select * from oauth_auth_requests where state = ?", (state,) 126 - ).fetchone() 127 - try: 128 - auth_request = OAuthAuthRequest(**row) 129 - except TypeError: 130 return redirect(url_for("page_login"), 303) 131 132 current_app.logger.debug(f"Deleting auth request for state={state}") 133 - _ = cursor.execute("delete from oauth_auth_requests where state = ?", (state,)) 134 - db.commit() 135 136 assert auth_request.authserver_iss == authserver_iss 137 assert auth_request.state == state ··· 147 148 row = auth_request 149 150 - did = auth_request.did 151 if row.did: 152 # If we started with an account identifier, this is simple 153 did, handle, pds_url = row.did, row.handle, row.pds_url ··· 166 assert authserver_url == authserver_iss 167 168 assert row.scope == tokens.scope 169 170 - current_app.logger.debug("storing user did and handle") 171 - db = get_db(current_app) 172 - cursor = db.cursor() 173 - _ = cursor.execute( 174 - "insert or replace into oauth_session values (?, ?, ?, ?, ?, ?, ?, ?, ?)", 175 - ( 176 - did, 177 - handle, 178 - pds_url, 179 - authserver_iss, 180 - tokens.access_token, 181 - tokens.refresh_token, 182 - dpop_authserver_nonce, 183 - None, 184 - auth_request.dpop_private_jwk, 185 - ), 186 ) 187 - db.commit() 188 - cursor.close() 189 - 190 - session["user_did"] = did 191 - session["user_handle"] = auth_request.handle 192 193 return redirect(url_for("page_login")) 194 ··· 221 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 222 CLIENT_PUB_JWK = json.loads(CLIENT_SECRET_JWK.as_json(is_private=False)) 223 return jsonify({"keys": [CLIENT_PUB_JWK]})
··· 1 + from typing import NamedTuple 2 from authlib.jose import JsonWebKey, Key 3 from flask import Blueprint, current_app, jsonify, redirect, request, session, url_for 4 + from flask.sessions import SessionMixin 5 from urllib.parse import urlencode 6 7 import json ··· 16 ) 17 from .atproto.oauth import initial_token_request, send_par_auth_request 18 from .security import is_safe_url 19 + from .types import OAuthAuthRequest, OAuthSession 20 21 oauth = Blueprint("oauth", __name__, url_prefix="/oauth") 22 ··· 88 par_request_uri: str = resp.json()["request_uri"] 89 current_app.logger.debug(f"saving oauth_auth_request to DB state={state}") 90 91 + oauth_request = OAuthAuthRequest( 92 + state, 93 + authserver_meta["issuer"], 94 + did, 95 + handle, 96 + pds_url, 97 + pkce_verifier, 98 + scope, 99 + dpop_authserver_nonce, 100 + dpop_private_jwk.as_json(is_private=True), 101 ) 102 + save_auth_request(session, oauth_request) 103 104 auth_endpoint = authserver_meta["authorization_endpoint"] 105 assert is_safe_url(auth_endpoint) ··· 113 authserver_iss = request.args["iss"] 114 authorization_code = request.args["code"] 115 116 + auth_request = get_auth_request(session) 117 + if not auth_request: 118 return redirect(url_for("page_login"), 303) 119 120 current_app.logger.debug(f"Deleting auth request for state={state}") 121 + delete_auth_request(session) 122 123 assert auth_request.authserver_iss == authserver_iss 124 assert auth_request.state == state ··· 134 135 row = auth_request 136 137 if row.did: 138 # If we started with an account identifier, this is simple 139 did, handle, pds_url = row.did, row.handle, row.pds_url ··· 152 assert authserver_url == authserver_iss 153 154 assert row.scope == tokens.scope 155 + assert pds_url is not None 156 157 + current_app.logger.debug("storing user oauth session") 158 + oauth_session = OAuthSession( 159 + did, 160 + handle, 161 + pds_url, 162 + authserver_iss, 163 + tokens.access_token, 164 + tokens.refresh_token, 165 + dpop_authserver_nonce, 166 + None, 167 + auth_request.dpop_private_jwk, 168 ) 169 + save_auth_session(session, oauth_session) 170 171 return redirect(url_for("page_login")) 172 ··· 199 CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"]) 200 CLIENT_PUB_JWK = json.loads(CLIENT_SECRET_JWK.as_json(is_private=False)) 201 return jsonify({"keys": [CLIENT_PUB_JWK]}) 202 + 203 + 204 + # Session storage 205 + 206 + 207 + def save_auth_request(session: SessionMixin, request: OAuthAuthRequest): 208 + return _set_into_session(session, "oauth_auth_request", request) 209 + 210 + 211 + def save_auth_session(session: SessionMixin, auth_session: OAuthSession): 212 + return _set_into_session(session, "oauth_auth_session", auth_session) 213 + 214 + 215 + def delete_auth_request(session: SessionMixin): 216 + return _delete_from_session(session, "oauth_auth_request") 217 + 218 + 219 + def delete_auth_session(session: SessionMixin): 220 + return _delete_from_session(session, "oauth_auth_session") 221 + 222 + 223 + def get_auth_request(session: SessionMixin) -> OAuthAuthRequest | None: 224 + try: 225 + return OAuthAuthRequest(**session["oauth_auth_request"]) 226 + except TypeError as exception: 227 + current_app.logger.debug("unable to load oauth_auth_request") 228 + current_app.logger.debug(exception) 229 + return None 230 + 231 + 232 + def get_auth_session(session: SessionMixin) -> OAuthSession | None: 233 + try: 234 + return OAuthSession(**session["oauth_auth_session"]) 235 + except TypeError as exception: 236 + current_app.logger.debug("unable to load oauth_auth_session") 237 + current_app.logger.debug(exception) 238 + return None 239 + 240 + 241 + def _set_into_session(session: SessionMixin, key: str, value: NamedTuple): 242 + session[key] = value._asdict() 243 + 244 + 245 + def _delete_from_session(session: SessionMixin, key: str): 246 + del session[key]
+1 -23
src/schema.sql
··· 1 - create table if not exists oauth_auth_requests ( 2 - state text not null primary key, 3 - authserver_iss text not null, 4 - did text, 5 - handle text, 6 - pds_url text, 7 - pkce_verifier text not null, 8 - scope text not null, 9 - dpop_authserver_nonce text not null, 10 - dpop_private_jwk text not null 11 - ) strict, without rowid; 12 - 13 - create table if not exists oauth_sessions ( 14 - did text not null primary key, 15 - handle text, 16 - pds_url text not null, 17 - authserver_iss text not null, 18 - access_token text, 19 - refresh_token text, 20 - dpop_authserver_nonce text not null, 21 - dpop_pds_nonce text, 22 - dpop_private_jwk text not null 23 - ) strict, without rowid;
··· 1 + -- empty for now