Files
generalFunctions/src/nodered/BaseNodeAdapter.js
znetsixe c0be50d02c feat(output): alwaysEmit fields, drop undefined/empty Influx tags, time-based movement re-basing
- OutputUtils: new `alwaysEmit` option exempts named fields from delta
  compression so steady-state values (e.g. ctrl) trace continuously.
- flattenTags now drops null/undefined/empty-string tag values, fixing
  literal `category="undefined"` tags that split every Grafana series in two.
- BaseNodeAdapter wires `static alwaysEmitFields` from the subclass.
- movementManager: track position by elapsed wall-time and capture partial
  progress on abort, so a fast-re-commanding parent can't freeze an actuator
  at its start position.
- Tests for the above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 16:09:14 +02:00

214 lines
8.3 KiB
JavaScript

/**
* BaseNodeAdapter — shared nodeClass scaffolding.
*
* Consolidates the boilerplate every node's nodeClass.js repeats today
* (config build → domain instantiate → registration delay → tick loop →
* status loop → input dispatch → close handler). Subclasses declare what
* varies (DomainClass, commands, output strategy) via static fields and
* override `buildDomainConfig(uiConfig, nodeId)` to produce the per-node
* config slice.
*
* See CONTRACTS.md §2; OPEN_QUESTIONS.md (event-driven default + tick
* fire-and-forget resolution, 2026-05-10).
*/
'use strict';
const ConfigManager = require('../configs/index.js');
const OutputUtils = require('../helper/outputUtils.js');
const { createRegistry } = require('./commandRegistry.js');
const { StatusUpdater } = require('./statusUpdater.js');
const convert = require('../convert');
const REGISTRATION_DELAY_MS = 100;
/**
 * Build the descriptor for the implicit `query.units` command.
 *
 * When invoked, the handler walks every descriptor returned by
 * `getCommands()`, keeps the ones that declare `units`, and replies on the
 * first output with a `{ node, units }` payload keyed by topic.
 *
 * @param {Function} getCommands - Returns an iterable of command descriptors.
 * @param {Function} getNodeName - Returns the value reported as `payload.node`.
 * @returns {object} Descriptor with `topic`, `payloadSchema`, `description` and `handler`.
 */
function _buildImplicitUnitsCommand(getCommands, getNodeName) {
  // Accepted units for one measure; empty when `convert` exposes no
  // `possibilities` function.
  const acceptedFor = (measure) => {
    if (!convert || typeof convert.possibilities !== 'function') return [];
    return convert.possibilities(measure);
  };

  // Topic → { measure, default, accepted } for every descriptor with units.
  const collectUnits = () => {
    const table = {};
    const declaring = [...getCommands()].filter((descriptor) => descriptor.units);
    declaring.forEach((descriptor) => {
      const spec = descriptor.units;
      const accepted = acceptedFor(spec.measure);
      table[descriptor.topic] = {
        measure: spec.measure,
        default: spec.default,
        accepted,
      };
    });
    return table;
  };

  const handler = (source, msg, ctx) => {
    const units = collectUnits();
    // The reply keeps every property of the incoming message but replaces
    // topic and payload.
    const reply = {
      ...msg,
      topic: 'query.units',
      payload: { node: getNodeName(), units },
    };
    const canSend = Boolean(ctx) && typeof ctx.send === 'function';
    if (canSend) {
      ctx.send([reply, null, null]);
    }
  };

  return {
    topic: 'query.units',
    payloadSchema: { type: 'any' },
    description: 'Returns the unit spec (measure, default, accepted) for every topic that declares units.',
    handler,
  };
}
/**
 * Abstract adapter between a Node-RED node instance and a domain object.
 *
 * Subclasses declare, as static fields:
 *  - `DomainClass` (required): class instantiated with the built config.
 *  - `commands` (required): array of command descriptors (`[]` for none).
 *  - `alwaysEmitFields` (optional): passed to OutputUtils as `alwaysEmit`.
 *  - `tickInterval` (optional): positive number of ms enables the tick loop;
 *    anything else selects the event-driven `output-changed` path.
 *  - `statusInterval` (optional): status refresh period, 1000 ms if nullish.
 *
 * Subclasses must implement `buildDomainConfig(uiConfig, nodeId)` and may
 * implement `extraSetup()`, `extraInputDispatch(msg, send, done)` and
 * `extraClose()`.
 */
