tangled
alpha
login
or
join now
graham.systems
/
statusphere-react
forked from
samuel.fm/statusphere-react
0
fork
atom
the statusphere demo reworked into a vite/react app in a monorepo
0
fork
atom
overview
issues
pulls
pipelines
tidy firehose
dholms
2 years ago
9990f3e3
f99257fb
+2
-165
1 changed file
expand all
collapse all
unified
split
src
firehose.ts
+2
-165
src/firehose.ts
···
1
1
-
// import { BlobRef } from "@atproto/lexicon";
2
1
import { cborToLexRecord, readCar } from "@atproto/repo";
3
2
import { Subscription } from "@atproto/xrpc-server";
4
3
import { Post } from "#/db";
5
5
-
// import type { Database } from "../db";
6
6
-
// import { ids, lexicons } from "../lexicon/lexicons";
7
7
-
// import type { Record as LikeRecord } from "../lexicon/types/app/bsky/feed/like";
8
8
-
// import type { Record as PostRecord } from "../lexicon/types/app/bsky/feed/post";
9
9
-
// import type { Record as RepostRecord } from "../lexicon/types/app/bsky/feed/repost";
10
10
-
// import type { Record as FollowRecord } from "../lexicon/types/app/bsky/graph/follow";
11
11
-
// import {
12
12
-
// type Commit,
13
13
-
// type OutputSchema as RepoEvent,
14
14
-
// isCommit,
15
15
-
// } from "../lexicon/types/com/atproto/sync/subscribeRepos";
16
4
17
5
export class Firehose {
18
6
public sub: Subscription<unknown>;
···
21
9
this.sub = new Subscription({
22
10
service: service,
23
11
method: "com.atproto.sync.subscribeRepos",
24
24
-
getParams: () => this.getCursor(),
25
25
-
validate: (value: unknown) => {
26
26
-
return value;
27
27
-
// try {
28
28
-
// return lexicons.assertValidXrpcMessage<RepoEvent>(
29
29
-
// ids.ComAtprotoSyncSubscribeRepos,
30
30
-
// value
31
31
-
// );
32
32
-
// } catch (err) {
33
33
-
// console.error("repo subscription skipped invalid message", err);
34
34
-
// }
35
35
-
},
12
12
+
getParams: () => ({}),
13
13
+
validate: (value: unknown) => value,
36
14
});
37
15
}
38
16
···
65
43
} catch (err) {
66
44
console.error("repo subscription could not handle message", err);
67
45
}
68
68
-
// // update stored cursor every 20 events or so
69
69
-
// if (isCommit(evt) && evt.seq % 20 === 0) {
70
70
-
// await this.updateCursor(evt.seq);
71
71
-
// }
72
46
}
73
47
} catch (err) {
74
48
console.error("repo subscription errored", err);
75
49
setTimeout(() => this.run(subscriptionReconnectDelay), subscriptionReconnectDelay);
76
50
}
77
51
}
78
78
-
79
79
-
async getCursor() {
80
80
-
return {};
81
81
-
}
82
82
-
83
83
-
// async updateCursor(cursor: number) {
84
84
-
// await this.db
85
85
-
// .updateTable("sub_state")
86
86
-
// .set({ cursor })
87
87
-
// .where("service", "=", this.service)
88
88
-
// .execute();
89
89
-
// }
90
90
-
91
91
-
// async getCursor(): Promise<{ cursor?: number }> {
92
92
-
// const res = await this.db
93
93
-
// .selectFrom("sub_state")
94
94
-
// .selectAll()
95
95
-
// .where("service", "=", this.service)
96
96
-
// .executeTakeFirst();
97
97
-
// return res ? { cursor: res.cursor } : {};
98
98
-
// }
99
52
}
100
100
-
101
101
-
// export const getOpsByType = async (evt: Commit): Promise<OperationsByType> => {
102
102
-
// const car = await readCar(evt.blocks);
103
103
-
// const opsByType: OperationsByType = {
104
104
-
// posts: { creates: [], deletes: [] },
105
105
-
// reposts: { creates: [], deletes: [] },
106
106
-
// likes: { creates: [], deletes: [] },
107
107
-
// follows: { creates: [], deletes: [] },
108
108
-
// };
109
109
-
110
110
-
// for (const op of evt.ops) {
111
111
-
// const uri = `at://${evt.repo}/${op.path}`;
112
112
-
// const [collection] = op.path.split("/");
113
113
-
114
114
-
// if (op.action === "update") continue; // updates not supported yet
115
115
-
116
116
-
// if (op.action === "create") {
117
117
-
// if (!op.cid) continue;
118
118
-
// const recordBytes = car.blocks.get(op.cid);
119
119
-
// if (!recordBytes) continue;
120
120
-
// const record = cborToLexRecord(recordBytes);
121
121
-
// const create = { uri, cid: op.cid.toString(), author: evt.repo };
122
122
-
// if (collection === ids.AppBskyFeedPost && isPost(record)) {
123
123
-
// opsByType.posts.creates.push({ record, ...create });
124
124
-
// } else if (collection === ids.AppBskyFeedRepost && isRepost(record)) {
125
125
-
// opsByType.reposts.creates.push({ record, ...create });
126
126
-
// } else if (collection === ids.AppBskyFeedLike && isLike(record)) {
127
127
-
// opsByType.likes.creates.push({ record, ...create });
128
128
-
// } else if (collection === ids.AppBskyGraphFollow && isFollow(record)) {
129
129
-
// opsByType.follows.creates.push({ record, ...create });
130
130
-
// }
131
131
-
// }
132
132
-
133
133
-
// if (op.action === "delete") {
134
134
-
// if (collection === ids.AppBskyFeedPost) {
135
135
-
// opsByType.posts.deletes.push({ uri });
136
136
-
// } else if (collection === ids.AppBskyFeedRepost) {
137
137
-
// opsByType.reposts.deletes.push({ uri });
138
138
-
// } else if (collection === ids.AppBskyFeedLike) {
139
139
-
// opsByType.likes.deletes.push({ uri });
140
140
-
// } else if (collection === ids.AppBskyGraphFollow) {
141
141
-
// opsByType.follows.deletes.push({ uri });
142
142
-
// }
143
143
-
// }
144
144
-
// }
145
145
-
146
146
-
// return opsByType;
147
147
-
// };
148
148
-
149
149
-
// type OperationsByType = {
150
150
-
// posts: Operations<PostRecord>;
151
151
-
// reposts: Operations<RepostRecord>;
152
152
-
// likes: Operations<LikeRecord>;
153
153
-
// follows: Operations<FollowRecord>;
154
154
-
// };
155
155
-
156
156
-
// type Operations<T = Record<string, unknown>> = {
157
157
-
// creates: CreateOp<T>[];
158
158
-
// deletes: DeleteOp[];
159
159
-
// };
160
160
-
161
161
-
// type CreateOp<T> = {
162
162
-
// uri: string;
163
163
-
// cid: string;
164
164
-
// author: string;
165
165
-
// record: T;
166
166
-
// };
167
167
-
168
168
-
// type DeleteOp = {
169
169
-
// uri: string;
170
170
-
// };
171
171
-
172
172
-
// export const isPost = (obj: unknown): obj is PostRecord => {
173
173
-
// return isType(obj, ids.AppBskyFeedPost);
174
174
-
// };
175
175
-
176
176
-
// export const isRepost = (obj: unknown): obj is RepostRecord => {
177
177
-
// return isType(obj, ids.AppBskyFeedRepost);
178
178
-
// };
179
179
-
180
180
-
// export const isLike = (obj: unknown): obj is LikeRecord => {
181
181
-
// return isType(obj, ids.AppBskyFeedLike);
182
182
-
// };
183
183
-
184
184
-
// export const isFollow = (obj: unknown): obj is FollowRecord => {
185
185
-
// return isType(obj, ids.AppBskyGraphFollow);
186
186
-
// };
187
187
-
188
188
-
// const isType = (obj: unknown, nsid: string) => {
189
189
-
// try {
190
190
-
// lexicons.assertValidRecord(nsid, fixBlobRefs(obj));
191
191
-
// return true;
192
192
-
// } catch (err) {
193
193
-
// return false;
194
194
-
// }
195
195
-
// };
196
196
-
197
197
-
// // @TODO right now record validation fails on BlobRefs
198
198
-
// // simply because multiple packages have their own copy
199
199
-
// // of the BlobRef class, causing instanceof checks to fail.
200
200
-
// // This is a temporary solution.
201
201
-
// const fixBlobRefs = (obj: unknown): unknown => {
202
202
-
// if (Array.isArray(obj)) {
203
203
-
// return obj.map(fixBlobRefs);
204
204
-
// }
205
205
-
// if (obj && typeof obj === "object") {
206
206
-
// if (obj.constructor.name === "BlobRef") {
207
207
-
// const blob = obj as BlobRef;
208
208
-
// return new BlobRef(blob.ref, blob.mimeType, blob.size, blob.original);
209
209
-
// }
210
210
-
// return Object.entries(obj).reduce((acc, [key, val]) => {
211
211
-
// return Object.assign(acc, { [key]: fixBlobRefs(val) });
212
212
-
// }, {} as Record<string, unknown>);
213
213
-
// }
214
214
-
// return obj;
215
215
-
// };