A from-scratch atproto PDS implementation in Python (mirrors https://github.com/DavidBuchanan314/millipds)

refactor symmetric auth into its own function

+41 -32
+41 -32
src/millipds/auth_bearer.py
··· 11 routes = web.RouteTableDef() 12 13 14 def authenticated(handler): 15 """ 16 There are three types of auth: ··· 39 ) 40 # logger.info(unverified) 41 if unverified["header"]["alg"] == "HS256": # symmetric secret 42 - try: 43 - payload: dict = jwt.decode( 44 - jwt=token, 45 - key=db.config["jwt_access_secret"], 46 - algorithms=["HS256"], 47 - audience=db.config["pds_did"], 48 - options={ 49 - "require": ["exp", "iat", "scope", "jti", "sub"], 50 - "verify_exp": True, 51 - "verify_iat": True, 52 - "strict_aud": True, # may be unnecessary 53 - }, 54 - ) 55 - except jwt.exceptions.PyJWTError: 56 - raise web.HTTPUnauthorized(text="invalid jwt") 57 - 58 - revoked = db.con.execute( 59 - "SELECT COUNT(*) FROM revoked_token WHERE did=? AND jti=?", 60 - (payload["sub"], payload["jti"]), 61 - ).fetchone()[0] 62 - 63 - if revoked: 64 - raise web.HTTPUnauthorized(text="revoked token") 65 - 66 - # if we reached this far, the payload must've been signed by us 67 - if payload.get("scope") != "com.atproto.access": 68 - raise web.HTTPUnauthorized(text="invalid jwt scope") 69 - 70 - subject: str = payload.get("sub", "") 71 - if not subject.startswith("did:"): 72 - raise web.HTTPUnauthorized(text="invalid jwt: invalid subject") 73 - request["authed_did"] = subject 74 else: # asymmetric service auth (scoped to a specific lxm) 75 did: str = unverified["payload"]["iss"] 76 if not did.startswith("did:"):
··· 11 routes = web.RouteTableDef() 12 13 14 + def verify_symmetric_token_and_get_subject( 15 + request: web.Request, token: str, expected_scope: str 16 + ) -> str: 17 + db = get_db(request) 18 + try: 19 + payload: dict = jwt.decode( 20 + jwt=token, 21 + key=db.config["jwt_access_secret"], 22 + algorithms=["HS256"], 23 + audience=db.config["pds_did"], 24 + options={ 25 + "require": ["exp", "iat", "scope", "jti", "sub"], 26 + "verify_exp": True, 27 + "verify_iat": True, 28 + "strict_aud": True, # may be unnecessary 29 + }, 30 + ) 31 + except jwt.exceptions.PyJWTError: 32 + raise web.HTTPUnauthorized(text="invalid jwt") 33 + 34 + revoked = db.con.execute( 35 + "SELECT COUNT(*) FROM revoked_token WHERE did=? AND jti=?", 36 + (payload["sub"], payload["jti"]), 37 + ).fetchone()[0] 38 + 39 + if revoked: 40 + raise web.HTTPUnauthorized(text="revoked token") 41 + 42 + # if we reached this far, the payload must've been signed by us 43 + if payload.get("scope") != expected_scope: 44 + raise web.HTTPUnauthorized(text="invalid jwt scope") 45 + 46 + subject: str = payload.get("sub", "") 47 + if not subject.startswith("did:"): 48 + raise web.HTTPUnauthorized(text="invalid jwt: invalid subject") 49 + return subject 50 + 51 + 52 def authenticated(handler): 53 """ 54 There are three types of auth: ··· 77 ) 78 # logger.info(unverified) 79 if unverified["header"]["alg"] == "HS256": # symmetric secret 80 + request["authed_did"] = verify_symmetric_token_and_get_subject( 81 + request, token, "com.atproto.access" 82 + ) 83 else: # asymmetric service auth (scoped to a specific lxm) 84 did: str = unverified["payload"]["iss"] 85 if not did.startswith("did:"):