podcast manager

add peer capabilities announcements

+248 -31
+93 -15
src/client/realm/service-connection-peer.ts
··· 1 import SimplePeer from 'simple-peer' 2 3 import * as protocol from '#common/protocol' 4 import {IdentID} from '#common/protocol' 5 6 - import {BlockingQueue} from '#common/async/blocking-queue' 7 - import {sleep} from '#common/async/sleep' 8 - import z from 'zod/v4' 9 import {RealmSyncManager} from './service-connection-sync' 10 11 const realmRtcAutohandleSchema = z.union([ 12 protocol.realmRtcPingRequestSchema, 13 protocol.realmRtcPongResponseSchema, 14 ]) 15 16 /** a single webrtc peer connection within a realm */ ··· 23 #queue: BlockingQueue<unknown> 24 #abort: AbortController 25 26 constructor(sync: RealmSyncManager, identid: IdentID, initiator: boolean) { 27 super({ 28 initiator, ··· 58 } 59 60 sendJson<T extends unknown>(data: T) { 61 - console.debug('sending:', this.identid, data) 62 63 - this.send(JSON.stringify(data)) 64 } 65 66 #dispatch(type: string, detail?: object) { ··· 140 case 'realm.rtc.pong': 141 await this.#receivePong(parsed.data) 142 continue 143 } 144 } 145 } 146 147 #receivePing = async (ping: protocol.RealmRtcPingRequest) => { 148 const peerClocks = await this.#sync.buildSyncState() 149 ··· 181 console.debug('ping loop running for:', this.identid) 182 183 if (this.initiator) { 184 await this.ping() 185 } 186 ··· 197 const peerClocks = await this.#sync.buildSyncState() 198 const peerRequestsSync = this.identid === this.#sync.chooseSyncPeer(this.#sync.knownPeers) 199 200 - console.log('sending ping to: ', this.identid, peerClocks, peerRequestsSync) 201 - this.sendJson<protocol.RealmRtcPingRequest>({ 202 - typ: 'req', 203 - msg: 'realm.rtc.ping', 204 - seq: this.#seq++, 205 - dat: { 206 - peerClocks, 207 - peerRequestsSync, 208 - }, 209 - }) 210 } 211 }
··· 1 import SimplePeer from 'simple-peer' 2 + import z from 'zod/v4' 3 4 + import {BlockingQueue} from '#common/async/blocking-queue' 5 + import {sleep} from '#common/async/sleep' 6 import * as protocol from '#common/protocol' 7 import {IdentID} from '#common/protocol' 8 + import {DeviceCaps, DeviceInfo} from '#common/protocol/device' 9 10 import {RealmSyncManager} from './service-connection-sync' 11 12 const realmRtcAutohandleSchema = z.union([ 13 protocol.realmRtcPingRequestSchema, 14 protocol.realmRtcPongResponseSchema, 15 + protocol.realmRtcAnnounceRequestSchema, 16 + protocol.realmRtcAnnounceResponseSchema, 17 ]) 18 19 /** a single webrtc peer connection within a realm */ ··· 26 #queue: BlockingQueue<unknown> 27 #abort: AbortController 28 29 + #announced = false 30 + #peercaps?: DeviceCaps 31 + #peerinfo?: DeviceInfo 32 + 33 constructor(sync: RealmSyncManager, identid: IdentID, initiator: boolean) { 34 super({ 35 initiator, ··· 65 } 66 67 sendJson<T extends unknown>(data: T) { 68 + this.send(JSON.stringify(data)) 69 + } 70 71 + get deviceCaps() { 72 + return this.#peercaps 73 + } 74 + get deviceInfo() { 75 + return this.#peerinfo 76 } 77 78 #dispatch(type: string, detail?: object) { ··· 152 case 'realm.rtc.pong': 153 await this.#receivePong(parsed.data) 154 continue 155 + 156 + case 'realm.rtc.announce': 157 + switch (parsed.data.typ) { 158 + case 'req': 159 + await this.#receiveAnnounceReq(parsed.data) 160 + continue 161 + 162 + case 'res': 163 + await this.#receiveAnnounceRes(parsed.data) 164 + continue 165 + } 166 } 167 } 168 } 169 170 + #receiveAnnounceReq = async (ping: protocol.RealmRtcAnnounceRequest) => { 171 + const peerClocks = await this.#sync.buildSyncState() 172 + const {deviceInfo, deviceCaps} = this.#sync 173 + 174 + // reply to pings with pongs 175 + this.sendJson<protocol.RealmRtcAnnounceResponse>({ 176 + typ: 'res', 177 + msg: 'realm.rtc.announce', 178 + seq: ping.seq, 179 + dat: {peerClocks, deviceInfo, deviceCaps}, 180 + }) 181 + 182 + // explicit sync requested 183 + if (ping.dat.peerRequestsSync) { 184 + const actions = await this.#sync.buildSyncDelta(ping.dat.peerClocks) 185 + if (actions.length) { 186 + this.sendJson(actions.map((a) => a.action)) 187 + } 188 + } 189 + 190 + this.#peercaps = ping.dat.deviceCaps 191 + this.#peerinfo = ping.dat.deviceInfo 192 + this.#dispatch('peerannounce', {identid: this.identid}) 193 + } 194 + 195 + // when a peer responds to a ping (lazy sync) 196 + #receiveAnnounceRes = async (pong: protocol.RealmRtcAnnounceResponse) => { 197 + const actions = await this.#sync.buildSyncDelta(pong.dat.peerClocks) 198 + if (actions.length) { 199 + this.sendJson(actions.map((a) => a.action)) 200 + } 201 + 202 + this.#peercaps = pong.dat.deviceCaps 203 + this.#peerinfo = pong.dat.deviceInfo 204 + 205 + this.#dispatch('peerannounce', {identid: this.identid}) 206 + } 207 + 208 #receivePing = async (ping: protocol.RealmRtcPingRequest) => { 209 const peerClocks = await this.#sync.buildSyncState() 210 ··· 242 console.debug('ping loop running for:', this.identid) 243 244 if (this.initiator) { 245 + await sleep(250) 246 await this.ping() 247 } 248 ··· 259 const peerClocks = await this.#sync.buildSyncState() 260 const peerRequestsSync = this.identid === this.#sync.chooseSyncPeer(this.#sync.knownPeers) 261 262 + const announce = !this.#announced || this.#sync.deviceChanged 263 + if (announce) { 264 + const {deviceInfo, deviceCaps} = this.#sync 265 + this.#announced = true 266 + this.sendJson<protocol.RealmRtcAnnounceRequest>({ 267 + typ: 'req', 268 + msg: 'realm.rtc.announce', 269 + seq: this.#seq++, 270 + dat: { 271 + peerClocks, 272 + peerRequestsSync, 273 + deviceCaps, 274 + deviceInfo, 275 + }, 276 + }) 277 + } else { 278 + this.sendJson<protocol.RealmRtcPingRequest>({ 279 + typ: 'req', 280 + msg: 'realm.rtc.ping', 281 + seq: this.#seq++, 282 + dat: { 283 + peerClocks, 284 + peerRequestsSync, 285 + }, 286 + }) 287 + } 288 } 289 }
+13
src/client/realm/service-connection-sync.ts
··· 1 import {Database, StoredAction} from '#client/root/service-database' 2 import {IdentID} from '#common/protocol' 3 import {LCTimestamp, LogicalClock} from '#common/protocol/logical-clock' 4 5 export class RealmSyncManager { 6 #db: Database 7 #clock: LogicalClock 8 #identid: IdentID 9 #peers: Map<IdentID, unknown> 10 11 // objects are shared with realm connection 12 ··· 15 this.#clock = clock 16 this.#identid = identid 17 this.#peers = peers 18 } 19 20 get knownPeers() { 21 return Array.from(this.#peers.keys()) 22 } 23 24 chooseSyncPeer(peerids: IdentID[]): IdentID | null {
··· 1 import {Database, StoredAction} from '#client/root/service-database' 2 import {IdentID} from '#common/protocol' 3 import {LCTimestamp, LogicalClock} from '#common/protocol/logical-clock' 4 + import {DeviceScanner} from './service-device' 5 6 export class RealmSyncManager { 7 #db: Database 8 #clock: LogicalClock 9 #identid: IdentID 10 #peers: Map<IdentID, unknown> 11 + #device: DeviceScanner 12 13 // objects are shared with realm connection 14 ··· 17 this.#clock = clock 18 this.#identid = identid 19 this.#peers = peers 20 + this.#device = new DeviceScanner() 21 } 22 23 get knownPeers() { 24 return Array.from(this.#peers.keys()) 25 + } 26 + 27 + get deviceInfo() { 28 + return this.#device.deviceInfo 29 + } 30 + get deviceCaps() { 31 + return this.#device.deviceCaps 32 + } 33 + get deviceChanged() { 34 + return false 35 } 36 37 chooseSyncPeer(peerids: IdentID[]): IdentID | null {
+24 -14
src/client/realm/service-connection.ts
··· 44 45 #identity: RealmIdentity 46 #sync: RealmSyncManager 47 48 #serverseq = 0 49 #serversync = false ··· 92 } 93 94 async requestSync() { 95 const promises = [this.#pingSocket()] 96 for (const peer of this.#peers.values()) { 97 promises.push(peer.ping()) ··· 121 } 122 123 broadcast(data: unknown, self = false) { 124 - console.debug('broadcasting:', self, data) 125 - 126 const json = JSON.stringify(data) 127 128 this.#peers.forEach((peer, identid) => { ··· 130 }) 131 132 if (this.#serversync) { 133 - console.debug('sending to server:', self, data) 134 this.#socket.send(json) 135 } 136 } 137 138 destroy() { 139 - console.debug('realm connection destroy!') 140 - 141 if (this.connected) { 142 this.#socket.close() 143 } ··· 249 continue 250 } 251 252 - console.debug('connecting...:', peerid) 253 this.#connectPeer(peerid, true) 254 } 255 ··· 332 return 333 334 case 'realm.rtc.pong': { 335 - console.debug('got a pong response from the server', parse) 336 - 337 const actions = await this.#sync.buildSyncDelta(parse.data.dat.peerClocks) 338 if (actions.length) { 339 this.#socket.send(JSON.stringify(actions.map((a) => a.action))) ··· 368 369 async #pingSocket() { 370 const peerClocks = await this.#sync.buildSyncState() 371 - this.#socketSend<protocol.RealmRtcPingRequest>({ 372 - typ: 'req', 373 - msg: 'realm.rtc.ping', 374 - seq: this.#serverseq++, 375 - dat: {peerClocks, peerRequestsSync: true}, 376 - }) 377 } 378 379 // peers
··· 44 45 #identity: RealmIdentity 46 #sync: RealmSyncManager 47 + #announced = false 48 49 #serverseq = 0 50 #serversync = false ··· 93 } 94 95 async requestSync() { 96 + this.#announced = false 97 + 98 const promises = [this.#pingSocket()] 99 for (const peer of this.#peers.values()) { 100 promises.push(peer.ping()) ··· 124 } 125 126 broadcast(data: unknown, self = false) { 127 const json = JSON.stringify(data) 128 129 this.#peers.forEach((peer, identid) => { ··· 131 }) 132 133 if (this.#serversync) { 134 this.#socket.send(json) 135 } 136 } 137 138 destroy() { 139 + console.log('realm connection destroy!') 140 if (this.connected) { 141 this.#socket.close() 142 } ··· 248 continue 249 } 250 251 + console.log('connecting to peer...:', peerid) 252 this.#connectPeer(peerid, true) 253 } 254 ··· 331 return 332 333 case 'realm.rtc.pong': { 334 const actions = await this.#sync.buildSyncDelta(parse.data.dat.peerClocks) 335 if (actions.length) { 336 this.#socket.send(JSON.stringify(actions.map((a) => a.action))) ··· 365 366 async #pingSocket() { 367 const peerClocks = await this.#sync.buildSyncState() 368 + 369 + const announce = !this.#announced || this.#sync.deviceChanged 370 + if (announce) { 371 + const {deviceInfo, deviceCaps} = this.#sync 372 + this.#announced = true 373 + this.#socketSend<protocol.RealmRtcAnnounceRequest>({ 374 + typ: 'req', 375 + msg: 'realm.rtc.announce', 376 + seq: this.#serverseq++, 377 + dat: {peerClocks, peerRequestsSync: true, deviceCaps, deviceInfo}, 378 + }) 379 + } else { 380 + this.#socketSend<protocol.RealmRtcPingRequest>({ 381 + typ: 'req', 382 + msg: 'realm.rtc.ping', 383 + seq: this.#serverseq++, 384 + dat: {peerClocks, peerRequestsSync: true}, 385 + }) 386 + } 387 } 388 389 // peers
+27
src/client/realm/service-device.ts
···
··· 1 + import {DeviceCaps, DeviceInfo} from '#common/protocol/device' 2 + 3 + export class DeviceScanner { 4 + #caps: DeviceCaps 5 + #info: DeviceInfo 6 + 7 + // some day do detection, if we ever actually care 8 + 9 + constructor() { 10 + this.#info = { 11 + ua: window.navigator.userAgent, 12 + } 13 + 14 + this.#caps = { 15 + corsFetch: false, 16 + networkQuality: undefined, 17 + } 18 + } 19 + 20 + get deviceCaps() { 21 + return this.#caps 22 + } 23 + 24 + get deviceInfo() { 25 + return this.#info 26 + } 27 + }
+16
src/common/protocol/device.ts
···
··· 1 + import {z} from 'zod/v4' 2 + 3 + export const deviceCapsSchema = z.object({ 4 + corsFetch: z.boolean().default(false), 5 + networkQuality: z.int().positive().lte(5).optional(), 6 + }) 7 + 8 + export const deviceInfoSchema = z.object({ 9 + ua: z.string().optional(), 10 + name: z.string().optional(), 11 + battery: z.boolean().optional(), 12 + metered: z.boolean().optional(), 13 + }) 14 + 15 + export type DeviceCaps = z.infer<typeof deviceCapsSchema> 16 + export type DeviceInfo = z.infer<typeof deviceInfoSchema>
+29 -2
src/common/protocol/messages.ts
··· 2 import {z} from 'zod/v4' 3 4 import {IdentBrand} from './brands' 5 import {LogicalClock} from './logical-clock' 6 import { 7 makeEmptyRequestSchema, ··· 12 13 export const serverPeerIdSchema = z.literal('server') 14 export type ServerPeerId = z.infer<typeof serverPeerIdSchema> 15 16 /// preauth 17 ··· 62 export const realmRtcPingRequestSchema = makeRequestSchema( 63 'realm.rtc.ping', 64 z.object({ 65 - peerClocks: z.record(z.string(), LogicalClock.schema.nullable()), 66 peerRequestsSync: z.boolean(), 67 }), 68 ) 69 70 export const realmRtcPongResponseSchema = makeResponseSchema( 71 'realm.rtc.pong', 72 z.object({ 73 - peerClocks: z.record(z.string(), LogicalClock.schema.nullable()), 74 }), 75 ) 76 77 export const realmRtcPingPongMessageSchema = z.union([ 78 realmRtcPingRequestSchema, 79 realmRtcPongResponseSchema, 80 ]) ··· 114 ) 115 116 export type RealmBroadcastEvent = z.infer<typeof realmBroadcastEventSchema> 117 export type RealmRtcPingRequest = z.infer<typeof realmRtcPingRequestSchema> 118 export type RealmRtcPongResponse = z.infer<typeof realmRtcPongResponseSchema> 119 export type RealmRtcSignalEvent = z.infer<typeof realmRtcSignalEventSchema>
··· 2 import {z} from 'zod/v4' 3 4 import {IdentBrand} from './brands' 5 + import {deviceCapsSchema, deviceInfoSchema} from './device' 6 import {LogicalClock} from './logical-clock' 7 import { 8 makeEmptyRequestSchema, ··· 13 14 export const serverPeerIdSchema = z.literal('server') 15 export type ServerPeerId = z.infer<typeof serverPeerIdSchema> 16 + 17 + export const peerClocksSchema = z.record(z.string(), LogicalClock.schema.nullable()) 18 19 /// preauth 20 ··· 65 export const realmRtcPingRequestSchema = makeRequestSchema( 66 'realm.rtc.ping', 67 z.object({ 68 + peerClocks: peerClocksSchema, 69 peerRequestsSync: z.boolean(), 70 }), 71 ) 72 73 export const realmRtcPongResponseSchema = makeResponseSchema( 74 'realm.rtc.pong', 75 + z.object({peerClocks: peerClocksSchema}), 76 + ) 77 + 78 + export const realmRtcAnnounceRequestSchema = makeRequestSchema( 79 + 'realm.rtc.announce', 80 z.object({ 81 + // like a ping 82 + peerClocks: peerClocksSchema, 83 + peerRequestsSync: z.boolean(), 84 + 85 + // but with caps and device info 86 + deviceCaps: deviceCapsSchema.optional(), 87 + deviceInfo: deviceInfoSchema.optional(), 88 + }), 89 + ) 90 + 91 + export const realmRtcAnnounceResponseSchema = makeResponseSchema( 92 + 'realm.rtc.announce', 93 + z.object({ 94 + peerClocks: peerClocksSchema, 95 + deviceCaps: deviceCapsSchema.optional(), 96 + deviceInfo: deviceInfoSchema.optional(), 97 }), 98 ) 99 100 export const realmRtcPingPongMessageSchema = z.union([ 101 + realmRtcAnnounceRequestSchema, 102 + realmRtcAnnounceResponseSchema, 103 realmRtcPingRequestSchema, 104 realmRtcPongResponseSchema, 105 ]) ··· 139 ) 140 141 export type RealmBroadcastEvent = z.infer<typeof realmBroadcastEventSchema> 142 + export type RealmRtcAnnounceRequest = z.infer<typeof realmRtcAnnounceRequestSchema> 143 + export type RealmRtcAnnounceResponse = z.infer<typeof realmRtcAnnounceResponseSchema> 144 export type RealmRtcPingRequest = z.infer<typeof realmRtcPingRequestSchema> 145 export type RealmRtcPongResponse = z.infer<typeof realmRtcPongResponseSchema> 146 export type RealmRtcSignalEvent = z.infer<typeof realmRtcSignalEventSchema>
+43
src/server/routes-socket/handler-realm.ts
··· 14 protocol.realmBroadcastEventSchema, 15 protocol.realmRtcSignalEventSchema, 16 protocol.realmRtcPingRequestSchema, 17 z.array(actionMessageSchema), 18 ]) 19 ··· 49 50 case 'realm.rtc.ping': 51 await socketPeerPing(ws, auth, data) 52 continue 53 54 default: ··· 146 } 147 } 148 }
··· 14 protocol.realmBroadcastEventSchema, 15 protocol.realmRtcSignalEventSchema, 16 protocol.realmRtcPingRequestSchema, 17 + protocol.realmRtcAnnounceRequestSchema, 18 z.array(actionMessageSchema), 19 ]) 20 ··· 50 51 case 'realm.rtc.ping': 52 await socketPeerPing(ws, auth, data) 53 + continue 54 + 55 + case 'realm.rtc.announce': 56 + await socketPeerAnnounce(ws, auth, data) 57 continue 58 59 default: ··· 151 } 152 } 153 } 154 + 155 + async function socketPeerAnnounce( 156 + ws: WebSocket, 157 + auth: realm.AuthenticatedIdentity, 158 + announce: protocol.RealmRtcAnnounceRequest, 159 + ) { 160 + console.log('announce from', auth.identid, announce.dat) 161 + 162 + auth.deviceCaps = announce.dat.deviceCaps 163 + auth.deviceInfo = announce.dat.deviceInfo 164 + 165 + const peerClocks = await auth.realm.storage.buildSyncState() 166 + const response: protocol.RealmRtcAnnounceResponse = { 167 + typ: 'res', 168 + msg: 'realm.rtc.announce', 169 + seq: announce.seq, 170 + dat: { 171 + peerClocks, 172 + deviceCaps: { 173 + corsFetch: true, 174 + networkQuality: 5, 175 + }, 176 + deviceInfo: { 177 + ua: process.env.SERVER_UA || 'skypod-realm', 178 + name: process.env.SERVER_NAME || 'Skypod Server', 179 + }, 180 + }, 181 + } 182 + ws.send(JSON.stringify(response)) 183 + 184 + if (announce.dat.peerRequestsSync) { 185 + const actions = await auth.realm.storage.buildSyncDelta(announce.dat.peerClocks) 186 + if (actions.length) { 187 + const actionsJson = actions.map((a) => a.action) 188 + ws.send(JSON.stringify(actionsJson)) 189 + } 190 + } 191 + }
+3
src/server/routes-socket/state.ts
··· 1 import WebSocket from 'isomorphic-ws' 2 3 import {IdentID, RealmID} from '#common/protocol' 4 import {StrictMap} from '#common/strict-map' 5 6 import {RealmStorage} from '#server/realm-storage' ··· 11 realmid: RealmID 12 identid: IdentID 13 pubkey: CryptoKey 14 } 15 16 export interface Realm {
··· 1 import WebSocket from 'isomorphic-ws' 2 3 import {IdentID, RealmID} from '#common/protocol' 4 + import {DeviceCaps, DeviceInfo} from '#common/protocol/device' 5 import {StrictMap} from '#common/strict-map' 6 7 import {RealmStorage} from '#server/realm-storage' ··· 12 realmid: RealmID 13 identid: IdentID 14 pubkey: CryptoKey 15 + deviceCaps?: DeviceCaps 16 + deviceInfo?: DeviceInfo 17 } 18 19 export interface Realm {