Files
generalFunctions/src/nodered/commandRegistry.js
znetsixe 5ea968eabc B2.3 + P11.1 + P11.2 + monster schema fix
B2.3 LatestWinsGate fireAndWait:
  Added fireAndWait(value, ctx?) returning per-fire settlement promise.
  Supersede resolves with frozen sentinel {superseded: true} (no
  rejection — callers branch on value without try/catch). Dispatch
  errors also resolve (with undefined); error surfaces via gate.lastError.
  LatestWinsGate.js 75 → 116 lines. 12/12 tests pass.

P11.1 convert.possibilities(measure):
  New helper returning sorted+deduped unit names for a measure.
  Cached per measure. Reuses existing convert measures map. Also
  exposed convert.measures() listing all known measures.
  convert/index.js +21 lines. New test file: 90 lines, 12/12 tests.

P11.2 commandRegistry.units field:
  Pre-dispatch normalisation pipeline. descriptor.units = {measure,
  default}; commandRegistry extracts msg.payload + msg.unit (3 shapes),
  validates against measure, converts to default, falls back + warns
  with accepted-list on unknown/wrong-measure. Falls back gracefully
  if convert.possibilities is missing. commandRegistry.js 164 → 237.
  +7 new tests covering all 4 paths.

monster schema fix (P11.2 sibling):
  generalFunctions/src/configs/monster.json was stripping four
  legitimate constraint keys (nominalFlowMin, flowMax, maxRainRef,
  minSampleIntervalSec). Added them with defaults matching the
  legacy nodeClass coercion. Side effect: this also UNBLOCKED the
  monster cooldown-guard test (separate ROOT-CAUSE entry below).

CONTRACTS.md §4 + §8 updated. 144/144 basic tests + 206/206 full
generalFunctions tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 17:29:14 +02:00

238 lines
9.3 KiB
JavaScript

'use strict';
// Declarative dispatch for a node's input topics. Each node declares its
// commands as an array of descriptors; the registry builds an O(1) lookup
// keyed by canonical topic + alias, validates the payload against a small
// shape schema, and invokes the handler. Replaces the per-node ~100-line
// `switch (msg.topic)` block in nodeClass._attachInputHandler.
//
// Lightweight on purpose: the schema is a typeof-check ladder, not full
// JSON Schema. Anything richer belongs in the handler itself, which has
// access to the logger via ctx.
const convert = require('../convert');
// Values accepted for `payloadSchema.type`. 'any' skips the type check and
// 'none' marks a trigger-only topic; the rest are compared against `typeof`.
const SCALAR_TYPES = new Set([
  'string',
  'number',
  'boolean',
  'object',
  'any',
  'none',
]);
/**
 * Build the human-readable "accepted units" text used in unit warnings.
 *
 * @param {string} measure - measure name passed to `convert.possibilities`.
 * @returns {string} comma-separated unit names, or a generic pointer to the
 *   convert docs when `convert.possibilities` is unavailable or yields no
 *   non-empty array.
 */
function _acceptedList(measure) {
  const FALLBACK = '(see convert docs)';
  const canList = Boolean(convert) && typeof convert.possibilities === 'function';
  if (!canList) return FALLBACK;

  const names = convert.possibilities(measure);
  const usable = Array.isArray(names) && names.length > 0;
  return usable ? names.join(', ') : FALLBACK;
}
/**
 * Look up a unit's descriptor via `convert().describe(unit)`.
 *
 * Best-effort: any error raised by the lookup is deliberately swallowed and
 * reported as `null`, so callers can treat "unknown unit" as a plain value.
 *
 * @param {*} unit - unit name to describe.
 * @returns {*} whatever `describe` returns, or `null` when the lookup throws.
 */
function _describeUnit(unit) {
  let description;
  try {
    const converter = convert();
    description = converter.describe(unit);
  } catch (_) {
    description = null;
  }
  return description;
}
/**
 * Pull a numeric value and its (optional) unit out of a message.
 *
 * Recognised shapes:
 *   - `msg.payload` is a number; the unit is `msg.unit`.
 *   - `msg.payload` is an object with a numeric `value`; the unit is
 *     `payload.unit`, falling back to `msg.unit` when that is null/undefined.
 *
 * @param {*} msg - incoming message.
 * @returns {{value: number, unit: *}|null} the extracted pair, or `null` when
 *   the message does not match a recognised shape.
 */
