actually forcing myself to develop good habits this year on my own projects because i deserve them
+27
apps/firehose-service/.env.example
+27
apps/firehose-service/.env.example
···
1
+
# Database
2
+
DATABASE_URL=postgres://postgres:postgres@localhost:5432/wisp
3
+
4
+
# Firehose
5
+
FIREHOSE_SERVICE=wss://bsky.network
6
+
FIREHOSE_MAX_CONCURRENCY=5
7
+
8
+
# Redis (cache invalidation + revalidation queue)
9
+
REDIS_URL=redis://localhost:6379
10
+
11
+
# S3 Storage (leave empty for local disk fallback)
12
+
S3_BUCKET=
13
+
S3_METADATA_BUCKET=
14
+
S3_REGION=auto
15
+
S3_ENDPOINT=
16
+
S3_PREFIX=sites/
17
+
S3_FORCE_PATH_STYLE=true
18
+
19
+
# AWS Credentials (required if using S3)
20
+
AWS_ACCESS_KEY_ID=
21
+
AWS_SECRET_ACCESS_KEY=
22
+
23
+
# Health check server
24
+
HEALTH_PORT=3001
25
+
26
+
# For local disk fallback (when S3_BUCKET is empty)
27
+
CACHE_DIR=./cache/sites
+2
apps/firehose-service/Dockerfile
+2
apps/firehose-service/Dockerfile
+1
apps/firehose-service/package.json
+1
apps/firehose-service/package.json
···
20
20
"@wispplace/database": "workspace:*",
21
21
"@wispplace/fs-utils": "workspace:*",
22
22
"@wispplace/lexicons": "workspace:*",
23
+
"@wispplace/observability": "workspace:*",
23
24
"@wispplace/safe-fetch": "workspace:*",
24
25
"@wispplace/tiered-storage": "workspace:*",
25
26
"hono": "^4.10.4",
+61
-16
apps/firehose-service/src/index.ts
+61
-16
apps/firehose-service/src/index.ts
···
14
14
import { storage } from './lib/storage';
15
15
import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer';
16
16
import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker';
17
+
import { closeCacheInvalidationPublisher } from './lib/cache-invalidation';
18
+
import { initializeGrafanaExporters, createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability';
19
+
import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono';
20
+
21
+
// Initialize Grafana exporters if configured
22
+
initializeGrafanaExporters({
23
+
serviceName: 'firehose-service',
24
+
serviceVersion: '1.0.0'
25
+
});
26
+
27
+
const logger = createLogger('firehose-service');
17
28
18
29
const app = new Hono();
30
+
31
+
// Add observability middleware
32
+
app.use('*', observabilityMiddleware('firehose-service'));
33
+
34
+
// Error handler
35
+
app.onError(observabilityErrorHandler('firehose-service'));
19
36
20
37
// Health endpoint
21
38
app.get('/health', async (c) => {
···
37
54
if (isShuttingDown) return;
38
55
isShuttingDown = true;
39
56
40
-
console.log(`\n[Service] Received ${signal}, shutting down...`);
57
+
logger.info(`Received ${signal}, shutting down...`);
41
58
42
59
stopFirehose();
43
60
await stopRevalidateWorker();
61
+
await closeCacheInvalidationPublisher();
44
62
await closeDatabase();
45
63
46
-
console.log('[Service] Shutdown complete');
64
+
logger.info('Shutdown complete');
47
65
process.exit(0);
48
66
}
49
67
···
54
72
* Backfill mode - process existing sites from database
55
73
*/
56
74
async function runBackfill(): Promise<void> {
57
-
console.log('[Backfill] Starting backfill mode');
75
+
logger.info('Starting backfill mode');
58
76
const startTime = Date.now();
59
77
const forceRewriteHtml = process.env.BACKFILL_FORCE_REWRITE_HTML === 'true';
60
78
61
79
if (forceRewriteHtml) {
62
-
console.log('[Backfill] Forcing HTML rewrite for all sites');
80
+
logger.info('Forcing HTML rewrite for all sites');
63
81
}
64
82
65
83
let sites = await listAllSites();
66
84
if (sites.length === 0) {
67
85
const cachedSites = await listAllSiteCaches();
68
86
sites = cachedSites.map(site => ({ did: site.did, rkey: site.rkey }));
69
-
console.log('[Backfill] Sites table empty; falling back to site_cache entries');
87
+
logger.info('Sites table empty; falling back to site_cache entries');
70
88
}
71
89
72
-
console.log(`[Backfill] Found ${sites.length} sites in database`);
90
+
logger.info(`Found ${sites.length} sites in database`);
73
91
74
92
let processed = 0;
75
93
let skipped = 0;
···
81
99
const result = await fetchSiteRecord(site.did, site.rkey);
82
100
83
101
if (!result) {
84
-
console.log(`[Backfill] Site not found on PDS: ${site.did}/${site.rkey}`);
102
+
logger.info(`Site not found on PDS: ${site.did}/${site.rkey}`);
85
103
skipped++;
86
104
continue;
87
105
}
···
89
107
const existingCache = await getSiteCache(site.did, site.rkey);
90
108
// Check if CID matches (already up to date)
91
109
if (!forceRewriteHtml && existingCache && result.cid === existingCache.record_cid) {
92
-
console.log(`[Backfill] Site already up to date: ${site.did}/${site.rkey}`);
110
+
logger.info(`Site already up to date: ${site.did}/${site.rkey}`);
93
111
skipped++;
94
112
continue;
95
113
}
···
100
118
});
101
119
processed++;
102
120
103
-
console.log(`[Backfill] Progress: ${processed + skipped + failed}/${sites.length}`);
121
+
logger.info(`Progress: ${processed + skipped + failed}/${sites.length}`);
104
122
} catch (err) {
105
-
console.error(`[Backfill] Failed to process ${site.did}/${site.rkey}:`, err);
123
+
logger.error(`Failed to process ${site.did}/${site.rkey}`, err);
106
124
failed++;
107
125
}
108
126
}
···
113
131
const elapsedRemSec = elapsedSec % 60;
114
132
const elapsedLabel = elapsedMin > 0 ? `${elapsedMin}m ${elapsedRemSec}s` : `${elapsedSec}s`;
115
133
116
-
console.log(`[Backfill] Complete: ${processed} processed, ${skipped} skipped, ${failed} failed (${elapsedLabel} elapsed)`);
134
+
logger.info(`Complete: ${processed} processed, ${skipped} skipped, ${failed} failed (${elapsedLabel} elapsed)`);
117
135
}
118
136
137
+
// Internal observability endpoints (for admin panel)
138
+
app.get('/__internal__/observability/logs', (c) => {
139
+
const query = c.req.query();
140
+
const filter: any = {};
141
+
if (query.level) filter.level = query.level;
142
+
if (query.service) filter.service = query.service;
143
+
if (query.search) filter.search = query.search;
144
+
if (query.eventType) filter.eventType = query.eventType;
145
+
if (query.limit) filter.limit = parseInt(query.limit as string);
146
+
return c.json({ logs: logCollector.getLogs(filter) });
147
+
});
148
+
149
+
app.get('/__internal__/observability/errors', (c) => {
150
+
const query = c.req.query();
151
+
const filter: any = {};
152
+
if (query.service) filter.service = query.service;
153
+
if (query.limit) filter.limit = parseInt(query.limit as string);
154
+
return c.json({ errors: errorTracker.getErrors(filter) });
155
+
});
156
+
157
+
app.get('/__internal__/observability/metrics', (c) => {
158
+
const query = c.req.query();
159
+
const timeWindow = query.timeWindow ? parseInt(query.timeWindow as string) : 3600000;
160
+
const stats = metricsCollector.getStats('firehose-service', timeWindow);
161
+
return c.json({ stats, timeWindow });
162
+
});
163
+
119
164
// Main entry point
120
165
async function main() {
121
-
console.log('[Service] Starting firehose-service');
122
-
console.log(`[Service] Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`);
123
-
console.log(`[Service] S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`);
166
+
logger.info('Starting firehose-service');
167
+
logger.info(`Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`);
168
+
logger.info(`S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`);
124
169
125
170
// Start health server
126
171
const server = serve({
···
128
173
port: config.healthPort,
129
174
});
130
175
131
-
console.log(`[Service] Health endpoint: http://localhost:${config.healthPort}/health`);
176
+
logger.info(`Health endpoint: http://localhost:${config.healthPort}/health`);
132
177
133
178
if (config.isBackfill) {
134
179
// Run backfill and exit
···
143
188
}
144
189
145
190
main().catch((err) => {
146
-
console.error('[Service] Fatal error:', err);
191
+
logger.error('Fatal error', err);
147
192
process.exit(1);
148
193
});
+70
apps/firehose-service/src/lib/cache-invalidation.ts
+70
apps/firehose-service/src/lib/cache-invalidation.ts
···
1
+
/**
2
+
* Cache invalidation publisher
3
+
*
4
+
* Publishes invalidation messages to Redis pub/sub so the hosting-service
5
+
* can clear its local caches (tiered storage, redirect rules) when a site
6
+
* is updated or deleted via the firehose.
7
+
*/
8
+
9
+
import Redis from 'ioredis';
10
+
import { createLogger } from '@wispplace/observability';
11
+
import { config } from '../config';
12
+
13
+
const logger = createLogger('firehose-service');
14
+
const CHANNEL = 'wisp:cache-invalidate';
15
+
16
+
let publisher: Redis | null = null;
17
+
let loggedMissingRedis = false;
18
+
19
+
function getPublisher(): Redis | null {
20
+
if (!config.redisUrl) {
21
+
if (!loggedMissingRedis) {
22
+
logger.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled');
23
+
loggedMissingRedis = true;
24
+
}
25
+
return null;
26
+
}
27
+
28
+
if (!publisher) {
29
+
logger.info(`[CacheInvalidation] Connecting to Redis for publishing: ${config.redisUrl}`);
30
+
publisher = new Redis(config.redisUrl, {
31
+
maxRetriesPerRequest: 2,
32
+
enableReadyCheck: true,
33
+
});
34
+
35
+
publisher.on('error', (err) => {
36
+
logger.error('[CacheInvalidation] Redis error', err);
37
+
});
38
+
39
+
publisher.on('ready', () => {
40
+
logger.info('[CacheInvalidation] Redis publisher connected');
41
+
});
42
+
}
43
+
44
+
return publisher;
45
+
}
46
+
47
+
export async function publishCacheInvalidation(
48
+
did: string,
49
+
rkey: string,
50
+
action: 'update' | 'delete' | 'settings'
51
+
): Promise<void> {
52
+
const redis = getPublisher();
53
+
if (!redis) return;
54
+
55
+
try {
56
+
const message = JSON.stringify({ did, rkey, action });
57
+
logger.debug(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL}`);
58
+
await redis.publish(CHANNEL, message);
59
+
} catch (err) {
60
+
logger.error('[CacheInvalidation] Failed to publish', err);
61
+
}
62
+
}
63
+
64
+
export async function closeCacheInvalidationPublisher(): Promise<void> {
65
+
if (publisher) {
66
+
const toClose = publisher;
67
+
publisher = null;
68
+
await toClose.quit();
69
+
}
70
+
}
+67
-33
apps/firehose-service/src/lib/cache-writer.ts
+67
-33
apps/firehose-service/src/lib/cache-writer.ts
···
11
11
import { collectFileCidsFromEntries, countFilesInDirectory, normalizeFileCids } from '@wispplace/fs-utils';
12
12
import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression';
13
13
import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants';
14
+
import { createLogger } from '@wispplace/observability';
14
15
import { writeFile, deleteFile, listFiles } from './storage';
15
16
import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db';
16
17
import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter';
17
18
import { gunzipSync } from 'zlib';
19
+
import { publishCacheInvalidation } from './cache-invalidation';
20
+
21
+
const logger = createLogger('firehose-service');
18
22
19
23
/**
20
24
* Fetch a site record from the PDS
···
23
27
try {
24
28
const pdsEndpoint = await getPdsForDid(did);
25
29
if (!pdsEndpoint) {
26
-
console.error('[Cache] Failed to get PDS endpoint for DID', { did, rkey });
30
+
logger.error('Failed to get PDS endpoint for DID', undefined, { did, rkey });
27
31
return null;
28
32
}
29
33
···
37
41
} catch (err) {
38
42
const errorMsg = err instanceof Error ? err.message : String(err);
39
43
if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) {
40
-
console.log('[Cache] Site record not found', { did, rkey });
44
+
logger.info('Site record not found', { did, rkey });
41
45
} else {
42
-
console.error('[Cache] Failed to fetch site record', { did, rkey, error: errorMsg });
46
+
logger.error('Failed to fetch site record', err, { did, rkey });
43
47
}
44
48
return null;
45
49
}
···
56
60
try {
57
61
const endpoint = pdsEndpoint ?? await getPdsForDid(did);
58
62
if (!endpoint) {
59
-
console.error('[Cache] Failed to get PDS endpoint for DID (settings)', { did, rkey });
63
+
logger.error('Failed to get PDS endpoint for DID (settings)', undefined, { did, rkey });
60
64
return null;
61
65
}
62
66
···
70
74
} catch (err) {
71
75
const errorMsg = err instanceof Error ? err.message : String(err);
72
76
if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) {
73
-
console.log('[Cache] Settings record not found', { did, rkey });
77
+
logger.info('Settings record not found', { did, rkey });
74
78
} else {
75
-
console.error('[Cache] Failed to fetch settings record', { did, rkey, error: errorMsg });
79
+
logger.error('Failed to fetch settings record', err, { did, rkey });
76
80
}
77
81
return null;
78
82
}
···
136
140
const MAX_DEPTH = 10;
137
141
138
142
if (depth >= MAX_DEPTH) {
139
-
console.error('[Cache] Max subfs expansion depth reached');
143
+
logger.error('Max subfs expansion depth reached');
140
144
return directory;
141
145
}
142
146
···
146
150
// Fetch uncached subfs records
147
151
const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri));
148
152
if (uncachedUris.length > 0) {
149
-
console.log(`[Cache] Fetching ${uncachedUris.length} subfs records (depth ${depth})`);
153
+
logger.info(`Fetching ${uncachedUris.length} subfs records`, { depth });
150
154
const fetchedRecords = await Promise.all(
151
155
uncachedUris.map(async ({ uri }) => {
152
156
const record = await fetchSubfsRecord(uri, pdsEndpoint);
···
341
345
): Promise<void> {
342
346
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`;
343
347
344
-
console.log(`[Cache] Downloading ${file.path}`);
348
+
logger.debug(`Downloading ${file.path}`);
345
349
346
350
let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 });
347
351
let encoding = file.encoding;
···
355
359
// Heuristic fallback: some records omit base64 flag but content is base64 text
356
360
const decoded = tryDecodeBase64(content);
357
361
if (decoded) {
358
-
console.warn(`[Cache] Decoded base64 fallback for ${file.path} (base64 flag missing)`);
362
+
logger.warn(`Decoded base64 fallback for ${file.path} (base64 flag missing)`);
359
363
content = decoded;
360
364
}
361
365
}
···
369
373
content = gunzipSync(content);
370
374
encoding = undefined;
371
375
} catch (error) {
372
-
console.error(`[Cache] Failed to decompress ${file.path}, storing gzipped`);
376
+
logger.error(`Failed to decompress ${file.path}, storing gzipped`, error);
373
377
}
374
378
} else if (encoding === 'gzip' && content.length >= 2 &&
375
379
!(content[0] === 0x1f && content[1] === 0x8b)) {
376
380
// If marked gzip but doesn't look gzipped, attempt base64 decode and retry
377
381
const decoded = tryDecodeBase64(content);
378
382
if (decoded && decoded.length >= 2 && decoded[0] === 0x1f && decoded[1] === 0x8b) {
379
-
console.warn(`[Cache] Decoded base64+gzip fallback for ${file.path}`);
383
+
logger.warn(`Decoded base64+gzip fallback for ${file.path}`);
380
384
try {
381
385
content = gunzipSync(decoded);
382
386
encoding = undefined;
383
387
} catch (error) {
384
-
console.error(`[Cache] Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`);
388
+
logger.error(`Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`, error);
385
389
content = decoded;
386
390
}
387
391
}
···
412
416
try {
413
417
rewriteSource = gunzipSync(content);
414
418
} catch (error) {
415
-
console.error(`[Cache] Failed to decompress ${file.path} for rewrite, using raw content`);
419
+
logger.error(`Failed to decompress ${file.path} for rewrite, using raw content`, error);
416
420
}
417
421
}
418
422
···
422
426
423
427
const rewrittenKey = `${did}/${rkey}/.rewritten/${file.path}`;
424
428
await writeFile(rewrittenKey, rewrittenContent, { mimeType: 'text/html' });
425
-
console.log(`[Cache] Wrote rewritten HTML: ${rewrittenKey}`);
429
+
logger.debug(`Wrote rewritten HTML: ${rewrittenKey}`);
426
430
}
427
431
428
-
console.log(`[Cache] Stored ${file.path} (${content.length} bytes)`);
432
+
logger.debug(`Stored ${file.path} (${content.length} bytes)`);
429
433
}
430
434
431
435
/**
···
438
442
recordCid: string,
439
443
options?: {
440
444
forceRewriteHtml?: boolean;
445
+
skipInvalidation?: boolean;
446
+
forceDownload?: boolean;
441
447
}
442
448
): Promise<void> {
443
449
const forceRewriteHtml = options?.forceRewriteHtml === true;
444
-
console.log(`[Cache] Processing site ${did}/${rkey}, record CID: ${recordCid}`, {
450
+
const forceDownload = options?.forceDownload === true;
451
+
logger.info(`Processing site ${did}/${rkey}`, {
452
+
recordCid,
445
453
forceRewriteHtml,
454
+
forceDownload,
446
455
});
447
456
448
457
if (!record.root?.entries) {
449
-
console.error('[Cache] Invalid record structure');
458
+
logger.error('Invalid record structure');
450
459
return;
451
460
}
452
461
453
462
// Get PDS endpoint
454
463
const pdsEndpoint = await getPdsForDid(did);
455
464
if (!pdsEndpoint) {
456
-
console.error('[Cache] Could not resolve PDS for', did);
465
+
logger.error('Could not resolve PDS', undefined, { did });
457
466
return;
458
467
}
459
468
···
463
472
// Validate limits
464
473
const fileCount = countFilesInDirectory(expandedRoot);
465
474
if (fileCount > MAX_FILE_COUNT) {
466
-
console.error(`[Cache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`);
475
+
logger.error(`Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`);
467
476
return;
468
477
}
469
478
470
479
const totalSize = calculateTotalBlobSize(expandedRoot);
471
480
if (totalSize > MAX_SITE_SIZE) {
472
-
console.error(`[Cache] Site exceeds size limit: ${totalSize} > ${MAX_SITE_SIZE}`);
481
+
logger.error(`Site exceeds size limit: ${totalSize} > ${MAX_SITE_SIZE}`);
473
482
return;
474
483
}
475
484
···
483
492
const normalizedFileCids = normalizeFileCids(rawFileCids);
484
493
const oldFileCids = normalizedFileCids.value;
485
494
if (normalizedFileCids.source === 'string-invalid' || normalizedFileCids.source === 'other') {
486
-
console.warn('[Cache] Existing file_cids had unexpected shape; treating as empty', {
495
+
logger.warn('Existing file_cids had unexpected shape; treating as empty', {
487
496
did,
488
497
rkey,
489
498
type: Array.isArray(rawFileCids) ? 'array' : typeof rawFileCids,
···
498
507
// Find new or changed files
499
508
for (const file of newFiles) {
500
509
const shouldForceRewrite = forceRewriteHtml && isHtmlFile(file.path);
501
-
if (oldFileCids[file.path] !== file.cid || shouldForceRewrite) {
510
+
if (forceDownload || oldFileCids[file.path] !== file.cid || shouldForceRewrite) {
502
511
filesToDownload.push(file);
503
512
}
504
513
}
···
510
519
}
511
520
}
512
521
513
-
console.log(`[Cache] Files unchanged: ${newFiles.length - filesToDownload.length}, to download: ${filesToDownload.length}, to delete: ${pathsToDelete.length}`);
522
+
logger.info(`Files unchanged: ${newFiles.length - filesToDownload.length}, to download: ${filesToDownload.length}, to delete: ${pathsToDelete.length}`);
514
523
515
524
// Download new/changed files (with concurrency limit)
516
525
const DOWNLOAD_CONCURRENCY = 20;
···
539
548
}
540
549
541
550
// Update DB with new CIDs
542
-
console.log(`[Cache] About to upsert site cache for ${did}/${rkey}`);
551
+
logger.debug(`About to upsert site cache for ${did}/${rkey}`);
543
552
await upsertSiteCache(did, rkey, recordCid, newFileCids);
544
-
console.log(`[Cache] Updated site cache for ${did}/${rkey} with record CID ${recordCid}`);
553
+
logger.debug(`Updated site cache for ${did}/${rkey} with record CID ${recordCid}`);
545
554
546
555
// Backfill settings if a record exists for this rkey
547
556
const settingsRecord = await fetchSettingsRecord(did, rkey, pdsEndpoint);
548
557
if (settingsRecord) {
549
-
await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid);
558
+
await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid, {
559
+
skipInvalidation: options?.skipInvalidation,
560
+
});
550
561
}
551
562
552
-
console.log(`[Cache] Successfully cached site ${did}/${rkey}`);
563
+
// Notify hosting-service to invalidate its local caches
564
+
// (skip for revalidate/backfill since hosting-service already has the files locally)
565
+
if (!options?.skipInvalidation) {
566
+
await publishCacheInvalidation(did, rkey, 'update');
567
+
}
568
+
569
+
logger.info(`Successfully cached site ${did}/${rkey}`);
553
570
}
554
571
555
572
/**
556
573
* Handle a site delete event
557
574
*/
558
575
export async function handleSiteDelete(did: string, rkey: string): Promise<void> {
559
-
console.log(`[Cache] Deleting site ${did}/${rkey}`);
576
+
logger.info(`Deleting site ${did}/${rkey}`);
560
577
561
578
// List all files for this site and delete them
562
579
const prefix = `${did}/${rkey}/`;
···
569
586
// Delete from DB
570
587
await deleteSiteCache(did, rkey);
571
588
572
-
console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`);
589
+
// Notify hosting-service to invalidate its local caches
590
+
await publishCacheInvalidation(did, rkey, 'delete');
591
+
592
+
logger.info(`Deleted site ${did}/${rkey} (${keys.length} files)`);
573
593
}
574
594
575
595
/**
576
596
* Handle settings create/update event
577
597
*/
578
-
export async function handleSettingsUpdate(did: string, rkey: string, settings: WispSettings, recordCid: string): Promise<void> {
579
-
console.log(`[Cache] Updating settings for ${did}/${rkey}`);
598
+
export async function handleSettingsUpdate(
599
+
did: string,
600
+
rkey: string,
601
+
settings: WispSettings,
602
+
recordCid: string,
603
+
options?: { skipInvalidation?: boolean }
604
+
): Promise<void> {
605
+
logger.info(`Updating settings for ${did}/${rkey}`);
580
606
581
607
await upsertSiteSettingsCache(did, rkey, recordCid, {
582
608
directoryListing: settings.directoryListing,
···
586
612
cleanUrls: settings.cleanUrls,
587
613
headers: settings.headers,
588
614
});
615
+
616
+
// Notify hosting-service to invalidate its local caches (redirect rules depend on settings)
617
+
if (!options?.skipInvalidation) {
618
+
await publishCacheInvalidation(did, rkey, 'settings');
619
+
}
589
620
}
590
621
591
622
/**
592
623
* Handle settings delete event
593
624
*/
594
625
export async function handleSettingsDelete(did: string, rkey: string): Promise<void> {
595
-
console.log(`[Cache] Deleting settings for ${did}/${rkey}`);
626
+
logger.info(`Deleting settings for ${did}/${rkey}`);
596
627
await deleteSiteSettingsCache(did, rkey);
628
+
629
+
// Notify hosting-service to invalidate its local caches
630
+
await publishCacheInvalidation(did, rkey, 'settings');
597
631
}
+20
-19
apps/firehose-service/src/lib/db.ts
+20
-19
apps/firehose-service/src/lib/db.ts
···
1
1
import postgres from 'postgres';
2
2
import type { SiteCache, SiteRecord, SiteSettingsCache } from '@wispplace/database';
3
+
import { createLogger } from '@wispplace/observability';
3
4
import { config } from '../config';
5
+
6
+
const logger = createLogger('firehose-service');
4
7
5
8
const sql = postgres(config.databaseUrl, {
6
9
max: 10,
···
54
57
recordCid: string,
55
58
fileCids: Record<string, string>
56
59
): Promise<void> {
57
-
const fileCidsJson = fileCids ?? {};
58
-
console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`);
60
+
logger.debug(`[DB] upsertSiteCache starting for ${did}/${rkey}`);
59
61
try {
60
62
await sql`
61
63
INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at)
62
-
VALUES (${did}, ${rkey}, ${recordCid}, ${fileCidsJson}::jsonb, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW()))
64
+
VALUES (${did}, ${rkey}, ${recordCid}, ${sql.json(fileCids ?? {})}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW()))
63
65
ON CONFLICT (did, rkey)
64
66
DO UPDATE SET
65
67
record_cid = EXCLUDED.record_cid,
66
68
file_cids = EXCLUDED.file_cids,
67
69
updated_at = EXTRACT(EPOCH FROM NOW())
68
70
`;
69
-
console.log(`[DB] upsertSiteCache completed for ${did}/${rkey}`);
71
+
logger.debug(`[DB] upsertSiteCache completed for ${did}/${rkey}`);
70
72
} catch (err) {
71
-
const error = err instanceof Error ? err : new Error(String(err));
72
-
console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message, stack: error.stack });
73
-
throw error;
73
+
logger.error('[DB] upsertSiteCache error', err, { did, rkey });
74
+
throw err;
74
75
}
75
76
}
76
77
···
94
95
const directoryListing = settings.directoryListing ?? false;
95
96
const spaMode = settings.spaMode ?? null;
96
97
const custom404 = settings.custom404 ?? null;
97
-
const indexFilesJson = settings.indexFiles ?? [];
98
98
const cleanUrls = settings.cleanUrls ?? true;
99
-
const headersJson = settings.headers ?? [];
99
+
100
+
const indexFiles = settings.indexFiles ?? [];
101
+
const headers = settings.headers ?? [];
100
102
101
-
console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, {
103
+
logger.debug(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, {
102
104
directoryListing,
103
105
spaMode,
104
106
custom404,
105
-
indexFiles: indexFilesJson,
107
+
indexFiles,
106
108
cleanUrls,
107
-
headers: headersJson,
109
+
headers,
108
110
});
109
111
110
112
try {
···
117
119
${directoryListing},
118
120
${spaMode},
119
121
${custom404},
120
-
${indexFilesJson}::jsonb,
122
+
${sql.json(indexFiles)},
121
123
${cleanUrls},
122
-
${headersJson}::jsonb,
124
+
${sql.json(headers)},
123
125
EXTRACT(EPOCH FROM NOW()),
124
126
EXTRACT(EPOCH FROM NOW())
125
127
)
···
134
136
headers = EXCLUDED.headers,
135
137
updated_at = EXTRACT(EPOCH FROM NOW())
136
138
`;
137
-
console.log(`[DB] upsertSiteSettingsCache completed for ${did}/${rkey}`);
139
+
logger.debug(`[DB] upsertSiteSettingsCache completed for ${did}/${rkey}`);
138
140
} catch (err) {
139
-
const error = err instanceof Error ? err : new Error(String(err));
140
-
console.error('[DB] upsertSiteSettingsCache error:', { did, rkey, error: error.message, stack: error.stack });
141
-
throw error;
141
+
logger.error('[DB] upsertSiteSettingsCache error', err, { did, rkey });
142
+
throw err;
142
143
}
143
144
}
144
145
···
148
149
149
150
export async function closeDatabase(): Promise<void> {
150
151
await sql.end({ timeout: 5 });
151
-
console.log('[DB] Database connections closed');
152
+
logger.info('[DB] Database connections closed');
152
153
}
153
154
154
155
export { sql };
+19
-23
apps/firehose-service/src/lib/firehose.ts
+19
-23
apps/firehose-service/src/lib/firehose.ts
···
8
8
import { isBun, BunFirehose, type Event, type CommitEvt } from '@wispplace/bun-firehose';
9
9
import type { Record as WispFsRecord } from '@wispplace/lexicons/types/place/wisp/fs';
10
10
import type { Record as WispSettings } from '@wispplace/lexicons/types/place/wisp/settings';
11
+
import { createLogger } from '@wispplace/observability';
11
12
import { config } from '../config';
12
13
import {
13
14
handleSiteCreateOrUpdate,
···
18
19
} from './cache-writer';
19
20
20
21
const idResolver = new IdResolver();
22
+
const logger = createLogger('firehose-service');
21
23
22
24
// Track firehose health
23
25
let lastEventTime = Date.now();
···
70
72
const commitEvt = evt as CommitEvt;
71
73
const { did, collection, rkey, record, cid } = commitEvt;
72
74
73
-
console.log(`[Firehose] Debug: Event ${evt.event} for ${collection}:${did}/${rkey}, CID: ${cid?.toString() || 'unknown'}`);
75
+
logger.debug(`Event ${evt.event} for ${collection}:${did}/${rkey}`, { cid: cid?.toString() || 'unknown' });
74
76
75
77
// Handle place.wisp.fs events
76
78
if (collection === 'place.wisp.fs') {
77
-
console.log(`[Firehose] Received ${commitEvt.event} event for ${did}/${rkey}, CID: ${cid?.toString() || 'unknown'}`);
79
+
logger.info(`[place.wisp.fs] Received ${commitEvt.event} event`, { did, rkey, cid: cid?.toString() || 'unknown' });
78
80
processWithConcurrencyLimit(async () => {
79
81
try {
80
-
console.log(`[Firehose] Inside handler for ${commitEvt.event} event for ${did}/${rkey}`);
82
+
logger.debug(`[place.wisp.fs] Processing ${commitEvt.event} event`, { did, rkey });
81
83
if (commitEvt.event === 'delete') {
82
84
await handleSiteDelete(did, rkey);
83
85
} else {
···
87
89
if (verified) {
88
90
await handleSiteCreateOrUpdate(did, rkey, verified.record, verified.cid);
89
91
} else {
90
-
console.log(`[Firehose] Skipping ${commitEvt.event} event for ${did}/${rkey} - verification failed`);
92
+
logger.warn(`[place.wisp.fs] Skipping ${commitEvt.event} event - verification failed`, { did, rkey });
91
93
}
92
94
}
93
-
console.log(`[Firehose] Completed handler for ${commitEvt.event} event for ${did}/${rkey}`);
95
+
logger.debug(`[place.wisp.fs] Completed ${commitEvt.event} event`, { did, rkey });
94
96
} catch (err) {
95
-
const error = err instanceof Error ? err : new Error(String(err));
96
-
console.error('[Firehose] Error handling place.wisp.fs event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack });
97
+
logger.error(`[place.wisp.fs] Error handling event`, err, { did, rkey, event: commitEvt.event });
97
98
}
98
99
}).catch(err => {
99
-
const error = err instanceof Error ? err : new Error(String(err));
100
-
console.error('[Firehose] Error processing place.wisp.fs event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack });
100
+
logger.error(`[place.wisp.fs] Error processing event`, err, { did, rkey, event: commitEvt.event });
101
101
});
102
102
}
103
103
···
112
112
await handleSettingsUpdate(did, rkey, record as WispSettings, cidStr);
113
113
}
114
114
} catch (err) {
115
-
const error = err instanceof Error ? err : new Error(String(err));
116
-
console.error('[Firehose] Error handling place.wisp.settings event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack });
115
+
logger.error(`[place.wisp.settings] Error handling event`, err, { did, rkey, event: commitEvt.event });
117
116
}
118
117
}).catch(err => {
119
-
const error = err instanceof Error ? err : new Error(String(err));
120
-
console.error('[Firehose] Error processing place.wisp.settings event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack });
118
+
logger.error(`[place.wisp.settings] Error processing event`, err, { did, rkey, event: commitEvt.event });
121
119
});
122
120
}
123
121
} catch (err) {
124
-
const error = err instanceof Error ? err : new Error(String(err));
125
-
console.error('[Firehose] Unexpected error in handleEvent:', { error: error.message, stack: error.stack });
122
+
logger.error('Unexpected error in handleEvent', err);
126
123
}
127
124
}
128
125
129
126
function handleError(err: Error): void {
130
-
console.error('[Firehose] Error:', err);
131
-
console.error('[Firehose] Stack:', err.stack);
127
+
logger.error('Firehose connection error', err);
132
128
}
133
129
134
130
let firehoseHandle: { destroy: () => void } | null = null;
···
137
133
* Start the firehose worker
138
134
*/
139
135
export function startFirehose(): void {
140
-
console.log(`[Firehose] Starting (runtime: ${isBun ? 'Bun' : 'Node.js'})`);
141
-
console.log(`[Firehose] Service: ${config.firehoseService}`);
142
-
console.log(`[Firehose] Max concurrency: ${config.firehoseMaxConcurrency}`);
136
+
logger.info(`Starting firehose (runtime: ${isBun ? 'Bun' : 'Node.js'})`);
137
+
logger.info(`Service: ${config.firehoseService}`);
138
+
logger.info(`Max concurrency: ${config.firehoseMaxConcurrency}`);
143
139
144
140
isConnected = true;
145
141
···
169
165
170
166
// Log cache info hourly
171
167
setInterval(() => {
172
-
console.log('[Firehose] Hourly status check');
168
+
logger.info('Hourly status check');
173
169
}, 60 * 60 * 1000);
174
170
175
171
// Log status periodically
176
172
setInterval(() => {
177
173
const health = getFirehoseHealth();
178
174
if (health.timeSinceLastEvent > 30000) {
179
-
console.log(`[Firehose] No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`);
175
+
logger.warn(`No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`);
180
176
}
181
177
}, 30000);
182
178
}
···
185
181
* Stop the firehose worker
186
182
*/
187
183
export function stopFirehose(): void {
188
-
console.log('[Firehose] Stopping');
184
+
logger.info('Stopping firehose');
189
185
isConnected = false;
190
186
firehoseHandle?.destroy();
191
187
firehoseHandle = null;
+26
-14
apps/firehose-service/src/lib/revalidate-worker.ts
+26
-14
apps/firehose-service/src/lib/revalidate-worker.ts
···
1
1
import Redis from 'ioredis';
2
2
import os from 'os';
3
+
import { createLogger } from '@wispplace/observability';
3
4
import { config } from '../config';
4
5
import { fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer';
5
6
7
+
const logger = createLogger('firehose-service');
6
8
const consumerName = process.env.WISP_REVALIDATE_CONSUMER || `${os.hostname()}:${process.pid}`;
7
9
const batchSize = Number.parseInt(process.env.WISP_REVALIDATE_BATCH_SIZE || '10', 10);
8
10
const claimIdleMs = Number.parseInt(process.env.WISP_REVALIDATE_CLAIM_IDLE_MS || '60000', 10);
···
33
35
const reason = fields.reason || 'storage-miss';
34
36
35
37
if (!did || !rkey) {
36
-
console.warn('[Revalidate] Missing did/rkey in message', { id, fields });
38
+
logger.warn('[Revalidate] Missing did/rkey in message', { id, fields });
37
39
await redis.xack(config.revalidateStream, config.revalidateGroup, id);
38
40
return;
39
41
}
40
42
41
-
console.log('[Revalidate] Processing', { did, rkey, reason, id });
43
+
logger.info(`[Revalidate] Received message ${id}: ${did}/${rkey} (${reason})`);
42
44
43
45
const record = await fetchSiteRecord(did, rkey);
44
46
if (!record) {
45
-
console.warn('[Revalidate] Site record not found', { did, rkey });
47
+
logger.warn(`[Revalidate] Site record not found on PDS: ${did}/${rkey}`);
46
48
await redis.xack(config.revalidateStream, config.revalidateGroup, id);
47
49
return;
48
50
}
49
51
50
-
await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid);
52
+
// For storage-miss events, force re-download all files since storage is empty
53
+
const forceDownload = reason.startsWith('storage-miss');
51
54
55
+
await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid, {
56
+
skipInvalidation: true,
57
+
forceDownload,
58
+
});
59
+
60
+
logger.info(`[Revalidate] Completed ${id}: ${did}/${rkey}`);
52
61
await redis.xack(config.revalidateStream, config.revalidateGroup, id);
53
62
}
54
63
···
57
66
try {
58
67
await processMessage(id, rawFields);
59
68
} catch (err) {
60
-
const error = err instanceof Error ? err : new Error(String(err));
61
-
console.error('[Revalidate] Failed to process message', { id, error: error.message, stack: error.stack });
69
+
logger.error('[Revalidate] Failed to process message', err, { id });
62
70
}
63
71
}
64
72
}
···
114
122
'GROUP',
115
123
config.revalidateGroup,
116
124
consumerName,
125
+
'COUNT',
126
+
batchSize,
117
127
'BLOCK',
118
128
blockMs,
119
-
'COUNT',
120
-
batchSize,
121
129
'STREAMS',
122
130
config.revalidateStream,
123
131
'>'
124
-
);
132
+
) as [string, Array<[string, string[]]>][] | null;
125
133
126
134
if (!response) return;
127
135
128
136
for (const [, messages] of response) {
129
-
await processMessages(messages as Array<[string, string[]]>);
137
+
await processMessages(messages);
130
138
}
131
139
}
132
140
···
140
148
await claimStaleMessages();
141
149
await readNewMessages();
142
150
} catch (err) {
143
-
const error = err instanceof Error ? err : new Error(String(err));
144
-
console.error('[Revalidate] Loop error', { error: error.message, stack: error.stack });
151
+
logger.error('[Revalidate] Loop error', err);
145
152
await new Promise((resolve) => setTimeout(resolve, 1000));
146
153
}
147
154
}
···
149
156
150
157
export async function startRevalidateWorker(): Promise<void> {
151
158
if (!config.redisUrl) {
152
-
console.warn('[Revalidate] REDIS_URL not set; revalidate worker disabled');
159
+
logger.warn('[Revalidate] REDIS_URL not set; revalidate worker disabled');
153
160
return;
154
161
}
155
162
156
163
if (running) return;
157
164
165
+
logger.info(`[Revalidate] Connecting to Redis: ${config.redisUrl}`);
158
166
redis = new Redis(config.redisUrl, {
159
167
maxRetriesPerRequest: 2,
160
168
enableReadyCheck: true,
161
169
});
162
170
163
171
redis.on('error', (err) => {
164
-
console.error('[Revalidate] Redis error:', err);
172
+
logger.error('[Revalidate] Redis error', err);
173
+
});
174
+
175
+
redis.on('ready', () => {
176
+
logger.info(`[Revalidate] Redis connected, stream: ${config.revalidateStream}, group: ${config.revalidateGroup}`);
165
177
});
166
178
167
179
running = true;
+5
-2
apps/firehose-service/src/lib/storage.ts
+5
-2
apps/firehose-service/src/lib/storage.ts
···
8
8
S3StorageTier,
9
9
DiskStorageTier,
10
10
} from '@wispplace/tiered-storage';
11
+
import { createLogger } from '@wispplace/observability';
11
12
import { config } from '../config';
13
+
14
+
const logger = createLogger('firehose-service');
12
15
13
16
// Create S3 tier (or fallback to disk for local dev)
14
17
let coldTier: S3StorageTier | DiskStorageTier;
···
28
31
forcePathStyle: config.s3ForcePathStyle,
29
32
metadataBucket: config.s3MetadataBucket,
30
33
});
31
-
console.log('[Storage] Using S3 cold tier:', config.s3Bucket);
34
+
logger.info('[Storage] Using S3 cold tier:', { bucket: config.s3Bucket });
32
35
} else {
33
36
// Fallback to disk for local development
34
37
const cacheDir = process.env.CACHE_DIR || './cache/sites';
···
38
41
evictionPolicy: 'lru',
39
42
encodeColons: false,
40
43
});
41
-
console.log('[Storage] Using disk fallback (no S3_BUCKET configured):', cacheDir);
44
+
logger.info('[Storage] Using disk fallback (no S3_BUCKET configured):', { cacheDir });
42
45
}
43
46
44
47
// Identity serializers for raw binary data (no JSON transformation)
+20
-1
apps/hosting-service/.env.example
+20
-1
apps/hosting-service/.env.example
···
3
3
4
4
# Server
5
5
PORT=3001
6
-
BASE_HOST=wisp.place
6
+
# Base domain (e.g., "localhost" for sites.localhost, "wisp.place" for sites.wisp.place)
7
+
BASE_HOST=localhost
8
+
9
+
# Redis (cache invalidation + revalidation queue)
10
+
REDIS_URL=redis://localhost:6379
11
+
12
+
# S3 Storage (leave empty for local disk fallback)
13
+
S3_BUCKET=
14
+
S3_METADATA_BUCKET=
15
+
S3_REGION=auto
16
+
S3_ENDPOINT=
17
+
S3_PREFIX=sites/
18
+
S3_FORCE_PATH_STYLE=true
19
+
20
+
# AWS Credentials (required if using S3)
21
+
AWS_ACCESS_KEY_ID=
22
+
AWS_SECRET_ACCESS_KEY=
23
+
24
+
# For local disk fallback (when S3_BUCKET is empty)
25
+
CACHE_DIR=./cache/sites
+15
-7
apps/hosting-service/src/index.ts
+15
-7
apps/hosting-service/src/index.ts
···
1
1
import app from './server';
2
2
import { serve } from '@hono/node-server';
3
-
import { initializeGrafanaExporters } from '@wispplace/observability';
3
+
import { initializeGrafanaExporters, createLogger } from '@wispplace/observability';
4
4
import { mkdirSync, existsSync } from 'fs';
5
5
import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db';
6
6
import { closeRevalidateQueue } from './lib/revalidate-queue';
7
+
import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation';
7
8
import { storage, getStorageConfig } from './lib/storage';
9
+
10
+
const logger = createLogger('hosting-service');
8
11
9
12
// Initialize Grafana exporters if configured
10
13
initializeGrafanaExporters({
···
18
21
// Ensure cache directory exists
19
22
if (!existsSync(CACHE_DIR)) {
20
23
mkdirSync(CACHE_DIR, { recursive: true });
21
-
console.log('Created cache directory:', CACHE_DIR);
24
+
logger.info('Created cache directory', { CACHE_DIR });
22
25
}
23
26
24
27
// Start domain cache cleanup
25
28
startDomainCacheCleanup();
26
29
30
+
// Start cache invalidation subscriber (listens for firehose-service updates via Redis pub/sub)
31
+
startCacheInvalidationSubscriber();
32
+
27
33
// Optional: Bootstrap hot cache from warm tier on startup
28
34
const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true';
29
35
const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100;
30
36
31
37
if (BOOTSTRAP_HOT_ON_STARTUP) {
32
-
console.log(`🔥 Bootstrapping hot cache (top ${BOOTSTRAP_HOT_LIMIT} items)...`);
38
+
logger.info(`Bootstrapping hot cache (top ${BOOTSTRAP_HOT_LIMIT} items)...`);
33
39
storage.bootstrapHot(BOOTSTRAP_HOT_LIMIT)
34
40
.then((loaded: number) => {
35
-
console.log(`✅ Bootstrapped ${loaded} items into hot cache`);
41
+
logger.info(`Bootstrapped ${loaded} items into hot cache`);
36
42
})
37
43
.catch((err: unknown) => {
38
-
console.error('❌ Hot cache bootstrap error:', err);
44
+
logger.error('Hot cache bootstrap error', err);
39
45
});
40
46
}
41
47
···
78
84
79
85
// Graceful shutdown
80
86
process.on('SIGINT', async () => {
81
-
console.log('\n🛑 Shutting down...');
87
+
logger.info('Shutting down...');
82
88
stopDomainCacheCleanup();
89
+
await stopCacheInvalidationSubscriber();
83
90
await closeRevalidateQueue();
84
91
await closeDatabase();
85
92
server.close();
···
87
94
});
88
95
89
96
process.on('SIGTERM', async () => {
90
-
console.log('\n🛑 Shutting down...');
97
+
logger.info('Shutting down...');
91
98
stopDomainCacheCleanup();
99
+
await stopCacheInvalidationSubscriber();
92
100
await closeRevalidateQueue();
93
101
await closeDatabase();
94
102
server.close();
+80
apps/hosting-service/src/lib/cache-invalidation.ts
+80
apps/hosting-service/src/lib/cache-invalidation.ts
···
1
+
/**
2
+
* Cache invalidation subscriber
3
+
*
4
+
* Listens to Redis pub/sub for cache invalidation messages from the firehose-service.
5
+
* When a site is updated/deleted, clears the hosting-service's local caches
6
+
* (tiered storage hot+warm tiers, redirect rules) so stale data isn't served.
7
+
*/
8
+
9
+
import Redis from 'ioredis';
10
+
import { storage } from './storage';
11
+
import { clearRedirectRulesCache } from './site-cache';
12
+
13
+
const CHANNEL = 'wisp:cache-invalidate';
14
+
15
+
let subscriber: Redis | null = null;
16
+
17
+
export function startCacheInvalidationSubscriber(): void {
18
+
const redisUrl = process.env.REDIS_URL;
19
+
if (!redisUrl) {
20
+
console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation disabled');
21
+
return;
22
+
}
23
+
24
+
console.log(`[CacheInvalidation] Connecting to Redis for subscribing: ${redisUrl}`);
25
+
subscriber = new Redis(redisUrl, {
26
+
maxRetriesPerRequest: 2,
27
+
enableReadyCheck: true,
28
+
});
29
+
30
+
subscriber.on('error', (err) => {
31
+
console.error('[CacheInvalidation] Redis error:', err);
32
+
});
33
+
34
+
subscriber.on('ready', () => {
35
+
console.log('[CacheInvalidation] Redis subscriber connected');
36
+
});
37
+
38
+
subscriber.subscribe(CHANNEL, (err) => {
39
+
if (err) {
40
+
console.error('[CacheInvalidation] Failed to subscribe:', err);
41
+
} else {
42
+
console.log('[CacheInvalidation] Subscribed to', CHANNEL);
43
+
}
44
+
});
45
+
46
+
subscriber.on('message', async (_channel: string, message: string) => {
47
+
try {
48
+
const { did, rkey, action } = JSON.parse(message) as {
49
+
did: string;
50
+
rkey: string;
51
+
action: 'update' | 'delete' | 'settings';
52
+
};
53
+
54
+
if (!did || !rkey) {
55
+
console.warn('[CacheInvalidation] Invalid message:', message);
56
+
return;
57
+
}
58
+
59
+
console.log(`[CacheInvalidation] Invalidating ${did}/${rkey} (${action})`);
60
+
61
+
// Clear tiered storage (hot + warm) for this site
62
+
const prefix = `${did}/${rkey}/`;
63
+
const deleted = await storage.invalidate(prefix);
64
+
console.log(`[CacheInvalidation] Cleared ${deleted} keys from tiered storage for ${did}/${rkey}`);
65
+
66
+
// Clear redirect rules cache
67
+
clearRedirectRulesCache(did, rkey);
68
+
} catch (err) {
69
+
console.error('[CacheInvalidation] Error processing message:', err);
70
+
}
71
+
});
72
+
}
73
+
74
+
export async function stopCacheInvalidationSubscriber(): Promise<void> {
75
+
if (subscriber) {
76
+
const toClose = subscriber;
77
+
subscriber = null;
78
+
await toClose.quit();
79
+
}
80
+
}
+26
apps/hosting-service/src/lib/db.ts
+26
apps/hosting-service/src/lib/db.ts
···
119
119
console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName });
120
120
}
121
121
122
+
/**
123
+
* Upsert site cache entry (used by on-demand caching when a site is completely missing)
124
+
*/
125
+
export async function upsertSiteCache(
126
+
did: string,
127
+
rkey: string,
128
+
recordCid: string,
129
+
fileCids: Record<string, string>
130
+
): Promise<void> {
131
+
try {
132
+
await sql`
133
+
INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at)
134
+
VALUES (${did}, ${rkey}, ${recordCid}, ${sql.json(fileCids ?? {})}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW()))
135
+
ON CONFLICT (did, rkey)
136
+
DO UPDATE SET
137
+
record_cid = EXCLUDED.record_cid,
138
+
file_cids = EXCLUDED.file_cids,
139
+
updated_at = EXTRACT(EPOCH FROM NOW())
140
+
`;
141
+
} catch (err) {
142
+
const error = err instanceof Error ? err : new Error(String(err));
143
+
console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message });
144
+
throw error;
145
+
}
146
+
}
147
+
122
148
export interface SiteRecord {
123
149
did: string;
124
150
rkey: string;
+46
-7
apps/hosting-service/src/lib/file-serving.ts
+46
-7
apps/hosting-service/src/lib/file-serving.ts
···
18
18
import { enqueueRevalidate } from './revalidate-queue';
19
19
import { recordStorageMiss } from './revalidate-metrics';
20
20
import { normalizeFileCids } from '@wispplace/fs-utils';
21
+
import { fetchAndCacheSite } from './on-demand-cache';
22
+
import type { StorageResult } from '@wispplace/tiered-storage';
23
+
24
+
type FileStorageResult = StorageResult<Uint8Array>;
21
25
22
26
/**
23
27
* Helper to retrieve a file with metadata from tiered storage
···
28
32
const result = await storage.getWithMetadata(key);
29
33
30
34
if (result) {
31
-
const metadata = result.metadata;
32
-
const tier =
33
-
metadata && typeof metadata === 'object' && 'tier' in metadata
34
-
? String((metadata as Record<string, unknown>).tier)
35
-
: 'unknown';
35
+
const tier = result.source || 'unknown';
36
36
const size = result.data ? (result.data as Uint8Array).length : 0;
37
37
console.log(`[Storage] Served ${filePath} from ${tier} tier (${size} bytes) - ${did}:${rkey}`);
38
38
}
···
91
91
rkey: string,
92
92
filePath: string,
93
93
preferRewrittenHtml: boolean
94
-
): Promise<{ result: Awaited<ReturnType<typeof storage.getWithMetadata>>; filePath: string } | null> {
94
+
): Promise<{ result: FileStorageResult; filePath: string } | null> {
95
95
const mimeTypeGuess = lookup(filePath) || 'application/octet-stream';
96
96
if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) {
97
97
const rewrittenPath = `.rewritten/${filePath}`;
···
107
107
}
108
108
109
109
function buildResponseFromStorageResult(
110
-
result: Awaited<ReturnType<typeof storage.getWithMetadata>>,
110
+
result: FileStorageResult,
111
111
filePath: string,
112
112
settings: WispSettings | null,
113
113
requestHeaders?: Record<string, string>
···
149
149
}
150
150
151
151
/**
152
+
* Ensure a site is cached locally. If the site has no DB entry (completely unknown),
153
+
* attempt to fetch and cache it on-demand from the PDS.
154
+
*/
155
+
async function ensureSiteCached(did: string, rkey: string): Promise<void> {
156
+
const existing = await getSiteCache(did, rkey);
157
+
if (existing) {
158
+
// Site is in DB — check if any files actually exist in storage
159
+
const prefix = `${did}/${rkey}/`;
160
+
const hasFiles = await storage.exists(prefix.slice(0, -1)) ||
161
+
await checkAnyFileExists(did, rkey, existing.file_cids);
162
+
if (hasFiles) {
163
+
return;
164
+
}
165
+
console.log(`[FileServing] Site ${did}/${rkey} in DB but no files in storage, re-fetching`);
166
+
} else {
167
+
console.log(`[FileServing] Site ${did}/${rkey} not in DB, attempting on-demand cache`);
168
+
}
169
+
170
+
const success = await fetchAndCacheSite(did, rkey);
171
+
console.log(`[FileServing] On-demand cache for ${did}/${rkey}: ${success ? 'success' : 'failed'}`);
172
+
}
173
+
174
+
async function checkAnyFileExists(did: string, rkey: string, fileCids: unknown): Promise<boolean> {
175
+
if (!fileCids || typeof fileCids !== 'object') return false;
176
+
const cids = fileCids as Record<string, string>;
177
+
const firstFile = Object.keys(cids)[0];
178
+
if (!firstFile) return false;
179
+
return storage.exists(`${did}/${rkey}/${firstFile}`);
180
+
}
181
+
182
+
/**
152
183
* Helper to serve files from cache (for custom domains and subdomains)
153
184
*/
154
185
export async function serveFromCache(
···
158
189
fullUrl?: string,
159
190
headers?: Record<string, string>
160
191
): Promise<Response> {
192
+
// Check if this site is completely unknown (not in DB, no files in storage)
193
+
// If so, attempt to fetch and cache it on-demand from the PDS
194
+
await ensureSiteCached(did, rkey);
195
+
161
196
// Load settings for this site
162
197
const settings = await getCachedSettings(did, rkey);
163
198
const indexFiles = getIndexFiles(settings);
···
445
480
fullUrl?: string,
446
481
headers?: Record<string, string>
447
482
): Promise<Response> {
483
+
// Check if this site is completely unknown (not in DB, no files in storage)
484
+
// If so, attempt to fetch and cache it on-demand from the PDS
485
+
await ensureSiteCached(did, rkey);
486
+
448
487
// Load settings for this site
449
488
const settings = await getCachedSettings(did, rkey);
450
489
const indexFiles = getIndexFiles(settings);
+259
apps/hosting-service/src/lib/on-demand-cache.ts
+259
apps/hosting-service/src/lib/on-demand-cache.ts
···
1
+
/**
2
+
* On-demand site caching for the hosting service
3
+
*
4
+
* When a request hits a site that is completely missing (no DB entry, no files),
5
+
* this module fetches the site record from the PDS, downloads all blobs,
6
+
* writes them to local storage (hot + warm tiers), and updates the DB.
7
+
*
8
+
* This gives immediate serving capability. A revalidate is also enqueued
9
+
* so the firehose-service backfills S3 (cold tier).
10
+
*/
11
+
12
+
import type { Record as WispFsRecord, Directory, Entry, File } from '@wispplace/lexicons/types/place/wisp/fs';
13
+
import { safeFetchJson, safeFetchBlob } from '@wispplace/safe-fetch';
14
+
import { extractBlobCid, getPdsForDid } from '@wispplace/atproto-utils';
15
+
import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression';
16
+
import { collectFileCidsFromEntries, countFilesInDirectory } from '@wispplace/fs-utils';
17
+
import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants';
18
+
import { expandSubfsNodes } from './utils';
19
+
import { storage } from './storage';
20
+
import { upsertSiteCache, tryAcquireLock, releaseLock } from './db';
21
+
import { enqueueRevalidate } from './revalidate-queue';
22
+
import { gunzipSync } from 'zlib';
23
+
24
+
// Track in-flight fetches to avoid duplicate work
25
+
const inFlightFetches = new Map<string, Promise<boolean>>();
26
+
27
+
interface FileInfo {
28
+
path: string;
29
+
cid: string;
30
+
blob: any;
31
+
encoding?: 'gzip';
32
+
mimeType?: string;
33
+
base64?: boolean;
34
+
}
35
+
36
+
/**
37
+
* Attempt to fetch and cache a completely missing site on-demand.
38
+
* Returns true if the site was successfully cached, false otherwise.
39
+
*
40
+
* Uses a distributed lock (pg advisory lock) to prevent multiple
41
+
* hosting-service instances from fetching the same site simultaneously.
42
+
*/
43
+
export async function fetchAndCacheSite(did: string, rkey: string): Promise<boolean> {
44
+
const key = `${did}:${rkey}`;
45
+
46
+
// Check if there's already an in-flight fetch for this site
47
+
const existing = inFlightFetches.get(key);
48
+
if (existing) {
49
+
return existing;
50
+
}
51
+
52
+
const fetchPromise = doFetchAndCache(did, rkey);
53
+
inFlightFetches.set(key, fetchPromise);
54
+
55
+
try {
56
+
return await fetchPromise;
57
+
} finally {
58
+
inFlightFetches.delete(key);
59
+
}
60
+
}
61
+
62
+
async function doFetchAndCache(did: string, rkey: string): Promise<boolean> {
63
+
const lockKey = `on-demand-cache:${did}:${rkey}`;
64
+
65
+
// Try to acquire a distributed lock
66
+
const acquired = await tryAcquireLock(lockKey);
67
+
if (!acquired) {
68
+
console.log(`[OnDemandCache] Lock not acquired for ${did}/${rkey}, another instance is handling it`);
69
+
return false;
70
+
}
71
+
72
+
try {
73
+
console.log(`[OnDemandCache] Fetching missing site ${did}/${rkey}`);
74
+
75
+
// Fetch site record from PDS
76
+
const pdsEndpoint = await getPdsForDid(did);
77
+
if (!pdsEndpoint) {
78
+
console.error(`[OnDemandCache] Could not resolve PDS for ${did}`);
79
+
return false;
80
+
}
81
+
82
+
const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(rkey)}`;
83
+
84
+
let data: any;
85
+
try {
86
+
data = await safeFetchJson(recordUrl);
87
+
} catch (err) {
88
+
const msg = err instanceof Error ? err.message : String(err);
89
+
if (msg.includes('HTTP 404') || msg.includes('Not Found')) {
90
+
console.log(`[OnDemandCache] Site record not found on PDS: ${did}/${rkey}`);
91
+
} else {
92
+
console.error(`[OnDemandCache] Failed to fetch site record: ${did}/${rkey}`, msg);
93
+
}
94
+
return false;
95
+
}
96
+
97
+
const record = data.value as WispFsRecord;
98
+
const recordCid = data.cid || '';
99
+
100
+
if (!record?.root?.entries) {
101
+
console.error(`[OnDemandCache] Invalid record structure for ${did}/${rkey}`);
102
+
return false;
103
+
}
104
+
105
+
// Expand subfs nodes
106
+
const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint);
107
+
108
+
// Validate limits
109
+
const fileCount = countFilesInDirectory(expandedRoot);
110
+
if (fileCount > MAX_FILE_COUNT) {
111
+
console.error(`[OnDemandCache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`);
112
+
return false;
113
+
}
114
+
115
+
// Collect files
116
+
const files = collectFileInfo(expandedRoot.entries);
117
+
118
+
// Collect file CIDs for DB
119
+
const fileCids: Record<string, string> = {};
120
+
collectFileCidsFromEntries(expandedRoot.entries, '', fileCids);
121
+
122
+
// Download and write all files to local storage (hot + warm tiers)
123
+
const CONCURRENCY = 10;
124
+
let downloaded = 0;
125
+
let failed = 0;
126
+
127
+
for (let i = 0; i < files.length; i += CONCURRENCY) {
128
+
const batch = files.slice(i, i + CONCURRENCY);
129
+
const results = await Promise.allSettled(
130
+
batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint))
131
+
);
132
+
133
+
for (const result of results) {
134
+
if (result.status === 'fulfilled') {
135
+
downloaded++;
136
+
} else {
137
+
failed++;
138
+
console.error(`[OnDemandCache] Failed to download blob:`, result.reason);
139
+
}
140
+
}
141
+
}
142
+
143
+
console.log(`[OnDemandCache] Downloaded ${downloaded} files (${failed} failed) for ${did}/${rkey}`);
144
+
145
+
// Update DB with file CIDs so future storage misses can be detected
146
+
await upsertSiteCache(did, rkey, recordCid, fileCids);
147
+
148
+
// Enqueue revalidate so firehose-service backfills S3 (cold tier)
149
+
await enqueueRevalidate(did, rkey, `on-demand-cache`);
150
+
151
+
console.log(`[OnDemandCache] Successfully cached site ${did}/${rkey}`);
152
+
return downloaded > 0;
153
+
} catch (err) {
154
+
console.error(`[OnDemandCache] Error caching site ${did}/${rkey}:`, err);
155
+
return false;
156
+
} finally {
157
+
await releaseLock(lockKey);
158
+
}
159
+
}
160
+
161
+
function collectFileInfo(entries: Entry[], pathPrefix: string = ''): FileInfo[] {
162
+
const files: FileInfo[] = [];
163
+
164
+
for (const entry of entries) {
165
+
const currentPath = pathPrefix ? `${pathPrefix}/${entry.name}` : entry.name;
166
+
const node = entry.node;
167
+
168
+
if ('type' in node && node.type === 'directory' && 'entries' in node) {
169
+
files.push(...collectFileInfo(node.entries, currentPath));
170
+
} else if ('type' in node && node.type === 'file' && 'blob' in node) {
171
+
const fileNode = node as File;
172
+
const cid = extractBlobCid(fileNode.blob);
173
+
if (cid) {
174
+
files.push({
175
+
path: currentPath,
176
+
cid,
177
+
blob: fileNode.blob,
178
+
encoding: fileNode.encoding,
179
+
mimeType: fileNode.mimeType,
180
+
base64: fileNode.base64,
181
+
});
182
+
}
183
+
}
184
+
}
185
+
186
+
return files;
187
+
}
188
+
189
+
async function downloadAndWriteBlob(
190
+
did: string,
191
+
rkey: string,
192
+
file: FileInfo,
193
+
pdsEndpoint: string
194
+
): Promise<void> {
195
+
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`;
196
+
197
+
let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 });
198
+
let encoding = file.encoding;
199
+
200
+
// Decode base64 if flagged
201
+
if (file.base64) {
202
+
const base64String = new TextDecoder().decode(content);
203
+
content = Buffer.from(base64String, 'base64');
204
+
}
205
+
206
+
// Decompress if needed and shouldn't stay compressed
207
+
const shouldStayCompressed = shouldCompressMimeType(file.mimeType);
208
+
209
+
if (encoding === 'gzip' && !shouldStayCompressed && content.length >= 2 &&
210
+
content[0] === 0x1f && content[1] === 0x8b) {
211
+
try {
212
+
content = gunzipSync(content);
213
+
encoding = undefined;
214
+
} catch {
215
+
// Keep gzipped if decompression fails
216
+
}
217
+
}
218
+
219
+
// If encoding is missing but data looks gzipped for a text-like file, mark it
220
+
if (!encoding && isTextLikeMime(file.mimeType, file.path) && content.length >= 2 &&
221
+
content[0] === 0x1f && content[1] === 0x8b) {
222
+
encoding = 'gzip';
223
+
}
224
+
225
+
// Build storage key and metadata
226
+
const key = `${did}/${rkey}/${file.path}`;
227
+
const metadata: Record<string, string> = {};
228
+
if (encoding) metadata.encoding = encoding;
229
+
if (file.mimeType) metadata.mimeType = file.mimeType;
230
+
231
+
// Write to hot + warm tiers only (cold/S3 is read-only in hosting-service,
232
+
// firehose-service will backfill via revalidate)
233
+
await storage.set(key as any, content as any, {
234
+
metadata,
235
+
skipTiers: [],
236
+
});
237
+
}
238
+
239
+
function isTextLikeMime(mimeType?: string, path?: string): boolean {
240
+
if (mimeType) {
241
+
if (mimeType === 'text/html') return true;
242
+
if (mimeType === 'text/css') return true;
243
+
if (mimeType === 'text/javascript') return true;
244
+
if (mimeType === 'application/javascript') return true;
245
+
if (mimeType === 'application/json') return true;
246
+
if (mimeType === 'application/xml') return true;
247
+
if (mimeType === 'image/svg+xml') return true;
248
+
}
249
+
250
+
if (!path) return false;
251
+
const lower = path.toLowerCase();
252
+
return lower.endsWith('.html') ||
253
+
lower.endsWith('.htm') ||
254
+
lower.endsWith('.css') ||
255
+
lower.endsWith('.js') ||
256
+
lower.endsWith('.json') ||
257
+
lower.endsWith('.xml') ||
258
+
lower.endsWith('.svg');
259
+
}
+7
-1
apps/hosting-service/src/lib/revalidate-queue.ts
+7
-1
apps/hosting-service/src/lib/revalidate-queue.ts
···
18
18
}
19
19
20
20
if (!client) {
21
+
console.log(`[Revalidate] Connecting to Redis: ${redisUrl}`);
21
22
client = new Redis(redisUrl, {
22
23
maxRetriesPerRequest: 2,
23
24
enableReadyCheck: true,
···
26
27
client.on('error', (err) => {
27
28
console.error('[Revalidate] Redis error:', err);
28
29
});
30
+
31
+
client.on('ready', () => {
32
+
console.log(`[Revalidate] Redis connected, stream: ${streamName}`);
33
+
});
29
34
}
30
35
31
36
return client;
···
46
51
47
52
try {
48
53
const dedupeKey = `revalidate:site:${did}:${rkey}`;
49
-
const set = await redis.set(dedupeKey, '1', 'NX', 'EX', dedupeTtlSeconds);
54
+
const set = await redis.set(dedupeKey, '1', 'EX', dedupeTtlSeconds, 'NX');
50
55
if (!set) {
51
56
recordRevalidateResult('deduped');
52
57
return { enqueued: false, result: 'deduped' };
···
65
70
Date.now().toString()
66
71
);
67
72
73
+
console.log(`[Revalidate] Enqueued ${did}/${rkey} (${reason}) to ${streamName}`);
68
74
recordRevalidateResult('enqueued');
69
75
return { enqueued: true, result: 'enqueued' };
70
76
} catch (err) {
+36
-6
apps/hosting-service/src/lib/storage.ts
+36
-6
apps/hosting-service/src/lib/storage.ts
···
59
59
60
60
constructor(private tier: StorageTier) {}
61
61
62
-
// Read operations - pass through to underlying tier
62
+
// Read operations - pass through to underlying tier, catch errors as cache misses
63
63
async get(key: string) {
64
-
return this.tier.get(key);
64
+
try {
65
+
return await this.tier.get(key);
66
+
} catch (err) {
67
+
this.logReadError('get', key, err);
68
+
return null;
69
+
}
65
70
}
66
71
67
72
async getWithMetadata(key: string) {
68
-
return this.tier.getWithMetadata?.(key) ?? null;
73
+
try {
74
+
return await this.tier.getWithMetadata?.(key) ?? null;
75
+
} catch (err) {
76
+
this.logReadError('getWithMetadata', key, err);
77
+
return null;
78
+
}
69
79
}
70
80
71
81
async getStream(key: string) {
72
-
return this.tier.getStream?.(key) ?? null;
82
+
try {
83
+
return await this.tier.getStream?.(key) ?? null;
84
+
} catch (err) {
85
+
this.logReadError('getStream', key, err);
86
+
return null;
87
+
}
73
88
}
74
89
75
90
async exists(key: string) {
76
-
return this.tier.exists(key);
91
+
try {
92
+
return await this.tier.exists(key);
93
+
} catch (err) {
94
+
this.logReadError('exists', key, err);
95
+
return false;
96
+
}
77
97
}
78
98
79
99
async getMetadata(key: string) {
80
-
return this.tier.getMetadata(key);
100
+
try {
101
+
return await this.tier.getMetadata(key);
102
+
} catch (err) {
103
+
this.logReadError('getMetadata', key, err);
104
+
return null;
105
+
}
81
106
}
82
107
83
108
async *listKeys(prefix?: string) {
···
111
136
112
137
async clear() {
113
138
this.logWriteSkip('clear', 'all keys');
139
+
}
140
+
141
+
private logReadError(operation: string, key: string, err: unknown) {
142
+
const msg = err instanceof Error ? err.message : String(err);
143
+
console.warn(`[Storage] S3 read error (${operation}) for ${key}: ${msg}`);
114
144
}
115
145
116
146
private logWriteSkip(operation: string, key: string) {
+15
-5
apps/hosting-service/src/server.ts
+15
-5
apps/hosting-service/src/server.ts
···
7
7
import { cors } from 'hono/cors';
8
8
import { getWispDomain, getCustomDomain, getCustomDomainByHash } from './lib/db';
9
9
import { resolveDid } from './lib/utils';
10
-
import { logCollector, errorTracker, metricsCollector } from '@wispplace/observability';
10
+
import { createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability';
11
11
import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono';
12
12
import { sanitizePath } from '@wispplace/fs-utils';
13
13
import { isValidRkey, extractHeaders } from './lib/request-utils';
14
14
import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving';
15
15
import { getRevalidateMetrics } from './lib/revalidate-metrics';
16
16
17
-
const BASE_HOST = process.env.BASE_HOST || 'wisp.place';
17
+
const logger = createLogger('hosting-service');
18
+
19
+
const BASE_HOST_ENV = process.env.BASE_HOST || 'wisp.place';
20
+
const BASE_HOST = BASE_HOST_ENV.split(':')[0] || BASE_HOST_ENV;
18
21
19
22
const app = new Hono();
20
23
···
38
41
app.get('/*', async (c) => {
39
42
const url = new URL(c.req.url);
40
43
const hostname = c.req.header('host') || '';
41
-
const hostnameWithoutPort = hostname.split(':')[0];
44
+
const hostnameWithoutPort = hostname.split(':')[0] || '';
42
45
const rawPath = url.pathname.replace(/^\//, '');
43
46
const path = sanitizePath(rawPath);
44
47
48
+
logger.debug(`Request: host=${hostname} hostnameWithoutPort=${hostnameWithoutPort} path=${path}`, { BASE_HOST });
49
+
45
50
// Check if this is sites.wisp.place subdomain (strip port for comparison)
46
51
if (hostnameWithoutPort === `sites.${BASE_HOST}`) {
47
52
// Sanitize the path FIRST to prevent path traversal
···
78
83
return c.text('Invalid identifier', 400);
79
84
}
80
85
81
-
console.log(`[Server] sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`);
86
+
// Redirect to trailing slash when accessing site root so relative paths resolve correctly
87
+
if (!filePath && !url.pathname.endsWith('/')) {
88
+
return c.redirect(`${url.pathname}/${url.search}`, 301);
89
+
}
90
+
91
+
logger.debug(`sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`);
82
92
83
93
// Serve with HTML path rewriting to handle absolute paths
84
94
const basePath = `/${identifier}/${site}/`;
85
-
console.log(`[Server] Serving with basePath: ${basePath}`);
95
+
logger.debug(`Serving with basePath: ${basePath}`);
86
96
const headers = extractHeaders(c.req.raw.headers);
87
97
return serveFromCacheWithRewrite(did, site, filePath, basePath, c.req.url, headers);
88
98
}
+8
-3
bun.lock
+8
-3
bun.lock
···
28
28
"@wispplace/database": "workspace:*",
29
29
"@wispplace/fs-utils": "workspace:*",
30
30
"@wispplace/lexicons": "workspace:*",
31
+
"@wispplace/observability": "workspace:*",
31
32
"@wispplace/safe-fetch": "workspace:*",
32
33
"@wispplace/tiered-storage": "workspace:*",
33
34
"hono": "^4.10.4",
···
135
136
},
136
137
"cli": {
137
138
"name": "wispctl",
138
-
"version": "1.0.6",
139
+
"version": "1.0.8",
139
140
"bin": {
140
141
"wispctl": "dist/index.js",
141
142
},
···
1920
1921
1921
1922
"@typescript-eslint/typescript-estree/minimatch": ["minimatch@9.0.5", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow=="],
1922
1923
1923
-
"@wispplace/bun-firehose/@types/bun": ["@types/bun@1.3.8", "", { "dependencies": { "bun-types": "1.3.8" } }, "sha512-3LvWJ2q5GerAXYxO2mffLTqOzEu5qnhEAlh48Vnu8WQfnmSwbgagjGZV6BoHKJztENYEDn6QmVd949W4uESRJA=="],
1924
+
"@wispplace/bun-firehose/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="],
1924
1925
1925
1926
"@wispplace/lexicons/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="],
1926
1927
1927
1928
"@wispplace/main-app/@atproto/api": ["@atproto/api@0.17.7", "", { "dependencies": { "@atproto/common-web": "^0.4.3", "@atproto/lexicon": "^0.5.1", "@atproto/syntax": "^0.4.1", "@atproto/xrpc": "^0.7.5", "await-lock": "^2.2.2", "multiformats": "^9.9.0", "tlds": "^1.234.0", "zod": "^3.23.8" } }, "sha512-V+OJBZq9chcrD21xk1bUa6oc5DSKfQj5DmUPf5rmZncqL1w9ZEbS38H5cMyqqdhfgo2LWeDRdZHD0rvNyJsIaw=="],
1928
1929
1929
-
"@wispplace/tiered-storage/@types/bun": ["@types/bun@1.3.8", "", { "dependencies": { "bun-types": "1.3.8" } }, "sha512-3LvWJ2q5GerAXYxO2mffLTqOzEu5qnhEAlh48Vnu8WQfnmSwbgagjGZV6BoHKJztENYEDn6QmVd949W4uESRJA=="],
1930
+
"@wispplace/tiered-storage/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="],
1930
1931
1931
1932
"accepts/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="],
1932
1933
···
2058
2059
2059
2060
"@typescript-eslint/typescript-estree/minimatch/brace-expansion": ["brace-expansion@2.0.2", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ=="],
2060
2061
2062
+
"@wispplace/bun-firehose/@types/bun/bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="],
2063
+
2061
2064
"@wispplace/lexicons/@atproto/lexicon/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="],
2062
2065
2063
2066
"@wispplace/main-app/@atproto/api/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="],
2064
2067
2065
2068
"@wispplace/main-app/@atproto/api/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="],
2069
+
2070
+
"@wispplace/tiered-storage/@types/bun/bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="],
2066
2071
2067
2072
"accepts/mime-types/mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="],
2068
2073
+15
docker-compose.yml
+15
docker-compose.yml
···
17
17
timeout: 5s
18
18
retries: 5
19
19
20
+
redis:
21
+
image: redis:7-alpine
22
+
container_name: wisp-redis
23
+
restart: unless-stopped
24
+
ports:
25
+
- "6379:6379"
26
+
volumes:
27
+
- redis_data:/data
28
+
healthcheck:
29
+
test: ["CMD", "redis-cli", "ping"]
30
+
interval: 5s
31
+
timeout: 5s
32
+
retries: 5
33
+
20
34
volumes:
21
35
postgres_data:
36
+
redis_data:
History
4 rounds
1 comment
nekomimi.pet
submitted
#3
7 commits
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
Dockerfile
fix storage-miss revalidation loop and tier reporting
1/1 failed
expand
collapse
expand 1 comment
pull request successfully merged
nekomimi.pet
submitted
#2
5 commits
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
1/1 failed
expand
collapse
expand 0 comments
nekomimi.pet
submitted
#1
4 commits
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
1/1 failed
expand
collapse
expand 0 comments
nekomimi.pet
submitted
#0
1 commit
expand
collapse
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
this is what is life on us-east-1 right now. seems to be doing fine as of 2/6 10:38pm