tangled
alpha
login
or
join now
danabra.mov
/
statusphere-react
forked from
samuel.fm/statusphere-react
0
fork
atom
the statusphere demo reworked into a vite/react app in a monorepo
0
fork
atom
overview
issues
pulls
pipelines
add jetstream
samuel.fm
1 year ago
1317e8d1
25e553d1
+272
-29
15 changed files
expand all
collapse all
unified
split
lexicons
xyz
statusphere
getStatuses.json
packages
appview
src
context.ts
db.ts
index.ts
ingestors
firehose.ts
index.ts
jetstream.ts
lexicons
lexicons.ts
types
xyz
statusphere
getStatuses.ts
client
src
components
StatusForm.tsx
StatusList.tsx
pages
HomePage.tsx
vite.config.ts
lexicon
src
lexicons.ts
types
xyz
statusphere
getStatuses.ts
-1
lexicons/xyz/statusphere/getStatuses.json
···
22
22
"type": "object",
23
23
"required": ["statuses"],
24
24
"properties": {
25
25
-
"cursor": { "type": "string" },
26
25
"statuses": {
27
26
"type": "array",
28
27
"items": {
+4
-3
packages/appview/src/context.ts
···
2
2
import { Firehose } from '@atproto/sync'
3
3
import pino from 'pino'
4
4
5
5
-
import { Database } from './db'
6
6
-
import { BidirectionalResolver } from './id-resolver'
5
5
+
import { Database } from '#/db'
6
6
+
import { BidirectionalResolver } from '#/id-resolver'
7
7
+
import { Jetstream } from '#/ingestors'
7
8
8
9
// Application state passed to the router and elsewhere
9
10
export type AppContext = {
10
11
db: Database
11
11
-
ingester: Firehose
12
12
+
ingester: Firehose | Jetstream<any>
12
13
logger: pino.Logger
13
14
oauthClient: OAuthClient
14
15
resolver: BidirectionalResolver
+15
packages/appview/src/db.ts
···
53
53
},
54
54
}
55
55
56
56
+
migrations['003'] = {
57
57
+
async up(db: Kysely<unknown>) {},
58
58
+
async down(_db: Kysely<unknown>) {},
59
59
+
}
60
60
+
56
61
migrations['002'] = {
57
62
async up(db: Kysely<unknown>) {
58
63
await db.schema
59
64
.createTable('cursor')
60
65
.addColumn('id', 'integer', (col) => col.primaryKey())
61
66
.addColumn('seq', 'integer', (col) => col.notNull())
67
67
+
.execute()
68
68
+
69
69
+
// Insert initial cursor values:
70
70
+
// id=1 is for firehose, id=2 is for jetstream
71
71
+
await db
72
72
+
.insertInto('cursor' as never)
73
73
+
.values([
74
74
+
{ id: 1, seq: 0 },
75
75
+
{ id: 2, seq: 0 },
76
76
+
])
62
77
.execute()
63
78
},
64
79
async down(db: Kysely<unknown>) {
+4
-3
packages/appview/src/index.ts
···
14
14
import { createDb, migrateToLatest } from '#/db'
15
15
import * as error from '#/error'
16
16
import { createBidirectionalResolver, createIdResolver } from '#/id-resolver'
17
17
-
import { createIngester } from '#/ingester'
17
17
+
import { createFirehoseIngester, createJetstreamIngester } from '#/ingestors'
18
18
import { createServer } from '#/lexicons'
19
19
import { env } from '#/lib/env'
20
20
···
36
36
// Create the atproto utilities
37
37
const oauthClient = await createClient(db)
38
38
const baseIdResolver = createIdResolver()
39
39
-
const ingester = await createIngester(db, baseIdResolver)
39
39
+
const ingester = await createJetstreamIngester(db)
40
40
+
// Alternative: const ingester = await createFirehoseIngester(db, baseIdResolver)
40
41
const resolver = createBidirectionalResolver(baseIdResolver)
41
42
const ctx = {
42
43
db,
···
103
104
})
104
105
}
105
106
} else {
106
106
-
server.xrpc.router.set('trust proxy', true)
107
107
+
app.set('trust proxy', true)
107
108
}
108
109
109
110
// Use the port from env (should be 3001 for the API server)
+4
-1
packages/appview/src/ingester.ts
packages/appview/src/ingestors/firehose.ts
···
5
5
6
6
import type { Database } from '#/db'
7
7
8
8
-
export async function createIngester(db: Database, idResolver: IdResolver) {
8
8
+
export async function createFirehoseIngester(
9
9
+
db: Database,
10
10
+
idResolver: IdResolver,
11
11
+
) {
9
12
const logger = pino({ name: 'firehose ingestion' })
10
13
11
14
const cursor = await db
+2
packages/appview/src/ingestors/index.ts
···
1
1
+
export * from './jetstream'
2
2
+
export * from './firehose'
+213
packages/appview/src/ingestors/jetstream.ts
···
1
1
+
import { XyzStatusphereStatus } from '@statusphere/lexicon'
2
2
+
import pino from 'pino'
3
3
+
import WebSocket from 'ws'
4
4
+
5
5
+
import type { Database } from '#/db'
6
6
+
7
7
+
export async function createJetstreamIngester(db: Database) {
8
8
+
const logger = pino({ name: 'jetstream ingestion' })
9
9
+
10
10
+
const cursor = await db
11
11
+
.selectFrom('cursor')
12
12
+
.where('id', '=', 2)
13
13
+
.select('seq')
14
14
+
.executeTakeFirst()
15
15
+
16
16
+
logger.info(`start cursor: ${cursor?.seq}`)
17
17
+
18
18
+
// For throttling cursor writes
19
19
+
let lastCursorWrite = 0
20
20
+
21
21
+
return new Jetstream<XyzStatusphereStatus.Record>({
22
22
+
logger,
23
23
+
cursor: cursor?.seq || undefined,
24
24
+
setCursor: async (seq) => {
25
25
+
const now = Date.now()
26
26
+
27
27
+
if (now - lastCursorWrite >= 30000) {
28
28
+
lastCursorWrite = now
29
29
+
logger.info(`writing cursor: ${seq}`)
30
30
+
await db
31
31
+
.updateTable('cursor')
32
32
+
.set({ seq })
33
33
+
.where('id', '=', 2)
34
34
+
.execute()
35
35
+
}
36
36
+
},
37
37
+
handleEvent: async (evt) => {
38
38
+
// ignore account and identity events
39
39
+
if (
40
40
+
evt.kind !== 'commit' ||
41
41
+
evt.commit.collection !== 'xyz.statusphere.status'
42
42
+
)
43
43
+
return
44
44
+
45
45
+
const now = new Date()
46
46
+
const uri = `at://${evt.did}/${evt.commit.collection}/${evt.commit.rkey}`
47
47
+
48
48
+
if (
49
49
+
(evt.commit.operation === 'create' ||
50
50
+
evt.commit.operation === 'update') &&
51
51
+
XyzStatusphereStatus.isRecord(evt.commit.record)
52
52
+
) {
53
53
+
const validatedRecord = XyzStatusphereStatus.validateRecord(
54
54
+
evt.commit.record,
55
55
+
)
56
56
+
if (!validatedRecord.success) return
57
57
+
58
58
+
// Store the status in our SQLite
59
59
+
await db
60
60
+
.insertInto('status')
61
61
+
.values({
62
62
+
uri,
63
63
+
authorDid: evt.did,
64
64
+
status: validatedRecord.value.status,
65
65
+
createdAt: validatedRecord.value.createdAt,
66
66
+
indexedAt: now.toISOString(),
67
67
+
})
68
68
+
.onConflict((oc) =>
69
69
+
oc.column('uri').doUpdateSet({
70
70
+
status: validatedRecord.value.status,
71
71
+
indexedAt: now.toISOString(),
72
72
+
}),
73
73
+
)
74
74
+
.execute()
75
75
+
} else if (evt.commit.operation === 'delete') {
76
76
+
// Remove the status from our SQLite
77
77
+
await db.deleteFrom('status').where('uri', '=', uri).execute()
78
78
+
}
79
79
+
},
80
80
+
onError: (err) => {
81
81
+
logger.error({ err }, 'error during jetstream ingestion')
82
82
+
},
83
83
+
wantedCollections: ['xyz.statusphere.status'],
84
84
+
})
85
85
+
}
86
86
+
87
87
+
export class Jetstream<T> {
88
88
+
private logger: pino.Logger
89
89
+
private handleEvent: (evt: JetstreamEvent<T>) => Promise<void>
90
90
+
private onError: (err: unknown) => void
91
91
+
private setCursor?: (seq: number) => Promise<void>
92
92
+
private cursor?: number
93
93
+
private ws?: WebSocket
94
94
+
private isStarted = false
95
95
+
private wantedCollections: string[]
96
96
+
97
97
+
constructor({
98
98
+
logger,
99
99
+
cursor,
100
100
+
setCursor,
101
101
+
handleEvent,
102
102
+
onError,
103
103
+
wantedCollections,
104
104
+
}: {
105
105
+
logger: pino.Logger
106
106
+
cursor?: number
107
107
+
setCursor?: (seq: number) => Promise<void>
108
108
+
handleEvent: (evt: any) => Promise<void>
109
109
+
onError: (err: any) => void
110
110
+
wantedCollections: string[]
111
111
+
}) {
112
112
+
this.logger = logger
113
113
+
this.cursor = cursor
114
114
+
this.setCursor = setCursor
115
115
+
this.handleEvent = handleEvent
116
116
+
this.onError = onError
117
117
+
this.wantedCollections = wantedCollections
118
118
+
}
119
119
+
120
120
+
constructUrlWithQuery = (): string => {
121
121
+
const params = new URLSearchParams()
122
122
+
params.append('wantedCollections', this.wantedCollections.join(','))
123
123
+
if (this.cursor !== undefined) {
124
124
+
params.append('cursor', this.cursor.toString())
125
125
+
}
126
126
+
return `wss://jetstream.mozzius.dev/subscribe?${params.toString()}`
127
127
+
}
128
128
+
129
129
+
start() {
130
130
+
if (this.isStarted) return
131
131
+
this.isStarted = true
132
132
+
this.ws = new WebSocket(this.constructUrlWithQuery())
133
133
+
134
134
+
this.ws.on('open', () => {
135
135
+
this.logger.info('Jetstream connection opened.')
136
136
+
})
137
137
+
138
138
+
this.ws.on('message', async (data) => {
139
139
+
try {
140
140
+
const event: JetstreamEvent<T> = JSON.parse(data.toString())
141
141
+
142
142
+
// Update cursor if provided
143
143
+
if (event.time_us !== undefined && this.setCursor) {
144
144
+
await this.setCursor(event.time_us)
145
145
+
}
146
146
+
147
147
+
await this.handleEvent(event)
148
148
+
} catch (err) {
149
149
+
this.onError(err)
150
150
+
}
151
151
+
})
152
152
+
153
153
+
this.ws.on('error', (err) => {
154
154
+
this.onError(err)
155
155
+
})
156
156
+
157
157
+
this.ws.on('close', (code, reason) => {
158
158
+
this.logger.error(`Jetstream closed. Code: ${code}, Reason: ${reason}`)
159
159
+
this.isStarted = false
160
160
+
})
161
161
+
}
162
162
+
163
163
+
destroy() {
164
164
+
if (this.ws) {
165
165
+
this.ws.close()
166
166
+
this.isStarted = false
167
167
+
}
168
168
+
}
169
169
+
}
170
170
+
171
171
+
type JetstreamEvent<T> = {
172
172
+
did: string
173
173
+
time_us: number
174
174
+
} & (CommitEvent<T> | AccountEvent | IdentityEvent)
175
175
+
176
176
+
type CommitEvent<T> = {
177
177
+
kind: 'commit'
178
178
+
commit:
179
179
+
| {
180
180
+
operation: 'create' | 'update'
181
181
+
record: T
182
182
+
rev: string
183
183
+
collection: string
184
184
+
rkey: string
185
185
+
cid: string
186
186
+
}
187
187
+
| {
188
188
+
operation: 'delete'
189
189
+
rev: string
190
190
+
collection: string
191
191
+
rkey: string
192
192
+
}
193
193
+
}
194
194
+
195
195
+
type IdentityEvent = {
196
196
+
kind: 'identity'
197
197
+
identity: {
198
198
+
did: string
199
199
+
handle: string
200
200
+
seq: number
201
201
+
time: string
202
202
+
}
203
203
+
}
204
204
+
205
205
+
type AccountEvent = {
206
206
+
kind: 'account'
207
207
+
account: {
208
208
+
active: boolean
209
209
+
did: string
210
210
+
seq: number
211
211
+
time: string
212
212
+
}
213
213
+
}
-3
packages/appview/src/lexicons/lexicons.ts
···
79
79
type: 'object',
80
80
required: ['statuses'],
81
81
properties: {
82
82
-
cursor: {
83
83
-
type: 'string',
84
84
-
},
85
82
statuses: {
86
83
type: 'array',
87
84
items: {
-1
packages/appview/src/lexicons/types/xyz/statusphere/getStatuses.ts
···
21
21
export type InputSchema = undefined
22
22
23
23
export interface OutputSchema {
24
24
-
cursor?: string
25
24
statuses: XyzStatusphereDefs.StatusView[]
26
25
}
27
26
+1
-1
packages/client/src/components/StatusForm.tsx
···
5
5
import useAuth from '#/hooks/useAuth'
6
6
import api from '#/services/api'
7
7
8
8
-
const STATUS_OPTIONS = [
8
8
+
export const STATUS_OPTIONS = [
9
9
'👍',
10
10
'👎',
11
11
'💙',
+11
-2
packages/client/src/components/StatusList.tsx
···
2
2
import { useQuery } from '@tanstack/react-query'
3
3
4
4
import api from '#/services/api'
5
5
+
import { STATUS_OPTIONS } from './StatusForm'
5
6
6
7
const StatusList = () => {
7
8
// Use React Query to fetch and cache statuses
···
23
24
24
25
// Destructure data
25
26
const statuses = data?.statuses || []
27
27
+
28
28
+
// Get a random emoji from the STATUS_OPTIONS array
29
29
+
const randomEmoji = STATUS_OPTIONS[Math.floor(Math.random() * STATUS_OPTIONS.length)]
26
30
27
31
if (isPending && !data) {
28
32
return (
29
29
-
<div className="py-4 text-center text-gray-500 dark:text-gray-400">
30
30
-
Loading statuses...
33
33
+
<div className="py-8 text-center">
34
34
+
<div className="text-5xl mb-2 animate-pulse inline-block">
35
35
+
{randomEmoji}
36
36
+
</div>
37
37
+
<div className="text-gray-500 dark:text-gray-400">
38
38
+
Loading statuses...
39
39
+
</div>
31
40
</div>
32
41
)
33
42
}
+7
-10
packages/client/src/pages/HomePage.tsx
···
1
1
import Header from '#/components/Header'
2
2
-
import StatusForm from '#/components/StatusForm'
2
2
+
import StatusForm, { STATUS_OPTIONS } from '#/components/StatusForm'
3
3
import StatusList from '#/components/StatusList'
4
4
import { useAuth } from '#/hooks/useAuth'
5
5
6
6
const HomePage = () => {
7
7
const { user, loading, error } = useAuth()
8
8
9
9
+
// Get a random emoji from the STATUS_OPTIONS array
10
10
+
const randomEmoji =
11
11
+
STATUS_OPTIONS[Math.floor(Math.random() * STATUS_OPTIONS.length)]
12
12
+
9
13
if (loading) {
10
14
return (
11
11
-
<div className="flex justify-center items-center py-16">
12
12
-
<div className="text-center p-6">
13
13
-
<h2 className="text-2xl font-semibold mb-2 text-gray-800 dark:text-gray-200">
14
14
-
Loading Statusphere...
15
15
-
</h2>
16
16
-
<p className="text-gray-600 dark:text-gray-400">
17
17
-
Setting up your experience
18
18
-
</p>
19
19
-
</div>
15
15
+
<div className="flex justify-center items-center h-[80vh]">
16
16
+
<div className="text-9xl animate-pulse">{randomEmoji}</div>
20
17
</div>
21
18
)
22
19
}
+11
packages/client/vite.config.ts
···
21
21
'^/(xrpc|oauth|client-metadata\.json)/.*': {
22
22
target: 'http://localhost:3001',
23
23
changeOrigin: true,
24
24
+
configure: (proxy, _options) => {
25
25
+
proxy.on('error', (err, _req, _res) => {
26
26
+
console.log('PROXY ERROR', err);
27
27
+
});
28
28
+
proxy.on('proxyReq', (proxyReq, req, _res) => {
29
29
+
console.log('PROXY REQUEST', req.method, req.url);
30
30
+
});
31
31
+
proxy.on('proxyRes', (proxyRes, req, _res) => {
32
32
+
console.log('PROXY RESPONSE', req.method, req.url, proxyRes.statusCode);
33
33
+
});
34
34
+
},
24
35
},
25
36
},
26
37
},
-3
packages/lexicon/src/lexicons.ts
···
79
79
type: 'object',
80
80
required: ['statuses'],
81
81
properties: {
82
82
-
cursor: {
83
83
-
type: 'string',
84
84
-
},
85
82
statuses: {
86
83
type: 'array',
87
84
items: {
-1
packages/lexicon/src/types/xyz/statusphere/getStatuses.ts
···
20
20
export type InputSchema = undefined
21
21
22
22
export interface OutputSchema {
23
23
-
cursor?: string
24
23
statuses: XyzStatusphereDefs.StatusView[]
25
24
}
26
25