function _extractValueAndUnit(msg) {
  const isMessage = Boolean(msg) && typeof msg === 'object';
  if (!isMessage) return null;

  const { payload } = msg;
  switch (typeof payload) {
    case 'number':
      return { value: payload, unit: msg.unit };
    case 'object':
      // typeof null === 'object', so null has to be excluded explicitly.
      if (payload !== null && typeof payload.value === 'number') {
        return { value: payload.value, unit: payload.unit ?? msg.unit };
      }
      return null;
    default:
      return null;
  }
}
/**
 * Declarative dispatcher for a node's input topics.
 *
 * Built from an array of command descriptors. Every descriptor is indexed by
 * its canonical topic and by each of its aliases, so dispatch is a single Map
 * lookup. Before the handler runs, the message is optionally unit-normalised
 * (`descriptor.units`) and its payload is checked against
 * `descriptor.payloadSchema`.
 */
class CommandRegistry {
  /**
   * @param {Array<object>} commands - command descriptors. Each needs a
   *   non-empty string `topic` and a `handler` function; `aliases`,
   *   `payloadSchema`, `description` and `units` are optional.
   * @param {object} [options]
   * @param {object} [options.logger] - logger used when the dispatch ctx
   *   does not carry one.
   * @throws {TypeError} when `commands` is not an array or a descriptor is malformed.
   * @throws {Error} when a topic or alias is registered more than once.
   */
  constructor(commands, options = {}) {
    if (!Array.isArray(commands)) {
      throw new TypeError('CommandRegistry requires an array of command descriptors');
    }
    this._logger = options.logger || null;
    this._byKey = new Map(); // topic-or-alias -> descriptor
    this._canonicalByAlias = new Map(); // alias -> canonical topic
    this._descriptors = []; // registration order, backs list()
    this._deprecationCounts = new Map(); // alias -> number of dispatches through it
    this._deprecationLogged = new Set(); // aliases that already produced a warning
    for (const cmd of commands) this._register(cmd);
  }

  /**
   * Validate one descriptor and index it under its topic and aliases.
   *
   * @param {object} cmd - raw command descriptor.
   * @throws {TypeError} on a missing/empty topic, a non-function handler, an
   *   invalid alias, or a malformed `units` field.
   * @throws {Error} when the topic or an alias collides with a key that is
   *   already registered, with the descriptor's own topic, or with an earlier
   *   alias of the same descriptor.
   */
  _register(cmd) {
    if (!cmd || typeof cmd.topic !== 'string' || cmd.topic.length === 0) {
      throw new TypeError('command descriptor requires a non-empty string topic');
    }
    if (typeof cmd.handler !== 'function') {
      throw new TypeError(`command '${cmd.topic}' requires a handler function`);
    }
    if (this._byKey.has(cmd.topic)) {
      throw new Error(`duplicate command topic '${cmd.topic}'`);
    }
    const aliases = Array.isArray(cmd.aliases) ? cmd.aliases.slice() : [];
    // Keys claimed by THIS descriptor so far. _byKey only knows about earlier
    // descriptors, so without this an alias equal to the own topic, or listed
    // twice, would slip through the collision check.
    const ownKeys = new Set([cmd.topic]);
    for (const alias of aliases) {
      if (typeof alias !== 'string' || alias.length === 0) {
        throw new TypeError(`command '${cmd.topic}' has an invalid alias`);
      }
      if (ownKeys.has(alias) || this._byKey.has(alias)) {
        throw new Error(`alias '${alias}' for '${cmd.topic}' collides with existing topic or alias`);
      }
      ownKeys.add(alias);
    }
    const units = this._validateUnits(cmd);
    const descriptor = {
      topic: cmd.topic,
      aliases,
      payloadSchema: cmd.payloadSchema || null,
      description: typeof cmd.description === 'string' ? cmd.description : null,
      units,
      handler: cmd.handler,
    };
    this._byKey.set(cmd.topic, descriptor);
    for (const alias of aliases) {
      this._byKey.set(alias, descriptor);
      this._canonicalByAlias.set(alias, cmd.topic);
    }
    this._descriptors.push(descriptor);
  }

  /**
   * Check the optional `units` field of a descriptor.
   *
   * @param {object} cmd - raw command descriptor.
   * @returns {{measure: string, default: string}|null} a fresh copy of the
   *   units spec, or `null` when the descriptor declares none.
   * @throws {TypeError} when `measure` or `default` is not a non-empty string.
   */
  _validateUnits(cmd) {
    if (cmd.units === undefined || cmd.units === null) return null;
    const { measure, default: def } = cmd.units;
    if (typeof measure !== 'string' || measure.length === 0 ||
        typeof def !== 'string' || def.length === 0) {
      throw new TypeError(
        `command '${cmd.topic}' units requires { measure: string, default: string }`);
    }
    return { measure, default: def };
  }

