Adds platform infrastructure used by the upcoming refactor of
nodeClass / specificClass across all 12 nodes:
- src/domain/UnitPolicy.js — extracted from rotatingMachine/MGC
- src/domain/ChildRouter.js — declarative event routing on top of childRegistrationUtils
- src/domain/LatestWinsGate.js — extracted from MGC dispatch gate
- src/domain/HealthStatus.js — standardised {level, flags, message, source}
- src/nodered/statusBadge.js — compose / error / idle / byState / text helpers
- src/stats/index.js — mean / stdDev / median / mad / lerp
All additive — no existing exports change shape.
56 unit tests pass under node:test.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
185 lines
7.5 KiB
JavaScript
185 lines
7.5 KiB
JavaScript
/**
 * ChildRouter — declarative parent-side child registration & event routing.
 *
 * Replaces the per-node `registerChild` switch + manual
 * `child.measurements.emitter.on(...)` wiring repeated in pumpingStation,
 * rotatingMachine and machineGroupControl.
 *
 * See CONTRACTS.md §5. Built on top of `childRegistrationUtils`, which
 * already canonicalises softwareType (e.g. rotatingmachine → machine).
 */
|
|
|
|
// Same alias map as childRegistrationUtils. Duplicated rather than imported
// because we need to canonicalise inputs to onRegister/onMeasurement/onPrediction
// at *declaration* time (before any child has registered), so that a domain
// can write `onRegister('rotatingmachine', ...)` or `onRegister('machine', ...)`
// interchangeably and have the dispatch match.
|
|
/**
 * Maps lower-cased softwareType aliases to the canonical name used as the
 * key of the router's subscription tables.
 */
const SOFTWARE_TYPE_ALIASES = {
  rotatingmachine: 'machine',
  machinegroupcontrol: 'machinegroup',
};

/**
 * Normalise a softwareType to its canonical, lower-case form.
 *
 * @param {*} rawType - softwareType as supplied by the caller; falsy values
 *   are treated as the empty string.
 * @returns {string} the alias target when `rawType` (lower-cased) is a known
 *   alias, otherwise the lower-cased input itself.
 */
function canonicalType(rawType) {
  const t = String(rawType || '').toLowerCase();
  // Own-property check: a plain `ALIASES[t] || t` lookup walks the prototype
  // chain, so inputs such as 'constructor' or '__proto__' would return a
  // function/object instead of a string key.
  return Object.prototype.hasOwnProperty.call(SOFTWARE_TYPE_ALIASES, t)
    ? SOFTWARE_TYPE_ALIASES[t]
    : t;
}
|
|
|
|
/**
 * Declarative router for child registration and child measurement events.
 *
 * A domain declares handlers up front with `onRegister`, `onMeasurement` and
 * `onPrediction`; `dispatchRegister` then runs the register handlers for a
 * child and wires the event handlers onto `child.measurements.emitter`.
 * Event names on that emitter are expected to have the form
 * `<type>.<variant>.<position>` where variant is `measured` or `predicted`.
 *
 * All handlers are invoked with `this` bound to the domain passed to the
 * constructor. Exceptions thrown by handlers are caught and reported through
 * `domain.logger.warn` when such a logger exists.
 */
class ChildRouter {
  /**
   * @param {object} [domain] - owner object; used as `this` for handlers and
   *   as the source of the optional `logger`.
   */
  constructor(domain) {
    this.domain = domain;
    this.logger = domain?.logger || null;

    // Subscription tables, keyed by canonical softwareType.
    this._registerSubs = new Map(); // softwareType -> Array<fn>
    this._measurementSubs = new Map(); // softwareType -> Array<{filter, fn}>
    this._predictionSubs = new Map(); // softwareType -> Array<{filter, fn}>

    // Every listener / proxy we attach, in attach order, so tearDown can undo it.
    this._attached = [];
  }

  // ── declaration API ────────────────────────────────────────────────

  /**
   * Declare a handler to run when a child of the given softwareType registers.
   *
   * @param {string} softwareType - raw or canonical type; canonicalised here.
   * @param {function(object, string): void} fn - called as `fn(child, canonicalType)`.
   * @returns {ChildRouter} this, for chaining.
   * @throws {TypeError} when `fn` is not a function.
   */
  onRegister(softwareType, fn) {
    if (typeof fn !== 'function') {
      throw new TypeError('ChildRouter.onRegister: fn must be a function');
    }
    const key = canonicalType(softwareType);
    if (!this._registerSubs.has(key)) this._registerSubs.set(key, []);
    this._registerSubs.get(key).push(fn);
    return this;
  }