class BaseNodeAdapter {
  /**
   * @param {object} uiConfig - Editor configuration handed to the config manager.
   * @param {object} RED - Runtime handle, forwarded to command handlers.
   * @param {object} nodeInstance - Node instance; must expose `id`, `on`, `send`, `status`.
   * @param {string} nameOfNode - Name used to look up and build the config.
   * @throws {Error} When instantiated directly or when a required static /
   *   method is missing on the subclass.
   */
  constructor(uiConfig, RED, nodeInstance, nameOfNode) {
    const ctor = this.constructor;
    if (ctor === BaseNodeAdapter) {
      throw new Error('BaseNodeAdapter is abstract; subclass it and declare static DomainClass + commands');
    }
    if (typeof ctor.DomainClass !== 'function') {
      throw new Error(`${ctor.name}: static DomainClass is required (a class to instantiate)`);
    }
    if (!Array.isArray(ctor.commands)) {
      throw new Error(`${ctor.name}: static commands is required (array of descriptors; use [] for none)`);
    }
    if (typeof this.buildDomainConfig !== 'function') {
      throw new Error(`${ctor.name}: must implement buildDomainConfig(uiConfig, nodeId)`);
    }

    this.node = nodeInstance;
    this.RED = RED;
    this.name = nameOfNode;

    const cfgMgr = new ConfigManager();
    this.defaultConfig = cfgMgr.getConfig(this.name);
    this.config = cfgMgr.buildConfig(
      this.name,
      uiConfig,
      this.node.id,
      this.buildDomainConfig(uiConfig, this.node.id) || {},
    );

    this.source = new ctor.DomainClass(this.config);
    // Sibling-node lookup uses RED.nodes.getNode(id).source — see existing
    // pumpingStation/measurement nodeClass _attachInputHandler patterns.
    this.node.source = this.source;

    // `static alwaysEmitFields = ['ctrl', …]` on a subclass exempts those
    // fields from delta compression so they trace continuously downstream.
    this._output = new OutputUtils({ alwaysEmit: ctor.alwaysEmitFields });

    // Only add the implicit `query.units` command when the subclass has not
    // claimed that topic itself (directly or through an alias).
    const userHasUnitsQuery = ctor.commands.some(
      (c) => c && (c.topic === 'query.units' || (Array.isArray(c.aliases) && c.aliases.includes('query.units'))));
    const mergedCommands = userHasUnitsQuery
      ? ctor.commands
      : ctor.commands.concat([_buildImplicitUnitsCommand(
        () => this._commands.list(),
        () => this.name,
      )]);
    this._commands = createRegistry(mergedCommands, { logger: this.source?.logger });

    this._tickInterval = null;
    this._outputChangedListener = null;
    this._registrationTimer = null;

    this._scheduleRegistration();
    this._wireOutputs();

    this._statusUpdater = new StatusUpdater({
      node: this.node,
      source: this.source,
      intervalMs: ctor.statusInterval ?? 1000,
      logger: this.source?.logger,
    });
    this._statusUpdater.start();

    this._attachInputHandler();
    this._attachCloseHandler();
    if (typeof this.extraSetup === 'function') this.extraSetup();
  }

  /**
   * Report an error through the domain logger, if one is available.
   * Tolerates thrown values that are not Error instances.
   *
   * @param {?string} prefix - Context prepended as `prefix: detail`; omitted when falsy.
   * @param {*} err - The thrown value.
   */
  _logError(prefix, err) {
    const detail = err?.message ?? String(err);
    this.source?.logger?.error?.(prefix ? `${prefix}: ${detail}` : detail);
  }

  /**
   * Send the `child.register` message on the third output after
   * REGISTRATION_DELAY_MS. The timer handle is kept so the close handler
   * can cancel a registration that has not fired yet.
   */
  _scheduleRegistration() {
    // Delayed so siblings have finished constructing before the parent
    // receives the registration message.
    this._registrationTimer = setTimeout(() => {
      this._registrationTimer = null;
      this.node.send([
        null,
        null,
        {
          topic: 'child.register',
          payload: this.node.id,
          positionVsParent: this.config?.functionality?.positionVsParent ?? 'atEquipment',
          distance: this.config?.functionality?.distance ?? null,
        },
      ]);
    }, REGISTRATION_DELAY_MS);
  }

