Adds platform infrastructure used by the upcoming refactor of
nodeClass / specificClass across all 12 nodes:
- src/domain/UnitPolicy.js — extracted from rotatingMachine/MGC
- src/domain/ChildRouter.js — declarative event routing on top of childRegistrationUtils
- src/domain/LatestWinsGate.js — extracted from MGC dispatch gate
- src/domain/HealthStatus.js — standardised {level, flags, message, source}
- src/nodered/statusBadge.js — compose / error / idle / byState / text helpers
- src/stats/index.js — mean / stdDev / median / mad / lerp
All additive — no existing exports change shape.
56 unit tests pass under node:test.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
153 lines
4.3 KiB
JavaScript
153 lines
4.3 KiB
JavaScript
'use strict';
|
|
|
|
const { test } = require('node:test');
|
|
const assert = require('node:assert/strict');
|
|
|
|
const LatestWinsGate = require('../../src/domain/LatestWinsGate');
|
|
|
|
// Helper: a deferred promise so a test can pause a dispatch and inspect
|
|
// gate state before resolving. Avoids real timers entirely.
|
|
function deferred() {
|
|
let resolve;
|
|
let reject;
|
|
const promise = new Promise((res, rej) => { resolve = res; reject = rej; });
|
|
return { promise, resolve, reject };
|
|
}
|
|
|
|
test('single fire calls dispatch with the value', async () => {
|
|
const calls = [];
|
|
const gate = new LatestWinsGate(async (v) => { calls.push(v); });
|
|
gate.fire('a');
|
|
await gate.drain();
|
|
assert.deepEqual(calls, ['a']);
|
|
});
|
|
|
|
test('two fires while in-flight: second value runs after first settles', async () => {
|
|
const calls = [];
|
|
const gates = [deferred(), deferred()];
|
|
const started = [deferred(), deferred()];
|
|
let n = 0;
|
|
const gate = new LatestWinsGate(async (v) => {
|
|
const slot = n++;
|
|
calls.push(v);
|
|
started[slot].resolve();
|
|
await gates[slot].promise;
|
|
});
|
|
|
|
gate.fire('first');
|
|
gate.fire('second'); // parks while 'first' is in flight
|
|
await started[0].promise;
|
|
assert.deepEqual(calls, ['first']);
|
|
assert.equal(gate.size, 2);
|
|
|
|
gates[0].resolve();
|
|
await started[1].promise;
|
|
assert.deepEqual(calls, ['first', 'second']);
|
|
|
|
gates[1].resolve();
|
|
await gate.drain();
|
|
});
|
|
|
|
test('three fires back-to-back: only the last runs after the first settles', async () => {
|
|
const calls = [];
|
|
const first = deferred();
|
|
const firstStarted = deferred();
|
|
let count = 0;
|
|
const gate = new LatestWinsGate(async (v) => {
|
|
calls.push(v);
|
|
if (count++ === 0) {
|
|
firstStarted.resolve();
|
|
await first.promise;
|
|
}
|
|
});
|
|
|
|
gate.fire(1);
|
|
gate.fire(2); // parked
|
|
gate.fire(3); // overwrites 2
|
|
|
|
await firstStarted.promise;
|
|
assert.deepEqual(calls, [1]);
|
|
first.resolve();
|
|
await gate.drain();
|
|
assert.deepEqual(calls, [1, 3]);
|
|
});
|
|
|
|
test('drain() resolves only after all queued work has run', async () => {
|
|
const calls = [];
|
|
const d = deferred();
|
|
let started = 0;
|
|
const gate = new LatestWinsGate(async (v) => {
|
|
calls.push(v);
|
|
if (started++ === 0) await d.promise;
|
|
});
|
|
|
|
gate.fire('x');
|
|
gate.fire('y');
|
|
|
|
let drained = false;
|
|
const p = gate.drain().then(() => { drained = true; });
|
|
|
|
// While first is paused, drain must not have resolved yet.
|
|
await Promise.resolve();
|
|
await Promise.resolve();
|
|
assert.equal(drained, false);
|
|
|
|
d.resolve();
|
|
await p;
|
|
assert.deepEqual(calls, ['x', 'y']);
|
|
assert.equal(drained, true);
|
|
});
|
|
|
|
test('error in dispatch does not prevent subsequent fire from working', async () => {
|
|
const calls = [];
|
|
let throwNext = true;
|
|
const errors = [];
|
|
const logger = { error: (e) => errors.push(e) };
|
|
const gate = new LatestWinsGate(async (v) => {
|
|
calls.push(v);
|
|
if (throwNext) {
|
|
throwNext = false;
|
|
throw new Error('boom');
|
|
}
|
|
}, { logger });
|
|
|
|
gate.fire('a');
|
|
await gate.drain();
|
|
assert.equal(calls.length, 1);
|
|
assert.equal(errors.length, 1);
|
|
assert.match(errors[0].message, /boom/);
|
|
assert.ok(gate.lastError instanceof Error);
|
|
|
|
// Gate must still accept further work.
|
|
gate.fire('b');
|
|
await gate.drain();
|
|
assert.deepEqual(calls, ['a', 'b']);
|
|
});
|
|
|
|
test('error is recorded on lastError when no logger is supplied', async () => {
|
|
const gate = new LatestWinsGate(async () => { throw new Error('silent'); });
|
|
gate.fire('only');
|
|
await gate.drain();
|
|
assert.ok(gate.lastError instanceof Error);
|
|
assert.match(gate.lastError.message, /silent/);
|
|
});
|
|
|
|
test('size reports 0 / 1 / 2 across the lifecycle', async () => {
|
|
const d1 = deferred();
|
|
const gate = new LatestWinsGate(async () => { await d1.promise; });
|
|
|
|
assert.equal(gate.size, 0);
|
|
|
|
gate.fire('one');
|
|
// fire is sync, but _dispatch starts on a microtask. Either way the
|
|
// gate is marked in-flight synchronously.
|
|
assert.equal(gate.size, 1);
|
|
|
|
gate.fire('two'); // parked
|
|
assert.equal(gate.size, 2);
|
|
|
|
d1.resolve();
|
|
await gate.drain();
|
|
assert.equal(gate.size, 0);
|
|
});
|