tangled
alpha
login
or
join now
ligo.at
/
core
6
fork
atom
decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
6
fork
atom
overview
issues
2
pulls
pipelines
refresh oauth tokens when expired
nauta.one
1 month ago
e0c9fd09
b143f96c
+70
-14
5 changed files
expand all
collapse all
unified
split
src
atproto
oauth.py
types.py
auth.py
main.py
oauth.py
+4
-3
src/atproto/oauth.py
···
168
async def refresh_token_request(
169
client: ClientSession,
170
user: OAuthSession,
171
-
app_url: str,
172
client_secret_jwk: Key,
173
) -> tuple[OAuthTokens, str]:
174
authserver_url = user.authserver_iss
···
179
raise Exception("missing authserver meta")
180
181
# Construct token request fields
182
-
client_id = f"{app_url}oauth/metadata"
183
184
# Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
185
client_assertion = _client_assertion_jwt(
···
255
)
256
257
async with hardened_http.get_session() as session:
258
-
response = await session.post(
0
259
url,
260
headers={
261
"Authorization": f"DPoP {access_token}",
···
168
async def refresh_token_request(
169
client: ClientSession,
170
user: OAuthSession,
171
+
app_host: str,
172
client_secret_jwk: Key,
173
) -> tuple[OAuthTokens, str]:
174
authserver_url = user.authserver_iss
···
179
raise Exception("missing authserver meta")
180
181
# Construct token request fields
182
+
client_id = f"https://{app_host}/oauth/metadata"
183
184
# Self-signed JWT using the private key declared in client metadata JWKS (confidential client)
185
client_assertion = _client_assertion_jwt(
···
255
)
256
257
async with hardened_http.get_session() as session:
258
+
response = await session.request(
259
+
method,
260
url,
261
headers={
262
"Authorization": f"DPoP {access_token}",
+11
src/atproto/types.py
···
0
1
from typing import NamedTuple, NewType
2
3
AuthserverUrl = NewType("AuthserverUrl", str)
···
25
authserver_iss: str
26
access_token: str | None
27
refresh_token: str | None
0
28
dpop_authserver_nonce: str
29
dpop_pds_nonce: str | None
30
dpop_private_jwk: str
0
0
0
0
0
0
0
0
0
···
1
+
from datetime import datetime, timezone
2
from typing import NamedTuple, NewType
3
4
AuthserverUrl = NewType("AuthserverUrl", str)
···
26
authserver_iss: str
27
access_token: str | None
28
refresh_token: str | None
29
+
expires_at: int | None
30
dpop_authserver_nonce: str
31
dpop_pds_nonce: str | None
32
dpop_private_jwk: str
33
+
34
+
def is_expired(self, now: datetime | None = None) -> bool:
35
+
if self.expires_at is None:
36
+
return True
37
+
38
+
if now is None:
39
+
now = datetime.now(timezone.utc)
40
+
41
+
return self.expires_at < int(now.timestamp())
+34
-2
src/auth.py
···
0
1
from typing import NamedTuple, TypeVar
2
3
-
from flask import current_app
0
0
4
from flask.sessions import SessionMixin
5
0
6
from src.atproto.types import OAuthAuthRequest, OAuthSession
7
8
···
35
36
37
def _delete_from_session(session: SessionMixin, key: str):
38
-
del session[key]
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
39
40
41
OAuthClass = TypeVar("OAuthClass")
···
1
+
from datetime import datetime, timedelta, timezone
2
from typing import NamedTuple, TypeVar
3
4
+
from aiohttp.client import ClientSession
5
+
from authlib.jose import JsonWebKey
6
+
from flask import current_app, request
7
from flask.sessions import SessionMixin
8
9
+
from src.atproto.oauth import refresh_token_request
10
from src.atproto.types import OAuthAuthRequest, OAuthSession
11
12
···
39
40
41
def _delete_from_session(session: SessionMixin, key: str):
42
+
try:
43
+
del session[key]
44
+
except KeyError:
45
+
pass
46
+
47
+
48
+
async def refresh_auth_session(
49
+
session: SessionMixin,
50
+
client: ClientSession,
51
+
current: OAuthSession,
52
+
) -> OAuthSession | None:
53
+
current_app.logger.debug("refreshing oauth tokens")
54
+
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
55
+
tokens, dpop_authserver_nonce = await refresh_token_request(
56
+
client=client,
57
+
user=current,
58
+
app_host=request.host,
59
+
client_secret_jwk=CLIENT_SECRET_JWK,
60
+
)
61
+
now = datetime.now(timezone.utc)
62
+
expires_at = now + timedelta(seconds=tokens.expires_in or 300)
63
+
user = current._replace(
64
+
access_token=tokens.access_token,
65
+
refresh_token=tokens.refresh_token,
66
+
expires_at=int(expires_at.timestamp()),
67
+
dpop_pds_nonce=dpop_authserver_nonce,
68
+
)
69
+
save_auth_session(session, user)
70
+
return user
71
72
73
OAuthClass = TypeVar("OAuthClass")
+17
-9
src/main.py
···
1
import asyncio
2
import json
3
-
from typing import Any, NamedTuple
4
5
from aiohttp.client import ClientSession
6
from flask import Flask, g, redirect, render_template, request, session, url_for
···
15
)
16
from src.atproto.oauth import pds_authed_req
17
from src.atproto.types import DID, Handle, OAuthSession, PdsUrl
18
-
from src.auth import get_auth_session, save_auth_session
0
0
0
0
19
from src.db import KV, close_db_connection, get_db, init_db
20
from src.oauth import oauth
21
···
32
g.user = get_auth_session(session)
33
34
35
-
def get_user() -> OAuthSession | None:
36
-
return g.user
0
0
0
0
37
38
39
@app.teardown_appcontext
···
114
115
116
@app.get("/login")
117
-
def page_login():
118
-
if get_user() is not None:
119
return redirect("/editor")
120
return render_template("login.html", auth_servers=auth_servers)
121
···
138
139
@app.get("/editor")
140
async def page_editor():
141
-
user = get_user()
142
if user is None:
143
return redirect("/login", 302)
144
···
167
168
@app.post("/editor/profile")
169
async def post_editor_profile():
170
-
user = get_user()
171
if user is None:
172
url = url_for("auth_logout")
173
return htmx_response(redirect=url) if htmx else redirect(url, 303)
···
211
212
@app.post("/editor/links")
213
async def post_editor_links():
214
-
user = get_user()
215
if user is None:
216
url = url_for("auth_logout")
217
return htmx_response(redirect=url) if htmx else redirect(url, 303)
···
1
import asyncio
2
import json
3
+
from typing import Any, NamedTuple, cast
4
5
from aiohttp.client import ClientSession
6
from flask import Flask, g, redirect, render_template, request, session, url_for
···
15
)
16
from src.atproto.oauth import pds_authed_req
17
from src.atproto.types import DID, Handle, OAuthSession, PdsUrl
18
+
from src.auth import (
19
+
get_auth_session,
20
+
refresh_auth_session,
21
+
save_auth_session,
22
+
)
23
from src.db import KV, close_db_connection, get_db, init_db
24
from src.oauth import oauth
25
···
36
g.user = get_auth_session(session)
37
38
39
+
async def get_user() -> OAuthSession | None:
40
+
user = cast(OAuthSession | None, g.user)
41
+
if user is not None and user.is_expired():
42
+
async with ClientSession() as client:
43
+
user = await refresh_auth_session(session, client, user)
44
+
return user
45
46
47
@app.teardown_appcontext
···
122
123
124
@app.get("/login")
125
+
async def page_login():
126
+
if await get_user() is not None:
127
return redirect("/editor")
128
return render_template("login.html", auth_servers=auth_servers)
129
···
146
147
@app.get("/editor")
148
async def page_editor():
149
+
user = await get_user()
150
if user is None:
151
return redirect("/login", 302)
152
···
175
176
@app.post("/editor/profile")
177
async def post_editor_profile():
178
+
user = await get_user()
179
if user is None:
180
url = url_for("auth_logout")
181
return htmx_response(redirect=url) if htmx else redirect(url, 303)
···
219
220
@app.post("/editor/links")
221
async def post_editor_links():
222
+
user = await get_user()
223
if user is None:
224
url = url_for("auth_logout")
225
return htmx_response(redirect=url) if htmx else redirect(url, 303)
+4
src/oauth.py
···
1
import json
0
2
from urllib.parse import urlencode
3
4
from aiohttp.client import ClientSession
···
200
assert pds_url is not None
201
202
current_app.logger.debug("storing user oauth session")
0
0
203
oauth_session = OAuthSession(
204
did,
205
handle,
···
207
authserver_iss,
208
tokens.access_token,
209
tokens.refresh_token,
0
210
dpop_authserver_nonce,
211
None,
212
auth_request.dpop_private_jwk,
···
1
import json
2
+
from datetime import datetime, timedelta, timezone
3
from urllib.parse import urlencode
4
5
from aiohttp.client import ClientSession
···
201
assert pds_url is not None
202
203
current_app.logger.debug("storing user oauth session")
204
+
now = datetime.now(timezone.utc)
205
+
expires_at = now + timedelta(seconds=tokens.expires_in or 300)
206
oauth_session = OAuthSession(
207
did,
208
handle,
···
210
authserver_iss,
211
tokens.access_token,
212
tokens.refresh_token,
213
+
int(expires_at.timestamp()),
214
dpop_authserver_nonce,
215
None,
216
auth_request.dpop_private_jwk,