the statusphere demo reworked into a vite/react app in a monorepo

more firehose improvements

dholms 5d629914 a4ee1607

+129 -16
+67 -11
src/firehose/firehose.ts
··· 4 4 import { Subscription } from "@atproto/xrpc-server"; 5 5 import type { CID } from "multiformats/cid"; 6 6 import { 7 + type Account, 7 8 type Commit, 9 + type Identity, 8 10 type RepoEvent, 11 + isAccount, 9 12 isCommit, 13 + isIdentity, 10 14 isValidRepoEvent, 11 15 } from "./lexicons"; 12 16 ··· 15 19 getCursor?: () => Promise<number | undefined>; 16 20 setCursor?: (cursor: number) => Promise<void>; 17 21 subscriptionReconnectDelay?: number; 22 + filterCollections?: string[]; 23 + excludeIdentity?: boolean; 24 + excludeAccount?: boolean; 25 + excludeCommit?: boolean; 18 26 }; 19 27 20 28 export class Firehose { ··· 46 54 try { 47 55 for await (const evt of this.sub) { 48 56 try { 49 - const parsed = await parseEvent(evt); 50 - for (const op of parsed) { 51 - yield op; 57 + if (isCommit(evt) && !this.opts.excludeCommit) { 58 + const parsed = await parseCommit(evt); 59 + for (const write of parsed) { 60 + if ( 61 + !this.opts.filterCollections || 62 + this.opts.filterCollections.includes(write.uri.collection) 63 + ) { 64 + yield write; 65 + } 66 + } 67 + } else if (isAccount(evt) && !this.opts.excludeAccount) { 68 + const parsed = parseAccount(evt); 69 + if (parsed) { 70 + yield parsed; 71 + } 72 + } else if (isIdentity(evt) && !this.opts.excludeIdentity) { 73 + yield parseIdentity(evt); 52 74 } 53 75 } catch (err) { 54 76 console.error("repo subscription could not handle message", err); ··· 71 93 } 72 94 } 73 95 74 - export const parseEvent = async (evt: RepoEvent): Promise<Event[]> => { 75 - if (!isCommit(evt)) return []; 76 - return parseCommit(evt); 77 - }; 78 - 79 - export const parseCommit = async (evt: Commit): Promise<Event[]> => { 96 + export const parseCommit = async (evt: Commit): Promise<CommitEvt[]> => { 80 97 const car = await readCar(evt.blocks); 81 98 82 - const evts: Event[] = []; 99 + const evts: CommitEvt[] = []; 83 100 84 101 for (const op of evt.ops) { 85 102 const uri = new AtUri(`at://${evt.repo}/${op.path}`); ··· 115 132 return evts; 116 133 }; 117 134 118 - type Event = Create | Update | Delete; 135 + export const parseIdentity = (evt: Identity): IdentityEvt => { 136 + return { 137 + event: "identity", 138 + did: evt.did, 139 + handle: evt.handle, 140 + }; 141 + }; 142 + 143 + export const parseAccount = (evt: Account): AccountEvt | undefined => { 144 + if (evt.status && !isValidStatus(evt.status)) return; 145 + return { 146 + event: "account", 147 + did: evt.did, 148 + active: evt.active, 149 + status: evt.status as AccountStatus, 150 + }; 151 + }; 152 + 153 + const isValidStatus = (str: string): str is AccountStatus => { 154 + return ["takendown", "suspended", "deleted", "deactivated"].includes(str); 155 + }; 156 + 157 + type Event = CommitEvt | IdentityEvt | AccountEvt; 119 158 120 159 type CommitMeta = { 121 160 uri: AtUri; ··· 123 162 collection: string; 124 163 rkey: string; 125 164 }; 165 + 166 + type CommitEvt = Create | Update | Delete; 126 167 127 168 type Create = CommitMeta & { 128 169 event: "create"; ··· 137 178 type Delete = CommitMeta & { 138 179 event: "delete"; 139 180 }; 181 + 182 + type IdentityEvt = { 183 + event: "identity"; 184 + did: string; 185 + handle?: string; 186 + }; 187 + 188 + type AccountEvt = { 189 + event: "account"; 190 + did: string; 191 + active: boolean; 192 + status?: AccountStatus; 193 + }; 194 + 195 + type AccountStatus = "takendown" | "suspended" | "deleted" | "deactivated";
+62 -5
src/firehose/lexicons.ts
··· 18 18 } 19 19 20 20 export interface QueryParams { 21 - /** The last known event to backfill from. */ 21 + /** The last known event seq number to backfill from. */ 22 22 cursor?: number; 23 23 } 24 24 25 25 export type RepoEvent = 26 26 | Commit 27 + | Identity 28 + | Account 27 29 | Handle 28 30 | Migrate 29 31 | Tombstone ··· 41 43 ctx: HandlerReqCtx<HA> 42 44 ) => AsyncIterable<HandlerOutput>; 43 45 46 + /** Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. */ 44 47 export interface Commit { 48 + /** The stream sequence number of this message. */ 45 49 seq: number; 50 + /** DEPRECATED -- unused */ 46 51 rebase: boolean; 52 + /** Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data. */ 47 53 tooBig: boolean; 54 + /** The repo this event comes from. */ 48 55 repo: string; 56 + /** Repo commit object CID. */ 49 57 commit: CID; 58 + /** DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. */ 50 59 prev?: CID | null; 51 - /** The rev of the emitted commit */ 60 + /** The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event. */ 52 61 rev: string; 53 - /** The rev of the last emitted commit from this repo */ 62 + /** The rev of the last emitted commit from this repo (if any). */ 54 63 since: string | null; 55 - /** CAR file containing relevant blocks */ 64 + /** CAR file containing relevant blocks, as a diff since the previous repo state. */ 56 65 blocks: Uint8Array; 57 66 ops: RepoOp[]; 58 67 blobs: CID[]; 68 + /** Timestamp of when this message was originally broadcast. */ 59 69 time: string; 60 70 [k: string]: unknown; 61 71 } ··· 68 78 ); 69 79 } 70 80 81 + /** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */ 82 + export interface Identity { 83 + seq: number; 84 + did: string; 85 + time: string; 86 + /** The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. */ 87 + handle?: string; 88 + [k: string]: unknown; 89 + } 90 + 91 + export function isIdentity(v: unknown): v is Identity { 92 + return ( 93 + isObj(v) && 94 + hasProp(v, "$type") && 95 + v.$type === "com.atproto.sync.subscribeRepos#identity" 96 + ); 97 + } 98 + 99 + /** Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. */ 100 + export interface Account { 101 + seq: number; 102 + did: string; 103 + time: string; 104 + /** Indicates that the account has a repository which can be fetched from the host that emitted this event. */ 105 + active: boolean; 106 + /** If active=false, this optional field indicates a reason for why the account is not active. */ 107 + status?: 108 + | "takendown" 109 + | "suspended" 110 + | "deleted" 111 + | "deactivated" 112 + | (string & {}); 113 + [k: string]: unknown; 114 + } 115 + 116 + export function isAccount(v: unknown): v is Account { 117 + return ( 118 + isObj(v) && 119 + hasProp(v, "$type") && 120 + v.$type === "com.atproto.sync.subscribeRepos#account" 121 + ); 122 + } 123 + 124 + /** DEPRECATED -- Use #identity event instead */ 71 125 export interface Handle { 72 126 seq: number; 73 127 did: string; ··· 84 138 ); 85 139 } 86 140 141 + /** DEPRECATED -- Use #account event instead */ 87 142 export interface Migrate { 88 143 seq: number; 89 144 did: string; ··· 100 155 ); 101 156 } 102 157 158 + /** DEPRECATED -- Use #account event instead */ 103 159 export interface Tombstone { 104 160 seq: number; 105 161 did: string; ··· 129 185 ); 130 186 } 131 187 132 - /** A repo operation, ie a write of a single record. For creates and updates, cid is the record's CID as of this operation. For deletes, it's null. */ 188 + /** A repo operation, ie a mutation of a single record. */ 133 189 export interface RepoOp { 134 190 action: "create" | "update" | "delete" | (string & {}); 135 191 path: string; 192 + /** For creates and updates, the new record CID. For deletions, null. */ 136 193 cid: CID | null; 137 194 [k: string]: unknown; 138 195 }