Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place

hosting service writes on cache miss, firehose service properly notifies hosting service on new updates #6

merged — opened by nekomimi.pet, targeting main from hosting-service-fixes

actually forcing myself to develop good habits this year on my own projects because i deserve them

Labels

None yet.

Assignees

None yet.

Participants 1
AT URI
at://did:plc:ttdrpj45ibqunmfhdsb4zdwq/sh.tangled.repo.pull/3merr52ear522
+540 -11
Diff #0
+27
apps/firehose-service/.env.example
···
··· 1 + # Database 2 + DATABASE_URL=postgres://postgres:postgres@localhost:5432/wisp 3 + 4 + # Firehose 5 + FIREHOSE_SERVICE=wss://bsky.network 6 + FIREHOSE_MAX_CONCURRENCY=5 7 + 8 + # Redis (cache invalidation + revalidation queue) 9 + REDIS_URL=redis://localhost:6379 10 + 11 + # S3 Storage (leave empty for local disk fallback) 12 + S3_BUCKET= 13 + S3_METADATA_BUCKET= 14 + S3_REGION=auto 15 + S3_ENDPOINT= 16 + S3_PREFIX=sites/ 17 + S3_FORCE_PATH_STYLE=true 18 + 19 + # AWS Credentials (required if using S3) 20 + AWS_ACCESS_KEY_ID= 21 + AWS_SECRET_ACCESS_KEY= 22 + 23 + # Health check server 24 + HEALTH_PORT=3001 25 + 26 + # For local disk fallback (when S3_BUCKET is empty) 27 + CACHE_DIR=./cache/sites
+2
apps/firehose-service/src/index.ts
··· 14 import { storage } from './lib/storage'; 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 18 const app = new Hono(); 19 ··· 41 42 stopFirehose(); 43 await stopRevalidateWorker(); 44 await closeDatabase(); 45 46 console.log('[Service] Shutdown complete');
··· 14 import { storage } from './lib/storage'; 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 + import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'; 18 19 const app = new Hono(); 20 ··· 42 43 stopFirehose(); 44 await stopRevalidateWorker(); 45 + await closeCacheInvalidationPublisher(); 46 await closeDatabase(); 47 48 console.log('[Service] Shutdown complete');
+62
apps/firehose-service/src/lib/cache-invalidation.ts
···
··· 1 + /** 2 + * Cache invalidation publisher 3 + * 4 + * Publishes invalidation messages to Redis pub/sub so the hosting-service 5 + * can clear its local caches (tiered storage, redirect rules) when a site 6 + * is updated or deleted via the firehose. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { config } from '../config'; 11 + 12 + const CHANNEL = 'wisp:cache-invalidate'; 13 + 14 + let publisher: Redis | null = null; 15 + let loggedMissingRedis = false; 16 + 17 + function getPublisher(): Redis | null { 18 + if (!config.redisUrl) { 19 + if (!loggedMissingRedis) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled'); 21 + loggedMissingRedis = true; 22 + } 23 + return null; 24 + } 25 + 26 + if (!publisher) { 27 + publisher = new Redis(config.redisUrl, { 28 + maxRetriesPerRequest: 2, 29 + enableReadyCheck: true, 30 + }); 31 + 32 + publisher.on('error', (err) => { 33 + console.error('[CacheInvalidation] Redis error:', err); 34 + }); 35 + } 36 + 37 + return publisher; 38 + } 39 + 40 + export async function publishCacheInvalidation( 41 + did: string, 42 + rkey: string, 43 + action: 'update' | 'delete' | 'settings' 44 + ): Promise<void> { 45 + const redis = getPublisher(); 46 + if (!redis) return; 47 + 48 + try { 49 + const message = JSON.stringify({ did, rkey, action }); 50 + await redis.publish(CHANNEL, message); 51 + } catch (err) { 52 + console.error('[CacheInvalidation] Failed to publish:', err); 53 + } 54 + } 55 + 56 + export async function closeCacheInvalidationPublisher(): Promise<void> { 57 + if (publisher) { 58 + const toClose = publisher; 59 + publisher = null; 60 + await toClose.quit(); 61 + } 62 + }
+13
apps/firehose-service/src/lib/cache-writer.ts
··· 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 import { gunzipSync } from 'zlib'; 18 19 /** 20 * Fetch a site record from the PDS ··· 549 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 550 } 551 552 console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 553 } 554 ··· 569 // Delete from DB 570 await deleteSiteCache(did, rkey); 571 572 console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 573 } 574 ··· 586 cleanUrls: settings.cleanUrls, 587 headers: settings.headers, 588 }); 589 } 590 591 /** ··· 594 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 595 console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 596 await deleteSiteSettingsCache(did, rkey); 597 }
··· 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 import { gunzipSync } from 'zlib'; 18 + import { publishCacheInvalidation } from './cache-invalidation'; 19 20 /** 21 * Fetch a site record from the PDS ··· 550 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 551 } 552 553 + // Notify hosting-service to invalidate its local caches 554 + await publishCacheInvalidation(did, rkey, 'update'); 555 + 556 console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 557 } 558 ··· 573 // Delete from DB 574 await deleteSiteCache(did, rkey); 575 576 + // Notify hosting-service to invalidate its local caches 577 + await publishCacheInvalidation(did, rkey, 'delete'); 578 + 579 console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 580 } 581 ··· 593 cleanUrls: settings.cleanUrls, 594 headers: settings.headers, 595 }); 596 + 597 + // Notify hosting-service to invalidate its local caches (redirect rules depend on settings) 598 + await publishCacheInvalidation(did, rkey, 'settings'); 599 } 600 601 /** ··· 604 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 605 console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 606 await deleteSiteSettingsCache(did, rkey); 607 + 608 + // Notify hosting-service to invalidate its local caches 609 + await publishCacheInvalidation(did, rkey, 'settings'); 610 }
+3 -3
apps/firehose-service/src/lib/db.ts
··· 54 recordCid: string, 55 fileCids: Record<string, string> 56 ): Promise<void> { 57 - const fileCidsJson = fileCids ?? {}; 58 console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 59 try { 60 await sql` ··· 94 const directoryListing = settings.directoryListing ?? false; 95 const spaMode = settings.spaMode ?? null; 96 const custom404 = settings.custom404 ?? null; 97 - const indexFilesJson = settings.indexFiles ?? []; 98 const cleanUrls = settings.cleanUrls ?? true; 99 - const headersJson = settings.headers ?? []; 100 101 console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 directoryListing,
··· 54 recordCid: string, 55 fileCids: Record<string, string> 56 ): Promise<void> { 57 + const fileCidsJson = JSON.stringify(fileCids ?? {}); 58 console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 59 try { 60 await sql` ··· 94 const directoryListing = settings.directoryListing ?? false; 95 const spaMode = settings.spaMode ?? null; 96 const custom404 = settings.custom404 ?? null; 97 + const indexFilesJson = JSON.stringify(settings.indexFiles ?? []); 98 const cleanUrls = settings.cleanUrls ?? true; 99 + const headersJson = JSON.stringify(settings.headers ?? []); 100 101 console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 directoryListing,
+4 -4
apps/firehose-service/src/lib/revalidate-worker.ts
··· 114 'GROUP', 115 config.revalidateGroup, 116 consumerName, 117 - 'BLOCK', 118 - blockMs, 119 'COUNT', 120 batchSize, 121 'STREAMS', 122 config.revalidateStream, 123 '>' 124 - ); 125 126 if (!response) return; 127 128 for (const [, messages] of response) { 129 - await processMessages(messages as Array<[string, string[]]>); 130 } 131 } 132
··· 114 'GROUP', 115 config.revalidateGroup, 116 consumerName, 117 'COUNT', 118 batchSize, 119 + 'BLOCK', 120 + blockMs, 121 'STREAMS', 122 config.revalidateStream, 123 '>' 124 + ) as [string, Array<[string, string[]]>][] | null; 125 126 if (!response) return; 127 128 for (const [, messages] of response) { 129 + await processMessages(messages); 130 } 131 } 132
+18
apps/hosting-service/.env.example
··· 4 # Server 5 PORT=3001 6 BASE_HOST=wisp.place
··· 4 # Server 5 PORT=3001 6 BASE_HOST=wisp.place 7 + 8 + # Redis (cache invalidation + revalidation queue) 9 + REDIS_URL=redis://localhost:6379 10 + 11 + # S3 Storage (leave empty for local disk fallback) 12 + S3_BUCKET= 13 + S3_METADATA_BUCKET= 14 + S3_REGION=auto 15 + S3_ENDPOINT= 16 + S3_PREFIX=sites/ 17 + S3_FORCE_PATH_STYLE=true 18 + 19 + # AWS Credentials (required if using S3) 20 + AWS_ACCESS_KEY_ID= 21 + AWS_SECRET_ACCESS_KEY= 22 + 23 + # For local disk fallback (when S3_BUCKET is empty) 24 + CACHE_DIR=./cache/sites
+6
apps/hosting-service/src/index.ts
··· 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 import { storage, getStorageConfig } from './lib/storage'; 8 9 // Initialize Grafana exporters if configured ··· 24 // Start domain cache cleanup 25 startDomainCacheCleanup(); 26 27 // Optional: Bootstrap hot cache from warm tier on startup 28 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; 29 const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100; ··· 80 process.on('SIGINT', async () => { 81 console.log('\n๐Ÿ›‘ Shutting down...'); 82 stopDomainCacheCleanup(); 83 await closeRevalidateQueue(); 84 await closeDatabase(); 85 server.close(); ··· 89 process.on('SIGTERM', async () => { 90 console.log('\n๐Ÿ›‘ Shutting down...'); 91 stopDomainCacheCleanup(); 92 await closeRevalidateQueue(); 93 await closeDatabase(); 94 server.close();
··· 4 import { mkdirSync, existsSync } from 'fs'; 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 + import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation'; 8 import { storage, getStorageConfig } from './lib/storage'; 9 10 // Initialize Grafana exporters if configured ··· 25 // Start domain cache cleanup 26 startDomainCacheCleanup(); 27 28 + // Start cache invalidation subscriber (listens for firehose-service updates via Redis pub/sub) 29 + startCacheInvalidationSubscriber(); 30 + 31 // Optional: Bootstrap hot cache from warm tier on startup 32 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; 33 const BOOTSTRAP_HOT_LIMIT = process.env.BOOTSTRAP_HOT_LIMIT ? parseInt(process.env.BOOTSTRAP_HOT_LIMIT) : 100; ··· 84 process.on('SIGINT', async () => { 85 console.log('\n๐Ÿ›‘ Shutting down...'); 86 stopDomainCacheCleanup(); 87 + await stopCacheInvalidationSubscriber(); 88 await closeRevalidateQueue(); 89 await closeDatabase(); 90 server.close(); ··· 94 process.on('SIGTERM', async () => { 95 console.log('\n๐Ÿ›‘ Shutting down...'); 96 stopDomainCacheCleanup(); 97 + await stopCacheInvalidationSubscriber(); 98 await closeRevalidateQueue(); 99 await closeDatabase(); 100 server.close();
+75
apps/hosting-service/src/lib/cache-invalidation.ts
···
··· 1 + /** 2 + * Cache invalidation subscriber 3 + * 4 + * Listens to Redis pub/sub for cache invalidation messages from the firehose-service. 5 + * When a site is updated/deleted, clears the hosting-service's local caches 6 + * (tiered storage hot+warm tiers, redirect rules) so stale data isn't served. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { storage } from './storage'; 11 + import { clearRedirectRulesCache } from './site-cache'; 12 + 13 + const CHANNEL = 'wisp:cache-invalidate'; 14 + 15 + let subscriber: Redis | null = null; 16 + 17 + export function startCacheInvalidationSubscriber(): void { 18 + const redisUrl = process.env.REDIS_URL; 19 + if (!redisUrl) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation disabled'); 21 + return; 22 + } 23 + 24 + subscriber = new Redis(redisUrl, { 25 + maxRetriesPerRequest: 2, 26 + enableReadyCheck: true, 27 + }); 28 + 29 + subscriber.on('error', (err) => { 30 + console.error('[CacheInvalidation] Redis error:', err); 31 + }); 32 + 33 + subscriber.subscribe(CHANNEL, (err) => { 34 + if (err) { 35 + console.error('[CacheInvalidation] Failed to subscribe:', err); 36 + } else { 37 + console.log('[CacheInvalidation] Subscribed to', CHANNEL); 38 + } 39 + }); 40 + 41 + subscriber.on('message', async (_channel: string, message: string) => { 42 + try { 43 + const { did, rkey, action } = JSON.parse(message) as { 44 + did: string; 45 + rkey: string; 46 + action: 'update' | 'delete' | 'settings'; 47 + }; 48 + 49 + if (!did || !rkey) { 50 + console.warn('[CacheInvalidation] Invalid message:', message); 51 + return; 52 + } 53 + 54 + console.log(`[CacheInvalidation] Invalidating ${did}/${rkey} (${action})`); 55 + 56 + // Clear tiered storage (hot + warm) for this site 57 + const prefix = `${did}/${rkey}/`; 58 + const deleted = await storage.invalidate(prefix); 59 + console.log(`[CacheInvalidation] Cleared ${deleted} keys from tiered storage for ${did}/${rkey}`); 60 + 61 + // Clear redirect rules 
cache 62 + clearRedirectRulesCache(did, rkey); 63 + } catch (err) { 64 + console.error('[CacheInvalidation] Error processing message:', err); 65 + } 66 + }); 67 + } 68 + 69 + export async function stopCacheInvalidationSubscriber(): Promise<void> { 70 + if (subscriber) { 71 + const toClose = subscriber; 72 + subscriber = null; 73 + await toClose.quit(); 74 + } 75 + }
+27
apps/hosting-service/src/lib/db.ts
··· 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 } 121 122 export interface SiteRecord { 123 did: string; 124 rkey: string;
··· 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 } 121 122 + /** 123 + * Upsert site cache entry (used by on-demand caching when a site is completely missing) 124 + */ 125 + export async function upsertSiteCache( 126 + did: string, 127 + rkey: string, 128 + recordCid: string, 129 + fileCids: Record<string, string> 130 + ): Promise<void> { 131 + const fileCidsJson = JSON.stringify(fileCids ?? {}); 132 + try { 133 + await sql` 134 + INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 135 + VALUES (${did}, ${rkey}, ${recordCid}, ${fileCidsJson}::jsonb, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 136 + ON CONFLICT (did, rkey) 137 + DO UPDATE SET 138 + record_cid = EXCLUDED.record_cid, 139 + file_cids = EXCLUDED.file_cids, 140 + updated_at = EXTRACT(EPOCH FROM NOW()) 141 + `; 142 + } catch (err) { 143 + const error = err instanceof Error ? err : new Error(String(err)); 144 + console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message }); 145 + throw error; 146 + } 147 + } 148 + 149 export interface SiteRecord { 150 did: string; 151 rkey: string;
+27 -2
apps/hosting-service/src/lib/file-serving.ts
··· 18 import { enqueueRevalidate } from './revalidate-queue'; 19 import { recordStorageMiss } from './revalidate-metrics'; 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 22 /** 23 * Helper to retrieve a file with metadata from tiered storage ··· 91 rkey: string, 92 filePath: string, 93 preferRewrittenHtml: boolean 94 - ): Promise<{ result: Awaited<ReturnType<typeof storage.getWithMetadata>>; filePath: string } | null> { 95 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 96 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 97 const rewrittenPath = `.rewritten/${filePath}`; ··· 107 } 108 109 function buildResponseFromStorageResult( 110 - result: Awaited<ReturnType<typeof storage.getWithMetadata>>, 111 filePath: string, 112 settings: WispSettings | null, 113 requestHeaders?: Record<string, string> ··· 148 return new Response(content, { headers }); 149 } 150 151 /** 152 * Helper to serve files from cache (for custom domains and subdomains) 153 */ ··· 158 fullUrl?: string, 159 headers?: Record<string, string> 160 ): Promise<Response> { 161 // Load settings for this site 162 const settings = await getCachedSettings(did, rkey); 163 const indexFiles = getIndexFiles(settings); ··· 445 fullUrl?: string, 446 headers?: Record<string, string> 447 ): Promise<Response> { 448 // Load settings for this site 449 const settings = await getCachedSettings(did, rkey); 450 const indexFiles = getIndexFiles(settings);
··· 18 import { enqueueRevalidate } from './revalidate-queue'; 19 import { recordStorageMiss } from './revalidate-metrics'; 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 + import { fetchAndCacheSite } from './on-demand-cache'; 22 + import type { StorageResult } from '@wispplace/tiered-storage'; 23 + 24 + type FileStorageResult = StorageResult<Uint8Array>; 25 26 /** 27 * Helper to retrieve a file with metadata from tiered storage ··· 95 rkey: string, 96 filePath: string, 97 preferRewrittenHtml: boolean 98 + ): Promise<{ result: FileStorageResult; filePath: string } | null> { 99 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 100 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 101 const rewrittenPath = `.rewritten/${filePath}`; ··· 111 } 112 113 function buildResponseFromStorageResult( 114 + result: FileStorageResult, 115 filePath: string, 116 settings: WispSettings | null, 117 requestHeaders?: Record<string, string> ··· 152 return new Response(content, { headers }); 153 } 154 155 + /** 156 + * Ensure a site is cached locally. If the site has no DB entry (completely unknown), 157 + * attempt to fetch and cache it on-demand from the PDS. 
158 + */ 159 + async function ensureSiteCached(did: string, rkey: string): Promise<void> { 160 + const existing = await getSiteCache(did, rkey); 161 + if (existing) return; // Site is known, proceed normally 162 + 163 + // Site is completely unknown โ€” try on-demand fetch 164 + console.log(`[FileServing] Site ${did}/${rkey} not in DB, attempting on-demand cache`); 165 + await fetchAndCacheSite(did, rkey); 166 + } 167 + 168 /** 169 * Helper to serve files from cache (for custom domains and subdomains) 170 */ ··· 175 fullUrl?: string, 176 headers?: Record<string, string> 177 ): Promise<Response> { 178 + // Check if this site is completely unknown (not in DB, no files in storage) 179 + // If so, attempt to fetch and cache it on-demand from the PDS 180 + await ensureSiteCached(did, rkey); 181 + 182 // Load settings for this site 183 const settings = await getCachedSettings(did, rkey); 184 const indexFiles = getIndexFiles(settings); ··· 466 fullUrl?: string, 467 headers?: Record<string, string> 468 ): Promise<Response> { 469 + // Check if this site is completely unknown (not in DB, no files in storage) 470 + // If so, attempt to fetch and cache it on-demand from the PDS 471 + await ensureSiteCached(did, rkey); 472 + 473 // Load settings for this site 474 const settings = await getCachedSettings(did, rkey); 475 const indexFiles = getIndexFiles(settings);
+259
apps/hosting-service/src/lib/on-demand-cache.ts
···
··· 1 + /** 2 + * On-demand site caching for the hosting service 3 + * 4 + * When a request hits a site that is completely missing (no DB entry, no files), 5 + * this module fetches the site record from the PDS, downloads all blobs, 6 + * writes them to local storage (hot + warm tiers), and updates the DB. 7 + * 8 + * This gives immediate serving capability. A revalidate is also enqueued 9 + * so the firehose-service backfills S3 (cold tier). 10 + */ 11 + 12 + import type { Record as WispFsRecord, Directory, Entry, File } from '@wispplace/lexicons/types/place/wisp/fs'; 13 + import { safeFetchJson, safeFetchBlob } from '@wispplace/safe-fetch'; 14 + import { extractBlobCid, getPdsForDid } from '@wispplace/atproto-utils'; 15 + import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 16 + import { collectFileCidsFromEntries, countFilesInDirectory } from '@wispplace/fs-utils'; 17 + import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 18 + import { expandSubfsNodes } from './utils'; 19 + import { storage } from './storage'; 20 + import { upsertSiteCache, tryAcquireLock, releaseLock } from './db'; 21 + import { enqueueRevalidate } from './revalidate-queue'; 22 + import { gunzipSync } from 'zlib'; 23 + 24 + // Track in-flight fetches to avoid duplicate work 25 + const inFlightFetches = new Map<string, Promise<boolean>>(); 26 + 27 + interface FileInfo { 28 + path: string; 29 + cid: string; 30 + blob: any; 31 + encoding?: 'gzip'; 32 + mimeType?: string; 33 + base64?: boolean; 34 + } 35 + 36 + /** 37 + * Attempt to fetch and cache a completely missing site on-demand. 38 + * Returns true if the site was successfully cached, false otherwise. 39 + * 40 + * Uses a distributed lock (pg advisory lock) to prevent multiple 41 + * hosting-service instances from fetching the same site simultaneously. 
42 + */ 43 + export async function fetchAndCacheSite(did: string, rkey: string): Promise<boolean> { 44 + const key = `${did}:${rkey}`; 45 + 46 + // Check if there's already an in-flight fetch for this site 47 + const existing = inFlightFetches.get(key); 48 + if (existing) { 49 + return existing; 50 + } 51 + 52 + const fetchPromise = doFetchAndCache(did, rkey); 53 + inFlightFetches.set(key, fetchPromise); 54 + 55 + try { 56 + return await fetchPromise; 57 + } finally { 58 + inFlightFetches.delete(key); 59 + } 60 + } 61 + 62 + async function doFetchAndCache(did: string, rkey: string): Promise<boolean> { 63 + const lockKey = `on-demand-cache:${did}:${rkey}`; 64 + 65 + // Try to acquire a distributed lock 66 + const acquired = await tryAcquireLock(lockKey); 67 + if (!acquired) { 68 + console.log(`[OnDemandCache] Lock not acquired for ${did}/${rkey}, another instance is handling it`); 69 + return false; 70 + } 71 + 72 + try { 73 + console.log(`[OnDemandCache] Fetching missing site ${did}/${rkey}`); 74 + 75 + // Fetch site record from PDS 76 + const pdsEndpoint = await getPdsForDid(did); 77 + if (!pdsEndpoint) { 78 + console.error(`[OnDemandCache] Could not resolve PDS for ${did}`); 79 + return false; 80 + } 81 + 82 + const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(rkey)}`; 83 + 84 + let data: any; 85 + try { 86 + data = await safeFetchJson(recordUrl); 87 + } catch (err) { 88 + const msg = err instanceof Error ? 
err.message : String(err); 89 + if (msg.includes('HTTP 404') || msg.includes('Not Found')) { 90 + console.log(`[OnDemandCache] Site record not found on PDS: ${did}/${rkey}`); 91 + } else { 92 + console.error(`[OnDemandCache] Failed to fetch site record: ${did}/${rkey}`, msg); 93 + } 94 + return false; 95 + } 96 + 97 + const record = data.value as WispFsRecord; 98 + const recordCid = data.cid || ''; 99 + 100 + if (!record?.root?.entries) { 101 + console.error(`[OnDemandCache] Invalid record structure for ${did}/${rkey}`); 102 + return false; 103 + } 104 + 105 + // Expand subfs nodes 106 + const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint); 107 + 108 + // Validate limits 109 + const fileCount = countFilesInDirectory(expandedRoot); 110 + if (fileCount > MAX_FILE_COUNT) { 111 + console.error(`[OnDemandCache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 112 + return false; 113 + } 114 + 115 + // Collect files 116 + const files = collectFileInfo(expandedRoot.entries); 117 + 118 + // Collect file CIDs for DB 119 + const fileCids: Record<string, string> = {}; 120 + collectFileCidsFromEntries(expandedRoot.entries, '', fileCids); 121 + 122 + // Download and write all files to local storage (hot + warm tiers) 123 + const CONCURRENCY = 10; 124 + let downloaded = 0; 125 + let failed = 0; 126 + 127 + for (let i = 0; i < files.length; i += CONCURRENCY) { 128 + const batch = files.slice(i, i + CONCURRENCY); 129 + const results = await Promise.allSettled( 130 + batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint)) 131 + ); 132 + 133 + for (const result of results) { 134 + if (result.status === 'fulfilled') { 135 + downloaded++; 136 + } else { 137 + failed++; 138 + console.error(`[OnDemandCache] Failed to download blob:`, result.reason); 139 + } 140 + } 141 + } 142 + 143 + console.log(`[OnDemandCache] Downloaded ${downloaded} files (${failed} failed) for ${did}/${rkey}`); 144 + 145 + // Update DB with file CIDs so future storage 
misses can be detected 146 + await upsertSiteCache(did, rkey, recordCid, fileCids); 147 + 148 + // Enqueue revalidate so firehose-service backfills S3 (cold tier) 149 + await enqueueRevalidate(did, rkey, `on-demand-cache`); 150 + 151 + console.log(`[OnDemandCache] Successfully cached site ${did}/${rkey}`); 152 + return downloaded > 0; 153 + } catch (err) { 154 + console.error(`[OnDemandCache] Error caching site ${did}/${rkey}:`, err); 155 + return false; 156 + } finally { 157 + await releaseLock(lockKey); 158 + } 159 + } 160 + 161 + function collectFileInfo(entries: Entry[], pathPrefix: string = ''): FileInfo[] { 162 + const files: FileInfo[] = []; 163 + 164 + for (const entry of entries) { 165 + const currentPath = pathPrefix ? `${pathPrefix}/${entry.name}` : entry.name; 166 + const node = entry.node; 167 + 168 + if ('type' in node && node.type === 'directory' && 'entries' in node) { 169 + files.push(...collectFileInfo(node.entries, currentPath)); 170 + } else if ('type' in node && node.type === 'file' && 'blob' in node) { 171 + const fileNode = node as File; 172 + const cid = extractBlobCid(fileNode.blob); 173 + if (cid) { 174 + files.push({ 175 + path: currentPath, 176 + cid, 177 + blob: fileNode.blob, 178 + encoding: fileNode.encoding, 179 + mimeType: fileNode.mimeType, 180 + base64: fileNode.base64, 181 + }); 182 + } 183 + } 184 + } 185 + 186 + return files; 187 + } 188 + 189 + async function downloadAndWriteBlob( 190 + did: string, 191 + rkey: string, 192 + file: FileInfo, 193 + pdsEndpoint: string 194 + ): Promise<void> { 195 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 196 + 197 + let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 198 + let encoding = file.encoding; 199 + 200 + // Decode base64 if flagged 201 + if (file.base64) { 202 + const base64String = new TextDecoder().decode(content); 203 + content = Buffer.from(base64String, 
'base64'); 204 + } 205 + 206 + // Decompress if needed and shouldn't stay compressed 207 + const shouldStayCompressed = shouldCompressMimeType(file.mimeType); 208 + 209 + if (encoding === 'gzip' && !shouldStayCompressed && content.length >= 2 && 210 + content[0] === 0x1f && content[1] === 0x8b) { 211 + try { 212 + content = gunzipSync(content); 213 + encoding = undefined; 214 + } catch { 215 + // Keep gzipped if decompression fails 216 + } 217 + } 218 + 219 + // If encoding is missing but data looks gzipped for a text-like file, mark it 220 + if (!encoding && isTextLikeMime(file.mimeType, file.path) && content.length >= 2 && 221 + content[0] === 0x1f && content[1] === 0x8b) { 222 + encoding = 'gzip'; 223 + } 224 + 225 + // Build storage key and metadata 226 + const key = `${did}/${rkey}/${file.path}`; 227 + const metadata: Record<string, string> = {}; 228 + if (encoding) metadata.encoding = encoding; 229 + if (file.mimeType) metadata.mimeType = file.mimeType; 230 + 231 + // Write to hot + warm tiers only (cold/S3 is read-only in hosting-service, 232 + // firehose-service will backfill via revalidate) 233 + await storage.set(key as any, content as any, { 234 + metadata, 235 + skipTiers: [], 236 + }); 237 + } 238 + 239 + function isTextLikeMime(mimeType?: string, path?: string): boolean { 240 + if (mimeType) { 241 + if (mimeType === 'text/html') return true; 242 + if (mimeType === 'text/css') return true; 243 + if (mimeType === 'text/javascript') return true; 244 + if (mimeType === 'application/javascript') return true; 245 + if (mimeType === 'application/json') return true; 246 + if (mimeType === 'application/xml') return true; 247 + if (mimeType === 'image/svg+xml') return true; 248 + } 249 + 250 + if (!path) return false; 251 + const lower = path.toLowerCase(); 252 + return lower.endsWith('.html') || 253 + lower.endsWith('.htm') || 254 + lower.endsWith('.css') || 255 + lower.endsWith('.js') || 256 + lower.endsWith('.json') || 257 + lower.endsWith('.xml') || 258 + 
lower.endsWith('.svg'); 259 + }
+1 -1
apps/hosting-service/src/lib/revalidate-queue.ts
··· 46 47 try { 48 const dedupeKey = `revalidate:site:${did}:${rkey}`; 49 - const set = await redis.set(dedupeKey, '1', 'NX', 'EX', dedupeTtlSeconds); 50 if (!set) { 51 recordRevalidateResult('deduped'); 52 return { enqueued: false, result: 'deduped' };
··· 46 47 try { 48 const dedupeKey = `revalidate:site:${did}:${rkey}`; 49 + const set = await redis.set(dedupeKey, '1', 'EX', dedupeTtlSeconds, 'NX'); 50 if (!set) { 51 recordRevalidateResult('deduped'); 52 return { enqueued: false, result: 'deduped' };
+1 -1
apps/hosting-service/src/server.ts
··· 38 app.get('/*', async (c) => { 39 const url = new URL(c.req.url); 40 const hostname = c.req.header('host') || ''; 41 - const hostnameWithoutPort = hostname.split(':')[0]; 42 const rawPath = url.pathname.replace(/^\//, ''); 43 const path = sanitizePath(rawPath); 44
··· 38 app.get('/*', async (c) => { 39 const url = new URL(c.req.url); 40 const hostname = c.req.header('host') || ''; 41 + const hostnameWithoutPort = hostname.split(':')[0] || ''; 42 const rawPath = url.pathname.replace(/^\//, ''); 43 const path = sanitizePath(rawPath); 44
+15
docker-compose.yml
··· 17 timeout: 5s 18 retries: 5 19 20 volumes: 21 postgres_data:
··· 17 timeout: 5s 18 retries: 5 19 20 + redis: 21 + image: redis:7-alpine 22 + container_name: wisp-redis 23 + restart: unless-stopped 24 + ports: 25 + - "6379:6379" 26 + volumes: 27 + - redis_data:/data 28 + healthcheck: 29 + test: ["CMD", "redis-cli", "ping"] 30 + interval: 5s 31 + timeout: 5s 32 + retries: 5 33 + 34 volumes: 35 postgres_data: 36 + redis_data:

History

4 rounds 1 comment
sign up or login to add to the discussion
7 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
Dockerfile
fix storage-miss revalidation loop and tier reporting
1/1 failed
expand
expand 1 comment

this is what life is on us-east-1 right now. seems to be doing fine as of 2/6 10:38pm

pull request successfully merged
5 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
integrate observability package across hosting and firehose services
1/1 failed
expand
expand 0 comments
4 commits
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
add redis connection and message logging
fix jsonb double-encoding, s3 error handling, base host routing
fix cache invalidation race, storage miss re-fetch, trailing slash redirect
1/1 failed
expand
expand 0 comments
nekomimi.pet submitted #0
1 commit
expand
hosting service writes on cache miss, firehose service properly notifies hosting service on new updates
1/1 failed
expand
expand 0 comments