Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

hosting service writes on cache miss, firehose service properly notifies hosting service on new updates #6

Merged — opened by nekomimi.pet, targeting main from hosting-service-fixes

actually forcing myself to develop good habits this year on my own projects because i deserve them

Labels

None yet.

Assignees

None yet.

Participants 1
AT URI
at://did:plc:ttdrpj45ibqunmfhdsb4zdwq/sh.tangled.repo.pull/3merr52ear522
+641 -27
Diff #1
+27
apps/firehose-service/.env.example
···
··· 1 + # Database 2 + DATABASE_URL=postgres://postgres:postgres@localhost:5432/wisp 3 + 4 + # Firehose 5 + FIREHOSE_SERVICE=wss://bsky.network 6 + FIREHOSE_MAX_CONCURRENCY=5 7 + 8 + # Redis (cache invalidation + revalidation queue) 9 + REDIS_URL=redis://localhost:6379 10 + 11 + # S3 Storage (leave empty for local disk fallback) 12 + S3_BUCKET= 13 + S3_METADATA_BUCKET= 14 + S3_REGION=auto 15 + S3_ENDPOINT= 16 + S3_PREFIX=sites/ 17 + S3_FORCE_PATH_STYLE=true 18 + 19 + # AWS Credentials (required if using S3) 20 + AWS_ACCESS_KEY_ID= 21 + AWS_SECRET_ACCESS_KEY= 22 + 23 + # Health check server 24 + HEALTH_PORT=3001 25 + 26 + # For local disk fallback (when S3_BUCKET is empty) 27 + CACHE_DIR=./cache/sites
+2
apps/firehose-service/src/index.ts
··· 14 import { storage } from './lib/storage'; 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 18 const app = new Hono(); 19 ··· 41 42 stopFirehose(); 43 await stopRevalidateWorker(); 44 await closeDatabase(); 45 46 console.log('[Service] Shutdown complete');
··· 14 import { storage } from './lib/storage'; 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 + import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'; 18 19 const app = new Hono(); 20 ··· 42 43 stopFirehose(); 44 await stopRevalidateWorker(); 45 + await closeCacheInvalidationPublisher(); 46 await closeDatabase(); 47 48 console.log('[Service] Shutdown complete');
+68
apps/firehose-service/src/lib/cache-invalidation.ts
···
··· 1 + /** 2 + * Cache invalidation publisher 3 + * 4 + * Publishes invalidation messages to Redis pub/sub so the hosting-service 5 + * can clear its local caches (tiered storage, redirect rules) when a site 6 + * is updated or deleted via the firehose. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { config } from '../config'; 11 + 12 + const CHANNEL = 'wisp:cache-invalidate'; 13 + 14 + let publisher: Redis | null = null; 15 + let loggedMissingRedis = false; 16 + 17 + function getPublisher(): Redis | null { 18 + if (!config.redisUrl) { 19 + if (!loggedMissingRedis) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled'); 21 + loggedMissingRedis = true; 22 + } 23 + return null; 24 + } 25 + 26 + if (!publisher) { 27 + console.log(`[CacheInvalidation] Connecting to Redis for publishing: ${config.redisUrl}`); 28 + publisher = new Redis(config.redisUrl, { 29 + maxRetriesPerRequest: 2, 30 + enableReadyCheck: true, 31 + }); 32 + 33 + publisher.on('error', (err) => { 34 + console.error('[CacheInvalidation] Redis error:', err); 35 + }); 36 + 37 + publisher.on('ready', () => { 38 + console.log('[CacheInvalidation] Redis publisher connected'); 39 + }); 40 + } 41 + 42 + return publisher; 43 + } 44 + 45 + export async function publishCacheInvalidation( 46 + did: string, 47 + rkey: string, 48 + action: 'update' | 'delete' | 'settings' 49 + ): Promise<void> { 50 + const redis = getPublisher(); 51 + if (!redis) return; 52 + 53 + try { 54 + const message = JSON.stringify({ did, rkey, action }); 55 + console.log(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL}`); 56 + await redis.publish(CHANNEL, message); 57 + } catch (err) { 58 + console.error('[CacheInvalidation] Failed to publish:', err); 59 + } 60 + } 61 + 62 + export async function closeCacheInvalidationPublisher(): Promise<void> { 63 + if (publisher) { 64 + const toClose = publisher; 65 + publisher = null; 66 + await toClose.quit(); 67 + } 68 + }
+17
apps/firehose-service/src/lib/cache-writer.ts
··· 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 import { gunzipSync } from 'zlib'; 18 19 /** 20 * Fetch a site record from the PDS ··· 438 recordCid: string, 439 options?: { 440 forceRewriteHtml?: boolean; 441 } 442 ): Promise<void> { 443 const forceRewriteHtml = options?.forceRewriteHtml === true; ··· 549 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 550 } 551 552 console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 553 } 554 ··· 569 // Delete from DB 570 await deleteSiteCache(did, rkey); 571 572 console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 573 } 574 ··· 586 cleanUrls: settings.cleanUrls, 587 headers: settings.headers, 588 }); 589 } 590 591 /** ··· 594 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 595 console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 596 await deleteSiteSettingsCache(did, rkey); 597 }
··· 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 import { gunzipSync } from 'zlib'; 18 + import { publishCacheInvalidation } from './cache-invalidation'; 19 20 /** 21 * Fetch a site record from the PDS ··· 439 recordCid: string, 440 options?: { 441 forceRewriteHtml?: boolean; 442 + skipInvalidation?: boolean; 443 } 444 ): Promise<void> { 445 const forceRewriteHtml = options?.forceRewriteHtml === true; ··· 551 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 552 } 553 554 + // Notify hosting-service to invalidate its local caches 555 + // (skip for revalidate/backfill since hosting-service already has the files locally) 556 + if (!options?.skipInvalidation) { 557 + await publishCacheInvalidation(did, rkey, 'update'); 558 + } 559 + 560 console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 561 } 562 ··· 577 // Delete from DB 578 await deleteSiteCache(did, rkey); 579 580 + // Notify hosting-service to invalidate its local caches 581 + await publishCacheInvalidation(did, rkey, 'delete'); 582 + 583 console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 584 } 585 ··· 597 cleanUrls: settings.cleanUrls, 598 headers: settings.headers, 599 }); 600 + 601 + // Notify hosting-service to invalidate its local caches (redirect rules depend on settings) 602 + await publishCacheInvalidation(did, rkey, 'settings'); 603 } 604 605 /** ··· 608 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 609 console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 610 await deleteSiteSettingsCache(did, rkey); 611 + 612 + // Notify hosting-service to invalidate its local caches 613 + await publishCacheInvalidation(did, rkey, 'settings'); 614 }
+8 -8
apps/firehose-service/src/lib/db.ts
··· 54 recordCid: string, 55 fileCids: Record<string, string> 56 ): Promise<void> { 57 - const fileCidsJson = fileCids ?? {}; 58 console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 59 try { 60 await sql` 61 INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 62 - VALUES (${did}, ${rkey}, ${recordCid}, ${fileCidsJson}::jsonb, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 63 ON CONFLICT (did, rkey) 64 DO UPDATE SET 65 record_cid = EXCLUDED.record_cid, ··· 94 const directoryListing = settings.directoryListing ?? false; 95 const spaMode = settings.spaMode ?? null; 96 const custom404 = settings.custom404 ?? null; 97 - const indexFilesJson = settings.indexFiles ?? []; 98 const cleanUrls = settings.cleanUrls ?? true; 99 - const headersJson = settings.headers ?? []; 100 101 console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 directoryListing, 103 spaMode, 104 custom404, 105 - indexFiles: indexFilesJson, 106 cleanUrls, 107 - headers: headersJson, 108 }); 109 110 try { ··· 117 ${directoryListing}, 118 ${spaMode}, 119 ${custom404}, 120 - ${indexFilesJson}::jsonb, 121 ${cleanUrls}, 122 - ${headersJson}::jsonb, 123 EXTRACT(EPOCH FROM NOW()), 124 EXTRACT(EPOCH FROM NOW()) 125 )
··· 54 recordCid: string, 55 fileCids: Record<string, string> 56 ): Promise<void> { 57 console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 58 try { 59 await sql` 60 INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 61 + VALUES (${did}, ${rkey}, ${recordCid}, ${sql.json(fileCids ?? {})}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 62 ON CONFLICT (did, rkey) 63 DO UPDATE SET 64 record_cid = EXCLUDED.record_cid, ··· 93 const directoryListing = settings.directoryListing ?? false; 94 const spaMode = settings.spaMode ?? null; 95 const custom404 = settings.custom404 ?? null; 96 const cleanUrls = settings.cleanUrls ?? true; 97 + 98 + const indexFiles = settings.indexFiles ?? []; 99 + const headers = settings.headers ?? []; 100 101 console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 directoryListing, 103 spaMode, 104 custom404, 105 + indexFiles, 106 cleanUrls, 107 + headers, 108 }); 109 110 try { ··· 117 ${directoryListing}, 118 ${spaMode}, 119 ${custom404}, 120 + ${sql.json(indexFiles)}, 121 ${cleanUrls}, 122 + ${sql.json(headers)}, 123 EXTRACT(EPOCH FROM NOW()), 124 EXTRACT(EPOCH FROM NOW()) 125 )
+15 -7
apps/firehose-service/src/lib/revalidate-worker.ts
··· 38 return; 39 } 40 41 - console.log('[Revalidate] Processing', { did, rkey, reason, id }); 42 43 const record = await fetchSiteRecord(did, rkey); 44 if (!record) { 45 - console.warn('[Revalidate] Site record not found', { did, rkey }); 46 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 47 return; 48 } 49 50 - await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid); 51 52 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 53 } 54 ··· 114 'GROUP', 115 config.revalidateGroup, 116 consumerName, 117 - 'BLOCK', 118 - blockMs, 119 'COUNT', 120 batchSize, 121 'STREAMS', 122 config.revalidateStream, 123 '>' 124 - ); 125 126 if (!response) return; 127 128 for (const [, messages] of response) { 129 - await processMessages(messages as Array<[string, string[]]>); 130 } 131 } 132 ··· 155 156 if (running) return; 157 158 redis = new Redis(config.redisUrl, { 159 maxRetriesPerRequest: 2, 160 enableReadyCheck: true, ··· 162 163 redis.on('error', (err) => { 164 console.error('[Revalidate] Redis error:', err); 165 }); 166 167 running = true;
··· 38 return; 39 } 40 41 + console.log(`[Revalidate] Received message ${id}: ${did}/${rkey} (${reason})`); 42 43 const record = await fetchSiteRecord(did, rkey); 44 if (!record) { 45 + console.warn(`[Revalidate] Site record not found on PDS: ${did}/${rkey}`); 46 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 47 return; 48 } 49 50 + await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid, { 51 + skipInvalidation: true, 52 + }); 53 54 + console.log(`[Revalidate] Completed ${id}: ${did}/${rkey}`); 55 await redis.xack(config.revalidateStream, config.revalidateGroup, id); 56 } 57 ··· 117 'GROUP', 118 config.revalidateGroup, 119 consumerName, 120 'COUNT', 121 batchSize, 122 + 'BLOCK', 123 + blockMs, 124 'STREAMS', 125 config.revalidateStream, 126 '>' 127 + ) as [string, Array<[string, string[]]>][] | null; 128 129 if (!response) return; 130 131 for (const [, messages] of response) { 132 + await processMessages(messages); 133 } 134 } 135 ··· 158 159 if (running) return; 160 161 + console.log(`[Revalidate] Connecting to Redis: ${config.redisUrl}`); 162 redis = new Redis(config.redisUrl, { 163 maxRetriesPerRequest: 2, 164 enableReadyCheck: true, ··· 166 167 redis.on('error', (err) => { 168 console.error('[Revalidate] Redis error:', err); 169 + }); 170 + 171 + redis.on('ready', () => { 172 + console.log(`[Revalidate] Redis connected, stream: ${config.revalidateStream}, group: ${config.revalidateGroup}`); 173 }); 174 175 running = true;
+20 -1
apps/hosting-service/.env.example
··· 3 4 # Server 5 PORT=3001 6 - BASE_HOST=wisp.place
··· 3 4 # Server 5 PORT=3001 6 + # Base domain (e.g., "localhost" for sites.localhost, "wisp.place" for sites.wisp.place) 7 + BASE_HOST=localhost 8 + 9 + # Redis (cache invalidation + revalidation queue) 10 + REDIS_URL=redis://localhost:6379 11 + 12 + # S3 Storage (leave empty for local disk fallback) 13 + S3_BUCKET= 14 + S3_METADATA_BUCKET= 15 + S3_REGION=auto 16 + S3_ENDPOINT= 17 + S3_PREFIX=sites/ 18 + S3_FORCE_PATH_STYLE=true 19 + 20 + # AWS Credentials (required if using S3) 21 + AWS_ACCESS_KEY_ID= 22 + AWS_SECRET_ACCESS_KEY= 23 + 24 + # For local disk fallback (when S3_BUCKET is empty) 25 + CACHE_DIR=./cache/sites
+6
apps/hosting-service/src/index.ts
··· 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 import { storage, getStorageConfig } from './lib/storage'; 8 9 // Initialize Grafana exporters if configured ··· 23 24 // Start domain cache cleanup 25 startDomainCacheCleanup(); 26 27 // Optional: Bootstrap hot cache from warm tier on startup 28 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; ··· 80 process.on('SIGINT', async () => { 81 console.log('\n๐Ÿ›‘ Shutting down...'); 82 stopDomainCacheCleanup(); 83 await closeRevalidateQueue(); 84 await closeDatabase(); 85 server.close(); ··· 89 process.on('SIGTERM', async () => { 90 console.log('\n๐Ÿ›‘ Shutting down...'); 91 stopDomainCacheCleanup(); 92 await closeRevalidateQueue(); 93 await closeDatabase(); 94 server.close();
··· 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 + import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation'; 8 import { storage, getStorageConfig } from './lib/storage'; 9 10 // Initialize Grafana exporters if configured ··· 24 25 // Start domain cache cleanup 26 startDomainCacheCleanup(); 27 + 28 + // Start cache invalidation subscriber (listens for firehose-service updates via Redis pub/sub) 29 + startCacheInvalidationSubscriber(); 30 31 // Optional: Bootstrap hot cache from warm tier on startup 32 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; ··· 84 process.on('SIGINT', async () => { 85 console.log('\n๐Ÿ›‘ Shutting down...'); 86 stopDomainCacheCleanup(); 87 + await stopCacheInvalidationSubscriber(); 88 await closeRevalidateQueue(); 89 await closeDatabase(); 90 server.close(); ··· 94 process.on('SIGTERM', async () => { 95 console.log('\n๐Ÿ›‘ Shutting down...'); 96 stopDomainCacheCleanup(); 97 + await stopCacheInvalidationSubscriber(); 98 await closeRevalidateQueue(); 99 await closeDatabase(); 100 server.close();
+80
apps/hosting-service/src/lib/cache-invalidation.ts
···
··· 1 + /** 2 + * Cache invalidation subscriber 3 + * 4 + * Listens to Redis pub/sub for cache invalidation messages from the firehose-service. 5 + * When a site is updated/deleted, clears the hosting-service's local caches 6 + * (tiered storage hot+warm tiers, redirect rules) so stale data isn't served. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { storage } from './storage'; 11 + import { clearRedirectRulesCache } from './site-cache'; 12 + 13 + const CHANNEL = 'wisp:cache-invalidate'; 14 + 15 + let subscriber: Redis | null = null; 16 + 17 + export function startCacheInvalidationSubscriber(): void { 18 + const redisUrl = process.env.REDIS_URL; 19 + if (!redisUrl) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation disabled'); 21 + return; 22 + } 23 + 24 + console.log(`[CacheInvalidation] Connecting to Redis for subscribing: ${redisUrl}`); 25 + subscriber = new Redis(redisUrl, { 26 + maxRetriesPerRequest: 2, 27 + enableReadyCheck: true, 28 + }); 29 + 30 + subscriber.on('error', (err) => { 31 + console.error('[CacheInvalidation] Redis error:', err); 32 + }); 33 + 34 + subscriber.on('ready', () => { 35 + console.log('[CacheInvalidation] Redis subscriber connected'); 36 + }); 37 + 38 + subscriber.subscribe(CHANNEL, (err) => { 39 + if (err) { 40 + console.error('[CacheInvalidation] Failed to subscribe:', err); 41 + } else { 42 + console.log('[CacheInvalidation] Subscribed to', CHANNEL); 43 + } 44 + }); 45 + 46 + subscriber.on('message', async (_channel: string, message: string) => { 47 + try { 48 + const { did, rkey, action } = JSON.parse(message) as { 49 + did: string; 50 + rkey: string; 51 + action: 'update' | 'delete' | 'settings'; 52 + }; 53 + 54 + if (!did || !rkey) { 55 + console.warn('[CacheInvalidation] Invalid message:', message); 56 + return; 57 + } 58 + 59 + console.log(`[CacheInvalidation] Invalidating ${did}/${rkey} (${action})`); 60 + 61 + // Clear tiered storage (hot + warm) for this site 62 + const prefix = 
`${did}/${rkey}/`; 63 + const deleted = await storage.invalidate(prefix); 64 + console.log(`[CacheInvalidation] Cleared ${deleted} keys from tiered storage for ${did}/${rkey}`); 65 + 66 + // Clear redirect rules cache 67 + clearRedirectRulesCache(did, rkey); 68 + } catch (err) { 69 + console.error('[CacheInvalidation] Error processing message:', err); 70 + } 71 + }); 72 + } 73 + 74 + export async function stopCacheInvalidationSubscriber(): Promise<void> { 75 + if (subscriber) { 76 + const toClose = subscriber; 77 + subscriber = null; 78 + await toClose.quit(); 79 + } 80 + }
+26
apps/hosting-service/src/lib/db.ts
··· 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 } 121 122 export interface SiteRecord { 123 did: string; 124 rkey: string;
··· 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 } 121 122 + /** 123 + * Upsert site cache entry (used by on-demand caching when a site is completely missing) 124 + */ 125 + export async function upsertSiteCache( 126 + did: string, 127 + rkey: string, 128 + recordCid: string, 129 + fileCids: Record<string, string> 130 + ): Promise<void> { 131 + try { 132 + await sql` 133 + INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 134 + VALUES (${did}, ${rkey}, ${recordCid}, ${sql.json(fileCids ?? {})}, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 135 + ON CONFLICT (did, rkey) 136 + DO UPDATE SET 137 + record_cid = EXCLUDED.record_cid, 138 + file_cids = EXCLUDED.file_cids, 139 + updated_at = EXTRACT(EPOCH FROM NOW()) 140 + `; 141 + } catch (err) { 142 + const error = err instanceof Error ? err : new Error(String(err)); 143 + console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message }); 144 + throw error; 145 + } 146 + } 147 + 148 export interface SiteRecord { 149 did: string; 150 rkey: string;
+45 -2
apps/hosting-service/src/lib/file-serving.ts
··· 18 import { enqueueRevalidate } from './revalidate-queue'; 19 import { recordStorageMiss } from './revalidate-metrics'; 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 22 /** 23 * Helper to retrieve a file with metadata from tiered storage ··· 91 rkey: string, 92 filePath: string, 93 preferRewrittenHtml: boolean 94 - ): Promise<{ result: Awaited<ReturnType<typeof storage.getWithMetadata>>; filePath: string } | null> { 95 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 96 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 97 const rewrittenPath = `.rewritten/${filePath}`; ··· 107 } 108 109 function buildResponseFromStorageResult( 110 - result: Awaited<ReturnType<typeof storage.getWithMetadata>>, 111 filePath: string, 112 settings: WispSettings | null, 113 requestHeaders?: Record<string, string> ··· 149 } 150 151 /** 152 * Helper to serve files from cache (for custom domains and subdomains) 153 */ 154 export async function serveFromCache( ··· 158 fullUrl?: string, 159 headers?: Record<string, string> 160 ): Promise<Response> { 161 // Load settings for this site 162 const settings = await getCachedSettings(did, rkey); 163 const indexFiles = getIndexFiles(settings); ··· 445 fullUrl?: string, 446 headers?: Record<string, string> 447 ): Promise<Response> { 448 // Load settings for this site 449 const settings = await getCachedSettings(did, rkey); 450 const indexFiles = getIndexFiles(settings);
··· 18 import { enqueueRevalidate } from './revalidate-queue'; 19 import { recordStorageMiss } from './revalidate-metrics'; 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 + import { fetchAndCacheSite } from './on-demand-cache'; 22 + import type { StorageResult } from '@wispplace/tiered-storage'; 23 + 24 + type FileStorageResult = StorageResult<Uint8Array>; 25 26 /** 27 * Helper to retrieve a file with metadata from tiered storage ··· 95 rkey: string, 96 filePath: string, 97 preferRewrittenHtml: boolean 98 + ): Promise<{ result: FileStorageResult; filePath: string } | null> { 99 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 100 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 101 const rewrittenPath = `.rewritten/${filePath}`; ··· 111 } 112 113 function buildResponseFromStorageResult( 114 + result: FileStorageResult, 115 filePath: string, 116 settings: WispSettings | null, 117 requestHeaders?: Record<string, string> ··· 153 } 154 155 /** 156 + * Ensure a site is cached locally. If the site has no DB entry (completely unknown), 157 + * attempt to fetch and cache it on-demand from the PDS. 158 + */ 159 + async function ensureSiteCached(did: string, rkey: string): Promise<void> { 160 + const existing = await getSiteCache(did, rkey); 161 + if (existing) { 162 + // Site is in DB โ€” check if any files actually exist in storage 163 + const prefix = `${did}/${rkey}/`; 164 + const hasFiles = await storage.exists(prefix.slice(0, -1)) || 165 + await checkAnyFileExists(did, rkey, existing.file_cids); 166 + if (hasFiles) { 167 + return; 168 + } 169 + console.log(`[FileServing] Site ${did}/${rkey} in DB but no files in storage, re-fetching`); 170 + } else { 171 + console.log(`[FileServing] Site ${did}/${rkey} not in DB, attempting on-demand cache`); 172 + } 173 + 174 + const success = await fetchAndCacheSite(did, rkey); 175 + console.log(`[FileServing] On-demand cache for ${did}/${rkey}: ${success ? 
'success' : 'failed'}`); 176 + } 177 + 178 + async function checkAnyFileExists(did: string, rkey: string, fileCids: unknown): Promise<boolean> { 179 + if (!fileCids || typeof fileCids !== 'object') return false; 180 + const cids = fileCids as Record<string, string>; 181 + const firstFile = Object.keys(cids)[0]; 182 + if (!firstFile) return false; 183 + return storage.exists(`${did}/${rkey}/${firstFile}`); 184 + } 185 + 186 + /** 187 * Helper to serve files from cache (for custom domains and subdomains) 188 */ 189 export async function serveFromCache( ··· 193 fullUrl?: string, 194 headers?: Record<string, string> 195 ): Promise<Response> { 196 + // Check if this site is completely unknown (not in DB, no files in storage) 197 + // If so, attempt to fetch and cache it on-demand from the PDS 198 + await ensureSiteCached(did, rkey); 199 + 200 // Load settings for this site 201 const settings = await getCachedSettings(did, rkey); 202 const indexFiles = getIndexFiles(settings); ··· 484 fullUrl?: string, 485 headers?: Record<string, string> 486 ): Promise<Response> { 487 + // Check if this site is completely unknown (not in DB, no files in storage) 488 + // If so, attempt to fetch and cache it on-demand from the PDS 489 + await ensureSiteCached(did, rkey); 490 + 491 // Load settings for this site 492 const settings = await getCachedSettings(did, rkey); 493 const indexFiles = getIndexFiles(settings);
+259
apps/hosting-service/src/lib/on-demand-cache.ts
···
··· 1 + /** 2 + * On-demand site caching for the hosting service 3 + * 4 + * When a request hits a site that is completely missing (no DB entry, no files), 5 + * this module fetches the site record from the PDS, downloads all blobs, 6 + * writes them to local storage (hot + warm tiers), and updates the DB. 7 + * 8 + * This gives immediate serving capability. A revalidate is also enqueued 9 + * so the firehose-service backfills S3 (cold tier). 10 + */ 11 + 12 + import type { Record as WispFsRecord, Directory, Entry, File } from '@wispplace/lexicons/types/place/wisp/fs'; 13 + import { safeFetchJson, safeFetchBlob } from '@wispplace/safe-fetch'; 14 + import { extractBlobCid, getPdsForDid } from '@wispplace/atproto-utils'; 15 + import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 16 + import { collectFileCidsFromEntries, countFilesInDirectory } from '@wispplace/fs-utils'; 17 + import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 18 + import { expandSubfsNodes } from './utils'; 19 + import { storage } from './storage'; 20 + import { upsertSiteCache, tryAcquireLock, releaseLock } from './db'; 21 + import { enqueueRevalidate } from './revalidate-queue'; 22 + import { gunzipSync } from 'zlib'; 23 + 24 + // Track in-flight fetches to avoid duplicate work 25 + const inFlightFetches = new Map<string, Promise<boolean>>(); 26 + 27 + interface FileInfo { 28 + path: string; 29 + cid: string; 30 + blob: any; 31 + encoding?: 'gzip'; 32 + mimeType?: string; 33 + base64?: boolean; 34 + } 35 + 36 + /** 37 + * Attempt to fetch and cache a completely missing site on-demand. 38 + * Returns true if the site was successfully cached, false otherwise. 39 + * 40 + * Uses a distributed lock (pg advisory lock) to prevent multiple 41 + * hosting-service instances from fetching the same site simultaneously. 
42 + */ 43 + export async function fetchAndCacheSite(did: string, rkey: string): Promise<boolean> { 44 + const key = `${did}:${rkey}`; 45 + 46 + // Check if there's already an in-flight fetch for this site 47 + const existing = inFlightFetches.get(key); 48 + if (existing) { 49 + return existing; 50 + } 51 + 52 + const fetchPromise = doFetchAndCache(did, rkey); 53 + inFlightFetches.set(key, fetchPromise); 54 + 55 + try { 56 + return await fetchPromise; 57 + } finally { 58 + inFlightFetches.delete(key); 59 + } 60 + } 61 + 62 + async function doFetchAndCache(did: string, rkey: string): Promise<boolean> { 63 + const lockKey = `on-demand-cache:${did}:${rkey}`; 64 + 65 + // Try to acquire a distributed lock 66 + const acquired = await tryAcquireLock(lockKey); 67 + if (!acquired) { 68 + console.log(`[OnDemandCache] Lock not acquired for ${did}/${rkey}, another instance is handling it`); 69 + return false; 70 + } 71 + 72 + try { 73 + console.log(`[OnDemandCache] Fetching missing site ${did}/${rkey}`); 74 + 75 + // Fetch site record from PDS 76 + const pdsEndpoint = await getPdsForDid(did); 77 + if (!pdsEndpoint) { 78 + console.error(`[OnDemandCache] Could not resolve PDS for ${did}`); 79 + return false; 80 + } 81 + 82 + const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(rkey)}`; 83 + 84 + let data: any; 85 + try { 86 + data = await safeFetchJson(recordUrl); 87 + } catch (err) { 88 + const msg = err instanceof Error ? 
err.message : String(err); 89 + if (msg.includes('HTTP 404') || msg.includes('Not Found')) { 90 + console.log(`[OnDemandCache] Site record not found on PDS: ${did}/${rkey}`); 91 + } else { 92 + console.error(`[OnDemandCache] Failed to fetch site record: ${did}/${rkey}`, msg); 93 + } 94 + return false; 95 + } 96 + 97 + const record = data.value as WispFsRecord; 98 + const recordCid = data.cid || ''; 99 + 100 + if (!record?.root?.entries) { 101 + console.error(`[OnDemandCache] Invalid record structure for ${did}/${rkey}`); 102 + return false; 103 + } 104 + 105 + // Expand subfs nodes 106 + const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint); 107 + 108 + // Validate limits 109 + const fileCount = countFilesInDirectory(expandedRoot); 110 + if (fileCount > MAX_FILE_COUNT) { 111 + console.error(`[OnDemandCache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 112 + return false; 113 + } 114 + 115 + // Collect files 116 + const files = collectFileInfo(expandedRoot.entries); 117 + 118 + // Collect file CIDs for DB 119 + const fileCids: Record<string, string> = {}; 120 + collectFileCidsFromEntries(expandedRoot.entries, '', fileCids); 121 + 122 + // Download and write all files to local storage (hot + warm tiers) 123 + const CONCURRENCY = 10; 124 + let downloaded = 0; 125 + let failed = 0; 126 + 127 + for (let i = 0; i < files.length; i += CONCURRENCY) { 128 + const batch = files.slice(i, i + CONCURRENCY); 129 + const results = await Promise.allSettled( 130 + batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint)) 131 + ); 132 + 133 + for (const result of results) { 134 + if (result.status === 'fulfilled') { 135 + downloaded++; 136 + } else { 137 + failed++; 138 + console.error(`[OnDemandCache] Failed to download blob:`, result.reason); 139 + } 140 + } 141 + } 142 + 143 + console.log(`[OnDemandCache] Downloaded ${downloaded} files (${failed} failed) for ${did}/${rkey}`); 144 + 145 + // Update DB with file CIDs so future storage 
misses can be detected 146 + await upsertSiteCache(did, rkey, recordCid, fileCids); 147 + 148 + // Enqueue revalidate so firehose-service backfills S3 (cold tier) 149 + await enqueueRevalidate(did, rkey, `on-demand-cache`); 150 + 151 + console.log(`[OnDemandCache] Successfully cached site ${did}/${rkey}`); 152 + return downloaded > 0; 153 + } catch (err) { 154 + console.error(`[OnDemandCache] Error caching site ${did}/${rkey}:`, err); 155 + return false; 156 + } finally { 157 + await releaseLock(lockKey); 158 + } 159 + } 160 + 161 + function collectFileInfo(entries: Entry[], pathPrefix: string = ''): FileInfo[] { 162 + const files: FileInfo[] = []; 163 + 164 + for (const entry of entries) { 165 + const currentPath = pathPrefix ? `${pathPrefix}/${entry.name}` : entry.name; 166 + const node = entry.node; 167 + 168 + if ('type' in node && node.type === 'directory' && 'entries' in node) { 169 + files.push(...collectFileInfo(node.entries, currentPath)); 170 + } else if ('type' in node && node.type === 'file' && 'blob' in node) { 171 + const fileNode = node as File; 172 + const cid = extractBlobCid(fileNode.blob); 173 + if (cid) { 174 + files.push({ 175 + path: currentPath, 176 + cid, 177 + blob: fileNode.blob, 178 + encoding: fileNode.encoding, 179 + mimeType: fileNode.mimeType, 180 + base64: fileNode.base64, 181 + }); 182 + } 183 + } 184 + } 185 + 186 + return files; 187 + } 188 + 189 + async function downloadAndWriteBlob( 190 + did: string, 191 + rkey: string, 192 + file: FileInfo, 193 + pdsEndpoint: string 194 + ): Promise<void> { 195 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 196 + 197 + let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 198 + let encoding = file.encoding; 199 + 200 + // Decode base64 if flagged 201 + if (file.base64) { 202 + const base64String = new TextDecoder().decode(content); 203 + content = Buffer.from(base64String, 
'base64'); 204 + } 205 + 206 + // Decompress if needed and shouldn't stay compressed 207 + const shouldStayCompressed = shouldCompressMimeType(file.mimeType); 208 + 209 + if (encoding === 'gzip' && !shouldStayCompressed && content.length >= 2 && 210 + content[0] === 0x1f && content[1] === 0x8b) { 211 + try { 212 + content = gunzipSync(content); 213 + encoding = undefined; 214 + } catch { 215 + // Keep gzipped if decompression fails 216 + } 217 + } 218 + 219 + // If encoding is missing but data looks gzipped for a text-like file, mark it 220 + if (!encoding && isTextLikeMime(file.mimeType, file.path) && content.length >= 2 && 221 + content[0] === 0x1f && content[1] === 0x8b) { 222 + encoding = 'gzip'; 223 + } 224 + 225 + // Build storage key and metadata 226 + const key = `${did}/${rkey}/${file.path}`; 227 + const metadata: Record<string, string> = {}; 228 + if (encoding) metadata.encoding = encoding; 229 + if (file.mimeType) metadata.mimeType = file.mimeType; 230 + 231 + // Write to hot + warm tiers only (cold/S3 is read-only in hosting-service, 232 + // firehose-service will backfill via revalidate) 233 + await storage.set(key as any, content as any, { 234 + metadata, 235 + skipTiers: [], 236 + }); 237 + } 238 + 239 + function isTextLikeMime(mimeType?: string, path?: string): boolean { 240 + if (mimeType) { 241 + if (mimeType === 'text/html') return true; 242 + if (mimeType === 'text/css') return true; 243 + if (mimeType === 'text/javascript') return true; 244 + if (mimeType === 'application/javascript') return true; 245 + if (mimeType === 'application/json') return true; 246 + if (mimeType === 'application/xml') return true; 247 + if (mimeType === 'image/svg+xml') return true; 248 + } 249 + 250 + if (!path) return false; 251 + const lower = path.toLowerCase(); 252 + return lower.endsWith('.html') || 253 + lower.endsWith('.htm') || 254 + lower.endsWith('.css') || 255 + lower.endsWith('.js') || 256 + lower.endsWith('.json') || 257 + lower.endsWith('.xml') || 258 + 
lower.endsWith('.svg'); 259 + }
+7 -1
apps/hosting-service/src/lib/revalidate-queue.ts
··· 18 } 19 20 if (!client) { 21 client = new Redis(redisUrl, { 22 maxRetriesPerRequest: 2, 23 enableReadyCheck: true, ··· 26 client.on('error', (err) => { 27 console.error('[Revalidate] Redis error:', err); 28 }); 29 } 30 31 return client; ··· 46 47 try { 48 const dedupeKey = `revalidate:site:${did}:${rkey}`; 49 - const set = await redis.set(dedupeKey, '1', 'NX', 'EX', dedupeTtlSeconds); 50 if (!set) { 51 recordRevalidateResult('deduped'); 52 return { enqueued: false, result: 'deduped' }; ··· 65 Date.now().toString() 66 ); 67 68 recordRevalidateResult('enqueued'); 69 return { enqueued: true, result: 'enqueued' }; 70 } catch (err) {
··· 18 } 19 20 if (!client) { 21 + console.log(`[Revalidate] Connecting to Redis: ${redisUrl}`); 22 client = new Redis(redisUrl, { 23 maxRetriesPerRequest: 2, 24 enableReadyCheck: true, ··· 27 client.on('error', (err) => { 28 console.error('[Revalidate] Redis error:', err); 29 }); 30 + 31 + client.on('ready', () => { 32 + console.log(`[Revalidate] Redis connected, stream: ${streamName}`); 33 + }); 34 } 35 36 return client; ··· 51 52 try { 53 const dedupeKey = `revalidate:site:${did}:${rkey}`; 54 + const set = await redis.set(dedupeKey, '1', 'EX', dedupeTtlSeconds, 'NX'); 55 if (!set) { 56 recordRevalidateResult('deduped'); 57 return { enqueued: false, result: 'deduped' }; ··· 70 Date.now().toString() 71 ); 72 73 + console.log(`[Revalidate] Enqueued ${did}/${rkey} (${reason}) to ${streamName}`); 74 recordRevalidateResult('enqueued'); 75 return { enqueued: true, result: 'enqueued' }; 76 } catch (err) {
+36 -6
apps/hosting-service/src/lib/storage.ts
··· 59 60 constructor(private tier: StorageTier) {} 61 62 - // Read operations - pass through to underlying tier 63 async get(key: string) { 64 - return this.tier.get(key); 65 } 66 67 async getWithMetadata(key: string) { 68 - return this.tier.getWithMetadata?.(key) ?? null; 69 } 70 71 async getStream(key: string) { 72 - return this.tier.getStream?.(key) ?? null; 73 } 74 75 async exists(key: string) { 76 - return this.tier.exists(key); 77 } 78 79 async getMetadata(key: string) { 80 - return this.tier.getMetadata(key); 81 } 82 83 async *listKeys(prefix?: string) { ··· 111 112 async clear() { 113 this.logWriteSkip('clear', 'all keys'); 114 } 115 116 private logWriteSkip(operation: string, key: string) {
··· 59 60 constructor(private tier: StorageTier) {} 61 62 + // Read operations - pass through to underlying tier, catch errors as cache misses 63 async get(key: string) { 64 + try { 65 + return await this.tier.get(key); 66 + } catch (err) { 67 + this.logReadError('get', key, err); 68 + return null; 69 + } 70 } 71 72 async getWithMetadata(key: string) { 73 + try { 74 + return await this.tier.getWithMetadata?.(key) ?? null; 75 + } catch (err) { 76 + this.logReadError('getWithMetadata', key, err); 77 + return null; 78 + } 79 } 80 81 async getStream(key: string) { 82 + try { 83 + return await this.tier.getStream?.(key) ?? null; 84 + } catch (err) { 85 + this.logReadError('getStream', key, err); 86 + return null; 87 + } 88 } 89 90 async exists(key: string) { 91 + try { 92 + return await this.tier.exists(key); 93 + } catch (err) { 94 + this.logReadError('exists', key, err); 95 + return false; 96 + } 97 } 98 99 async getMetadata(key: string) { 100 + try { 101 + return await this.tier.getMetadata(key); 102 + } catch (err) { 103 + this.logReadError('getMetadata', key, err); 104 + return null; 105 + } 106 } 107 108 async *listKeys(prefix?: string) { ··· 136 137 async clear() { 138 this.logWriteSkip('clear', 'all keys'); 139 + } 140 + 141 + private logReadError(operation: string, key: string, err: unknown) { 142 + const msg = err instanceof Error ? err.message : String(err); 143 + console.warn(`[Storage] S3 read error (${operation}) for ${key}: ${msg}`); 144 } 145 146 private logWriteSkip(operation: string, key: string) {
+10 -2
apps/hosting-service/src/server.ts
··· 14 import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving'; 15 import { getRevalidateMetrics } from './lib/revalidate-metrics'; 16 17 - const BASE_HOST = process.env.BASE_HOST || 'wisp.place'; 18 19 const app = new Hono(); 20 ··· 38 app.get('/*', async (c) => { 39 const url = new URL(c.req.url); 40 const hostname = c.req.header('host') || ''; 41 - const hostnameWithoutPort = hostname.split(':')[0]; 42 const rawPath = url.pathname.replace(/^\//, ''); 43 const path = sanitizePath(rawPath); 44 45 // Check if this is sites.wisp.place subdomain (strip port for comparison) 46 if (hostnameWithoutPort === `sites.${BASE_HOST}`) { ··· 76 const did = await resolveDid(identifier); 77 if (!did) { 78 return c.text('Invalid identifier', 400); 79 } 80 81 console.log(`[Server] sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`);
··· 14 import { serveFromCache, serveFromCacheWithRewrite } from './lib/file-serving'; 15 import { getRevalidateMetrics } from './lib/revalidate-metrics'; 16 17 + const BASE_HOST_ENV = process.env.BASE_HOST || 'wisp.place'; 18 + const BASE_HOST = BASE_HOST_ENV.split(':')[0] || BASE_HOST_ENV; 19 20 const app = new Hono(); 21 ··· 39 app.get('/*', async (c) => { 40 const url = new URL(c.req.url); 41 const hostname = c.req.header('host') || ''; 42 + const hostnameWithoutPort = hostname.split(':')[0] || ''; 43 const rawPath = url.pathname.replace(/^\//, ''); 44 const path = sanitizePath(rawPath); 45 + 46 + console.log(`[Server] Request: host=${hostname} hostnameWithoutPort=${hostnameWithoutPort} path=${path} BASE_HOST=${BASE_HOST}`); 47 48 // Check if this is sites.wisp.place subdomain (strip port for comparison) 49 if (hostnameWithoutPort === `sites.${BASE_HOST}`) { ··· 79 const did = await resolveDid(identifier); 80 if (!did) { 81 return c.text('Invalid identifier', 400); 82 + } 83 + 84 + // Redirect to trailing slash when accessing site root so relative paths resolve correctly 85 + if (!filePath && !url.pathname.endsWith('/')) { 86 + return c.redirect(`${url.pathname}/${url.search}`, 301); 87 } 88 89 console.log(`[Server] sites.wisp.place request: identifier=${identifier}, site=${site}, filePath=${filePath}`);
+15
docker-compose.yml
··· 17 timeout: 5s 18 retries: 5 19 20 volumes: 21 postgres_data:
··· 17 timeout: 5s 18 retries: 5 19 20 + redis: 21 + image: redis:7-alpine 22 + container_name: wisp-redis 23 + restart: unless-stopped 24 + ports: 25 + - "6379:6379" 26 + volumes: 27 + - redis_data:/data 28 + healthcheck: 29 + test: ["CMD", "redis-cli", "ping"] 30 + interval: 5s 31 + timeout: 5s 32 + retries: 5 33 + 34 volumes: 35 postgres_data: 36 + redis_data:

History

4 rounds 1 comment
sign up or log in to add to the discussion
7 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
Dockerfile
fix storage-miss revalidation loop and tier reporting
1/1 failed
expand
expand 1 comment

this is what life is like on us-east-1 right now. seems to be doing fine as of 2/6 10:38pm

pull request successfully merged
5 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
1/1 failed
expand
expand 0 comments
4 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
1/1 failed
expand
expand 0 comments
1 commit
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
1/1 failed
expand
expand 0 comments