tangled
alpha
login
or
join now
ligo.at
/
core
6
fork
atom
decentralized and customizable links page on top of atproto
ligo.at
atproto
link-in-bio
python
uv
6
fork
atom
overview
issues
2
pulls
pipelines
use async await more
nauta.one
5 months ago
39e4c4ff
77e3fdc5
+88
-75
6 changed files
expand all
collapse all
unified
split
src
atproto
__init__.py
oauth.py
main.py
oauth.py
security.py
templates
login.html
+33
-30
src/atproto/__init__.py
···
25
return regex_match(DID_REGEX, did) is not None
26
27
28
-
def resolve_identity(
29
query: str,
30
didkv: KV = nokv,
31
) -> tuple[str, str, dict[str, Any]] | None:
···
36
did = resolve_did_from_handle(handle, didkv)
37
if not did:
38
return None
39
-
doc = resolve_doc_from_did(did)
40
if not doc:
41
return None
42
handles = handles_from_doc(doc)
···
46
47
if is_valid_did(query):
48
did = query
49
-
doc = resolve_doc_from_did(did)
50
if not doc:
51
return None
52
handle = handle_from_doc(doc)
···
120
return None
121
122
123
-
def resolve_pds_from_did(
124
did: DID,
125
kv: KV = nokv,
126
reload: bool = False,
···
130
print(f"returning cached pds for {did}")
131
return pds
132
133
-
doc = resolve_doc_from_did(did)
134
if doc is None:
135
return None
136
pds = doc["service"][0]["serviceEndpoint"]
···
141
return pds
142
143
144
-
def resolve_doc_from_did(
145
did: DID,
146
directory: str = PLC_DIRECTORY,
147
) -> dict[str, Any] | None:
148
-
if did.startswith("did:plc:"):
149
-
response = httpx.get(f"{directory}/{did}")
150
-
if response.is_success:
151
-
return response.json()
152
-
return None
0
153
154
-
if did.startswith("did:web:"):
155
-
# TODO: resolve did:web
156
-
return None
157
158
return None
159
160
161
-
def resolve_authserver_from_pds(
162
pds_url: PdsUrl,
163
kv: KV = nokv,
164
reload: bool = False,
···
172
173
assert is_safe_url(pds_url)
174
endpoint = f"{pds_url}/.well-known/oauth-protected-resource"
175
-
response = httpx.get(endpoint)
176
-
if response.status_code != 200:
177
-
return None
178
-
parsed: dict[str, list[str]] = response.json()
179
-
authserver_url = parsed["authorization_servers"][0]
180
-
print(f"caching authserver {authserver_url} for PDS {pds_url}")
181
-
kv.set(pds_url, value=authserver_url)
182
-
return authserver_url
0
183
184
185
-
def fetch_authserver_meta(authserver_url: str) -> dict[str, str] | None:
186
"""Returns metadata from the authserver"""
187
assert is_safe_url(authserver_url)
188
endpoint = f"{authserver_url}/.well-known/oauth-authorization-server"
189
-
response = httpx.get(endpoint)
190
-
if not response.is_success:
191
-
return None
192
-
meta: dict[str, Any] = response.json()
193
-
assert is_valid_authserver_meta(meta, authserver_url)
194
-
return meta
0
195
196
197
async def get_record(
···
25
return regex_match(DID_REGEX, did) is not None
26
27
28
+
async def resolve_identity(
29
query: str,
30
didkv: KV = nokv,
31
) -> tuple[str, str, dict[str, Any]] | None:
···
36
did = resolve_did_from_handle(handle, didkv)
37
if not did:
38
return None
39
+
doc = await resolve_doc_from_did(did)
40
if not doc:
41
return None
42
handles = handles_from_doc(doc)
···
46
47
if is_valid_did(query):
48
did = query
49
+
doc = await resolve_doc_from_did(did)
50
if not doc:
51
return None
52
handle = handle_from_doc(doc)
···
120
return None
121
122
123
+
async def resolve_pds_from_did(
124
did: DID,
125
kv: KV = nokv,
126
reload: bool = False,
···
130
print(f"returning cached pds for {did}")
131
return pds
132
133
+
doc = await resolve_doc_from_did(did)
134
if doc is None:
135
return None
136
pds = doc["service"][0]["serviceEndpoint"]
···
141
return pds
142
143
144
+
async def resolve_doc_from_did(
145
did: DID,
146
directory: str = PLC_DIRECTORY,
147
) -> dict[str, Any] | None:
148
+
async with httpx.AsyncClient() as client:
149
+
if did.startswith("did:plc:"):
150
+
response = await client.get(f"{directory}/{did}")
151
+
if response.is_success:
152
+
return response.json()
153
+
return None
154
155
+
if did.startswith("did:web:"):
156
+
# TODO: resolve did:web
157
+
return None
158
159
return None
160
161
162
+
async def resolve_authserver_from_pds(
163
pds_url: PdsUrl,
164
kv: KV = nokv,
165
reload: bool = False,
···
173
174
assert is_safe_url(pds_url)
175
endpoint = f"{pds_url}/.well-known/oauth-protected-resource"
176
+
async with httpx.AsyncClient() as client:
177
+
response = await client.get(endpoint)
178
+
if response.status_code != 200:
179
+
return None
180
+
parsed: dict[str, list[str]] = response.json()
181
+
authserver_url = parsed["authorization_servers"][0]
182
+
print(f"caching authserver {authserver_url} for PDS {pds_url}")
183
+
kv.set(pds_url, value=authserver_url)
184
+
return authserver_url
185
186
187
+
async def fetch_authserver_meta(authserver_url: str) -> dict[str, str] | None:
188
"""Returns metadata from the authserver"""
189
assert is_safe_url(authserver_url)
190
endpoint = f"{authserver_url}/.well-known/oauth-authorization-server"
191
+
async with httpx.AsyncClient() as client:
192
+
response = await client.get(endpoint)
193
+
if not response.is_success:
194
+
return None
195
+
meta: dict[str, Any] = response.json()
196
+
assert is_valid_authserver_meta(meta, authserver_url)
197
+
return meta
198
199
200
async def get_record(
+27
-22
src/atproto/oauth.py
···
1
from typing import Any, Callable, NamedTuple
2
import time
3
import json
4
-
from authlib.jose import JsonWebKey, Key
5
from authlib.common.security import generate_token
6
-
from authlib.jose import jwt
7
from authlib.oauth2.rfc7636 import create_s256_code_challenge
8
from httpx import Response
9
···
26
27
# Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server.
28
# Returns "state" id HTTP response on success, without checking HTTP response status
29
-
def send_par_auth_request(
30
authserver_url: str,
31
authserver_meta: dict[str, str],
32
login_hint: str | None,
···
71
72
# IMPORTANT: Pushed Authorization Request URL is untrusted input, SSRF mitigations are needed
73
assert is_safe_url(par_url)
74
-
with hardened_http.get_session() as sess:
75
-
resp = sess.post(
76
par_url,
77
headers={
78
"Content-Type": "application/x-www-form-urlencoded",
···
88
dpop_proof = _authserver_dpop_jwt(
89
"POST", par_url, dpop_authserver_nonce, dpop_private_jwk
90
)
91
-
with hardened_http.get_session() as sess:
92
-
resp = sess.post(
93
par_url,
94
headers={
95
"Content-Type": "application/x-www-form-urlencoded",
···
104
# Completes the auth flow by sending an initial auth token request.
105
# Returns token response (OAuthTokens) and DPoP nonce (str)
106
# IMPORTANT: the 'tokens.sub' field must be verified against the original request by code calling this function.
107
-
def initial_token_request(
108
auth_request: OAuthAuthRequest,
109
code: str,
110
app_url: str,
···
113
authserver_url = auth_request.authserver_iss
114
115
# Re-fetch server metadata
116
-
authserver_meta = fetch_authserver_meta(authserver_url)
117
if not authserver_meta:
118
raise Exception("missing authserver meta")
119
···
146
147
# IMPORTANT: Token URL is untrusted input, SSRF mitigations are needed
148
assert is_safe_url(token_url)
149
-
with hardened_http.get_session() as sess:
150
-
resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
151
152
# Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
153
if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
···
157
dpop_proof = _authserver_dpop_jwt(
158
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
159
)
160
-
with hardened_http.get_session() as sess:
161
-
resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
0
0
0
0
162
163
resp.raise_for_status()
164
token_body = resp.json()
···
168
169
170
# Returns token response (OAuthTokens) and DPoP nonce (str)
171
-
def refresh_token_request(
172
user: OAuthSession,
173
app_url: str,
174
client_secret_jwk: Key,
···
176
authserver_url = user.authserver_iss
177
178
# Re-fetch server metadata
179
-
authserver_meta = fetch_authserver_meta(authserver_url)
180
if not authserver_meta:
181
raise Exception("missing authserver meta")
182
···
206
207
# IMPORTANT: Token URL is untrusted input, SSRF mitigations are needed
208
assert is_safe_url(token_url)
209
-
with hardened_http.get_session() as sess:
210
-
resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
211
212
# Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
213
if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
···
217
dpop_proof = _authserver_dpop_jwt(
218
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
219
)
220
-
with hardened_http.get_session() as sess:
221
-
resp = sess.post(token_url, data=params, headers={"DPoP": dpop_proof})
0
0
222
223
if resp.status_code not in [200, 201]:
224
print(f"Token Refresh Error: {resp.json()}")
···
232
233
# Helper to demonstrate making a request (HTTP GET or POST) to the user's PDS ("Resource Server" in OAuth terminology) using DPoP and access token.
234
# This method returns a 'requests' reponse, without checking status code.
235
-
def pds_authed_req(
236
method: str,
237
url: str,
238
user: OAuthSession,
···
255
dpop_private_jwk,
256
)
257
258
-
with hardened_http.get_session() as sess:
259
-
response = sess.post(
260
url,
261
headers={
262
"Authorization": f"DPoP {access_token}",
···
1
from typing import Any, Callable, NamedTuple
2
import time
3
import json
4
+
from authlib.jose import JsonWebKey, Key, jwt
5
from authlib.common.security import generate_token
0
6
from authlib.oauth2.rfc7636 import create_s256_code_challenge
7
from httpx import Response
8
···
25
26
# Prepares and sends a pushed auth request (PAR) via HTTP POST to the Authorization Server.
27
# Returns "state" id HTTP response on success, without checking HTTP response status
28
+
async def send_par_auth_request(
29
authserver_url: str,
30
authserver_meta: dict[str, str],
31
login_hint: str | None,
···
70
71
# IMPORTANT: Pushed Authorization Request URL is untrusted input, SSRF mitigations are needed
72
assert is_safe_url(par_url)
73
+
async with hardened_http.get_session() as session:
74
+
resp = await session.post(
75
par_url,
76
headers={
77
"Content-Type": "application/x-www-form-urlencoded",
···
87
dpop_proof = _authserver_dpop_jwt(
88
"POST", par_url, dpop_authserver_nonce, dpop_private_jwk
89
)
90
+
async with hardened_http.get_session() as session:
91
+
resp = await session.post(
92
par_url,
93
headers={
94
"Content-Type": "application/x-www-form-urlencoded",
···
103
# Completes the auth flow by sending an initial auth token request.
104
# Returns token response (OAuthTokens) and DPoP nonce (str)
105
# IMPORTANT: the 'tokens.sub' field must be verified against the original request by code calling this function.
106
+
async def initial_token_request(
107
auth_request: OAuthAuthRequest,
108
code: str,
109
app_url: str,
···
112
authserver_url = auth_request.authserver_iss
113
114
# Re-fetch server metadata
115
+
authserver_meta = await fetch_authserver_meta(authserver_url)
116
if not authserver_meta:
117
raise Exception("missing authserver meta")
118
···
145
146
# IMPORTANT: Token URL is untrusted input, SSRF mitigations are needed
147
assert is_safe_url(token_url)
148
+
async with hardened_http.get_session() as session:
149
+
resp = await session.post(token_url, data=params, headers={"DPoP": dpop_proof})
150
151
# Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
152
if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
···
156
dpop_proof = _authserver_dpop_jwt(
157
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
158
)
159
+
async with hardened_http.get_session() as session:
160
+
resp = await session.post(
161
+
token_url,
162
+
data=params,
163
+
headers={"DPoP": dpop_proof},
164
+
)
165
166
resp.raise_for_status()
167
token_body = resp.json()
···
171
172
173
# Returns token response (OAuthTokens) and DPoP nonce (str)
174
+
async def refresh_token_request(
175
user: OAuthSession,
176
app_url: str,
177
client_secret_jwk: Key,
···
179
authserver_url = user.authserver_iss
180
181
# Re-fetch server metadata
182
+
authserver_meta = await fetch_authserver_meta(authserver_url)
183
if not authserver_meta:
184
raise Exception("missing authserver meta")
185
···
209
210
# IMPORTANT: Token URL is untrusted input, SSRF mitigations are needed
211
assert is_safe_url(token_url)
212
+
async with hardened_http.get_session() as session:
213
+
resp = await session.post(token_url, data=params, headers={"DPoP": dpop_proof})
214
215
# Handle DPoP missing/invalid nonce error by retrying with server-provided nonce
216
if resp.status_code == 400 and resp.json()["error"] == "use_dpop_nonce":
···
220
dpop_proof = _authserver_dpop_jwt(
221
"POST", token_url, dpop_authserver_nonce, dpop_private_jwk
222
)
223
+
async with hardened_http.get_session() as session:
224
+
resp = await session.post(
225
+
token_url, data=params, headers={"DPoP": dpop_proof}
226
+
)
227
228
if resp.status_code not in [200, 201]:
229
print(f"Token Refresh Error: {resp.json()}")
···
237
238
# Helper to demonstrate making a request (HTTP GET or POST) to the user's PDS ("Resource Server" in OAuth terminology) using DPoP and access token.
239
# This method returns a 'requests' reponse, without checking status code.
240
+
async def pds_authed_req(
241
method: str,
242
url: str,
243
user: OAuthSession,
···
260
dpop_private_jwk,
261
)
262
263
+
async with hardened_http.get_session() as session:
264
+
response = await session.post(
265
url,
266
headers={
267
"Authorization": f"DPoP {access_token}",
+10
-8
src/main.py
···
61
return render_template("error.html", message="profile not found"), 404
62
63
kv = KV(app, "pds_from_did")
64
-
pds = resolve_pds_from_did(did, kv, reload=reload)
65
if pds is None:
66
return render_template("error.html", message="pds not found"), 404
67
(profile, _), links = await asyncio.gather(
···
126
127
128
@app.post("/editor/profile")
129
-
def post_editor_profile():
130
user = get_user()
131
if user is None:
132
return redirect("/login", 303)
···
136
if not display_name:
137
return redirect("/editor", 303)
138
139
-
put_record(
140
user=user,
141
pds=user.pds_url,
142
repo=user.did,
···
153
154
155
@app.post("/editor/links")
156
-
def post_editor_links():
157
user = get_user()
158
if user is None:
159
return redirect("/login", 303)
···
176
link["detail"] = detail
177
links.append(link)
178
179
-
put_record(
180
user=user,
181
pds=user.pds_url,
182
repo=user.did,
···
221
async def load_profile(
222
pds: str,
223
did: str,
0
224
reload: bool = False,
225
) -> tuple[tuple[str, str] | None, bool]:
226
kv = KV(app, "profile_from_did")
···
232
233
from_bluesky = False
234
record = await get_record(pds, did, f"{SCHEMA}.actor.profile", "self")
235
-
if record is None:
236
record = await get_record(pds, did, "app.bsky.actor.profile", "self")
237
from_bluesky = True
238
if record is None:
···
244
return profile, from_bluesky
245
246
247
-
def put_record(
0
248
user: OAuthSession,
249
pds: PdsUrl,
250
repo: str,
···
264
session_ = user._replace(dpop_pds_nonce=nonce)
265
save_auth_session(session, session_)
266
267
-
response = pds_authed_req(
268
method="POST",
269
url=endpoint,
270
body=body,
···
61
return render_template("error.html", message="profile not found"), 404
62
63
kv = KV(app, "pds_from_did")
64
+
pds = await resolve_pds_from_did(did, kv, reload=reload)
65
if pds is None:
66
return render_template("error.html", message="pds not found"), 404
67
(profile, _), links = await asyncio.gather(
···
126
127
128
@app.post("/editor/profile")
129
+
async def post_editor_profile():
130
user = get_user()
131
if user is None:
132
return redirect("/login", 303)
···
136
if not display_name:
137
return redirect("/editor", 303)
138
139
+
await put_record(
140
user=user,
141
pds=user.pds_url,
142
repo=user.did,
···
153
154
155
@app.post("/editor/links")
156
+
async def post_editor_links():
157
user = get_user()
158
if user is None:
159
return redirect("/login", 303)
···
176
link["detail"] = detail
177
links.append(link)
178
179
+
await put_record(
180
user=user,
181
pds=user.pds_url,
182
repo=user.did,
···
221
async def load_profile(
222
pds: str,
223
did: str,
224
+
fallback_with_bluesky: bool = True,
225
reload: bool = False,
226
) -> tuple[tuple[str, str] | None, bool]:
227
kv = KV(app, "profile_from_did")
···
233
234
from_bluesky = False
235
record = await get_record(pds, did, f"{SCHEMA}.actor.profile", "self")
236
+
if record is None and fallback_with_bluesky:
237
record = await get_record(pds, did, "app.bsky.actor.profile", "self")
238
from_bluesky = True
239
if record is None:
···
245
return profile, from_bluesky
246
247
248
+
# TODO: move to .atproto
249
+
async def put_record(
250
user: OAuthSession,
251
pds: PdsUrl,
252
repo: str,
···
266
session_ = user._replace(dpop_pds_nonce=nonce)
267
save_auth_session(session, session_)
268
269
+
response = await pds_authed_req(
270
method="POST",
271
url=endpoint,
272
body=body,
+15
-12
src/oauth.py
···
24
25
26
@oauth.get("/start")
27
-
def oauth_start():
28
# Identity
29
username = request.args.get("username") or request.args.get("authserver")
30
if not username:
···
36
if is_valid_handle(username) or is_valid_did(username):
37
login_hint = username
38
kv = KV(db, "did_from_handle")
39
-
identity = resolve_identity(username, didkv=kv)
40
if identity is None:
41
return "couldnt resolve identity", 500
42
did, handle, doc = identity
···
44
if not pds_url:
45
return "pds not found", 404
46
current_app.logger.debug(f"account PDS: {pds_url}")
47
-
authserver_url = resolve_authserver_from_pds(pds_url, pdskv)
48
if not authserver_url:
49
return "authserver not found", 404
50
51
elif username.startswith("https://") and is_safe_url(username):
52
did, handle, pds_url = None, None, None
53
login_hint = None
54
-
authserver_url = resolve_authserver_from_pds(username, pdskv) or username
55
56
else:
57
return "not a valid handle, did or auth server", 400
58
59
current_app.logger.debug(f"Authserver: {authserver_url}")
60
assert is_safe_url(authserver_url)
61
-
authserver_meta = fetch_authserver_meta(authserver_url)
62
if not authserver_meta:
63
return "no authserver meta", 404
64
···
77
78
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
79
80
-
pkce_verifier, state, dpop_authserver_nonce, resp = send_par_auth_request(
81
authserver_url,
82
authserver_meta,
83
login_hint,
···
87
CLIENT_SECRET_JWK,
88
dpop_private_jwk,
89
)
0
90
if resp.status_code == 400:
91
-
current_app.logger.debug(f"PAR HTTP 400: {resp.json()}")
92
-
resp.raise_for_status()
0
0
93
94
par_request_uri: str = resp.json()["request_uri"]
95
current_app.logger.debug(f"saving oauth_auth_request to DB state={state}")
···
114
115
116
@oauth.get("/callback")
117
-
def oauth_callback():
118
state = request.args["state"]
119
authserver_iss = request.args["iss"]
120
authorization_code = request.args["code"]
···
131
132
app_url = request.url_root.replace("http://", "https://")
133
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
134
-
tokens, dpop_authserver_nonce = initial_token_request(
135
auth_request,
136
authorization_code,
137
app_url,
···
151
else:
152
did = tokens.sub
153
assert is_valid_did(did)
154
-
identity = resolve_identity(did, didkv=didkv)
155
if not identity:
156
return "could not resolve identity", 500
157
did, handle, did_doc = identity
158
pds_url = pds_endpoint_from_doc(did_doc)
159
if not pds_url:
160
return "could not resolve pds", 500
161
-
authserver_url = resolve_authserver_from_pds(pds_url, authserverkv)
162
assert authserver_url == authserver_iss
163
164
assert row.scope == tokens.scope
···
24
25
26
@oauth.get("/start")
27
+
async def oauth_start():
28
# Identity
29
username = request.args.get("username") or request.args.get("authserver")
30
if not username:
···
36
if is_valid_handle(username) or is_valid_did(username):
37
login_hint = username
38
kv = KV(db, "did_from_handle")
39
+
identity = await resolve_identity(username, didkv=kv)
40
if identity is None:
41
return "couldnt resolve identity", 500
42
did, handle, doc = identity
···
44
if not pds_url:
45
return "pds not found", 404
46
current_app.logger.debug(f"account PDS: {pds_url}")
47
+
authserver_url = await resolve_authserver_from_pds(pds_url, pdskv)
48
if not authserver_url:
49
return "authserver not found", 404
50
51
elif username.startswith("https://") and is_safe_url(username):
52
did, handle, pds_url = None, None, None
53
login_hint = None
54
+
authserver_url = await resolve_authserver_from_pds(username, pdskv) or username
55
56
else:
57
return "not a valid handle, did or auth server", 400
58
59
current_app.logger.debug(f"Authserver: {authserver_url}")
60
assert is_safe_url(authserver_url)
61
+
authserver_meta = await fetch_authserver_meta(authserver_url)
62
if not authserver_meta:
63
return "no authserver meta", 404
64
···
77
78
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
79
80
+
pkce_verifier, state, dpop_authserver_nonce, resp = await send_par_auth_request(
81
authserver_url,
82
authserver_meta,
83
login_hint,
···
87
CLIENT_SECRET_JWK,
88
dpop_private_jwk,
89
)
90
+
91
if resp.status_code == 400:
92
+
current_app.logger.debug("PAR request returned error 400")
93
+
current_app.logger.debug(resp.text)
94
+
return redirect(url_for("page_login"), 303)
95
+
_ = resp.raise_for_status()
96
97
par_request_uri: str = resp.json()["request_uri"]
98
current_app.logger.debug(f"saving oauth_auth_request to DB state={state}")
···
117
118
119
@oauth.get("/callback")
120
+
async def oauth_callback():
121
state = request.args["state"]
122
authserver_iss = request.args["iss"]
123
authorization_code = request.args["code"]
···
134
135
app_url = request.url_root.replace("http://", "https://")
136
CLIENT_SECRET_JWK = JsonWebKey.import_key(current_app.config["CLIENT_SECRET_JWK"])
137
+
tokens, dpop_authserver_nonce = await initial_token_request(
138
auth_request,
139
authorization_code,
140
app_url,
···
154
else:
155
did = tokens.sub
156
assert is_valid_did(did)
157
+
identity = await resolve_identity(did, didkv=didkv)
158
if not identity:
159
return "could not resolve identity", 500
160
did, handle, did_doc = identity
161
pds_url = pds_endpoint_from_doc(did_doc)
162
if not pds_url:
163
return "could not resolve pds", 500
164
+
authserver_url = await resolve_authserver_from_pds(pds_url, authserverkv)
165
assert authserver_url == authserver_iss
166
167
assert row.scope == tokens.scope
+2
-2
src/security.py
···
30
31
32
class HardenedHttp:
33
-
def get_session(self) -> httpx.Client:
34
-
return httpx.Client(
35
timeout=httpx.Timeout(20, connect=5),
36
follow_redirects=False,
37
headers={
···
30
31
32
class HardenedHttp:
33
+
def get_session(self) -> httpx.AsyncClient:
34
+
return httpx.AsyncClient(
35
timeout=httpx.Timeout(20, connect=5),
36
follow_redirects=False,
37
headers={
+1
-1
src/templates/login.html
···
19
<form action="{{ url_for('auth_login') }}" method="post">
20
<label>
21
<span>Handle</span>
22
-
<input type="text" name="username" placeholder="username.example.com" autocapitalize="off" autocomplete="off" spellcheck="false" required />
23
</label>
24
<span class="caption">
25
Use your AT Protocol handle to log in.
···
19
<form action="{{ url_for('auth_login') }}" method="post">
20
<label>
21
<span>Handle</span>
22
+
<input type="text" name="username" placeholder="username.example.com" autocapitalize="off" spellcheck="false" required />
23
</label>
24
<span class="caption">
25
Use your AT Protocol handle to log in.