actually forcing myself to develop good habits this year on my own projects because i deserve them
+540
-11
Diff
round #0
+27
apps/firehose-service/.env.example
+27
apps/firehose-service/.env.example
···
···
1
+
# Database
2
+
DATABASE_URL=postgres://postgres:postgres@localhost:5432/wisp
3
+
4
+
# Firehose
5
+
FIREHOSE_SERVICE=wss://bsky.network
6
+
FIREHOSE_MAX_CONCURRENCY=5
7
+
8
+
# Redis (cache invalidation + revalidation queue)
9
+
REDIS_URL=redis://localhost:6379
10
+
11
+
# S3 Storage (leave empty for local disk fallback)
12
+
S3_BUCKET=
13
+
S3_METADATA_BUCKET=
14
+
S3_REGION=auto
15
+
S3_ENDPOINT=
16
+
S3_PREFIX=sites/
17
+
S3_FORCE_PATH_STYLE=true
18
+
19
+
# AWS Credentials (required if using S3)
20
+
AWS_ACCESS_KEY_ID=
21
+
AWS_SECRET_ACCESS_KEY=
22
+
23
+
# Health check server
24
+
HEALTH_PORT=3001
25
+
26
+
# For local disk fallback (when S3_BUCKET is empty)
27
+
CACHE_DIR=./cache/sites
+2
apps/firehose-service/src/index.ts
+2
apps/firehose-service/src/index.ts
···
14
import { storage } from './lib/storage';
15
import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer';
16
import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker';
17
18
const app = new Hono();
19
···
41
42
stopFirehose();
43
await stopRevalidateWorker();
44
await closeDatabase();
45
46
console.log('[Service] Shutdown complete');
···
14
import { storage } from './lib/storage';
15
import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer';
16
import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker';
17
+
import { closeCacheInvalidationPublisher } from './lib/cache-invalidation';
18
19
const app = new Hono();
20
···
42
43
stopFirehose();
44
await stopRevalidateWorker();
45
+
await closeCacheInvalidationPublisher();
46
await closeDatabase();
47
48
console.log('[Service] Shutdown complete');
+62
apps/firehose-service/src/lib/cache-invalidation.ts
+62
apps/firehose-service/src/lib/cache-invalidation.ts
···
···
1
+
/**
2
+
* Cache invalidation publisher
3
+
*
4
+
* Publishes invalidation messages to Redis pub/sub so the hosting-service
5
+
* can clear its local caches (tiered storage, redirect rules) when a site
6
+
* is updated or deleted via the firehose.
7
+
*/
8
+
9
+
import Redis from 'ioredis';
10
+
import { config } from '../config';
11
+
12
+
const CHANNEL = 'wisp:cache-invalidate';
13
+
14
+
let publisher: Redis | null = null;
15
+
let loggedMissingRedis = false;
16
+
17
+
function getPublisher(): Redis | null {
18
+
if (!config.redisUrl) {
19
+
if (!loggedMissingRedis) {
20
+
console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled');
21
+
loggedMissingRedis = true;
22
+
}
23
+
return null;
24
+
}
25
+
26
+
if (!publisher) {
27
+
publisher = new Redis(config.redisUrl, {
28
+
maxRetriesPerRequest: 2,
29
+
enableReadyCheck: true,
30
+
});
31
+
32
+
publisher.on('error', (err) => {
33
+
console.error('[CacheInvalidation] Redis error:', err);
34
+
});
35
+
}
36
+
37
+
return publisher;
38
+
}
39
+
40
+
export async function publishCacheInvalidation(
41
+
did: string,
42
+
rkey: string,
43
+
action: 'update' | 'delete' | 'settings'
44
+
): Promise<void> {
45
+
const redis = getPublisher();
46
+
if (!redis) return;
47
+
48
+
try {
49
+
const message = JSON.stringify({ did, rkey, action });
50
+
await redis.publish(CHANNEL, message);
51
+
} catch (err) {
52
+
console.error('[CacheInvalidation] Failed to publish:', err);
53
+
}
54
+
}
55
+
56
+
export async function closeCacheInvalidationPublisher(): Promise<void> {
57
+
if (publisher) {
58
+
const toClose = publisher;
59
+
publisher = null;
60
+
await toClose.quit();
61
+
}
62
+
}
+13
apps/firehose-service/src/lib/cache-writer.ts
+13
apps/firehose-service/src/lib/cache-writer.ts
···
15
import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db';
16
import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter';
17
import { gunzipSync } from 'zlib';
18
19
/**
20
* Fetch a site record from the PDS
···
549
await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid);
550
}
551
552
console.log(`[Cache] Successfully cached site ${did}/${rkey}`);
553
}
554
···
569
// Delete from DB
570
await deleteSiteCache(did, rkey);
571
572
console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`);
573
}
574
···
586
cleanUrls: settings.cleanUrls,
587
headers: settings.headers,
588
});
589
}
590
591
/**
···
594
export async function handleSettingsDelete(did: string, rkey: string): Promise<void> {
595
console.log(`[Cache] Deleting settings for ${did}/${rkey}`);
596
await deleteSiteSettingsCache(did, rkey);
597
}
···
15
import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db';
16
import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter';
17
import { gunzipSync } from 'zlib';
18
+
import { publishCacheInvalidation } from './cache-invalidation';
19
20
/**
21
* Fetch a site record from the PDS
···
550
await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid);
551
}
552
553
+
// Notify hosting-service to invalidate its local caches
554
+
await publishCacheInvalidation(did, rkey, 'update');
555
+
556
console.log(`[Cache] Successfully cached site ${did}/${rkey}`);
557
}
558
···
573
// Delete from DB
574
await deleteSiteCache(did, rkey);
575
576
+
// Notify hosting-service to invalidate its local caches
577
+
await publishCacheInvalidation(did, rkey, 'delete');
578
+
579
console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`);
580
}
581
···
593
cleanUrls: settings.cleanUrls,
594
headers: settings.headers,
595
});
596
+
597
+
// Notify hosting-service to invalidate its local caches (redirect rules depend on settings)
598
+
await publishCacheInvalidation(did, rkey, 'settings');
599
}
600
601
/**
···
604
export async function handleSettingsDelete(did: string, rkey: string): Promise<void> {
605
console.log(`[Cache] Deleting settings for ${did}/${rkey}`);
606
await deleteSiteSettingsCache(did, rkey);
607
+
608
+
// Notify hosting-service to invalidate its local caches
609
+
await publishCacheInvalidation(did, rkey, 'settings');
610
}
+3
-3
apps/firehose-service/src/lib/db.ts
+3
-3
apps/firehose-service/src/lib/db.ts
···
54
recordCid: string,
55
fileCids: Record<string, string>
56
): Promise<void> {
57
-
const fileCidsJson = fileCids ?? {};
58
console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`);
59
try {
60
await sql`
···
94
const directoryListing = settings.directoryListing ?? false;
95
const spaMode = settings.spaMode ?? null;
96
const custom404 = settings.custom404 ?? null;
97
-
const indexFilesJson = settings.indexFiles ?? [];
98
const cleanUrls = settings.cleanUrls ?? true;
99
-
const headersJson = settings.headers ?? [];
100
101
console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, {
102
directoryListing,
···
54
recordCid: string,
55
fileCids: Record<string, string>
56
): Promise<void> {
57
+
const fileCidsJson = JSON.stringify(fileCids ?? {});
58
console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`);
59
try {
60
await sql`
···
94
const directoryListing = settings.directoryListing ?? false;
95
const spaMode = settings.spaMode ?? null;
96
const custom404 = settings.custom404 ?? null;
97
+
const indexFilesJson = JSON.stringify(settings.indexFiles ?? []);
98
const cleanUrls = settings.cleanUrls ?? true;
99
+
const headersJson = JSON.stringify(settings.headers ?? []);
100
101
console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, {
102
directoryListing,
+4
-4
apps/firehose-service/src/lib/revalidate-worker.ts
+4
-4
apps/firehose-service/src/lib/revalidate-worker.ts
···
114
'GROUP',
115
config.revalidateGroup,
116
consumerName,
117
-
'BLOCK',
118
-
blockMs,
119
'COUNT',
120
batchSize,
121
'STREAMS',
122
config.revalidateStream,
123
'>'
124
-
);
125
126
if (!response) return;
127
128
for (const [, messages] of response) {
129
-
await processMessages(messages as Array<[string, string[]]>);
130
}
131
}
132
···
114
'GROUP',
115
config.revalidateGroup,
116
consumerName,
117
'COUNT',
118
batchSize,
119
+
'BLOCK',
120
+
blockMs,
121
'STREAMS',
122
config.revalidateStream,
123
'>'
124
+
) as [string, Array<[string, string[]]>][] | null;
125
126
if (!response) return;
127
128
for (const [, messages] of response) {
129
+
await processMessages(messages);
130
}
131
}
132
+18
apps/hosting-service/.env.example
+18
apps/hosting-service/.env.example
···
4
# Server
5
PORT=3001
6
BASE_HOST=wisp.place
7
+
8
+
# Redis (cache invalidation + revalidation queue)
9
+
REDIS_URL=redis://localhost:6379
10
+
11
+
# S3 Storage (leave empty for local disk fallback)
12
+
S3_BUCKET=
13
+
S3_METADATA_BUCKET=
14
+
S3_REGION=auto
15
+
S3_ENDPOINT=
16
+
S3_PREFIX=sites/
17
+
S3_FORCE_PATH_STYLE=true
18
+
19
+
# AWS Credentials (required if using S3)
20
+
AWS_ACCESS_KEY_ID=
21
+
AWS_SECRET_ACCESS_KEY=
22
+
23
+
# For local disk fallback (when S3_BUCKET is empty)
24
+
CACHE_DIR=./cache/sites
+6
apps/hosting-service/src/index.ts
+6
apps/hosting-service/src/index.ts
···
4
import { mkdirSync, existsSync } from 'fs';
5
import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db';
6
import { closeRevalidateQueue } from './lib/revalidate-queue';
7
import { storage, getStorageConfig } from './lib/storage';
8
9
// Initialize Grafana exporters if configured
···
24
// Start domain cache cleanup
25
startDomainCacheCleanup();
26
27
// Optional: Bootstrap hot cache from warm tier on startup
28
const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true';
29
const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100;
···
80
process.on('SIGINT', async () => {
81
console.log('\n๐ Shutting down...');
82
stopDomainCacheCleanup();
83
await closeRevalidateQueue();
84
await closeDatabase();
85
server.close();
···
89
process.on('SIGTERM', async () => {
90
console.log('\n๐ Shutting down...');
91
stopDomainCacheCleanup();
92
await closeRevalidateQueue();
93
await closeDatabase();
94
server.close();
···
4
import { mkdirSync, existsSync } from 'fs';
5
import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db';
6
import { closeRevalidateQueue } from './lib/revalidate-queue';
7
+
import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation';
8
import { storage, getStorageConfig } from './lib/storage';
9
10
// Initialize Grafana exporters if configured
···
25
// Start domain cache cleanup
26
startDomainCacheCleanup();
27
28
+
// Start cache invalidation subscriber (listens for firehose-service updates via Redis pub/sub)
29
+
startCacheInvalidationSubscriber();
30
+
31
// Optional: Bootstrap hot cache from warm tier on startup
32
const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true';
33
const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100;
···
84
process.on('SIGINT', async () => {
85
console.log('\n๐ Shutting down...');
86
stopDomainCacheCleanup();
87
+
await stopCacheInvalidationSubscriber();
88
await closeRevalidateQueue();
89
await closeDatabase();
90
server.close();
···
94
process.on('SIGTERM', async () => {
95
console.log('\n๐ Shutting down...');
96
stopDomainCacheCleanup();
97
+
await stopCacheInvalidationSubscriber();
98
await closeRevalidateQueue();
99
await closeDatabase();
100
server.close();
+75
apps/hosting-service/src/lib/cache-invalidation.ts
+75
apps/hosting-service/src/lib/cache-invalidation.ts
···
···
1
+
/**
2
+
* Cache invalidation subscriber
3
+
*
4
+
* Listens to Redis pub/sub for cache invalidation messages from the firehose-service.
5
+
* When a site is updated/deleted, clears the hosting-service's local caches
6
+
* (tiered storage hot+warm tiers, redirect rules) so stale data isn't served.
7
+
*/
8
+
9
+
import Redis from 'ioredis';
10
+
import { storage } from './storage';
11
+
import { clearRedirectRulesCache } from './site-cache';
12
+
13
+
const CHANNEL = 'wisp:cache-invalidate';
14
+
15
+
let subscriber: Redis | null = null;
16
+
17
+
export function startCacheInvalidationSubscriber(): void {
18
+
const redisUrl = process.env.REDIS_URL;
19
+
if (!redisUrl) {
20
+
console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation disabled');
21
+
return;
22
+
}
23
+
24
+
subscriber = new Redis(redisUrl, {
25
+
maxRetriesPerRequest: 2,
26
+
enableReadyCheck: true,
27
+
});
28
+
29
+
subscriber.on('error', (err) => {
30
+
console.error('[CacheInvalidation] Redis error:', err);
31
+
});
32
+
33
+
subscriber.subscribe(CHANNEL, (err) => {
34
+
if (err) {
35
+
console.error('[CacheInvalidation] Failed to subscribe:', err);
36
+
} else {
37
+
console.log('[CacheInvalidation] Subscribed to', CHANNEL);
38
+
}
39
+
});
40
+
41
+
subscriber.on('message', async (_channel: string, message: string) => {
42
+
try {
43
+
const { did, rkey, action } = JSON.parse(message) as {
44
+
did: string;
45
+
rkey: string;
46
+
action: 'update' | 'delete' | 'settings';
47
+
};
48
+
49
+
if (!did || !rkey) {
50
+
console.warn('[CacheInvalidation] Invalid message:', message);
51
+
return;
52
+
}
53
+
54
+
console.log(`[CacheInvalidation] Invalidating ${did}/${rkey} (${action})`);
55
+
56
+
// Clear tiered storage (hot + warm) for this site
57
+
const prefix = `${did}/${rkey}/`;
58
+
const deleted = await storage.invalidate(prefix);
59
+
console.log(`[CacheInvalidation] Cleared ${deleted} keys from tiered storage for ${did}/${rkey}`);
60
+
61
+
// Clear redirect rules cache
62
+
clearRedirectRulesCache(did, rkey);
63
+
} catch (err) {
64
+
console.error('[CacheInvalidation] Error processing message:', err);
65
+
}
66
+
});
67
+
}
68
+
69
+
export async function stopCacheInvalidationSubscriber(): Promise<void> {
70
+
if (subscriber) {
71
+
const toClose = subscriber;
72
+
subscriber = null;
73
+
await toClose.quit();
74
+
}
75
+
}
+27
apps/hosting-service/src/lib/db.ts
+27
apps/hosting-service/src/lib/db.ts
···
119
console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName });
120
}
121
122
+
/**
123
+
* Upsert site cache entry (used by on-demand caching when a site is completely missing)
124
+
*/
125
+
export async function upsertSiteCache(
126
+
did: string,
127
+
rkey: string,
128
+
recordCid: string,
129
+
fileCids: Record<string, string>
130
+
): Promise<void> {
131
+
const fileCidsJson = JSON.stringify(fileCids ?? {});
132
+
try {
133
+
await sql`
134
+
INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at)
135
+
VALUES (${did}, ${rkey}, ${recordCid}, ${fileCidsJson}::jsonb, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW()))
136
+
ON CONFLICT (did, rkey)
137
+
DO UPDATE SET
138
+
record_cid = EXCLUDED.record_cid,
139
+
file_cids = EXCLUDED.file_cids,
140
+
updated_at = EXTRACT(EPOCH FROM NOW())
141
+
`;
142
+
} catch (err) {
143
+
const error = err instanceof Error ? err : new Error(String(err));
144
+
console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message });
145
+
throw error;
146
+
}
147
+
}
148
+
149
export interface SiteRecord {
150
did: string;
151
rkey: string;
+27
-2
apps/hosting-service/src/lib/file-serving.ts
+27
-2
apps/hosting-service/src/lib/file-serving.ts
···
18
import { enqueueRevalidate } from './revalidate-queue';
19
import { recordStorageMiss } from './revalidate-metrics';
20
import { normalizeFileCids } from '@wispplace/fs-utils';
21
22
/**
23
* Helper to retrieve a file with metadata from tiered storage
···
91
rkey: string,
92
filePath: string,
93
preferRewrittenHtml: boolean
94
-
): Promise<{ result: Awaited<ReturnType<typeof storage.getWithMetadata>>; filePath: string } | null> {
95
const mimeTypeGuess = lookup(filePath) || 'application/octet-stream';
96
if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) {
97
const rewrittenPath = `.rewritten/${filePath}`;
···
107
}
108
109
function buildResponseFromStorageResult(
110
-
result: Awaited<ReturnType<typeof storage.getWithMetadata>>,
111
filePath: string,
112
settings: WispSettings | null,
113
requestHeaders?: Record<string, string>
···
148
return new Response(content, { headers });
149
}
150
151
/**
152
* Helper to serve files from cache (for custom domains and subdomains)
153
*/
···
158
fullUrl?: string,
159
headers?: Record<string, string>
160
): Promise<Response> {
161
// Load settings for this site
162
const settings = await getCachedSettings(did, rkey);
163
const indexFiles = getIndexFiles(settings);
···
445
fullUrl?: string,
446
headers?: Record<string, string>
447
): Promise<Response> {
448
// Load settings for this site
449
const settings = await getCachedSettings(did, rkey);
450
const indexFiles = getIndexFiles(settings);
···
18
import { enqueueRevalidate } from './revalidate-queue';
19
import { recordStorageMiss } from './revalidate-metrics';
20
import { normalizeFileCids } from '@wispplace/fs-utils';
21
+
import { fetchAndCacheSite } from './on-demand-cache';
22
+
import type { StorageResult } from '@wispplace/tiered-storage';
23
+
24
+
type FileStorageResult = StorageResult<Uint8Array>;
25
26
/**
27
* Helper to retrieve a file with metadata from tiered storage
···
95
rkey: string,
96
filePath: string,
97
preferRewrittenHtml: boolean
98
+
): Promise<{ result: FileStorageResult; filePath: string } | null> {
99
const mimeTypeGuess = lookup(filePath) || 'application/octet-stream';
100
if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) {
101
const rewrittenPath = `.rewritten/${filePath}`;
···
111
}
112
113
function buildResponseFromStorageResult(
114
+
result: FileStorageResult,
115
filePath: string,
116
settings: WispSettings | null,
117
requestHeaders?: Record<string, string>
···
152
return new Response(content, { headers });
153
}
154
155
+
/**
156
+
* Ensure a site is cached locally. If the site has no DB entry (completely unknown),
157
+
* attempt to fetch and cache it on-demand from the PDS.
158
+
*/
159
+
async function ensureSiteCached(did: string, rkey: string): Promise<void> {
160
+
const existing = await getSiteCache(did, rkey);
161
+
if (existing) return; // Site is known, proceed normally
162
+
163
+
// Site is completely unknown โ try on-demand fetch
164
+
console.log(`[FileServing] Site ${did}/${rkey} not in DB, attempting on-demand cache`);
165
+
await fetchAndCacheSite(did, rkey);
166
+
}
167
+
168
/**
169
* Helper to serve files from cache (for custom domains and subdomains)
170
*/
···
175
fullUrl?: string,
176
headers?: Record<string, string>
177
): Promise<Response> {
178
+
// Check if this site is completely unknown (not in DB, no files in storage)
179
+
// If so, attempt to fetch and cache it on-demand from the PDS
180
+
await ensureSiteCached(did, rkey);
181
+
182
// Load settings for this site
183
const settings = await getCachedSettings(did, rkey);
184
const indexFiles = getIndexFiles(settings);
···
466
fullUrl?: string,
467
headers?: Record<string, string>
468
): Promise<Response> {
469
+
// Check if this site is completely unknown (not in DB, no files in storage)
470
+
// If so, attempt to fetch and cache it on-demand from the PDS
471
+
await ensureSiteCached(did, rkey);
472
+
473
// Load settings for this site
474
const settings = await getCachedSettings(did, rkey);
475
const indexFiles = getIndexFiles(settings);
+259
apps/hosting-service/src/lib/on-demand-cache.ts
+259
apps/hosting-service/src/lib/on-demand-cache.ts
···
···
1
+
/**
2
+
* On-demand site caching for the hosting service
3
+
*
4
+
* When a request hits a site that is completely missing (no DB entry, no files),
5
+
* this module fetches the site record from the PDS, downloads all blobs,
6
+
* writes them to local storage (hot + warm tiers), and updates the DB.
7
+
*
8
+
* This gives immediate serving capability. A revalidate is also enqueued
9
+
* so the firehose-service backfills S3 (cold tier).
10
+
*/
11
+
12
+
import type { Record as WispFsRecord, Directory, Entry, File } from '@wispplace/lexicons/types/place/wisp/fs';
13
+
import { safeFetchJson, safeFetchBlob } from '@wispplace/safe-fetch';
14
+
import { extractBlobCid, getPdsForDid } from '@wispplace/atproto-utils';
15
+
import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression';
16
+
import { collectFileCidsFromEntries, countFilesInDirectory } from '@wispplace/fs-utils';
17
+
import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants';
18
+
import { expandSubfsNodes } from './utils';
19
+
import { storage } from './storage';
20
+
import { upsertSiteCache, tryAcquireLock, releaseLock } from './db';
21
+
import { enqueueRevalidate } from './revalidate-queue';
22
+
import { gunzipSync } from 'zlib';
23
+
24
+
// Track in-flight fetches to avoid duplicate work
25
+
const inFlightFetches = new Map<string, Promise<boolean>>();
26
+
27
+
interface FileInfo {
28
+
path: string;
29
+
cid: string;
30
+
blob: any;
31
+
encoding?: 'gzip';
32
+
mimeType?: string;
33
+
base64?: boolean;
34
+
}
35
+
36
+
/**
37
+
* Attempt to fetch and cache a completely missing site on-demand.
38
+
* Returns true if the site was successfully cached, false otherwise.
39
+
*
40
+
* Uses a distributed lock (pg advisory lock) to prevent multiple
41
+
* hosting-service instances from fetching the same site simultaneously.
42
+
*/
43
+
export async function fetchAndCacheSite(did: string, rkey: string): Promise<boolean> {
44
+
const key = `${did}:${rkey}`;
45
+
46
+
// Check if there's already an in-flight fetch for this site
47
+
const existing = inFlightFetches.get(key);
48
+
if (existing) {
49
+
return existing;
50
+
}
51
+
52
+
const fetchPromise = doFetchAndCache(did, rkey);
53
+
inFlightFetches.set(key, fetchPromise);
54
+
55
+
try {
56
+
return await fetchPromise;
57
+
} finally {
58
+
inFlightFetches.delete(key);
59
+
}
60
+
}
61
+
62
+
async function doFetchAndCache(did: string, rkey: string): Promise<boolean> {
63
+
const lockKey = `on-demand-cache:${did}:${rkey}`;
64
+
65
+
// Try to acquire a distributed lock
66
+
const acquired = await tryAcquireLock(lockKey);
67
+
if (!acquired) {
68
+
console.log(`[OnDemandCache] Lock not acquired for ${did}/${rkey}, another instance is handling it`);
69
+
return false;
70
+
}
71
+
72
+
try {
73
+
console.log(`[OnDemandCache] Fetching missing site ${did}/${rkey}`);
74
+
75
+
// Fetch site record from PDS
76
+
const pdsEndpoint = await getPdsForDid(did);
77
+
if (!pdsEndpoint) {
78
+
console.error(`[OnDemandCache] Could not resolve PDS for ${did}`);
79
+
return false;
80
+
}
81
+
82
+
const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(rkey)}`;
83
+
84
+
let data: any;
85
+
try {
86
+
data = await safeFetchJson(recordUrl);
87
+
} catch (err) {
88
+
const msg = err instanceof Error ? err.message : String(err);
89
+
if (msg.includes('HTTP 404') || msg.includes('Not Found')) {
90
+
console.log(`[OnDemandCache] Site record not found on PDS: ${did}/${rkey}`);
91
+
} else {
92
+
console.error(`[OnDemandCache] Failed to fetch site record: ${did}/${rkey}`, msg);
93
+
}
94
+
return false;
95
+
}
96
+
97
+
const record = data.value as WispFsRecord;
98
+
const recordCid = data.cid || '';
99
+
100
+
if (!record?.root?.entries) {
101
+
console.error(`[OnDemandCache] Invalid record structure for ${did}/${rkey}`);
102
+
return false;
103
+
}
104
+
105
+
// Expand subfs nodes
106
+
const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint);
107
+
108
+
// Validate limits
109
+
const fileCount = countFilesInDirectory(expandedRoot);
110
+
if (fileCount > MAX_FILE_COUNT) {
111
+
console.error(`[OnDemandCache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`);
112
+
return false;
113
+
}
114
+
115
+
// Collect files
116
+
const files = collectFileInfo(expandedRoot.entries);
117
+
118
+
// Collect file CIDs for DB
119
+
const fileCids: Record<string, string> = {};
120
+
collectFileCidsFromEntries(expandedRoot.entries, '', fileCids);
121
+
122
+
// Download and write all files to local storage (hot + warm tiers)
123
+
const CONCURRENCY = 10;
124
+
let downloaded = 0;
125
+
let failed = 0;
126
+
127
+
for (let i = 0; i < files.length; i += CONCURRENCY) {
128
+
const batch = files.slice(i, i + CONCURRENCY);
129
+
const results = await Promise.allSettled(
130
+
batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint))
131
+
);
132
+
133
+
for (const result of results) {
134
+
if (result.status === 'fulfilled') {
135
+
downloaded++;
136
+
} else {
137
+
failed++;
138
+
console.error(`[OnDemandCache] Failed to download blob:`, result.reason);
139
+
}
140
+
}
141
+
}
142
+
143
+
console.log(`[OnDemandCache] Downloaded ${downloaded} files (${failed} failed) for ${did}/${rkey}`);
144
+
145
+
// Update DB with file CIDs so future storage misses can be detected
146
+
await upsertSiteCache(did, rkey, recordCid, fileCids);
147
+
148
+
// Enqueue revalidate so firehose-service backfills S3 (cold tier)
149
+
await enqueueRevalidate(did, rkey, `on-demand-cache`);
150
+
151
+
console.log(`[OnDemandCache] Successfully cached site ${did}/${rkey}`);
152
+
return downloaded > 0;
153
+
} catch (err) {
154
+
console.error(`[OnDemandCache] Error caching site ${did}/${rkey}:`, err);
155
+
return false;
156
+
} finally {
157
+
await releaseLock(lockKey);
158
+
}
159
+
}
160
+
161
+
function collectFileInfo(entries: Entry[], pathPrefix: string = ''): FileInfo[] {
162
+
const files: FileInfo[] = [];
163
+
164
+
for (const entry of entries) {
165
+
const currentPath = pathPrefix ? `${pathPrefix}/${entry.name}` : entry.name;
166
+
const node = entry.node;
167
+
168
+
if ('type' in node && node.type === 'directory' && 'entries' in node) {
169
+
files.push(...collectFileInfo(node.entries, currentPath));
170
+
} else if ('type' in node && node.type === 'file' && 'blob' in node) {
171
+
const fileNode = node as File;
172
+
const cid = extractBlobCid(fileNode.blob);
173
+
if (cid) {
174
+
files.push({
175
+
path: currentPath,
176
+
cid,
177
+
blob: fileNode.blob,
178
+
encoding: fileNode.encoding,
179
+
mimeType: fileNode.mimeType,
180
+
base64: fileNode.base64,
181
+
});
182
+
}
183
+
}
184
+
}
185
+
186
+
return files;
187
+
}
188
+
189
+
async function downloadAndWriteBlob(
190
+
did: string,
191
+
rkey: string,
192
+
file: FileInfo,
193
+
pdsEndpoint: string
194
+
): Promise<void> {
195
+
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`;
196
+
197
+
let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 });
198
+
let encoding = file.encoding;
199
+
200
+
// Decode base64 if flagged
201
+
if (file.base64) {
202
+
const base64String = new TextDecoder().decode(content);
203
+
content = Buffer.from(base64String, 'base64');
204
+
}
205
+
206
+
// Decompress if needed and shouldn't stay compressed
207
+
const shouldStayCompressed = shouldCompressMimeType(file.mimeType);
208
+
209
+
if (encoding === 'gzip' && !shouldStayCompressed && content.length >= 2 &&
210
+
content[0] === 0x1f && content[1] === 0x8b) {
211
+
try {
212
+
content = gunzipSync(content);
213
+
encoding = undefined;
214
+
} catch {
215
+
// Keep gzipped if decompression fails
216
+
}
217
+
}
218
+
219
+
// If encoding is missing but data looks gzipped for a text-like file, mark it
220
+
if (!encoding && isTextLikeMime(file.mimeType, file.path) && content.length >= 2 &&
221
+
content[0] === 0x1f && content[1] === 0x8b) {
222
+
encoding = 'gzip';
223
+
}
224
+
225
+
// Build storage key and metadata
226
+
const key = `${did}/${rkey}/${file.path}`;
227
+
const metadata: Record<string, string> = {};
228
+
if (encoding) metadata.encoding = encoding;
229
+
if (file.mimeType) metadata.mimeType = file.mimeType;
230
+
231
+
// Write to hot + warm tiers only (cold/S3 is read-only in hosting-service,
232
+
// firehose-service will backfill via revalidate)
233
+
await storage.set(key as any, content as any, {
234
+
metadata,
235
+
skipTiers: [],
236
+
});
237
+
}
238
+
239
+
function isTextLikeMime(mimeType?: string, path?: string): boolean {
240
+
if (mimeType) {
241
+
if (mimeType === 'text/html') return true;
242
+
if (mimeType === 'text/css') return true;
243
+
if (mimeType === 'text/javascript') return true;
244
+
if (mimeType === 'application/javascript') return true;
245
+
if (mimeType === 'application/json') return true;
246
+
if (mimeType === 'application/xml') return true;
247
+
if (mimeType === 'image/svg+xml') return true;
248
+
}
249
+
250
+
if (!path) return false;
251
+
const lower = path.toLowerCase();
252
+
return lower.endsWith('.html') ||
253
+
lower.endsWith('.htm') ||
254
+
lower.endsWith('.css') ||
255
+
lower.endsWith('.js') ||
256
+
lower.endsWith('.json') ||
257
+
lower.endsWith('.xml') ||
258
+
lower.endsWith('.svg');
259
+
}
+1
-1
apps/hosting-service/src/lib/revalidate-queue.ts
+1
-1
apps/hosting-service/src/lib/revalidate-queue.ts
+1
-1
apps/hosting-service/src/server.ts
+1
-1
apps/hosting-service/src/server.ts
+15
docker-compose.yml
+15
docker-compose.yml
···
17
timeout: 5s
18
retries: 5
19
20
+
redis:
21
+
image: redis:7-alpine
22
+
container_name: wisp-redis
23
+
restart: unless-stopped
24
+
ports:
25
+
- "6379:6379"
26
+
volumes:
27
+
- redis_data:/data
28
+
healthcheck:
29
+
test: ["CMD", "redis-cli", "ping"]
30
+
interval: 5s
31
+
timeout: 5s
32
+
retries: 5
33
+
34
volumes:
35
postgres_data:
36
+
redis_data:
History
4 rounds
1 comment
nekomimi.pet
submitted
#3
7 commits
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
Dockerfile
fix storage-miss revalidation loop and tier reporting
1/1 failed
expand
collapse
expand 1 comment
pull request successfully merged
nekomimi.pet
submitted
#2
5 commits
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
1/1 failed
expand
collapse
expand 0 comments
nekomimi.pet
submitted
#1
4 commits
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
1/1 failed
expand
collapse
expand 0 comments
nekomimi.pet
submitted
#0
1 commit
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
this is what is life on us-east-1 right now. seems to be doing fine as of 2/6 10:38pm