  /**
   * Declare a handler for `measured` events of children of the given type.
   *
   * @param {string} softwareType - raw or canonical type.
   * @param {{type?: string, position?: string}|function} filter - optional
   *   event filter; may be omitted (`onMeasurement(type, fn)`).
   * @param {function(*, object): void} [fn] - called as `fn(eventData, child)`.
   * @returns {ChildRouter} this, for chaining.
   * @throws {TypeError} when no handler function is supplied.
   */
  onMeasurement(softwareType, filter, fn) {
    return this._addEventSub(this._measurementSubs, softwareType, filter, fn, 'onMeasurement');
  }

  /**
   * Declare a handler for `predicted` events of children of the given type.
   * Same arguments and return value as `onMeasurement`.
   */
  onPrediction(softwareType, filter, fn) {
    return this._addEventSub(this._predictionSubs, softwareType, filter, fn, 'onPrediction');
  }

  /**
   * Shared implementation of onMeasurement / onPrediction.
   *
   * @param {Map<string, Array>} table - subscription table to append to.
   * @param {string} softwareType - raw or canonical type.
   * @param {object|function} filter - filter object, or the handler itself
   *   when the two-argument shorthand is used.
   * @param {function} [fn] - handler.
   * @param {string} label - public method name, used in the error message.
   * @returns {ChildRouter} this.
   * @throws {TypeError} when no handler function is supplied.
   */
  _addEventSub(table, softwareType, filter, fn, label) {
    if (typeof filter === 'function' && fn === undefined) {
      // Allow `onMeasurement(type, fn)` shorthand — no filter.
      fn = filter;
      filter = {};
    }
    if (typeof fn !== 'function') {
      throw new TypeError(`ChildRouter.${label}: fn must be a function`);
    }
    const key = canonicalType(softwareType);
    if (!table.has(key)) table.set(key, []);
    table.get(key).push({ filter: filter || {}, fn });
    return this;
  }

  // ── dispatch ──────────────────────────────────────────────────────

  /**
   * Called by the domain's registerChild(). Runs onRegister handlers, then
   * attaches measurement/prediction listeners on the child's emitter.
   * Children without a usable `measurements.emitter` only get the register
   * handlers.
   *
   * @param {object} child - the registering child.
   * @param {string} softwareType - raw or canonical type of the child.
   * @returns {void}
   */
  dispatchRegister(child, softwareType) {
    const key = canonicalType(softwareType);

    const regHandlers = this._registerSubs.get(key) || [];
    for (const fn of regHandlers) {
      try {
        fn.call(this.domain, child, key);
      } catch (err) {
        this._logHandlerError('onRegister', key, err);
      }
    }

    const emitter = child?.measurements?.emitter;
    if (!emitter || typeof emitter.on !== 'function') return;

    this._attachVariantListeners(child, key, emitter, 'measured', this._measurementSubs);
    this._attachVariantListeners(child, key, emitter, 'predicted', this._predictionSubs);
  }

  /**
   * Wire every subscription declared for `key` in `table` onto `emitter`.
   *
   * A filter with both `type` and `position` maps to exactly one event name
   * and gets a regular listener. Any other filter cannot be enumerated up
   * front, so it is served by a wildcard proxy around `emitter.emit`.
   *
   * @param {object} child - child passed through to handlers.
   * @param {string} key - canonical softwareType.
   * @param {object} emitter - the child's event emitter.
   * @param {'measured'|'predicted'} variant - middle segment of the event name.
   * @param {Map<string, Array<{filter: object, fn: function}>>} table
   * @returns {void}
   */
  _attachVariantListeners(child, key, emitter, variant, table) {
    const subs = table.get(key) || [];
    for (const { filter, fn } of subs) {
      if (filter.type && filter.position) {
        const eventName = `${filter.type}.${variant}.${String(filter.position).toLowerCase()}`;
        this._attach(emitter, eventName, (data) => this._invoke(fn, data, child, variant));
        continue;
      }

      // Wildcard: EventEmitter has no built-in wildcard, so route through a
      // per-variant proxy list that a wrapper around emit() fans out to.
      const proxyKey = `__childRouter_proxy_${variant}__`;
      if (!emitter[proxyKey]) this._installEmitProxy(emitter, variant, proxyKey);

      const proxyFn = ({ type, position, args }) => {
        if (filter.type && type !== filter.type) return;
        if (filter.position && position !== String(filter.position).toLowerCase()) return;
        this._invoke(fn, args[0], child, variant);
      };
      emitter[proxyKey].push(proxyFn);
      this._attached.push({ emitter, kind: 'proxyEntry', proxyKey, proxyFn });
    }
  }