  /**
   * @param {*} topic
   * @returns {boolean} true when `topic` is a registered topic or alias.
   */
  has(topic) {
    return typeof topic === 'string' && this._byKey.has(topic);
  }

  /**
   * Resolve an alias to its canonical topic.
   *
   * @param {*} topic
   * @returns {*} the canonical topic for a known alias; otherwise the input
   *   unchanged (including non-string input and unknown topics).
   */
  canonical(topic) {
    if (typeof topic !== 'string') return topic;
    return this._canonicalByAlias.get(topic) || topic;
  }

  /**
   * Describe every registered command, in registration order.
   *
   * @returns {Array<{topic: string, aliases: string[], payloadSchema: *,
   *   description: (string|null), units: ({measure: string, default: string}|null)}>}
   */
  list() {
    // Strip handler so callers can safely log / serialise the result
    // (handler functions are noisy and not contract-relevant).
    return this._descriptors.map((d) => ({
      topic: d.topic,
      aliases: d.aliases.slice(),
      payloadSchema: d.payloadSchema,
      description: d.description,
      units: d.units ? { measure: d.units.measure, default: d.units.default } : null,
    }));
  }

  /**
   * @returns {Object<string, number>} per-alias count of dispatches that
   *   arrived through that alias.
   */
  deprecationStats() {
    const out = {};
    for (const [alias, count] of this._deprecationCounts) out[alias] = count;
    return out;
  }

  /**
   * Route a message to the handler registered for `msg.topic`.
   *
   * Messages without a string topic, with an unknown topic, or whose payload
   * fails schema validation are dropped with a warning. When the descriptor
   * declares `units`, `msg.payload` / `msg.unit` are normalised in place
   * before validation.
   *
   * @param {object} msg - incoming message; may be mutated by unit normalisation.
   * @param {*} source - passed through to the handler as its first argument.
   * @param {object} [ctx] - passed through to the handler; `ctx.logger`, when
   *   present, takes precedence over the registry's logger.
   * @returns {Promise<*>} resolves with the handler's result, or `undefined`
   *   when the message was dropped.
   */
  async dispatch(msg, source, ctx) {
    const log = this._loggerFor(ctx);
    const topic = msg && typeof msg.topic === 'string' ? msg.topic : null;
    if (!topic) {
      log.warn?.('commandRegistry: msg has no topic; ignoring');
      return;
    }
    const descriptor = this._byKey.get(topic);
    if (!descriptor) {
      log.warn?.(`commandRegistry: unknown topic '${topic}'`);
      return;
    }
    if (topic !== descriptor.topic) this._noteAlias(topic, descriptor.topic, log);
    if (descriptor.units) this._normaliseUnits(descriptor, msg, log);
    if (!this._validatePayload(descriptor, msg, log)) return;
    return descriptor.handler(source, msg, ctx);
  }

  /**
   * Count a dispatch made through an alias and warn once per alias.
   *
   * @param {string} alias - the topic the message actually used.
   * @param {string} canonical - the descriptor's canonical topic.
   * @param {object} log - logger; `warn` is optional.
   */
  _noteAlias(alias, canonical, log) {
    const prev = this._deprecationCounts.get(alias) || 0;
    this._deprecationCounts.set(alias, prev + 1);
    if (this._deprecationLogged.has(alias)) return;
    this._deprecationLogged.add(alias);
    log.warn?.(`topic '${alias}' is deprecated; use '${canonical}'`);
  }

