A TypeScript toolkit for consuming the Bluesky network in real-time.
1// User Feed
2// Searches for Bluesky users by name, then streams their posts in real-time.
3// Demonstrates dynamic stream filtering and paginated API search with rate limiting.
4//
5// Usage: npx tsx examples/user-feed.ts <query> [pageSize]
6// Example: npx tsx examples/user-feed.ts developers
7
8import type WebSocket from "ws";
9import {
10 type UserRegistry,
11 parseUserPost,
12 formatUserPost,
13 startStreamWithReconnect,
14 sendOptionsUpdate,
15 waitForOpen,
16 fetchAndMergeUsers,
17 isRateLimitError,
18 delay,
19} from "../lib/index.js";
20
21// --- Config ---
22
// The search term is the first CLI argument after the script path. Without it
// there is nothing to search for, so print the usage text and exit non-zero.
// NOTE(review): the usage text advertises an optional [pageSize], but nothing
// visible in this file reads process.argv[3] — confirm it is still supported.
const [, , query] = process.argv;

if (query === undefined || query === "") {
  const usageLines = [
    "Usage: npx tsx examples/user-feed.ts <query> [pageSize]",
    "Example: npx tsx examples/user-feed.ts developers",
  ];
  for (const line of usageLines) {
    console.error(line);
  }
  process.exit(1);
}
30
31// --- Effects ---
32
/**
 * Pushes the current registry to the stream as its filter.
 *
 * Calls `sendOptionsUpdate` with the `app.bsky.feed.post` collection and every
 * key of `alexMap`; when that call returns a truthy value, logs how many users
 * the registry holds.
 *
 * @param ws - Socket handed to `sendOptionsUpdate`.
 * @param alexMap - Registry whose keys are sent as the user filter.
 */
const sendFilterUpdate = (ws: WebSocket, alexMap: UserRegistry): void => {
  const userKeys = Array.from(alexMap.keys());
  const accepted = sendOptionsUpdate(ws, ["app.bsky.feed.post"], userKeys);
  if (!accepted) return;

  console.log(`Stream updated: ${alexMap.size} users\n`);
};
38
/**
 * Pages through the user search for the CLI `query`, merging every page into
 * the registry and pushing the growing user set to the stream as it goes.
 *
 * Each iteration calls `fetchAndMergeUsers` with the current registry and
 * cursor, adopts the merged registry it returns, and logs the page summary.
 * Every tenth page the stream filter is refreshed. The loop ends when the
 * result carries no cursor or reports zero actors, or when a non-rate-limit
 * error is thrown. A final filter update is always sent before returning.
 *
 * NOTE(review): `ws` is captured once for the whole search. If the stream
 * reconnects while paging, updates are still sent through this socket —
 * confirm how `sendOptionsUpdate` behaves for a socket that is no longer open.
 *
 * @param ws - Socket used for stream filter updates.
 * @param initialMap - Registry to start merging into.
 * @returns The registry as of the last successfully merged page.
 */
const search = async (
  ws: WebSocket,
  initialMap: UserRegistry
): Promise<UserRegistry> => {
  // Refresh the stream filter every N pages rather than on every page.
  const filterUpdateEveryPages = 10;
  // Fixed pause before retrying after a rate-limit error.
  const rateLimitBackoffMs = 30_000;

  let alexMap = initialMap;
  let cursor: string | undefined;
  let page = 0;

  while (true) {
    try {
      const result = await fetchAndMergeUsers(query, alexMap, cursor);
      alexMap = result.merged;
      page++;

      console.log(`Page ${page}: ${result.newCount} new (total: ${alexMap.size})`);

      if (page % filterUpdateEveryPages === 0) sendFilterUpdate(ws, alexMap);

      cursor = result.cursor;
      if (!cursor || result.actorCount === 0) {
        console.log(`Search exhausted after ${page} pages.`);
        break;
      }
    } catch (err: unknown) {
      // `unknown` keeps the caught value type-checked; `isRateLimitError` is
      // the only gate that decides whether a retry is worthwhile.
      if (!isRateLimitError(err)) {
        console.error("Error:", err);
        break;
      }

      // `cursor` is only advanced after a page has been processed, so when the
      // fetch itself throws, the retry asks for the same page again.
      console.log(`Rate limited, waiting ${rateLimitBackoffMs / 1000}s...`);
      await delay(rateLimitBackoffMs);
    }
  }

  sendFilterUpdate(ws, alexMap);
  return alexMap;
};
76
77// --- Main ---
78
/**
 * Entry point: starts the reconnecting stream, waits for the current socket to
 * open, then runs the user search that feeds the stream filter.
 *
 * Stream events are parsed against `alexMap` and printed when
 * `parseUserPost` yields a post.
 *
 * NOTE(review): `alexMap` is only reassigned after `search` resolves. Unless
 * `fetchAndMergeUsers` mutates the map it is given, `onEvent` keeps reading
 * the initial empty registry for the whole search — confirm against the lib.
 */
const main = async (): Promise<void> => {
  let alexMap: UserRegistry = new Map();

  const { getWs } = startStreamWithReconnect({
    config: { requireHello: true },
    onEvent: (event) => {
      const post = parseUserPost(event, alexMap);
      if (post) console.log(formatUserPost(post) + "\n");
    },
    onOpen: () => console.log("Stream connected, searching for users...\n"),
  });

  await waitForOpen(getWs());
  alexMap = await search(getWs(), alexMap);
};

// Handle rejection explicitly instead of leaving a floating promise: report
// the failure and exit non-zero rather than relying on the runtime's
// unhandled-rejection behavior.
main().catch((err: unknown) => {
  console.error("Fatal:", err);
  process.exit(1);
});