+76
src/pds.js
+76
src/pds.js
···
576
576
did, commitCidStr, evt
577
577
)
578
578
579
+
// Broadcast to subscribers
580
+
const evtRows = this.sql.exec(
581
+
`SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`
582
+
).toArray()
583
+
if (evtRows.length > 0) {
584
+
this.broadcastEvent(evtRows[0])
585
+
}
586
+
579
587
return { uri, cid: recordCidStr, commit: commitCidStr }
588
+
}
589
+
590
+
formatEvent(evt) {
591
+
// AT Protocol frame format: header + body
592
+
const header = cborEncode({ op: 1, t: '#commit' })
593
+
const body = cborEncode({
594
+
seq: evt.seq,
595
+
rebase: false,
596
+
tooBig: false,
597
+
repo: evt.did,
598
+
commit: cidToBytes(evt.commit_cid),
599
+
rev: createTid(),
600
+
since: null,
601
+
blocks: new Uint8Array(0), // Simplified - real impl includes CAR slice
602
+
ops: cborDecode(new Uint8Array(evt.evt)).ops,
603
+
blobs: [],
604
+
time: new Date().toISOString()
605
+
})
606
+
607
+
// Concatenate header + body
608
+
const frame = new Uint8Array(header.length + body.length)
609
+
frame.set(header)
610
+
frame.set(body, header.length)
611
+
return frame
612
+
}
613
+
614
+
async webSocketMessage(ws, message) {
615
+
// Handle ping
616
+
if (message === 'ping') ws.send('pong')
617
+
}
618
+
619
+
async webSocketClose(ws, code, reason) {
620
+
// Durable Object will hibernate when no connections remain
621
+
}
622
+
623
+
broadcastEvent(evt) {
624
+
const frame = this.formatEvent(evt)
625
+
for (const ws of this.state.getWebSockets()) {
626
+
try {
627
+
ws.send(frame)
628
+
} catch (e) {
629
+
// Client disconnected
630
+
}
631
+
}
580
632
}
581
633
582
634
async fetch(request) {
···
700
752
return new Response(car, {
701
753
headers: { 'content-type': 'application/vnd.ipld.car' }
702
754
})
755
+
}
756
+
if (url.pathname === '/xrpc/com.atproto.sync.subscribeRepos') {
757
+
const upgradeHeader = request.headers.get('Upgrade')
758
+
if (upgradeHeader !== 'websocket') {
759
+
return new Response('expected websocket', { status: 426 })
760
+
}
761
+
762
+
const { 0: client, 1: server } = new WebSocketPair()
763
+
this.state.acceptWebSocket(server)
764
+
765
+
// Send backlog if cursor provided
766
+
const cursor = url.searchParams.get('cursor')
767
+
if (cursor) {
768
+
const events = this.sql.exec(
769
+
`SELECT * FROM seq_events WHERE seq > ? ORDER BY seq`,
770
+
parseInt(cursor)
771
+
).toArray()
772
+
773
+
for (const evt of events) {
774
+
server.send(this.formatEvent(evt))
775
+
}
776
+
}
777
+
778
+
return new Response(null, { status: 101, webSocket: client })
703
779
}
704
780
return new Response('pds running', { status: 200 })
705
781
}