A tool for tailing the firehose and matching images against known perceptual hashes, and then labeling them.
1import { AtpAgent } from "@atproto/api";
2import { Agent, setGlobalDispatcher } from "undici";
3import { config } from "./config/index.js";
4import { logger } from "./logger/index.js";
5import { type SessionData, loadSession, saveSession } from "./session.js";
6
7setGlobalDispatcher(
8 new Agent({
9 connect: { timeout: 20_000 },
10 keepAliveTimeout: 10_000,
11 keepAliveMaxTimeout: 20_000,
12 })
13);
14
15export const agent = new AtpAgent({
16 service: config.ozone.pds,
17});
18
19const JWT_LIFETIME_MS = 2 * 60 * 60 * 1000; // 2 hours (typical ATP JWT lifetime)
20const REFRESH_AT_PERCENT = 0.8; // Refresh at 80% of lifetime
21let refreshTimer: NodeJS.Timeout | null = null;
22
23async function refreshSession(): Promise<void> {
24 try {
25 logger.info("Refreshing session tokens");
26 if (!agent.session) {
27 throw new Error("No active session to refresh");
28 }
29 await agent.resumeSession(agent.session);
30
31 saveSession(agent.session as SessionData);
32 scheduleSessionRefresh();
33 } catch (error: unknown) {
34 logger.error({ error }, "Failed to refresh session, will re-authenticate");
35 await performLogin();
36 }
37}
38
39function scheduleSessionRefresh(): void {
40 if (refreshTimer) {
41 clearTimeout(refreshTimer);
42 }
43
44 const refreshIn = JWT_LIFETIME_MS * REFRESH_AT_PERCENT;
45 logger.debug(`Scheduling session refresh in ${(refreshIn / 1000 / 60).toFixed(1)} minutes`);
46
47 refreshTimer = setTimeout(() => {
48 refreshSession().catch((error: unknown) => {
49 logger.error({ error }, "Scheduled session refresh failed");
50 });
51 }, refreshIn);
52}
53
54async function performLogin(): Promise<boolean> {
55 try {
56 logger.info("Performing fresh login");
57 const response = await agent.login({
58 identifier: config.labeler.handle,
59 password: config.labeler.password,
60 });
61
62 if (response.success && agent.session) {
63 saveSession(agent.session as SessionData);
64 scheduleSessionRefresh();
65 logger.info("Login successful, session saved");
66 return true;
67 }
68
69 logger.error("Login failed: no session returned");
70 return false;
71 } catch (error) {
72 logger.error({ error }, "Login failed");
73 return false;
74 }
75}
76
77const MAX_LOGIN_RETRIES = 3;
78const RETRY_DELAY_MS = 2000;
79
80let loginPromise: Promise<void> | null = null;
81
82async function sleep(ms: number): Promise<void> {
83 return new Promise((resolve) => setTimeout(resolve, ms));
84}
85
86async function authenticate(): Promise<boolean> {
87 const savedSession = loadSession();
88
89 if (savedSession) {
90 try {
91 logger.info("Attempting to resume saved session");
92 await agent.resumeSession(savedSession);
93
94 // Verify session is still valid with a lightweight call
95 await agent.getProfile({ actor: savedSession.did });
96
97 logger.info("Session resumed successfully");
98 scheduleSessionRefresh();
99 return true;
100 } catch (error) {
101 logger.warn({ error }, "Saved session invalid, will re-authenticate");
102 }
103 }
104
105 return performLogin();
106}
107
108async function authenticateWithRetry(): Promise<void> {
109 // Reuse existing login attempt if one is in progress
110 if (loginPromise) {
111 return loginPromise;
112 }
113
114 loginPromise = (async () => {
115 for (let attempt = 1; attempt <= MAX_LOGIN_RETRIES; attempt++) {
116 logger.info({ attempt, maxRetries: MAX_LOGIN_RETRIES }, "Attempting login");
117
118 const success = await authenticate();
119
120 if (success) {
121 logger.info("Authentication successful");
122 return;
123 }
124
125 if (attempt < MAX_LOGIN_RETRIES) {
126 logger.warn(
127 { attempt, maxRetries: MAX_LOGIN_RETRIES, retryInMs: RETRY_DELAY_MS },
128 "Login failed, retrying"
129 );
130 await sleep(RETRY_DELAY_MS);
131 }
132 }
133
134 logger.error({ maxRetries: MAX_LOGIN_RETRIES }, "All login attempts failed, aborting");
135 process.exit(1);
136 })();
137
138 return loginPromise;
139}
140
141export const login = authenticateWithRetry;
142export const isLoggedIn = authenticateWithRetry().then(() => true);