  /**
   * Rewrite `msg.payload` / `msg.unit` so the handler always sees a plain
   * number expressed in the descriptor's default unit.
   *
   * Outcomes, in order:
   *   - unrecognised message shape: left untouched;
   *   - no unit supplied: value is taken as already in the default unit;
   *   - unknown unit, unit of another measure, or failed conversion: warn and
   *     treat the raw value as the default unit;
   *   - otherwise: convert from the supplied unit to the default unit.
   *
   * @param {object} descriptor - registered descriptor with a `units` spec.
   * @param {object} msg - message, mutated in place.
   * @param {object} log - logger; `warn` is optional.
   */
  _normaliseUnits(descriptor, msg, log) {
    const { measure, default: defaultUnit } = descriptor.units;
    const extracted = _extractValueAndUnit(msg);
    if (!extracted) return; // unknown shape — let payload validator handle it
    const { value, unit } = extracted;
    if (unit === undefined || unit === null || unit === '') {
      // No unit supplied — assume default, silent.
      msg.payload = value;
      msg.unit = defaultUnit;
      return;
    }
    const desc = _describeUnit(unit);
    if (!desc) {
      log.warn?.(`${descriptor.topic}: unknown unit '${unit}'. Accepted: ${_acceptedList(measure)}. Treating ${value} as ${defaultUnit}.`);
      msg.payload = value;
      msg.unit = defaultUnit;
      return;
    }
    if (desc.measure !== measure) {
      log.warn?.(`${descriptor.topic}: unit '${unit}' is ${desc.measure}, expected ${measure}. Accepted: ${_acceptedList(measure)}. Treating ${value} as ${defaultUnit}.`);
      msg.payload = value;
      msg.unit = defaultUnit;
      return;
    }
    try {
      msg.payload = convert(value).from(unit).to(defaultUnit);
      msg.unit = defaultUnit;
    } catch (err) {
      log.warn?.(`${descriptor.topic}: failed to convert ${value} ${unit} -> ${defaultUnit} (${err.message}). Treating as ${defaultUnit}.`);
      msg.payload = value;
      msg.unit = defaultUnit;
    }
  }

  /**
   * Check `msg.payload` against the descriptor's shape schema.
   *
   * @param {object} descriptor - registered descriptor.
   * @param {object} msg - message whose payload is checked.
   * @param {object} log - logger; `warn` is optional.
   * @returns {boolean} false when the payload has the wrong type (the caller
   *   then drops the message); true otherwise, including when there is no
   *   schema, the schema type is unknown, or the type is 'any' / 'none'.
   */
  _validatePayload(descriptor, msg, log) {
    const schema = descriptor.payloadSchema;
    if (!schema) return true;
    const payload = msg.payload;
    const type = schema.type || 'any';
    if (!SCALAR_TYPES.has(type)) {
      // A bad schema is the node author's mistake, not the sender's: warn
      // but let the message through.
      log.warn?.(`commandRegistry: command '${descriptor.topic}' has unknown schema type '${type}'`);
      return true;
    }
    if (type === 'any') return true;
    if (type === 'none') {
      if (payload !== undefined && payload !== null) {
        log.warn?.(`${descriptor.topic}: payload ignored — this is a trigger-only topic`);
      }
      return true;
    }
    // typeof null === 'object' — explicit null fails an object schema.
    if (type === 'object') {
      if (payload === null || typeof payload !== 'object') {
        log.warn?.(`commandRegistry: '${descriptor.topic}' expected object payload, got ${payload === null ? 'null' : typeof payload}`);
        return false;
      }
    } else if (typeof payload !== type) {
      log.warn?.(`commandRegistry: '${descriptor.topic}' expected ${type} payload, got ${typeof payload}`);
      return false;
    }
    if (type === 'object' && schema.properties && typeof schema.properties === 'object') {
      for (const [key, expected] of Object.entries(schema.properties)) {
        if (!(key in payload)) continue; // missing keys allowed
        if (typeof payload[key] !== expected) {
          log.warn?.(`commandRegistry: '${descriptor.topic}' payload.${key} expected ${expected}, got ${typeof payload[key]}`);
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Pick the logger for one dispatch: `ctx.logger`, then the registry's own
   * logger, then a no-op logger.
   *
   * @param {object} [ctx]
   * @returns {object} a logger object.
   */
  _loggerFor(ctx) {
    const candidate = (ctx && ctx.logger) || this._logger;
    return candidate || NOOP_LOGGER;
  }
}
// Logger of last resort: exposes the four log levels as functions that do
// nothing, so call sites never have to null-check the logger itself.
const NOOP_LOGGER = Object.fromEntries(
  ['warn', 'error', 'info', 'debug'].map((level) => [level, () => {}]),
);
/**
 * Factory wrapper around `new CommandRegistry(...)`.
 *
 * @param {Array<object>} commands - command descriptors, forwarded unchanged.
 * @param {object} [options] - registry options, forwarded unchanged.
 * @returns {CommandRegistry} the newly constructed registry.
 */
function createRegistry(commands, options) {
  const registry = new CommandRegistry(commands, options);
  return registry;
}
module.exports = { createRegistry, CommandRegistry };