tangled
alpha
login
or
join now
tokono.ma
/
diffuse
5
fork
atom
A music player that connects to your cloud/distributed storage.
5
fork
atom
overview
issues
4
pulls
pipelines
chore: remove atproto retry logic
Steven Vandevelde
2 weeks ago
54a47ceb
c278f157
+17
-67
1 changed file
expand all
collapse all
unified
split
src
components
output
raw
atproto
element.js
+17
-67
src/components/output/raw/atproto/element.js
···
149
149
* @param {unknown} err
150
150
* @returns {boolean}
151
151
*/
152
152
-
#isRateLimitError(err) {
153
153
-
if (err instanceof ClientResponseError && err.status === 429) return true;
154
154
-
if (err && typeof err === "object" && "cause" in err) {
155
155
-
return this.#isRateLimitError(/** @type {any} */ (err).cause);
156
156
-
}
157
157
-
return false;
158
158
-
}
159
159
-
160
160
-
/**
161
161
-
* Retry an async operation on rate-limit errors, respecting Retry-After.
162
162
-
*
163
163
-
* @template T
164
164
-
* @param {() => Promise<T>} fn
165
165
-
* @returns {Promise<T>}
166
166
-
*/
167
167
-
async #withRetry(fn) {
168
168
-
let delay = 10_000;
169
169
-
for (let attempt = 0;; attempt++) {
170
170
-
try {
171
171
-
return await fn();
172
172
-
} catch (err) {
173
173
-
if (attempt < 5 && this.#isRateLimitError(err)) {
174
174
-
let wait = delay;
175
175
-
if (err instanceof ClientResponseError) {
176
176
-
const resetAt = err.headers.get("ratelimit-reset");
177
177
-
if (resetAt) {
178
178
-
wait = Math.max(0, parseFloat(resetAt) * 1000 - Date.now());
179
179
-
}
180
180
-
}
181
181
-
await new Promise((r) => setTimeout(r, wait));
182
182
-
delay *= 2;
183
183
-
continue;
184
184
-
}
185
185
-
throw err;
186
186
-
}
187
187
-
}
188
188
-
}
189
189
-
190
190
-
/**
191
191
-
* @param {unknown} err
192
192
-
* @returns {boolean}
193
193
-
*/
194
152
#isSessionError(err) {
195
153
if (err instanceof TokenRefreshError) return true;
196
154
// OAuthUserAgent.handle() swallows TokenRefreshError and returns the
···
263
221
if (!rpc || !did) return null;
264
222
265
223
try {
266
266
-
const result = await this.#withRetry(() =>
267
267
-
ok(rpc.get(
268
268
-
"com.atproto.sync.getLatestCommit",
269
269
-
{ params: { did } },
270
270
-
))
271
271
-
);
224
224
+
const result = await ok(rpc.get(
225
225
+
"com.atproto.sync.getLatestCommit",
226
226
+
{ params: { did } },
227
227
+
));
272
228
273
229
this.#rev.value = result?.rev;
274
230
return result?.rev;
···
301
257
let cursor;
302
258
303
259
do {
304
304
-
const page = await this.#withRetry(() =>
305
305
-
ok(rpc.get(
306
306
-
"com.atproto.repo.listRecords",
307
307
-
{ params: { repo: did, collection, limit: 100, cursor } },
308
308
-
))
309
309
-
);
260
260
+
const page = await ok(rpc.get(
261
261
+
"com.atproto.repo.listRecords",
262
262
+
{ params: { repo: did, collection, limit: 100, cursor } },
263
263
+
));
310
264
311
265
for (const record of (page?.records ?? [])) {
312
266
records.push(record.value);
···
343
297
let cursor;
344
298
345
299
do {
346
346
-
const page = await this.#withRetry(() =>
347
347
-
ok(rpc.get(
348
348
-
"com.atproto.repo.listRecords",
349
349
-
{
350
350
-
params: { repo: this.#did.value, collection, limit: 100, cursor },
351
351
-
},
352
352
-
))
353
353
-
);
300
300
+
const page = await ok(rpc.get(
301
301
+
"com.atproto.repo.listRecords",
302
302
+
{
303
303
+
params: { repo: this.#did.value, collection, limit: 100, cursor },
304
304
+
},
305
305
+
));
354
306
355
307
for (const record of (page?.records ?? [])) {
356
308
const rkey = record.uri.split("/").pop();
···
403
355
for (let i = 0; i < writes.length; i += 100) {
404
356
const batch = writes.slice(i, i + 100);
405
357
406
406
-
const result = await this.#withRetry(() =>
407
407
-
ok(rpc.post("com.atproto.repo.applyWrites", {
408
408
-
input: { repo: this.#did.value, writes: batch },
409
409
-
}))
410
410
-
);
358
358
+
const result = await ok(rpc.post("com.atproto.repo.applyWrites", {
359
359
+
input: { repo: this.#did.value, writes: batch },
360
360
+
}));
411
361
412
362
if (result?.commit?.rev) {
413
363
this.#rev.value = result.commit.rev;