📂🌊 An event stream of ATProto blobs blobstream.zio.blue
atproto deno jetstream
at main 290 lines 8.7 kB view raw
1import SubscribeFilters from "./classes/SubscribeFilters.ts"; 2import Meta from "./classes/Meta.ts"; 3import { args, didPdsCache, stats } from "./store.ts"; 4 5export async function parseEvent(e: MessageEvent<any>) { 6 const data = JSON.parse(e.data); 7 const cursor = data.time_us || null; 8 const blobs = await findBlobObjects(data, cursor); 9 10 return blobs; 11} 12 13export function printLandingHtml(): string { 14 const header = ` ____ _ _ _ 15| __ )| | ___ | |__ ___| |_ _ __ ___ __ _ _ __ ___ 16| _ \\| |/ _ \\| '_ \\/ __| __| '__/ _ \\/ _\` | '_ \` _ \\ 17| |_) | | (_) | |_) \\__ | |_| | | __| (_| | | | | | | 18|____/|_|\\___/|_.__/|___/\\__|_| \\___|\\__,_|_| |_| |_|`; 19 20 const outputHtml = ` 21 <!DOCTYPE html> 22 <html> 23 <head> 24 <style> 25 :root { 26 color-scheme: light dark; 27 } 28 29 body { 30 color: light-dark(#000000, #ffffff); 31 background-color: light-dark(#ffffff, #18191b); 32 font-family: monospace; 33 margin: 25px; 34 white-space: pre-wrap; 35 word-wrap: break-word; 36 } 37 38 a { 39 color: light-dark(#0a5dbd, #a4cefe); 40 } 41 42 .subtle { 43 opacity: 0.8; 44 } 45 </style> 46 <title>Blobstream</title> 47 </head> 48 <body>${header} 49 50✨ Events: ${stats.events.total.toLocaleString()} 51 ↳ Dropped: ${stats.events.dropped.toLocaleString()} 52 ↳ Failed: ${stats.events.failures.toLocaleString()} 53 ↳ <span title="Average Process Time">APT</span>: ${ 54 stats.events.averageProcessTime.toFixed(5) 55 }ms 56🕗 Uptime: ${getTimeSinceStart()} 57📍 Cursor: ${stats.currentCursor} 58 ↳ Latest: ${parseDate(new Date(stats.currentCursor / 1000))} 59 ↳ Current: ${parseDate(new Date())} 60📇 DIDs: ${stats.didCache.cached.toLocaleString()} 61 ↳ Added: +${stats.didCache.added.toLocaleString()} 62 ↳ Dropped: -${stats.didCache.popped.toLocaleString()} 63👤 Clients: ${stats.totalClients.toLocaleString()} 64⬆️ Upstream: ${new URL(args.endpointJetstream as string).host} 65 66WebSocket endpoint at <strong>/subscribe</strong>. Filter with queries: 67* <strong>wantedCollections</strong> &mdash; Collection(s) to filter by 68 <em class="subtle">(e.g. app.bsky.feed.post, blue.zio.atfile.upload)</em> 69* <strong>wantedDids</strong> &mdash; DID(s) to filter by 70 <em class="subtle">(e.g. did:plc:z72i7hdynmk6r22z27h6tvur, did:web:didd.uk)</em> 71* <strong>wantedMimeTypes</strong> &mdash; MimeType(s) to filter by 72 <em class="subtle">(e.g. image/jpeg, video/mp4)</em> 73* <strong>wantedPdses</strong> &mdash; PDS(es) to filter by (without protocol) 74 <em class="subtle">(e.g. shimeji.us-east.host.bsky.network, zio.blue) 75 76<a href="https://tangled.org/@zio.sh/blobstream">🐑 @zio.sh/blobstream</a> ・ ${Meta.version} 77 </body> 78 </html> 79 `; 80 81 return outputHtml; 82} 83 84export function printSocketMessage(closing: boolean, ip: string, filters: SubscribeFilters) { 85 const verb = closing ? "Closing" : "Opening"; 86 console.log( 87 `${verb} (${stats.totalClients.toLocaleString()}): ${ip} (${ 88 filters.toString().replace(", ", "; ") 89 })`, 90 ); 91} 92 93function extractStringsFromRecord(record: any) { 94 let type = record.$type; 95 let associated: string[] = []; 96 97 switch (type) { 98 case "app.bsky.actor.profile": 99 if(record.description != undefined && record.description != null) 100 associated.push(record.description); 101 break; 102 case "app.bsky.feed.post": 103 if (record.embed != undefined) { 104 switch (record.embed.$type) { 105 // deno-lint-ignore no-case-declarations 106 case "app.bsky.embed.images": 107 const imgs = record.embed.images; 108 if (imgs) { 109 const images = Array.isArray(imgs) 110 ? imgs 111 : (Array.isArray((imgs as any).images) 112 ? (imgs as any).images 113 : [imgs]); 114 115 for (const image of images) { 116 if (!image) continue; 117 const alt = (typeof image === "object") 118 ? (image.alt ?? undefined) 119 : (typeof image === "string" ? image : undefined); 120 if (typeof alt === "string" && alt != "") { 121 associated.push(alt); 122 } 123 } 124 } 125 break; 126 case "app.bsky.embed.video": 127 if(record.embed.alt != undefined && record.embed.alt != "") 128 associated.push(record.embed.alt); 129 break; 130 } 131 } 132 133 if ( 134 record.labels != undefined && 135 record.labels.$type == "com.atproto.label.defs#selfLabels" && 136 record.labels.values != undefined && record.labels.values.length != 0 137 ) { 138 const labels = (record.labels.values as any[]) 139 .map((v) => (typeof v === "string" ? v : v?.val)) 140 .filter(Boolean); 141 if (labels.length) { 142 for(const label in labels) { 143 associated.push(label); 144 } 145 } 146 } 147 148 if (record.tags != undefined && record.tags.length != 0) { 149 for(const tag in record.tags) { 150 associated.push(`#${record.tags}`); 151 } 152 } 153 if (record.text != undefined && record.text != "") { 154 associated.push(record.text); 155 } 156 break; 157 } 158 159 return associated; 160} 161 162async function findBlobObjects( 163 obj: any, 164 cursor: number | null, 165): Promise<any[]> { 166 const blobs: any[] = []; 167 168 async function search( 169 current: any, 170 rkey: string | null = null, 171 collection: string | null = null, 172 did: string | null = null, 173 record: any | null = null, 174 ): Promise<void> { 175 if (!current || typeof current !== "object") return; 176 177 if (current.rkey) rkey = current.rkey; 178 if (current.collection) collection = current.collection; 179 if (current.did) did = current.did; 180 if (current.record) record = current.record; 181 182 if (current.$type === "blob" && current.ref?.["$link"]) { 183 let associated; 184 let pds; 185 186 if (args.disableAssociatedExtracting != true && record != null) { 187 associated = extractStringsFromRecord(record); 188 } 189 if (args.disablePdsResolving != true) pds = await getPds(did); 190 191 blobs.push({ 192 blob: { 193 cid: current.ref["$link"], 194 mimeType: current.mimeType, 195 size: current.size, 196 }, 197 source: { 198 collection: collection, 199 did: did, 200 rkey: rkey, 201 pds: pds, 202 }, 203 associated: associated, 204 cursor: cursor, 205 }); 206 207 if (cursor !== null) stats.currentCursor = cursor; 208 stats.events.total++; 209 } else { 210 stats.events.dropped++; 211 } 212 213 if (Array.isArray(current)) { 214 for (const item of current) { 215 await search(item, rkey, collection, did, record); 216 } 217 } else { 218 for (const key in current) { 219 await search(current[key], rkey, collection, did, record); 220 } 221 } 222 } 223 224 await search(obj); 225 return blobs; 226} 227 228async function getPds(did: string | null): Promise<string | undefined> { 229 if (!did) return undefined; 230 let didUrl = undefined; 231 232 const cached = didPdsCache.find((item) => item.did === did); 233 if (cached) { 234 if (cached.pds === null) return undefined; 235 return cached.pds; 236 } 237 238 if (did.startsWith("did:plc:")) { 239 didUrl = `${args.endpointPlcdir}/${did}`; 240 } else if (did.startsWith("did:web:")) { 241 didUrl = `https://${did.replace("did:web:", "")}/.well-known/did.json`; 242 } 243 244 if (didUrl !== undefined) { 245 const didDoc = await fetch(didUrl); 246 if (!didDoc.ok) return undefined; 247 const didDocJson = await didDoc.json(); 248 const pds = didDocJson?.service?.[0]?.serviceEndpoint; 249 250 if (pds && pds.startsWith("https://")) { 251 // cache result for future lookups 252 didPdsCache.push({ did: did, pds: pds }); 253 stats.didCache.added++; 254 255 if (didPdsCache.length > args.cacheDid) { 256 stats.didCache.popped++; 257 didPdsCache.shift(); 258 } 259 260 stats.didCache.cached = didPdsCache.length; 261 262 return pds; 263 } 264 } 265 266 return undefined; 267} 268 269function getTimeSinceStart(): string { 270 if (!stats.startTime) { 271 return "0d 0h 0m 0s"; 272 } 273 274 const elapsedMs = Date.now() - stats.startTime.getTime(); 275 const seconds = Math.floor(elapsedMs / 1000); 276 const minutes = Math.floor(seconds / 60); 277 const hours = Math.floor(minutes / 60); 278 const days = Math.floor(hours / 24); 279 280 const remainingHours = hours % 24; 281 const remainingMinutes = minutes % 60; 282 const remainingSeconds = seconds % 60; 283 284 return `${days}d ${remainingHours}h ${remainingMinutes}m ${remainingSeconds}s`; 285} 286 287 288function parseDate(date: Date): string { 289 return date.toISOString().slice(0, 19).replace("T", " "); 290}