tangled
alpha
login
or
join now
graham.systems
/
cistern
3
fork
atom
Encrypted, ephemeral, private memos on atproto
3
fork
atom
overview
issues
pulls
pipelines
refactor(consumer): extract class from mod.ts
graham.systems
4 months ago
6e427190
ee990e7a
verified
This commit was signed with the committer's
known signature
.
graham.systems
SSH Key Fingerprint:
SHA256:Fvaam8TgCBeBlr/Fo7eA6VGAIAWmzjwUqUTw5o6anWA=
+221
-219
2 changed files
expand all
collapse all
unified
split
packages
consumer
client.ts
mod.ts
+219
packages/consumer/client.ts
···
1
1
+
import {
2
2
+
produceRequirements,
3
3
+
type XRPCProcedures,
4
4
+
type XRPCQueries,
5
5
+
} from "@cistern/shared";
6
6
+
import { decryptText, generateKeys } from "@cistern/crypto";
7
7
+
import { generateRandomName } from "@puregarlic/randimal";
8
8
+
import { is, parse, type RecordKey } from "@atcute/lexicons";
9
9
+
import { JetstreamSubscription } from "@atcute/jetstream";
10
10
+
import type { Did } from "@atcute/lexicons/syntax";
11
11
+
import type { Client } from "@atcute/client";
12
12
+
import { AppCisternMemo, type AppCisternPubkey } from "@cistern/lexicon";
13
13
+
import type {
14
14
+
ConsumerOptions,
15
15
+
ConsumerParams,
16
16
+
DecryptedMemo,
17
17
+
LocalKeyPair,
18
18
+
} from "./types.ts";
19
19
+
20
20
+
/**
21
21
+
* Client for generating keys and decoding Cistern memos.
22
22
+
*/
23
23
+
export class Consumer {
24
24
+
/** DID of the user this consumer acts on behalf of */
25
25
+
did: Did;
26
26
+
27
27
+
/** `@atcute/client` instance with credential manager */
28
28
+
rpc: Client<XRPCQueries, XRPCProcedures>;
29
29
+
30
30
+
/** Private key used for decrypting and the AT URI of its associated public key */
31
31
+
keypair?: LocalKeyPair;
32
32
+
33
33
+
constructor(params: ConsumerParams) {
34
34
+
this.did = params.miniDoc.did;
35
35
+
this.keypair = params.options.keypair
36
36
+
? {
37
37
+
privateKey: Uint8Array.fromBase64(params.options.keypair.privateKey),
38
38
+
publicKey: params.options.keypair.publicKey,
39
39
+
}
40
40
+
: undefined;
41
41
+
this.rpc = params.rpc;
42
42
+
}
43
43
+
44
44
+
/**
45
45
+
* Generates a key pair, uploading the public key to PDS and returning the pair.
46
46
+
*/
47
47
+
async generateKeyPair(): Promise<LocalKeyPair> {
48
48
+
if (this.keypair) {
49
49
+
throw new Error("client already has a key pair");
50
50
+
}
51
51
+
52
52
+
const keys = generateKeys();
53
53
+
const name = await generateRandomName();
54
54
+
55
55
+
const record: AppCisternPubkey.Main = {
56
56
+
$type: "app.cistern.pubkey",
57
57
+
name,
58
58
+
algorithm: "x_wing",
59
59
+
content: { $bytes: keys.publicKey.toBase64() },
60
60
+
createdAt: new Date().toISOString(),
61
61
+
};
62
62
+
const res = await this.rpc.post("com.atproto.repo.createRecord", {
63
63
+
input: {
64
64
+
collection: "app.cistern.pubkey",
65
65
+
repo: this.did,
66
66
+
record,
67
67
+
},
68
68
+
});
69
69
+
70
70
+
if (!res.ok) {
71
71
+
throw new Error(
72
72
+
`failed to save public key: ${res.status} ${res.data.error}`,
73
73
+
);
74
74
+
}
75
75
+
76
76
+
const keypair = {
77
77
+
privateKey: keys.secretKey,
78
78
+
publicKey: res.data.uri,
79
79
+
};
80
80
+
81
81
+
this.keypair = keypair;
82
82
+
83
83
+
return keypair;
84
84
+
}
85
85
+
86
86
+
/**
87
87
+
* Asynchronously iterate through memos in the user's PDS
88
88
+
*/
89
89
+
async *listMemos(): AsyncGenerator<
90
90
+
DecryptedMemo,
91
91
+
void,
92
92
+
undefined
93
93
+
> {
94
94
+
if (!this.keypair) {
95
95
+
throw new Error("no key pair set; generate a key before listing memos");
96
96
+
}
97
97
+
98
98
+
let cursor: string | undefined;
99
99
+
100
100
+
while (true) {
101
101
+
const res = await this.rpc.get("com.atproto.repo.listRecords", {
102
102
+
params: {
103
103
+
collection: "app.cistern.memo",
104
104
+
repo: this.did,
105
105
+
cursor,
106
106
+
},
107
107
+
});
108
108
+
109
109
+
if (!res.ok) {
110
110
+
throw new Error(
111
111
+
`failed to list memos: ${res.status} ${res.data.error}`,
112
112
+
);
113
113
+
}
114
114
+
115
115
+
cursor = res.data.cursor;
116
116
+
117
117
+
for (const record of res.data.records) {
118
118
+
const memo = parse(AppCisternMemo.mainSchema, record.value);
119
119
+
120
120
+
if (memo.pubkey !== this.keypair.publicKey) continue;
121
121
+
122
122
+
const decrypted = decryptText(this.keypair.privateKey, {
123
123
+
nonce: memo.nonce.$bytes,
124
124
+
cipherText: memo.ciphertext.$bytes,
125
125
+
content: memo.payload.$bytes,
126
126
+
hash: memo.contentHash.$bytes,
127
127
+
length: memo.contentLength,
128
128
+
});
129
129
+
130
130
+
yield {
131
131
+
tid: memo.tid,
132
132
+
text: decrypted,
133
133
+
};
134
134
+
}
135
135
+
136
136
+
if (!cursor) return;
137
137
+
}
138
138
+
}
139
139
+
140
140
+
/**
141
141
+
* Subscribes to the Jetstreams for the user's memos. Pass `"stop"` into `subscription.next(...)` to cancel
142
142
+
* @todo Allow specifying Jetstream endpoint
143
143
+
*/
144
144
+
async *subscribeToMemos(): AsyncGenerator<
145
145
+
DecryptedMemo,
146
146
+
void,
147
147
+
"stop" | undefined
148
148
+
> {
149
149
+
if (!this.keypair) {
150
150
+
throw new Error("no key pair set; generate a key before subscribing");
151
151
+
}
152
152
+
153
153
+
const subscription = new JetstreamSubscription({
154
154
+
url: "wss://jetstream2.us-east.bsky.network",
155
155
+
wantedCollections: ["app.cistern.memo"],
156
156
+
wantedDids: [this.did],
157
157
+
});
158
158
+
159
159
+
for await (const event of subscription) {
160
160
+
if (event.kind === "commit" && event.commit.operation === "create") {
161
161
+
const record = event.commit.record;
162
162
+
163
163
+
if (!is(AppCisternMemo.mainSchema, record)) {
164
164
+
continue;
165
165
+
}
166
166
+
167
167
+
if (record.pubkey !== this.keypair.publicKey) {
168
168
+
continue;
169
169
+
}
170
170
+
171
171
+
const decrypted = decryptText(this.keypair.privateKey, {
172
172
+
nonce: record.nonce.$bytes,
173
173
+
cipherText: record.ciphertext.$bytes,
174
174
+
content: record.payload.$bytes,
175
175
+
hash: record.contentHash.$bytes,
176
176
+
length: record.contentLength,
177
177
+
});
178
178
+
179
179
+
const command = yield { tid: record.tid, text: decrypted };
180
180
+
181
181
+
if (command === "stop") return;
182
182
+
}
183
183
+
}
184
184
+
}
185
185
+
186
186
+
/**
187
187
+
* Deletes a memo from the user's PDS by record key.
188
188
+
*/
189
189
+
async deleteMemo(key: RecordKey) {
190
190
+
const res = await this.rpc.post("com.atproto.repo.deleteRecord", {
191
191
+
input: {
192
192
+
collection: "app.cistern.memo",
193
193
+
repo: this.did,
194
194
+
rkey: key,
195
195
+
},
196
196
+
});
197
197
+
198
198
+
if (!res.ok) {
199
199
+
throw new Error(
200
200
+
`failed to delete memo ${key}: ${res.status} ${res.data.error}`,
201
201
+
);
202
202
+
}
203
203
+
}
204
204
+
}
205
205
+
206
206
+
/**
207
207
+
* Creates a `Consumer` instance with all necessary requirements. This is the recommended way to construct a `Consumer`.
208
208
+
*
209
209
+
* @description Resolves the user's DID using Slingshot, instantiates an `@atcute/client` instance, creates an initial session, and then returns a new Consumer.
210
210
+
* @param {ConsumerOptions} options - Information for constructing the underlying XRPC client
211
211
+
* @returns {Promise<Consumer>} A Cistern consumer client with an authorized session
212
212
+
*/
213
213
+
export async function createConsumer(
214
214
+
options: ConsumerOptions,
215
215
+
): Promise<Consumer> {
216
216
+
const reqs = await produceRequirements(options);
217
217
+
218
218
+
return new Consumer(reqs);
219
219
+
}
+2
-219
packages/consumer/mod.ts
···
1
1
-
import {
2
2
-
produceRequirements,
3
3
-
type XRPCProcedures,
4
4
-
type XRPCQueries,
5
5
-
} from "@cistern/shared";
6
6
-
import { decryptText, generateKeys } from "@cistern/crypto";
7
7
-
import { generateRandomName } from "@puregarlic/randimal";
8
8
-
import { is, parse, type RecordKey } from "@atcute/lexicons";
9
9
-
import { JetstreamSubscription } from "@atcute/jetstream";
10
10
-
import type { Did } from "@atcute/lexicons/syntax";
11
11
-
import type { Client, CredentialManager } from "@atcute/client";
12
12
-
import { AppCisternMemo, type AppCisternPubkey } from "@cistern/lexicon";
13
13
-
import type {
14
14
-
ConsumerOptions,
15
15
-
ConsumerParams,
16
16
-
DecryptedMemo,
17
17
-
LocalKeyPair,
18
18
-
} from "./types.ts";
19
19
-
20
20
-
/**
21
21
-
* Creates a `Consumer` instance with all necessary requirements. This is the recommended way to construct a `Consumer`.
22
22
-
*
23
23
-
* @description Resolves the user's DID using Slingshot, instantiates an `@atcute/client` instance, creates an initial session, and then returns a new Consumer.
24
24
-
* @param {ConsumerOptions} options - Information for constructing the underlying XRPC client
25
25
-
* @returns {Promise<Consumer>} A Cistern consumer client with an authorized session
26
26
-
*/
27
27
-
export async function createConsumer(
28
28
-
options: ConsumerOptions,
29
29
-
): Promise<Consumer> {
30
30
-
const reqs = await produceRequirements(options);
31
31
-
32
32
-
return new Consumer(reqs);
33
33
-
}
34
34
-
35
35
-
/**
36
36
-
* Client for generating keys and decoding Cistern memos.
37
37
-
*/
38
38
-
export class Consumer {
39
39
-
/** DID of the user this consumer acts on behalf of */
40
40
-
did: Did;
41
41
-
42
42
-
/** `@atcute/client` instance with credential manager */
43
43
-
rpc: Client<XRPCQueries, XRPCProcedures>;
44
44
-
45
45
-
/** Private key used for decrypting and the AT URI of its associated public key */
46
46
-
keypair?: LocalKeyPair;
47
47
-
48
48
-
constructor(params: ConsumerParams) {
49
49
-
this.did = params.miniDoc.did;
50
50
-
this.keypair = params.options.keypair
51
51
-
? {
52
52
-
privateKey: Uint8Array.fromBase64(params.options.keypair.privateKey),
53
53
-
publicKey: params.options.keypair.publicKey,
54
54
-
}
55
55
-
: undefined;
56
56
-
this.rpc = params.rpc;
57
57
-
}
58
58
-
59
59
-
/**
60
60
-
* Generates a key pair, uploading the public key to PDS and returning the pair.
61
61
-
*/
62
62
-
async generateKeyPair(): Promise<LocalKeyPair> {
63
63
-
if (this.keypair) {
64
64
-
throw new Error("client already has a key pair");
65
65
-
}
66
66
-
67
67
-
const keys = generateKeys();
68
68
-
const name = await generateRandomName();
69
69
-
70
70
-
const record: AppCisternPubkey.Main = {
71
71
-
$type: "app.cistern.pubkey",
72
72
-
name,
73
73
-
algorithm: "x_wing",
74
74
-
content: { $bytes: keys.publicKey.toBase64() },
75
75
-
createdAt: new Date().toISOString(),
76
76
-
};
77
77
-
const res = await this.rpc.post("com.atproto.repo.createRecord", {
78
78
-
input: {
79
79
-
collection: "app.cistern.pubkey",
80
80
-
repo: this.did,
81
81
-
record,
82
82
-
},
83
83
-
});
84
84
-
85
85
-
if (!res.ok) {
86
86
-
throw new Error(
87
87
-
`failed to save public key: ${res.status} ${res.data.error}`,
88
88
-
);
89
89
-
}
90
90
-
91
91
-
const keypair = {
92
92
-
privateKey: keys.secretKey,
93
93
-
publicKey: res.data.uri,
94
94
-
};
95
95
-
96
96
-
this.keypair = keypair;
97
97
-
98
98
-
return keypair;
99
99
-
}
100
100
-
101
101
-
/**
102
102
-
* Asynchronously iterate through memos in the user's PDS
103
103
-
*/
104
104
-
async *listMemos(): AsyncGenerator<
105
105
-
DecryptedMemo,
106
106
-
void,
107
107
-
undefined
108
108
-
> {
109
109
-
if (!this.keypair) {
110
110
-
throw new Error("no key pair set; generate a key before listing memos");
111
111
-
}
112
112
-
113
113
-
let cursor: string | undefined;
114
114
-
115
115
-
while (true) {
116
116
-
const res = await this.rpc.get("com.atproto.repo.listRecords", {
117
117
-
params: {
118
118
-
collection: "app.cistern.memo",
119
119
-
repo: this.did,
120
120
-
cursor,
121
121
-
},
122
122
-
});
123
123
-
124
124
-
if (!res.ok) {
125
125
-
throw new Error(
126
126
-
`failed to list memos: ${res.status} ${res.data.error}`,
127
127
-
);
128
128
-
}
129
129
-
130
130
-
cursor = res.data.cursor;
131
131
-
132
132
-
for (const record of res.data.records) {
133
133
-
const memo = parse(AppCisternMemo.mainSchema, record.value);
134
134
-
135
135
-
if (memo.pubkey !== this.keypair.publicKey) continue;
136
136
-
137
137
-
const decrypted = decryptText(this.keypair.privateKey, {
138
138
-
nonce: memo.nonce.$bytes,
139
139
-
cipherText: memo.ciphertext.$bytes,
140
140
-
content: memo.payload.$bytes,
141
141
-
hash: memo.contentHash.$bytes,
142
142
-
length: memo.contentLength,
143
143
-
});
144
144
-
145
145
-
yield {
146
146
-
tid: memo.tid,
147
147
-
text: decrypted,
148
148
-
};
149
149
-
}
150
150
-
151
151
-
if (!cursor) return;
152
152
-
}
153
153
-
}
154
154
-
155
155
-
/**
156
156
-
* Subscribes to the Jetstreams for the user's memos. Pass `"stop"` into `subscription.next(...)` to cancel
157
157
-
* @todo Allow specifying Jetstream endpoint
158
158
-
*/
159
159
-
async *subscribeToMemos(): AsyncGenerator<
160
160
-
DecryptedMemo,
161
161
-
void,
162
162
-
"stop" | undefined
163
163
-
> {
164
164
-
if (!this.keypair) {
165
165
-
throw new Error("no key pair set; generate a key before subscribing");
166
166
-
}
167
167
-
168
168
-
const subscription = new JetstreamSubscription({
169
169
-
url: "wss://jetstream2.us-east.bsky.network",
170
170
-
wantedCollections: ["app.cistern.memo"],
171
171
-
wantedDids: [this.did],
172
172
-
});
173
173
-
174
174
-
for await (const event of subscription) {
175
175
-
if (event.kind === "commit" && event.commit.operation === "create") {
176
176
-
const record = event.commit.record;
177
177
-
178
178
-
if (!is(AppCisternMemo.mainSchema, record)) {
179
179
-
continue;
180
180
-
}
181
181
-
182
182
-
if (record.pubkey !== this.keypair.publicKey) {
183
183
-
continue;
184
184
-
}
185
185
-
186
186
-
const decrypted = decryptText(this.keypair.privateKey, {
187
187
-
nonce: record.nonce.$bytes,
188
188
-
cipherText: record.ciphertext.$bytes,
189
189
-
content: record.payload.$bytes,
190
190
-
hash: record.contentHash.$bytes,
191
191
-
length: record.contentLength,
192
192
-
});
193
193
-
194
194
-
const command = yield { tid: record.tid, text: decrypted };
195
195
-
196
196
-
if (command === "stop") return;
197
197
-
}
198
198
-
}
199
199
-
}
200
200
-
201
201
-
/**
202
202
-
* Deletes a memo from the user's PDS by record key.
203
203
-
*/
204
204
-
async deleteMemo(key: RecordKey) {
205
205
-
const res = await this.rpc.post("com.atproto.repo.deleteRecord", {
206
206
-
input: {
207
207
-
collection: "app.cistern.memo",
208
208
-
repo: this.did,
209
209
-
rkey: key,
210
210
-
},
211
211
-
});
212
212
-
213
213
-
if (!res.ok) {
214
214
-
throw new Error(
215
215
-
`failed to delete memo ${key}: ${res.status} ${res.data.error}`,
216
216
-
);
217
217
-
}
218
218
-
}
219
219
-
}
1
1
+
export * from "./client.ts";
2
2
+
export * from "./types.ts";