diff --git a/src/helper/outputUtils.js b/src/helper/outputUtils.js index 5d7f24f..aa0f00b 100644 --- a/src/helper/outputUtils.js +++ b/src/helper/outputUtils.js @@ -2,8 +2,16 @@ const { getFormatter } = require('./formatters'); //this class will handle the output events for the node red node class OutputUtils { - constructor() { + // `options.alwaysEmit` is an optional list of field keys that bypass delta + // compression: they are re-emitted on every tick even when unchanged. Use it + // sparingly for slowly-varying values that must still trace as a continuous + // line downstream (e.g. a pump's realized control position `ctrl`, which sits + // constant in steady state and otherwise produces ~1 point per long stretch — + // invisible in a Grafana timeseries with createEmpty:false). Defaults to none, + // so existing nodes keep pure delta-compression behaviour. + constructor(options = {}) { this.output = {}; + this.alwaysEmit = new Set(options.alwaysEmit || []); } checkForChanges(output, format) { @@ -13,7 +21,9 @@ class OutputUtils { this.output[format] = this.output[format] || {}; const changedFields = {}; for (const key in output) { - if (Object.prototype.hasOwnProperty.call(output, key) && output[key] !== this.output[format][key]) { + if (!Object.prototype.hasOwnProperty.call(output, key)) continue; + const forced = this.alwaysEmit.has(key) && output[key] !== undefined; + if (forced || output[key] !== this.output[format][key]) { let value = output[key]; // For fields: if the value is an object (and not a Date), stringify it. if (value !== null && typeof value === 'object' && !(value instanceof Date)) { @@ -79,7 +89,13 @@ class OutputUtils { for (const key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) { const value = obj[key]; - if (value !== null && typeof value === 'object' && !(value instanceof Date)) { + // Skip tags that carry no information. When a config field is unset, + // extractRelevantConfig hands us `undefined`; stringifying that wrote + // literal `category="undefined"` / `geoLocation="undefined"` tags that + // clutter every Grafana legend and needlessly inflate tag cardinality. + // Drop null / undefined / empty-string before they reach InfluxDB. + if (value === null || value === undefined || value === '') continue; + if (typeof value === 'object' && !(value instanceof Date)) { // Recursively flatten the nested object. const flatChild = this.flattenTags(value); for (const childKey in flatChild) { diff --git a/src/nodered/BaseNodeAdapter.js b/src/nodered/BaseNodeAdapter.js index 5110dbf..5273313 100644 --- a/src/nodered/BaseNodeAdapter.js +++ b/src/nodered/BaseNodeAdapter.js @@ -82,7 +82,9 @@ class BaseNodeAdapter { // pumpingStation/measurement nodeClass _attachInputHandler patterns. this.node.source = this.source; - this._output = new OutputUtils(); + // `static alwaysEmitFields = ['ctrl', …]` on a subclass exempts those + // fields from delta compression so they trace continuously downstream. + this._output = new OutputUtils({ alwaysEmit: ctor.alwaysEmitFields }); const userHasUnitsQuery = ctor.commands.some( (c) => c && (c.topic === 'query.units' || (Array.isArray(c.aliases) && c.aliases.includes('query.units')))); const mergedCommands = userHasUnitsQuery diff --git a/src/state/movementManager.js b/src/state/movementManager.js index e3a0c37..18e6ecb 100644 --- a/src/state/movementManager.js +++ b/src/state/movementManager.js @@ -79,65 +79,70 @@ class movementManager { // Clamp the final target into [minPosition, maxPosition] targetPosition = this.constrain(targetPosition); - // Compute direction and remaining distance - const direction = targetPosition > this.currentPosition ? 1 : -1; - const distance = Math.abs(targetPosition - this.currentPosition); + // Snapshot the starting point. Position is derived from ELAPSED WALL-TIME + // (not accumulated per-tick steps) so an interruption that lands between + // ticks — or before the very first tick — still leaves currentPosition at + // the real distance travelled. A fast re-commanding parent (e.g. MGC + // updating demand every tick) then re-bases from the true position instead + // of freezing at the start. See _settleAt / the abort handler below. + const startPosition = this.currentPosition; + const direction = targetPosition > startPosition ? 1 : -1; + const distance = Math.abs(targetPosition - startPosition); const velocity = this.getVelocity(); // units per second if (velocity <= 0) { return reject(new Error("Movement aborted: zero speed")); } - // Duration and bookkeeping - const duration = distance / velocity; // seconds to go the remaining distance - this.timeleft = duration; + const duration = distance / velocity; // seconds to go the full distance + this.timeleft = duration; this.logger.debug( `Linear move: dir=${direction}, dist=${distance}, vel=${velocity.toFixed(2)} u/s, dur=${duration.toFixed(2)}s` ); - // Compute how much to move each tick - const intervalMs = this.interval; - const intervalSec = intervalMs / 1000; - const stepSize = direction * velocity * intervalSec; + const intervalMs = this.interval; + const startTime = Date.now(); - const startTime = Date.now(); + // Position reached after `elapsedSec` of travel, clamped to the target. + const posAt = (elapsedSec) => + this.constrain(startPosition + direction * Math.min(distance, velocity * elapsedSec)); + // Re-base currentPosition (and timeleft) onto the real elapsed progress. + const settle = () => { + const elapsed = (Date.now() - startTime) / 1000; + this.currentPosition = posAt(elapsed); + this.timeleft = Math.max(0, duration - elapsed); + this.emitPos(this.currentPosition); + return elapsed; + }; // Kick off the loop const intervalId = setInterval(() => { - // 7a) Abort check if (signal?.aborted) { clearInterval(intervalId); + settle(); return reject(new Error("Movement aborted")); } - // Advance position and clamp - this.currentPosition += stepSize; - this.currentPosition = this.constrain(this.currentPosition); - this.emitPos(this.currentPosition); - - // Update timeleft - const elapsed = (Date.now() - startTime) / 1000; - this.timeleft = Math.max(0, duration - elapsed); - + const elapsed = settle(); this.logger.debug( `pos=${this.currentPosition.toFixed(2)}, timeleft=${this.timeleft.toFixed(2)}` ); - // Completed the move? - if ( - (direction > 0 && this.currentPosition >= targetPosition) || - (direction < 0 && this.currentPosition <= targetPosition) - ) { + // Completed the move? (time-based so it can't overshoot/undershoot) + if (elapsed >= duration) { clearInterval(intervalId); this.currentPosition = targetPosition; + this.timeleft = 0; this.emitPos(this.currentPosition); return resolve("Reached target move."); } }, intervalMs); - // 8) Also catch aborts that happen before the first tick + // Catch aborts that happen between ticks (incl. before the first tick): + // capture the partial progress so the move re-bases instead of freezing. signal?.addEventListener("abort", () => { clearInterval(intervalId); + settle(); reject(new Error("Movement aborted")); }); }); @@ -213,8 +218,8 @@ class movementManager { return reject(new Error("Movement aborted")); } - const totalDistance = Math.abs(targetPosition - this.currentPosition); const startPosition = this.currentPosition; + const totalDistance = Math.abs(targetPosition - this.currentPosition); const velocity = this.getVelocity(); if (velocity <= 0) { return reject(new Error("Movement aborted: zero speed")); @@ -223,45 +228,53 @@ class movementManager { const easeFunction = (t) => t < 0.5 ? 4 * t * t * t : 1 - Math.pow(-2 * t + 2, 3) / 2; - let elapsedTime = 0; const duration = totalDistance / velocity; this.timeleft = duration; const interval = this.interval; + const startTime = Date.now(); + + // Position from ELAPSED WALL-TIME (eased), so an interruption between + // ticks re-bases from the real position rather than freezing at the + // start — same rationale as moveLinear. + const posAt = (elapsedSec) => { + const progress = duration > 0 ? Math.min(elapsedSec / duration, 1) : 1; + return startPosition + (targetPosition - startPosition) * easeFunction(progress); + }; + const settle = () => { + const elapsed = (Date.now() - startTime) / 1000; + this.currentPosition = posAt(elapsed); + this.timeleft = Math.max(0, duration - elapsed); + this.emitPos(this.currentPosition); + return elapsed; + }; // 2) Start the moving loop const intervalId = setInterval(() => { // 3) Check for abort on each tick if (signal?.aborted) { clearInterval(intervalId); + settle(); return reject(new Error("Movement aborted")); } - elapsedTime += interval / 1000; - const progress = Math.min(elapsedTime / duration, 1); - this.timeleft = duration - elapsedTime; - const easedProgress = easeFunction(progress); - const newPosition = - startPosition + (targetPosition - startPosition) * easedProgress; - - this.emitPos(newPosition); + const elapsed = settle(); this.logger.debug( - `Using ${this.movementMode} => Progress=${progress.toFixed( - 2 - )}, Eased=${easedProgress.toFixed(2)}` + `Using ${this.movementMode} => elapsed=${elapsed.toFixed(2)}s, pos=${this.currentPosition.toFixed(2)}` ); - if (progress >= 1) { + if (elapsed >= duration) { clearInterval(intervalId); this.currentPosition = targetPosition; + this.timeleft = 0; + this.emitPos(this.currentPosition); resolve(`Reached target move.`); - } else { - this.currentPosition = newPosition; } }, interval); - // 4) Also listen once for abort before first tick + // 4) Capture partial progress on aborts between/before ticks. signal?.addEventListener("abort", () => { clearInterval(intervalId); + settle(); reject(new Error("Movement aborted")); }); }); diff --git a/test/movement-manager.test.js b/test/movement-manager.test.js new file mode 100644 index 0000000..3ebd25c --- /dev/null +++ b/test/movement-manager.test.js @@ -0,0 +1,78 @@ +const test = require('node:test'); +const assert = require('node:assert/strict'); +const EventEmitter = require('events'); + +const MovementManager = require('../src/state/movementManager'); + +const noopLogger = { debug() {}, info() {}, warn() {}, error() {} }; +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +function makeManager({ mode = 'staticspeed', speed = 50, interval = 1000, initial = 0 } = {}) { + // speed%/s on a 0..100 range → velocity = speed %/s. interval defaults to the + // production 1000ms so the abort-before-first-tick race is reproduced exactly. + return new MovementManager( + { + position: { min: 0, max: 100, initial }, + movement: { mode, speed, maxSpeed: 1000, interval }, + }, + noopLogger, + new EventEmitter(), + ); +} + +// Regression: before the time-based fix, currentPosition only advanced inside +// setInterval(…, interval). An abort landing before the first tick (the MGC's +// ~1s re-command cadence vs the 1000ms tick) left the pump frozen at the start. +for (const mode of ['staticspeed', 'dynspeed']) { + test(`${mode}: abort before the first tick still advances position (no freeze)`, async () => { + const mgr = makeManager({ mode, speed: 50, interval: 1000 }); + const ac = new AbortController(); + const moving = mgr.moveTo(80, ac.signal); // ~1.6s of travel; first tick at 1000ms + await sleep(200); // interrupt well before the first tick + ac.abort(); + await moving; + const pos = mgr.getCurrentPosition(); + // The fix: any non-zero progress means the abort re-based instead of + // freezing at the start. (dynspeed eases in, so its early travel is small + // but must still be > 0; staticspeed travels ~velocity·elapsed.) + assert.ok(pos > 0, `expected partial progress, got frozen at ${pos}`); + assert.ok(pos < 80, `should not have reached target, got ${pos}`); + }); + + test(`${mode}: a fresh setpoint re-bases from the interrupted position`, async () => { + const mgr = makeManager({ mode, speed: 50, interval: 1000 }); + const ac1 = new AbortController(); + const m1 = mgr.moveTo(80, ac1.signal); + await sleep(200); + ac1.abort(); + await m1; + const afterFirst = mgr.getCurrentPosition(); + + // New command toward 0 must start from afterFirst, not from 80 or a reset. + const ac2 = new AbortController(); + const m2 = mgr.moveTo(0, ac2.signal); + await sleep(100); + ac2.abort(); + await m2; + const afterSecond = mgr.getCurrentPosition(); + assert.ok(afterSecond < afterFirst, `expected re-base downward from ${afterFirst}, got ${afterSecond}`); + assert.ok(afterSecond >= 0, `position must stay in range, got ${afterSecond}`); + }); +} + +test('staticspeed: an uninterrupted move reaches the exact target', async () => { + const mgr = makeManager({ mode: 'staticspeed', speed: 500, interval: 10 }); // fast + await mgr.moveTo(40, new AbortController().signal); + assert.equal(mgr.getCurrentPosition(), 40); +}); + +test('position is clamped to [min,max] on a re-based abort', async () => { + const mgr = makeManager({ mode: 'staticspeed', speed: 5000, interval: 1000, initial: 0 }); + const ac = new AbortController(); + const moving = mgr.moveTo(100, ac.signal); + await sleep(150); + ac.abort(); + await moving; + const pos = mgr.getCurrentPosition(); + assert.ok(pos >= 0 && pos <= 100, `clamped, got ${pos}`); +}); diff --git a/test/output-utils.test.js b/test/output-utils.test.js index 144c550..88a95b8 100644 --- a/test/output-utils.test.js +++ b/test/output-utils.test.js @@ -30,6 +30,35 @@ test('process format emits message with changed fields only', () => { assert.deepEqual(third.payload, { b: 3, c: JSON.stringify({ x: 1 }) }); }); +test('alwaysEmit fields bypass delta compression (re-emitted while unchanged)', () => { + const out = new OutputUtils({ alwaysEmit: ['ctrl'] }); + + const first = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb'); + assert.deepEqual(first.payload.fields, { ctrl: 40, flow: 12 }); + + // flow unchanged → dropped; ctrl unchanged but forced → still emitted. + const second = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb'); + assert.deepEqual(second.payload.fields, { ctrl: 40 }); + + // ctrl changed → emitted with its new value. + const third = out.formatMsg({ ctrl: 41, flow: 12 }, config, 'influxdb'); + assert.deepEqual(third.payload.fields, { ctrl: 41 }); +}); + +test('alwaysEmit is per-format and does not force a missing/undefined field', () => { + const out = new OutputUtils({ alwaysEmit: ['ctrl'] }); + // ctrl absent from the output → nothing to force; with no other change the + // message is suppressed as usual. + out.formatMsg({ flow: 5 }, config, 'influxdb'); + assert.equal(out.formatMsg({ flow: 5 }, config, 'influxdb'), null); +}); + +test('default OutputUtils keeps pure delta compression (no alwaysEmit)', () => { + const out = new OutputUtils(); + out.formatMsg({ ctrl: 40 }, config, 'influxdb'); + assert.equal(out.formatMsg({ ctrl: 40 }, config, 'influxdb'), null); +}); + test('influx format flattens tags and stringifies tag values', () => { const out = new OutputUtils(); const msg = out.formatMsg({ value: 10 }, config, 'influxdb'); @@ -41,3 +70,38 @@ test('influx format flattens tags and stringifies tag values', () => { assert.equal(msg.payload.tags.tagcode, 't1'); assert.ok(msg.payload.timestamp instanceof Date); }); + +test('influx format omits tags whose config value is unset', () => { + const out = new OutputUtils(); + // No asset block at all: uuid/tagcode/geoLocation/category/type/model are + // all undefined and must NOT appear as `="undefined"` tags. + const sparse = { + functionality: { softwareType: 'measurement' }, + general: { id: 'abc' }, + }; + const msg = out.formatMsg({ value: 10 }, sparse, 'influxdb'); + + for (const t of ['geoLocation', 'category', 'type', 'model', 'uuid', 'tagcode', 'unit', 'role']) { + assert.ok(!(t in msg.payload.tags), `tag "${t}" should be omitted when unset, got "${msg.payload.tags[t]}"`); + } + // Tags that DO have values still come through. + assert.equal(msg.payload.tags.id, 'abc'); + assert.equal(msg.payload.tags.softwareType, 'measurement'); + // Nothing should stringify to the literal "undefined". + for (const v of Object.values(msg.payload.tags)) { + assert.notEqual(v, 'undefined'); + } +}); + +test('influx format drops empty-string tag values too', () => { + const out = new OutputUtils(); + const cfg = { + functionality: { softwareType: 'pump', role: '' }, + general: { id: 'p1' }, + asset: { category: '', model: 'M9' }, + }; + const msg = out.formatMsg({ value: 1 }, cfg, 'influxdb'); + assert.ok(!('role' in msg.payload.tags)); + assert.ok(!('category' in msg.payload.tags)); + assert.equal(msg.payload.tags.model, 'M9'); +});