Compare commits

...

3 Commits

Author SHA1 Message Date
znetsixe
ce4fb4e5d0 feat(commandRegistry): unify command envelope — origin, unit shorthand, always-convert
Shared command-dispatch layer used by every EVOLV node:
- Always-convert: numeric strings ("60") and {value:"60"} now normalise +
  convert like numbers; closes the gap where strings reached handlers raw.
- unit: 'm3/h' shorthand on descriptors; measure is derived from the unit
  (legacy units:{measure,default} still accepted, measure re-derived).
  Unrecognised declared unit throws at construction.
- msg.origin stamped on every dispatch (parent|GUI|fysical, default parent).
- Opt-in gated:true arbitration: accept only if origin in
  source.config.mode.allowedSources[currentMode]; advisory allow-all when a
  node has no mode model. Handles Set- or array-valued allowedSources.

+18 registry tests (45 total, green). All consumer nodes verified green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 18:41:00 +02:00
znetsixe
5c091cdce9 feat(config): add planner.emergencyPressurePa for MGC rendezvous emergency bypass
Documented, defaults to null (inert). When set, the MGC pre-empts an in-flight
rendezvous lock and re-plans immediately if the resolved header pressure reaches
this canonical-Pa threshold. Mechanism is wired + tested; never fires until a
real value is configured.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 17:47:57 +02:00
znetsixe
c0be50d02c feat(output): alwaysEmit fields, drop undefined/empty Influx tags, time-based movement re-basing
- OutputUtils: new `alwaysEmit` option exempts named fields from delta
  compression so steady-state values (e.g. ctrl) trace continuously.
- flattenTags now drops null/undefined/empty-string tag values, fixing
  literal `category="undefined"` tags that split every Grafana series in two.
- BaseNodeAdapter wires `static alwaysEmitFields` from the subclass.
- movementManager: track position by elapsed wall-time and capture partial
  progress on abort, so a fast-re-commanding parent can't freeze an actuator
  at its start position.
- Tests for the above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 16:09:14 +02:00
8 changed files with 448 additions and 67 deletions

View File