  /**
   * Wrap `emitter.emit` so that every `<type>.<variant>.<position>` event is
   * also fanned out to the proxy list stored at `emitter[proxyKey]`.
   *
   * The list carries a `restoreEmit` closure that puts `emit` back exactly as
   * it was found (own property vs. inherited). Keeping it on the list rather
   * than only in this router's records lets whichever router removes the last
   * entry undo the patch, even if it did not install it.
   *
   * @param {object} emitter - emitter to patch.
   * @param {'measured'|'predicted'} variant - event variant this proxy serves.
   * @param {string} proxyKey - property name under which the list is stored.
   * @returns {void}
   */
  _installEmitProxy(emitter, variant, proxyKey) {
    const hadOwnEmit = Object.prototype.hasOwnProperty.call(emitter, 'emit');
    const previousEmit = emitter.emit;
    const origEmit = previousEmit.bind(emitter);
    const proxies = [];

    const wrapper = (eventName, ...args) => {
      const parts = String(eventName).split('.');
      if (parts.length === 3 && parts[1] === variant) {
        for (const p of proxies) p({ type: parts[0], position: parts[2], args });
      }
      return origEmit(eventName, ...args);
    };

    proxies.restoreEmit = () => {
      // Only unwrap when our wrapper is the outermost one; otherwise we would
      // clobber a wrapper that was installed on top of ours.
      if (emitter.emit !== wrapper) return false;
      if (hadOwnEmit) emitter.emit = previousEmit;
      else delete emitter.emit; // fall back to the inherited emit
      return true;
    };

    emitter[proxyKey] = proxies;
    emitter.emit = wrapper;

    // Track the proxy install for tearDown to undo.
    this._attached.push({ emitter, kind: 'proxy', variant, original: origEmit, proxyKey });
  }

  /**
   * Attach a regular listener and remember it for tearDown.
   *
   * @param {object} emitter
   * @param {string} eventName
   * @param {function} listener
   * @returns {void}
   */
  _attach(emitter, eventName, listener) {
    emitter.on(eventName, listener);
    this._attached.push({ emitter, kind: 'listener', eventName, listener });
  }

  /**
   * Call an event handler with `this` bound to the domain, containing any
   * exception it throws.
   *
   * @param {function} fn - handler.
   * @param {*} eventData - first argument of the emitted event.
   * @param {object} child - child the event came from.
   * @param {'measured'|'predicted'} variant - selects the label used in the log.
   * @returns {void}
   */
  _invoke(fn, eventData, child, variant) {
    try {
      fn.call(this.domain, eventData, child);
    } catch (err) {
      this._logHandlerError(`on${variant === 'measured' ? 'Measurement' : 'Prediction'}`, '', err);
    }
  }

  /**
   * Report a handler failure via `logger.warn`; a no-op without a logger.
   *
   * @param {string} kind - declaration method the handler was registered with.
   * @param {string} key - canonical softwareType, or '' when unknown.
   * @param {*} err - the thrown value.
   * @returns {void}
   */
  _logHandlerError(kind, key, err) {
    if (this.logger?.warn) {
      this.logger.warn(`ChildRouter ${kind}${key ? `[${key}]` : ''} handler threw: ${err?.message || err}`);
    }
  }

  // ── teardown ──────────────────────────────────────────────────────

  /**
   * Detach everything this router attached.
   *
   * Pass 1 removes concrete listeners and this router's proxy entries.
   * Pass 2 walks the records newest-first and unwraps every emit proxy whose
   * entry list is now empty. Newest-first matters because a `predicted` proxy
   * wraps a previously installed `measured` proxy; unwinding in install order
   * would put the stale inner wrapper back as `emitter.emit`.
   *
   * A proxy list that still holds entries from another router is left in
   * place; that router unwraps it on its own tearDown.
   *
   * @returns {void}
   */
  tearDown() {
    for (const rec of this._attached) {
      if (rec.kind === 'listener') {
        if (typeof rec.emitter.off === 'function') {
          rec.emitter.off(rec.eventName, rec.listener);
        } else if (typeof rec.emitter.removeListener === 'function') {
          rec.emitter.removeListener(rec.eventName, rec.listener);
        }
      } else if (rec.kind === 'proxyEntry') {
        const proxies = rec.emitter[rec.proxyKey];
        if (Array.isArray(proxies)) {
          const idx = proxies.indexOf(rec.proxyFn);
          if (idx >= 0) proxies.splice(idx, 1);
        }
      }
    }

    for (let i = this._attached.length - 1; i >= 0; i -= 1) {
      const { kind, emitter, proxyKey } = this._attached[i];
      if (kind !== 'proxy' && kind !== 'proxyEntry') continue;
      const proxies = emitter[proxyKey];
      if (!Array.isArray(proxies) || proxies.length > 0) continue;
      // Drop the list only once emit is really unwrapped; if unwrapping is not
      // possible yet the (empty, inert) list stays so it can be reused.
      if (typeof proxies.restoreEmit === 'function' && proxies.restoreEmit()) {
        delete emitter[proxyKey];
      }
    }

    this._attached = [];
  }
}
|
|
|
|
// CommonJS export: the ChildRouter class is this module's export value.
module.exports = ChildRouter;
|