tangled
alpha
login
or
join now
bad-example.com
/
spacedust-utils
6
fork
atom
demos for spacedust
6
fork
atom
overview
issues
pulls
pipelines
share a spacedust websocket
bad-example.com
9 months ago
286dbc49
97b5a657
+122
-32
1 changed file
expand all
collapse all
unified
split
server
index.js
+122
-32
server/index.js
···
1
1
#!/usr/bin/env node
2
2
+
"use strict";
2
3
3
4
const webpush = require('web-push');
4
5
const fs = require('node:fs');
5
6
const http = require('http');
6
7
8
8
+
const DUMMY_DID = 'did:plc:zzzzzzzzzzzzzzzzzzzzzzzz';
9
9
+
10
10
+
let spacedust;
11
11
+
let spacedustEverStarted = false;
12
12
+
const subs = new Map();
13
13
+
14
14
+
const addSub = (did, sub) => {
15
15
+
if (!subs.has(did)) {
16
16
+
subs.set(did, []);
17
17
+
}
18
18
+
subs.get(did).push(sub);
19
19
+
updateSubs();
20
20
+
};
21
21
+
22
22
+
const updateSubs = () => {
23
23
+
if (!spacedust) {
24
24
+
console.warn('not updating subscription, no spacedust (reconnecting?)');
25
25
+
return;
26
26
+
}
27
27
+
const wantedSubjectDids = Array.from(subs.keys());
28
28
+
if (wantedSubjectDids.length === 0) {
29
29
+
wantedSubjectDids.push(DUMMY_DID);
30
30
+
}
31
31
+
console.log('updating for wantedSubjectDids', wantedSubjectDids);
32
32
+
spacedust.send(JSON.stringify({
33
33
+
type: 'options_update',
34
34
+
payload: {
35
35
+
wantedSubjectDids,
36
36
+
},
37
37
+
}));
38
38
+
};
39
39
+
40
40
+
const handleDust = async event => {
41
41
+
console.log('got', event.data);
42
42
+
let data;
43
43
+
try {
44
44
+
data = JSON.parse(event.data);
45
45
+
} catch (err) {
46
46
+
console.error(err);
47
47
+
return;
48
48
+
}
49
49
+
const { link: { subject, source, source_record } } = data;
50
50
+
51
51
+
let did;
52
52
+
if (subject.startsWith('did:')) did = subject;
53
53
+
else if (subject.startsWith('at://')) {
54
54
+
const [id, ..._] = subject.slice('at://'.length).split('/');
55
55
+
if (id.startsWith('did:')) did = id;
56
56
+
}
57
57
+
if (!did) {
58
58
+
console.warn(`ignoring link with non-DID subject: ${subject}`)
59
59
+
return;
60
60
+
}
61
61
+
62
62
+
const expiredSubs = [];
63
63
+
for (const sub of subs.get(did) ?? []) {
64
64
+
const title = `new ${source}`;
65
65
+
const body = `from ${source_record}`;
66
66
+
try {
67
67
+
await webpush.sendNotification(sub, JSON.stringify({ title, body }));
68
68
+
} catch (err) {
69
69
+
if (400 <= err.statusCode && err.statusCode < 500) {
70
70
+
expiredSubs.push(sub);
71
71
+
console.info(`removing sub for ${err.statusCode}`);
72
72
+
}
73
73
+
}
74
74
+
}
75
75
+
if (expiredSubs.length > 0) {
76
76
+
const activeSubs = subs.get(did)?.filter(s => !expiredSubs.includes(s));
77
77
+
if (!activeSubs) { // concurrently removed already
78
78
+
return;
79
79
+
}
80
80
+
if (activeSubs.length === 0) {
81
81
+
console.info('removed last subscriber for', did);
82
82
+
subs.delete(did);
83
83
+
updateSubs();
84
84
+
} else {
85
85
+
subs.set(did, activeSubs);
86
86
+
}
87
87
+
}
88
88
+
};
89
89
+
90
90
+
const connectSpacedust = host => {
91
91
+
spacedust = new WebSocket(`${host}/subscribe?instant=true&wantedSubjectDids=${DUMMY_DID}`);
92
92
+
let restarting = false;
93
93
+
94
94
+
const restart = () => {
95
95
+
if (restarting) return;
96
96
+
restarting = true;
97
97
+
let wait = Math.round(500 + (Math.random() * 1000));
98
98
+
console.info(`restarting spacedust connection in ${wait}ms...`);
99
99
+
setTimeout(() => connectSpacedust(host), wait);
100
100
+
spacedust = null;
101
101
+
}
102
102
+
103
103
+
spacedust.onopen = updateSubs
104
104
+
spacedust.onmessage = handleDust;
105
105
+
106
106
+
spacedust.onerror = e => {
107
107
+
console.error('spacedust errored:', e);
108
108
+
restart();
109
109
+
};
110
110
+
111
111
+
spacedust.onclose = () => {
112
112
+
console.log('spacedust closed');
113
113
+
restart();
114
114
+
};
115
115
+
}
116
116
+
117
117
+
const subscribeSpacedust = (did, sub) => {
118
118
+
if (!subs.has(did)) {
119
119
+
subs.set(did, []);
120
120
+
}
121
121
+
}
122
122
+
7
123
const getOrCreateKeys = filename => {
8
124
let keys;
9
125
try {
···
50
166
const handleSubscribe = async (req, res) => {
51
167
const body = await getRequesBody(req);
52
168
const { did, sub } = JSON.parse(body);
53
53
-
doStuff(did, sub);
169
169
+
addSub(did, sub);
54
170
res.setHeader('Content-Type', 'application/json');
55
171
res.writeHead(201);
56
172
res.end('{"oh": "hi"}');
57
173
}
58
174
59
59
-
const doStuff = (did, sub) => {
60
60
-
console.log('subscribing for', did);
61
61
-
const ws = new WebSocket(`wss://spacedust.microcosm.blue/subscribe?instant=true&wantedSubjectDids=${did}`);
62
62
-
63
63
-
ws.addEventListener('message', event => {
64
64
-
console.log('got', event.data);
65
65
-
let data;
66
66
-
try {
67
67
-
data = JSON.parse(event.data);
68
68
-
} catch (err) {
69
69
-
console.error(err);
70
70
-
return;
71
71
-
}
72
72
-
const { link: { source, source_record } } = data;
73
73
-
const title = `new ${source}`;
74
74
-
const body = `from ${source_record}`;
75
75
-
webpush.sendNotification(sub, JSON.stringify({ title, body }));
76
76
-
});
77
77
-
78
78
-
ws.addEventListener('error', err => {
79
79
-
console.log('uh oh', err);
80
80
-
});
81
81
-
82
82
-
ws.addEventListener('close', () => {
83
83
-
console.log('closed. bye!');
84
84
-
});
85
85
-
86
86
-
}
87
87
-
88
175
const requestListener = pubkey => (req, res) => {
89
176
if (req.method === 'GET' && req.url === '/')
90
177
return handleIndex(req, res, { PUBKEY: pubkey });
···
108
195
keys.privateKey,
109
196
);
110
197
111
111
-
const host = env.HOST || 'localhost';
112
112
-
const port = parseInt(env.PORT || 8000, 10);
198
198
+
const spacedustHost = env.SPACEDUST_HOST ?? 'wss://spacedust.microcosm.blue';
199
199
+
connectSpacedust(spacedustHost);
200
200
+
201
201
+
const host = env.HOST ?? 'localhost';
202
202
+
const port = parseInt(env.PORT ?? 8000, 10);
113
203
114
204
http
115
205
.createServer(requestListener(keys.publicKey))