@@ -149,6 +149,13 @@
"type": "boolean", "type": "boolean",
"description": "If true, every dispatch is routed through the rendezvous planner regardless of control strategy: per-pump moves are delayed so all pumps reach their setpoint at the same wall-clock instant t* = max(eta_i). If false, all flowmovement commands fire immediately and each pump ramps at its own speed (legacy behaviour)." "description": "If true, every dispatch is routed through the rendezvous planner regardless of control strategy: per-pump moves are delayed so all pumps reach their setpoint at the same wall-clock instant t* = max(eta_i). If false, all flowmovement commands fire immediately and each pump ramps at its own speed (legacy behaviour)."
} }
},
"emergencyPressurePa": {
"default": null,
"rules": {
"type": "number",
"description": "Safety threshold (canonical Pa) for the rendezvous emergency bypass. While a rendezvous is in flight new setpoints are locked out and queued sequentially; if the resolved header pressure reaches this value the lock is pre-empted and the group re-plans immediately. Null/unset (the default) leaves the bypass mechanism wired but INERT — it never fires until a real threshold is configured."
}
} }
}, },
"mode": { "mode": {

View File

@@ -2,8 +2,16 @@ const { getFormatter } = require('./formatters');
//this class will handle the output events for the node red node //this class will handle the output events for the node red node
class OutputUtils { class OutputUtils {
constructor() { // `options.alwaysEmit` is an optional list of field keys that bypass delta
// compression: they are re-emitted on every tick even when unchanged. Use it
// sparingly for slowly-varying values that must still trace as a continuous
// line downstream (e.g. a pump's realized control position `ctrl`, which sits
// constant in steady state and otherwise produces ~1 point per long stretch —
// invisible in a Grafana timeseries with createEmpty:false). Defaults to none,
// so existing nodes keep pure delta-compression behaviour.
constructor(options = {}) {
this.output = {}; this.output = {};
this.alwaysEmit = new Set(options.alwaysEmit || []);
} }
checkForChanges(output, format) { checkForChanges(output, format) {
@@ -13,7 +21,9 @@ class OutputUtils {
this.output[format] = this.output[format] || {}; this.output[format] = this.output[format] || {};
const changedFields = {}; const changedFields = {};
for (const key in output) { for (const key in output) {
if (Object.prototype.hasOwnProperty.call(output, key) && output[key] !== this.output[format][key]) { if (!Object.prototype.hasOwnProperty.call(output, key)) continue;
const forced = this.alwaysEmit.has(key) && output[key] !== undefined;
if (forced || output[key] !== this.output[format][key]) {
let value = output[key]; let value = output[key];
// For fields: if the value is an object (and not a Date), stringify it. // For fields: if the value is an object (and not a Date), stringify it.
if (value !== null && typeof value === 'object' && !(value instanceof Date)) { if (value !== null && typeof value === 'object' && !(value instanceof Date)) {
@@ -79,7 +89,13 @@ class OutputUtils {
for (const key in obj) { for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) { if (Object.prototype.hasOwnProperty.call(obj, key)) {
const value = obj[key]; const value = obj[key];
if (value !== null && typeof value === 'object' && !(value instanceof Date)) { // Skip tags that carry no information. When a config field is unset,
// extractRelevantConfig hands us `undefined`; stringifying that wrote
// literal `category="undefined"` / `geoLocation="undefined"` tags that
// clutter every Grafana legend and needlessly inflate tag cardinality.
// Drop null / undefined / empty-string before they reach InfluxDB.
if (value === null || value === undefined || value === '') continue;
if (typeof value === 'object' && !(value instanceof Date)) {
// Recursively flatten the nested object. // Recursively flatten the nested object.
const flatChild = this.flattenTags(value); const flatChild = this.flattenTags(value);
for (const childKey in flatChild) { for (const childKey in flatChild) {

View File

@@ -82,7 +82,9 @@ class BaseNodeAdapter {
// pumpingStation/measurement nodeClass _attachInputHandler patterns. // pumpingStation/measurement nodeClass _attachInputHandler patterns.
this.node.source = this.source; this.node.source = this.source;
this._output = new OutputUtils(); // `static alwaysEmitFields = ['ctrl', …]` on a subclass exempts those
// fields from delta compression so they trace continuously downstream.
this._output = new OutputUtils({ alwaysEmit: ctor.alwaysEmitFields });
const userHasUnitsQuery = ctor.commands.some( const userHasUnitsQuery = ctor.commands.some(
(c) => c && (c.topic === 'query.units' || (Array.isArray(c.aliases) && c.aliases.includes('query.units')))); (c) => c && (c.topic === 'query.units' || (Array.isArray(c.aliases) && c.aliases.includes('query.units'))));
const mergedCommands = userHasUnitsQuery const mergedCommands = userHasUnitsQuery

View File

@@ -26,14 +26,60 @@ function _describeUnit(unit) {
try { return convert().describe(unit); } catch (_) { return null; } try { return convert().describe(unit); } catch (_) { return null; }
} }
// A numeric scalar is a finite number, or a non-empty string that parses to a
// finite number ("60", "1.5"). Node-RED inject/`change` nodes and upstream MQTT
// payloads routinely arrive as strings; treating them as non-numeric here is the
// gap that let values reach a handler unconverted.
function _asNumber(x) {
if (typeof x === 'number') return Number.isFinite(x) ? x : null;
if (typeof x === 'string' && x.trim() !== '') {
const n = Number(x);
return Number.isFinite(n) ? n : null;
}
return null;
}
function _extractValueAndUnit(msg) { function _extractValueAndUnit(msg) {
if (!msg || typeof msg !== 'object') return null; if (!msg || typeof msg !== 'object') return null;
const p = msg.payload; const p = msg.payload;
if (typeof p === 'number') return { value: p, unit: msg.unit }; if (p && typeof p === 'object') {
if (p && typeof p === 'object' && typeof p.value === 'number') { const value = _asNumber(p.value);
return { value: p.value, unit: p.unit ?? msg.unit }; if (value === null) return null;
return { value, unit: p.unit ?? msg.unit };
} }
return null; const value = _asNumber(p);
if (value === null) return null;
return { value, unit: msg.unit };
}
// Derive the dimensional measure (e.g. 'volumeFlowRate') from a unit string.
// Returns null when convert doesn't recognise the unit.
function _measureOf(unit) {
const desc = _describeUnit(unit);
return desc ? desc.measure : null;
}
// Command origin = which control authority issued this message (the rotatingMachine
// `allowedSources` vocabulary: 'parent' = automation/parent controller, 'GUI' =
// SCADA/HMI operator, 'fysical' = physical buttons). Default 'parent'. Named
// `origin` on the message because `source` is already the domain instance handed
// to handlers.
const DEFAULT_ORIGIN = 'parent';
function _resolveOrigin(msg, descriptor) {
const o = msg && typeof msg.origin === 'string' && msg.origin.trim() !== ''
? msg.origin.trim()
: (descriptor.defaultOrigin || DEFAULT_ORIGIN);
return o;
}
// allowedSources values may be a Set (post config processing, as rotatingMachine
// stores them) or a plain array (raw config / other nodes). Accept both.
function _setHas(coll, value) {
if (!coll) return false;
if (typeof coll.has === 'function') return coll.has(value);
if (Array.isArray(coll)) return coll.includes(value);
return false;
} }
class CommandRegistry { class CommandRegistry {
@@ -76,6 +122,8 @@ class CommandRegistry {
payloadSchema: cmd.payloadSchema || null, payloadSchema: cmd.payloadSchema || null,
description: typeof cmd.description === 'string' ? cmd.description : null, description: typeof cmd.description === 'string' ? cmd.description : null,
units, units,
gated: cmd.gated === true,
defaultOrigin: typeof cmd.defaultOrigin === 'string' ? cmd.defaultOrigin : null,
handler: cmd.handler, handler: cmd.handler,
}; };
this._byKey.set(cmd.topic, descriptor); this._byKey.set(cmd.topic, descriptor);
@@ -87,12 +135,25 @@ class CommandRegistry {
} }
_validateUnits(cmd) { _validateUnits(cmd) {
if (cmd.units === undefined || cmd.units === null) return null; // Two ways to declare the unit, normalised to the same internal shape:
const { measure, default: def } = cmd.units; // unit: 'm3/h' (preferred — measure derived)
if (typeof measure !== 'string' || measure.length === 0 || // units: { default: 'm3/h' } (measure derived)
typeof def !== 'string' || def.length === 0) { // units: { measure, default: 'm3/h' } (legacy — measure ignored, derived)
// The measure is always derived from the unit so it can never drift from it.
let def;
if (typeof cmd.unit === 'string') def = cmd.unit;
else if (cmd.units === undefined || cmd.units === null) return null;
else if (typeof cmd.units === 'string') def = cmd.units;
else def = cmd.units.default;
if (typeof def !== 'string' || def.length === 0) {
throw new TypeError( throw new TypeError(
`command '${cmd.topic}' units requires { measure: string, default: string }`); `command '${cmd.topic}' requires a unit string (unit: 'm3/h' or units: { default: 'm3/h' })`);
}
const measure = _measureOf(def);
if (!measure) {
throw new TypeError(
`command '${cmd.topic}' declares unit '${def}' which convert does not recognise`);
} }
return { measure, default: def }; return { measure, default: def };
} }
@@ -137,11 +198,31 @@ class CommandRegistry {
return; return;
} }
if (topic !== descriptor.topic) this._noteAlias(topic, descriptor.topic, log); if (topic !== descriptor.topic) this._noteAlias(topic, descriptor.topic, log);
// Always stamp the command origin so handlers + gating can rely on it.
msg.origin = _resolveOrigin(msg, descriptor);
if (!this._originAllowed(descriptor, source, msg.origin, log)) return;
if (descriptor.units) this._normaliseUnits(descriptor, msg, log); if (descriptor.units) this._normaliseUnits(descriptor, msg, log);
if (!this._validatePayload(descriptor, msg, log)) return; if (!this._validatePayload(descriptor, msg, log)) return;
return descriptor.handler(source, msg, ctx); return descriptor.handler(source, msg, ctx);
} }
// Mode-gated control-authority arbitration. Opt-in per command via
// `gated: true`. The asset's mode (e.g. rotatingMachine's auto /
// virtualControl / fysicalControl) decides which origins it accepts via
// `source.config.mode.allowedSources[mode]`. Release = changing the mode.
// Nodes without a mode model are advisory (allow-all) so this is inert
// until a node opts in — never a silent behaviour change.
_originAllowed(descriptor, source, origin, log) {
if (!descriptor.gated) return true;
const allowedSources = source && source.config && source.config.mode
? source.config.mode.allowedSources : null;
const mode = source ? source.currentMode : undefined;
if (!allowedSources || !mode) return true; // no mode model → advisory
if (_setHas(allowedSources[mode], origin)) return true;
log.warn?.(`${descriptor.topic}: origin '${origin}' not allowed in mode '${mode}'`);
return false;
}
_noteAlias(alias, canonical, log) { _noteAlias(alias, canonical, log) {
const prev = this._deprecationCounts.get(alias) || 0; const prev = this._deprecationCounts.get(alias) || 0;
this._deprecationCounts.set(alias, prev + 1); this._deprecationCounts.set(alias, prev + 1);

View File

@@ -79,65 +79,70 @@ class movementManager {
// Clamp the final target into [minPosition, maxPosition] // Clamp the final target into [minPosition, maxPosition]
targetPosition = this.constrain(targetPosition); targetPosition = this.constrain(targetPosition);
// Compute direction and remaining distance // Snapshot the starting point. Position is derived from ELAPSED WALL-TIME
const direction = targetPosition > this.currentPosition ? 1 : -1; // (not accumulated per-tick steps) so an interruption that lands between
const distance = Math.abs(targetPosition - this.currentPosition); // ticks — or before the very first tick — still leaves currentPosition at
// the real distance travelled. A fast re-commanding parent (e.g. MGC
// updating demand every tick) then re-bases from the true position instead
// of freezing at the start. See _settleAt / the abort handler below.
const startPosition = this.currentPosition;
const direction = targetPosition > startPosition ? 1 : -1;
const distance = Math.abs(targetPosition - startPosition);
const velocity = this.getVelocity(); // units per second const velocity = this.getVelocity(); // units per second
if (velocity <= 0) { if (velocity <= 0) {
return reject(new Error("Movement aborted: zero speed")); return reject(new Error("Movement aborted: zero speed"));
} }
// Duration and bookkeeping const duration = distance / velocity; // seconds to go the full distance
const duration = distance / velocity; // seconds to go the remaining distance this.timeleft = duration;
this.timeleft = duration;
this.logger.debug( this.logger.debug(
`Linear move: dir=${direction}, dist=${distance}, vel=${velocity.toFixed(2)} u/s, dur=${duration.toFixed(2)}s` `Linear move: dir=${direction}, dist=${distance}, vel=${velocity.toFixed(2)} u/s, dur=${duration.toFixed(2)}s`
); );
// Compute how much to move each tick const intervalMs = this.interval;
const intervalMs = this.interval; const startTime = Date.now();
const intervalSec = intervalMs / 1000;
const stepSize = direction * velocity * intervalSec;
const startTime = Date.now(); // Position reached after `elapsedSec` of travel, clamped to the target.
const posAt = (elapsedSec) =>
this.constrain(startPosition + direction * Math.min(distance, velocity * elapsedSec));
// Re-base currentPosition (and timeleft) onto the real elapsed progress.
const settle = () => {
const elapsed = (Date.now() - startTime) / 1000;
this.currentPosition = posAt(elapsed);
this.timeleft = Math.max(0, duration - elapsed);
this.emitPos(this.currentPosition);
return elapsed;
};
// Kick off the loop // Kick off the loop
const intervalId = setInterval(() => { const intervalId = setInterval(() => {
// 7a) Abort check
if (signal?.aborted) { if (signal?.aborted) {
clearInterval(intervalId); clearInterval(intervalId);
settle();
return reject(new Error("Movement aborted")); return reject(new Error("Movement aborted"));
} }
// Advance position and clamp const elapsed = settle();
this.currentPosition += stepSize;
this.currentPosition = this.constrain(this.currentPosition);
this.emitPos(this.currentPosition);
// Update timeleft
const elapsed = (Date.now() - startTime) / 1000;
this.timeleft = Math.max(0, duration - elapsed);
this.logger.debug( this.logger.debug(
`pos=${this.currentPosition.toFixed(2)}, timeleft=${this.timeleft.toFixed(2)}` `pos=${this.currentPosition.toFixed(2)}, timeleft=${this.timeleft.toFixed(2)}`
); );
// Completed the move? // Completed the move? (time-based so it can't overshoot/undershoot)
if ( if (elapsed >= duration) {
(direction > 0 && this.currentPosition >= targetPosition) ||
(direction < 0 && this.currentPosition <= targetPosition)
) {
clearInterval(intervalId); clearInterval(intervalId);
this.currentPosition = targetPosition; this.currentPosition = targetPosition;
this.timeleft = 0;
this.emitPos(this.currentPosition); this.emitPos(this.currentPosition);
return resolve("Reached target move."); return resolve("Reached target move.");
} }
}, intervalMs); }, intervalMs);
// 8) Also catch aborts that happen before the first tick // Catch aborts that happen between ticks (incl. before the first tick):
// capture the partial progress so the move re-bases instead of freezing.
signal?.addEventListener("abort", () => { signal?.addEventListener("abort", () => {
clearInterval(intervalId); clearInterval(intervalId);
settle();
reject(new Error("Movement aborted")); reject(new Error("Movement aborted"));
}); });
}); });
@@ -213,8 +218,8 @@ class movementManager {
return reject(new Error("Movement aborted")); return reject(new Error("Movement aborted"));
} }
const totalDistance = Math.abs(targetPosition - this.currentPosition);
const startPosition = this.currentPosition; const startPosition = this.currentPosition;
const totalDistance = Math.abs(targetPosition - this.currentPosition);
const velocity = this.getVelocity(); const velocity = this.getVelocity();
if (velocity <= 0) { if (velocity <= 0) {
return reject(new Error("Movement aborted: zero speed")); return reject(new Error("Movement aborted: zero speed"));
@@ -223,45 +228,53 @@ class movementManager {
const easeFunction = (t) => const easeFunction = (t) =>
t < 0.5 ? 4 * t * t * t : 1 - Math.pow(-2 * t + 2, 3) / 2; t < 0.5 ? 4 * t * t * t : 1 - Math.pow(-2 * t + 2, 3) / 2;
let elapsedTime = 0;
const duration = totalDistance / velocity; const duration = totalDistance / velocity;
this.timeleft = duration; this.timeleft = duration;
const interval = this.interval; const interval = this.interval;
const startTime = Date.now();
// Position from ELAPSED WALL-TIME (eased), so an interruption between
// ticks re-bases from the real position rather than freezing at the
// start — same rationale as moveLinear.
const posAt = (elapsedSec) => {
const progress = duration > 0 ? Math.min(elapsedSec / duration, 1) : 1;
return startPosition + (targetPosition - startPosition) * easeFunction(progress);
};
const settle = () => {
const elapsed = (Date.now() - startTime) / 1000;
this.currentPosition = posAt(elapsed);
this.timeleft = Math.max(0, duration - elapsed);
this.emitPos(this.currentPosition);
return elapsed;
};
// 2) Start the moving loop // 2) Start the moving loop
const intervalId = setInterval(() => { const intervalId = setInterval(() => {
// 3) Check for abort on each tick // 3) Check for abort on each tick
if (signal?.aborted) { if (signal?.aborted) {
clearInterval(intervalId); clearInterval(intervalId);
settle();
return reject(new Error("Movement aborted")); return reject(new Error("Movement aborted"));
} }
elapsedTime += interval / 1000; const elapsed = settle();
const progress = Math.min(elapsedTime / duration, 1);
this.timeleft = duration - elapsedTime;
const easedProgress = easeFunction(progress);
const newPosition =
startPosition + (targetPosition - startPosition) * easedProgress;
this.emitPos(newPosition);
this.logger.debug( this.logger.debug(
`Using ${this.movementMode} => Progress=${progress.toFixed( `Using ${this.movementMode} => elapsed=${elapsed.toFixed(2)}s, pos=${this.currentPosition.toFixed(2)}`
2
)}, Eased=${easedProgress.toFixed(2)}`
); );
if (progress >= 1) { if (elapsed >= duration) {
clearInterval(intervalId); clearInterval(intervalId);
this.currentPosition = targetPosition; this.currentPosition = targetPosition;
this.timeleft = 0;
this.emitPos(this.currentPosition);
resolve(`Reached target move.`); resolve(`Reached target move.`);
} else {
this.currentPosition = newPosition;
} }
}, interval); }, interval);
// 4) Also listen once for abort before first tick // 4) Capture partial progress on aborts between/before ticks.
signal?.addEventListener("abort", () => { signal?.addEventListener("abort", () => {
clearInterval(intervalId); clearInterval(intervalId);
settle();
reject(new Error("Movement aborted")); reject(new Error("Movement aborted"));
}); });
}); });

View File

@@ -394,43 +394,163 @@ test('units: object payload {value} without unit falls back to default unit sile
assert.equal(logger._calls.warn.length, 0); assert.equal(logger._calls.warn.length, 0);
}); });
test('units: non-numeric payload (no normalisation applied) passes through to handler', async () => { test('units: non-NUMERIC string payload (not normalisable) passes through to handler', async () => {
const logger = makeLogger(); const logger = makeLogger();
const seen = []; const seen = [];
const reg = createRegistry([{ const reg = createRegistry([{
topic: 'set.demand', topic: 'set.demand',
units: { measure: 'volumeFlowRate', default: 'm3/h' }, unit: 'm3/h',
handler: (_s, msg) => { seen.push(msg.payload); }, handler: (_s, msg) => { seen.push(msg.payload); },
}], { logger }); }], { logger });
// string payload — not normalisable. Should not crash; handler still fires. // non-numeric string — not normalisable. Should not crash; handler still fires.
await reg.dispatch({ topic: 'set.demand', payload: 'magic' }, {}, {}); await reg.dispatch({ topic: 'set.demand', payload: 'magic' }, {}, {});
assert.equal(seen.length, 1); assert.equal(seen.length, 1);
assert.equal(seen[0], 'magic'); assert.equal(seen[0], 'magic');
}); });
test('units: missing default field throws at construction', () => { test('units: NUMERIC string payload is always converted (closes the string gap)', async () => {
const logger = makeLogger();
const seen = [];
const reg = createRegistry([{
topic: 'set.demand',
unit: 'm3/h',
handler: (_s, msg) => { seen.push({ payload: msg.payload, unit: msg.unit }); },
}], { logger });
// "1" m3/s must convert to 3600 m3/h — same as the numeric-payload case.
await reg.dispatch({ topic: 'set.demand', payload: '1', unit: 'm3/s' }, {}, {});
assert.equal(seen.length, 1);
assert.ok(Math.abs(seen[0].payload - 3600) < 1e-6, `expected 3600, got ${seen[0].payload}`);
assert.equal(seen[0].unit, 'm3/h');
assert.equal(logger._calls.warn.length, 0);
});
test('units: { value: numeric-string } object payload is converted too', async () => {
const seen = [];
const reg = createRegistry([{
topic: 'set.pressure',
unit: 'Pa',
handler: (_s, msg) => { seen.push({ payload: msg.payload, unit: msg.unit }); },
}]);
await reg.dispatch({ topic: 'set.pressure', payload: { value: '5', unit: 'mbar' } }, {}, {});
assert.ok(Math.abs(seen[0].payload - 500) < 1e-6, `expected 500, got ${seen[0].payload}`);
assert.equal(seen[0].unit, 'Pa');
});
test('unit: shorthand declares the unit and DERIVES the measure', async () => {
const seen = [];
const reg = createRegistry([{
topic: 'set.demand',
unit: 'm3/h',
handler: (_s, msg) => { seen.push({ payload: msg.payload, unit: msg.unit }); },
}]);
await reg.dispatch({ topic: 'set.demand', payload: 1, unit: 'm3/s' }, {}, {});
assert.ok(Math.abs(seen[0].payload - 3600) < 1e-6);
assert.equal(seen[0].unit, 'm3/h');
});
test('units: { default } without measure derives the measure (measure no longer required)', () => {
const reg = createRegistry([{
topic: 'set.demand',
units: { default: 'm3/h' },
handler: () => {},
}]);
assert.deepEqual(reg.list()[0].units, { measure: 'volumeFlowRate', default: 'm3/h' });
});
test('units: legacy { measure, default } still works; measure is re-derived from the unit', () => {
const reg = createRegistry([{
topic: 'set.demand',
units: { measure: 'totallyWrong', default: 'm3/h' },
handler: () => {},
}]);
// declared measure ignored — derived from the unit so it can never drift.
assert.deepEqual(reg.list()[0].units, { measure: 'volumeFlowRate', default: 'm3/h' });
});
test('units: missing unit/default throws at construction', () => {
assert.throws(() => createRegistry([{ assert.throws(() => createRegistry([{
topic: 'set.demand', topic: 'set.demand',
units: { measure: 'volumeFlowRate' }, units: { measure: 'volumeFlowRate' },
handler: () => {}, handler: () => {},
}]), /units requires/); }]), /requires a unit string/);
}); });
test('units: missing measure field throws at construction', () => { test('units: unrecognised declared unit throws at construction (descriptor bug caught early)', () => {
assert.throws(() => createRegistry([{ assert.throws(() => createRegistry([{
topic: 'set.demand', topic: 'set.demand',
units: { default: 'm3/h' }, unit: 'flarbargs',
handler: () => {}, handler: () => {},
}]), /units requires/); }]), /convert does not recognise/);
}); });
test('units: descriptor.units surfaces in list() output', () => { test('units: descriptor.units surfaces in list() output', () => {
const reg = createRegistry([ const reg = createRegistry([
{ topic: 'set.demand', units: { measure: 'volumeFlowRate', default: 'm3/h' }, handler: () => {} }, { topic: 'set.demand', unit: 'm3/h', handler: () => {} },
{ topic: 'set.mode', handler: () => {} }, { topic: 'set.mode', handler: () => {} },
]); ]);
const list = reg.list(); const list = reg.list();
assert.deepEqual(list[0].units, { measure: 'volumeFlowRate', default: 'm3/h' }); assert.deepEqual(list[0].units, { measure: 'volumeFlowRate', default: 'm3/h' });
assert.equal(list[1].units, null); assert.equal(list[1].units, null);
}); });
// ---------------------------------------------------------------------------
// origin — command provenance + mode-gated control-authority arbitration
// ---------------------------------------------------------------------------
test('origin: defaults to parent and is stamped on msg before the handler runs', async () => {
const seen = [];
const reg = createRegistry([{ topic: 'set.mode', handler: (_s, msg) => { seen.push(msg.origin); } }]);
await reg.dispatch({ topic: 'set.mode', payload: 'auto' }, {}, {});
assert.equal(seen[0], 'parent');
});
test('origin: explicit msg.origin is preserved and trimmed', async () => {
const seen = [];
const reg = createRegistry([{ topic: 'set.mode', handler: (_s, msg) => { seen.push(msg.origin); } }]);
await reg.dispatch({ topic: 'set.mode', payload: 'auto', origin: ' GUI ' }, {}, {});
assert.equal(seen[0], 'GUI');
});
test('origin: defaultOrigin descriptor overrides the global parent default', async () => {
const seen = [];
const reg = createRegistry([{ topic: 'set.x', defaultOrigin: 'fysical', handler: (_s, msg) => { seen.push(msg.origin); } }]);
await reg.dispatch({ topic: 'set.x' }, {}, {});
assert.equal(seen[0], 'fysical');
});
test('origin gating: non-gated command is never blocked, even with a mode model', async () => {
let invoked = false;
const reg = createRegistry([{ topic: 'set.x', handler: () => { invoked = true; } }]);
const source = { currentMode: 'fysicalControl', config: { mode: { allowedSources: { fysicalControl: ['fysical'] } } } };
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, source, {});
assert.equal(invoked, true);
});
test('origin gating: gated command rejects an origin disallowed by the current mode', async () => {
const logger = makeLogger();
let invoked = false;
const reg = createRegistry([{ topic: 'set.x', gated: true, handler: () => { invoked = true; } }], { logger });
const source = { currentMode: 'fysicalControl', config: { mode: { allowedSources: { fysicalControl: ['fysical'] } } } };
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, source, {});
assert.equal(invoked, false);
assert.ok(logger._calls.warn.some((m) => m.includes("origin 'GUI' not allowed in mode 'fysicalControl'")));
});
test('origin gating: gated command accepts an allowed origin (Set or array allowedSources)', async () => {
let count = 0;
const reg = createRegistry([{ topic: 'set.x', gated: true, handler: () => { count += 1; } }]);
const arraySrc = { currentMode: 'auto', config: { mode: { allowedSources: { auto: ['parent', 'GUI', 'fysical'] } } } };
const setSrc = { currentMode: 'auto', config: { mode: { allowedSources: { auto: new Set(['parent', 'GUI', 'fysical']) } } } };
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, arraySrc, {});
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, setSrc, {});
assert.equal(count, 2);
});
test('origin gating: gated command on a node WITHOUT a mode model is advisory (allow-all)', async () => {
let invoked = false;
const reg = createRegistry([{ topic: 'set.x', gated: true, handler: () => { invoked = true; } }]);
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, { id: 'no-mode' }, {});
assert.equal(invoked, true);
});

View File

@@ -0,0 +1,78 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const EventEmitter = require('events');
const MovementManager = require('../src/state/movementManager');
const noopLogger = { debug() {}, info() {}, warn() {}, error() {} };
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
function makeManager({ mode = 'staticspeed', speed = 50, interval = 1000, initial = 0 } = {}) {
// speed%/s on a 0..100 range → velocity = speed %/s. interval defaults to the
// production 1000ms so the abort-before-first-tick race is reproduced exactly.
return new MovementManager(
{
position: { min: 0, max: 100, initial },
movement: { mode, speed, maxSpeed: 1000, interval },
},
noopLogger,
new EventEmitter(),
);
}
// Regression: before the time-based fix, currentPosition only advanced inside
// setInterval(…, interval). An abort landing before the first tick (the MGC's
// ~1s re-command cadence vs the 1000ms tick) left the pump frozen at the start.
for (const mode of ['staticspeed', 'dynspeed']) {
test(`${mode}: abort before the first tick still advances position (no freeze)`, async () => {
const mgr = makeManager({ mode, speed: 50, interval: 1000 });
const ac = new AbortController();
const moving = mgr.moveTo(80, ac.signal); // ~1.6s of travel; first tick at 1000ms
await sleep(200); // interrupt well before the first tick
ac.abort();
await moving;
const pos = mgr.getCurrentPosition();
// The fix: any non-zero progress means the abort re-based instead of
// freezing at the start. (dynspeed eases in, so its early travel is small
// but must still be > 0; staticspeed travels ~velocity·elapsed.)
assert.ok(pos > 0, `expected partial progress, got frozen at ${pos}`);
assert.ok(pos < 80, `should not have reached target, got ${pos}`);
});
test(`${mode}: a fresh setpoint re-bases from the interrupted position`, async () => {
const mgr = makeManager({ mode, speed: 50, interval: 1000 });
const ac1 = new AbortController();
const m1 = mgr.moveTo(80, ac1.signal);
await sleep(200);
ac1.abort();
await m1;
const afterFirst = mgr.getCurrentPosition();
// New command toward 0 must start from afterFirst, not from 80 or a reset.
const ac2 = new AbortController();
const m2 = mgr.moveTo(0, ac2.signal);
await sleep(100);
ac2.abort();
await m2;
const afterSecond = mgr.getCurrentPosition();
assert.ok(afterSecond < afterFirst, `expected re-base downward from ${afterFirst}, got ${afterSecond}`);
assert.ok(afterSecond >= 0, `position must stay in range, got ${afterSecond}`);
});
}
test('staticspeed: an uninterrupted move reaches the exact target', async () => {
const mgr = makeManager({ mode: 'staticspeed', speed: 500, interval: 10 }); // fast
await mgr.moveTo(40, new AbortController().signal);
assert.equal(mgr.getCurrentPosition(), 40);
});
test('position is clamped to [min,max] on a re-based abort', async () => {
const mgr = makeManager({ mode: 'staticspeed', speed: 5000, interval: 1000, initial: 0 });
const ac = new AbortController();
const moving = mgr.moveTo(100, ac.signal);
await sleep(150);
ac.abort();
await moving;
const pos = mgr.getCurrentPosition();
assert.ok(pos >= 0 && pos <= 100, `clamped, got ${pos}`);
});

View File

@@ -30,6 +30,35 @@ test('process format emits message with changed fields only', () => {
assert.deepEqual(third.payload, { b: 3, c: JSON.stringify({ x: 1 }) }); assert.deepEqual(third.payload, { b: 3, c: JSON.stringify({ x: 1 }) });
}); });
test('alwaysEmit fields bypass delta compression (re-emitted while unchanged)', () => {
const out = new OutputUtils({ alwaysEmit: ['ctrl'] });
const first = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb');
assert.deepEqual(first.payload.fields, { ctrl: 40, flow: 12 });
// flow unchanged → dropped; ctrl unchanged but forced → still emitted.
const second = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb');
assert.deepEqual(second.payload.fields, { ctrl: 40 });
// ctrl changed → emitted with its new value.
const third = out.formatMsg({ ctrl: 41, flow: 12 }, config, 'influxdb');
assert.deepEqual(third.payload.fields, { ctrl: 41 });
});
test('alwaysEmit is per-format and does not force a missing/undefined field', () => {
const out = new OutputUtils({ alwaysEmit: ['ctrl'] });
// ctrl absent from the output → nothing to force; with no other change the
// message is suppressed as usual.
out.formatMsg({ flow: 5 }, config, 'influxdb');
assert.equal(out.formatMsg({ flow: 5 }, config, 'influxdb'), null);
});
test('default OutputUtils keeps pure delta compression (no alwaysEmit)', () => {
const out = new OutputUtils();
out.formatMsg({ ctrl: 40 }, config, 'influxdb');
assert.equal(out.formatMsg({ ctrl: 40 }, config, 'influxdb'), null);
});
test('influx format flattens tags and stringifies tag values', () => { test('influx format flattens tags and stringifies tag values', () => {
const out = new OutputUtils(); const out = new OutputUtils();
const msg = out.formatMsg({ value: 10 }, config, 'influxdb'); const msg = out.formatMsg({ value: 10 }, config, 'influxdb');
@@ -41,3 +70,38 @@ test('influx format flattens tags and stringifies tag values', () => {
assert.equal(msg.payload.tags.tagcode, 't1'); assert.equal(msg.payload.tags.tagcode, 't1');
assert.ok(msg.payload.timestamp instanceof Date); assert.ok(msg.payload.timestamp instanceof Date);
}); });
test('influx format omits tags whose config value is unset', () => {
const out = new OutputUtils();
// No asset block at all: uuid/tagcode/geoLocation/category/type/model are
// all undefined and must NOT appear as `="undefined"` tags.
const sparse = {
functionality: { softwareType: 'measurement' },
general: { id: 'abc' },
};
const msg = out.formatMsg({ value: 10 }, sparse, 'influxdb');
for (const t of ['geoLocation', 'category', 'type', 'model', 'uuid', 'tagcode', 'unit', 'role']) {
assert.ok(!(t in msg.payload.tags), `tag "${t}" should be omitted when unset, got "${msg.payload.tags[t]}"`);
}
// Tags that DO have values still come through.
assert.equal(msg.payload.tags.id, 'abc');
assert.equal(msg.payload.tags.softwareType, 'measurement');
// Nothing should stringify to the literal "undefined".
for (const v of Object.values(msg.payload.tags)) {
assert.notEqual(v, 'undefined');
}
});
test('influx format drops empty-string tag values too', () => {
const out = new OutputUtils();
const cfg = {
functionality: { softwareType: 'pump', role: '' },
general: { id: 'p1' },
asset: { category: '', model: 'M9' },
};
const msg = out.formatMsg({ value: 1 }, cfg, 'influxdb');
assert.ok(!('role' in msg.payload.tags));
assert.ok(!('category' in msg.payload.tags));
assert.equal(msg.payload.tags.model, 'M9');
});