/** * Channel — a single scalar measurement pipeline. * * A Channel owns one rolling window of stored values, one smoothing method, * one outlier detector, one scaling contract, and one MeasurementContainer * slot. It exposes `update(value)` as the single entry point. * * The measurement node composes Channels: * - analog mode -> exactly one Channel built from the flat top-level config * - digital mode -> one Channel per `config.channels[i]` entry, keyed by * `channel.key` (the field inside msg.payload that feeds it) * * This file is pure domain logic. It must never reach into Node-RED APIs. */ class Channel { /** * @param {object} opts * @param {string} opts.key - identifier inside an incoming object payload (digital) or null (analog) * @param {string} opts.type - MeasurementContainer axis (e.g. 'pressure') * @param {string} opts.position - 'upstream' | 'atEquipment' | 'downstream' * @param {string} opts.unit - output unit label (e.g. 'mbar') * @param {number|null} opts.distance - physical offset from parent equipment * @param {object} opts.scaling - {enabled, inputMin, inputMax, absMin, absMax, offset} * @param {object} opts.smoothing - {smoothWindow, smoothMethod} * @param {object} [opts.outlierDetection] - {enabled, method, threshold} * @param {object} opts.interpolation - {percentMin, percentMax} * @param {object} opts.measurements - the MeasurementContainer to publish into * @param {object} opts.logger - generalFunctions logger instance */ constructor(opts) { this.key = opts.key || null; this.type = opts.type; this.position = opts.position; this.unit = opts.unit; this.distance = opts.distance ?? null; this.scaling = { ...opts.scaling }; this.smoothing = { ...opts.smoothing }; this.outlierDetection = opts.outlierDetection ? { ...opts.outlierDetection } : { enabled: false, method: 'zscore', threshold: 3 }; this.interpolation = { ...(opts.interpolation || { percentMin: 0, percentMax: 100 }) }; this.measurements = opts.measurements; this.logger = opts.logger; this.storedValues = []; this.inputValue = 0; this.outputAbs = 0; this.outputPercent = 0; this.totalMinValue = Infinity; this.totalMaxValue = -Infinity; this.totalMinSmooth = 0; this.totalMaxSmooth = 0; this.inputRange = Math.abs(this.scaling.inputMax - this.scaling.inputMin); this.processRange = Math.abs(this.scaling.absMax - this.scaling.absMin); } // --- Public entry point --- /** * Push a new scalar value through the full pipeline: * outlier -> offset -> scaling -> smoothing -> min/max -> emit * @param {number} value * @returns {boolean} true if the value advanced the pipeline (not rejected as outlier) */ update(value) { this.inputValue = value; if (this.outlierDetection.enabled && this._isOutlier(value)) { this.logger?.warn?.(`[${this.key || this.type}] Outlier detected. Ignoring value=${value}`); return false; } let v = value + (this.scaling.offset || 0); this._updateMinMax(v); if (this.scaling.enabled) { v = this._applyScaling(v); } const smoothed = this._applySmoothing(v); this._updateSmoothMinMax(smoothed); this._writeOutput(smoothed); return true; } getOutput() { return { key: this.key, type: this.type, position: this.position, unit: this.unit, mAbs: this.outputAbs, mPercent: this.outputPercent, totalMinValue: this.totalMinValue === Infinity ? 0 : this.totalMinValue, totalMaxValue: this.totalMaxValue === -Infinity ? 0 : this.totalMaxValue, totalMinSmooth: this.totalMinSmooth, totalMaxSmooth: this.totalMaxSmooth, }; } // --- Outlier detection --- _isOutlier(val) { if (this.storedValues.length < 2) return false; const raw = this.outlierDetection.method; const method = typeof raw === 'string' ? raw.toLowerCase() : raw; switch (method) { case 'zscore': return this._zScore(val); case 'iqr': return this._iqr(val); case 'modifiedzscore': return this._modifiedZScore(val); default: this.logger?.warn?.(`[${this.key || this.type}] Unknown outlier method "${raw}"`); return false; } } _zScore(val) { const threshold = this.outlierDetection.threshold || 3; const m = Channel._mean(this.storedValues); const sd = Channel._stdDev(this.storedValues); // Intentionally do NOT early-return on sd===0: a perfectly stable // baseline should make any deviation an outlier (z = Infinity > threshold). const z = sd === 0 ? (val === m ? 0 : Infinity) : (val - m) / sd; return Math.abs(z) > threshold; } _iqr(val) { const sorted = [...this.storedValues].sort((a, b) => a - b); const q1 = sorted[Math.floor(sorted.length / 4)]; const q3 = sorted[Math.floor(sorted.length * 3 / 4)]; const iqr = q3 - q1; return val < q1 - 1.5 * iqr || val > q3 + 1.5 * iqr; } _modifiedZScore(val) { const median = Channel._median(this.storedValues); const mad = Channel._median(this.storedValues.map((v) => Math.abs(v - median))); if (mad === 0) return false; const mz = 0.6745 * (val - median) / mad; const threshold = this.outlierDetection.threshold || 3.5; return Math.abs(mz) > threshold; } // --- Scaling --- _applyScaling(value) { if (this.inputRange <= 0) { this.logger?.warn?.(`[${this.key || this.type}] Input range invalid; falling back to [0,1].`); this.scaling.inputMin = 0; this.scaling.inputMax = 1; this.inputRange = 1; } const clamped = Math.min(Math.max(value, this.scaling.inputMin), this.scaling.inputMax); return this.scaling.absMin + ((clamped - this.scaling.inputMin) * (this.scaling.absMax - this.scaling.absMin)) / this.inputRange; } // --- Smoothing --- _applySmoothing(value) { this.storedValues.push(value); if (this.storedValues.length > this.smoothing.smoothWindow) { this.storedValues.shift(); } const raw = this.smoothing.smoothMethod; const method = typeof raw === 'string' ? raw.toLowerCase() : raw; const arr = this.storedValues; switch (method) { case 'none': return arr[arr.length - 1]; case 'mean': return Channel._mean(arr); case 'min': return Math.min(...arr); case 'max': return Math.max(...arr); case 'sd': return Channel._stdDev(arr); case 'median': return Channel._median(arr); case 'weightedmovingaverage': return Channel._wma(arr); case 'lowpass': return Channel._lowPass(arr); case 'highpass': return Channel._highPass(arr); case 'bandpass': return Channel._bandPass(arr); case 'kalman': return Channel._kalman(arr); case 'savitzkygolay': return Channel._savitzkyGolay(arr); default: this.logger?.error?.(`[${this.key || this.type}] Smoothing method "${raw}" not implemented.`); return value; } } // --- Output writes --- _updateMinMax(value) { if (value < this.totalMinValue) this.totalMinValue = value; if (value > this.totalMaxValue) this.totalMaxValue = value; } _updateSmoothMinMax(value) { if (this.totalMinSmooth === 0 && this.totalMaxSmooth === 0) { this.totalMinSmooth = value; this.totalMaxSmooth = value; } if (value < this.totalMinSmooth) this.totalMinSmooth = value; if (value > this.totalMaxSmooth) this.totalMaxSmooth = value; } _writeOutput(val) { const clamped = Math.min(Math.max(val, this.scaling.absMin), this.scaling.absMax); const rounded = Math.round(clamped * 100) / 100; if (rounded !== this.outputAbs) { this.outputAbs = rounded; this.outputPercent = this._computePercent(clamped); this.measurements ?.type(this.type) .variant('measured') .position(this.position) .distance(this.distance) .value(this.outputAbs, Date.now(), this.unit); } } _computePercent(value) { const { percentMin, percentMax } = this.interpolation; let pct; if (this.processRange <= 0) { const lo = this.totalMinValue === Infinity ? 0 : this.totalMinValue; const hi = this.totalMaxValue === -Infinity ? 1 : this.totalMaxValue; pct = this._lerp(value, lo, hi, percentMin, percentMax); } else { pct = this._lerp(value, this.scaling.absMin, this.scaling.absMax, percentMin, percentMax); } return Math.round(pct * 100) / 100; } _lerp(n, iMin, iMax, oMin, oMax) { if (iMin >= iMax || oMin >= oMax) return n; return oMin + ((n - iMin) * (oMax - oMin)) / (iMax - iMin); } // --- Pure math helpers (static so they're reusable) --- static _mean(arr) { if (!arr.length) return 0; return arr.reduce((a, b) => a + b, 0) / arr.length; } static _stdDev(arr) { if (arr.length <= 1) return 0; const m = Channel._mean(arr); const variance = arr.map((v) => (v - m) ** 2).reduce((a, b) => a + b, 0) / (arr.length - 1); return Math.sqrt(variance); } static _median(arr) { const sorted = [...arr].sort((a, b) => a - b); const mid = Math.floor(sorted.length / 2); return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2; } static _wma(arr) { const weights = arr.map((_, i) => i + 1); const weightedSum = arr.reduce((sum, v, i) => sum + v * weights[i], 0); const weightTotal = weights.reduce((s, w) => s + w, 0); return weightedSum / weightTotal; } static _lowPass(arr) { const alpha = 0.2; let out = arr[0]; for (let i = 1; i < arr.length; i++) out = alpha * arr[i] + (1 - alpha) * out; return out; } static _highPass(arr) { const alpha = 0.8; const filtered = [arr[0]]; for (let i = 1; i < arr.length; i++) { filtered[i] = alpha * (filtered[i - 1] + arr[i] - arr[i - 1]); } return filtered[filtered.length - 1]; } static _bandPass(arr) { const lp = Channel._lowPass(arr); const hp = Channel._highPass(arr); return arr.map((v) => lp + hp - v).pop(); } static _kalman(arr) { let estimate = arr[0]; const measurementNoise = 1; const processNoise = 0.1; const gain = processNoise / (processNoise + measurementNoise); for (let i = 1; i < arr.length; i++) estimate = estimate + gain * (arr[i] - estimate); return estimate; } static _savitzkyGolay(arr) { const coeffs = [-3, 12, 17, 12, -3]; const norm = coeffs.reduce((a, b) => a + b, 0); if (arr.length < coeffs.length) return arr[arr.length - 1]; let s = 0; for (let i = 0; i < coeffs.length; i++) { s += arr[arr.length - coeffs.length + i] * coeffs[i]; } return s / norm; } } module.exports = Channel;