tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
add inngest function to write records
awarm.space
1 month ago
14b33393
401fd15b
+82
3 changed files
expand all
collapse all
unified
split
app
api
inngest
client.ts
functions
write_records_to_pds.ts
route.tsx
+10
app/api/inngest/client.ts
···
51
documentUris?: string[];
52
};
53
};
0
0
0
0
0
0
0
0
0
0
54
};
55
56
// Create a client to send and receive events
···
51
documentUris?: string[];
52
};
53
};
54
+
"user/write-records-to-pds": {
55
+
data: {
56
+
did: string;
57
+
records: Array<{
58
+
collection: string;
59
+
rkey: string;
60
+
record: unknown;
61
+
}>;
62
+
};
63
+
};
64
};
65
66
// Create a client to send and receive events
+70
app/api/inngest/functions/write_records_to_pds.ts
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
import { inngest } from "../client";
2
+
import { restoreOAuthSession } from "src/atproto-oauth";
3
+
import { AtpBaseClient } from "lexicons/api";
4
+
5
+
// Batch size to avoid Inngest payload limits and PDS rate limits
6
+
const BATCH_SIZE = 50;
7
+
8
+
// Helper to create authenticated agent - must be called fresh in each step
9
+
// (OAuth sessions cannot be serialized across Inngest steps)
10
+
async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
11
+
const result = await restoreOAuthSession(did);
12
+
if (!result.ok) {
13
+
throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
14
+
}
15
+
return new AtpBaseClient(result.value.fetchHandler.bind(result.value));
16
+
}
17
+
18
+
export const write_records_to_pds = inngest.createFunction(
19
+
{ id: "write-records-to-pds" },
20
+
{ event: "user/write-records-to-pds" },
21
+
async ({ event, step }) => {
22
+
const { did, records } = event.data;
23
+
24
+
// Step 1: Verify OAuth session is valid before proceeding
25
+
await step.run("verify-oauth-session", async () => {
26
+
const result = await restoreOAuthSession(did);
27
+
if (!result.ok) {
28
+
throw new Error(`OAuth restore failed: ${result.error.message}`);
29
+
}
30
+
return { success: true };
31
+
});
32
+
33
+
// Step 2: Write records to PDS in batches
34
+
// Split records into batches to avoid payload limits and rate limiting
35
+
const batches: typeof records[] = [];
36
+
for (let i = 0; i < records.length; i += BATCH_SIZE) {
37
+
batches.push(records.slice(i, i + BATCH_SIZE));
38
+
}
39
+
40
+
let totalWritten = 0;
41
+
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
42
+
const batch = batches[batchIndex];
43
+
const batchWritten = await step.run(
44
+
`write-batch-${batchIndex}`,
45
+
async () => {
46
+
const agent = await createAuthenticatedAgent(did);
47
+
let written = 0;
48
+
for (const rec of batch) {
49
+
await agent.com.atproto.repo.putRecord({
50
+
repo: did,
51
+
collection: rec.collection,
52
+
rkey: rec.rkey,
53
+
record: rec.record as Record<string, unknown>,
54
+
validate: false,
55
+
});
56
+
written++;
57
+
}
58
+
return written;
59
+
},
60
+
);
61
+
totalWritten += batchWritten;
62
+
}
63
+
64
+
return {
65
+
success: true,
66
+
recordsWritten: totalWritten,
67
+
batchCount: batches.length,
68
+
};
69
+
},
70
+
);
+2
app/api/inngest/route.tsx
···
12
cleanup_expired_oauth_sessions,
13
check_oauth_session,
14
} from "./functions/cleanup_expired_oauth_sessions";
0
15
16
export const { GET, POST, PUT } = serve({
17
client: inngest,
···
26
fix_standard_document_postref,
27
cleanup_expired_oauth_sessions,
28
check_oauth_session,
0
29
],
30
});
···
12
cleanup_expired_oauth_sessions,
13
check_oauth_session,
14
} from "./functions/cleanup_expired_oauth_sessions";
15
+
import { write_records_to_pds } from "./functions/write_records_to_pds";
16
17
export const { GET, POST, PUT } = serve({
18
client: inngest,
···
27
fix_standard_document_postref,
28
cleanup_expired_oauth_sessions,
29
check_oauth_session,
30
+
write_records_to_pds,
31
],
32
});