Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

Hosting service now writes to the cache on a cache miss; the firehose service properly notifies the hosting service of new updates.

authored by nekomimi.pet and committed by

Tangled f7471812 c6843336

+540 -11
+27
apps/firehose-service/.env.example
··· 1 + # Database 2 + DATABASE_URL=postgres://postgres:postgres@localhost:5432/wisp 3 + 4 + # Firehose 5 + FIREHOSE_SERVICE=wss://bsky.network 6 + FIREHOSE_MAX_CONCURRENCY=5 7 + 8 + # Redis (cache invalidation + revalidation queue) 9 + REDIS_URL=redis://localhost:6379 10 + 11 + # S3 Storage (leave empty for local disk fallback) 12 + S3_BUCKET= 13 + S3_METADATA_BUCKET= 14 + S3_REGION=auto 15 + S3_ENDPOINT= 16 + S3_PREFIX=sites/ 17 + S3_FORCE_PATH_STYLE=true 18 + 19 + # AWS Credentials (required if using S3) 20 + AWS_ACCESS_KEY_ID= 21 + AWS_SECRET_ACCESS_KEY= 22 + 23 + # Health check server 24 + HEALTH_PORT=3001 25 + 26 + # For local disk fallback (when S3_BUCKET is empty) 27 + CACHE_DIR=./cache/sites
+2
apps/firehose-service/src/index.ts
··· 14 14 import { storage } from './lib/storage'; 15 15 import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer'; 16 16 import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker'; 17 + import { closeCacheInvalidationPublisher } from './lib/cache-invalidation'; 17 18 18 19 const app = new Hono(); 19 20 ··· 41 42 42 43 stopFirehose(); 43 44 await stopRevalidateWorker(); 45 + await closeCacheInvalidationPublisher(); 44 46 await closeDatabase(); 45 47 46 48 console.log('[Service] Shutdown complete');
+62
apps/firehose-service/src/lib/cache-invalidation.ts
··· 1 + /** 2 + * Cache invalidation publisher 3 + * 4 + * Publishes invalidation messages to Redis pub/sub so the hosting-service 5 + * can clear its local caches (tiered storage, redirect rules) when a site 6 + * is updated or deleted via the firehose. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { config } from '../config'; 11 + 12 + const CHANNEL = 'wisp:cache-invalidate'; 13 + 14 + let publisher: Redis | null = null; 15 + let loggedMissingRedis = false; 16 + 17 + function getPublisher(): Redis | null { 18 + if (!config.redisUrl) { 19 + if (!loggedMissingRedis) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation publishing disabled'); 21 + loggedMissingRedis = true; 22 + } 23 + return null; 24 + } 25 + 26 + if (!publisher) { 27 + publisher = new Redis(config.redisUrl, { 28 + maxRetriesPerRequest: 2, 29 + enableReadyCheck: true, 30 + }); 31 + 32 + publisher.on('error', (err) => { 33 + console.error('[CacheInvalidation] Redis error:', err); 34 + }); 35 + } 36 + 37 + return publisher; 38 + } 39 + 40 + export async function publishCacheInvalidation( 41 + did: string, 42 + rkey: string, 43 + action: 'update' | 'delete' | 'settings' 44 + ): Promise<void> { 45 + const redis = getPublisher(); 46 + if (!redis) return; 47 + 48 + try { 49 + const message = JSON.stringify({ did, rkey, action }); 50 + await redis.publish(CHANNEL, message); 51 + } catch (err) { 52 + console.error('[CacheInvalidation] Failed to publish:', err); 53 + } 54 + } 55 + 56 + export async function closeCacheInvalidationPublisher(): Promise<void> { 57 + if (publisher) { 58 + const toClose = publisher; 59 + publisher = null; 60 + await toClose.quit(); 61 + } 62 + }
+13
apps/firehose-service/src/lib/cache-writer.ts
··· 15 15 import { getSiteCache, upsertSiteCache, deleteSiteCache, upsertSiteSettingsCache, deleteSiteSettingsCache } from './db'; 16 16 import { rewriteHtmlPaths, isHtmlFile } from './html-rewriter'; 17 17 import { gunzipSync } from 'zlib'; 18 + import { publishCacheInvalidation } from './cache-invalidation'; 18 19 19 20 /** 20 21 * Fetch a site record from the PDS ··· 549 550 await handleSettingsUpdate(did, rkey, settingsRecord.record, settingsRecord.cid); 550 551 } 551 552 553 + // Notify hosting-service to invalidate its local caches 554 + await publishCacheInvalidation(did, rkey, 'update'); 555 + 552 556 console.log(`[Cache] Successfully cached site ${did}/${rkey}`); 553 557 } 554 558 ··· 568 572 569 573 // Delete from DB 570 574 await deleteSiteCache(did, rkey); 575 + 576 + // Notify hosting-service to invalidate its local caches 577 + await publishCacheInvalidation(did, rkey, 'delete'); 571 578 572 579 console.log(`[Cache] Deleted site ${did}/${rkey} (${keys.length} files)`); 573 580 } ··· 586 593 cleanUrls: settings.cleanUrls, 587 594 headers: settings.headers, 588 595 }); 596 + 597 + // Notify hosting-service to invalidate its local caches (redirect rules depend on settings) 598 + await publishCacheInvalidation(did, rkey, 'settings'); 589 599 } 590 600 591 601 /** ··· 594 604 export async function handleSettingsDelete(did: string, rkey: string): Promise<void> { 595 605 console.log(`[Cache] Deleting settings for ${did}/${rkey}`); 596 606 await deleteSiteSettingsCache(did, rkey); 607 + 608 + // Notify hosting-service to invalidate its local caches 609 + await publishCacheInvalidation(did, rkey, 'settings'); 597 610 }
+3 -3
apps/firehose-service/src/lib/db.ts
··· 54 54 recordCid: string, 55 55 fileCids: Record<string, string> 56 56 ): Promise<void> { 57 - const fileCidsJson = fileCids ?? {}; 57 + const fileCidsJson = JSON.stringify(fileCids ?? {}); 58 58 console.log(`[DB] upsertSiteCache starting for ${did}/${rkey}`); 59 59 try { 60 60 await sql` ··· 94 94 const directoryListing = settings.directoryListing ?? false; 95 95 const spaMode = settings.spaMode ?? null; 96 96 const custom404 = settings.custom404 ?? null; 97 - const indexFilesJson = settings.indexFiles ?? []; 97 + const indexFilesJson = JSON.stringify(settings.indexFiles ?? []); 98 98 const cleanUrls = settings.cleanUrls ?? true; 99 - const headersJson = settings.headers ?? []; 99 + const headersJson = JSON.stringify(settings.headers ?? []); 100 100 101 101 console.log(`[DB] upsertSiteSettingsCache starting for ${did}/${rkey}`, { 102 102 directoryListing,
+4 -4
apps/firehose-service/src/lib/revalidate-worker.ts
··· 114 114 'GROUP', 115 115 config.revalidateGroup, 116 116 consumerName, 117 + 'COUNT', 118 + batchSize, 117 119 'BLOCK', 118 120 blockMs, 119 - 'COUNT', 120 - batchSize, 121 121 'STREAMS', 122 122 config.revalidateStream, 123 123 '>' 124 - ); 124 + ) as [string, Array<[string, string[]]>][] | null; 125 125 126 126 if (!response) return; 127 127 128 128 for (const [, messages] of response) { 129 - await processMessages(messages as Array<[string, string[]]>); 129 + await processMessages(messages); 130 130 } 131 131 } 132 132
+18
apps/hosting-service/.env.example
··· 4 4 # Server 5 5 PORT=3001 6 6 BASE_HOST=wisp.place 7 + 8 + # Redis (cache invalidation + revalidation queue) 9 + REDIS_URL=redis://localhost:6379 10 + 11 + # S3 Storage (leave empty for local disk fallback) 12 + S3_BUCKET= 13 + S3_METADATA_BUCKET= 14 + S3_REGION=auto 15 + S3_ENDPOINT= 16 + S3_PREFIX=sites/ 17 + S3_FORCE_PATH_STYLE=true 18 + 19 + # AWS Credentials (required if using S3) 20 + AWS_ACCESS_KEY_ID= 21 + AWS_SECRET_ACCESS_KEY= 22 + 23 + # For local disk fallback (when S3_BUCKET is empty) 24 + CACHE_DIR=./cache/sites
+6
apps/hosting-service/src/index.ts
··· 4 4 import { mkdirSync, existsSync } from 'fs'; 5 5 import { startDomainCacheCleanup, stopDomainCacheCleanup, closeDatabase } from './lib/db'; 6 6 import { closeRevalidateQueue } from './lib/revalidate-queue'; 7 + import { startCacheInvalidationSubscriber, stopCacheInvalidationSubscriber } from './lib/cache-invalidation'; 7 8 import { storage, getStorageConfig } from './lib/storage'; 8 9 9 10 // Initialize Grafana exporters if configured ··· 23 24 24 25 // Start domain cache cleanup 25 26 startDomainCacheCleanup(); 27 + 28 + // Start cache invalidation subscriber (listens for firehose-service updates via Redis pub/sub) 29 + startCacheInvalidationSubscriber(); 26 30 27 31 // Optional: Bootstrap hot cache from warm tier on startup 28 32 const BOOTSTRAP_HOT_ON_STARTUP = process.env.BOOTSTRAP_HOT_ON_STARTUP === 'true'; ··· 80 84 process.on('SIGINT', async () => { 81 85 console.log('\n🛑 Shutting down...'); 82 86 stopDomainCacheCleanup(); 87 + await stopCacheInvalidationSubscriber(); 83 88 await closeRevalidateQueue(); 84 89 await closeDatabase(); 85 90 server.close(); ··· 89 94 process.on('SIGTERM', async () => { 90 95 console.log('\n🛑 Shutting down...'); 91 96 stopDomainCacheCleanup(); 97 + await stopCacheInvalidationSubscriber(); 92 98 await closeRevalidateQueue(); 93 99 await closeDatabase(); 94 100 server.close();
+75
apps/hosting-service/src/lib/cache-invalidation.ts
··· 1 + /** 2 + * Cache invalidation subscriber 3 + * 4 + * Listens to Redis pub/sub for cache invalidation messages from the firehose-service. 5 + * When a site is updated/deleted, clears the hosting-service's local caches 6 + * (tiered storage hot+warm tiers, redirect rules) so stale data isn't served. 7 + */ 8 + 9 + import Redis from 'ioredis'; 10 + import { storage } from './storage'; 11 + import { clearRedirectRulesCache } from './site-cache'; 12 + 13 + const CHANNEL = 'wisp:cache-invalidate'; 14 + 15 + let subscriber: Redis | null = null; 16 + 17 + export function startCacheInvalidationSubscriber(): void { 18 + const redisUrl = process.env.REDIS_URL; 19 + if (!redisUrl) { 20 + console.warn('[CacheInvalidation] REDIS_URL not set; cache invalidation disabled'); 21 + return; 22 + } 23 + 24 + subscriber = new Redis(redisUrl, { 25 + maxRetriesPerRequest: 2, 26 + enableReadyCheck: true, 27 + }); 28 + 29 + subscriber.on('error', (err) => { 30 + console.error('[CacheInvalidation] Redis error:', err); 31 + }); 32 + 33 + subscriber.subscribe(CHANNEL, (err) => { 34 + if (err) { 35 + console.error('[CacheInvalidation] Failed to subscribe:', err); 36 + } else { 37 + console.log('[CacheInvalidation] Subscribed to', CHANNEL); 38 + } 39 + }); 40 + 41 + subscriber.on('message', async (_channel: string, message: string) => { 42 + try { 43 + const { did, rkey, action } = JSON.parse(message) as { 44 + did: string; 45 + rkey: string; 46 + action: 'update' | 'delete' | 'settings'; 47 + }; 48 + 49 + if (!did || !rkey) { 50 + console.warn('[CacheInvalidation] Invalid message:', message); 51 + return; 52 + } 53 + 54 + console.log(`[CacheInvalidation] Invalidating ${did}/${rkey} (${action})`); 55 + 56 + // Clear tiered storage (hot + warm) for this site 57 + const prefix = `${did}/${rkey}/`; 58 + const deleted = await storage.invalidate(prefix); 59 + console.log(`[CacheInvalidation] Cleared ${deleted} keys from tiered storage for ${did}/${rkey}`); 60 + 61 + // Clear redirect rules 
cache 62 + clearRedirectRulesCache(did, rkey); 63 + } catch (err) { 64 + console.error('[CacheInvalidation] Error processing message:', err); 65 + } 66 + }); 67 + } 68 + 69 + export async function stopCacheInvalidationSubscriber(): Promise<void> { 70 + if (subscriber) { 71 + const toClose = subscriber; 72 + subscriber = null; 73 + await toClose.quit(); 74 + } 75 + }
+27
apps/hosting-service/src/lib/db.ts
··· 119 119 console.log('[DB] Read-only mode: skipping upsertSite', { did, rkey, displayName }); 120 120 } 121 121 122 + /** 123 + * Upsert site cache entry (used by on-demand caching when a site is completely missing) 124 + */ 125 + export async function upsertSiteCache( 126 + did: string, 127 + rkey: string, 128 + recordCid: string, 129 + fileCids: Record<string, string> 130 + ): Promise<void> { 131 + const fileCidsJson = JSON.stringify(fileCids ?? {}); 132 + try { 133 + await sql` 134 + INSERT INTO site_cache (did, rkey, record_cid, file_cids, cached_at, updated_at) 135 + VALUES (${did}, ${rkey}, ${recordCid}, ${fileCidsJson}::jsonb, EXTRACT(EPOCH FROM NOW()), EXTRACT(EPOCH FROM NOW())) 136 + ON CONFLICT (did, rkey) 137 + DO UPDATE SET 138 + record_cid = EXCLUDED.record_cid, 139 + file_cids = EXCLUDED.file_cids, 140 + updated_at = EXTRACT(EPOCH FROM NOW()) 141 + `; 142 + } catch (err) { 143 + const error = err instanceof Error ? err : new Error(String(err)); 144 + console.error('[DB] upsertSiteCache error:', { did, rkey, error: error.message }); 145 + throw error; 146 + } 147 + } 148 + 122 149 export interface SiteRecord { 123 150 did: string; 124 151 rkey: string;
+27 -2
apps/hosting-service/src/lib/file-serving.ts
··· 18 18 import { enqueueRevalidate } from './revalidate-queue'; 19 19 import { recordStorageMiss } from './revalidate-metrics'; 20 20 import { normalizeFileCids } from '@wispplace/fs-utils'; 21 + import { fetchAndCacheSite } from './on-demand-cache'; 22 + import type { StorageResult } from '@wispplace/tiered-storage'; 23 + 24 + type FileStorageResult = StorageResult<Uint8Array>; 21 25 22 26 /** 23 27 * Helper to retrieve a file with metadata from tiered storage ··· 91 95 rkey: string, 92 96 filePath: string, 93 97 preferRewrittenHtml: boolean 94 - ): Promise<{ result: Awaited<ReturnType<typeof storage.getWithMetadata>>; filePath: string } | null> { 98 + ): Promise<{ result: FileStorageResult; filePath: string } | null> { 95 99 const mimeTypeGuess = lookup(filePath) || 'application/octet-stream'; 96 100 if (preferRewrittenHtml && isHtmlContent(filePath, mimeTypeGuess)) { 97 101 const rewrittenPath = `.rewritten/${filePath}`; ··· 107 111 } 108 112 109 113 function buildResponseFromStorageResult( 110 - result: Awaited<ReturnType<typeof storage.getWithMetadata>>, 114 + result: FileStorageResult, 111 115 filePath: string, 112 116 settings: WispSettings | null, 113 117 requestHeaders?: Record<string, string> ··· 149 153 } 150 154 151 155 /** 156 + * Ensure a site is cached locally. If the site has no DB entry (completely unknown), 157 + * attempt to fetch and cache it on-demand from the PDS. 
158 + */ 159 + async function ensureSiteCached(did: string, rkey: string): Promise<void> { 160 + const existing = await getSiteCache(did, rkey); 161 + if (existing) return; // Site is known, proceed normally 162 + 163 + // Site is completely unknown — try on-demand fetch 164 + console.log(`[FileServing] Site ${did}/${rkey} not in DB, attempting on-demand cache`); 165 + await fetchAndCacheSite(did, rkey); 166 + } 167 + 168 + /** 152 169 * Helper to serve files from cache (for custom domains and subdomains) 153 170 */ 154 171 export async function serveFromCache( ··· 158 175 fullUrl?: string, 159 176 headers?: Record<string, string> 160 177 ): Promise<Response> { 178 + // Check if this site is completely unknown (not in DB, no files in storage) 179 + // If so, attempt to fetch and cache it on-demand from the PDS 180 + await ensureSiteCached(did, rkey); 181 + 161 182 // Load settings for this site 162 183 const settings = await getCachedSettings(did, rkey); 163 184 const indexFiles = getIndexFiles(settings); ··· 445 466 fullUrl?: string, 446 467 headers?: Record<string, string> 447 468 ): Promise<Response> { 469 + // Check if this site is completely unknown (not in DB, no files in storage) 470 + // If so, attempt to fetch and cache it on-demand from the PDS 471 + await ensureSiteCached(did, rkey); 472 + 448 473 // Load settings for this site 449 474 const settings = await getCachedSettings(did, rkey); 450 475 const indexFiles = getIndexFiles(settings);
+259
apps/hosting-service/src/lib/on-demand-cache.ts
··· 1 + /** 2 + * On-demand site caching for the hosting service 3 + * 4 + * When a request hits a site that is completely missing (no DB entry, no files), 5 + * this module fetches the site record from the PDS, downloads all blobs, 6 + * writes them to local storage (hot + warm tiers), and updates the DB. 7 + * 8 + * This gives immediate serving capability. A revalidate is also enqueued 9 + * so the firehose-service backfills S3 (cold tier). 10 + */ 11 + 12 + import type { Record as WispFsRecord, Directory, Entry, File } from '@wispplace/lexicons/types/place/wisp/fs'; 13 + import { safeFetchJson, safeFetchBlob } from '@wispplace/safe-fetch'; 14 + import { extractBlobCid, getPdsForDid } from '@wispplace/atproto-utils'; 15 + import { shouldCompressMimeType } from '@wispplace/atproto-utils/compression'; 16 + import { collectFileCidsFromEntries, countFilesInDirectory } from '@wispplace/fs-utils'; 17 + import { MAX_BLOB_SIZE, MAX_FILE_COUNT, MAX_SITE_SIZE } from '@wispplace/constants'; 18 + import { expandSubfsNodes } from './utils'; 19 + import { storage } from './storage'; 20 + import { upsertSiteCache, tryAcquireLock, releaseLock } from './db'; 21 + import { enqueueRevalidate } from './revalidate-queue'; 22 + import { gunzipSync } from 'zlib'; 23 + 24 + // Track in-flight fetches to avoid duplicate work 25 + const inFlightFetches = new Map<string, Promise<boolean>>(); 26 + 27 + interface FileInfo { 28 + path: string; 29 + cid: string; 30 + blob: any; 31 + encoding?: 'gzip'; 32 + mimeType?: string; 33 + base64?: boolean; 34 + } 35 + 36 + /** 37 + * Attempt to fetch and cache a completely missing site on-demand. 38 + * Returns true if the site was successfully cached, false otherwise. 39 + * 40 + * Uses a distributed lock (pg advisory lock) to prevent multiple 41 + * hosting-service instances from fetching the same site simultaneously. 
42 + */ 43 + export async function fetchAndCacheSite(did: string, rkey: string): Promise<boolean> { 44 + const key = `${did}:${rkey}`; 45 + 46 + // Check if there's already an in-flight fetch for this site 47 + const existing = inFlightFetches.get(key); 48 + if (existing) { 49 + return existing; 50 + } 51 + 52 + const fetchPromise = doFetchAndCache(did, rkey); 53 + inFlightFetches.set(key, fetchPromise); 54 + 55 + try { 56 + return await fetchPromise; 57 + } finally { 58 + inFlightFetches.delete(key); 59 + } 60 + } 61 + 62 + async function doFetchAndCache(did: string, rkey: string): Promise<boolean> { 63 + const lockKey = `on-demand-cache:${did}:${rkey}`; 64 + 65 + // Try to acquire a distributed lock 66 + const acquired = await tryAcquireLock(lockKey); 67 + if (!acquired) { 68 + console.log(`[OnDemandCache] Lock not acquired for ${did}/${rkey}, another instance is handling it`); 69 + return false; 70 + } 71 + 72 + try { 73 + console.log(`[OnDemandCache] Fetching missing site ${did}/${rkey}`); 74 + 75 + // Fetch site record from PDS 76 + const pdsEndpoint = await getPdsForDid(did); 77 + if (!pdsEndpoint) { 78 + console.error(`[OnDemandCache] Could not resolve PDS for ${did}`); 79 + return false; 80 + } 81 + 82 + const recordUrl = `${pdsEndpoint}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(did)}&collection=place.wisp.fs&rkey=${encodeURIComponent(rkey)}`; 83 + 84 + let data: any; 85 + try { 86 + data = await safeFetchJson(recordUrl); 87 + } catch (err) { 88 + const msg = err instanceof Error ? 
err.message : String(err); 89 + if (msg.includes('HTTP 404') || msg.includes('Not Found')) { 90 + console.log(`[OnDemandCache] Site record not found on PDS: ${did}/${rkey}`); 91 + } else { 92 + console.error(`[OnDemandCache] Failed to fetch site record: ${did}/${rkey}`, msg); 93 + } 94 + return false; 95 + } 96 + 97 + const record = data.value as WispFsRecord; 98 + const recordCid = data.cid || ''; 99 + 100 + if (!record?.root?.entries) { 101 + console.error(`[OnDemandCache] Invalid record structure for ${did}/${rkey}`); 102 + return false; 103 + } 104 + 105 + // Expand subfs nodes 106 + const expandedRoot = await expandSubfsNodes(record.root, pdsEndpoint); 107 + 108 + // Validate limits 109 + const fileCount = countFilesInDirectory(expandedRoot); 110 + if (fileCount > MAX_FILE_COUNT) { 111 + console.error(`[OnDemandCache] Site exceeds file limit: ${fileCount} > ${MAX_FILE_COUNT}`); 112 + return false; 113 + } 114 + 115 + // Collect files 116 + const files = collectFileInfo(expandedRoot.entries); 117 + 118 + // Collect file CIDs for DB 119 + const fileCids: Record<string, string> = {}; 120 + collectFileCidsFromEntries(expandedRoot.entries, '', fileCids); 121 + 122 + // Download and write all files to local storage (hot + warm tiers) 123 + const CONCURRENCY = 10; 124 + let downloaded = 0; 125 + let failed = 0; 126 + 127 + for (let i = 0; i < files.length; i += CONCURRENCY) { 128 + const batch = files.slice(i, i + CONCURRENCY); 129 + const results = await Promise.allSettled( 130 + batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint)) 131 + ); 132 + 133 + for (const result of results) { 134 + if (result.status === 'fulfilled') { 135 + downloaded++; 136 + } else { 137 + failed++; 138 + console.error(`[OnDemandCache] Failed to download blob:`, result.reason); 139 + } 140 + } 141 + } 142 + 143 + console.log(`[OnDemandCache] Downloaded ${downloaded} files (${failed} failed) for ${did}/${rkey}`); 144 + 145 + // Update DB with file CIDs so future storage 
misses can be detected 146 + await upsertSiteCache(did, rkey, recordCid, fileCids); 147 + 148 + // Enqueue revalidate so firehose-service backfills S3 (cold tier) 149 + await enqueueRevalidate(did, rkey, `on-demand-cache`); 150 + 151 + console.log(`[OnDemandCache] Successfully cached site ${did}/${rkey}`); 152 + return downloaded > 0; 153 + } catch (err) { 154 + console.error(`[OnDemandCache] Error caching site ${did}/${rkey}:`, err); 155 + return false; 156 + } finally { 157 + await releaseLock(lockKey); 158 + } 159 + } 160 + 161 + function collectFileInfo(entries: Entry[], pathPrefix: string = ''): FileInfo[] { 162 + const files: FileInfo[] = []; 163 + 164 + for (const entry of entries) { 165 + const currentPath = pathPrefix ? `${pathPrefix}/${entry.name}` : entry.name; 166 + const node = entry.node; 167 + 168 + if ('type' in node && node.type === 'directory' && 'entries' in node) { 169 + files.push(...collectFileInfo(node.entries, currentPath)); 170 + } else if ('type' in node && node.type === 'file' && 'blob' in node) { 171 + const fileNode = node as File; 172 + const cid = extractBlobCid(fileNode.blob); 173 + if (cid) { 174 + files.push({ 175 + path: currentPath, 176 + cid, 177 + blob: fileNode.blob, 178 + encoding: fileNode.encoding, 179 + mimeType: fileNode.mimeType, 180 + base64: fileNode.base64, 181 + }); 182 + } 183 + } 184 + } 185 + 186 + return files; 187 + } 188 + 189 + async function downloadAndWriteBlob( 190 + did: string, 191 + rkey: string, 192 + file: FileInfo, 193 + pdsEndpoint: string 194 + ): Promise<void> { 195 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 196 + 197 + let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 198 + let encoding = file.encoding; 199 + 200 + // Decode base64 if flagged 201 + if (file.base64) { 202 + const base64String = new TextDecoder().decode(content); 203 + content = Buffer.from(base64String, 
'base64'); 204 + } 205 + 206 + // Decompress if needed and shouldn't stay compressed 207 + const shouldStayCompressed = shouldCompressMimeType(file.mimeType); 208 + 209 + if (encoding === 'gzip' && !shouldStayCompressed && content.length >= 2 && 210 + content[0] === 0x1f && content[1] === 0x8b) { 211 + try { 212 + content = gunzipSync(content); 213 + encoding = undefined; 214 + } catch { 215 + // Keep gzipped if decompression fails 216 + } 217 + } 218 + 219 + // If encoding is missing but data looks gzipped for a text-like file, mark it 220 + if (!encoding && isTextLikeMime(file.mimeType, file.path) && content.length >= 2 && 221 + content[0] === 0x1f && content[1] === 0x8b) { 222 + encoding = 'gzip'; 223 + } 224 + 225 + // Build storage key and metadata 226 + const key = `${did}/${rkey}/${file.path}`; 227 + const metadata: Record<string, string> = {}; 228 + if (encoding) metadata.encoding = encoding; 229 + if (file.mimeType) metadata.mimeType = file.mimeType; 230 + 231 + // Write to hot + warm tiers only (cold/S3 is read-only in hosting-service, 232 + // firehose-service will backfill via revalidate) 233 + await storage.set(key as any, content as any, { 234 + metadata, 235 + skipTiers: [], 236 + }); 237 + } 238 + 239 + function isTextLikeMime(mimeType?: string, path?: string): boolean { 240 + if (mimeType) { 241 + if (mimeType === 'text/html') return true; 242 + if (mimeType === 'text/css') return true; 243 + if (mimeType === 'text/javascript') return true; 244 + if (mimeType === 'application/javascript') return true; 245 + if (mimeType === 'application/json') return true; 246 + if (mimeType === 'application/xml') return true; 247 + if (mimeType === 'image/svg+xml') return true; 248 + } 249 + 250 + if (!path) return false; 251 + const lower = path.toLowerCase(); 252 + return lower.endsWith('.html') || 253 + lower.endsWith('.htm') || 254 + lower.endsWith('.css') || 255 + lower.endsWith('.js') || 256 + lower.endsWith('.json') || 257 + lower.endsWith('.xml') || 258 + 
lower.endsWith('.svg'); 259 + }
+1 -1
apps/hosting-service/src/lib/revalidate-queue.ts
··· 46 46 47 47 try { 48 48 const dedupeKey = `revalidate:site:${did}:${rkey}`; 49 - const set = await redis.set(dedupeKey, '1', 'NX', 'EX', dedupeTtlSeconds); 49 + const set = await redis.set(dedupeKey, '1', 'EX', dedupeTtlSeconds, 'NX'); 50 50 if (!set) { 51 51 recordRevalidateResult('deduped'); 52 52 return { enqueued: false, result: 'deduped' };
+1 -1
apps/hosting-service/src/server.ts
··· 38 38 app.get('/*', async (c) => { 39 39 const url = new URL(c.req.url); 40 40 const hostname = c.req.header('host') || ''; 41 - const hostnameWithoutPort = hostname.split(':')[0]; 41 + const hostnameWithoutPort = hostname.split(':')[0] || ''; 42 42 const rawPath = url.pathname.replace(/^\//, ''); 43 43 const path = sanitizePath(rawPath); 44 44
+15
docker-compose.yml
··· 17 17 timeout: 5s 18 18 retries: 5 19 19 20 + redis: 21 + image: redis:7-alpine 22 + container_name: wisp-redis 23 + restart: unless-stopped 24 + ports: 25 + - "6379:6379" 26 + volumes: 27 + - redis_data:/data 28 + healthcheck: 29 + test: ["CMD", "redis-cli", "ping"] 30 + interval: 5s 31 + timeout: 5s 32 + retries: 5 33 + 20 34 volumes: 21 35 postgres_data: 36 + redis_data: