B2.3 LatestWinsGate fireAndWait:
Added fireAndWait(value, ctx?) returning per-fire settlement promise.
Supersede resolves with frozen sentinel {superseded: true} (no
rejection — callers branch on value without try/catch). Dispatch
errors also resolve (with undefined); error surfaces via gate.lastError.
LatestWinsGate.js 75 → 116 lines. 12/12 tests pass.
P11.1 convert.possibilities(measure):
New helper returning sorted+deduped unit names for a measure.
Cached per measure. Reuses existing convert measures map. Also
exposed convert.measures() listing all known measures.
convert/index.js +21 lines. New test file: 90 lines, 12/12 tests.
P11.2 commandRegistry.units field:
Pre-dispatch normalisation pipeline. descriptor.units = {measure,
default}; commandRegistry extracts msg.payload + msg.unit (3 shapes),
validates against measure, converts to default, falls back + warns
with accepted-list on unknown/wrong-measure. Falls back gracefully
if convert.possibilities is missing. commandRegistry.js 164 → 237.
+7 new tests covering all 4 paths.
monster schema fix (P11.2 sibling):
generalFunctions/src/configs/monster.json was stripping four
legitimate constraint keys (nominalFlowMin, flowMax, maxRainRef,
minSampleIntervalSec). Added them with defaults matching the
legacy nodeClass coercion. Side effect: this also UNBLOCKED the
monster cooldown-guard test (separate ROOT-CAUSE entry below).
CONTRACTS.md §4 + §8 updated. 144/144 basic tests + 206/206 full
generalFunctions tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
117 lines
4.4 KiB
JavaScript
117 lines
4.4 KiB
JavaScript
'use strict';
|
|
|
|
// Serialises an async dispatch so that high-frequency callers cannot stack
|
|
// up overlapping invocations. Intermediate values are dropped — only the
|
|
// most recent fire()/fireAndWait() during an in-flight dispatch is replayed
|
|
// afterwards. Extracted from machineGroupControl's _dispatchInFlight +
|
|
// _delayedCall pattern so MGC, pumpingStation, valveGroupControl etc. can
|
|
// share it.
|
|
//
|
|
// fire(value) — never blocks; returns void.
|
|
// fireAndWait(value) — returns a promise that settles when THIS value's
|
|
// dispatch runs to completion. If a later fireAndWait
|
|
// arrives during the in-flight call and supersedes
|
|
// this one in the pending slot, the returned promise
|
|
// RESOLVES with { superseded: true } instead of
|
|
// rejecting — callers can branch on a sentinel
|
|
// without try/catch. The dispatch's own return value
|
|
// (when not superseded) is forwarded as the resolution.
|
|
|
|
const SUPERSEDED = Object.freeze({ superseded: true });
|
|
|
|
class LatestWinsGate {
|
|
constructor(asyncDispatchFn, options = {}) {
|
|
if (typeof asyncDispatchFn !== 'function') {
|
|
throw new TypeError('LatestWinsGate requires an async dispatch function');
|
|
}
|
|
this._dispatch = asyncDispatchFn;
|
|
this._logger = options.logger || null;
|
|
this._inFlight = false;
|
|
this._pending = null; // { value, ctx, settle? } | null
|
|
this._drainResolvers = []; // resolved when idle again
|
|
this.lastError = null;
|
|
}
|
|
|
|
// 0 = idle, 1 = running with no pending, 2 = running with pending.
|
|
get size() {
|
|
if (!this._inFlight) return 0;
|
|
return this._pending ? 2 : 1;
|
|
}
|
|
|
|
// Never blocks. If a dispatch is in flight, the latest value is parked;
|
|
// older parked values are silently overwritten.
|
|
fire(value, ctx) {
|
|
if (this._inFlight) {
|
|
this._supersedePending();
|
|
this._pending = { value, ctx, settle: null };
|
|
return;
|
|
}
|
|
this._run(value, ctx, null);
|
|
}
|
|
|
|
// Returns a promise that resolves when THIS fire's dispatch settles.
|
|
// If this fire gets overwritten while parked, resolves with the
|
|
// SUPERSEDED sentinel ({ superseded: true }) — callers branch on
|
|
// result.superseded === true without try/catch.
|
|
fireAndWait(value, ctx) {
|
|
return new Promise((resolve) => {
|
|
const settle = resolve;
|
|
if (this._inFlight) {
|
|
this._supersedePending();
|
|
this._pending = { value, ctx, settle };
|
|
return;
|
|
}
|
|
this._run(value, ctx, settle);
|
|
});
|
|
}
|
|
|
|
drain() {
|
|
if (!this._inFlight && !this._pending) return Promise.resolve();
|
|
return new Promise((resolve) => { this._drainResolvers.push(resolve); });
|
|
}
|
|
|
|
_supersedePending() {
|
|
const prev = this._pending;
|
|
if (prev && typeof prev.settle === 'function') prev.settle(SUPERSEDED);
|
|
this._pending = null;
|
|
}
|
|
|
|
_run(value, ctx, settle) {
|
|
this._inFlight = true;
|
|
// Kick the dispatch on a microtask so fire()/fireAndWait() always
|
|
// return synchronously, even if _dispatch resolves immediately.
|
|
Promise.resolve()
|
|
.then(() => this._dispatch(value, ctx))
|
|
.then((result) => {
|
|
if (typeof settle === 'function') settle(result);
|
|
}, (err) => {
|
|
this.lastError = err;
|
|
if (this._logger && typeof this._logger.error === 'function') {
|
|
this._logger.error(err);
|
|
}
|
|
// Resolve (not reject) so fireAndWait callers don't need
|
|
// try/catch. Dispatch errors stay observable via lastError.
|
|
if (typeof settle === 'function') settle(undefined);
|
|
})
|
|
.then(() => this._afterDispatch());
|
|
}
|
|
|
|
_afterDispatch() {
|
|
this._inFlight = false;
|
|
if (this._pending) {
|
|
const { value, ctx, settle } = this._pending;
|
|
this._pending = null;
|
|
this._run(value, ctx, settle);
|
|
return;
|
|
}
|
|
// Idle — release any drain() waiters.
|
|
const waiters = this._drainResolvers;
|
|
this._drainResolvers = [];
|
|
for (const r of waiters) r();
|
|
}
|
|
}
|
|
|
|
LatestWinsGate.SUPERSEDED = SUPERSEDED;
|
|
|
|
module.exports = LatestWinsGate;
|