···1+/** @module common/async */
2+3+import { Semaphore } from './semaphore.js'
4+5+/**
6+ * simple blocking atom, for waiting for a value.
7+ * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
8+ *
9+ * @template T - the type we're holding
10+ */
11+export class BlockingAtom {
12+13+ /** @type {T | undefined} */
14+ #item
15+16+ /** @type {Semaphore} */
17+ #sema
18+19+ constructor() {
20+ this.#sema = new Semaphore()
21+ this.#item = undefined
22+ }
23+24+ /**
25+ * puts an item into the atom and unblocks an awaiter.
26+ *
27+ * @param {T} item the item to put into the atom
28+ */
29+ set(item) {
30+ this.#item = item
31+ this.#sema.free()
32+ }
33+34+ /**
35+ * tries to get the item from the atom, and blocks until available.
36+ *
37+ * @example
38+ * if (await atom.take())
39+ * console.log('got it!')
40+ *
41+ * @param {AbortSignal | undefined} signal - an abort signal to cancel the await
42+ * @returns {Promise<T | undefined>} a promise for the item, or undefined if something aborted.
43+ */
44+ async get(signal) {
45+ if (await this.#sema.take(signal)) {
46+ return this.#item
47+ }
48+49+ signal?.throwIfAborted()
50+ return undefined
51+ }
52+53+}
···1+/** @module common/async */
2+3+/**
4+ * A Breaker, which allows creating wrapped functions which will only be executed before
5+ * the breaker is tripped.
6+ *
7+ * @example
8+ * const breaker = makeBreaker()
9+ *
10+ * state.addEventHandler('finish', breaker.tripThen((e) => {
11+ * // this will only be allowed to run once
12+ * // the second time the event fired, the handler is a no-op
13+ * })
14+ *
15+ * state.addEventHandler('error', breaker.tripThen((e) => {
16+ * // all wrapped functions created by the same breaker share state
17+ * // so if the above fired, this can never be called
18+ * })
19+ *
20+ * state.addEventHandler('message', breaker.untilTripped((e) => {
21+ * // this will only be allowed to run many times
22+ * // but not *after* any of the _once_ wrappers has been called
23+ * })
24+ */
25+export class Breaker {
26+27+ /** @type {undefined | VoidCallback} */
28+ #onTripped
29+30+ /** @type {boolean} */
31+ #tripped
32+33+ /**
34+ * @param {VoidCallback} [onTripped]
35+ * an optional callback, called when the breaker is tripped, /before/ any wrapped functions.
36+ */
37+ constructor(onTripped) {
38+ this.#tripped = false
39+ this.#onTripped = onTripped
40+ }
41+42+ /** @returns {boolean} true if the breaker has already tripped */
43+ tripped() {
44+ return this.#tripped
45+ }
46+47+ /**
48+ * wrap the given callback in a function that will trip the breaker before it's called.
49+ * any subsequent calls to the wrapped function will be no-ops.
50+ *
51+ * @param {Callback} fn the function to be wrapped in the breaker
52+ * @returns {Callback} a wrapped function, controlled by the breaker
53+ */
54+ tripThen(fn) {
55+ return (...args) => {
56+ if (!this.#tripped) {
57+ this.#tripped = true
58+59+ // TODO: if these throw, what to do?
60+ this.#onTripped?.()
61+ fn(...args)
62+ }
63+ }
64+ }
65+66+ /**
67+ * wrap the given callback in a function that check the breaker before it's called.
68+ * once the breaker has been tripped, calls to the wrapped function will be no-ops.
69+ *
70+ * @param {Callback} fn the function to be wrapped in the breaker
71+ * @returns {Callback} a wrapped function, controlled by the breaker
72+ */
73+ untilTripped(fn) {
74+ return (...args) => {
75+ if (!this.#tripped) {
76+ // TODO: if these throw, what to do?
77+ fn(...args)
78+ }
79+ }
80+ }
81+82+}
···1+/** @module common/async */
2+3+/**
4+ * Simple counting semaphore, for blocking async ops.
5+ * cribbed mostly from {@link https://github.com/ComFreek/async-playground}
6+ */
7+export class Semaphore {
8+9+ /** @type { number } */
10+ #counter = 0
11+12+ /** @type {Array<function(boolean): void>} */
13+ #resolvers = []
14+15+ constructor(count = 0) {
16+ this.#counter = count
17+ }
18+19+ /**
20+ * try to take from the semaphore, reducing it's count
21+ * if the semaphore is empty, blocks until available, or the given signal aborts.
22+ *
23+ * @param {AbortSignal | undefined} signal a signal to use to abort the block
24+ * @returns {Promise<boolean>} true if the semaphore was successfully taken, false if aborted.
25+ */
26+ take(signal) {
27+ return new Promise((resolve) => {
28+ if (signal?.aborted) return resolve(false)
29+30+ // if there's resources available, use them
31+32+ this.#counter--
33+ if (this.#counter >= 0) return resolve(true)
34+35+ // otherwise add to pending
36+ // and explicitly remove the resolver from the list on abort
37+38+ this.#resolvers.push(resolve)
39+ signal?.addEventListener('abort', () => {
40+ const index = this.#resolvers.indexOf(resolve)
41+ if (index >= 0) {
42+ this.#resolvers.splice(index, 1)
43+ this.#counter++
44+ }
45+46+ resolve(false)
47+ })
48+ })
49+ }
50+51+ /**
52+ * try to take from the semaphore, reducing it's count, *without blocking*.
53+ *
54+ * @returns {boolean} true if the semaphore was taken, false otherwise.
55+ */
56+ poll() {
57+ if (this.#counter <= 0) return false
58+59+ this.#counter--
60+ return true
61+ }
62+63+ /** announce that the semaphore is free to be taken by another awaiter. */
64+ free() {
65+ this.#counter++
66+67+ if (this.#resolvers.length > 0) {
68+ const resolver = this.#resolvers.shift()
69+ resolver && queueMicrotask(() => resolver(true))
70+ }
71+ }
72+73+}