feat: digital (MQTT) mode + fix silent dispatcher bug for camelCase methods
Runtime:
- Fix silent no-op when user selected any camelCase smoothing or outlier
method from the editor. validateEnum in generalFunctions lowercases enum
values (zScore -> zscore, lowPass -> lowpass, ...) but the dispatcher
compared against camelCase keys. Effect: 5 of 11 smoothing methods
(lowPass, highPass, weightedMovingAverage, bandPass, savitzkyGolay) and
2 of 3 outlier methods (zScore, modifiedZScore) silently fell through.
Users got the raw last value or no outlier filtering with no error log.
Review any pre-2026-04-13 flows that relied on these methods.
Fix: normalize method names to lowercase on both sides of the lookup.
- New Channel class (src/channel.js) — self-contained per-channel pipeline:
outlier -> offset -> scaling -> smoothing -> min/max -> constrain -> emit.
Pure domain logic, no Node-RED deps, reusable by future nodes that need
the same signal-conditioning chain.
Digital mode:
- config.mode.current = 'digital' opts in. config.channels declares one
entry per expected JSON key; each channel has its own type, position,
unit, distance, and optional scaling/smoothing/outlierDetection blocks
that override the top-level analog-mode fields. One MQTT-shaped payload
({t:22.5, h:45, p:1013}) dispatches N independent pipelines and emits N
MeasurementContainer slots from a single input message.
- Backward compatible: absent mode config = analog = pre-digital behaviour.
Every existing measurement flow keeps working unchanged.
UI:
- HTML editor: new Mode dropdown and Channels JSON textarea. The Node-RED
help panel is rewritten end-to-end with topic reference, port contracts,
per-mode configuration, smoothing/outlier method tables, and a note
about the pre-fix behaviour.
- README.md rewritten (was a one-line stub).
Tests (12 -> 71, all green):
- test/basic/smoothing-methods.basic.test.js (+16): every smoothing method
including the formerly-broken camelCase ones.
- test/basic/outlier-detection.basic.test.js (+10): every outlier method,
fall-through, toggle.
- test/basic/scaling-and-interpolation.basic.test.js (+10): offset,
interpolateLinear, constrain, handleScaling edge cases, min/max
tracking, updateOutputPercent fallback, updateOutputAbs emit dedup.
- test/basic/calibration-and-stability.basic.test.js (+11): calibrate
(stable and unstable), isStable, evaluateRepeatability refusals,
toggleSimulation, tick simulation on/off.
- test/integration/digital-mode.integration.test.js (+12): channel build
(including malformed entries), payload dispatch, multi-channel emit,
unknown keys, per-channel scaling/smoothing/outlier, empty channels,
non-numeric value rejection, getDigitalOutput shape, analog-default
back-compat.
E2E verified on Dockerized Node-RED: analog regression unchanged; digital
mode deploys with three channels, dispatches MQTT-style payload, emits
per-channel events, accumulates per-channel smoothing, ignores unknown
keys.
Depends on generalFunctions commit e50be2e (permissive unit check +
mode/channels schema).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
311
src/channel.js
Normal file
311
src/channel.js
Normal file
@@ -0,0 +1,311 @@
|
||||
/**
|
||||
* Channel — a single scalar measurement pipeline.
|
||||
*
|
||||
* A Channel owns one rolling window of stored values, one smoothing method,
|
||||
* one outlier detector, one scaling contract, and one MeasurementContainer
|
||||
* slot. It exposes `update(value)` as the single entry point.
|
||||
*
|
||||
* The measurement node composes Channels:
|
||||
* - analog mode -> exactly one Channel built from the flat top-level config
|
||||
* - digital mode -> one Channel per `config.channels[i]` entry, keyed by
|
||||
* `channel.key` (the field inside msg.payload that feeds it)
|
||||
*
|
||||
* This file is pure domain logic. It must never reach into Node-RED APIs.
|
||||
*/
|
||||
|
||||
class Channel {
|
||||
/**
|
||||
* @param {object} opts
|
||||
* @param {string} opts.key - identifier inside an incoming object payload (digital) or null (analog)
|
||||
* @param {string} opts.type - MeasurementContainer axis (e.g. 'pressure')
|
||||
* @param {string} opts.position - 'upstream' | 'atEquipment' | 'downstream'
|
||||
* @param {string} opts.unit - output unit label (e.g. 'mbar')
|
||||
* @param {number|null} opts.distance - physical offset from parent equipment
|
||||
* @param {object} opts.scaling - {enabled, inputMin, inputMax, absMin, absMax, offset}
|
||||
* @param {object} opts.smoothing - {smoothWindow, smoothMethod}
|
||||
* @param {object} [opts.outlierDetection] - {enabled, method, threshold}
|
||||
* @param {object} opts.interpolation - {percentMin, percentMax}
|
||||
* @param {object} opts.measurements - the MeasurementContainer to publish into
|
||||
* @param {object} opts.logger - generalFunctions logger instance
|
||||
*/
|
||||
constructor(opts) {
|
||||
this.key = opts.key || null;
|
||||
this.type = opts.type;
|
||||
this.position = opts.position;
|
||||
this.unit = opts.unit;
|
||||
this.distance = opts.distance ?? null;
|
||||
|
||||
this.scaling = { ...opts.scaling };
|
||||
this.smoothing = { ...opts.smoothing };
|
||||
this.outlierDetection = opts.outlierDetection ? { ...opts.outlierDetection } : { enabled: false, method: 'zscore', threshold: 3 };
|
||||
this.interpolation = { ...(opts.interpolation || { percentMin: 0, percentMax: 100 }) };
|
||||
|
||||
this.measurements = opts.measurements;
|
||||
this.logger = opts.logger;
|
||||
|
||||
this.storedValues = [];
|
||||
this.inputValue = 0;
|
||||
this.outputAbs = 0;
|
||||
this.outputPercent = 0;
|
||||
|
||||
this.totalMinValue = Infinity;
|
||||
this.totalMaxValue = -Infinity;
|
||||
this.totalMinSmooth = 0;
|
||||
this.totalMaxSmooth = 0;
|
||||
|
||||
this.inputRange = Math.abs(this.scaling.inputMax - this.scaling.inputMin);
|
||||
this.processRange = Math.abs(this.scaling.absMax - this.scaling.absMin);
|
||||
}
|
||||
|
||||
// --- Public entry point ---
|
||||
|
||||
/**
|
||||
* Push a new scalar value through the full pipeline:
|
||||
* outlier -> offset -> scaling -> smoothing -> min/max -> emit
|
||||
* @param {number} value
|
||||
* @returns {boolean} true if the value advanced the pipeline (not rejected as outlier)
|
||||
*/
|
||||
update(value) {
|
||||
this.inputValue = value;
|
||||
|
||||
if (this.outlierDetection.enabled && this._isOutlier(value)) {
|
||||
this.logger?.warn?.(`[${this.key || this.type}] Outlier detected. Ignoring value=${value}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
let v = value + (this.scaling.offset || 0);
|
||||
this._updateMinMax(v);
|
||||
|
||||
if (this.scaling.enabled) {
|
||||
v = this._applyScaling(v);
|
||||
}
|
||||
|
||||
const smoothed = this._applySmoothing(v);
|
||||
this._updateSmoothMinMax(smoothed);
|
||||
this._writeOutput(smoothed);
|
||||
return true;
|
||||
}
|
||||
|
||||
getOutput() {
|
||||
return {
|
||||
key: this.key,
|
||||
type: this.type,
|
||||
position: this.position,
|
||||
unit: this.unit,
|
||||
mAbs: this.outputAbs,
|
||||
mPercent: this.outputPercent,
|
||||
totalMinValue: this.totalMinValue === Infinity ? 0 : this.totalMinValue,
|
||||
totalMaxValue: this.totalMaxValue === -Infinity ? 0 : this.totalMaxValue,
|
||||
totalMinSmooth: this.totalMinSmooth,
|
||||
totalMaxSmooth: this.totalMaxSmooth,
|
||||
};
|
||||
}
|
||||
|
||||
// --- Outlier detection ---
|
||||
|
||||
_isOutlier(val) {
|
||||
if (this.storedValues.length < 2) return false;
|
||||
const raw = this.outlierDetection.method;
|
||||
const method = typeof raw === 'string' ? raw.toLowerCase() : raw;
|
||||
switch (method) {
|
||||
case 'zscore': return this._zScore(val);
|
||||
case 'iqr': return this._iqr(val);
|
||||
case 'modifiedzscore': return this._modifiedZScore(val);
|
||||
default:
|
||||
this.logger?.warn?.(`[${this.key || this.type}] Unknown outlier method "${raw}"`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
_zScore(val) {
|
||||
const threshold = this.outlierDetection.threshold || 3;
|
||||
const m = Channel._mean(this.storedValues);
|
||||
const sd = Channel._stdDev(this.storedValues);
|
||||
// Intentionally do NOT early-return on sd===0: a perfectly stable
|
||||
// baseline should make any deviation an outlier (z = Infinity > threshold).
|
||||
const z = sd === 0 ? (val === m ? 0 : Infinity) : (val - m) / sd;
|
||||
return Math.abs(z) > threshold;
|
||||
}
|
||||
|
||||
_iqr(val) {
|
||||
const sorted = [...this.storedValues].sort((a, b) => a - b);
|
||||
const q1 = sorted[Math.floor(sorted.length / 4)];
|
||||
const q3 = sorted[Math.floor(sorted.length * 3 / 4)];
|
||||
const iqr = q3 - q1;
|
||||
return val < q1 - 1.5 * iqr || val > q3 + 1.5 * iqr;
|
||||
}
|
||||
|
||||
_modifiedZScore(val) {
|
||||
const median = Channel._median(this.storedValues);
|
||||
const mad = Channel._median(this.storedValues.map((v) => Math.abs(v - median)));
|
||||
if (mad === 0) return false;
|
||||
const mz = 0.6745 * (val - median) / mad;
|
||||
const threshold = this.outlierDetection.threshold || 3.5;
|
||||
return Math.abs(mz) > threshold;
|
||||
}
|
||||
|
||||
// --- Scaling ---
|
||||
|
||||
_applyScaling(value) {
|
||||
if (this.inputRange <= 0) {
|
||||
this.logger?.warn?.(`[${this.key || this.type}] Input range invalid; falling back to [0,1].`);
|
||||
this.scaling.inputMin = 0;
|
||||
this.scaling.inputMax = 1;
|
||||
this.inputRange = 1;
|
||||
}
|
||||
const clamped = Math.min(Math.max(value, this.scaling.inputMin), this.scaling.inputMax);
|
||||
return this.scaling.absMin + ((clamped - this.scaling.inputMin) * (this.scaling.absMax - this.scaling.absMin)) / this.inputRange;
|
||||
}
|
||||
|
||||
// --- Smoothing ---
|
||||
|
||||
_applySmoothing(value) {
|
||||
this.storedValues.push(value);
|
||||
if (this.storedValues.length > this.smoothing.smoothWindow) {
|
||||
this.storedValues.shift();
|
||||
}
|
||||
|
||||
const raw = this.smoothing.smoothMethod;
|
||||
const method = typeof raw === 'string' ? raw.toLowerCase() : raw;
|
||||
const arr = this.storedValues;
|
||||
|
||||
switch (method) {
|
||||
case 'none': return arr[arr.length - 1];
|
||||
case 'mean': return Channel._mean(arr);
|
||||
case 'min': return Math.min(...arr);
|
||||
case 'max': return Math.max(...arr);
|
||||
case 'sd': return Channel._stdDev(arr);
|
||||
case 'median': return Channel._median(arr);
|
||||
case 'weightedmovingaverage': return Channel._wma(arr);
|
||||
case 'lowpass': return Channel._lowPass(arr);
|
||||
case 'highpass': return Channel._highPass(arr);
|
||||
case 'bandpass': return Channel._bandPass(arr);
|
||||
case 'kalman': return Channel._kalman(arr);
|
||||
case 'savitzkygolay': return Channel._savitzkyGolay(arr);
|
||||
default:
|
||||
this.logger?.error?.(`[${this.key || this.type}] Smoothing method "${raw}" not implemented.`);
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
// --- Output writes ---
|
||||
|
||||
_updateMinMax(value) {
|
||||
if (value < this.totalMinValue) this.totalMinValue = value;
|
||||
if (value > this.totalMaxValue) this.totalMaxValue = value;
|
||||
}
|
||||
|
||||
_updateSmoothMinMax(value) {
|
||||
if (this.totalMinSmooth === 0 && this.totalMaxSmooth === 0) {
|
||||
this.totalMinSmooth = value;
|
||||
this.totalMaxSmooth = value;
|
||||
}
|
||||
if (value < this.totalMinSmooth) this.totalMinSmooth = value;
|
||||
if (value > this.totalMaxSmooth) this.totalMaxSmooth = value;
|
||||
}
|
||||
|
||||
_writeOutput(val) {
|
||||
const clamped = Math.min(Math.max(val, this.scaling.absMin), this.scaling.absMax);
|
||||
const rounded = Math.round(clamped * 100) / 100;
|
||||
|
||||
if (rounded !== this.outputAbs) {
|
||||
this.outputAbs = rounded;
|
||||
this.outputPercent = this._computePercent(clamped);
|
||||
this.measurements
|
||||
?.type(this.type)
|
||||
.variant('measured')
|
||||
.position(this.position)
|
||||
.distance(this.distance)
|
||||
.value(this.outputAbs, Date.now(), this.unit);
|
||||
}
|
||||
}
|
||||
|
||||
_computePercent(value) {
|
||||
const { percentMin, percentMax } = this.interpolation;
|
||||
let pct;
|
||||
if (this.processRange <= 0) {
|
||||
const lo = this.totalMinValue === Infinity ? 0 : this.totalMinValue;
|
||||
const hi = this.totalMaxValue === -Infinity ? 1 : this.totalMaxValue;
|
||||
pct = this._lerp(value, lo, hi, percentMin, percentMax);
|
||||
} else {
|
||||
pct = this._lerp(value, this.scaling.absMin, this.scaling.absMax, percentMin, percentMax);
|
||||
}
|
||||
return Math.round(pct * 100) / 100;
|
||||
}
|
||||
|
||||
_lerp(n, iMin, iMax, oMin, oMax) {
|
||||
if (iMin >= iMax || oMin >= oMax) return n;
|
||||
return oMin + ((n - iMin) * (oMax - oMin)) / (iMax - iMin);
|
||||
}
|
||||
|
||||
// --- Pure math helpers (static so they're reusable) ---
|
||||
|
||||
static _mean(arr) {
|
||||
if (!arr.length) return 0;
|
||||
return arr.reduce((a, b) => a + b, 0) / arr.length;
|
||||
}
|
||||
|
||||
static _stdDev(arr) {
|
||||
if (arr.length <= 1) return 0;
|
||||
const m = Channel._mean(arr);
|
||||
const variance = arr.map((v) => (v - m) ** 2).reduce((a, b) => a + b, 0) / (arr.length - 1);
|
||||
return Math.sqrt(variance);
|
||||
}
|
||||
|
||||
static _median(arr) {
|
||||
const sorted = [...arr].sort((a, b) => a - b);
|
||||
const mid = Math.floor(sorted.length / 2);
|
||||
return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
|
||||
}
|
||||
|
||||
static _wma(arr) {
|
||||
const weights = arr.map((_, i) => i + 1);
|
||||
const weightedSum = arr.reduce((sum, v, i) => sum + v * weights[i], 0);
|
||||
const weightTotal = weights.reduce((s, w) => s + w, 0);
|
||||
return weightedSum / weightTotal;
|
||||
}
|
||||
|
||||
static _lowPass(arr) {
|
||||
const alpha = 0.2;
|
||||
let out = arr[0];
|
||||
for (let i = 1; i < arr.length; i++) out = alpha * arr[i] + (1 - alpha) * out;
|
||||
return out;
|
||||
}
|
||||
|
||||
static _highPass(arr) {
|
||||
const alpha = 0.8;
|
||||
const filtered = [arr[0]];
|
||||
for (let i = 1; i < arr.length; i++) {
|
||||
filtered[i] = alpha * (filtered[i - 1] + arr[i] - arr[i - 1]);
|
||||
}
|
||||
return filtered[filtered.length - 1];
|
||||
}
|
||||
|
||||
static _bandPass(arr) {
|
||||
const lp = Channel._lowPass(arr);
|
||||
const hp = Channel._highPass(arr);
|
||||
return arr.map((v) => lp + hp - v).pop();
|
||||
}
|
||||
|
||||
static _kalman(arr) {
|
||||
let estimate = arr[0];
|
||||
const measurementNoise = 1;
|
||||
const processNoise = 0.1;
|
||||
const gain = processNoise / (processNoise + measurementNoise);
|
||||
for (let i = 1; i < arr.length; i++) estimate = estimate + gain * (arr[i] - estimate);
|
||||
return estimate;
|
||||
}
|
||||
|
||||
static _savitzkyGolay(arr) {
|
||||
const coeffs = [-3, 12, 17, 12, -3];
|
||||
const norm = coeffs.reduce((a, b) => a + b, 0);
|
||||
if (arr.length < coeffs.length) return arr[arr.length - 1];
|
||||
let s = 0;
|
||||
for (let i = 0; i < coeffs.length; i++) {
|
||||
s += arr[arr.length - coeffs.length + i] * coeffs[i];
|
||||
}
|
||||
return s / norm;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Channel;
|
||||
Reference in New Issue
Block a user