Monorepo for wisp.place — a static site hosting service built on top of the AT Protocol.

hosting service writes on cache miss, firehose service properly notifies hosting service on new updates #6

Merged. Opened by nekomimi.pet, targeting `main` from `hosting-service-fixes`.

actually forcing myself to develop good habits this year on my own projects because i deserve them

Labels

None yet.

Assignee

None yet.

Participants 1
AT URI
at://did:plc:ttdrpj45ibqunmfhdsb4zdwq/sh.tangled.repo.pull/3merr52ear522
+173 -114
Interdiff #1 #2
apps/firehose-service/.env.example

This file has not been changed.

+59 -16
apps/firehose-service/src/index.ts
··· 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'; 18 19 const app = new Hono(); 20 21 // Health endpoint 22 app.get('/health', async (c) => { 23 ··· 38 if (isShuttingDown) return; 39 isShuttingDown = true; 40 41 - console.log(`\n[Service] Received ${signal}, shutting down...`); 42 43 stopFirehose(); 44 await stopRevalidateWorker(); 45 await closeCacheInvalidationPublisher(); 46 await closeDatabase(); 47 48 - console.log('[Service] Shutdown complete'); 49 process.exit(0); 50 } 51 ··· 56 * Backfill mode - process existing sites from database 57 */ 58 async function runBackfill(): Promise<void> { 59 - console.log('[Backfill] Starting backfill mode'); 60 const startTime = Date.now(); 61 const forceRewriteHtml = process.env.BACKFILL_FORCE_REWRITE_HTML === 'true'; 62 63 if (forceRewriteHtml) { 64 - console.log('[Backfill] Forcing HTML rewrite for all sites'); 65 } 66 67 let sites = await listAllSites(); 68 if (sites.length === 0) { 69 const cachedSites = await listAllSiteCaches(); 70 sites = cachedSites.map(site => ({ did: site.did, rkey: site.rkey })); 71 - console.log('[Backfill] Sites table empty; falling back to site_cache entries'); 72 } 73 74 - console.log(`[Backfill] Found ${sites.length} sites in database`); 75 76 let processed = 0; 77 let skipped = 0; ··· 83 const result = await fetchSiteRecord(site.did, site.rkey); 84 85 if (!result) { 86 - console.log(`[Backfill] Site not found on PDS: ${site.did}/${site.rkey}`); 87 skipped++; 88 continue; 89 } ··· 91 const existingCache = await getSiteCache(site.did, site.rkey); 92 // Check if CID matches (already up to date) 93 if (!forceRewriteHtml && existingCache && result.cid === existingCache.record_cid) { 94 - console.log(`[Backfill] Site already up to date: ${site.did}/${site.rkey}`); 95 skipped++; 96 continue; 97 } 
··· 102 }); 103 processed++; 104 105 - console.log(`[Backfill] Progress: ${processed + skipped + failed}/${sites.length}`); 106 } catch (err) { 107 - console.error(`[Backfill] Failed to process ${site.did}/${site.rkey}:`, err); 108 failed++; 109 } 110 } ··· 115 const elapsedRemSec = elapsedSec % 60; 116 const elapsedLabel = elapsedMin > 0 ? `${elapsedMin}m ${elapsedRemSec}s` : `${elapsedSec}s`; 117 118 - console.log(`[Backfill] Complete: ${processed} processed, ${skipped} skipped, ${failed} failed (${elapsedLabel} elapsed)`); 119 } 120 121 // Main entry point 122 async function main() { 123 - console.log('[Service] Starting firehose-service'); 124 - console.log(`[Service] Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`); 125 - console.log(`[Service] S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`); 126 127 // Start health server 128 const server = serve({ ··· 130 port: config.healthPort, 131 }); 132 133 - console.log(`[Service] Health endpoint: http://localhost:${config.healthPort}/health`); 134 135 if (config.isBackfill) { 136 // Run backfill and exit ··· 145 } 146 147 main().catch((err) => { 148 - console.error('[Service] Fatal error:', err); 149 process.exit(1); 150 });
··· 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'; 18 + import { initializeGrafanaExporters, createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability'; 19 + import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono'; 20 21 + // Initialize Grafana exporters if configured 22 + initializeGrafanaExporters({ 23 + serviceName: 'firehose-service', 24 + serviceVersion: '1.0.0' 25 + }); 26 + 27 + const logger = createLogger('firehose-service'); 28 + 29 const app = new Hono(); 30 31 + // Add observability middleware 32 + app.use('*', observabilityMiddleware('firehose-service')); 33 + 34 + // Error handler 35 + app.onError(observabilityErrorHandler('firehose-service')); 36 + 37 // Health endpoint 38 app.get('/health', async (c) => { 39 ··· 54 if (isShuttingDown) return; 55 isShuttingDown = true; 56 57 + logger.info(`Received ${signal}, shutting down...`); 58 59 stopFirehose(); 60 await stopRevalidateWorker(); 61 await closeCacheInvalidationPublisher(); 62 await closeDatabase(); 63 64 + logger.info('Shutdown complete'); 65 process.exit(0); 66 } 67 ··· 72 * Backfill mode - process existing sites from database 73 */ 74 async function runBackfill(): Promise<void> { 75 + logger.info('Starting backfill mode'); 76 const startTime = Date.now(); 77 const forceRewriteHtml = process.env.BACKFILL_FORCE_REWRITE_HTML === 'true'; 78 79 if (forceRewriteHtml) { 80 + logger.info('Forcing HTML rewrite for all sites'); 81 } 82 83 let sites = await listAllSites(); 84 if (sites.length === 0) { 85 const cachedSites = await listAllSiteCaches(); 86 sites = cachedSites.map(site => ({ did: site.did, rkey: site.rkey })); 87 + logger.info('Sites table empty; falling back to site_cache entries'); 88 } 89 90 + 
logger.info(`Found ${sites.length} sites in database`); 91 92 let processed = 0; 93 let skipped = 0; ··· 99 const result = await fetchSiteRecord(site.did, site.rkey); 100 101 if (!result) { 102 + logger.info(`Site not found on PDS: ${site.did}/${site.rkey}`); 103 skipped++; 104 continue; 105 } ··· 107 const existingCache = await getSiteCache(site.did, site.rkey); 108 // Check if CID matches (already up to date) 109 if (!forceRewriteHtml && existingCache && result.cid === existingCache.record_cid) { 110 + logger.info(`Site already up to date: ${site.did}/${site.rkey}`); 111 skipped++; 112 continue; 113 } ··· 118 }); 119 processed++; 120 121 + logger.info(`Progress: ${processed + skipped + failed}/${sites.length}`); 122 } catch (err) { 123 + logger.error(`Failed to process ${site.did}/${site.rkey}`, err); 124 failed++; 125 } 126 } ··· 131 const elapsedRemSec = elapsedSec % 60; 132 const elapsedLabel = elapsedMin > 0 ? `${elapsedMin}m ${elapsedRemSec}s` : `${elapsedSec}s`; 133 134 + logger.info(`Complete: ${processed} processed, ${skipped} skipped, ${failed} failed (${elapsedLabel} elapsed)`); 135 } 136 137 + // Internal observability endpoints (for admin panel) 138 + app.get('/__internal__/observability/logs', (c) => { 139 + const query = c.req.query(); 140 + const filter: any = {}; 141 + if (query.level) filter.level = query.level; 142 + if (query.service) filter.service = query.service; 143 + if (query.search) filter.search = query.search; 144 + if (query.eventType) filter.eventType = query.eventType; 145 + if (query.limit) filter.limit = parseInt(query.limit as string); 146 + return c.json({ logs: logCollector.getLogs(filter) }); 147 + }); 148 + 149 + app.get('/__internal__/observability/errors', (c) => { 150 + const query = c.req.query(); 151 + const filter: any = {}; 152 + if (query.service) filter.service = query.service; 153 + if (query.limit) filter.limit = parseInt(query.limit as string); 154 + return c.json({ errors: errorTracker.getErrors(filter) }); 155 + 
}); 156 + 157 + app.get('/__internal__/observability/metrics', (c) => { 158 + const query = c.req.query(); 159 + const timeWindow = query.timeWindow ? parseInt(query.timeWindow as string) : 3600000; 160 + const stats = metricsCollector.getStats('firehose-service', timeWindow); 161 + return c.json({ stats, timeWindow }); 162 + }); 163 + 164 // Main entry point 165 async function main() { 166 + logger.info('Starting firehose-service'); 167 + logger.info(`Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`); 168 + logger.info(`S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`); 169 170 // Start health server 171 const server = serve({ ··· 173 port: config.healthPort, 174 }); 175 176 + logger.info(`Health endpoint: http://localhost:${config.healthPort}/health`); 177 178 if (config.isBackfill) { 179 // Run backfill and exit ··· 188 } 189 190 main().catch((err) => { 191 + logger.error('Fatal error', err); 192 process.exit(1); 193 });
+8 -6
apps/firehose-service/src/lib/cache-invalidation.ts
··· 7 */ 8 9 import Redis from 'ioredis'; 10 import { config } from '../config'; 11 12 const CHANNEL = 'wisp:cache-invalidate'; 13 14 let publisher: Redis | null = null; ··· 17 function getPublisher(): Redis | null { 18 if (!config.redisUrl) { 19 if (!loggedMissingRedis) { 20 - console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled'); 21 loggedMissingRedis = true; 22 } 23 return null; 24 } 25 26 if (!publisher) { 27 - console.log(`[CacheInvalidation] Connecting to Redis for publishing: ${config.redisUrl}`); 28 publisher = new Redis(config.redisUrl, { 29 maxRetriesPerRequest: 2, 30 enableReadyCheck: true, 31 }); 32 33 publisher.on('error', (err) => { 34 - console.error('[CacheInvalidation] Redis error:', err); 35 }); 36 37 publisher.on('ready', () => { 38 - console.log('[CacheInvalidation] Redis publisher connected'); 39 }); 40 } 41 ··· 52 53 try { 54 const message = JSON.stringify({ did, rkey, action }); 55 - console.log(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL}`); 56 await redis.publish(CHANNEL, message); 57 } catch (err) { 58 - console.error('[CacheInvalidation] Failed to publish:', err); 59 } 60 } 61
··· 7 */ 8 9 import Redis from 'ioredis'; 10 + import { createLogger } from '@wispplace/observability'; 11 import { config } from '../config'; 12 13 + const logger = createLogger('firehose-service'); 14 const CHANNEL = 'wisp:cache-invalidate'; 15 16 let publisher: Redis | null = null; ··· 19 function getPublisher(): Redis | null { 20 if (!config.redisUrl) { 21 if (!loggedMissingRedis) { 22 + logger.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled'); 23 loggedMissingRedis = true; 24 } 25 return null; 26 } 27 28 if (!publisher) { 29 + logger.info(`[CacheInvalidation] Connecting to Redis for publishing: ${config.redisUrl}`); 30 publisher = new Redis(config.redisUrl, { 31 maxRetriesPerRequest: 2, 32 enableReadyCheck: true, 33 }); 34 35 publisher.on('error', (err) => { 36 + logger.error('[CacheInvalidation] Redis error', err); 37 }); 38 39 publisher.on('ready', () => { 40 + logger.info('[CacheInvalidation] Redis publisher connected'); 41 }); 42 } 43 ··· 54 55 try { 56 const message = JSON.stringify({ did, rkey, action }); 57 + logger.debug(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL}`); 58 await redis.publish(CHANNEL, message); 59 } catch (err) { 60 + logger.error('[CacheInvalidation] Failed to publish', err); 61 } 62 } 63
+34 -30
apps/firehose-service/src/lib/cache-writer.ts
··· 11 import { collectFileCidsFromEntries, countFilesInDirectory, normalizeFileCids } from '@wispplace/fs-utils'; 12 import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 13 import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 14 import { writeFile, deleteFile, listFiles } from './storage'; 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 import { gunzipSync } from 'zlib'; 18 import { publishCacheInvalidation } from './cache-invalidation'; 19 20 /** 21 * Fetch a site record from the PDS 22 ··· 24 try { 25 const pdsEndpoint = await getPdsForDid(did); 26 if (!pdsEndpoint) { 27 - console.error('[Cache] Failed to get PDS endpoint for DID', { did, rkey }); 28 return null; 29 } 30 ··· 38 } catch (err) { 39 const errorMsg = err instanceof Error ? err.message : String(err); 40 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 41 - console.log('[Cache] Site record not found', { did, rkey }); 42 } else { 43 - console.error('[Cache] Failed to fetch site record', { did, rkey, error: errorMsg }); 44 } 45 return null; 46 } ··· 57 try { 58 const endpoint = pdsEndpoint ?? await getPdsForDid(did); 59 if (!endpoint) { 60 - console.error('[Cache] Failed to get PDS endpoint for DID (settings)', { did, rkey }); 61 return null; 62 } 63 ··· 71 } catch (err) { 72 const errorMsg = err instanceof Error ? 
err.message : String(err); 73 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 74 - console.log('[Cache] Settings record not found', { did, rkey }); 75 } else { 76 - console.error('[Cache] Failed to fetch settings record', { did, rkey, error: errorMsg }); 77 } 78 return null; 79 } ··· 137 const MAX_DEPTH = 10; 138 139 if (depth >= MAX_DEPTH) { 140 - console.error('[Cache] Max subfs expansion depth reached'); 141 return directory; 142 } 143 ··· 147 // Fetch uncached subfs records 148 const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri)); 149 if (uncachedUris.length > 0) { 150 - console.log(`[Cache] Fetching ${uncachedUris.length} subfs records (depth ${depth})`); 151 const fetchedRecords = await Promise.all( 152 uncachedUris.map(async ({ uri }) => { 153 const record = await fetchSubfsRecord(uri, pdsEndpoint); ··· 342 ): Promise<void> { 343 const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 344 345 - console.log(`[Cache] Downloading ${file.path}`); 346 347 let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 348 let encoding = file.encoding; ··· 356 // Heuristic fallback: some records omit base64 flag but content is base64 text 357 const decoded = tryDecodeBase64(content); 358 if (decoded) { 359 - console.warn(`[Cache] Decoded base64 fallback for ${file.path} (base64 flag missing)`); 360 content = decoded; 361 } 362 } ··· 370 content = gunzipSync(content); 371 encoding = undefined; 372 } catch (error) { 373 - console.error(`[Cache] Failed to decompress ${file.path}, storing gzipped`); 374 } 375 } else if (encoding === 'gzip' && content.length >= 2 && 376 !(content[0] === 0x1f && content[1] === 0x8b)) { 377 // If marked gzip but doesn't look gzipped, attempt base64 decode and retry 378 const decoded = tryDecodeBase64(content); 379 if (decoded && decoded.length >= 2 && decoded[0] === 0x1f && decoded[1] === 0x8b) { 
380 - console.warn(`[Cache] Decoded base64+gzip fallback for ${file.path}`); 381 try { 382 content = gunzipSync(decoded); 383 encoding = undefined; 384 } catch (error) { 385 - console.error(`[Cache] Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`); 386 content = decoded; 387 } 388 } ··· 413 try { 414 rewriteSource = gunzipSync(content); 415 } catch (error) { 416 - console.error(`[Cache] Failed to decompress ${file.path} for rewrite, using raw content`); 417 } 418 } 419 ··· 423 424 const rewrittenKey = `${did}/${rkey}/.rewritten/${file.path}`; 425 await writeFile(rewrittenKey, rewrittenContent, { mimeType: 'text/html' }); 426 - console.log(`[Cache] Wrote rewritten HTML: ${rewrittenKey}`); 427 } 428 429 - console.log(`[Cache] Stored ${file.path} (${content.length} bytes)`); 430 } 431 432 /** ··· 443 } 444 ): Promise<void> { 445 const forceRewriteHtml = options?.forceRewriteHtml === true; 446 - console.log(`[Cache] Processing site ${did}/${rkey}, record CID: ${recordCid}`, { 447 forceRewriteHtml, 448 }); 449 450 if (!record.root?.entries) { 451 - console.error('[Cache] Invalid record structure'); 452 return; 453 } 454 455 // Get PDS endpoint 456 const pdsEndpoint = await getPdsForDid(did); 457 if (!pdsEndpoint) { 458 - console.error('[Cache] Could not resolve PDS for', did); 459 return; 460 } 461 ··· 465 // Validate limits 466 const fileCount = countFilesInDirectory(expandedRoot); 467 if (fileCount > MAX_FILE_COUNT) { 468 - console.error(`[Cache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 469 return; 470 } 471 472 const totalSize = calculateTotalBlobSize(expandedRoot); 473 if (totalSize > MAX_SITE_SIZE) { 474 - console.error(`[Cache] Site exceeds size limit: ${totalSize} > ${MAX_SITE_SIZE}`); 475 return; 476 } 477 ··· 485 const normalizedFileCids = normalizeFileCids(rawFileCids); 486 const oldFileCids = normalizedFileCids.value; 487 if (normalizedFileCids.source === 'string-invalid' || normalizedFileCids.source === 
'other') { 488 - console.warn('[Cache] Existing file_cids had unexpected shape; treating as empty', { 489 did, 490 rkey, 491 type: Array.isArray(rawFileCids) ? 'array' : typeof rawFileCids, ··· 512 } 513 } 514 515 - console.log(`[Cache] Files unchanged: ${newFiles.length - filesToDownload.length}, to download: ${filesToDownload.length}, to delete: ${pathsToDelete.length}`); 516 517 // Download new/changed files (with concurrency limit) 518 const DOWNLOAD_CONCURRENCY = 20; ··· 541 } 542 543 // Update DB with new CIDs 544 - console.log(`[Cache] About to upsert site cache for ${did}/${rkey}`); 545 await upsertSiteCache(did, rkey, recordCid, newFileCids); 546 - console.log(`[Cache] Updated site cache for ${did}/${rkey} with record CID ${recordCid}`); 547 548 // Backfill settings if a record exists for this rkey 549 const settingsRecord = await fetchSettingsRecord(did, rkey, pdsEndpoint); ··· 557 await publishCacheInvalidation(did, rkey, 'update'); 558 } 559 560 - console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 561 } 562 563 /** 564 * Handle a site delete event 565 */ 566 export async function handleSiteDelete(did: string, rkey: string): Promise<void> { 567 - console.log(`[Cache] Deleting site ${did}/${rkey}`); 568 569 // List all files for this site and delete them 570 const prefix = `${did}/${rkey}/`; ··· 580 // Notify hosting-service to invalidate its local caches 581 await publishCacheInvalidation(did, rkey, 'delete'); 582 583 - console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 584 } 585 586 /** 587 * Handle settings create/update event 588 */ 589 export async function handleSettingsUpdate(did: string, rkey: string, settings: WispSettings, recordCid: string): Promise<void> { 590 - console.log(`[Cache] Updating settings for ${did}/${rkey}`); 591 592 await upsertSiteSettingsCache(did, rkey, recordCid, { 593 directoryListing: settings.directoryListing, ··· 606 * Handle settings delete event 607 */ 608 export async function 
handleSettingsDelete(did: string, rkey: string): Promise<void> { 609 - console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 610 await deleteSiteSettingsCache(did, rkey); 611 612 // Notify hosting-service to invalidate its local caches
··· 11 import { collectFileCidsFromEntries, countFilesInDirectory, normalizeFileCids } from '@wispplace/fs-utils'; 12 import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 13 import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 14 + import { createLogger } from '@wispplace/observability'; 15 import { writeFile, deleteFile, listFiles } from './storage'; 16 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 17 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 18 import { gunzipSync } from 'zlib'; 19 import { publishCacheInvalidation } from './cache-invalidation'; 20 21 + const logger = createLogger('firehose-service'); 22 + 23 /** 24 * Fetch a site record from the PDS 25 ··· 27 try { 28 const pdsEndpoint = await getPdsForDid(did); 29 if (!pdsEndpoint) { 30 + logger.error('Failed to get PDS endpoint for DID', undefined, { did, rkey }); 31 return null; 32 } 33 ··· 41 } catch (err) { 42 const errorMsg = err instanceof Error ? err.message : String(err); 43 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 44 + logger.info('Site record not found', { did, rkey }); 45 } else { 46 + logger.error('Failed to fetch site record', err, { did, rkey }); 47 } 48 return null; 49 } ··· 60 try { 61 const endpoint = pdsEndpoint ?? await getPdsForDid(did); 62 if (!endpoint) { 63 + logger.error('Failed to get PDS endpoint for DID (settings)', undefined, { did, rkey }); 64 return null; 65 } 66 ··· 74 } catch (err) { 75 const errorMsg = err instanceof Error ? 
err.message : String(err); 76 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 77 + logger.info('Settings record not found', { did, rkey }); 78 } else { 79 + logger.error('Failed to fetch settings record', err, { did, rkey }); 80 } 81 return null; 82 } ··· 140 const MAX_DEPTH = 10; 141 142 if (depth >= MAX_DEPTH) { 143 + logger.error('Max subfs expansion depth reached'); 144 return directory; 145 } 146 ··· 150 // Fetch uncached subfs records 151 const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri)); 152 if (uncachedUris.length > 0) { 153 + logger.info(`Fetching ${uncachedUris.length} subfs records`, { depth }); 154 const fetchedRecords = await Promise.all( 155 uncachedUris.map(async ({ uri }) => { 156 const record = await fetchSubfsRecord(uri, pdsEndpoint); ··· 345 ): Promise<void> { 346 const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 347 348 + logger.debug(`Downloading ${file.path}`); 349 350 let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 351 let encoding = file.encoding; ··· 359 // Heuristic fallback: some records omit base64 flag but content is base64 text 360 const decoded = tryDecodeBase64(content); 361 if (decoded) { 362 + logger.warn(`Decoded base64 fallback for ${file.path} (base64 flag missing)`); 363 content = decoded; 364 } 365 } ··· 373 content = gunzipSync(content); 374 encoding = undefined; 375 } catch (error) { 376 + logger.error(`Failed to decompress ${file.path}, storing gzipped`, error); 377 } 378 } else if (encoding === 'gzip' && content.length >= 2 && 379 !(content[0] === 0x1f && content[1] === 0x8b)) { 380 // If marked gzip but doesn't look gzipped, attempt base64 decode and retry 381 const decoded = tryDecodeBase64(content); 382 if (decoded && decoded.length >= 2 && decoded[0] === 0x1f && decoded[1] === 0x8b) { 383 + logger.warn(`Decoded base64+gzip fallback for ${file.path}`); 
384 try { 385 content = gunzipSync(decoded); 386 encoding = undefined; 387 } catch (error) { 388 + logger.error(`Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`, error); 389 content = decoded; 390 } 391 } ··· 416 try { 417 rewriteSource = gunzipSync(content); 418 } catch (error) { 419 + logger.error(`Failed to decompress ${file.path} for rewrite, using raw content`, error); 420 } 421 } 422 ··· 426 427 const rewrittenKey = `${did}/${rkey}/.rewritten/${file.path}`; 428 await writeFile(rewrittenKey, rewrittenContent, { mimeType: 'text/html' }); 429 + logger.debug(`Wrote rewritten HTML: ${rewrittenKey}`); 430 } 431 432 + logger.debug(`Stored ${file.path} (${content.length} bytes)`); 433 } 434 435 /** ··· 446 } 447 ): Promise<void> { 448 const forceRewriteHtml = options?.forceRewriteHtml === true; 449 + logger.info(`Processing site ${did}/${rkey}`, { 450 + recordCid, 451 forceRewriteHtml, 452 }); 453 454 if (!record.root?.entries) { 455 + logger.error('Invalid record structure'); 456 return; 457 } 458 459 // Get PDS endpoint 460 const pdsEndpoint = await getPdsForDid(did); 461 if (!pdsEndpoint) { 462 + logger.error('Could not resolve PDS', undefined, { did }); 463 return; 464 } 465 ··· 469 // Validate limits 470 const fileCount = countFilesInDirectory(expandedRoot); 471 if (fileCount > MAX_FILE_COUNT) { 472 + logger.error(`Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 473 return; 474 } 475 476 const totalSize = calculateTotalBlobSize(expandedRoot); 477 if (totalSize > MAX_SITE_SIZE) { 478 + logger.error(`Site exceeds size limit: ${totalSize} > ${MAX_SITE_SIZE}`); 479 return; 480 } 481 ··· 489 const normalizedFileCids = normalizeFileCids(rawFileCids); 490 const oldFileCids = normalizedFileCids.value; 491 if (normalizedFileCids.source === 'string-invalid' || normalizedFileCids.source === 'other') { 492 + logger.warn('Existing file_cids had unexpected shape; treating as empty', { 493 did, 494 rkey, 495 type: 
Array.isArray(rawFileCids) ? 'array' : typeof rawFileCids, ··· 516 } 517 } 518 519 + logger.info(`Files unchanged: ${newFiles.length - filesToDownload.length}, to download: ${filesToDownload.length}, to delete: ${pathsToDelete.length}`); 520 521 // Download new/changed files (with concurrency limit) 522 const DOWNLOAD_CONCURRENCY = 20; ··· 545 } 546 547 // Update DB with new CIDs 548 + logger.debug(`About to upsert site cache for ${did}/${rkey}`); 549 await upsertSiteCache(did, rkey, recordCid, newFileCids); 550 + logger.debug(`Updated site cache for ${did}/${rkey} with record CID ${recordCid}`); 551 552 // Backfill settings if a record exists for this rkey 553 const settingsRecord = await fetchSettingsRecord(did, rkey, pdsEndpoint); ··· 561 await publishCacheInvalidation(did, rkey, 'update'); 562 } 563 564 + logger.info(`Successfully cached site ${did}/${rkey}`); 565 } 566 567 /** 568 * Handle a site delete event 569 */ 570 export async function handleSiteDelete(did: string, rkey: string): Promise<void> { 571 + logger.info(`Deleting site ${did}/${rkey}`); 572 573 // List all files for this site and delete them 574 const prefix = `${did}/${rkey}/`; ··· 584 // Notify hosting-service to invalidate its local caches 585 await publishCacheInvalidation(did, rkey, 'delete'); 586 587 + logger.info(`Deleted site ${did}/${rkey} (${keys.length} files)`); 588 } 589 590 /** 591 * Handle settings create/update event 592 */ 593 export async function handleSettingsUpdate(did: string, rkey: string, settings: WispSettings, recordCid: string): Promise<void> { 594 + logger.info(`Updating settings for ${did}/${rkey}`); 595 596 await upsertSiteSettingsCache(did, rkey, recordCid, { 597 directoryListing: settings.directoryListing, ··· 610 * Handle settings delete event 611 */ 612 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 613 + logger.info(`Deleting settings for ${did}/${rkey}`); 614 await deleteSiteSettingsCache(did, rkey); 615 616 // Notify 
hosting-service to invalidate its local caches
+12 -11
apps/firehose-service/src/lib/db.ts
··· 1 import postgres from 'postgres'; 2 import type { SiteCache, SiteRecord, SiteSettingsCache } from '@wispplace/database'; 3 import { config } from '../config'; 4 5 const sql = postgres(config.databaseUrl, { 6 max: 10, 7 ··· 54 recordCid: string, 55 fileCids: Record<string, string> 56 ): Promise<void> { 57 - console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 58 try { 59 await sql` 60 INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) ··· 65 file_cids = EXCLUDED.file_cids, 66 updated_at = EXTRACT(EPOCH FROM NOW()) 67 `; 68 - console.log(`[DB] upsertSiteCache completed for ${did}/${rkey}`); 69 } catch (err) { 70 - const error = err instanceof Error ? err : new Error(String(err)); 71 - console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message, stack: error.stack }); 72 - throw error; 73 } 74 } 75 ··· 98 const indexFiles = settings.indexFiles ?? []; 99 const headers = settings.headers ?? []; 100 101 - console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 directoryListing, 103 spaMode, 104 custom404, ··· 134 headers = EXCLUDED.headers, 135 updated_at = EXTRACT(EPOCH FROM NOW()) 136 `; 137 - console.log(`[DB] upsertSiteSettingsCache completed for ${did}/${rkey}`); 138 } catch (err) { 139 - const error = err instanceof Error ? err : new Error(String(err)); 140 - console.error('[DB] upsertSiteSettingsCache error:', { did, rkey, error: error.message, stack: error.stack }); 141 - throw error; 142 } 143 } 144 ··· 148 149 export async function closeDatabase(): Promise<void> { 150 await sql.end({ timeout: 5 }); 151 - console.log('[DB] Database connections closed'); 152 } 153 154 export { sql };
··· 1 import postgres from 'postgres'; 2 import type { SiteCache, SiteRecord, SiteSettingsCache } from '@wispplace/database'; 3 + import { createLogger } from '@wispplace/observability'; 4 import { config } from '../config'; 5 6 + const logger = createLogger('firehose-service'); 7 + 8 const sql = postgres(config.databaseUrl, { 9 max: 10, 10 ··· 57 recordCid: string, 58 fileCids: Record<string, string> 59 ): Promise<void> { 60 + logger.debug(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 61 try { 62 await sql` 63 INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) ··· 68 file_cids = EXCLUDED.file_cids, 69 updated_at = EXTRACT(EPOCH FROM NOW()) 70 `; 71 + logger.debug(`[DB] upsertSiteCache completed for ${did}/${rkey}`); 72 } catch (err) { 73 + logger.error('[DB] upsertSiteCache error', err, { did, rkey }); 74 + throw err; 75 } 76 } 77 ··· 100 const indexFiles = settings.indexFiles ?? []; 101 const headers = settings.headers ?? []; 102 103 + logger.debug(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 104 directoryListing, 105 spaMode, 106 custom404, ··· 136 headers = EXCLUDED.headers, 137 updated_at = EXTRACT(EPOCH FROM NOW()) 138 `; 139 + logger.debug(`[DB] upsertSiteSettingsCache completed for ${did}/${rkey}`); 140 } catch (err) { 141 + logger.error('[DB] upsertSiteSettingsCache error', err, { did, rkey }); 142 + throw err; 143 } 144 } 145 ··· 149 150 export async function closeDatabase(): Promise<void> { 151 await sql.end({ timeout: 5 }); 152 + logger.info('[DB] Database connections closed'); 153 } 154 155 export { sql };
+12 -12
apps/firehose-service/src/lib/revalidate-worker.ts
··· 1 import Redis from 'ioredis'; 2 import os from 'os'; 3 import { config } from '../config'; 4 import { fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer'; 5 6 const consumerName = process.env.WISP_REVALIDATE_CONSUMER || `${os.hostname()}:${process.pid}`; 7 const batchSize = Number.parseInt(process.env.WISP_REVALIDATE_BATCH_SIZE || '10', 10); 8 const claimIdleMs = Number.parseInt(process.env.WISP_REVALIDATE_CLAIM_IDLE_MS || '60000', 10); ··· 33 const reason = fields.reason || 'storage-miss'; 34 35 if (!did || !rkey) { 36 - console.warn('[Revalidate] Missing did/rkey in message', { id, fields }); 37 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 38 return; 39 } 40 41 - console.log(`[Revalidate] Received message ${id}: ${did}/${rkey} (${reason})`); 42 43 const record = await fetchSiteRecord(did, rkey); 44 if (!record) { 45 - console.warn(`[Revalidate] Site record not found on PDS: ${did}/${rkey}`); 46 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 47 return; 48 } ··· 51 skipInvalidation: true, 52 }); 53 54 - console.log(`[Revalidate] Completed ${id}: ${did}/${rkey}`); 55 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 56 } 57 ··· 60 try { 61 await processMessage(id, rawFields); 62 } catch (err) { 63 - const error = err instanceof Error ? err : new Error(String(err)); 64 - console.error('[Revalidate] Failed to process message', { id, error: error.message, stack: error.stack }); 65 } 66 } 67 } ··· 143 await claimStaleMessages(); 144 await readNewMessages(); 145 } catch (err) { 146 - const error = err instanceof Error ? 
err : new Error(String(err)); 147 - console.error('[Revalidate] Loop error', { error: error.message, stack: error.stack }); 148 await new Promise((resolve) => setTimeout(resolve, 1000)); 149 } 150 } ··· 152 153 export async function startRevalidateWorker(): Promise<void> { 154 if (!config.redisUrl) { 155 - console.warn('[Revalidate] REDIS_URL not set; revalidate worker disabled'); 156 return; 157 } 158 159 if (running) return; 160 161 - console.log(`[Revalidate] Connecting to Redis: ${config.redisUrl}`); 162 redis = new Redis(config.redisUrl, { 163 maxRetriesPerRequest: 2, 164 enableReadyCheck: true, 165 }); 166 167 redis.on('error', (err) => { 168 - console.error('[Revalidate] Redis error:', err); 169 }); 170 171 redis.on('ready', () => { 172 - console.log(`[Revalidate] Redis connected, stream: ${config.revalidateStream}, group: ${config.revalidateGroup}`); 173 }); 174 175 running = true;
··· 1 import Redis from 'ioredis'; 2 import os from 'os'; 3 + import { createLogger } from '@wispplace/observability'; 4 import { config } from '../config'; 5 import { fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer'; 6 7 + const logger = createLogger('firehose-service'); 8 const consumerName = process.env.WISP_REVALIDATE_CONSUMER || `${os.hostname()}:${process.pid}`; 9 const batchSize = Number.parseInt(process.env.WISP_REVALIDATE_BATCH_SIZE || '10', 10); 10 const claimIdleMs = Number.parseInt(process.env.WISP_REVALIDATE_CLAIM_IDLE_MS || '60000', 10); ··· 35 const reason = fields.reason || 'storage-miss'; 36 37 if (!did || !rkey) { 38 + logger.warn('[Revalidate] Missing did/rkey in message', { id, fields }); 39 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 40 return; 41 } 42 43 + logger.info(`[Revalidate] Received message ${id}: ${did}/${rkey} (${reason})`); 44 45 const record = await fetchSiteRecord(did, rkey); 46 if (!record) { 47 + logger.warn(`[Revalidate] Site record not found on PDS: ${did}/${rkey}`); 48 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 49 return; 50 } ··· 53 skipInvalidation: true, 54 }); 55 56 + logger.info(`[Revalidate] Completed ${id}: ${did}/${rkey}`); 57 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 58 } 59 ··· 62 try { 63 await processMessage(id, rawFields); 64 } catch (err) { 65 + logger.error('[Revalidate] Failed to process message', err, { id }); 66 } 67 } 68 } ··· 144 await claimStaleMessages(); 145 await readNewMessages(); 146 } catch (err) { 147 + logger.error('[Revalidate] Loop error', err); 148 await new Promise((resolve) => setTimeout(resolve, 1000)); 149 } 150 } ··· 152 153 export async function startRevalidateWorker(): Promise<void> { 154 if (!config.redisUrl) { 155 + logger.warn('[Revalidate] REDIS_URL not set; revalidate worker disabled'); 156 return; 157 } 158 159 if (running) return; 160 161 + logger.info(`[Revalidate] Connecting to 
Redis: ${config.redisUrl}`); 162 redis = new Redis(config.redisUrl, { 163 maxRetriesPerRequest: 2, 164 enableReadyCheck: true, 165 }); 166 167 redis.on('error', (err) => { 168 + logger.error('[Revalidate] Redis error', err); 169 }); 170 171 redis.on('ready', () => { 172 + logger.info(`[Revalidate] Redis connected, stream: ${config.revalidateStream}, group: ${config.revalidateGroup}`); 173 }); 174 175 running = true;
apps/hosting-service/.env.example

This file has not been changed.

+9 -7
apps/hosting-service/src/index.ts
··· 1 import app from './server'; 2 import { serve } from '@hono/node-server'; 3 - import { initializeGrafanaExporters } from '@wispplace/observability'; 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation'; 8 import { storage, getStorageConfig } from './lib/storage'; 9 10 // Initialize Grafana exporters if configured 11 initializeGrafanaExporters({ 12 ··· 19 // Ensure cache directory exists 20 if (!existsSync(CACHE_DIR)) { 21 mkdirSync(CACHE_DIR, { recursive: true }); 22 - console.log('Created cache directory:', CACHE_DIR); 23 } 24 25 // Start domain cache cleanup ··· 33 const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100; 34 35 if (BOOTSTRAP_HOT_ON_STARTUP) { 36 - console.log(`🔥 Bootstrapping hot cache (top ${BOOTSTRAP_HOT_LIMIT} items)...`); 37 storage.bootstrapHot(BOOTSTRAP_HOT_LIMIT) 38 .then((loaded: number) => { 39 - console.log(`✅ Bootstrapped ${loaded} items into hot cache`); 40 }) 41 .catch((err: unknown) => { 42 - console.error('❌ Hot cache bootstrap error:', err); 43 }); 44 } 45 ··· 82 83 // Graceful shutdown 84 process.on('SIGINT', async () => { 85 - console.log('\n🛑 Shutting down...'); 86 stopDomainCacheCleanup(); 87 await stopCacheInvalidationSubscriber(); 88 await closeRevalidateQueue(); ··· 92 }); 93 94 process.on('SIGTERM', async () => { 95 - console.log('\n🛑 Shutting down...'); 96 stopDomainCacheCleanup(); 97 await stopCacheInvalidationSubscriber(); 98 await closeRevalidateQueue();
··· 1 import app from './server'; 2 import { serve } from '@hono/node-server'; 3 + import { initializeGrafanaExporters, createLogger } from '@wispplace/observability'; 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation'; 8 import { storage, getStorageConfig } from './lib/storage'; 9 10 + const logger = createLogger('hosting-service'); 11 + 12 // Initialize Grafana exporters if configured 13 initializeGrafanaExporters({ 14 ··· 21 // Ensure cache directory exists 22 if (!existsSync(CACHE_DIR)) { 23 mkdirSync(CACHE_DIR, { recursive: true }); 24 + logger.info('Created cache directory', { CACHE_DIR }); 25 } 26 27 // Start domain cache cleanup ··· 35 const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100; 36 37 if (BOOTSTRAP_HOT_ON_STARTUP) { 38 + logger.info(`Bootstrapping hot cache (top ${BOOTSTRAP_HOT_LIMIT} items)...`); 39 storage.bootstrapHot(BOOTSTRAP_HOT_LIMIT) 40 .then((loaded: number) => { 41 + logger.info(`Bootstrapped ${loaded} items into hot cache`); 42 }) 43 .catch((err: unknown) => { 44 + logger.error('Hot cache bootstrap error', err); 45 }); 46 } 47 ··· 84 85 // Graceful shutdown 86 process.on('SIGINT', async () => { 87 + logger.info('Shutting down...'); 88 stopDomainCacheCleanup(); 89 await stopCacheInvalidationSubscriber(); 90 await closeRevalidateQueue(); ··· 94 }); 95 96 process.on('SIGTERM', async () => { 97 + logger.info('Shutting down...'); 98 stopDomainCacheCleanup(); 99 await stopCacheInvalidationSubscriber(); 100 await closeRevalidateQueue();
apps/hosting-service/src/lib/cache-invalidation.ts

This file has not been changed.

apps/hosting-service/src/lib/db.ts

This file has not been changed.

apps/hosting-service/src/lib/file-serving.ts

This file has not been changed.

apps/hosting-service/src/lib/on-demand-cache.ts

This file has not been changed.

apps/hosting-service/src/lib/revalidate-queue.ts

This file has not been changed.

apps/hosting-service/src/lib/storage.ts

This file has not been changed.

+6 -4
apps/hosting-service/src/server.ts
··· 7 import { cors } from 'hono/cors'; 8 import { getWispDomain, getCustomDomain, getCustomDomainByHash } from './lib/db'; 9 import { resolveDid } from './lib/utils'; 10 - import { logCollector, errorTracker, metricsCollector } from '@wispplace/observability'; 11 import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono'; 12 import { sanitizePath } from '@wispplace/fs-utils'; 13 import { isValidRkey, extractHeaders } from './lib/request-utils'; 14 import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving'; 15 import { getRevalidateMetrics } from './lib/revalidate-metrics'; 16 17 const BASE_HOST_ENV = process.env.BASE_HOST || 'wisp.place'; 18 const BASE_HOST = BASE_HOST_ENV.split(':')[0] || BASE_HOST_ENV; 19 ··· 43 const rawPath = url.pathname.replace(/^\//, ''); 44 const path = sanitizePath(rawPath); 45 46 - console.log(`[Server] Request: host=${hostname} hostnameWithoutPort=${hostnameWithoutPort} path=${path} BASE_HOST=${BASE_HOST}`); 47 48 // Check if this is sites.wisp.place subdomain (strip port for comparison) 49 if (hostnameWithoutPort === `sites.${BASE_HOST}`) { ··· 86 return c.redirect(`${url.pathname}/${url.search}`, 301); 87 } 88 89 - console.log(`[Server] sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`); 90 91 // Serve with HTML path rewriting to handle absolute paths 92 const basePath = `/${identifier}/${site}/`; 93 - console.log(`[Server] Serving with basePath: ${basePath}`); 94 const headers = extractHeaders(c.req.raw.headers); 95 return serveFromCacheWithRewrite(did, site, filePath, basePath, c.req.url, headers); 96 }
··· 7 import { cors } from 'hono/cors'; 8 import { getWispDomain, getCustomDomain, getCustomDomainByHash } from './lib/db'; 9 import { resolveDid } from './lib/utils'; 10 + import { createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability'; 11 import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono'; 12 import { sanitizePath } from '@wispplace/fs-utils'; 13 import { isValidRkey, extractHeaders } from './lib/request-utils'; 14 import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving'; 15 import { getRevalidateMetrics } from './lib/revalidate-metrics'; 16 17 + const logger = createLogger('hosting-service'); 18 + 19 const BASE_HOST_ENV = process.env.BASE_HOST || 'wisp.place'; 20 const BASE_HOST = BASE_HOST_ENV.split(':')[0] || BASE_HOST_ENV; 21 ··· 45 const rawPath = url.pathname.replace(/^\//, ''); 46 const path = sanitizePath(rawPath); 47 48 + logger.debug(`Request: host=${hostname} hostnameWithoutPort=${hostnameWithoutPort} path=${path}`, { BASE_HOST }); 49 50 // Check if this is sites.wisp.place subdomain (strip port for comparison) 51 if (hostnameWithoutPort === `sites.${BASE_HOST}`) { ··· 88 return c.redirect(`${url.pathname}/${url.search}`, 301); 89 } 90 91 + logger.debug(`sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`); 92 93 // Serve with HTML path rewriting to handle absolute paths 94 const basePath = `/${identifier}/${site}/`; 95 + logger.debug(`Serving with basePath: ${basePath}`); 96 const headers = extractHeaders(c.req.raw.headers); 97 return serveFromCacheWithRewrite(did, site, filePath, basePath, c.req.url, headers); 98 }
docker-compose.yml

This file has not been changed.

+1
apps/firehose-service/package.json
··· 20 "@wispplace/database": "workspace:*", 21 "@wispplace/fs-utils": "workspace:*", 22 "@wispplace/lexicons": "workspace:*", 23 "@wispplace/safe-fetch": "workspace:*", 24 "@wispplace/tiered-storage": "workspace:*", 25 "hono": "^4.10.4",
··· 20 "@wispplace/database": "workspace:*", 21 "@wispplace/fs-utils": "workspace:*", 22 "@wispplace/lexicons": "workspace:*", 23 + "@wispplace/observability": "workspace:*", 24 "@wispplace/safe-fetch": "workspace:*", 25 "@wispplace/tiered-storage": "workspace:*", 26 "hono": "^4.10.4",
+19 -23
apps/firehose-service/src/lib/firehose.ts
··· 8 import { isBun, BunFirehose, type Event, type CommitEvt } from '@wispplace/bun-firehose'; 9 import type { Record as WispFsRecord } from '@wispplace/lexicons/types/place/wisp/fs'; 10 import type { Record as WispSettings } from '@wispplace/lexicons/types/place/wisp/settings'; 11 import { config } from '../config'; 12 import { 13 handleSiteCreateOrUpdate, ··· 18 } from './cache-writer'; 19 20 const idResolver = new IdResolver(); 21 22 // Track firehose health 23 let lastEventTime = Date.now(); ··· 70 const commitEvt = evt as CommitEvt; 71 const { did, collection, rkey, record, cid } = commitEvt; 72 73 - console.log(`[Firehose] Debug: Event ${evt.event} for ${collection}:${did}/${rkey}, CID: ${cid?.toString() || 'unknown'}`); 74 75 // Handle place.wisp.fs events 76 if (collection === 'place.wisp.fs') { 77 - console.log(`[Firehose] Received ${commitEvt.event} event for ${did}/${rkey}, CID: ${cid?.toString() || 'unknown'}`); 78 processWithConcurrencyLimit(async () => { 79 try { 80 - console.log(`[Firehose] Inside handler for ${commitEvt.event} event for ${did}/${rkey}`); 81 if (commitEvt.event === 'delete') { 82 await handleSiteDelete(did, rkey); 83 } else { ··· 87 if (verified) { 88 await handleSiteCreateOrUpdate(did, rkey, verified.record, verified.cid); 89 } else { 90 - console.log(`[Firehose] Skipping ${commitEvt.event} event for ${did}/${rkey} - verification failed`); 91 } 92 } 93 - console.log(`[Firehose] Completed handler for ${commitEvt.event} event for ${did}/${rkey}`); 94 } catch (err) { 95 - const error = err instanceof Error ? err : new Error(String(err)); 96 - console.error('[Firehose] Error handling place.wisp.fs event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 97 } 98 }).catch(err => { 99 - const error = err instanceof Error ? 
err : new Error(String(err)); 100 - console.error('[Firehose] Error processing place.wisp.fs event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 101 }); 102 } 103 ··· 112 await handleSettingsUpdate(did, rkey, record as WispSettings, cidStr); 113 } 114 } catch (err) { 115 - const error = err instanceof Error ? err : new Error(String(err)); 116 - console.error('[Firehose] Error handling place.wisp.settings event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 117 } 118 }).catch(err => { 119 - const error = err instanceof Error ? err : new Error(String(err)); 120 - console.error('[Firehose] Error processing place.wisp.settings event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 121 }); 122 } 123 } catch (err) { 124 - const error = err instanceof Error ? err : new Error(String(err)); 125 - console.error('[Firehose] Unexpected error in handleEvent:', { error: error.message, stack: error.stack }); 126 } 127 } 128 129 function handleError(err: Error): void { 130 - console.error('[Firehose] Error:', err); 131 - console.error('[Firehose] Stack:', err.stack); 132 } 133 134 let firehoseHandle: { destroy: () => void } | null = null; ··· 137 * Start the firehose worker 138 */ 139 export function startFirehose(): void { 140 - console.log(`[Firehose] Starting (runtime: ${isBun ? 
'Bun' : 'Node.js'})`); 141 - console.log(`[Firehose] Service: ${config.firehoseService}`); 142 - console.log(`[Firehose] Max concurrency: ${config.firehoseMaxConcurrency}`); 143 144 isConnected = true; 145 ··· 169 170 // Log cache info hourly 171 setInterval(() => { 172 - console.log('[Firehose] Hourly status check'); 173 }, 60 * 60 * 1000); 174 175 // Log status periodically 176 setInterval(() => { 177 const health = getFirehoseHealth(); 178 if (health.timeSinceLastEvent > 30000) { 179 - console.log(`[Firehose] No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`); 180 } 181 }, 30000); 182 } ··· 185 * Stop the firehose worker 186 */ 187 export function stopFirehose(): void { 188 - console.log('[Firehose] Stopping'); 189 isConnected = false; 190 firehoseHandle?.destroy(); 191 firehoseHandle = null;
··· 8 import { isBun, BunFirehose, type Event, type CommitEvt } from '@wispplace/bun-firehose'; 9 import type { Record as WispFsRecord } from '@wispplace/lexicons/types/place/wisp/fs'; 10 import type { Record as WispSettings } from '@wispplace/lexicons/types/place/wisp/settings'; 11 + import { createLogger } from '@wispplace/observability'; 12 import { config } from '../config'; 13 import { 14 handleSiteCreateOrUpdate, ··· 19 } from './cache-writer'; 20 21 const idResolver = new IdResolver(); 22 + const logger = createLogger('firehose-service'); 23 24 // Track firehose health 25 let lastEventTime = Date.now(); ··· 72 const commitEvt = evt as CommitEvt; 73 const { did, collection, rkey, record, cid } = commitEvt; 74 75 + logger.debug(`Event ${evt.event} for ${collection}:${did}/${rkey}`, { cid: cid?.toString() || 'unknown' }); 76 77 // Handle place.wisp.fs events 78 if (collection === 'place.wisp.fs') { 79 + logger.info(`[place.wisp.fs] Received ${commitEvt.event} event`, { did, rkey, cid: cid?.toString() || 'unknown' }); 80 processWithConcurrencyLimit(async () => { 81 try { 82 + logger.debug(`[place.wisp.fs] Processing ${commitEvt.event} event`, { did, rkey }); 83 if (commitEvt.event === 'delete') { 84 await handleSiteDelete(did, rkey); 85 } else { ··· 89 if (verified) { 90 await handleSiteCreateOrUpdate(did, rkey, verified.record, verified.cid); 91 } else { 92 + logger.warn(`[place.wisp.fs] Skipping ${commitEvt.event} event - verification failed`, { did, rkey }); 93 } 94 } 95 + logger.debug(`[place.wisp.fs] Completed ${commitEvt.event} event`, { did, rkey }); 96 } catch (err) { 97 + logger.error(`[place.wisp.fs] Error handling event`, err, { did, rkey, event: commitEvt.event }); 98 } 99 }).catch(err => { 100 + logger.error(`[place.wisp.fs] Error processing event`, err, { did, rkey, event: commitEvt.event }); 101 }); 102 } 103 ··· 112 await handleSettingsUpdate(did, rkey, record as WispSettings, cidStr); 113 } 114 } catch (err) { 115 + 
logger.error(`[place.wisp.settings] Error handling event`, err, { did, rkey, event: commitEvt.event }); 116 } 117 }).catch(err => { 118 + logger.error(`[place.wisp.settings] Error processing event`, err, { did, rkey, event: commitEvt.event }); 119 }); 120 } 121 } catch (err) { 122 + logger.error('Unexpected error in handleEvent', err); 123 } 124 } 125 126 function handleError(err: Error): void { 127 + logger.error('Firehose connection error', err); 128 } 129 130 let firehoseHandle: { destroy: () => void } | null = null; ··· 133 * Start the firehose worker 134 */ 135 export function startFirehose(): void { 136 + logger.info(`Starting firehose (runtime: ${isBun ? 'Bun' : 'Node.js'})`); 137 + logger.info(`Service: ${config.firehoseService}`); 138 + logger.info(`Max concurrency: ${config.firehoseMaxConcurrency}`); 139 140 isConnected = true; 141 ··· 165 166 // Log cache info hourly 167 setInterval(() => { 168 + logger.info('Hourly status check'); 169 }, 60 * 60 * 1000); 170 171 // Log status periodically 172 setInterval(() => { 173 const health = getFirehoseHealth(); 174 if (health.timeSinceLastEvent > 30000) { 175 + logger.warn(`No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`); 176 } 177 }, 30000); 178 } ··· 181 * Stop the firehose worker 182 */ 183 export function stopFirehose(): void { 184 + logger.info('Stopping firehose'); 185 isConnected = false; 186 firehoseHandle?.destroy(); 187 firehoseHandle = null;
+5 -2
apps/firehose-service/src/lib/storage.ts
··· 8 S3StorageTier, 9 DiskStorageTier, 10 } from '@wispplace/tiered-storage'; 11 import { config } from '../config'; 12 13 // Create S3 tier (or fallback to disk for local dev) 14 let coldTier: S3StorageTier | DiskStorageTier; ··· 28 forcePathStyle: config.s3ForcePathStyle, 29 metadataBucket: config.s3MetadataBucket, 30 }); 31 - console.log('[Storage] Using S3 cold tier:', config.s3Bucket); 32 } else { 33 // Fallback to disk for local development 34 const cacheDir = process.env.CACHE_DIR || './cache/sites'; ··· 38 evictionPolicy: 'lru', 39 encodeColons: false, 40 }); 41 - console.log('[Storage] Using disk fallback (no S3_BUCKET configured):', cacheDir); 42 } 43 44 // Identity serializers for raw binary data (no JSON transformation)
··· 8 S3StorageTier, 9 DiskStorageTier, 10 } from '@wispplace/tiered-storage'; 11 + import { createLogger } from '@wispplace/observability'; 12 import { config } from '../config'; 13 + 14 + const logger = createLogger('firehose-service'); 15 16 // Create S3 tier (or fallback to disk for local dev) 17 let coldTier: S3StorageTier | DiskStorageTier; ··· 31 forcePathStyle: config.s3ForcePathStyle, 32 metadataBucket: config.s3MetadataBucket, 33 }); 34 + logger.info('[Storage] Using S3 cold tier:', { bucket: config.s3Bucket }); 35 } else { 36 // Fallback to disk for local development 37 const cacheDir = process.env.CACHE_DIR || './cache/sites'; ··· 41 evictionPolicy: 'lru', 42 encodeColons: false, 43 }); 44 + logger.info('[Storage] Using disk fallback (no S3_BUCKET configured):', { cacheDir }); 45 } 46 47 // Identity serializers for raw binary data (no JSON transformation)
+8 -3
bun.lock
··· 28 "@wispplace/database": "workspace:*", 29 "@wispplace/fs-utils": "workspace:*", 30 "@wispplace/lexicons": "workspace:*", 31 "@wispplace/safe-fetch": "workspace:*", 32 "@wispplace/tiered-storage": "workspace:*", 33 "hono": "^4.10.4", ··· 135 }, 136 "cli": { 137 "name": "wispctl", 138 - "version": "1.0.6", 139 "bin": { 140 "wispctl": "dist/index.js", 141 }, ··· 1920 1921 "@typescript-eslint/typescript-estree/minimatch": ["minimatch@9.0.5", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow=="], 1922 1923 - "@wispplace/bun-firehose/@types/bun": ["@types/bun@1.3.8", "", { "dependencies": { "bun-types": "1.3.8" } }, "sha512-3LvWJ2q5GerAXYxO2mffLTqOzEu5qnhEAlh48Vnu8WQfnmSwbgagjGZV6BoHKJztENYEDn6QmVd949W4uESRJA=="], 1924 1925 "@wispplace/lexicons/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 1926 1927 "@wispplace/main-app/@atproto/api": ["@atproto/api@0.17.7", "", { "dependencies": { "@atproto/common-web": "^0.4.3", "@atproto/lexicon": "^0.5.1", "@atproto/syntax": "^0.4.1", "@atproto/xrpc": "^0.7.5", "await-lock": "^2.2.2", "multiformats": "^9.9.0", "tlds": "^1.234.0", "zod": "^3.23.8" } }, "sha512-V+OJBZq9chcrD21xk1bUa6oc5DSKfQj5DmUPf5rmZncqL1w9ZEbS38H5cMyqqdhfgo2LWeDRdZHD0rvNyJsIaw=="], 1928 1929 - "@wispplace/tiered-storage/@types/bun": ["@types/bun@1.3.8", "", { "dependencies": { "bun-types": "1.3.8" } }, "sha512-3LvWJ2q5GerAXYxO2mffLTqOzEu5qnhEAlh48Vnu8WQfnmSwbgagjGZV6BoHKJztENYEDn6QmVd949W4uESRJA=="], 1930 1931 "accepts/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, 
"sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], 1932 ··· 2058 2059 "@typescript-eslint/typescript-estree/minimatch/brace-expansion": ["brace-expansion@2.0.2", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ=="], 2060 2061 "@wispplace/lexicons/@atproto/lexicon/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2062 2063 "@wispplace/main-app/@atproto/api/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 2064 2065 "@wispplace/main-app/@atproto/api/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2066 2067 "accepts/mime-types/mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="], 2068
··· 28 "@wispplace/database": "workspace:*", 29 "@wispplace/fs-utils": "workspace:*", 30 "@wispplace/lexicons": "workspace:*", 31 + "@wispplace/observability": "workspace:*", 32 "@wispplace/safe-fetch": "workspace:*", 33 "@wispplace/tiered-storage": "workspace:*", 34 "hono": "^4.10.4", ··· 136 }, 137 "cli": { 138 "name": "wispctl", 139 + "version": "1.0.8", 140 "bin": { 141 "wispctl": "dist/index.js", 142 }, ··· 1921 1922 "@typescript-eslint/typescript-estree/minimatch": ["minimatch@9.0.5", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow=="], 1923 1924 + "@wispplace/bun-firehose/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="], 1925 1926 "@wispplace/lexicons/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 1927 1928 "@wispplace/main-app/@atproto/api": ["@atproto/api@0.17.7", "", { "dependencies": { "@atproto/common-web": "^0.4.3", "@atproto/lexicon": "^0.5.1", "@atproto/syntax": "^0.4.1", "@atproto/xrpc": "^0.7.5", "await-lock": "^2.2.2", "multiformats": "^9.9.0", "tlds": "^1.234.0", "zod": "^3.23.8" } }, "sha512-V+OJBZq9chcrD21xk1bUa6oc5DSKfQj5DmUPf5rmZncqL1w9ZEbS38H5cMyqqdhfgo2LWeDRdZHD0rvNyJsIaw=="], 1929 1930 + "@wispplace/tiered-storage/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="], 1931 1932 "accepts/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, 
"sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], 1933 ··· 2059 2060 "@typescript-eslint/typescript-estree/minimatch/brace-expansion": ["brace-expansion@2.0.2", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ=="], 2061 2062 + "@wispplace/bun-firehose/@types/bun/bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="], 2063 + 2064 "@wispplace/lexicons/@atproto/lexicon/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2065 2066 "@wispplace/main-app/@atproto/api/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 2067 2068 "@wispplace/main-app/@atproto/api/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2069 + 2070 + "@wispplace/tiered-storage/@types/bun/bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="], 2071 2072 "accepts/mime-types/mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="], 2073

History

4 rounds 1 comment
sign up or log in to add to the discussion
7 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
Dockerfile
fix storage-miss revalidation loop and tier reporting
1/1 failed
expand
expand 1 comment

this is what life is like on us-east-1 right now. seems to be doing fine as of 2/6 10:38pm

pull request successfully merged
5 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
1/1 failed
expand
expand 0 comments
4 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
1/1 failed
expand
expand 0 comments
1 commit
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
1/1 failed
expand
expand 0 comments