Monorepo for wisp.place — a static site hosting service built on top of the AT Protocol.

hosting service writes on cache miss, firehose service properly notifies hosting service on new updates #6

Merged — opened by nekomimi.pet, targeting main from hosting-service-fixes

Actually forcing myself to develop good habits this year on my own projects, because I deserve them.

Labels

None yet.

Assignees

None yet.

Participants 1
AT URI
at://did:plc:ttdrpj45ibqunmfhdsb4zdwq/sh.tangled.repo.pull/3merr52ear522
+802 -129
Diff #2
+27
apps/firehose-service/.env.example
···
··· 1 + # Database 2 + DATABASE_URL=postgres://postgres:postgres@localhost:5432/wisp 3 + 4 + # Firehose 5 + FIREHOSE_SERVICE=wss://bsky.network 6 + FIREHOSE_MAX_CONCURRENCY=5 7 + 8 + # Redis (cache invalidation + revalidation queue) 9 + REDIS_URL=redis://localhost:6379 10 + 11 + # S3 Storage (leave empty for local disk fallback) 12 + S3_BUCKET= 13 + S3_METADATA_BUCKET= 14 + S3_REGION=auto 15 + S3_ENDPOINT= 16 + S3_PREFIX=sites/ 17 + S3_FORCE_PATH_STYLE=true 18 + 19 + # AWS Credentials (required if using S3) 20 + AWS_ACCESS_KEY_ID= 21 + AWS_SECRET_ACCESS_KEY= 22 + 23 + # Health check server 24 + HEALTH_PORT=3001 25 + 26 + # For local disk fallback (when S3_BUCKET is empty) 27 + CACHE_DIR=./cache/sites
+1
apps/firehose-service/package.json
··· 20 "@wispplace/database": "workspace:*", 21 "@wispplace/fs-utils": "workspace:*", 22 "@wispplace/lexicons": "workspace:*", 23 "@wispplace/safe-fetch": "workspace:*", 24 "@wispplace/tiered-storage": "workspace:*", 25 "hono": "^4.10.4",
··· 20 "@wispplace/database": "workspace:*", 21 "@wispplace/fs-utils": "workspace:*", 22 "@wispplace/lexicons": "workspace:*", 23 + "@wispplace/observability": "workspace:*", 24 "@wispplace/safe-fetch": "workspace:*", 25 "@wispplace/tiered-storage": "workspace:*", 26 "hono": "^4.10.4",
+61 -16
apps/firehose-service/src/index.ts
··· 14 import { storage } from './lib/storage'; 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 18 const app = new Hono(); 19 20 // Health endpoint 21 app.get('/health', async (c) => { ··· 37 if (isShuttingDown) return; 38 isShuttingDown = true; 39 40 - console.log(`\n[Service] Received ${signal}, shutting down...`); 41 42 stopFirehose(); 43 await stopRevalidateWorker(); 44 await closeDatabase(); 45 46 - console.log('[Service] Shutdown complete'); 47 process.exit(0); 48 } 49 ··· 54 * Backfill mode - process existing sites from database 55 */ 56 async function runBackfill(): Promise<void> { 57 - console.log('[Backfill] Starting backfill mode'); 58 const startTime = Date.now(); 59 const forceRewriteHtml = process.env.BACKFILL_FORCE_REWRITE_HTML === 'true'; 60 61 if (forceRewriteHtml) { 62 - console.log('[Backfill] Forcing HTML rewrite for all sites'); 63 } 64 65 let sites = await listAllSites(); 66 if (sites.length === 0) { 67 const cachedSites = await listAllSiteCaches(); 68 sites = cachedSites.map(site => ({ did: site.did, rkey: site.rkey })); 69 - console.log('[Backfill] Sites table empty; falling back to site_cache entries'); 70 } 71 72 - console.log(`[Backfill] Found ${sites.length} sites in database`); 73 74 let processed = 0; 75 let skipped = 0; ··· 81 const result = await fetchSiteRecord(site.did, site.rkey); 82 83 if (!result) { 84 - console.log(`[Backfill] Site not found on PDS: ${site.did}/${site.rkey}`); 85 skipped++; 86 continue; 87 } ··· 89 const existingCache = await getSiteCache(site.did, site.rkey); 90 // Check if CID matches (already up to date) 91 if (!forceRewriteHtml && existingCache && result.cid === existingCache.record_cid) { 92 - console.log(`[Backfill] Site already up to date: ${site.did}/${site.rkey}`); 93 skipped++; 94 continue; 95 } ··· 100 }); 101 processed++; 102 103 - console.log(`[Backfill] Progress: 
${processed + skipped + failed}/${sites.length}`); 104 } catch (err) { 105 - console.error(`[Backfill] Failed to process ${site.did}/${site.rkey}:`, err); 106 failed++; 107 } 108 } ··· 113 const elapsedRemSec = elapsedSec % 60; 114 const elapsedLabel = elapsedMin > 0 ? `${elapsedMin}m ${elapsedRemSec}s` : `${elapsedSec}s`; 115 116 - console.log(`[Backfill] Complete: ${processed} processed, ${skipped} skipped, ${failed} failed (${elapsedLabel} elapsed)`); 117 } 118 119 // Main entry point 120 async function main() { 121 - console.log('[Service] Starting firehose-service'); 122 - console.log(`[Service] Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`); 123 - console.log(`[Service] S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`); 124 125 // Start health server 126 const server = serve({ ··· 128 port: config.healthPort, 129 }); 130 131 - console.log(`[Service] Health endpoint: http://localhost:${config.healthPort}/health`); 132 133 if (config.isBackfill) { 134 // Run backfill and exit ··· 143 } 144 145 main().catch((err) => { 146 - console.error('[Service] Fatal error:', err); 147 process.exit(1); 148 });
··· 14 import { storage } from './lib/storage'; 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 + import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'; 18 + import { initializeGrafanaExporters, createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability'; 19 + import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono'; 20 + 21 + // Initialize Grafana exporters if configured 22 + initializeGrafanaExporters({ 23 + serviceName: 'firehose-service', 24 + serviceVersion: '1.0.0' 25 + }); 26 + 27 + const logger = createLogger('firehose-service'); 28 29 const app = new Hono(); 30 + 31 + // Add observability middleware 32 + app.use('*', observabilityMiddleware('firehose-service')); 33 + 34 + // Error handler 35 + app.onError(observabilityErrorHandler('firehose-service')); 36 37 // Health endpoint 38 app.get('/health', async (c) => { ··· 54 if (isShuttingDown) return; 55 isShuttingDown = true; 56 57 + logger.info(`Received ${signal}, shutting down...`); 58 59 stopFirehose(); 60 await stopRevalidateWorker(); 61 + await closeCacheInvalidationPublisher(); 62 await closeDatabase(); 63 64 + logger.info('Shutdown complete'); 65 process.exit(0); 66 } 67 ··· 72 * Backfill mode - process existing sites from database 73 */ 74 async function runBackfill(): Promise<void> { 75 + logger.info('Starting backfill mode'); 76 const startTime = Date.now(); 77 const forceRewriteHtml = process.env.BACKFILL_FORCE_REWRITE_HTML === 'true'; 78 79 if (forceRewriteHtml) { 80 + logger.info('Forcing HTML rewrite for all sites'); 81 } 82 83 let sites = await listAllSites(); 84 if (sites.length === 0) { 85 const cachedSites = await listAllSiteCaches(); 86 sites = cachedSites.map(site => ({ did: site.did, rkey: site.rkey })); 87 + logger.info('Sites table empty; falling back to 
site_cache entries'); 88 } 89 90 + logger.info(`Found ${sites.length} sites in database`); 91 92 let processed = 0; 93 let skipped = 0; ··· 99 const result = await fetchSiteRecord(site.did, site.rkey); 100 101 if (!result) { 102 + logger.info(`Site not found on PDS: ${site.did}/${site.rkey}`); 103 skipped++; 104 continue; 105 } ··· 107 const existingCache = await getSiteCache(site.did, site.rkey); 108 // Check if CID matches (already up to date) 109 if (!forceRewriteHtml && existingCache && result.cid === existingCache.record_cid) { 110 + logger.info(`Site already up to date: ${site.did}/${site.rkey}`); 111 skipped++; 112 continue; 113 } ··· 118 }); 119 processed++; 120 121 + logger.info(`Progress: ${processed + skipped + failed}/${sites.length}`); 122 } catch (err) { 123 + logger.error(`Failed to process ${site.did}/${site.rkey}`, err); 124 failed++; 125 } 126 } ··· 131 const elapsedRemSec = elapsedSec % 60; 132 const elapsedLabel = elapsedMin > 0 ? `${elapsedMin}m ${elapsedRemSec}s` : `${elapsedSec}s`; 133 134 + logger.info(`Complete: ${processed} processed, ${skipped} skipped, ${failed} failed (${elapsedLabel} elapsed)`); 135 } 136 137 + // Internal observability endpoints (for admin panel) 138 + app.get('/__internal__/observability/logs', (c) => { 139 + const query = c.req.query(); 140 + const filter: any = {}; 141 + if (query.level) filter.level = query.level; 142 + if (query.service) filter.service = query.service; 143 + if (query.search) filter.search = query.search; 144 + if (query.eventType) filter.eventType = query.eventType; 145 + if (query.limit) filter.limit = parseInt(query.limit as string); 146 + return c.json({ logs: logCollector.getLogs(filter) }); 147 + }); 148 + 149 + app.get('/__internal__/observability/errors', (c) => { 150 + const query = c.req.query(); 151 + const filter: any = {}; 152 + if (query.service) filter.service = query.service; 153 + if (query.limit) filter.limit = parseInt(query.limit as string); 154 + return c.json({ errors: 
errorTracker.getErrors(filter) }); 155 + }); 156 + 157 + app.get('/__internal__/observability/metrics', (c) => { 158 + const query = c.req.query(); 159 + const timeWindow = query.timeWindow ? parseInt(query.timeWindow as string) : 3600000; 160 + const stats = metricsCollector.getStats('firehose-service', timeWindow); 161 + return c.json({ stats, timeWindow }); 162 + }); 163 + 164 // Main entry point 165 async function main() { 166 + logger.info('Starting firehose-service'); 167 + logger.info(`Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`); 168 + logger.info(`S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`); 169 170 // Start health server 171 const server = serve({ ··· 173 port: config.healthPort, 174 }); 175 176 + logger.info(`Health endpoint: http://localhost:${config.healthPort}/health`); 177 178 if (config.isBackfill) { 179 // Run backfill and exit ··· 188 } 189 190 main().catch((err) => { 191 + logger.error('Fatal error', err); 192 process.exit(1); 193 });
+70
apps/firehose-service/src/lib/cache-invalidation.ts
···
··· 1 + /** 2 + * Cache invalidation publisher 3 + * 4 + * Publishes invalidation messages to Redis pub/sub so the hosting-service 5 + * can clear its local caches (tiered storage, redirect rules) when a site 6 + * is updated or deleted via the firehose. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { createLogger } from '@wispplace/observability'; 11 + import { config } from '../config'; 12 + 13 + const logger = createLogger('firehose-service'); 14 + const CHANNEL = 'wisp:cache-invalidate'; 15 + 16 + let publisher: Redis | null = null; 17 + let loggedMissingRedis = false; 18 + 19 + function getPublisher(): Redis | null { 20 + if (!config.redisUrl) { 21 + if (!loggedMissingRedis) { 22 + logger.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled'); 23 + loggedMissingRedis = true; 24 + } 25 + return null; 26 + } 27 + 28 + if (!publisher) { 29 + logger.info(`[CacheInvalidation] Connecting to Redis for publishing: ${config.redisUrl}`); 30 + publisher = new Redis(config.redisUrl, { 31 + maxRetriesPerRequest: 2, 32 + enableReadyCheck: true, 33 + }); 34 + 35 + publisher.on('error', (err) => { 36 + logger.error('[CacheInvalidation] Redis error', err); 37 + }); 38 + 39 + publisher.on('ready', () => { 40 + logger.info('[CacheInvalidation] Redis publisher connected'); 41 + }); 42 + } 43 + 44 + return publisher; 45 + } 46 + 47 + export async function publishCacheInvalidation( 48 + did: string, 49 + rkey: string, 50 + action: 'update' | 'delete' | 'settings' 51 + ): Promise<void> { 52 + const redis = getPublisher(); 53 + if (!redis) return; 54 + 55 + try { 56 + const message = JSON.stringify({ did, rkey, action }); 57 + logger.debug(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL}`); 58 + await redis.publish(CHANNEL, message); 59 + } catch (err) { 60 + logger.error('[CacheInvalidation] Failed to publish', err); 61 + } 62 + } 63 + 64 + export async function closeCacheInvalidationPublisher(): Promise<void> { 65 + 
if (publisher) { 66 + const toClose = publisher; 67 + publisher = null; 68 + await toClose.quit(); 69 + } 70 + }
+51 -30
apps/firehose-service/src/lib/cache-writer.ts
··· 11 import { collectFileCidsFromEntries, countFilesInDirectory, normalizeFileCids } from '@wispplace/fs-utils'; 12 import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 13 import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 14 import { writeFile, deleteFile, listFiles } from './storage'; 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 import { gunzipSync } from 'zlib'; 18 19 /** 20 * Fetch a site record from the PDS ··· 23 try { 24 const pdsEndpoint = await getPdsForDid(did); 25 if (!pdsEndpoint) { 26 - console.error('[Cache] Failed to get PDS endpoint for DID', { did, rkey }); 27 return null; 28 } 29 ··· 37 } catch (err) { 38 const errorMsg = err instanceof Error ? err.message : String(err); 39 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 40 - console.log('[Cache] Site record not found', { did, rkey }); 41 } else { 42 - console.error('[Cache] Failed to fetch site record', { did, rkey, error: errorMsg }); 43 } 44 return null; 45 } ··· 56 try { 57 const endpoint = pdsEndpoint ?? await getPdsForDid(did); 58 if (!endpoint) { 59 - console.error('[Cache] Failed to get PDS endpoint for DID (settings)', { did, rkey }); 60 return null; 61 } 62 ··· 70 } catch (err) { 71 const errorMsg = err instanceof Error ? 
err.message : String(err); 72 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 73 - console.log('[Cache] Settings record not found', { did, rkey }); 74 } else { 75 - console.error('[Cache] Failed to fetch settings record', { did, rkey, error: errorMsg }); 76 } 77 return null; 78 } ··· 136 const MAX_DEPTH = 10; 137 138 if (depth >= MAX_DEPTH) { 139 - console.error('[Cache] Max subfs expansion depth reached'); 140 return directory; 141 } 142 ··· 146 // Fetch uncached subfs records 147 const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri)); 148 if (uncachedUris.length > 0) { 149 - console.log(`[Cache] Fetching ${uncachedUris.length} subfs records (depth ${depth})`); 150 const fetchedRecords = await Promise.all( 151 uncachedUris.map(async ({ uri }) => { 152 const record = await fetchSubfsRecord(uri, pdsEndpoint); ··· 341 ): Promise<void> { 342 const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 343 344 - console.log(`[Cache] Downloading ${file.path}`); 345 346 let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 347 let encoding = file.encoding; ··· 355 // Heuristic fallback: some records omit base64 flag but content is base64 text 356 const decoded = tryDecodeBase64(content); 357 if (decoded) { 358 - console.warn(`[Cache] Decoded base64 fallback for ${file.path} (base64 flag missing)`); 359 content = decoded; 360 } 361 } ··· 369 content = gunzipSync(content); 370 encoding = undefined; 371 } catch (error) { 372 - console.error(`[Cache] Failed to decompress ${file.path}, storing gzipped`); 373 } 374 } else if (encoding === 'gzip' && content.length >= 2 && 375 !(content[0] === 0x1f && content[1] === 0x8b)) { 376 // If marked gzip but doesn't look gzipped, attempt base64 decode and retry 377 const decoded = tryDecodeBase64(content); 378 if (decoded && decoded.length >= 2 && decoded[0] === 0x1f && decoded[1] === 0x8b) { 
379 - console.warn(`[Cache] Decoded base64+gzip fallback for ${file.path}`); 380 try { 381 content = gunzipSync(decoded); 382 encoding = undefined; 383 } catch (error) { 384 - console.error(`[Cache] Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`); 385 content = decoded; 386 } 387 } ··· 412 try { 413 rewriteSource = gunzipSync(content); 414 } catch (error) { 415 - console.error(`[Cache] Failed to decompress ${file.path} for rewrite, using raw content`); 416 } 417 } 418 ··· 422 423 const rewrittenKey = `${did}/${rkey}/.rewritten/${file.path}`; 424 await writeFile(rewrittenKey, rewrittenContent, { mimeType: 'text/html' }); 425 - console.log(`[Cache] Wrote rewritten HTML: ${rewrittenKey}`); 426 } 427 428 - console.log(`[Cache] Stored ${file.path} (${content.length} bytes)`); 429 } 430 431 /** ··· 438 recordCid: string, 439 options?: { 440 forceRewriteHtml?: boolean; 441 } 442 ): Promise<void> { 443 const forceRewriteHtml = options?.forceRewriteHtml === true; 444 - console.log(`[Cache] Processing site ${did}/${rkey}, record CID: ${recordCid}`, { 445 forceRewriteHtml, 446 }); 447 448 if (!record.root?.entries) { 449 - console.error('[Cache] Invalid record structure'); 450 return; 451 } 452 453 // Get PDS endpoint 454 const pdsEndpoint = await getPdsForDid(did); 455 if (!pdsEndpoint) { 456 - console.error('[Cache] Could not resolve PDS for', did); 457 return; 458 } 459 ··· 463 // Validate limits 464 const fileCount = countFilesInDirectory(expandedRoot); 465 if (fileCount > MAX_FILE_COUNT) { 466 - console.error(`[Cache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 467 return; 468 } 469 470 const totalSize = calculateTotalBlobSize(expandedRoot); 471 if (totalSize > MAX_SITE_SIZE) { 472 - console.error(`[Cache] Site exceeds size limit: ${totalSize} > ${MAX_SITE_SIZE}`); 473 return; 474 } 475 ··· 483 const normalizedFileCids = normalizeFileCids(rawFileCids); 484 const oldFileCids = normalizedFileCids.value; 485 if 
(normalizedFileCids.source === 'string-invalid' || normalizedFileCids.source === 'other') { 486 - console.warn('[Cache] Existing file_cids had unexpected shape; treating as empty', { 487 did, 488 rkey, 489 type: Array.isArray(rawFileCids) ? 'array' : typeof rawFileCids, ··· 510 } 511 } 512 513 - console.log(`[Cache] Files unchanged: ${newFiles.length - filesToDownload.length}, to download: ${filesToDownload.length}, to delete: ${pathsToDelete.length}`); 514 515 // Download new/changed files (with concurrency limit) 516 const DOWNLOAD_CONCURRENCY = 20; ··· 539 } 540 541 // Update DB with new CIDs 542 - console.log(`[Cache] About to upsert site cache for ${did}/${rkey}`); 543 await upsertSiteCache(did, rkey, recordCid, newFileCids); 544 - console.log(`[Cache] Updated site cache for ${did}/${rkey} with record CID ${recordCid}`); 545 546 // Backfill settings if a record exists for this rkey 547 const settingsRecord = await fetchSettingsRecord(did, rkey, pdsEndpoint); ··· 549 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 550 } 551 552 - console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 553 } 554 555 /** 556 * Handle a site delete event 557 */ 558 export async function handleSiteDelete(did: string, rkey: string): Promise<void> { 559 - console.log(`[Cache] Deleting site ${did}/${rkey}`); 560 561 // List all files for this site and delete them 562 const prefix = `${did}/${rkey}/`; ··· 569 // Delete from DB 570 await deleteSiteCache(did, rkey); 571 572 - console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 573 } 574 575 /** 576 * Handle settings create/update event 577 */ 578 export async function handleSettingsUpdate(did: string, rkey: string, settings: WispSettings, recordCid: string): Promise<void> { 579 - console.log(`[Cache] Updating settings for ${did}/${rkey}`); 580 581 await upsertSiteSettingsCache(did, rkey, recordCid, { 582 directoryListing: settings.directoryListing, ··· 586 cleanUrls: 
settings.cleanUrls, 587 headers: settings.headers, 588 }); 589 } 590 591 /** 592 * Handle settings delete event 593 */ 594 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 595 - console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 596 await deleteSiteSettingsCache(did, rkey); 597 }
··· 11 import { collectFileCidsFromEntries, countFilesInDirectory, normalizeFileCids } from '@wispplace/fs-utils'; 12 import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 13 import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 14 + import { createLogger } from '@wispplace/observability'; 15 import { writeFile, deleteFile, listFiles } from './storage'; 16 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 17 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 18 import { gunzipSync } from 'zlib'; 19 + import { publishCacheInvalidation } from './cache-invalidation'; 20 + 21 + const logger = createLogger('firehose-service'); 22 23 /** 24 * Fetch a site record from the PDS ··· 27 try { 28 const pdsEndpoint = await getPdsForDid(did); 29 if (!pdsEndpoint) { 30 + logger.error('Failed to get PDS endpoint for DID', undefined, { did, rkey }); 31 return null; 32 } 33 ··· 41 } catch (err) { 42 const errorMsg = err instanceof Error ? err.message : String(err); 43 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 44 + logger.info('Site record not found', { did, rkey }); 45 } else { 46 + logger.error('Failed to fetch site record', err, { did, rkey }); 47 } 48 return null; 49 } ··· 60 try { 61 const endpoint = pdsEndpoint ?? await getPdsForDid(did); 62 if (!endpoint) { 63 + logger.error('Failed to get PDS endpoint for DID (settings)', undefined, { did, rkey }); 64 return null; 65 } 66 ··· 74 } catch (err) { 75 const errorMsg = err instanceof Error ? 
err.message : String(err); 76 if (errorMsg.includes('HTTP 404') || errorMsg.includes('Not Found')) { 77 + logger.info('Settings record not found', { did, rkey }); 78 } else { 79 + logger.error('Failed to fetch settings record', err, { did, rkey }); 80 } 81 return null; 82 } ··· 140 const MAX_DEPTH = 10; 141 142 if (depth >= MAX_DEPTH) { 143 + logger.error('Max subfs expansion depth reached'); 144 return directory; 145 } 146 ··· 150 // Fetch uncached subfs records 151 const uncachedUris = subfsUris.filter(({ uri }) => !subfsCache.has(uri)); 152 if (uncachedUris.length > 0) { 153 + logger.info(`Fetching ${uncachedUris.length} subfs records`, { depth }); 154 const fetchedRecords = await Promise.all( 155 uncachedUris.map(async ({ uri }) => { 156 const record = await fetchSubfsRecord(uri, pdsEndpoint); ··· 345 ): Promise<void> { 346 const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 347 348 + logger.debug(`Downloading ${file.path}`); 349 350 let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 351 let encoding = file.encoding; ··· 359 // Heuristic fallback: some records omit base64 flag but content is base64 text 360 const decoded = tryDecodeBase64(content); 361 if (decoded) { 362 + logger.warn(`Decoded base64 fallback for ${file.path} (base64 flag missing)`); 363 content = decoded; 364 } 365 } ··· 373 content = gunzipSync(content); 374 encoding = undefined; 375 } catch (error) { 376 + logger.error(`Failed to decompress ${file.path}, storing gzipped`, error); 377 } 378 } else if (encoding === 'gzip' && content.length >= 2 && 379 !(content[0] === 0x1f && content[1] === 0x8b)) { 380 // If marked gzip but doesn't look gzipped, attempt base64 decode and retry 381 const decoded = tryDecodeBase64(content); 382 if (decoded && decoded.length >= 2 && decoded[0] === 0x1f && decoded[1] === 0x8b) { 383 + logger.warn(`Decoded base64+gzip fallback for ${file.path}`); 
384 try { 385 content = gunzipSync(decoded); 386 encoding = undefined; 387 } catch (error) { 388 + logger.error(`Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`, error); 389 content = decoded; 390 } 391 } ··· 416 try { 417 rewriteSource = gunzipSync(content); 418 } catch (error) { 419 + logger.error(`Failed to decompress ${file.path} for rewrite, using raw content`, error); 420 } 421 } 422 ··· 426 427 const rewrittenKey = `${did}/${rkey}/.rewritten/${file.path}`; 428 await writeFile(rewrittenKey, rewrittenContent, { mimeType: 'text/html' }); 429 + logger.debug(`Wrote rewritten HTML: ${rewrittenKey}`); 430 } 431 432 + logger.debug(`Stored ${file.path} (${content.length} bytes)`); 433 } 434 435 /** ··· 442 recordCid: string, 443 options?: { 444 forceRewriteHtml?: boolean; 445 + skipInvalidation?: boolean; 446 } 447 ): Promise<void> { 448 const forceRewriteHtml = options?.forceRewriteHtml === true; 449 + logger.info(`Processing site ${did}/${rkey}`, { 450 + recordCid, 451 forceRewriteHtml, 452 }); 453 454 if (!record.root?.entries) { 455 + logger.error('Invalid record structure'); 456 return; 457 } 458 459 // Get PDS endpoint 460 const pdsEndpoint = await getPdsForDid(did); 461 if (!pdsEndpoint) { 462 + logger.error('Could not resolve PDS', undefined, { did }); 463 return; 464 } 465 ··· 469 // Validate limits 470 const fileCount = countFilesInDirectory(expandedRoot); 471 if (fileCount > MAX_FILE_COUNT) { 472 + logger.error(`Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 473 return; 474 } 475 476 const totalSize = calculateTotalBlobSize(expandedRoot); 477 if (totalSize > MAX_SITE_SIZE) { 478 + logger.error(`Site exceeds size limit: ${totalSize} > ${MAX_SITE_SIZE}`); 479 return; 480 } 481 ··· 489 const normalizedFileCids = normalizeFileCids(rawFileCids); 490 const oldFileCids = normalizedFileCids.value; 491 if (normalizedFileCids.source === 'string-invalid' || normalizedFileCids.source === 'other') { 492 + logger.warn('Existing 
file_cids had unexpected shape; treating as empty', { 493 did, 494 rkey, 495 type: Array.isArray(rawFileCids) ? 'array' : typeof rawFileCids, ··· 516 } 517 } 518 519 + logger.info(`Files unchanged: ${newFiles.length - filesToDownload.length}, to download: ${filesToDownload.length}, to delete: ${pathsToDelete.length}`); 520 521 // Download new/changed files (with concurrency limit) 522 const DOWNLOAD_CONCURRENCY = 20; ··· 545 } 546 547 // Update DB with new CIDs 548 + logger.debug(`About to upsert site cache for ${did}/${rkey}`); 549 await upsertSiteCache(did, rkey, recordCid, newFileCids); 550 + logger.debug(`Updated site cache for ${did}/${rkey} with record CID ${recordCid}`); 551 552 // Backfill settings if a record exists for this rkey 553 const settingsRecord = await fetchSettingsRecord(did, rkey, pdsEndpoint); ··· 555 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 556 } 557 558 + // Notify hosting-service to invalidate its local caches 559 + // (skip for revalidate/backfill since hosting-service already has the files locally) 560 + if (!options?.skipInvalidation) { 561 + await publishCacheInvalidation(did, rkey, 'update'); 562 + } 563 + 564 + logger.info(`Successfully cached site ${did}/${rkey}`); 565 } 566 567 /** 568 * Handle a site delete event 569 */ 570 export async function handleSiteDelete(did: string, rkey: string): Promise<void> { 571 + logger.info(`Deleting site ${did}/${rkey}`); 572 573 // List all files for this site and delete them 574 const prefix = `${did}/${rkey}/`; ··· 581 // Delete from DB 582 await deleteSiteCache(did, rkey); 583 584 + // Notify hosting-service to invalidate its local caches 585 + await publishCacheInvalidation(did, rkey, 'delete'); 586 + 587 + logger.info(`Deleted site ${did}/${rkey} (${keys.length} files)`); 588 } 589 590 /** 591 * Handle settings create/update event 592 */ 593 export async function handleSettingsUpdate(did: string, rkey: string, settings: WispSettings, recordCid: 
string): Promise<void> { 594 + logger.info(`Updating settings for ${did}/${rkey}`); 595 596 await upsertSiteSettingsCache(did, rkey, recordCid, { 597 directoryListing: settings.directoryListing, ··· 601 cleanUrls: settings.cleanUrls, 602 headers: settings.headers, 603 }); 604 + 605 + // Notify hosting-service to invalidate its local caches (redirect rules depend on settings) 606 + await publishCacheInvalidation(did, rkey, 'settings'); 607 } 608 609 /** 610 * Handle settings delete event 611 */ 612 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 613 + logger.info(`Deleting settings for ${did}/${rkey}`); 614 await deleteSiteSettingsCache(did, rkey); 615 + 616 + // Notify hosting-service to invalidate its local caches 617 + await publishCacheInvalidation(did, rkey, 'settings'); 618 }
+20 -19
apps/firehose-service/src/lib/db.ts
··· 1 import postgres from 'postgres'; 2 import type { SiteCache, SiteRecord, SiteSettingsCache } from '@wispplace/database'; 3 import { config } from '../config'; 4 5 const sql = postgres(config.databaseUrl, { 6 max: 10, ··· 54 recordCid: string, 55 fileCids: Record<string, string> 56 ): Promise<void> { 57 - const fileCidsJson = fileCids ?? {}; 58 - console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 59 try { 60 await sql` 61 INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 62 - VALUES (${did}, ${rkey}, ${recordCid}, ${fileCidsJson}::jsonb, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 63 ON CONFLICT (did, rkey) 64 DO UPDATE SET 65 record_cid = EXCLUDED.record_cid, 66 file_cids = EXCLUDED.file_cids, 67 updated_at = EXTRACT(EPOCH FROM NOW()) 68 `; 69 - console.log(`[DB] upsertSiteCache completed for ${did}/${rkey}`); 70 } catch (err) { 71 - const error = err instanceof Error ? err : new Error(String(err)); 72 - console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message, stack: error.stack }); 73 - throw error; 74 } 75 } 76 ··· 94 const directoryListing = settings.directoryListing ?? false; 95 const spaMode = settings.spaMode ?? null; 96 const custom404 = settings.custom404 ?? null; 97 - const indexFilesJson = settings.indexFiles ?? []; 98 const cleanUrls = settings.cleanUrls ?? true; 99 - const headersJson = settings.headers ?? 
[]; 100 101 - console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 directoryListing, 103 spaMode, 104 custom404, 105 - indexFiles: indexFilesJson, 106 cleanUrls, 107 - headers: headersJson, 108 }); 109 110 try { ··· 117 ${directoryListing}, 118 ${spaMode}, 119 ${custom404}, 120 - ${indexFilesJson}::jsonb, 121 ${cleanUrls}, 122 - ${headersJson}::jsonb, 123 EXTRACT(EPOCH FROM NOW()), 124 EXTRACT(EPOCH FROM NOW()) 125 ) ··· 134 headers = EXCLUDED.headers, 135 updated_at = EXTRACT(EPOCH FROM NOW()) 136 `; 137 - console.log(`[DB] upsertSiteSettingsCache completed for ${did}/${rkey}`); 138 } catch (err) { 139 - const error = err instanceof Error ? err : new Error(String(err)); 140 - console.error('[DB] upsertSiteSettingsCache error:', { did, rkey, error: error.message, stack: error.stack }); 141 - throw error; 142 } 143 } 144 ··· 148 149 export async function closeDatabase(): Promise<void> { 150 await sql.end({ timeout: 5 }); 151 - console.log('[DB] Database connections closed'); 152 } 153 154 export { sql };
··· 1 import postgres from 'postgres'; 2 import type { SiteCache, SiteRecord, SiteSettingsCache } from '@wispplace/database'; 3 + import { createLogger } from '@wispplace/observability'; 4 import { config } from '../config'; 5 + 6 + const logger = createLogger('firehose-service'); 7 8 const sql = postgres(config.databaseUrl, { 9 max: 10, ··· 57 recordCid: string, 58 fileCids: Record<string, string> 59 ): Promise<void> { 60 + logger.debug(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 61 try { 62 await sql` 63 INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 64 + VALUES (${did}, ${rkey}, ${recordCid}, ${sql.json(fileCids ?? {})}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 65 ON CONFLICT (did, rkey) 66 DO UPDATE SET 67 record_cid = EXCLUDED.record_cid, 68 file_cids = EXCLUDED.file_cids, 69 updated_at = EXTRACT(EPOCH FROM NOW()) 70 `; 71 + logger.debug(`[DB] upsertSiteCache completed for ${did}/${rkey}`); 72 } catch (err) { 73 + logger.error('[DB] upsertSiteCache error', err, { did, rkey }); 74 + throw err; 75 } 76 } 77 ··· 95 const directoryListing = settings.directoryListing ?? false; 96 const spaMode = settings.spaMode ?? null; 97 const custom404 = settings.custom404 ?? null; 98 const cleanUrls = settings.cleanUrls ?? true; 99 + 100 + const indexFiles = settings.indexFiles ?? []; 101 + const headers = settings.headers ?? 
[]; 102 103 + logger.debug(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 104 directoryListing, 105 spaMode, 106 custom404, 107 + indexFiles, 108 cleanUrls, 109 + headers, 110 }); 111 112 try { ··· 119 ${directoryListing}, 120 ${spaMode}, 121 ${custom404}, 122 + ${sql.json(indexFiles)}, 123 ${cleanUrls}, 124 + ${sql.json(headers)}, 125 EXTRACT(EPOCH FROM NOW()), 126 EXTRACT(EPOCH FROM NOW()) 127 ) ··· 136 headers = EXCLUDED.headers, 137 updated_at = EXTRACT(EPOCH FROM NOW()) 138 `; 139 + logger.debug(`[DB] upsertSiteSettingsCache completed for ${did}/${rkey}`); 140 } catch (err) { 141 + logger.error('[DB] upsertSiteSettingsCache error', err, { did, rkey }); 142 + throw err; 143 } 144 } 145 ··· 149 150 export async function closeDatabase(): Promise<void> { 151 await sql.end({ timeout: 5 }); 152 + logger.info('[DB] Database connections closed'); 153 } 154 155 export { sql };
+19 -23
apps/firehose-service/src/lib/firehose.ts
··· 8 import { isBun, BunFirehose, type Event, type CommitEvt } from '@wispplace/bun-firehose'; 9 import type { Record as WispFsRecord } from '@wispplace/lexicons/types/place/wisp/fs'; 10 import type { Record as WispSettings } from '@wispplace/lexicons/types/place/wisp/settings'; 11 import { config } from '../config'; 12 import { 13 handleSiteCreateOrUpdate, ··· 18 } from './cache-writer'; 19 20 const idResolver = new IdResolver(); 21 22 // Track firehose health 23 let lastEventTime = Date.now(); ··· 70 const commitEvt = evt as CommitEvt; 71 const { did, collection, rkey, record, cid } = commitEvt; 72 73 - console.log(`[Firehose] Debug: Event ${evt.event} for ${collection}:${did}/${rkey}, CID: ${cid?.toString() || 'unknown'}`); 74 75 // Handle place.wisp.fs events 76 if (collection === 'place.wisp.fs') { 77 - console.log(`[Firehose] Received ${commitEvt.event} event for ${did}/${rkey}, CID: ${cid?.toString() || 'unknown'}`); 78 processWithConcurrencyLimit(async () => { 79 try { 80 - console.log(`[Firehose] Inside handler for ${commitEvt.event} event for ${did}/${rkey}`); 81 if (commitEvt.event === 'delete') { 82 await handleSiteDelete(did, rkey); 83 } else { ··· 87 if (verified) { 88 await handleSiteCreateOrUpdate(did, rkey, verified.record, verified.cid); 89 } else { 90 - console.log(`[Firehose] Skipping ${commitEvt.event} event for ${did}/${rkey} - verification failed`); 91 } 92 } 93 - console.log(`[Firehose] Completed handler for ${commitEvt.event} event for ${did}/${rkey}`); 94 } catch (err) { 95 - const error = err instanceof Error ? err : new Error(String(err)); 96 - console.error('[Firehose] Error handling place.wisp.fs event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 97 } 98 }).catch(err => { 99 - const error = err instanceof Error ? 
err : new Error(String(err)); 100 - console.error('[Firehose] Error processing place.wisp.fs event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 101 }); 102 } 103 ··· 112 await handleSettingsUpdate(did, rkey, record as WispSettings, cidStr); 113 } 114 } catch (err) { 115 - const error = err instanceof Error ? err : new Error(String(err)); 116 - console.error('[Firehose] Error handling place.wisp.settings event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 117 } 118 }).catch(err => { 119 - const error = err instanceof Error ? err : new Error(String(err)); 120 - console.error('[Firehose] Error processing place.wisp.settings event:', { did, rkey, event: commitEvt.event, error: error.message, stack: error.stack }); 121 }); 122 } 123 } catch (err) { 124 - const error = err instanceof Error ? err : new Error(String(err)); 125 - console.error('[Firehose] Unexpected error in handleEvent:', { error: error.message, stack: error.stack }); 126 } 127 } 128 129 function handleError(err: Error): void { 130 - console.error('[Firehose] Error:', err); 131 - console.error('[Firehose] Stack:', err.stack); 132 } 133 134 let firehoseHandle: { destroy: () => void } | null = null; ··· 137 * Start the firehose worker 138 */ 139 export function startFirehose(): void { 140 - console.log(`[Firehose] Starting (runtime: ${isBun ? 
'Bun' : 'Node.js'})`); 141 - console.log(`[Firehose] Service: ${config.firehoseService}`); 142 - console.log(`[Firehose] Max concurrency: ${config.firehoseMaxConcurrency}`); 143 144 isConnected = true; 145 ··· 169 170 // Log cache info hourly 171 setInterval(() => { 172 - console.log('[Firehose] Hourly status check'); 173 }, 60 * 60 * 1000); 174 175 // Log status periodically 176 setInterval(() => { 177 const health = getFirehoseHealth(); 178 if (health.timeSinceLastEvent > 30000) { 179 - console.log(`[Firehose] No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`); 180 } 181 }, 30000); 182 } ··· 185 * Stop the firehose worker 186 */ 187 export function stopFirehose(): void { 188 - console.log('[Firehose] Stopping'); 189 isConnected = false; 190 firehoseHandle?.destroy(); 191 firehoseHandle = null;
··· 8 import { isBun, BunFirehose, type Event, type CommitEvt } from '@wispplace/bun-firehose'; 9 import type { Record as WispFsRecord } from '@wispplace/lexicons/types/place/wisp/fs'; 10 import type { Record as WispSettings } from '@wispplace/lexicons/types/place/wisp/settings'; 11 + import { createLogger } from '@wispplace/observability'; 12 import { config } from '../config'; 13 import { 14 handleSiteCreateOrUpdate, ··· 19 } from './cache-writer'; 20 21 const idResolver = new IdResolver(); 22 + const logger = createLogger('firehose-service'); 23 24 // Track firehose health 25 let lastEventTime = Date.now(); ··· 72 const commitEvt = evt as CommitEvt; 73 const { did, collection, rkey, record, cid } = commitEvt; 74 75 + logger.debug(`Event ${evt.event} for ${collection}:${did}/${rkey}`, { cid: cid?.toString() || 'unknown' }); 76 77 // Handle place.wisp.fs events 78 if (collection === 'place.wisp.fs') { 79 + logger.info(`[place.wisp.fs] Received ${commitEvt.event} event`, { did, rkey, cid: cid?.toString() || 'unknown' }); 80 processWithConcurrencyLimit(async () => { 81 try { 82 + logger.debug(`[place.wisp.fs] Processing ${commitEvt.event} event`, { did, rkey }); 83 if (commitEvt.event === 'delete') { 84 await handleSiteDelete(did, rkey); 85 } else { ··· 89 if (verified) { 90 await handleSiteCreateOrUpdate(did, rkey, verified.record, verified.cid); 91 } else { 92 + logger.warn(`[place.wisp.fs] Skipping ${commitEvt.event} event - verification failed`, { did, rkey }); 93 } 94 } 95 + logger.debug(`[place.wisp.fs] Completed ${commitEvt.event} event`, { did, rkey }); 96 } catch (err) { 97 + logger.error(`[place.wisp.fs] Error handling event`, err, { did, rkey, event: commitEvt.event }); 98 } 99 }).catch(err => { 100 + logger.error(`[place.wisp.fs] Error processing event`, err, { did, rkey, event: commitEvt.event }); 101 }); 102 } 103 ··· 112 await handleSettingsUpdate(did, rkey, record as WispSettings, cidStr); 113 } 114 } catch (err) { 115 + 
logger.error(`[place.wisp.settings] Error handling event`, err, { did, rkey, event: commitEvt.event }); 116 } 117 }).catch(err => { 118 + logger.error(`[place.wisp.settings] Error processing event`, err, { did, rkey, event: commitEvt.event }); 119 }); 120 } 121 } catch (err) { 122 + logger.error('Unexpected error in handleEvent', err); 123 } 124 } 125 126 function handleError(err: Error): void { 127 + logger.error('Firehose connection error', err); 128 } 129 130 let firehoseHandle: { destroy: () => void } | null = null; ··· 133 * Start the firehose worker 134 */ 135 export function startFirehose(): void { 136 + logger.info(`Starting firehose (runtime: ${isBun ? 'Bun' : 'Node.js'})`); 137 + logger.info(`Service: ${config.firehoseService}`); 138 + logger.info(`Max concurrency: ${config.firehoseMaxConcurrency}`); 139 140 isConnected = true; 141 ··· 165 166 // Log cache info hourly 167 setInterval(() => { 168 + logger.info('Hourly status check'); 169 }, 60 * 60 * 1000); 170 171 // Log status periodically 172 setInterval(() => { 173 const health = getFirehoseHealth(); 174 if (health.timeSinceLastEvent > 30000) { 175 + logger.warn(`No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`); 176 } 177 }, 30000); 178 } ··· 181 * Stop the firehose worker 182 */ 183 export function stopFirehose(): void { 184 + logger.info('Stopping firehose'); 185 isConnected = false; 186 firehoseHandle?.destroy(); 187 firehoseHandle = null;
+22 -14
apps/firehose-service/src/lib/revalidate-worker.ts
··· 1 import Redis from 'ioredis'; 2 import os from 'os'; 3 import { config } from '../config'; 4 import { fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer'; 5 6 const consumerName = process.env.WISP_REVALIDATE_CONSUMER || `${os.hostname()}:${process.pid}`; 7 const batchSize = Number.parseInt(process.env.WISP_REVALIDATE_BATCH_SIZE || '10', 10); 8 const claimIdleMs = Number.parseInt(process.env.WISP_REVALIDATE_CLAIM_IDLE_MS || '60000', 10); ··· 33 const reason = fields.reason || 'storage-miss'; 34 35 if (!did || !rkey) { 36 - console.warn('[Revalidate] Missing did/rkey in message', { id, fields }); 37 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 38 return; 39 } 40 41 - console.log('[Revalidate] Processing', { did, rkey, reason, id }); 42 43 const record = await fetchSiteRecord(did, rkey); 44 if (!record) { 45 - console.warn('[Revalidate] Site record not found', { did, rkey }); 46 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 47 return; 48 } 49 50 - await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid); 51 52 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 53 } 54 ··· 57 try { 58 await processMessage(id, rawFields); 59 } catch (err) { 60 - const error = err instanceof Error ? err : new Error(String(err)); 61 - console.error('[Revalidate] Failed to process message', { id, error: error.message, stack: error.stack }); 62 } 63 } 64 } ··· 114 'GROUP', 115 config.revalidateGroup, 116 consumerName, 117 - 'BLOCK', 118 - blockMs, 119 'COUNT', 120 batchSize, 121 'STREAMS', 122 config.revalidateStream, 123 '>' 124 - ); 125 126 if (!response) return; 127 128 for (const [, messages] of response) { 129 - await processMessages(messages as Array<[string, string[]]>); 130 } 131 } 132 ··· 140 await claimStaleMessages(); 141 await readNewMessages(); 142 } catch (err) { 143 - const error = err instanceof Error ? 
err : new Error(String(err)); 144 - console.error('[Revalidate] Loop error', { error: error.message, stack: error.stack }); 145 await new Promise((resolve) => setTimeout(resolve, 1000)); 146 } 147 } ··· 149 150 export async function startRevalidateWorker(): Promise<void> { 151 if (!config.redisUrl) { 152 - console.warn('[Revalidate] REDIS_URL not set; revalidate worker disabled'); 153 return; 154 } 155 156 if (running) return; 157 158 redis = new Redis(config.redisUrl, { 159 maxRetriesPerRequest: 2, 160 enableReadyCheck: true, 161 }); 162 163 redis.on('error', (err) => { 164 - console.error('[Revalidate] Redis error:', err); 165 }); 166 167 running = true;
··· 1 import Redis from 'ioredis'; 2 import os from 'os'; 3 + import { createLogger } from '@wispplace/observability'; 4 import { config } from '../config'; 5 import { fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer'; 6 7 + const logger = createLogger('firehose-service'); 8 const consumerName = process.env.WISP_REVALIDATE_CONSUMER || `${os.hostname()}:${process.pid}`; 9 const batchSize = Number.parseInt(process.env.WISP_REVALIDATE_BATCH_SIZE || '10', 10); 10 const claimIdleMs = Number.parseInt(process.env.WISP_REVALIDATE_CLAIM_IDLE_MS || '60000', 10); ··· 35 const reason = fields.reason || 'storage-miss'; 36 37 if (!did || !rkey) { 38 + logger.warn('[Revalidate] Missing did/rkey in message', { id, fields }); 39 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 40 return; 41 } 42 43 + logger.info(`[Revalidate] Received message ${id}: ${did}/${rkey} (${reason})`); 44 45 const record = await fetchSiteRecord(did, rkey); 46 if (!record) { 47 + logger.warn(`[Revalidate] Site record not found on PDS: ${did}/${rkey}`); 48 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 49 return; 50 } 51 52 + await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid, { 53 + skipInvalidation: true, 54 + }); 55 56 + logger.info(`[Revalidate] Completed ${id}: ${did}/${rkey}`); 57 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 58 } 59 ··· 62 try { 63 await processMessage(id, rawFields); 64 } catch (err) { 65 + logger.error('[Revalidate] Failed to process message', err, { id }); 66 } 67 } 68 } ··· 118 'GROUP', 119 config.revalidateGroup, 120 consumerName, 121 'COUNT', 122 batchSize, 123 + 'BLOCK', 124 + blockMs, 125 'STREAMS', 126 config.revalidateStream, 127 '>' 128 + ) as [string, Array<[string, string[]]>][] | null; 129 130 if (!response) return; 131 132 for (const [, messages] of response) { 133 + await processMessages(messages); 134 } 135 } 136 ··· 144 await claimStaleMessages(); 145 await 
readNewMessages(); 146 } catch (err) { 147 + logger.error('[Revalidate] Loop error', err); 148 await new Promise((resolve) => setTimeout(resolve, 1000)); 149 } 150 } ··· 152 153 export async function startRevalidateWorker(): Promise<void> { 154 if (!config.redisUrl) { 155 + logger.warn('[Revalidate] REDIS_URL not set; revalidate worker disabled'); 156 return; 157 } 158 159 if (running) return; 160 161 + logger.info(`[Revalidate] Connecting to Redis: ${config.redisUrl}`); 162 redis = new Redis(config.redisUrl, { 163 maxRetriesPerRequest: 2, 164 enableReadyCheck: true, 165 }); 166 167 redis.on('error', (err) => { 168 + logger.error('[Revalidate] Redis error', err); 169 + }); 170 + 171 + redis.on('ready', () => { 172 + logger.info(`[Revalidate] Redis connected, stream: ${config.revalidateStream}, group: ${config.revalidateGroup}`); 173 }); 174 175 running = true;
+5 -2
apps/firehose-service/src/lib/storage.ts
··· 8 S3StorageTier, 9 DiskStorageTier, 10 } from '@wispplace/tiered-storage'; 11 import { config } from '../config'; 12 13 // Create S3 tier (or fallback to disk for local dev) 14 let coldTier: S3StorageTier | DiskStorageTier; ··· 28 forcePathStyle: config.s3ForcePathStyle, 29 metadataBucket: config.s3MetadataBucket, 30 }); 31 - console.log('[Storage] Using S3 cold tier:', config.s3Bucket); 32 } else { 33 // Fallback to disk for local development 34 const cacheDir = process.env.CACHE_DIR || './cache/sites'; ··· 38 evictionPolicy: 'lru', 39 encodeColons: false, 40 }); 41 - console.log('[Storage] Using disk fallback (no S3_BUCKET configured):', cacheDir); 42 } 43 44 // Identity serializers for raw binary data (no JSON transformation)
··· 8 S3StorageTier, 9 DiskStorageTier, 10 } from '@wispplace/tiered-storage'; 11 + import { createLogger } from '@wispplace/observability'; 12 import { config } from '../config'; 13 + 14 + const logger = createLogger('firehose-service'); 15 16 // Create S3 tier (or fallback to disk for local dev) 17 let coldTier: S3StorageTier | DiskStorageTier; ··· 31 forcePathStyle: config.s3ForcePathStyle, 32 metadataBucket: config.s3MetadataBucket, 33 }); 34 + logger.info('[Storage] Using S3 cold tier:', { bucket: config.s3Bucket }); 35 } else { 36 // Fallback to disk for local development 37 const cacheDir = process.env.CACHE_DIR || './cache/sites'; ··· 41 evictionPolicy: 'lru', 42 encodeColons: false, 43 }); 44 + logger.info('[Storage] Using disk fallback (no S3_BUCKET configured):', { cacheDir }); 45 } 46 47 // Identity serializers for raw binary data (no JSON transformation)
+20 -1
apps/hosting-service/.env.example
··· 3 4 # Server 5 PORT=3001 6 - BASE_HOST=wisp.place
··· 3 4 # Server 5 PORT=3001 6 + # Base domain (e.g., "localhost" for sites.localhost, "wisp.place" for sites.wisp.place) 7 + BASE_HOST=localhost 8 + 9 + # Redis (cache invalidation + revalidation queue) 10 + REDIS_URL=redis://localhost:6379 11 + 12 + # S3 Storage (leave empty for local disk fallback) 13 + S3_BUCKET= 14 + S3_METADATA_BUCKET= 15 + S3_REGION=auto 16 + S3_ENDPOINT= 17 + S3_PREFIX=sites/ 18 + S3_FORCE_PATH_STYLE=true 19 + 20 + # AWS Credentials (required if using S3) 21 + AWS_ACCESS_KEY_ID= 22 + AWS_SECRET_ACCESS_KEY= 23 + 24 + # For local disk fallback (when S3_BUCKET is empty) 25 + CACHE_DIR=./cache/sites
+15 -7
apps/hosting-service/src/index.ts
··· 1 import app from './server'; 2 import { serve } from '@hono/node-server'; 3 - import { initializeGrafanaExporters } from '@wispplace/observability'; 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 import { storage, getStorageConfig } from './lib/storage'; 8 9 // Initialize Grafana exporters if configured 10 initializeGrafanaExporters({ ··· 18 // Ensure cache directory exists 19 if (!existsSync(CACHE_DIR)) { 20 mkdirSync(CACHE_DIR, { recursive: true }); 21 - console.log('Created cache directory:', CACHE_DIR); 22 } 23 24 // Start domain cache cleanup 25 startDomainCacheCleanup(); 26 27 // Optional: Bootstrap hot cache from warm tier on startup 28 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; 29 const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100; 30 31 if (BOOTSTRAP_HOT_ON_STARTUP) { 32 - console.log(`๐Ÿ”ฅ Bootstrapping hot cache (top ${BOOTSTRAP_HOT_LIMIT} items)...`); 33 storage.bootstrapHot(BOOTSTRAP_HOT_LIMIT) 34 .then((loaded: number) => { 35 - console.log(`โœ… Bootstrapped ${loaded} items into hot cache`); 36 }) 37 .catch((err: unknown) => { 38 - console.error('โŒ Hot cache bootstrap error:', err); 39 }); 40 } 41 ··· 78 79 // Graceful shutdown 80 process.on('SIGINT', async () => { 81 - console.log('\n๐Ÿ›‘ Shutting down...'); 82 stopDomainCacheCleanup(); 83 await closeRevalidateQueue(); 84 await closeDatabase(); 85 server.close(); ··· 87 }); 88 89 process.on('SIGTERM', async () => { 90 - console.log('\n๐Ÿ›‘ Shutting down...'); 91 stopDomainCacheCleanup(); 92 await closeRevalidateQueue(); 93 await closeDatabase(); 94 server.close();
··· 1 import app from './server'; 2 import { serve } from '@hono/node-server'; 3 + import { initializeGrafanaExporters, createLogger } from '@wispplace/observability'; 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 + import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation'; 8 import { storage, getStorageConfig } from './lib/storage'; 9 + 10 + const logger = createLogger('hosting-service'); 11 12 // Initialize Grafana exporters if configured 13 initializeGrafanaExporters({ ··· 21 // Ensure cache directory exists 22 if (!existsSync(CACHE_DIR)) { 23 mkdirSync(CACHE_DIR, { recursive: true }); 24 + logger.info('Created cache directory', { CACHE_DIR }); 25 } 26 27 // Start domain cache cleanup 28 startDomainCacheCleanup(); 29 30 + // Start cache invalidation subscriber (listens for firehose-service updates via Redis pub/sub) 31 + startCacheInvalidationSubscriber(); 32 + 33 // Optional: Bootstrap hot cache from warm tier on startup 34 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; 35 const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? 
parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100; 36 37 if (BOOTSTRAP_HOT_ON_STARTUP) { 38 + logger.info(`Bootstrapping hot cache (top ${BOOTSTRAP_HOT_LIMIT} items)...`); 39 storage.bootstrapHot(BOOTSTRAP_HOT_LIMIT) 40 .then((loaded: number) => { 41 + logger.info(`Bootstrapped ${loaded} items into hot cache`); 42 }) 43 .catch((err: unknown) => { 44 + logger.error('Hot cache bootstrap error', err); 45 }); 46 } 47 ··· 84 85 // Graceful shutdown 86 process.on('SIGINT', async () => { 87 + logger.info('Shutting down...'); 88 stopDomainCacheCleanup(); 89 + await stopCacheInvalidationSubscriber(); 90 await closeRevalidateQueue(); 91 await closeDatabase(); 92 server.close(); ··· 94 }); 95 96 process.on('SIGTERM', async () => { 97 + logger.info('Shutting down...'); 98 stopDomainCacheCleanup(); 99 + await stopCacheInvalidationSubscriber(); 100 await closeRevalidateQueue(); 101 await closeDatabase(); 102 server.close();
+80
apps/hosting-service/src/lib/cache-invalidation.ts
···
··· 1 + /** 2 + * Cache invalidation subscriber 3 + * 4 + * Listens to Redis pub/sub for cache invalidation messages from the firehose-service. 5 + * When a site is updated/deleted, clears the hosting-service's local caches 6 + * (tiered storage hot+warm tiers, redirect rules) so stale data isn't served. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { storage } from './storage'; 11 + import { clearRedirectRulesCache } from './site-cache'; 12 + 13 + const CHANNEL = 'wisp:cache-invalidate'; 14 + 15 + let subscriber: Redis | null = null; 16 + 17 + export function startCacheInvalidationSubscriber(): void { 18 + const redisUrl = process.env.REDIS_URL; 19 + if (!redisUrl) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation disabled'); 21 + return; 22 + } 23 + 24 + console.log(`[CacheInvalidation] Connecting to Redis for subscribing: ${redisUrl}`); 25 + subscriber = new Redis(redisUrl, { 26 + maxRetriesPerRequest: 2, 27 + enableReadyCheck: true, 28 + }); 29 + 30 + subscriber.on('error', (err) => { 31 + console.error('[CacheInvalidation] Redis error:', err); 32 + }); 33 + 34 + subscriber.on('ready', () => { 35 + console.log('[CacheInvalidation] Redis subscriber connected'); 36 + }); 37 + 38 + subscriber.subscribe(CHANNEL, (err) => { 39 + if (err) { 40 + console.error('[CacheInvalidation] Failed to subscribe:', err); 41 + } else { 42 + console.log('[CacheInvalidation] Subscribed to', CHANNEL); 43 + } 44 + }); 45 + 46 + subscriber.on('message', async (_channel: string, message: string) => { 47 + try { 48 + const { did, rkey, action } = JSON.parse(message) as { 49 + did: string; 50 + rkey: string; 51 + action: 'update' | 'delete' | 'settings'; 52 + }; 53 + 54 + if (!did || !rkey) { 55 + console.warn('[CacheInvalidation] Invalid message:', message); 56 + return; 57 + } 58 + 59 + console.log(`[CacheInvalidation] Invalidating ${did}/${rkey} (${action})`); 60 + 61 + // Clear tiered storage (hot + warm) for this site 62 + const prefix = 
`${did}/${rkey}/`; 63 + const deleted = await storage.invalidate(prefix); 64 + console.log(`[CacheInvalidation] Cleared ${deleted} keys from tiered storage for ${did}/${rkey}`); 65 + 66 + // Clear redirect rules cache 67 + clearRedirectRulesCache(did, rkey); 68 + } catch (err) { 69 + console.error('[CacheInvalidation] Error processing message:', err); 70 + } 71 + }); 72 + } 73 + 74 + export async function stopCacheInvalidationSubscriber(): Promise<void> { 75 + if (subscriber) { 76 + const toClose = subscriber; 77 + subscriber = null; 78 + await toClose.quit(); 79 + } 80 + }
+26
apps/hosting-service/src/lib/db.ts
··· 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 } 121 122 export interface SiteRecord { 123 did: string; 124 rkey: string;
··· 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 } 121 122 + /** 123 + * Upsert site cache entry (used by on-demand caching when a site is completely missing) 124 + */ 125 + export async function upsertSiteCache( 126 + did: string, 127 + rkey: string, 128 + recordCid: string, 129 + fileCids: Record<string, string> 130 + ): Promise<void> { 131 + try { 132 + await sql` 133 + INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 134 + VALUES (${did}, ${rkey}, ${recordCid}, ${sql.json(fileCids ?? {})}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 135 + ON CONFLICT (did, rkey) 136 + DO UPDATE SET 137 + record_cid = EXCLUDED.record_cid, 138 + file_cids = EXCLUDED.file_cids, 139 + updated_at = EXTRACT(EPOCH FROM NOW()) 140 + `; 141 + } catch (err) { 142 + const error = err instanceof Error ? err : new Error(String(err)); 143 + console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message }); 144 + throw error; 145 + } 146 + } 147 + 148 export interface SiteRecord { 149 did: string; 150 rkey: string;
+45 -2
apps/hosting-service/src/lib/file-serving.ts
··· 18 import { enqueueRevalidate } from './revalidate-queue'; 19 import { recordStorageMiss } from './revalidate-metrics'; 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 22 /** 23 * Helper to retrieve a file with metadata from tiered storage ··· 91 rkey: string, 92 filePath: string, 93 preferRewrittenHtml: boolean 94 - ): Promise<{ result: Awaited<ReturnType<typeof storage.getWithMetadata>>; filePath: string } | null> { 95 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 96 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 97 const rewrittenPath = `.rewritten/${filePath}`; ··· 107 } 108 109 function buildResponseFromStorageResult( 110 - result: Awaited<ReturnType<typeof storage.getWithMetadata>>, 111 filePath: string, 112 settings: WispSettings | null, 113 requestHeaders?: Record<string, string> ··· 149 } 150 151 /** 152 * Helper to serve files from cache (for custom domains and subdomains) 153 */ 154 export async function serveFromCache( ··· 158 fullUrl?: string, 159 headers?: Record<string, string> 160 ): Promise<Response> { 161 // Load settings for this site 162 const settings = await getCachedSettings(did, rkey); 163 const indexFiles = getIndexFiles(settings); ··· 445 fullUrl?: string, 446 headers?: Record<string, string> 447 ): Promise<Response> { 448 // Load settings for this site 449 const settings = await getCachedSettings(did, rkey); 450 const indexFiles = getIndexFiles(settings);
··· 18 import { enqueueRevalidate } from './revalidate-queue'; 19 import { recordStorageMiss } from './revalidate-metrics'; 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 + import { fetchAndCacheSite } from './on-demand-cache'; 22 + import type { StorageResult } from '@wispplace/tiered-storage'; 23 + 24 + type FileStorageResult = StorageResult<Uint8Array>; 25 26 /** 27 * Helper to retrieve a file with metadata from tiered storage ··· 95 rkey: string, 96 filePath: string, 97 preferRewrittenHtml: boolean 98 + ): Promise<{ result: FileStorageResult; filePath: string } | null> { 99 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 100 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 101 const rewrittenPath = `.rewritten/${filePath}`; ··· 111 } 112 113 function buildResponseFromStorageResult( 114 + result: FileStorageResult, 115 filePath: string, 116 settings: WispSettings | null, 117 requestHeaders?: Record<string, string> ··· 153 } 154 155 /** 156 + * Ensure a site is cached locally. If the site has no DB entry (completely unknown), 157 + * attempt to fetch and cache it on-demand from the PDS. 158 + */ 159 + async function ensureSiteCached(did: string, rkey: string): Promise<void> { 160 + const existing = await getSiteCache(did, rkey); 161 + if (existing) { 162 + // Site is in DB โ€” check if any files actually exist in storage 163 + const prefix = `${did}/${rkey}/`; 164 + const hasFiles = await storage.exists(prefix.slice(0, -1)) || 165 + await checkAnyFileExists(did, rkey, existing.file_cids); 166 + if (hasFiles) { 167 + return; 168 + } 169 + console.log(`[FileServing] Site ${did}/${rkey} in DB but no files in storage, re-fetching`); 170 + } else { 171 + console.log(`[FileServing] Site ${did}/${rkey} not in DB, attempting on-demand cache`); 172 + } 173 + 174 + const success = await fetchAndCacheSite(did, rkey); 175 + console.log(`[FileServing] On-demand cache for ${did}/${rkey}: ${success ? 
'success' : 'failed'}`); 176 + } 177 + 178 + async function checkAnyFileExists(did: string, rkey: string, fileCids: unknown): Promise<boolean> { 179 + if (!fileCids || typeof fileCids !== 'object') return false; 180 + const cids = fileCids as Record<string, string>; 181 + const firstFile = Object.keys(cids)[0]; 182 + if (!firstFile) return false; 183 + return storage.exists(`${did}/${rkey}/${firstFile}`); 184 + } 185 + 186 + /** 187 * Helper to serve files from cache (for custom domains and subdomains) 188 */ 189 export async function serveFromCache( ··· 193 fullUrl?: string, 194 headers?: Record<string, string> 195 ): Promise<Response> { 196 + // Check if this site is completely unknown (not in DB, no files in storage) 197 + // If so, attempt to fetch and cache it on-demand from the PDS 198 + await ensureSiteCached(did, rkey); 199 + 200 // Load settings for this site 201 const settings = await getCachedSettings(did, rkey); 202 const indexFiles = getIndexFiles(settings); ··· 484 fullUrl?: string, 485 headers?: Record<string, string> 486 ): Promise<Response> { 487 + // Check if this site is completely unknown (not in DB, no files in storage) 488 + // If so, attempt to fetch and cache it on-demand from the PDS 489 + await ensureSiteCached(did, rkey); 490 + 491 // Load settings for this site 492 const settings = await getCachedSettings(did, rkey); 493 const indexFiles = getIndexFiles(settings);
+259
apps/hosting-service/src/lib/on-demand-cache.ts
···
··· 1 + /** 2 + * On-demand site caching for the hosting service 3 + * 4 + * When a request hits a site that is completely missing (no DB entry, no files), 5 + * this module fetches the site record from the PDS, downloads all blobs, 6 + * writes them to local storage (hot + warm tiers), and updates the DB. 7 + * 8 + * This gives immediate serving capability. A revalidate is also enqueued 9 + * so the firehose-service backfills S3 (cold tier). 10 + */ 11 + 12 + import type { Record as WispFsRecord, Directory, Entry, File } from '@wispplace/lexicons/types/place/wisp/fs'; 13 + import { safeFetchJson, safeFetchBlob } from '@wispplace/safe-fetch'; 14 + import { extractBlobCid, getPdsForDid } from '@wispplace/atproto-utils'; 15 + import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 16 + import { collectFileCidsFromEntries, countFilesInDirectory } from '@wispplace/fs-utils'; 17 + import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 18 + import { expandSubfsNodes } from './utils'; 19 + import { storage } from './storage'; 20 + import { upsertSiteCache, tryAcquireLock, releaseLock } from './db'; 21 + import { enqueueRevalidate } from './revalidate-queue'; 22 + import { gunzipSync } from 'zlib'; 23 + 24 + // Track in-flight fetches to avoid duplicate work 25 + const inFlightFetches = new Map<string, Promise<boolean>>(); 26 + 27 + interface FileInfo { 28 + path: string; 29 + cid: string; 30 + blob: any; 31 + encoding?: 'gzip'; 32 + mimeType?: string; 33 + base64?: boolean; 34 + } 35 + 36 + /** 37 + * Attempt to fetch and cache a completely missing site on-demand. 38 + * Returns true if the site was successfully cached, false otherwise. 39 + * 40 + * Uses a distributed lock (pg advisory lock) to prevent multiple 41 + * hosting-service instances from fetching the same site simultaneously. 
42 + */ 43 + export async function fetchAndCacheSite(did: string, rkey: string): Promise<boolean> { 44 + const key = `${did}:${rkey}`; 45 + 46 + // Check if there's already an in-flight fetch for this site 47 + const existing = inFlightFetches.get(key); 48 + if (existing) { 49 + return existing; 50 + } 51 + 52 + const fetchPromise = doFetchAndCache(did, rkey); 53 + inFlightFetches.set(key, fetchPromise); 54 + 55 + try { 56 + return await fetchPromise; 57 + } finally { 58 + inFlightFetches.delete(key); 59 + } 60 + } 61 + 62 + async function doFetchAndCache(did: string, rkey: string): Promise<boolean> { 63 + const lockKey = `on-demand-cache:${did}:${rkey}`; 64 + 65 + // Try to acquire a distributed lock 66 + const acquired = await tryAcquireLock(lockKey); 67 + if (!acquired) { 68 + console.log(`[OnDemandCache] Lock not acquired for ${did}/${rkey}, another instance is handling it`); 69 + return false; 70 + } 71 + 72 + try { 73 + console.log(`[OnDemandCache] Fetching missing site ${did}/${rkey}`); 74 + 75 + // Fetch site record from PDS 76 + const pdsEndpoint = await getPdsForDid(did); 77 + if (!pdsEndpoint) { 78 + console.error(`[OnDemandCache] Could not resolve PDS for ${did}`); 79 + return false; 80 + } 81 + 82 + const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(rkey)}`; 83 + 84 + let data: any; 85 + try { 86 + data = await safeFetchJson(recordUrl); 87 + } catch (err) { 88 + const msg = err instanceof Error ? 
err.message : String(err); 89 + if (msg.includes('HTTP 404') || msg.includes('Not Found')) { 90 + console.log(`[OnDemandCache] Site record not found on PDS: ${did}/${rkey}`); 91 + } else { 92 + console.error(`[OnDemandCache] Failed to fetch site record: ${did}/${rkey}`, msg); 93 + } 94 + return false; 95 + } 96 + 97 + const record = data.value as WispFsRecord; 98 + const recordCid = data.cid || ''; 99 + 100 + if (!record?.root?.entries) { 101 + console.error(`[OnDemandCache] Invalid record structure for ${did}/${rkey}`); 102 + return false; 103 + } 104 + 105 + // Expand subfs nodes 106 + const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint); 107 + 108 + // Validate limits 109 + const fileCount = countFilesInDirectory(expandedRoot); 110 + if (fileCount > MAX_FILE_COUNT) { 111 + console.error(`[OnDemandCache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 112 + return false; 113 + } 114 + 115 + // Collect files 116 + const files = collectFileInfo(expandedRoot.entries); 117 + 118 + // Collect file CIDs for DB 119 + const fileCids: Record<string, string> = {}; 120 + collectFileCidsFromEntries(expandedRoot.entries, '', fileCids); 121 + 122 + // Download and write all files to local storage (hot + warm tiers) 123 + const CONCURRENCY = 10; 124 + let downloaded = 0; 125 + let failed = 0; 126 + 127 + for (let i = 0; i < files.length; i += CONCURRENCY) { 128 + const batch = files.slice(i, i + CONCURRENCY); 129 + const results = await Promise.allSettled( 130 + batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint)) 131 + ); 132 + 133 + for (const result of results) { 134 + if (result.status === 'fulfilled') { 135 + downloaded++; 136 + } else { 137 + failed++; 138 + console.error(`[OnDemandCache] Failed to download blob:`, result.reason); 139 + } 140 + } 141 + } 142 + 143 + console.log(`[OnDemandCache] Downloaded ${downloaded} files (${failed} failed) for ${did}/${rkey}`); 144 + 145 + // Update DB with file CIDs so future storage 
misses can be detected 146 + await upsertSiteCache(did, rkey, recordCid, fileCids); 147 + 148 + // Enqueue revalidate so firehose-service backfills S3 (cold tier) 149 + await enqueueRevalidate(did, rkey, `on-demand-cache`); 150 + 151 + console.log(`[OnDemandCache] Successfully cached site ${did}/${rkey}`); 152 + return downloaded > 0; 153 + } catch (err) { 154 + console.error(`[OnDemandCache] Error caching site ${did}/${rkey}:`, err); 155 + return false; 156 + } finally { 157 + await releaseLock(lockKey); 158 + } 159 + } 160 + 161 + function collectFileInfo(entries: Entry[], pathPrefix: string = ''): FileInfo[] { 162 + const files: FileInfo[] = []; 163 + 164 + for (const entry of entries) { 165 + const currentPath = pathPrefix ? `${pathPrefix}/${entry.name}` : entry.name; 166 + const node = entry.node; 167 + 168 + if ('type' in node && node.type === 'directory' && 'entries' in node) { 169 + files.push(...collectFileInfo(node.entries, currentPath)); 170 + } else if ('type' in node && node.type === 'file' && 'blob' in node) { 171 + const fileNode = node as File; 172 + const cid = extractBlobCid(fileNode.blob); 173 + if (cid) { 174 + files.push({ 175 + path: currentPath, 176 + cid, 177 + blob: fileNode.blob, 178 + encoding: fileNode.encoding, 179 + mimeType: fileNode.mimeType, 180 + base64: fileNode.base64, 181 + }); 182 + } 183 + } 184 + } 185 + 186 + return files; 187 + } 188 + 189 + async function downloadAndWriteBlob( 190 + did: string, 191 + rkey: string, 192 + file: FileInfo, 193 + pdsEndpoint: string 194 + ): Promise<void> { 195 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 196 + 197 + let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 198 + let encoding = file.encoding; 199 + 200 + // Decode base64 if flagged 201 + if (file.base64) { 202 + const base64String = new TextDecoder().decode(content); 203 + content = Buffer.from(base64String, 
'base64'); 204 + } 205 + 206 + // Decompress if needed and shouldn't stay compressed 207 + const shouldStayCompressed = shouldCompressMimeType(file.mimeType); 208 + 209 + if (encoding === 'gzip' && !shouldStayCompressed && content.length >= 2 && 210 + content[0] === 0x1f && content[1] === 0x8b) { 211 + try { 212 + content = gunzipSync(content); 213 + encoding = undefined; 214 + } catch { 215 + // Keep gzipped if decompression fails 216 + } 217 + } 218 + 219 + // If encoding is missing but data looks gzipped for a text-like file, mark it 220 + if (!encoding && isTextLikeMime(file.mimeType, file.path) && content.length >= 2 && 221 + content[0] === 0x1f && content[1] === 0x8b) { 222 + encoding = 'gzip'; 223 + } 224 + 225 + // Build storage key and metadata 226 + const key = `${did}/${rkey}/${file.path}`; 227 + const metadata: Record<string, string> = {}; 228 + if (encoding) metadata.encoding = encoding; 229 + if (file.mimeType) metadata.mimeType = file.mimeType; 230 + 231 + // Write to hot + warm tiers only (cold/S3 is read-only in hosting-service, 232 + // firehose-service will backfill via revalidate) 233 + await storage.set(key as any, content as any, { 234 + metadata, 235 + skipTiers: [], 236 + }); 237 + } 238 + 239 + function isTextLikeMime(mimeType?: string, path?: string): boolean { 240 + if (mimeType) { 241 + if (mimeType === 'text/html') return true; 242 + if (mimeType === 'text/css') return true; 243 + if (mimeType === 'text/javascript') return true; 244 + if (mimeType === 'application/javascript') return true; 245 + if (mimeType === 'application/json') return true; 246 + if (mimeType === 'application/xml') return true; 247 + if (mimeType === 'image/svg+xml') return true; 248 + } 249 + 250 + if (!path) return false; 251 + const lower = path.toLowerCase(); 252 + return lower.endsWith('.html') || 253 + lower.endsWith('.htm') || 254 + lower.endsWith('.css') || 255 + lower.endsWith('.js') || 256 + lower.endsWith('.json') || 257 + lower.endsWith('.xml') || 258 + 
lower.endsWith('.svg'); 259 + }
+7 -1
apps/hosting-service/src/lib/revalidate-queue.ts
··· 18 } 19 20 if (!client) { 21 client = new Redis(redisUrl, { 22 maxRetriesPerRequest: 2, 23 enableReadyCheck: true, ··· 26 client.on('error', (err) => { 27 console.error('[Revalidate] Redis error:', err); 28 }); 29 } 30 31 return client; ··· 46 47 try { 48 const dedupeKey = `revalidate:site:${did}:${rkey}`; 49 - const set = await redis.set(dedupeKey, '1', 'NX', 'EX', dedupeTtlSeconds); 50 if (!set) { 51 recordRevalidateResult('deduped'); 52 return { enqueued: false, result: 'deduped' }; ··· 65 Date.now().toString() 66 ); 67 68 recordRevalidateResult('enqueued'); 69 return { enqueued: true, result: 'enqueued' }; 70 } catch (err) {
··· 18 } 19 20 if (!client) { 21 + console.log(`[Revalidate] Connecting to Redis: ${redisUrl}`); 22 client = new Redis(redisUrl, { 23 maxRetriesPerRequest: 2, 24 enableReadyCheck: true, ··· 27 client.on('error', (err) => { 28 console.error('[Revalidate] Redis error:', err); 29 }); 30 + 31 + client.on('ready', () => { 32 + console.log(`[Revalidate] Redis connected, stream: ${streamName}`); 33 + }); 34 } 35 36 return client; ··· 51 52 try { 53 const dedupeKey = `revalidate:site:${did}:${rkey}`; 54 + const set = await redis.set(dedupeKey, '1', 'EX', dedupeTtlSeconds, 'NX'); 55 if (!set) { 56 recordRevalidateResult('deduped'); 57 return { enqueued: false, result: 'deduped' }; ··· 70 Date.now().toString() 71 ); 72 73 + console.log(`[Revalidate] Enqueued ${did}/${rkey} (${reason}) to ${streamName}`); 74 recordRevalidateResult('enqueued'); 75 return { enqueued: true, result: 'enqueued' }; 76 } catch (err) {
+36 -6
apps/hosting-service/src/lib/storage.ts
··· 59 60 constructor(private tier: StorageTier) {} 61 62 - // Read operations - pass through to underlying tier 63 async get(key: string) { 64 - return this.tier.get(key); 65 } 66 67 async getWithMetadata(key: string) { 68 - return this.tier.getWithMetadata?.(key) ?? null; 69 } 70 71 async getStream(key: string) { 72 - return this.tier.getStream?.(key) ?? null; 73 } 74 75 async exists(key: string) { 76 - return this.tier.exists(key); 77 } 78 79 async getMetadata(key: string) { 80 - return this.tier.getMetadata(key); 81 } 82 83 async *listKeys(prefix?: string) { ··· 111 112 async clear() { 113 this.logWriteSkip('clear', 'all keys'); 114 } 115 116 private logWriteSkip(operation: string, key: string) {
··· 59 60 constructor(private tier: StorageTier) {} 61 62 + // Read operations - pass through to underlying tier, catch errors as cache misses 63 async get(key: string) { 64 + try { 65 + return await this.tier.get(key); 66 + } catch (err) { 67 + this.logReadError('get', key, err); 68 + return null; 69 + } 70 } 71 72 async getWithMetadata(key: string) { 73 + try { 74 + return await this.tier.getWithMetadata?.(key) ?? null; 75 + } catch (err) { 76 + this.logReadError('getWithMetadata', key, err); 77 + return null; 78 + } 79 } 80 81 async getStream(key: string) { 82 + try { 83 + return await this.tier.getStream?.(key) ?? null; 84 + } catch (err) { 85 + this.logReadError('getStream', key, err); 86 + return null; 87 + } 88 } 89 90 async exists(key: string) { 91 + try { 92 + return await this.tier.exists(key); 93 + } catch (err) { 94 + this.logReadError('exists', key, err); 95 + return false; 96 + } 97 } 98 99 async getMetadata(key: string) { 100 + try { 101 + return await this.tier.getMetadata(key); 102 + } catch (err) { 103 + this.logReadError('getMetadata', key, err); 104 + return null; 105 + } 106 } 107 108 async *listKeys(prefix?: string) { ··· 136 137 async clear() { 138 this.logWriteSkip('clear', 'all keys'); 139 + } 140 + 141 + private logReadError(operation: string, key: string, err: unknown) { 142 + const msg = err instanceof Error ? err.message : String(err); 143 + console.warn(`[Storage] S3 read error (${operation}) for ${key}: ${msg}`); 144 } 145 146 private logWriteSkip(operation: string, key: string) {
+15 -5
apps/hosting-service/src/server.ts
··· 7 import { cors } from 'hono/cors'; 8 import { getWispDomain, getCustomDomain, getCustomDomainByHash } from './lib/db'; 9 import { resolveDid } from './lib/utils'; 10 - import { logCollector, errorTracker, metricsCollector } from '@wispplace/observability'; 11 import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono'; 12 import { sanitizePath } from '@wispplace/fs-utils'; 13 import { isValidRkey, extractHeaders } from './lib/request-utils'; 14 import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving'; 15 import { getRevalidateMetrics } from './lib/revalidate-metrics'; 16 17 - const BASE_HOST = process.env.BASE_HOST || 'wisp.place'; 18 19 const app = new Hono(); 20 ··· 38 app.get('/*', async (c) => { 39 const url = new URL(c.req.url); 40 const hostname = c.req.header('host') || ''; 41 - const hostnameWithoutPort = hostname.split(':')[0]; 42 const rawPath = url.pathname.replace(/^\//, ''); 43 const path = sanitizePath(rawPath); 44 45 // Check if this is sites.wisp.place subdomain (strip port for comparison) 46 if (hostnameWithoutPort === `sites.${BASE_HOST}`) { 47 // Sanitize the path FIRST to prevent path traversal ··· 78 return c.text('Invalid identifier', 400); 79 } 80 81 - console.log(`[Server] sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`); 82 83 // Serve with HTML path rewriting to handle absolute paths 84 const basePath = `/${identifier}/${site}/`; 85 - console.log(`[Server] Serving with basePath: ${basePath}`); 86 const headers = extractHeaders(c.req.raw.headers); 87 return serveFromCacheWithRewrite(did, site, filePath, basePath, c.req.url, headers); 88 }
··· 7 import { cors } from 'hono/cors'; 8 import { getWispDomain, getCustomDomain, getCustomDomainByHash } from './lib/db'; 9 import { resolveDid } from './lib/utils'; 10 + import { createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability'; 11 import { observabilityMiddleware, observabilityErrorHandler } from '@wispplace/observability/middleware/hono'; 12 import { sanitizePath } from '@wispplace/fs-utils'; 13 import { isValidRkey, extractHeaders } from './lib/request-utils'; 14 import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving'; 15 import { getRevalidateMetrics } from './lib/revalidate-metrics'; 16 17 + const logger = createLogger('hosting-service'); 18 + 19 + const BASE_HOST_ENV = process.env.BASE_HOST || 'wisp.place'; 20 + const BASE_HOST = BASE_HOST_ENV.split(':')[0] || BASE_HOST_ENV; 21 22 const app = new Hono(); 23 ··· 41 app.get('/*', async (c) => { 42 const url = new URL(c.req.url); 43 const hostname = c.req.header('host') || ''; 44 + const hostnameWithoutPort = hostname.split(':')[0] || ''; 45 const rawPath = url.pathname.replace(/^\//, ''); 46 const path = sanitizePath(rawPath); 47 48 + logger.debug(`Request: host=${hostname} hostnameWithoutPort=${hostnameWithoutPort} path=${path}`, { BASE_HOST }); 49 + 50 // Check if this is sites.wisp.place subdomain (strip port for comparison) 51 if (hostnameWithoutPort === `sites.${BASE_HOST}`) { 52 // Sanitize the path FIRST to prevent path traversal ··· 83 return c.text('Invalid identifier', 400); 84 } 85 86 + // Redirect to trailing slash when accessing site root so relative paths resolve correctly 87 + if (!filePath && !url.pathname.endsWith('/')) { 88 + return c.redirect(`${url.pathname}/${url.search}`, 301); 89 + } 90 + 91 + logger.debug(`sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`); 92 93 // Serve with HTML path rewriting to handle absolute paths 94 const basePath = `/${identifier}/${site}/`; 95 + 
logger.debug(`Serving with basePath: ${basePath}`); 96 const headers = extractHeaders(c.req.raw.headers); 97 return serveFromCacheWithRewrite(did, site, filePath, basePath, c.req.url, headers); 98 }
+8 -3
bun.lock
··· 28 "@wispplace/database": "workspace:*", 29 "@wispplace/fs-utils": "workspace:*", 30 "@wispplace/lexicons": "workspace:*", 31 "@wispplace/safe-fetch": "workspace:*", 32 "@wispplace/tiered-storage": "workspace:*", 33 "hono": "^4.10.4", ··· 135 }, 136 "cli": { 137 "name": "wispctl", 138 - "version": "1.0.6", 139 "bin": { 140 "wispctl": "dist/index.js", 141 }, ··· 1920 1921 "@typescript-eslint/typescript-estree/minimatch": ["minimatch@9.0.5", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow=="], 1922 1923 - "@wispplace/bun-firehose/@types/bun": ["@types/bun@1.3.8", "", { "dependencies": { "bun-types": "1.3.8" } }, "sha512-3LvWJ2q5GerAXYxO2mffLTqOzEu5qnhEAlh48Vnu8WQfnmSwbgagjGZV6BoHKJztENYEDn6QmVd949W4uESRJA=="], 1924 1925 "@wispplace/lexicons/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 1926 1927 "@wispplace/main-app/@atproto/api": ["@atproto/api@0.17.7", "", { "dependencies": { "@atproto/common-web": "^0.4.3", "@atproto/lexicon": "^0.5.1", "@atproto/syntax": "^0.4.1", "@atproto/xrpc": "^0.7.5", "await-lock": "^2.2.2", "multiformats": "^9.9.0", "tlds": "^1.234.0", "zod": "^3.23.8" } }, "sha512-V+OJBZq9chcrD21xk1bUa6oc5DSKfQj5DmUPf5rmZncqL1w9ZEbS38H5cMyqqdhfgo2LWeDRdZHD0rvNyJsIaw=="], 1928 1929 - "@wispplace/tiered-storage/@types/bun": ["@types/bun@1.3.8", "", { "dependencies": { "bun-types": "1.3.8" } }, "sha512-3LvWJ2q5GerAXYxO2mffLTqOzEu5qnhEAlh48Vnu8WQfnmSwbgagjGZV6BoHKJztENYEDn6QmVd949W4uESRJA=="], 1930 1931 "accepts/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, 
"sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], 1932 ··· 2058 2059 "@typescript-eslint/typescript-estree/minimatch/brace-expansion": ["brace-expansion@2.0.2", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ=="], 2060 2061 "@wispplace/lexicons/@atproto/lexicon/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2062 2063 "@wispplace/main-app/@atproto/api/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 2064 2065 "@wispplace/main-app/@atproto/api/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2066 2067 "accepts/mime-types/mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="], 2068
··· 28 "@wispplace/database": "workspace:*", 29 "@wispplace/fs-utils": "workspace:*", 30 "@wispplace/lexicons": "workspace:*", 31 + "@wispplace/observability": "workspace:*", 32 "@wispplace/safe-fetch": "workspace:*", 33 "@wispplace/tiered-storage": "workspace:*", 34 "hono": "^4.10.4", ··· 136 }, 137 "cli": { 138 "name": "wispctl", 139 + "version": "1.0.8", 140 "bin": { 141 "wispctl": "dist/index.js", 142 }, ··· 1921 1922 "@typescript-eslint/typescript-estree/minimatch": ["minimatch@9.0.5", "", { "dependencies": { "brace-expansion": "^2.0.1" } }, "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow=="], 1923 1924 + "@wispplace/bun-firehose/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="], 1925 1926 "@wispplace/lexicons/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 1927 1928 "@wispplace/main-app/@atproto/api": ["@atproto/api@0.17.7", "", { "dependencies": { "@atproto/common-web": "^0.4.3", "@atproto/lexicon": "^0.5.1", "@atproto/syntax": "^0.4.1", "@atproto/xrpc": "^0.7.5", "await-lock": "^2.2.2", "multiformats": "^9.9.0", "tlds": "^1.234.0", "zod": "^3.23.8" } }, "sha512-V+OJBZq9chcrD21xk1bUa6oc5DSKfQj5DmUPf5rmZncqL1w9ZEbS38H5cMyqqdhfgo2LWeDRdZHD0rvNyJsIaw=="], 1929 1930 + "@wispplace/tiered-storage/@types/bun": ["@types/bun@1.3.9", "", { "dependencies": { "bun-types": "1.3.9" } }, "sha512-KQ571yULOdWJiMH+RIWIOZ7B2RXQGpL1YQrBtLIV3FqDcCu6FsbFUBwhdKUlCKUpS3PJDsHlJ1QKlpxoVR+xtw=="], 1931 1932 "accepts/mime-types": ["mime-types@2.1.35", "", { "dependencies": { "mime-db": "1.52.0" } }, 
"sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw=="], 1933 ··· 2059 2060 "@typescript-eslint/typescript-estree/minimatch/brace-expansion": ["brace-expansion@2.0.2", "", { "dependencies": { "balanced-match": "^1.0.0" } }, "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ=="], 2061 2062 + "@wispplace/bun-firehose/@types/bun/bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="], 2063 + 2064 "@wispplace/lexicons/@atproto/lexicon/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2065 2066 "@wispplace/main-app/@atproto/api/@atproto/lexicon": ["@atproto/lexicon@0.5.2", "", { "dependencies": { "@atproto/common-web": "^0.4.4", "@atproto/syntax": "^0.4.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", "zod": "^3.23.8" } }, "sha512-lRmJgMA8f5j7VB5Iu5cp188ald5FuI4FlmZ7nn6EBrk1dgOstWVrI5Ft6K3z2vjyLZRG6nzknlsw+tDP63p7bQ=="], 2067 2068 "@wispplace/main-app/@atproto/api/multiformats": ["multiformats@9.9.0", "", {}, "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg=="], 2069 + 2070 + "@wispplace/tiered-storage/@types/bun/bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="], 2071 2072 "accepts/mime-types/mime-db": ["mime-db@1.52.0", "", {}, "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg=="], 2073
+15
docker-compose.yml
··· 17 timeout: 5s 18 retries: 5 19 20 volumes: 21 postgres_data:
··· 17 timeout: 5s 18 retries: 5 19 20 + redis: 21 + image: redis:7-alpine 22 + container_name: wisp-redis 23 + restart: unless-stopped 24 + ports: 25 + - "6379:6379" 26 + volumes: 27 + - redis_data:/data 28 + healthcheck: 29 + test: ["CMD", "redis-cli", "ping"] 30 + interval: 5s 31 + timeout: 5s 32 + retries: 5 33 + 34 volumes: 35 postgres_data: 36 + redis_data:

History

4 rounds 1 comment
sign up or login to add to the discussion
7 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
Dockerfile
fix storage-miss revalidation loop and tier reporting
1/1 failed
expand
expand 1 comment

this is what is live on us-east-1 right now. seems to be doing fine as of 2/6 10:38pm

pull request successfully merged
5 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
1/1 failed
expand
expand 0 comments
4 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
1/1 failed
expand
expand 0 comments
1 commit
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
1/1 failed
expand
expand 0 comments