tangled
alpha
login
or
join now
accidental.cc
/
skypod
3
fork
atom
podcast manager
3
fork
atom
overview
issues
pulls
pipelines
refactoring the context a bit
Jonathan Raphaelson
4 months ago
7b3bd3bd
dc733619
+437
-341
16 changed files
expand all
collapse all
unified
split
eslint.config.js
src
client
components
feed-import-nytimes.tsx
feed-import-podcasts.tsx
feed-import-tech.tsx
messenger.tsx
page-app.tsx
skypod
action-dispatch
context.tsx
context.tsx
effects-connection.tsx
effects-feed-processor.tsx
feed-processor
api.ts
middleware.ts
worker.ts
server
routes-api
middleware-cors.ts
middleware-reader.ts
middleware.ts
+1
-1
eslint.config.js
···
88
88
// mostly cribbed from preact's config, but that's not setup to handle eslint9
89
89
// https://github.com/preactjs/eslint-config-preact/blob/master/index.js
90
90
name: 'client files',
91
91
-
files: ['src/client/**/*.@(js|jsx|ts|tsx)'],
91
91
+
files: ['src/client/**/*.@(js|ts)', 'src/**/*.@(jsx|tsx)'],
92
92
languageOptions: {
93
93
globals: {
94
94
...globals.es2024,
+2
-2
src/client/components/feed-import-nytimes.tsx
···
1
1
import {useSignal} from '@preact/signals'
2
2
import {useCallback} from 'preact/hooks'
3
3
4
4
-
import {useSkypod} from '#client/skypod/context'
4
4
+
import {useActionDispatch} from '#client/skypod/action-dispatch/context.js'
5
5
import {useRealmIdentity} from '#realm/client/context-identity'
6
6
7
7
// NY Times RSS feeds from https://www.nytimes.com/rss
···
73
73
74
74
export const FeedImportNYTimes: preact.FunctionComponent = () => {
75
75
const {identity} = useRealmIdentity()
76
76
-
const store = useSkypod()
76
76
+
const store = useActionDispatch()
77
77
78
78
const importing$ = useSignal(false)
79
79
const imported$ = useSignal(0)
+2
-2
src/client/components/feed-import-podcasts.tsx
···
1
1
import {useSignal} from '@preact/signals'
2
2
import {useCallback} from 'preact/hooks'
3
3
4
4
-
import {useSkypod} from '#client/skypod/context'
4
4
+
import {useActionDispatch} from '#client/skypod/action-dispatch/context.js'
5
5
import {useRealmIdentity} from '#realm/client/context-identity'
6
6
7
7
// Popular podcasts
···
21
21
22
22
export const FeedImportPodcasts: preact.FunctionComponent = () => {
23
23
const {identity} = useRealmIdentity()
24
24
-
const store = useSkypod()
24
24
+
const store = useActionDispatch()
25
25
26
26
const importing$ = useSignal(false)
27
27
const imported$ = useSignal(0)
+2
-2
src/client/components/feed-import-tech.tsx
···
1
1
import {useSignal} from '@preact/signals'
2
2
import {useCallback} from 'preact/hooks'
3
3
4
4
-
import {useSkypod} from '#client/skypod/context'
4
4
+
import {useActionDispatch} from '#client/skypod/action-dispatch/context.js'
5
5
import {useRealmIdentity} from '#realm/client/context-identity'
6
6
7
7
// Popular tech/programming feeds
···
20
20
21
21
export const FeedImportTech: preact.FunctionComponent = () => {
22
22
const {identity} = useRealmIdentity()
23
23
-
const store = useSkypod()
23
23
+
const store = useActionDispatch()
24
24
25
25
const importing$ = useSignal(false)
26
26
const imported$ = useSignal(0)
+2
-2
src/client/components/messenger.tsx
···
2
2
import {useCallback} from 'preact/hooks'
3
3
4
4
import {useDatabase} from '#client/root/context-database'
5
5
-
import {useSkypod} from '#client/skypod/context'
5
5
+
import {useActionDispatch} from '#client/skypod/action-dispatch/context'
6
6
import {useRealmIdentity} from '#realm/client/context-identity'
7
7
8
8
export const Messenger: preact.FunctionComponent = () => {
9
9
const {useDbSignal} = useDatabase()
10
10
const {identity} = useRealmIdentity()
11
11
-
const store = useSkypod()
11
11
+
const store = useActionDispatch()
12
12
13
13
const feeds$ = useDbSignal((db) => db.feeds.toArray())
14
14
+15
-21
src/client/page-app.tsx
···
1
1
-
import {DatabaseProvider} from '#client/root/context-database'
2
1
import {SkypodProvider} from '#client/skypod/context'
3
2
import {RealmConnectionManager} from '#realm/client/components/connection-manager'
4
4
-
import {
5
5
-
RealmConnectionFallbackProps,
6
6
-
RealmConnectionProvider,
7
7
-
} from '#realm/client/context-connection'
8
8
-
import {RealmIdentityFallbackProps, RealmIdentityProvider} from '#realm/client/context-identity'
3
3
+
import {RealmConnectionFallbackProps} from '#realm/client/context-connection'
4
4
+
import {RealmIdentityFallbackProps} from '#realm/client/context-identity'
9
5
10
6
import {DebugNuke} from './components/debug-nuke'
11
7
import {FeedImportNYTimes} from './components/feed-import-nytimes'
···
26
22
const wsurl = `${wsproto}://${wshost}/stream`
27
23
28
24
return (
29
29
-
<DatabaseProvider>
30
30
-
<RealmIdentityProvider fallback={identityFallback}>
31
31
-
<RealmConnectionProvider fallback={connectionFallback} url={wsurl}>
32
32
-
<SkypodProvider>
33
33
-
<RealmConnectionManager />
34
34
-
<PeerList />
35
35
-
<FeedImportNYTimes />
36
36
-
<FeedImportTech />
37
37
-
<FeedImportPodcasts />
38
38
-
<Messenger />
39
39
-
<DebugNuke />
40
40
-
</SkypodProvider>
41
41
-
</RealmConnectionProvider>
42
42
-
</RealmIdentityProvider>
43
43
-
</DatabaseProvider>
25
25
+
<SkypodProvider
26
26
+
identityFallback={identityFallback}
27
27
+
connectionFallback={connectionFallback}
28
28
+
websocketUrl={wsurl}
29
29
+
>
30
30
+
<RealmConnectionManager />
31
31
+
<PeerList />
32
32
+
<FeedImportNYTimes />
33
33
+
<FeedImportTech />
34
34
+
<FeedImportPodcasts />
35
35
+
<Messenger />
36
36
+
<DebugNuke />
37
37
+
</SkypodProvider>
44
38
)
45
39
}
+150
src/client/skypod/action-dispatch/context.tsx
···
1
1
+
import {createContext} from 'preact'
2
2
+
import {useContext, useRef} from 'preact/hooks'
3
3
+
4
4
+
import {useDatabase} from '#client/root/context-database'
5
5
+
import {useRealmConnection} from '#realm/client/context-connection'
6
6
+
import {useRealmIdentity} from '#realm/client/context-identity'
7
7
+
import {LogicalClock} from '#realm/protocol/logical-clock'
8
8
+
9
9
+
import {Action, ActionMap, ActionOpts} from '#skypod/actions'
10
10
+
11
11
+
export type MiddlewareFn = (
12
12
+
this: undefined,
13
13
+
action: Action,
14
14
+
) => void | Action[] | Promise<void | Action[]>
15
15
+
16
16
+
export type MiddlewareRouterFn<K extends keyof ActionMap> = (
17
17
+
this: undefined,
18
18
+
action: ActionMap[K],
19
19
+
) => void | Action[] | Promise<void | Action[]>
20
20
+
21
21
+
export type MiddlewarePosition = 'push' | 'shift' | number
22
22
+
23
23
+
export type Middleware = MiddlewareFn | {[K in keyof ActionMap]?: MiddlewareRouterFn<K>}
24
24
+
25
25
+
export interface ActionDispatchContext {
26
26
+
action<N extends keyof ActionMap>(msg: N, dat: ActionMap[N]['dat'], opt?: ActionOpts): Action
27
27
+
dispatch(this: void, action: Action): Promise<void>
28
28
+
29
29
+
addMiddleware(this: void, handler: Middleware, position?: MiddlewarePosition): void
30
30
+
removeMiddleware(this: void, handler: Middleware): void
31
31
+
}
32
32
+
33
33
+
const ActionDispatchContext = createContext<ActionDispatchContext | null>(null)
34
34
+
35
35
+
export const ActionDispatchProvider: preact.FunctionComponent<{
36
36
+
children: preact.ComponentChildren
37
37
+
}> = (props) => {
38
38
+
const {db} = useDatabase()
39
39
+
const {identity} = useRealmIdentity()
40
40
+
const {realm} = useRealmConnection()
41
41
+
const middleware = useRef<Middleware[]>([])
42
42
+
43
43
+
const context = useRef<ActionDispatchContext>({
44
44
+
dispatch: async <K extends keyof ActionMap>(action: ActionMap[K]) => {
45
45
+
try {
46
46
+
const {identid: actor} = LogicalClock.extract(action.clk)
47
47
+
await db.transaction('rw', [db.clocks, db.actions], async (tx) => {
48
48
+
await db.actions.add({clock: action.clk, actor, action})
49
49
+
50
50
+
const extant = await tx.clocks.get(actor)
51
51
+
if (!extant || LogicalClock.compare(extant.clock, action.clk) < 0)
52
52
+
await tx.clocks.put({actor, clock: action.clk})
53
53
+
})
54
54
+
} catch (err: unknown) {
55
55
+
if (typeof err === 'object' && err && 'name' in err && err.name === 'ConstraintError') {
56
56
+
console.debug('duplicate action ignored', action)
57
57
+
} else {
58
58
+
console.error('failed to store action:', err)
59
59
+
}
60
60
+
61
61
+
return
62
62
+
}
63
63
+
64
64
+
const actions = [action] as Action[]
65
65
+
for (const action of actions) {
66
66
+
for (const mware of middleware.current) {
67
67
+
const mwarefn =
68
68
+
typeof mware === 'function'
69
69
+
? mware
70
70
+
: (mware[action.msg] as MiddlewareRouterFn<typeof action.msg> | undefined)
71
71
+
72
72
+
if (mwarefn) {
73
73
+
try {
74
74
+
const result = await Promise.resolve(mwarefn.call(undefined, action))
75
75
+
if (result !== void 0) {
76
76
+
actions.push(...result)
77
77
+
}
78
78
+
} catch (err: unknown) {
79
79
+
console.error(`middleware error for ${action.msg}:`, err)
80
80
+
continue // TODO: ???
81
81
+
}
82
82
+
}
83
83
+
}
84
84
+
}
85
85
+
86
86
+
// broadcast if not a local action
87
87
+
if (!action.opt?.local) {
88
88
+
realm.value?.broadcast([action], false)
89
89
+
}
90
90
+
},
91
91
+
92
92
+
action: <N extends keyof ActionMap>(
93
93
+
msg: N,
94
94
+
dat: ActionMap[N]['dat'],
95
95
+
opt?: ActionOpts,
96
96
+
): ActionMap[N] => {
97
97
+
const clk = identity.clock.now()
98
98
+
return {typ: 'act', clk, msg, dat, opt} as ActionMap[N]
99
99
+
},
100
100
+
101
101
+
addMiddleware: (handler, position) => {
102
102
+
if (middleware.current.indexOf(handler) !== -1) {
103
103
+
return
104
104
+
}
105
105
+
106
106
+
switch (position) {
107
107
+
case 'push':
108
108
+
case undefined:
109
109
+
middleware.current = [...middleware.current, handler]
110
110
+
break
111
111
+
112
112
+
case 'shift':
113
113
+
middleware.current = [handler, ...middleware.current]
114
114
+
break
115
115
+
116
116
+
default: {
117
117
+
if (middleware.current.length > position) {
118
118
+
const prefix = middleware.current.slice(0, position)
119
119
+
const suffix = middleware.current.slice(position)
120
120
+
121
121
+
middleware.current = [...prefix, handler, ...suffix]
122
122
+
}
123
123
+
}
124
124
+
}
125
125
+
},
126
126
+
127
127
+
removeMiddleware: (handler) => {
128
128
+
const index = middleware.current.indexOf(handler)
129
129
+
if (index >= 0) {
130
130
+
const prefix = middleware.current.slice(0, index)
131
131
+
const suffix = middleware.current.slice(index + 1)
132
132
+
133
133
+
middleware.current = [...prefix, ...suffix]
134
134
+
}
135
135
+
},
136
136
+
})
137
137
+
138
138
+
return (
139
139
+
<ActionDispatchContext.Provider value={context.current}>
140
140
+
{props.children}
141
141
+
</ActionDispatchContext.Provider>
142
142
+
)
143
143
+
}
144
144
+
145
145
+
export function useActionDispatch() {
146
146
+
const context = useContext(ActionDispatchContext)
147
147
+
if (!context) throw new Error('expected to be called inside an actions dispatch context!')
148
148
+
149
149
+
return context
150
150
+
}
+25
-241
src/client/skypod/context.tsx
···
1
1
-
import {createContext} from 'preact'
2
2
-
import {useContext, useEffect, useRef} from 'preact/hooks'
3
3
-
import {z} from 'zod/v4'
4
4
-
5
5
-
import {IdentID} from '#realm/protocol/index'
6
6
-
import {Action, ActionMap, ActionOpts, actionSchema} from '#skypod/actions'
7
7
-
import {feedSchema} from '#skypod/schema'
8
8
-
9
9
-
import {useDatabase} from '#client/root/context-database'
10
10
-
import {useRealmConnection} from '#realm/client/context-connection'
11
11
-
import {useRealmIdentity} from '#realm/client/context-identity'
12
12
-
13
13
-
import {LogicalClock} from '#realm/protocol/logical-clock'
14
14
-
import FeedFetchWorker from './feed-fetch.worker?worker'
15
15
-
import {createFeedMiddleware} from './middleware-feeds'
16
16
-
17
17
-
export type MiddlewareFn = (
18
18
-
this: undefined,
19
19
-
action: Action,
20
20
-
) => void | Action[] | Promise<void | Action[]>
21
21
-
22
22
-
export type MiddlewareRouterFn<K extends keyof ActionMap> = (
23
23
-
this: undefined,
24
24
-
action: ActionMap[K],
25
25
-
) => void | Action[] | Promise<void | Action[]>
26
26
-
27
27
-
export type MiddlewarePosition = 'push' | 'shift' | number
28
28
-
29
29
-
export type Middleware = MiddlewareFn | {[K in keyof ActionMap]?: MiddlewareRouterFn<K>}
30
30
-
31
31
-
export interface SkypodContext {
32
32
-
action<N extends keyof ActionMap>(msg: N, dat: ActionMap[N]['dat'], opt?: ActionOpts): Action
33
33
-
dispatch(this: void, action: Action): Promise<void>
34
34
-
35
35
-
addMiddleware(this: void, handler: Middleware, position?: MiddlewarePosition): void
36
36
-
removeMiddleware(this: void, handler: Middleware): void
37
37
-
}
38
38
-
39
39
-
const SkypodContext = createContext<SkypodContext | null>(null)
40
40
-
41
41
-
///
42
42
-
43
43
-
export const SkypodProvider: preact.FunctionComponent<{children: preact.ComponentChildren}> = (
44
44
-
props,
45
45
-
) => {
46
46
-
const {db} = useDatabase()
47
47
-
const {identity} = useRealmIdentity()
48
48
-
const {realm} = useRealmConnection()
49
49
-
50
50
-
const processor = useRef<Worker>(null)
51
51
-
52
52
-
// Initial middleware - feed management
53
53
-
const middleware = useRef<Middleware[]>([createFeedMiddleware(db, processor.current)])
54
54
-
55
55
-
const context = useRef<SkypodContext>({
56
56
-
dispatch: async <K extends keyof ActionMap>(action: ActionMap[K]) => {
57
57
-
try {
58
58
-
const {identid: actor} = LogicalClock.extract(action.clk)
59
59
-
await db.transaction('rw', [db.clocks, db.actions], async (tx) => {
60
60
-
await db.actions.add({clock: action.clk, actor, action})
61
61
-
62
62
-
const extant = await tx.clocks.get(actor)
63
63
-
if (!extant || LogicalClock.compare(extant.clock, action.clk) < 0)
64
64
-
await tx.clocks.put({actor, clock: action.clk})
65
65
-
})
66
66
-
} catch (err: unknown) {
67
67
-
if (typeof err === 'object' && err && 'name' in err && err.name === 'ConstraintError') {
68
68
-
console.debug('duplicate action ignored', action)
69
69
-
} else {
70
70
-
console.error('failed to store action:', err)
71
71
-
}
72
72
-
73
73
-
return
74
74
-
}
75
75
-
76
76
-
const actions = [action] as Action[]
77
77
-
for (const action of actions) {
78
78
-
for (const mware of middleware.current) {
79
79
-
const mwarefn =
80
80
-
typeof mware === 'function'
81
81
-
? mware
82
82
-
: (mware[action.msg] as MiddlewareRouterFn<typeof action.msg> | undefined)
83
83
-
84
84
-
if (mwarefn) {
85
85
-
try {
86
86
-
const result = await Promise.resolve(mwarefn.call(undefined, action))
87
87
-
if (result !== void 0) {
88
88
-
actions.push(...result)
89
89
-
}
90
90
-
} catch (err: unknown) {
91
91
-
console.error(`middleware error for ${action.msg}:`, err)
92
92
-
continue // TODO: ???
93
93
-
}
94
94
-
}
95
95
-
}
96
96
-
}
97
97
-
98
98
-
// broadcast if not a local action
99
99
-
if (!action.opt?.local) {
100
100
-
realm.value?.broadcast([action], false)
101
101
-
}
102
102
-
},
103
103
-
104
104
-
action: <N extends keyof ActionMap>(
105
105
-
msg: N,
106
106
-
dat: ActionMap[N]['dat'],
107
107
-
opt?: ActionOpts,
108
108
-
): ActionMap[N] => {
109
109
-
const clk = identity.clock.now()
110
110
-
return {typ: 'act', clk, msg, dat, opt} as ActionMap[N]
111
111
-
},
112
112
-
113
113
-
addMiddleware: (handler, position) => {
114
114
-
if (middleware.current.indexOf(handler) !== -1) {
115
115
-
return
116
116
-
}
117
117
-
118
118
-
switch (position) {
119
119
-
case 'push':
120
120
-
case undefined:
121
121
-
middleware.current = [...middleware.current, handler]
122
122
-
break
123
123
-
124
124
-
case 'shift':
125
125
-
middleware.current = [handler, ...middleware.current]
126
126
-
break
127
127
-
128
128
-
default: {
129
129
-
if (middleware.current.length > position) {
130
130
-
const prefix = middleware.current.slice(0, position)
131
131
-
const suffix = middleware.current.slice(position)
132
132
-
133
133
-
middleware.current = [...prefix, handler, ...suffix]
134
134
-
}
135
135
-
}
136
136
-
}
137
137
-
},
138
138
-
139
139
-
removeMiddleware: (handler) => {
140
140
-
const index = middleware.current.indexOf(handler)
141
141
-
if (index >= 0) {
142
142
-
const prefix = middleware.current.slice(0, index)
143
143
-
const suffix = middleware.current.slice(index + 1)
144
144
-
145
145
-
middleware.current = [...prefix, ...suffix]
146
146
-
}
147
147
-
},
148
148
-
})
1
1
+
import {DatabaseProvider} from '#client/root/context-database'
2
2
+
import {RealmConnectionFallbackProps, RealmConnectionProvider} from '#realm/client/context-connection'
3
3
+
import {RealmIdentityFallbackProps, RealmIdentityProvider} from '#realm/client/context-identity'
149
4
150
150
-
// watch the connection
151
151
-
// while we're connected, watch peers for action messages
152
152
-
useEffect(() => {
153
153
-
const connection = realm.value
154
154
-
if (!connection) return
5
5
+
import {ActionDispatchProvider} from './action-dispatch/context'
6
6
+
import {EffectsConnection} from './effects-connection'
7
7
+
import {EffectsFeedProcessor} from './effects-feed-processor'
155
8
156
156
-
// we're connected, handle messages from peers
157
157
-
const handler = (event: CustomEvent<{identid: IdentID; data: unknown}>) => {
158
158
-
const go = async () => {
159
159
-
const json: unknown =
160
160
-
typeof event.detail.data === 'string' ? JSON.parse(event.detail.data) : event.detail.data
161
161
-
const data: unknown[] = Array.isArray(json) ? json : [json]
162
162
-
163
163
-
for (const datum of data) {
164
164
-
const parsed = actionSchema.safeParse(datum)
165
165
-
if (parsed.success) {
166
166
-
console.log('handling forwarded event:', parsed)
167
167
-
168
168
-
identity.clock.tick(parsed.data.clk)
169
169
-
await context.current.dispatch({...parsed.data, opt: {local: true}})
170
170
-
}
171
171
-
}
172
172
-
}
173
173
-
174
174
-
go().catch((err: unknown) => {
175
175
-
console.error(err)
176
176
-
return
177
177
-
})
178
178
-
}
179
179
-
180
180
-
connection.addEventListener('peerdata', handler as EventListener)
181
181
-
connection.addEventListener('wsdata', handler as EventListener)
182
182
-
return () => {
183
183
-
connection.removeEventListener('peerdata', handler as EventListener)
184
184
-
connection.removeEventListener('wsdata', handler as EventListener)
185
185
-
}
186
186
-
}, [context, identity, realm.value])
187
187
-
188
188
-
const patchSchema = z.union([
189
189
-
z.object({
190
190
-
msg: z.literal('patch'),
191
191
-
key: z.string(),
192
192
-
changes: feedSchema.partial(),
193
193
-
}),
194
194
-
z.object({
195
195
-
msg: z.literal('error'),
196
196
-
key: z.string(),
197
197
-
error: z.string(),
198
198
-
}),
199
199
-
])
200
200
-
201
201
-
// start feed processor worker
202
202
-
useEffect(() => {
203
203
-
const worker = new FeedFetchWorker()
204
204
-
205
205
-
worker.onmessage = async (event: MessageEvent) => {
206
206
-
const parsed = patchSchema.safeParse(event.data)
207
207
-
console.log('message from fetch worker', parsed)
208
208
-
209
209
-
switch (parsed.data?.msg) {
210
210
-
case 'patch': {
211
211
-
const action = context.current.action('feed:patch', {
212
212
-
url: parsed.data.key,
213
213
-
payload: parsed.data.changes,
214
214
-
})
215
215
-
console.log('sending action:', action)
216
216
-
await context.current.dispatch(action)
217
217
-
break
218
218
-
}
219
219
-
220
220
-
case 'error':
221
221
-
default:
222
222
-
console.error('unknown message from worker', parsed)
223
223
-
}
224
224
-
}
225
225
-
226
226
-
worker.onerror = (error) => {
227
227
-
console.error('Feed processor worker error:', error)
228
228
-
}
229
229
-
230
230
-
worker.postMessage({msg: 'start', identid: identity.identid})
231
231
-
232
232
-
processor.current = worker
233
233
-
return () => {
234
234
-
worker.terminate()
235
235
-
}
236
236
-
})
237
237
-
238
238
-
console.log('rendering the skypod context')
239
239
-
return <SkypodContext.Provider value={context.current}>{props.children}</SkypodContext.Provider>
9
9
+
export interface SkypodProviderProps {
10
10
+
websocketUrl: string
11
11
+
identityFallback: (props: RealmIdentityFallbackProps) => preact.ComponentChild
12
12
+
connectionFallback: (props: RealmConnectionFallbackProps) => preact.ComponentChild
13
13
+
children: preact.ComponentChildren
240
14
}
241
15
242
242
-
export function useSkypod() {
243
243
-
const context = useContext(SkypodContext)
244
244
-
if (!context) throw new Error('expected to be called inside a database context!')
16
16
+
export const SkypodProvider: preact.FunctionComponent<SkypodProviderProps> = (props) => {
17
17
+
return (
18
18
+
<DatabaseProvider>
19
19
+
<RealmIdentityProvider fallback={props.identityFallback}>
20
20
+
<RealmConnectionProvider fallback={props.connectionFallback} url={props.websocketUrl}>
21
21
+
<ActionDispatchProvider>
22
22
+
<EffectsConnection />
23
23
+
<EffectsFeedProcessor />
245
24
246
246
-
return context
25
25
+
{props.children}
26
26
+
</ActionDispatchProvider>
27
27
+
</RealmConnectionProvider>
28
28
+
</RealmIdentityProvider>
29
29
+
</DatabaseProvider>
30
30
+
)
247
31
}
+52
src/client/skypod/effects-connection.tsx
···
1
1
+
import {useEffect} from 'preact/hooks'
2
2
+
3
3
+
import {useRealmConnection} from '#realm/client/context-connection'
4
4
+
import {useRealmIdentity} from '#realm/client/context-identity'
5
5
+
import {IdentID} from '#realm/protocol/index'
6
6
+
7
7
+
import {actionSchema} from '#skypod/actions'
8
8
+
import {useActionDispatch} from './action-dispatch/context'
9
9
+
10
10
+
export const EffectsConnection: preact.FunctionComponent = () => {
11
11
+
const {identity} = useRealmIdentity()
12
12
+
const {realm} = useRealmConnection()
13
13
+
const dispatcher = useActionDispatch()
14
14
+
15
15
+
// watch the connection
16
16
+
// while we're connected, watch peers for action messages
17
17
+
18
18
+
useEffect(() => {
19
19
+
const connection = realm.value
20
20
+
if (!connection) return
21
21
+
22
22
+
const handler = (event: CustomEvent<{identid: IdentID; data: unknown}>) => {
23
23
+
const go = async () => {
24
24
+
const json: unknown = typeof event.detail.data === 'string' ? JSON.parse(event.detail.data) : event.detail.data
25
25
+
const data: unknown[] = Array.isArray(json) ? json : [json]
26
26
+
27
27
+
for (const datum of data) {
28
28
+
const parsed = actionSchema.safeParse(datum)
29
29
+
if (parsed.success) {
30
30
+
console.log('handling forwarded event:', parsed)
31
31
+
32
32
+
identity.clock.tick(parsed.data.clk)
33
33
+
await dispatcher.dispatch({...parsed.data, opt: {local: true}})
34
34
+
}
35
35
+
}
36
36
+
}
37
37
+
38
38
+
go().catch((exc: unknown) => {
39
39
+
console.error(exc)
40
40
+
})
41
41
+
}
42
42
+
43
43
+
connection.addEventListener('peerdata', handler as EventListener)
44
44
+
connection.addEventListener('wsdata', handler as EventListener)
45
45
+
return () => {
46
46
+
connection.removeEventListener('peerdata', handler as EventListener)
47
47
+
connection.removeEventListener('wsdata', handler as EventListener)
48
48
+
}
49
49
+
}, [dispatcher, identity, realm.value])
50
50
+
51
51
+
return <></>
52
52
+
}
+70
src/client/skypod/effects-feed-processor.tsx
···
1
1
+
import {useEffect} from 'preact/hooks'
2
2
+
import {z} from 'zod/v4'
3
3
+
4
4
+
import {useDatabase} from '#client/root/context-database'
5
5
+
import {Action} from '#skypod/actions'
6
6
+
7
7
+
import {useActionDispatch} from './action-dispatch/context'
8
8
+
9
9
+
import {useRealmIdentity} from '#realm/client/context-identity.js'
10
10
+
import {ReqEvent, respSchema} from './feed-processor/api'
11
11
+
import {createFeedMiddleware} from './feed-processor/middleware'
12
12
+
import FeedFetchWorker from './feed-processor/worker?worker'
13
13
+
14
14
+
export const EffectsFeedProcessor: preact.FunctionComponent = () => {
15
15
+
const {db} = useDatabase()
16
16
+
const {identity} = useRealmIdentity()
17
17
+
const dispatcher = useActionDispatch()
18
18
+
19
19
+
useEffect(() => {
20
20
+
const worker = new FeedFetchWorker()
21
21
+
22
22
+
worker.onmessage = async (event: MessageEvent) => {
23
23
+
const parsed = respSchema.safeParse(event.data)
24
24
+
if (!parsed.success) {
25
25
+
console.warn('unknown message from feed worker?', z.treeifyError(parsed.error))
26
26
+
return
27
27
+
}
28
28
+
29
29
+
let action: Action
30
30
+
switch (parsed.data.msg) {
31
31
+
case 'feed:patch':
32
32
+
action = dispatcher.action('feed:patch', {
33
33
+
url: parsed.data.dat.feedurl,
34
34
+
payload: parsed.data.dat.changes,
35
35
+
})
36
36
+
break
37
37
+
38
38
+
default:
39
39
+
console.log('not handled yet:', parsed.data)
40
40
+
return
41
41
+
}
42
42
+
43
43
+
console.log('sending action:', action)
44
44
+
await dispatcher.dispatch(action)
45
45
+
}
46
46
+
47
47
+
worker.onerror = (error: unknown) => {
48
48
+
console.error('Feed processor worker error:', error)
49
49
+
}
50
50
+
51
51
+
// attach the middleware
52
52
+
const middleware = createFeedMiddleware(db, worker)
53
53
+
dispatcher.addMiddleware(middleware)
54
54
+
55
55
+
// kick it off only after we've attached middleware to handle the results
56
56
+
worker.postMessage({
57
57
+
typ: 'evt',
58
58
+
msg: 'init',
59
59
+
dat: {identid: identity.identid},
60
60
+
} satisfies ReqEvent)
61
61
+
62
62
+
// shut it down when we're done
63
63
+
return () => {
64
64
+
dispatcher.removeMiddleware(middleware)
65
65
+
worker.terminate()
66
66
+
}
67
67
+
}, [identity, db, dispatcher])
68
68
+
69
69
+
return <></>
70
70
+
}
+46
-46
src/client/skypod/feed-fetch.worker.ts
src/client/skypod/feed-processor/worker.ts
···
1
1
import {IndexableType} from 'dexie'
2
2
-
import {z} from 'zod/v4'
3
2
4
3
import {Database} from '#client/root/service-database'
5
4
import {normalizeProtocolError} from '#common/errors'
6
6
-
import {IdentBrand, IdentID} from '#realm/protocol/index'
5
5
+
import {IdentID} from '#realm/protocol/index'
7
6
import {LCTimestamp, LogicalClock} from '#realm/protocol/logical-clock'
8
7
9
9
-
const msgStartSchema = z.object({
10
10
-
msg: z.literal('start'),
11
11
-
identid: IdentBrand.schema,
12
12
-
})
8
8
+
import {reqSchema} from './api'
13
9
14
14
-
const msgPollSchema = z.object({
15
15
-
msg: z.literal('poll'),
16
16
-
})
10
10
+
let instance: FeedFetch | undefined
11
11
+
onmessage = (event: MessageEvent) => {
12
12
+
const parsed = reqSchema.safeParse(event.data)
13
13
+
if (!parsed.success) return // unexpected (preact sends page events)
17
14
18
18
-
const msgStopSchema = z.object({
19
19
-
msg: z.literal('stop'),
20
20
-
})
15
15
+
switch (parsed.data.msg) {
16
16
+
case 'init':
17
17
+
instance = new FeedFetch(parsed.data.dat.identid)
18
18
+
break
21
19
22
22
-
const msgSchema = z.discriminatedUnion('msg', [msgStartSchema, msgPollSchema, msgStopSchema])
20
20
+
case 'work':
21
21
+
instance?.processUrls(parsed.data.dat.urls).catch((exc: unknown) => {
22
22
+
console.error('error processing urls:', exc)
23
23
+
})
24
24
+
break
25
25
+
}
26
26
+
}
23
27
24
28
class FeedFetch {
25
29
#db: Database
26
30
#owner: IdentID
27
31
#clock: LogicalClock
28
28
-
#timeout: ReturnType<typeof setTimeout>
32
32
+
#timeout?: ReturnType<typeof setTimeout>
33
33
+
34
34
+
// worker, so we have it get it's own db and clock
29
35
30
36
constructor(identid: IdentID) {
31
37
this.#db = new Database()
32
38
this.#clock = new LogicalClock(identid)
33
39
this.#owner = identid
34
40
35
35
-
this.#timeout = setTimeout(this.#poll, 10000)
41
41
+
this.#poll()
36
42
}
37
43
38
44
stop() {
39
39
-
clearTimeout(this.#timeout)
45
45
+
if (this.#timeout) clearTimeout(this.#timeout)
40
46
}
41
47
42
42
-
poll() {
43
43
-
clearTimeout(this.#timeout)
44
44
-
this.#poll()
48
48
+
async processPending() {
49
49
+
const pendingFeeds = this.#db.feeds.where('lastRefresh.status').equals('pending')
50
50
+
return await this.#db.withLock(
51
51
+
'feeds',
52
52
+
pendingFeeds,
53
53
+
this.#clock,
54
54
+
this.#owner,
55
55
+
this.#processLocked,
56
56
+
)
57
57
+
}
58
58
+
59
59
+
async processUrls(urls: string[]) {
60
60
+
const requestedFeeds = this.#db.feeds.where('url').anyOf(urls)
61
61
+
return await this.#db.withLock(
62
62
+
'feeds',
63
63
+
requestedFeeds,
64
64
+
this.#clock,
65
65
+
this.#owner,
66
66
+
this.#processLocked,
67
67
+
)
45
68
}
46
69
47
70
#poll = () => {
48
48
-
const pendingFeeds = this.#db.feeds.where('lastRefresh.status').equals('pending')
49
49
-
this.#db
50
50
-
.withLock('feeds', pendingFeeds, this.#clock, this.#owner, this.#pollLocked)
71
71
+
if (this.#timeout) clearTimeout(this.#timeout)
72
72
+
73
73
+
this.processPending()
51
74
.catch((ex: unknown) => {
52
75
console.error('problem locking pending feeds', ex)
53
76
})
···
56
79
})
57
80
}
58
81
59
59
-
#pollLocked = async (urls: IndexableType[], lock: LCTimestamp) => {
82
82
+
#processLocked = async (urls: IndexableType[], lock: LCTimestamp) => {
60
83
console.log('checking feeds...', urls, lock)
61
84
62
85
try {
···
107
130
} catch (ex: unknown) {
108
131
console.error('problem fetching pending feeds:', ex)
109
132
}
110
110
-
}
111
111
-
}
112
112
-
113
113
-
let fetcher: FeedFetch
114
114
-
115
115
-
onmessage = (event: MessageEvent) => {
116
116
-
const parsed = msgSchema.safeParse(event.data)
117
117
-
switch (parsed.data?.msg) {
118
118
-
case 'start':
119
119
-
fetcher = new FeedFetch(parsed.data.identid)
120
120
-
break
121
121
-
122
122
-
case 'poll':
123
123
-
fetcher.poll()
124
124
-
break
125
125
-
126
126
-
case 'stop':
127
127
-
fetcher.stop()
128
128
-
break
129
129
-
130
130
-
default:
131
131
-
console.warn('unknown message, bailing', event.data, parsed.error)
132
132
-
return
133
133
}
134
134
}
135
135
+42
src/client/skypod/feed-processor/api.ts
···
1
1
+
import {IdentBrand} from '#realm/protocol'
2
2
+
import {makeEventSchema} from '#realm/protocol/schema.js'
3
3
+
import {z} from 'zod/v4'
4
4
+
5
5
+
import {feedEntrySchema, feedSchema} from '#skypod/schema'
6
6
+
7
7
+
export const initEvent = makeEventSchema(
8
8
+
'init',
9
9
+
z.object({
10
10
+
identid: IdentBrand.schema,
11
11
+
}),
12
12
+
)
13
13
+
14
14
+
export const workEvent = makeEventSchema(
15
15
+
'work',
16
16
+
z.object({
17
17
+
urls: z.array(z.url()),
18
18
+
}),
19
19
+
)
20
20
+
21
21
+
export const reqSchema = z.union([initEvent, workEvent])
22
22
+
export type ReqEvent = z.infer<typeof reqSchema>
23
23
+
24
24
+
export const patchFeedEvent = makeEventSchema(
25
25
+
'feed:patch',
26
26
+
z.object({
27
27
+
feedurl: z.string(),
28
28
+
changes: feedSchema.partial(),
29
29
+
}),
30
30
+
)
31
31
+
32
32
+
export const patchFeedEntryEvent = makeEventSchema(
33
33
+
'feedentry:patch',
34
34
+
z.object({
35
35
+
feedurl: z.string(),
36
36
+
entryguid: z.string(),
37
37
+
changes: feedEntrySchema.partial(),
38
38
+
}),
39
39
+
)
40
40
+
41
41
+
export const respSchema = z.union([patchFeedEvent, patchFeedEntryEvent])
42
42
+
export type RespEvent = z.infer<typeof respSchema>
+12
-10
src/client/skypod/middleware-feeds.ts
src/client/skypod/feed-processor/middleware.ts
···
1
1
import {Database} from '#client/root/service-database'
2
2
-
import {Middleware} from './context'
2
2
+
import {Middleware} from '../action-dispatch/context'
3
3
4
4
-
/**
5
5
-
* Feed management middleware
6
6
-
*
7
7
-
* Handles feed lifecycle actions:
8
8
-
* - feed:add - Creates feed record and triggers fetch
9
9
-
* - feed:remove - Deletes feed from database
10
10
-
* - feed:patch - Updates feed metadata
11
11
-
*/
4
4
+
import {ReqEvent} from './api'
5
5
+
12
6
export function createFeedMiddleware(db: Database, worker: Worker | null): Middleware {
13
7
return {
14
8
'feed:add': async (action) => {
···
26
20
},
27
21
})
28
22
29
29
-
worker?.postMessage({msg: 'poll'})
23
23
+
worker?.postMessage({
24
24
+
typ: 'evt',
25
25
+
msg: 'work',
26
26
+
dat: {urls: [action.dat.url]},
27
27
+
} satisfies ReqEvent)
30
28
},
31
29
32
30
'feed:remove': async (action) => {
···
35
33
36
34
'feed:patch': async (action) => {
37
35
await db.feeds.update(action.dat.url, action.dat.payload)
36
36
+
},
37
37
+
38
38
+
'feedentry:patch': (action) => {
39
39
+
console.log('feedentry patch', action)
38
40
},
39
41
}
40
42
}
+1
-1
src/server/routes-api/middleware-cors.ts
···
1
1
-
import { RequestHandler } from "express"
1
1
+
import {RequestHandler} from 'express'
2
2
3
3
export const corsProxy: RequestHandler = async (req, res) => {
4
4
const url = req.query.url
+13
-11
src/server/routes-api/middleware-reader.ts
···
1
1
import {Readability} from '@mozilla/readability'
2
2
-
import {RequestHandler} from "express"
2
2
+
import {RequestHandler} from 'express'
3
3
import {parseHTML} from 'linkedom'
4
4
5
5
export const readabilityProxy: RequestHandler = async (req, res) => {
···
36
36
}
37
37
38
38
res.setHeader('Content-Type', 'application/json; charset=utf-8')
39
39
-
res.send(JSON.stringify({
40
40
-
title: article.title,
41
41
-
byline: article.byline,
42
42
-
content: article.content,
43
43
-
textContent: article.textContent,
44
44
-
excerpt: article.excerpt,
45
45
-
siteName: article.siteName,
46
46
-
length: article.length,
47
47
-
url,
48
48
-
}))
39
39
+
res.send(
40
40
+
JSON.stringify({
41
41
+
title: article.title,
42
42
+
byline: article.byline,
43
43
+
content: article.content,
44
44
+
textContent: article.textContent,
45
45
+
excerpt: article.excerpt,
46
46
+
siteName: article.siteName,
47
47
+
length: article.length,
48
48
+
url,
49
49
+
}),
50
50
+
)
49
51
} catch (error) {
50
52
console.error('Reader mode error:', error)
51
53
res.status(500).json({
+2
-2
src/server/routes-api/middleware.ts
···
1
1
import {Router} from 'express'
2
2
-
import { corsProxy } from './middleware-cors'
3
3
-
import { readabilityProxy } from './middleware-reader'
2
2
+
import {corsProxy} from './middleware-cors'
3
3
+
import {readabilityProxy} from './middleware-reader'
4
4
5
5
export const apiRouter = Router()
6
6