// FlowAggregator — owns the predicted-volume integrator + net-flow selection // + remaining-time projection for the pumping-station basin. // // Pure domain. Takes a context bag with the live MeasurementContainer, the // basin geometry, and the merged config; mutates measurements in place and // keeps a tiny piece of integrator state internally. // // Ports from basin-docs: // - Predicted-volume integrator clamped to [dryRunSafetyVol, maxVolAtOverflow] // with hard physical floor at 0 (predicted volume can never go negative). // - Synthetic spill flow at position 'overflow' so net-flow balance // reads ~0 while pinned at overflow. // - Cumulative overflowVolume + underflowVolume streams for compliance / // diagnostic reporting via InfluxDB. const { interpolation } = require('generalFunctions'); const DEFAULT_FLOW_THRESHOLD = 1e-4; const DEFAULT_FLOW_VARIANTS = ['measured', 'predicted']; const DEFAULT_LEVEL_VARIANTS = ['measured', 'predicted']; const DEFAULT_FLOW_POSITIONS = { inflow: ['in', 'upstream'], outflow: ['out', 'downstream'], }; class FlowAggregator { constructor(ctx = {}) { if (!ctx.measurements) throw new Error('FlowAggregator: ctx.measurements is required'); if (!ctx.basin) throw new Error('FlowAggregator: ctx.basin is required'); this.measurements = ctx.measurements; this.basin = ctx.basin; this.config = ctx.config || {}; this.logger = ctx.logger || null; this._interp = ctx.interpolation || new interpolation(); this.flowVariants = ctx.flowVariants || DEFAULT_FLOW_VARIANTS; this.levelVariants = ctx.levelVariants || DEFAULT_LEVEL_VARIANTS; this.flowPositions = ctx.flowPositions || DEFAULT_FLOW_POSITIONS; const cfgThresh = Number(this.config?.general?.flowThreshold); this.flowThreshold = Number.isFinite(ctx.flowThreshold) ? ctx.flowThreshold : (Number.isFinite(cfgThresh) ? cfgThresh : DEFAULT_FLOW_THRESHOLD); // Optional callback so the host can supply derived safety thresholds // without us re-importing the validator. Returns { dryRunSafetyVol, ... }. this._computeSafetyPoints = ctx.computeSafetyPoints || (() => ({ dryRunSafetyVol: 0 })); this._predictedFlowState = null; this._lastNetFlow = { value: 0, source: null, direction: 'steady' }; this._lastRemaining = { seconds: null, source: null }; this._lastLevelRateNetFlow = null; } resetState(timestamp = Date.now()) { this._predictedFlowState = { inflow: 0, outflow: 0, lastTimestamp: timestamp }; } update() { const flowUnit = 'm3/s'; const now = Date.now(); // Synthetic spill flow lives at its OWN position ('overflow') — // not as a child of 'out'. That keeps it out of the operational // outflow sum here so no self-subtraction is needed. const inflow = this.measurements.sum('flow', 'predicted', this.flowPositions.inflow, flowUnit) || 0; const outflowReal = this.measurements.sum('flow', 'predicted', this.flowPositions.outflow, flowUnit) || 0; if (!this._predictedFlowState) this._predictedFlowState = { inflow, outflow: outflowReal, lastTimestamp: now }; const tPrev = this._predictedFlowState.lastTimestamp ?? now; const dt = Math.max((now - tPrev) / 1000, 0); const dV = dt > 0 ? (inflow - outflowReal) * dt : 0; const currentVol = this.measurements .type('volume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? this.basin.minVol ?? 0; const writeTs = tPrev + dt * 1000; // Bounds. // Upper (hard physical): maxVolAtOverflow — past this the basin // spills; predicted level pins at overflowLevel and the excess // becomes cumulative overflowVolume + synthetic spill flow. // Lower (operational): dryRunSafetyVol — clamps ON TRANSITION // from above so the integrator can't drop into the unphysical // band. A basin seeded BELOW it is left alone (startup from empty). // Lower (hard physical): 0 — basin cannot hold negative water. // Any negative excess is tracked as underflowVolume (diagnostic). const safety = this._computeSafetyPoints(); const upperClamp = this.basin.maxVolAtOverflow; const lowerClamp = Math.max(0, safety.dryRunSafetyVol ?? 0); const proposedVolume = currentVol + dV; let nextVolume = proposedVolume; let overflowIncrement = 0; let underflowIncrement = 0; if (proposedVolume > upperClamp) { overflowIncrement = proposedVolume - upperClamp; nextVolume = upperClamp; } else if (proposedVolume < lowerClamp && currentVol >= lowerClamp) { nextVolume = lowerClamp; } if (nextVolume < 0) { underflowIncrement = -nextVolume; nextVolume = 0; } // Synthetic spill flow at position 'overflow'. let spillRate = 0; if (nextVolume >= upperClamp - 1e-9 && (inflow - outflowReal) > this.flowThreshold) { spillRate = inflow - outflowReal; } this.measurements .type('flow').variant('predicted').position('overflow') .value(spillRate, writeTs, 'm3/s').unit('m3/s'); if (overflowIncrement > 0) { const prev = this.measurements .type('overflowVolume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? 0; this.measurements .type('overflowVolume').variant('predicted').position('atequipment') .value(prev + overflowIncrement, writeTs, 'm3').unit('m3'); } if (underflowIncrement > 0) { const prev = this.measurements .type('underflowVolume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? 0; this.measurements .type('underflowVolume').variant('predicted').position('atequipment') .value(prev + underflowIncrement, writeTs, 'm3').unit('m3'); } this.measurements.type('volume').variant('predicted').position('atequipment') .value(nextVolume, writeTs, 'm3').unit('m3'); const surfaceArea = this.basin.surfaceArea; const nextLevel = surfaceArea > 0 ? Math.max(nextVolume, 0) / surfaceArea : 0; this.measurements.type('level').variant('predicted').position('atequipment') .value(nextLevel, writeTs, 'm').unit('m'); const percent = this._interp.interpolate_lin_single_point( nextVolume, this.basin.minVol, this.basin.maxVolAtOverflow, 0, 100 ); this.measurements.type('volumePercent').variant('predicted').position('atequipment') .value(percent, writeTs, '%'); this._predictedFlowState = { inflow, outflow: outflowReal, lastTimestamp: writeTs }; } selectBestNetFlow() { const type = 'flow'; const unit = this.measurements.getUnit(type) || 'm3/s'; for (const variant of this.flowVariants) { const bucket = this.measurements.measurements?.[type]?.[variant]; if (!bucket || Object.keys(bucket).length === 0) continue; const inflow = this.measurements.sum(type, variant, this.flowPositions.inflow, unit) || 0; const outflowReal = this.measurements.sum(type, variant, this.flowPositions.outflow, unit) || 0; // Fold synthetic spill (position 'overflow') into the outflow side // so net-flow balance reads ~0 while pinned at the overflow level. const spill = this.measurements.sum(type, variant, ['overflow'], unit) || 0; const outflow = outflowReal + spill; if (Math.abs(inflow) < this.flowThreshold && Math.abs(outflow) < this.flowThreshold) continue; const net = inflow - outflow; this.measurements.type('netFlowRate').variant(variant).position('atequipment') .value(net, Date.now(), unit); const result = { value: net, source: variant, direction: this.deriveDirection(net) }; this._lastNetFlow = result; return result; } for (const variant of this.levelVariants) { const rate = this._levelRate(variant); if (!Number.isFinite(rate)) continue; const lvl = this.measurements.type('level').variant(variant).position('atequipment').getCurrentValue('m'); const pinnedAtOverflow = Number.isFinite(lvl) && Number.isFinite(this.basin.overflowLevel) && lvl >= this.basin.overflowLevel - 1e-9; const rateNearZero = Math.abs(rate) < 1e-9; let netFlow = rate * this.basin.surfaceArea; // Pinned at overflow — dL/dt collapses to 0 but flow IS still // moving (in → spill). Hold the last known non-zero net-flow. if (pinnedAtOverflow && rateNearZero && Number.isFinite(this._lastLevelRateNetFlow)) { netFlow = this._lastLevelRateNetFlow; } else if (!rateNearZero) { this._lastLevelRateNetFlow = netFlow; } const result = { value: netFlow, source: `level:${variant}`, direction: this.deriveDirection(netFlow) }; this._lastNetFlow = result; return result; } if (this.logger) this.logger.warn('No usable measurements to compute net flow; assuming steady.'); const result = { value: 0, source: null, direction: 'steady' }; this._lastNetFlow = result; return result; } computeRemainingTime(netFlow) { if (!netFlow || Math.abs(netFlow.value) < this.flowThreshold) { this._lastRemaining = { seconds: null, source: null }; return this._lastRemaining; } const { overflowLevel, outflowLevel, surfaceArea } = this.basin; if (!Number.isFinite(surfaceArea) || surfaceArea <= 0) { this._lastRemaining = { seconds: null, source: null }; return this._lastRemaining; } for (const variant of this.levelVariants) { const lvl = this.measurements.type('level').variant(variant).position('atequipment').getCurrentValue('m'); if (!Number.isFinite(lvl)) continue; const remainingHeight = netFlow.value > 0 ? Math.max(overflowLevel - lvl, 0) : Math.max(lvl - outflowLevel, 0); const seconds = (remainingHeight * surfaceArea) / Math.abs(netFlow.value); if (!Number.isFinite(seconds)) continue; this._lastRemaining = { seconds, source: `${netFlow.source}/${variant}` }; return this._lastRemaining; } this._lastRemaining = { seconds: null, source: netFlow.source }; return this._lastRemaining; } deriveDirection(netFlow) { if (netFlow > this.flowThreshold) return 'filling'; if (netFlow < -this.flowThreshold) return 'draining'; return 'steady'; } tick() { this.update(); const netFlow = this.selectBestNetFlow(); const remaining = this.computeRemainingTime(netFlow); return { netFlow, remaining }; } snapshot() { return { direction: this._lastNetFlow.direction, netFlow: this._lastNetFlow.value, flowSource: this._lastNetFlow.source, secondsRemaining: this._lastRemaining.seconds, }; } _levelRate(variant) { const m = this.measurements.type('level').variant(variant).position('atequipment').get(); if (!m || !m.values || m.values.length < 2) return null; const current = m.getLaggedSample?.(0); const previous = m.getLaggedSample?.(1); if (!current || !previous || previous.timestamp == null) return null; const dt = (current.timestamp - previous.timestamp) / 1000; if (!Number.isFinite(dt) || dt <= 0) return null; return (current.value - previous.value) / dt; } } module.exports = FlowAggregator;