  /**
   * Choose how outputs are pushed: a periodic tick loop when the subclass
   * declares a positive numeric `tickInterval`, otherwise a listener on the
   * domain emitter's `output-changed` event (when an emitter exists).
   */
  _wireOutputs() {
    const ctor = this.constructor;
    const interval = ctor.tickInterval;
    if (typeof interval === 'number' && interval > 0) {
      this._tickInterval = setInterval(() => {
        // Fire-and-forget per OPEN_QUESTIONS 2026-05-10. Domain owns
        // its own serialisation via LatestWinsGate when needed.
        try {
          const pending = this.source.tick?.();
          // An async tick() is not awaited, so its rejection must be
          // handled here or it becomes an unhandled rejection.
          if (typeof pending?.then === 'function') {
            pending.then(undefined, (err) => this._logError('tick threw', err));
          }
        } catch (err) {
          this._logError('tick threw', err);
        }
        this._emitOutputsSafely();
      }, interval);
      return;
    }
    // Event-driven default: domain emits 'output-changed' when its
    // public output state shifts; adapter pushes outputs in response.
    const emitter = this.source?.emitter;
    if (emitter && typeof emitter.on === 'function') {
      this._outputChangedListener = () => this._emitOutputsSafely();
      emitter.on('output-changed', this._outputChangedListener);
    }
  }

  /**
   * Run `_emitOutputs()` and log instead of throwing. Used from timer and
   * event-listener callbacks, where an exception would escape into the
   * timer machinery or the domain's own `emit()` call.
   */
  _emitOutputsSafely() {
    try {
      this._emitOutputs();
    } catch (err) {
      this._logError('emit outputs threw', err);
    }
  }

  /**
   * Read the domain output and send it, formatted for 'process' on the
   * first output and for 'influxdb' on the second. Does nothing when the
   * domain has no `getOutput` function.
   */
  _emitOutputs() {
    if (typeof this.source.getOutput !== 'function') return;
    const raw = this.source.getOutput();
    // Prefer the domain's own config; fall back to the adapter's.
    const cfg = this.source.config || this.config;
    const processMsg = this._output.formatMsg(raw, cfg, 'process');
    const influxMsg = this._output.formatMsg(raw, cfg, 'influxdb');
    this.node.send([processMsg, influxMsg, null]);
  }

  /**
   * Register the 'input' handler: dispatch the message through the command
   * registry, then through the optional `extraInputDispatch` hook. Errors
   * are logged, and `done` is invoked at most once per message.
   */
  _attachInputHandler() {
    this.node.on('input', async (msg, send, done) => {
      // The hook receives a once-only wrapper so that a subclass calling
      // done() itself does not lead to a second completion from `finally`.
      let completed = false;
      const finish = (...args) => {
        if (completed) return;
        completed = true;
        if (typeof done === 'function') done(...args);
      };
      try {
        await this._commands.dispatch(msg, this.source, {
          node: this.node,
          RED: this.RED,
          send,
          logger: this.source?.logger,
        });
        if (typeof this.extraInputDispatch === 'function') {
          await this.extraInputDispatch(msg, send, finish);
        }
      } catch (err) {
        this._logError(null, err);
      } finally {
        finish();
      }
    });
  }

  /**
   * Register the 'close' handler: cancel the pending registration, stop the
   * tick loop, detach the `output-changed` listener, stop the status
   * updater, close the domain, run `extraClose`, clear the node status and
   * finally call `done`.
   */
  _attachCloseHandler() {
    this.node.on('close', (done) => {
      // Each step is isolated so one failing step cannot skip the others.
      const attempt = (step) => {
        try {
          step();
        } catch (err) {
          this._logError('close handler threw', err);
        }
      };
      try {
        attempt(() => {
          if (this._registrationTimer) {
            clearTimeout(this._registrationTimer);
            this._registrationTimer = null;
          }
        });
        attempt(() => {
          if (this._tickInterval) {
            clearInterval(this._tickInterval);
            this._tickInterval = null;
          }
        });
        attempt(() => {
          if (this._outputChangedListener && this.source?.emitter?.off) {
            this.source.emitter.off('output-changed', this._outputChangedListener);
            this._outputChangedListener = null;
          }
        });
        attempt(() => this._statusUpdater?.stop());
        attempt(() => this.source?.close?.());
        attempt(() => {
          if (typeof this.extraClose === 'function') this.extraClose();
        });
        try { this.node.status({}); } catch (_) { /* best effort */ }
      } finally {
        if (typeof done === 'function') done();
      }
    });
  }
}
// Class-level defaults; a subclass overrides either one by declaring its
// own static field of the same name. A null tickInterval is not a positive
// number, so it selects the event-driven output path.
Object.assign(BaseNodeAdapter, {
  tickInterval: null,
  statusInterval: 1000,
});

module.exports = BaseNodeAdapter;