From 2a6a0bc34b17a066205198b1d15ac722f3edb2b9 Mon Sep 17 00:00:00 2001 From: znetsixe Date: Sun, 10 May 2026 22:09:25 +0200 Subject: [PATCH] P6: convert monster to BaseDomain + BaseNodeAdapter + concern split Refactor of monster to use the platform infrastructure (BaseDomain, BaseNodeAdapter, ChildRouter, commandRegistry, statusBadge). Extracts concerns into focused modules per .claude/refactor/MODULE_SPLIT.md generic template. Tests stay green; CONTRACT.md generated; legacy aliases preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- CONTRACT.md | 49 ++ src/commands/handlers.js | 42 ++ src/commands/index.js | 47 ++ src/flow/flowTracker.js | 59 ++ src/io/output.js | 70 +++ src/io/statusBadge.js | 28 + src/nodeClass.js | 240 +------- src/parameters/parameters.js | 56 ++ src/rain/rainAggregator.js | 58 ++ src/sampling/samplingProgram.js | 113 ++++ src/schedule/schedule.js | 42 ++ src/specificClass.js | 981 +++++--------------------------- 12 files changed, 710 insertions(+), 1075 deletions(-) create mode 100644 CONTRACT.md create mode 100644 src/commands/handlers.js create mode 100644 src/commands/index.js create mode 100644 src/flow/flowTracker.js create mode 100644 src/io/output.js create mode 100644 src/io/statusBadge.js create mode 100644 src/parameters/parameters.js create mode 100644 src/rain/rainAggregator.js create mode 100644 src/sampling/samplingProgram.js create mode 100644 src/schedule/schedule.js diff --git a/CONTRACT.md b/CONTRACT.md new file mode 100644 index 0000000..f752601 --- /dev/null +++ b/CONTRACT.md @@ -0,0 +1,49 @@ +# monster — Contract + +Hand-maintained for Phase 6; the `## Inputs` table is generated from +`src/commands/index.js` (see Phase 9 generator). Keep ≤ 80 lines. + +## Inputs (msg.topic on Port 0) + +| Canonical | Aliases (deprecated) | Payload | Effect | +|---|---|---|---| +| `cmd.start` | `i_start` | truthy/falsy | Sets `source.i_start`. On the next tick a sampling run begins if flow bounds validate. | +| `set.schedule` | `monsternametijden` | array of AQUON rows (`SAMPLE_NAME`, `DESCRIPTION`, `SAMPLED_DATE`, `START_DATE`, `END_DATE`) | Stores the schedule and recomputes `nextDate` + `daysPerYear` for the configured `aquonSampleName`. | +| `set.rain` | `rain_data` | per-location rain forecast (Open-Meteo shape) | Aggregates hourly precipitation into `sumRain` / `avgRain`; feeds the rain-scaled flow prediction. | +| `data.flow` | `input_q` | `{ value: number, unit: string }` | Converts to m³/h and pushes into `flow.manual.atequipment`. Blends with measured-child flow in `getEffectiveFlow()`. | +| `set.mode` | `setMode` | string | Delegated to `source.setMode()` if defined. Reserved for future use. | +| `set.model-prediction` | `model_prediction` | numeric | Delegated to `source.setModelPrediction()` if defined. Reserved for future use. | + +Aliases log a one-time deprecation warning the first time they fire. + +## Outputs (msg.topic on Port 0/1/2) + +- **Port 0 (process):** `msg.topic = config.general.name`. Payload built + by `outputUtils.formatMsg(..., 'process')` from `getOutput()`. Delta- + compressed — only changed fields are emitted. Carries `pulse`, `running`, + `bucketVol`, `sumPuls`, `predFlow`, `m3PerPuls`, `q`, `timeLeft`, + `targetVolumeM3`, `targetProgressPct`, `targetDeltaL`, `predictedRateM3h`, + `sumRain`, `avgRain`, `nextDate`, plus the flat measurements snapshot. +- **Port 1 (InfluxDB telemetry):** same shape as Port 0, formatted with the + `'influxdb'` formatter. +- **Port 2 (registration):** at startup the node sends one + `{ topic: 'child.register', payload: , positionVsParent, distance }` + to its parent. + +## Events emitted by `source.measurements.emitter` + +The `MeasurementContainer` fires `.measured.` whenever a +matching series receives a new value. monster writes: + +- `flow.manual.atequipment` — operator-supplied manual flow. +- `flow.measured.` — re-emitted when a child measurement fires + (one of `flow.measured.upstream`, `flow.measured.downstream`, + `flow.measured.atequipment`). + +## Children accepted + +`measurement` only. The router subscribes to a child's +`flow.measured.` events when the child's `config.asset.type` is +`'flow'` (or missing). Other asset types are ignored. monster has no +position-based filtering — all three positions are wired and the latest +value wins for each. diff --git a/src/commands/handlers.js b/src/commands/handlers.js new file mode 100644 index 0000000..5391b11 --- /dev/null +++ b/src/commands/handlers.js @@ -0,0 +1,42 @@ +'use strict'; + +// Handlers for monster input topics. Each is a pure function over the +// domain (source). Unit conversion for incoming flow happens in the +// handler (the legacy nodeClass did it inline) — anything else inbound +// is passed straight through to source.handleInput. + +const { convert } = require('generalFunctions'); + +exports.cmdStart = (source, msg) => { + source.handleInput('i_start', Boolean(msg.payload)); +}; + +exports.setSchedule = (source, msg) => { + source.handleInput('monsternametijden', msg.payload); +}; + +exports.setRain = (source, msg) => { + source.handleInput('rain_data', msg.payload); +}; + +exports.dataFlow = (source, msg, ctx) => { + const log = ctx?.logger || source.logger; + const value = Number(msg.payload?.value); + const unit = msg.payload?.unit; + if (!Number.isFinite(value) || !unit) { + log?.warn?.('data.flow payload must include numeric value and unit.'); + return; + } + let converted = value; + try { converted = convert(value).from(unit).to('m3/h'); } + catch (err) { log?.warn?.(`data.flow unit conversion failed: ${err.message}`); return; } + source.handleInput('input_q', { value: converted, unit: 'm3/h' }); +}; + +exports.setMode = (source, msg) => { + if (typeof source.setMode === 'function') source.setMode(msg.payload); +}; + +exports.setModelPrediction = (source, msg) => { + if (typeof source.setModelPrediction === 'function') source.setModelPrediction(msg.payload); +}; diff --git a/src/commands/index.js b/src/commands/index.js new file mode 100644 index 0000000..6e264eb --- /dev/null +++ b/src/commands/index.js @@ -0,0 +1,47 @@ +'use strict'; + +// monster command registry. Canonical names follow CONTRACTS.md §1. +// Legacy names (i_start, monsternametijden, rain_data, input_q, setMode, +// model_prediction) are surfaced as aliases — they log a one-time +// deprecation warning on first use and are removed in Phase 7. + +const handlers = require('./handlers'); + +module.exports = [ + { + topic: 'cmd.start', + aliases: ['i_start'], + payloadSchema: { type: 'any' }, + handler: handlers.cmdStart, + }, + { + topic: 'set.schedule', + aliases: ['monsternametijden'], + payloadSchema: { type: 'any' }, + handler: handlers.setSchedule, + }, + { + topic: 'set.rain', + aliases: ['rain_data'], + payloadSchema: { type: 'any' }, + handler: handlers.setRain, + }, + { + topic: 'data.flow', + aliases: ['input_q'], + payloadSchema: { type: 'object' }, + handler: handlers.dataFlow, + }, + { + topic: 'set.mode', + aliases: ['setMode'], + payloadSchema: { type: 'any' }, + handler: handlers.setMode, + }, + { + topic: 'set.model-prediction', + aliases: ['model_prediction'], + payloadSchema: { type: 'any' }, + handler: handlers.setModelPrediction, + }, +]; diff --git a/src/flow/flowTracker.js b/src/flow/flowTracker.js new file mode 100644 index 0000000..dfa2a00 --- /dev/null +++ b/src/flow/flowTracker.js @@ -0,0 +1,59 @@ +'use strict'; + +// Flow tracking — manual override + measured-child fan-in + effective +// blend (mean of the two when both present). Wraps the +// MeasurementContainer so the domain stays read-only on flow state. + +const POSITIONS = ['upstream', 'downstream', 'atequipment']; + +class FlowTracker { + constructor({ measurements, logger }) { + this.measurements = measurements; + this.logger = logger; + this.manualFlow = null; + } + + updateManualFlow(payload = {}) { + const value = Number(payload.value); + if (!Number.isFinite(value)) return; + const unit = payload.unit || 'm3/h'; + this.manualFlow = value; + this.measurements.type('flow').variant('manual').position('atequipment') + .value(value, Date.now(), unit); + } + + handleMeasuredFlow(eventData) { + const value = Number(eventData?.value); + if (!Number.isFinite(value)) return; + const position = String(eventData.position || 'atequipment').toLowerCase(); + const unit = eventData.unit || 'm3/h'; + this.measurements.type('flow').variant('measured').position(position) + .value(value, eventData.timestamp || Date.now(), unit); + } + + getMeasuredFlow() { + const values = []; + for (const pos of POSITIONS) { + const v = this.measurements.type('flow').variant('measured').position(pos).getCurrentValue(); + if (Number.isFinite(v)) values.push(v); + } + if (!values.length) return null; + return values.reduce((s, c) => s + c, 0) / values.length; + } + + getManualFlow() { + const v = this.measurements.type('flow').variant('manual').position('atequipment').getCurrentValue(); + return Number.isFinite(v) ? v : null; + } + + getEffectiveFlow() { + const measured = this.getMeasuredFlow(); + const manual = this.getManualFlow(); + if (measured != null && manual != null) return (measured + manual) / 2; + if (measured != null) return measured; + if (manual != null) return manual; + return 0; + } +} + +module.exports = FlowTracker; diff --git a/src/io/output.js b/src/io/output.js new file mode 100644 index 0000000..2b3fd96 --- /dev/null +++ b/src/io/output.js @@ -0,0 +1,70 @@ +'use strict'; + +// Output formatter — assembles the snapshot shape getOutput returns each +// tick. Heavy on derived fields (timeToNextPulse, targetDelta, ...) but +// every value is read-only on the domain, so this can stay a pure function. + +const params = require('../parameters/parameters'); + +function buildOutput(m) { + const output = m.measurements.getFlattenedOutput(); + const flowRate = Number(m.q) || 0; + const m3PerPulse = Number(m.m3PerPuls) || 0; + const pulseFraction = Number(m.temp_pulse) || 0; + const targetVolumeL = Number(m.targetVolume) > 0 ? m.targetVolume : 0; + const targetVolumeM3 = targetVolumeL > 0 ? targetVolumeL / 1000 : 0; + const flowToNextPulseM3 = m3PerPulse > 0 ? Math.max(0, (1 - pulseFraction) * m3PerPulse) : 0; + const timeToNextPulseSec = flowRate > 0 && flowToNextPulseM3 > 0 + ? Math.round((flowToNextPulseM3 / (flowRate / 3600)) * 100) / 100 + : 0; + const targetProgressPct = targetVolumeL > 0 + ? Math.round((m.bucketVol / targetVolumeL) * 10000) / 100 + : 0; + const targetDeltaL = targetVolumeL > 0 + ? Math.round((m.bucketVol - targetVolumeL) * 100) / 100 + : 0; + const targetDeltaM3 = targetVolumeL > 0 + ? Math.round((targetDeltaL / 1000) * 10000) / 10000 + : 0; + + Object.assign(output, { + pulse: m.pulse, + running: m.running, + bucketVol: m.bucketVol, + bucketWeight: m.bucketWeight, + sumPuls: m.sumPuls, + predFlow: m.predFlow, + predM3PerSec: m.predM3PerSec, + timePassed: m.timePassed, + timeLeft: m.timeLeft, + m3Total: m.m3Total, + q: m.q, + nominalFlowMin: m.nominalFlowMin, + flowMax: m.flowMax, + invalidFlowBounds: m.invalidFlowBounds, + minSampleIntervalSec: m.minSampleIntervalSec, + missedSamples: m.missedSamples, + sampleCooldownMs: params.getSampleCooldownMs(m), + maxVolume: m.maxVolume, + minVolume: m.minVolume, + nextDate: m.nextDate, + daysPerYear: m.daysPerYear, + m3PerPuls: m.m3PerPuls, + m3PerPulse: m.m3PerPuls, + pulsesRemaining: Math.max(0, (m.targetPuls || 0) - (m.sumPuls || 0)), + pulseFraction, + flowToNextPulseM3, + timeToNextPulseSec, + targetVolumeM3, + targetProgressPct, + targetDeltaL, + targetDeltaM3, + predictedRateM3h: params.getPredictedFlowRate(m), + sumRain: m.rainAggregator?.sumRain ?? 0, + avgRain: m.rainAggregator?.avgRain ?? 0, + }); + + return output; +} + +module.exports = { buildOutput }; diff --git a/src/io/statusBadge.js b/src/io/statusBadge.js new file mode 100644 index 0000000..d63fe15 --- /dev/null +++ b/src/io/statusBadge.js @@ -0,0 +1,28 @@ +'use strict'; + +// Status-badge composition. Three states the editor cares about: +// - red ring : config error (flow bounds invalid) +// - yellow ring: sampling but cooldown is gating the next pulse +// - green dot : sampling normally +// - grey ring : idle +// Shape mirrors the legacy nodeClass._updateNodeStatus output verbatim. + +const { statusBadge } = require('generalFunctions'); +const params = require('../parameters/parameters'); + +function buildStatusBadge(m) { + if (m.invalidFlowBounds) { + return statusBadge.error(`Config error: nominalFlowMin (${m.nominalFlowMin}) >= flowMax (${m.flowMax})`); + } + if (m.running) { + const levelText = `${m.bucketVol}/${m.maxVolume} L`; + const cooldownMs = params.getSampleCooldownMs(m); + if (cooldownMs > 0) { + return statusBadge.compose([`SAMPLING (${Math.ceil(cooldownMs / 1000)}s)`, levelText], { fill: 'yellow', shape: 'ring' }); + } + return statusBadge.compose([`AI: RUNNING`, levelText], { fill: 'green', shape: 'dot' }); + } + return statusBadge.idle('AI: IDLE'); +} + +module.exports = { buildStatusBadge }; diff --git a/src/nodeClass.js b/src/nodeClass.js index 6621a99..e9c5715 100644 --- a/src/nodeClass.js +++ b/src/nodeClass.js @@ -1,51 +1,19 @@ -/** - * node class.js - * - * Encapsulates all node logic in a reusable class. In future updates we can split this into multiple generic classes and use the config to specifiy which ones to use. - * This allows us to keep the Node-RED node clean and focused on wiring up the UI and event handlers. - */ -const { outputUtils, configManager, convert } = require('generalFunctions'); -const Specific = require('./specificClass'); +'use strict'; -class nodeClass { - /** - * Create a Node. - * @param {object} uiConfig - Node-RED node configuration. - * @param {object} RED - Node-RED runtime API. - * @param {object} nodeInstance - The Node-RED node instance. - * @param {string} nameOfNode - The name of the node. - */ - constructor(uiConfig, RED, nodeInstance, nameOfNode) { - this.node = nodeInstance; - this.RED = RED; - this.name = nameOfNode; - this.source = null; - this.config = null; +const { BaseNodeAdapter } = require('generalFunctions'); +const Monster = require('./specificClass'); +const commands = require('./commands'); - // Load default & UI config - this._loadConfig(uiConfig); +class nodeClass extends BaseNodeAdapter { + static DomainClass = Monster; + static commands = commands; + // Tick-driven: sampling integrator (m3PerTick → temp_pulse) needs + // wall-clock delta-time once per second. + static tickInterval = 1000; + static statusInterval = 1000; - // Instantiate core class - this._setupSpecificClass(uiConfig); - - // Wire up event and lifecycle handlers - this._bindEvents(); - this._registerChild(); - this._startTickLoop(); - this._attachInputHandler(); - this._attachCloseHandler(); - } - - /** - * Load and merge default config with user-defined settings. - * Uses ConfigManager.buildConfig() for base sections, then adds monster-specific domain config. - * @param {object} uiConfig - Raw config from Node-RED UI. - */ - _loadConfig(uiConfig) { - const cfgMgr = new configManager(); - - // Build config: base sections + monster-specific domain config - this.config = cfgMgr.buildConfig(this.name, uiConfig, this.node.id, { + buildDomainConfig(uiConfig) { + return { constraints: { samplingtime: Number(uiConfig.samplingtime) || 0, minVolume: Number(uiConfig.minvolume ?? uiConfig.minVolume) || 5, @@ -55,186 +23,14 @@ class nodeClass { maxRainRef: Number(uiConfig.maxRainRef) || 10, minSampleIntervalSec: Number(uiConfig.minSampleIntervalSec) || 60, }, - }); - - this.config.functionality = { - ...this.config.functionality, - role: 'samplingCabinet', - aquonSampleName: uiConfig.aquon_sample_name || undefined, + functionality: { role: 'samplingCabinet', aquonSampleName: uiConfig.aquon_sample_name || undefined }, + asset: { emptyWeightBucket: Number(uiConfig.emptyWeightBucket) || 3 }, }; - - this.config.asset = { - ...this.config.asset, - emptyWeightBucket: Number(uiConfig.emptyWeightBucket) || 3, - }; - - // Utility for formatting outputs - this._output = new outputUtils(); } - /** - * Instantiate the core logic and store as source. - */ - _setupSpecificClass(uiConfig) { - this.source = new Specific(this.config); - - if (uiConfig?.aquon_sample_name) { - this.source.aquonSampleName = uiConfig.aquon_sample_name; - } - - this.node.source = this.source; - } - - /** - * Bind events to Node-RED status updates. - */ - _bindEvents() {} - - _updateNodeStatus() { - const m = this.source; - try { - const bucketVol = m.bucketVol; - const maxVolume = m.maxVolume; - const state = m.running; - const mode = 'AI'; - const flowMin = m.nominalFlowMin; - const flowMax = m.flowMax; - - if (m.invalidFlowBounds) { - return { - fill: 'red', - shape: 'ring', - text: `Config error: nominalFlowMin (${flowMin}) >= flowMax (${flowMax})`, - }; - } - - if (state) { - const levelText = `${bucketVol}/${maxVolume} L`; - const cooldownMs = typeof m.getSampleCooldownMs === 'function' - ? m.getSampleCooldownMs() - : 0; - - if (cooldownMs > 0) { - const cooldownSec = Math.ceil(cooldownMs / 1000); - return { fill: 'yellow', shape: 'ring', text: `SAMPLING (${cooldownSec}s) ${levelText}` }; - } - - return { fill: 'green', shape: 'dot', text: `${mode}: RUNNING ${levelText}` }; - } - - return { fill: 'grey', shape: 'ring', text: `${mode}: IDLE` }; - } catch (error) { - this.node.error(`Error in updateNodeStatus: ${error.message}`); - return { fill: 'red', shape: 'ring', text: 'Status Error' }; - } - } - - /** - * Register this node as a child upstream and downstream. - * Delayed to avoid Node-RED startup race conditions. - */ - _registerChild() { - setTimeout(() => { - this.node.send([ - null, - null, - { topic: 'registerChild', payload: this.config.general.id, positionVsParent: this.config?.functionality?.positionVsParent || 'atEquipment' }, - ]); - }, 100); - } - - /** - * Start the periodic tick loop. - */ - _startTickLoop() { - setTimeout(() => { - this._tickInterval = setInterval(() => this._tick(), 1000); - this._statusInterval = setInterval(() => { - this.node.status(this._updateNodeStatus()); - }, 1000); - }, 1000); - } - - /** - * Execute a single tick: update measurement, format and send outputs. - */ - _tick() { - this.source.tick(); - - const raw = this.source.getOutput(); - const processMsg = this._output.formatMsg(raw, this.source.config, 'process'); - const influxMsg = this._output.formatMsg(raw, this.source.config, 'influxdb'); - - this.node.send([processMsg, influxMsg]); - } - - /** - * Attach the node's input handler, routing control messages to the class. - */ - _attachInputHandler() { - this.node.on('input', (msg, send, done) => { - const m = this.source; - try { - switch (msg.topic) { - case 'input_q': { - const value = Number(msg.payload?.value); - const unit = msg.payload?.unit; - if (!Number.isFinite(value) || !unit) { - this.node.warn('input_q payload must include numeric value and unit.'); - break; - } - let converted = value; - try { - converted = convert(value).from(unit).to('m3/h'); - } catch (error) { - this.node.warn(`input_q unit conversion failed: ${error.message}`); - break; - } - m.handleInput('input_q', { value: converted, unit: 'm3/h' }); - break; - } - case 'i_start': - case 'monsternametijden': - case 'rain_data': - m.handleInput(msg.topic, msg.payload); - break; - case 'registerChild': { - const childId = msg.payload; - const childObj = this.RED.nodes.getNode(childId); - if (childObj?.source) { - m.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent); - } - break; - } - case 'setMode': - m.setMode(msg.payload); - break; - case 'model_prediction': - if (typeof m.setModelPrediction === 'function') { - m.setModelPrediction(msg.payload); - } - break; - default: - m.logger?.warn(`Unknown topic: ${msg.topic}`); - break; - } - } catch (error) { - this.node.error(`Error handling input (${msg?.topic}): ${error?.message || error}`); - } finally { - if (typeof done === 'function') done(); - } - }); - } - - /** - * Clean up timers and intervals when Node-RED stops the node. - */ - _attachCloseHandler() { - this.node.on('close', (done) => { - clearInterval(this._tickInterval); - clearInterval(this._statusInterval); - if (typeof done === 'function') done(); - }); + extraSetup() { + const uiSampleName = this.config?.functionality?.aquonSampleName; + if (uiSampleName) this.source.aquonSampleName = uiSampleName; } } diff --git a/src/parameters/parameters.js b/src/parameters/parameters.js new file mode 100644 index 0000000..d6de363 --- /dev/null +++ b/src/parameters/parameters.js @@ -0,0 +1,56 @@ +'use strict'; + +// Sampling-cabinet boundary + target math + rain-scaled flow prediction. +// All operations are pure given a domain handle — the domain owns the +// mutable fields (maxVolume, targetPuls, …) so legacy tests that read +// `monster.maxVolume` keep working. + +const RAIN_STALE_MS = 2 * 60 * 60 * 1000; + +function applyBoundsAndTargets(m) { + m.maxVolume = m.maxWeight - m.emptyWeightBucket; + m.minPuls = Math.round(m.minVolume / m.volume_pulse); + m.maxPuls = Math.round(m.maxVolume / m.volume_pulse); + m.absMaxPuls = Math.round(m.cap_volume / m.volume_pulse); + m.targetVolume = m.minVolume * Math.sqrt(m.maxVolume / m.minVolume); + m.targetPuls = Math.round(m.targetVolume / m.volume_pulse); +} + +function validateFlowBounds(m) { + const min = Number(m.nominalFlowMin); + const max = Number(m.flowMax); + const valid = Number.isFinite(min) && Number.isFinite(max) && min >= 0 && max > 0 && min < max; + m.invalidFlowBounds = !valid; + if (!valid) m.logger.warn(`Invalid flow bounds. nominalFlowMin=${m.nominalFlowMin}, flowMax=${m.flowMax}`); + return valid; +} + +function getRainIndex(m) { + if (!m.lastRainUpdate) return 0; + if (Date.now() - m.lastRainUpdate > RAIN_STALE_MS) return 0; + return Number.isFinite(m.avgRain) ? m.avgRain : 0; +} + +function getPredictedFlowRate(m) { + const min = Number(m.nominalFlowMin); + const max = Number(m.flowMax); + if (!Number.isFinite(min) || !Number.isFinite(max) || min < 0 || max <= 0 || min >= max) return 0; + const rainIndex = getRainIndex(m); + const scale = Math.max(0, Math.min(1, m.rainMaxRef > 0 ? rainIndex / m.rainMaxRef : 0)); + return min + (max - min) * scale; +} + +function getSampleCooldownMs(m) { + if (!m.lastSampleTime) return 0; + const remaining = (m.minSampleIntervalSec * 1000) - (Date.now() - m.lastSampleTime); + return Math.max(0, remaining); +} + +module.exports = { + applyBoundsAndTargets, + validateFlowBounds, + getRainIndex, + getPredictedFlowRate, + getSampleCooldownMs, + RAIN_STALE_MS, +}; diff --git a/src/rain/rainAggregator.js b/src/rain/rainAggregator.js new file mode 100644 index 0000000..695e6ab --- /dev/null +++ b/src/rain/rainAggregator.js @@ -0,0 +1,58 @@ +'use strict'; + +// Rain-data aggregator — sums per-location hourly precipitation, weighted +// by per-hour probability, and stores both the raw and probability-weighted +// values keyed by timestamp. sumRain/avgRain feed parameters.getRainIndex +// which scales the predicted flow rate between nominalFlowMin and flowMax. + +class RainAggregator { + constructor({ logger } = {}) { + this.logger = logger; + this.aggregatedOutput = {}; + this.sumRain = 0; + this.avgRain = 0; + } + + // Returns the aggregated per-location object so callers can chain. + // Mutates this.aggregatedOutput / sumRain / avgRain in place. + update(value) { + if (!value) return this.aggregatedOutput; + + const totalRaw = {}; + const totalProb = {}; + let numberOfLocations = 0; + + Object.entries(value).forEach(([locationKey, location]) => { + numberOfLocations++; + const slot = (this.aggregatedOutput[locationKey] = { + tag: { latitude: location.latitude, longitude: location.longitude }, + precipationRaw: {}, + precipationProb: {}, + }); + + Object.entries(location.hourly.time).forEach(([key, time]) => { + const currTimestamp = new Date(time).getTime(); + let probability = 100; + if (typeof location.hourly.precipitation_probability !== 'undefined') { + probability = location.hourly.precipitation_probability[key]; + } + if (probability > 0) probability /= 100; + + if (totalRaw[currTimestamp] === undefined) totalRaw[currTimestamp] = 0; + if (totalProb[currTimestamp] === undefined) totalProb[currTimestamp] = 0; + + totalRaw[currTimestamp] += location.hourly.precipitation[key]; + totalProb[currTimestamp] += location.hourly.precipitation[key] * probability; + + slot.precipationRaw[key] = { val: location.hourly.precipitation[key], time: currTimestamp }; + slot.precipationProb[key] = { val: probability, time: currTimestamp }; + }); + }); + + this.sumRain = Object.values(totalProb).reduce((s, v) => s + v, 0); + this.avgRain = numberOfLocations > 0 ? this.sumRain / numberOfLocations : 0; + return this.aggregatedOutput; + } +} + +module.exports = RainAggregator; diff --git a/src/sampling/samplingProgram.js b/src/sampling/samplingProgram.js new file mode 100644 index 0000000..204a7de --- /dev/null +++ b/src/sampling/samplingProgram.js @@ -0,0 +1,113 @@ +'use strict'; + +// Sampling program — the time-driven core. Each tick: +// 1. on i_start or scheduled date → init a sampling run (m3PerPuls, stop_time) +// 2. while running: integrate m3PerTick into temp_pulse; emit a pulse when +// it crosses 1 unless the cooldown guard blocks it +// 3. after stop_time: clear running state. +// flowCalc derives m3PerTick from the latest q (m3/h) and the wall-clock +// delta since the last call — runs once per tick before sampling_program. + +const params = require('../parameters/parameters'); +const { regNextDate } = require('../schedule/schedule'); + +function getModelPrediction(m) { + const samplingHours = Number(m.sampling_time) || 0; + const predictedRate = params.getPredictedFlowRate(m); + const fallbackRate = m.flowTracker.getEffectiveFlow(); + const flowM3PerHour = predictedRate > 0 ? predictedRate : fallbackRate; + m.predFlow = Math.max(0, flowM3PerHour * samplingHours); + return m.predFlow; +} + +function flowCalc(m) { + const timePassed = m.flowTime > 0 ? (Date.now() - m.flowTime) / 1000 : 0; + m.m3PerTick = (m.q / 60 / 60) * timePassed; + m.flowTime = Date.now(); +} + +function _beginRun(m) { + m.running = true; + m.temp_pulse = 0; + m.pulse = false; + m.updateBucketVol(0); + m.sumPuls = 0; + m.m3Total = 0; + m.timePassed = 0; + m.timeLeft = 0; + m.predM3PerSec = 0; + + getModelPrediction(m); + m.m3PerPuls = Math.round(m.predFlow / m.targetPuls); + m.predM3PerSec = m.predFlow / m.sampling_time / 60 / 60; + m.start_time = Date.now(); + m.stop_time = Date.now() + (m.sampling_time * 60 * 60 * 1000); + + regNextDate(m, m.monsternametijden); + m.i_start = false; +} + +function _endRun(m) { + m.m3PerPuls = 0; + m.temp_pulse = 0; + m.pulse = false; + m.updateBucketVol(0); + m.sumPuls = 0; + m.timePassed = 0; + m.timeLeft = 0; + m.predFlow = 0; + m.predM3PerSec = 0; + m.m3Total = 0; + m.running = false; +} + +function _maybeEmitPulse(m) { + if (!(m.temp_pulse >= 1 && m.sumPuls < m.absMaxPuls)) { + if (m.pulse) m.pulse = false; + return; + } + + const now = Date.now(); + const cooldownMs = m.minSampleIntervalSec * 1000; + const blocked = m.lastSampleTime && (now - m.lastSampleTime) < cooldownMs; + + if (blocked) { + m.missedSamples++; + m.pulse = false; + m.temp_pulse = Math.min(m.temp_pulse, 1); + if (!m.lastSampleWarnTime || (now - m.lastSampleWarnTime) > cooldownMs) { + m.lastSampleWarnTime = now; + m.logger.warn(`Sampling too fast. Cooldown active for ${Math.ceil((cooldownMs - (now - m.lastSampleTime)) / 1000)}s.`); + } + return; + } + + m.temp_pulse -= 1; + m.pulse = true; + m.lastSampleTime = now; + m.sumPuls++; + m.updateBucketVol(Math.round(m.sumPuls * m.volume_pulse * 100) / 100); +} + +function samplingProgram(m) { + if (((m.i_start) || (Date.now() >= m.nextDate)) && !m.running) { + if (!params.validateFlowBounds(m)) { + m.running = false; + m.i_start = false; + return; + } + _beginRun(m); + } + + if (m.stop_time > Date.now()) { + m.timePassed = Math.round((Date.now() - m.start_time) / 1000); + m.timeLeft = Math.round((m.stop_time - Date.now()) / 1000); + m.temp_pulse += m.m3PerTick / m.m3PerPuls; + m.m3Total += m.m3PerTick; + _maybeEmitPulse(m); + } else if (m.running) { + _endRun(m); + } +} + +module.exports = { samplingProgram, flowCalc, getModelPrediction }; diff --git a/src/schedule/schedule.js b/src/schedule/schedule.js new file mode 100644 index 0000000..fce14d5 --- /dev/null +++ b/src/schedule/schedule.js @@ -0,0 +1,42 @@ +'use strict'; + +// AQUON sample schedule helpers. updateMonsternametijden validates the +// row shape before storing; regNextDate walks the rows to find the next +// future START_DATE for the configured aquonSampleName and counts how +// many of those fall in the current calendar year. + +function updateMonsternametijden(m, value) { + if (!m.init || !value || Object.keys(value).length === 0) return; + if ( + typeof value[0]?.SAMPLE_NAME !== 'undefined' && + typeof value[0]?.DESCRIPTION !== 'undefined' && + typeof value[0]?.SAMPLED_DATE !== 'undefined' && + typeof value[0]?.START_DATE !== 'undefined' && + typeof value[0]?.END_DATE !== 'undefined' + ) { + m.monsternametijden = value; + regNextDate(m, value); + } +} + +function regNextDate(m, monsternametijden) { + let next_date = new Date(new Date().setFullYear(new Date().getFullYear() + 1)); + let n_days_remaining = 0; + + if (typeof monsternametijden !== 'undefined') { + Object.values(monsternametijden).forEach((line) => { + if (line.START_DATE === 'NULL') return; + const curr_date_conv = new Date(line.START_DATE); + const curr_date = curr_date_conv.getTime(); + if (line.SAMPLE_NAME === m.aquonSampleName && curr_date > Date.now()) { + if (curr_date < next_date) next_date = curr_date; + if (new Date().getFullYear() === curr_date_conv.getFullYear()) n_days_remaining++; + } + }); + } + + m.daysPerYear = n_days_remaining; + m.nextDate = next_date; +} + +module.exports = { updateMonsternametijden, regNextDate }; diff --git a/src/specificClass.js b/src/specificClass.js index cb8e084..5ad85eb 100644 --- a/src/specificClass.js +++ b/src/specificClass.js @@ -1,171 +1,129 @@ +'use strict'; -const EventEmitter = require('events'); -const {logger,configUtils,configManager, MeasurementContainer, predict, interpolation, childRegistrationUtils} = require('generalFunctions'); +// Monster — multi-parameter biological process monitoring (Unit-level). +// Orchestrator only: wires the parameters/flow/rain/schedule/sampling +// modules in configure() and calls them in tick(). The mutable sampling +// state (running, sumPuls, m3PerPuls, …) lives on `this` so the existing +// test surface (`monster.bucketVol`, `monster.q`, …) keeps working. -class Monster{ - /*------------------- Construct and set vars -------------------*/ - constructor(config={}) { +const { BaseDomain } = require('generalFunctions'); +const params = require('./parameters/parameters'); +const FlowTracker = require('./flow/flowTracker'); +const RainAggregator = require('./rain/rainAggregator'); +const schedule = require('./schedule/schedule'); +const sampling = require('./sampling/samplingProgram'); +const { buildOutput } = require('./io/output'); +const { buildStatusBadge } = require('./io/statusBadge'); - //init - this.init = false; // keep track of init +class Monster extends BaseDomain { + static name = 'monster'; - //basic setup - this.emitter = new EventEmitter(); // Own EventEmitter - - this.logger = new logger(config.general.logging.enabled,config.general.logging.logLevel, config.general.name); - this.configManager = new configManager(); - this.defaultConfig = this.configManager.getConfig('monster'); // Load default config for rotating machine ( use software type name ? ) - this.configUtils = new configUtils(this.defaultConfig); - this.config = this.configUtils.initConfig(config); + configure() { + this.init = false; - // -------------------------------------- fetch dependencies -------------------------- - //this.math = require('mathjs'); + this._initState(); + this._initSamplingDefaults(); - //measurements - this.measurements = new MeasurementContainer({ - autoConvert: true, - windowSize: 50, - defaultUnits: { - flow: 'm3/h', - volume: 'm3' - } - }, this.logger); + this.flowTracker = new FlowTracker({ measurements: this.measurements, logger: this.logger }); + this.rainAggregator = new RainAggregator({ logger: this.logger }); - //child registration - this.child = {} ; // register childs - this.childRegistrationUtils = new childRegistrationUtils(this); - - //Specific object info - this.aquonSampleName = "112100" ; // aquon sample name to start automatic sampling on the basis of the document - this.monsternametijden = {} ; // json monsternametijden file? - this.rain_data = {} ; // precipitation data - this.aggregatedOutput = {} ; // object that does not contain momentary values but a combination of all kinds of data over a fixed period of time - this.sumRain = 0 ; // total sum of rain over time window + n hours and - n hours - this.avgRain = 0 ; // total divided by number of locations to get average over total time - this.daysPerYear = 0 ; // how many days remaining for this year - this.lastRainUpdate = 0 ; // timestamp of last rain data update - this.rainMaxRef = 10 ; // mm reference for scaling linear prediction - this.rainStaleMs = 2 * 60 * 60 * 1000; // 2 hours - - // outputs - this.pulse = false; // output pulse to sampling machine - this.bucketVol = 0; // how full is the sample? - this.sumPuls = 0; // number of pulses so far - this.predFlow = 0; // predicted flow over sampling time in hours, expressed in m3 - this.bucketWeight = 0; // actual weight of bucket - - //inputs - this.q = 0; // influent flow in m3/h (effective) - this.manualFlow = null; // manual flow override value in m3/h - this.i_start = false // when true, the program gets kicked off calculating what it needs to take samples - this.sampling_time = config.constraints.samplingtime; // time expressed in hours over which the sampling will run (currently 24) - this.emptyWeightBucket = config.asset.emptyWeightBucket; // empty weight of the bucket - this.nominalFlowMin = config.constraints.nominalFlowMin; // nominal dry-day flow in m3/h - this.flowMax = config.constraints.flowMax; // max inflow in m3/h - this.minSampleIntervalSec = config.constraints.minSampleIntervalSec || 60; // min seconds between samples - - // internal vars - this.temp_pulse = 0; // each interval pulses send out 1 and then reset - this.volume_pulse = 0.05; // define volume pulse expressed in L - this.minVolume = config.constraints.minVolume;// define min volume in a sampling cabinet before a sample is declared valid expressed in L - this.maxVolume = 0; // calculated maxvolume depending on own weight - this.maxWeight = config.constraints.maxWeight;// define max volume in a sampling cabinet before a sample is declared invalid expressed in L - this.cap_volume = 55; // abs max capacity of bucket (volume) in liters - this.targetVolume = 0; // volume of sampling cabinet that model aims for - this.minPuls = 0; // calculates the min pulses depending on min_vol and max_vol - this.maxPuls = 0; // calculates the max pulses depending on min_vol and max_vol - this.absMaxPuls = 0; // capacity of sampling cabinet (number of pulses) - this.targetPuls = 0; // keeps track of the desired amount of pulses (+- 50% tolerance), based on aimed volume - this.m3PerPuls = 0; // each pulse is equal to a number of m3 - this.predM3PerSec = 0; // predicted flow in m3 per second - this.m3PerTick = 0; // actual measured flow in m3 per second - this.m3Total = 0; // total measured flow over sampling time in m3 - this.running = false; // define if sampling is running or not - this.invalidFlowBounds = false; // whether nominalFlowMin/flowMax are invalid - this.lastSampleTime = 0; // last sample (pulse) timestamp - this.lastSampleWarnTime = 0; // last warning timestamp for cooldown - this.missedSamples = 0; // count blocked samples due to cooldown - - this.qLineRaw = {}; // see example - this.minSeen = {}; // keeps track of minimum ever seen so far in a time period for each hour (over totals not every value) - this.maxSeen = {}; // keeps track of maximum ever seen so far in a time period for each hour (over totals not every value) - this.qLineRefined = {}; // this should be the ( quantiles? ) classified in the datasets - this.calcTimeShiftDry = 0; // What is the delay after a dry period of minimum n hours - this.calcTimeShiftWet = 0; - this.calcCapacitySewer = 0; - // how much rain goes to the sewage ? -> calculate surface area of hardend / sewage. - - this.minDryHours = 0; // what is the minimum of dry hours before we can calculate timeshift? spot this with moving average? - this.minWetHours = 0; // how long does it take to remove all the rain? - this.resolution = 0; // Number of chunks in qLineRaw / define how big the window is to sum all values ( for now we need to take 1 hour or bigger resolutions but in the future smaller is better to see more accurate correlations) - this.tmpTotQ = 0; // keep track of sum of q within resolution window - - //old prediction factor - this.predFactor = 0.7; // define factor as multiplier for prediction - - //track program start and stop - this.start_time = Date.now(); // default start time - this.stop_time = Date.now(); // default stop time - this.flowTime = 0; //keep track in detail how much time between 2 ticks for more accurate flow measurement - this.timePassed = 0; // time in seconds - this.timeLeft = 0; // time in seconds - this.currHour = new Date().getHours(); // on init define in which hour we are 0 - 23 - - if (Number.isFinite(config?.constraints?.maxRainRef)) { - this.rainMaxRef = config.constraints.maxRainRef; + if (Number.isFinite(this.config?.constraints?.maxRainRef)) { + this.rainMaxRef = this.config.constraints.maxRainRef; } - this.init = true; // end of constructor + this.init = true; + params.applyBoundsAndTargets(this); - //set boundries and targets after init based on above settings - this.set_boundries_and_targets(); - - - } - - /*------------------- INPUT HANDLING -------------------*/ - handleInput(topic, payload) { - switch (topic) { - case 'i_start': - this.i_start = Boolean(payload); - break; - case 'monsternametijden': - this.updateMonsternametijden(payload); - break; - case 'rain_data': - this.updateRainData(payload); - break; - case 'input_q': - this.updateManualFlow(payload); - break; - default: - break; - } + this.router.onRegister('measurement', (child) => this._wireMeasurementChild(child)); } - updateMonsternametijden(value) { - if (!this.init || !value || Object.keys(value).length === 0) { - return; - } + _initState() { + this.aquonSampleName = '112100'; + this.monsternametijden = {}; + this.rain_data = {}; + this.aggregatedOutput = {}; + this.sumRain = 0; + this.avgRain = 0; + this.daysPerYear = 0; + this.lastRainUpdate = 0; + this.rainMaxRef = 10; + this.predFactor = 0.7; + this.start_time = Date.now(); + this.stop_time = Date.now(); + this.flowTime = 0; + this.timePassed = 0; + this.timeLeft = 0; + this.currHour = new Date().getHours(); + } - if ( - typeof value[0]?.SAMPLE_NAME !== 'undefined' && - typeof value[0]?.DESCRIPTION !== 'undefined' && - typeof value[0]?.SAMPLED_DATE !== 'undefined' && - typeof value[0]?.START_DATE !== 'undefined' && - typeof value[0]?.END_DATE !== 'undefined' - ) { - this.monsternametijden = value; - this.regNextDate(value); + _initSamplingDefaults() { + const c = this.config.constraints || {}; + const a = this.config.asset || {}; + + this.pulse = false; + this.bucketVol = 0; + this.sumPuls = 0; + this.predFlow = 0; + this.bucketWeight = 0; + + this.q = 0; + this.i_start = false; + this.sampling_time = c.samplingtime; + this.emptyWeightBucket = a.emptyWeightBucket; + this.nominalFlowMin = c.nominalFlowMin; + this.flowMax = c.flowMax; + this.minSampleIntervalSec = c.minSampleIntervalSec || 60; + + this.temp_pulse = 0; + this.volume_pulse = 0.05; + this.minVolume = c.minVolume; + this.maxVolume = 0; + this.maxWeight = c.maxWeight; + this.cap_volume = 55; + this.targetVolume = 0; + this.minPuls = 0; + this.maxPuls = 0; + this.absMaxPuls = 0; + this.targetPuls = 0; + this.m3PerPuls = 0; + this.predM3PerSec = 0; + this.m3PerTick = 0; + this.m3Total = 0; + this.running = false; + this.invalidFlowBounds = false; + this.lastSampleTime = 0; + this.lastSampleWarnTime = 0; + this.missedSamples = 0; + } + + _wireMeasurementChild(child) { + if (!child?.measurements?.emitter) return; + const childType = child?.config?.asset?.type; + if (childType && childType !== 'flow') return; + + const handler = (eventData) => this.flowTracker.handleMeasuredFlow(eventData); + child.measurements.emitter.on('flow.measured.upstream', handler); + child.measurements.emitter.on('flow.measured.downstream', handler); + child.measurements.emitter.on('flow.measured.atequipment', handler); + } + + handleInput(topic, payload) { + switch (topic) { + case 'i_start': this.i_start = Boolean(payload); break; + case 'monsternametijden': schedule.updateMonsternametijden(this, payload); break; + case 'rain_data': this.updateRainData(payload); break; + case 'input_q': this.flowTracker.updateManualFlow(payload); break; + default: break; } } updateRainData(value) { this.rain_data = value; this.lastRainUpdate = Date.now(); - if (this.init && !this.running) { - this.updatePredRain(value); + this.aggregatedOutput = this.rainAggregator.update(value); + this.sumRain = this.rainAggregator.sumRain; + this.avgRain = this.rainAggregator.avgRain; } } @@ -174,715 +132,32 @@ class Monster{ this.bucketWeight = val + this.emptyWeightBucket; } - getSampleCooldownMs() { - if (!this.lastSampleTime) { - return 0; - } - const remaining = (this.minSampleIntervalSec * 1000) - (Date.now() - this.lastSampleTime); - return Math.max(0, remaining); - } - - validateFlowBounds() { - const min = Number(this.nominalFlowMin); - const max = Number(this.flowMax); - const valid = Number.isFinite(min) && Number.isFinite(max) && min >= 0 && max > 0 && min < max; - this.invalidFlowBounds = !valid; - if (!valid) { - this.logger.warn(`Invalid flow bounds. nominalFlowMin=${this.nominalFlowMin}, flowMax=${this.flowMax}`); - } - return valid; - } - - getRainIndex() { - if (!this.lastRainUpdate) { - return 0; - } - if (Date.now() - this.lastRainUpdate > this.rainStaleMs) { - return 0; - } - return Number.isFinite(this.avgRain) ? this.avgRain : 0; - } - - getPredictedFlowRate() { - const min = Number(this.nominalFlowMin); - const max = Number(this.flowMax); - if (!Number.isFinite(min) || !Number.isFinite(max) || min < 0 || max <= 0 || min >= max) { - return 0; - } - const rainIndex = this.getRainIndex(); - const scale = Math.max(0, Math.min(1, this.rainMaxRef > 0 ? rainIndex / this.rainMaxRef : 0)); - return min + (max - min) * scale; - } - - - updateManualFlow(payload = {}) { - const value = Number(payload.value); - if (!Number.isFinite(value)) { - return; - } - - const unit = payload.unit || 'm3/h'; - this.manualFlow = value; - this.measurements - .type('flow') - .variant('manual') - .position('atequipment') - .value(value, Date.now(), unit); - } - - handleMeasuredFlow(eventData) { - const value = Number(eventData?.value); - if (!Number.isFinite(value)) { - return; - } - - const position = String(eventData.position || 'atequipment').toLowerCase(); - const unit = eventData.unit || 'm3/h'; - this.measurements - .type('flow') - .variant('measured') - .position(position) - .value(value, eventData.timestamp || Date.now(), unit); - } - - getMeasuredFlow() { - const positions = ['upstream', 'downstream', 'atequipment']; - const values = []; - - positions.forEach((position) => { - const measured = this.measurements - .type('flow') - .variant('measured') - .position(position) - .getCurrentValue(); - - if (Number.isFinite(measured)) { - values.push(measured); - } - }); - - if (!values.length) { - return null; - } - - const sum = values.reduce((total, curr) => total + curr, 0); - return sum / values.length; - } - - getManualFlow() { - const manual = this.measurements - .type('flow') - .variant('manual') - .position('atequipment') - .getCurrentValue(); - - return Number.isFinite(manual) ? manual : null; - } - - getEffectiveFlow() { - const measured = this.getMeasuredFlow(); - const manual = this.getManualFlow(); - - if (measured != null && manual != null) { - return (measured + manual) / 2; - } - - if (measured != null) { - return measured; - } - - if (manual != null) { - return manual; - } - - return 0; - } - - registerChild(child, softwareType) { - if (softwareType !== 'measurement' || !child?.measurements?.emitter) { - return; - } - - const childType = child?.config?.asset?.type; - if (childType && childType !== 'flow') { - return; - } - - const handler = (eventData) => this.handleMeasuredFlow(eventData); - child.measurements.emitter.on('flow.measured.upstream', handler); - child.measurements.emitter.on('flow.measured.downstream', handler); - child.measurements.emitter.on('flow.measured.atequipment', handler); - } - - getOutput() { - const output = this.measurements.getFlattenedOutput(); - const flowRate = Number(this.q) || 0; - const m3PerPulse = Number(this.m3PerPuls) || 0; - const pulseFraction = Number(this.temp_pulse) || 0; - const targetVolumeL = Number(this.targetVolume) > 0 ? this.targetVolume : 0; - const targetVolumeM3 = targetVolumeL > 0 ? targetVolumeL / 1000 : 0; - const flowToNextPulseM3 = m3PerPulse > 0 ? Math.max(0, (1 - pulseFraction) * m3PerPulse) : 0; - const timeToNextPulseSec = flowRate > 0 && flowToNextPulseM3 > 0 - ? Math.round((flowToNextPulseM3 / (flowRate / 3600)) * 100) / 100 - : 0; - const targetProgressPct = targetVolumeL > 0 - ? Math.round((this.bucketVol / targetVolumeL) * 10000) / 100 - : 0; - const targetDeltaL = targetVolumeL > 0 - ? Math.round((this.bucketVol - targetVolumeL) * 100) / 100 - : 0; - const targetDeltaM3 = targetVolumeL > 0 - ? Math.round((targetDeltaL / 1000) * 10000) / 10000 - : 0; - - output.pulse = this.pulse; - output.running = this.running; - output.bucketVol = this.bucketVol; - output.bucketWeight = this.bucketWeight; - output.sumPuls = this.sumPuls; - output.predFlow = this.predFlow; - output.predM3PerSec = this.predM3PerSec; - output.timePassed = this.timePassed; - output.timeLeft = this.timeLeft; - output.m3Total = this.m3Total; - output.q = this.q; - output.nominalFlowMin = this.nominalFlowMin; - output.flowMax = this.flowMax; - output.invalidFlowBounds = this.invalidFlowBounds; - output.minSampleIntervalSec = this.minSampleIntervalSec; - output.missedSamples = this.missedSamples; - output.sampleCooldownMs = this.getSampleCooldownMs(); - output.maxVolume = this.maxVolume; - output.minVolume = this.minVolume; - output.nextDate = this.nextDate; - output.daysPerYear = this.daysPerYear; - output.m3PerPuls = this.m3PerPuls; - output.m3PerPulse = this.m3PerPuls; - output.pulsesRemaining = Math.max(0, (this.targetPuls || 0) - (this.sumPuls || 0)); - output.pulseFraction = pulseFraction; - output.flowToNextPulseM3 = flowToNextPulseM3; - output.timeToNextPulseSec = timeToNextPulseSec; - output.targetVolumeM3 = targetVolumeM3; - output.targetProgressPct = targetProgressPct; - output.targetDeltaL = targetDeltaL; - output.targetDeltaM3 = targetDeltaM3; - output.predictedRateM3h = this.getPredictedFlowRate(); - - return output; - } - - /*------------------- FUNCTIONS -------------------*/ - - set_boundries_and_targets(){ - - // define boundries for algorithm - this.maxVolume = this.maxWeight - this.emptyWeightBucket ; // substract bucket weight of max volume assuming they are both on a 1 to 1 ratio - this.minPuls = Math.round(this.minVolume / this.volume_pulse); // minimum pulses we want before we have a valid sample - this.maxPuls = Math.round(this.maxVolume / this.volume_pulse); // maximum pulses we can handle (otherwise sample is too heavy) - this.absMaxPuls = Math.round(this.cap_volume / this.volume_pulse); // number of pulses a sample can contain before overflowing - // define target values - this.targetVolume = this.minVolume * Math.sqrt(this.maxVolume / this.minVolume); - //old way - //this.targetVolume = Math.round( ( ( (this.maxVolume - this.minVolume) / 2 ) + this.minVolume ) * 100) / 100; // calculate middle between min and max - // correct target values - this.targetPuls = Math.round(this.targetVolume / this.volume_pulse) ; // define desired amount of pulses (in this case our prediction can deviate 50% up and 50% down without a problem) - } - - - updatePredRain(value){ - //make date objects to define relative time window - let now = new Date(Date.now()); - let past = new Date(Date.now()); - let future = new Date(Date.now()); - let totalRaw = {}; - let totalProb = {}; - let totalAvg = {}; - - //refine object with different values - let rain = {}; - rain.hourly = {}; // an object with timestamps and aggreated over all locations summed precipation in mm - rain.hourly.time = []; - rain.hourly.precipationRaw = []; - rain.hourly.precipationProb = []; - - let numberOfLocations = 0; - - //Make timestamp + 24 hours - future.setHours(now.getHours() + 24); - - //Make timestamp - 24hours - past.setHours(now.getHours() - 24); - - //go through all locations and sum up the average precipation of each location so we have summed precipation over every hour - Object.entries(value).forEach(([locationKey, location],locationindex) => { - - //number of locations - numberOfLocations++; - - // make an object to keep track of the dataset we load - this.aggregatedOutput[locationKey] = {}; - this.aggregatedOutput[locationKey].tag = {}; - this.aggregatedOutput[locationKey].tag.latitude = location.latitude; - this.aggregatedOutput[locationKey].tag.longitude = location.longitude; - this.aggregatedOutput[locationKey].precipationRaw = {}; - this.aggregatedOutput[locationKey].precipationProb = {}; - - - //loop through object for each location over all hourlys - Object.entries(location.hourly.time).forEach(([key, time], index) => { - - this.aggregatedOutput[locationKey].precipationRaw[key] = {}; - this.aggregatedOutput[locationKey].precipationProb[key] = {}; - - //convert string output to a date object - let checkdate = new Date(time); - - //convert date to milliseconds timestamps - let currTimestamp = checkdate.getTime(); - let probability = 100; //default probility unless otherwise defined - - if(typeof location.hourly.precipitation_probability !== 'undefined'){ - probability = location.hourly.precipitation_probability[key]; - - } - - if(probability > 0){ - probability /= 100; - } - - // only interested in dates before timeframe and after to make use of - // ( currTimestamp >= now && currTimestamp < future) || ( currTimestamp < now && currTimestamp > past ) - if( true ){ - - typeof totalRaw[currTimestamp] === 'undefined' ? totalRaw[currTimestamp] = 0 : null; - typeof totalProb[currTimestamp] === 'undefined' ? totalProb[currTimestamp] = 0 : null; - - //placed probability into the equation - totalRaw[currTimestamp] += location.hourly.precipitation[key] ; - totalProb[currTimestamp] += ( location.hourly.precipitation[key] * probability ) ; - - //keep track of all requested data - this.aggregatedOutput[locationKey].precipationRaw[key]["val"] = location.hourly.precipitation[key]; // raw data from open weather data - this.aggregatedOutput[locationKey].precipationRaw[key]["time"] = currTimestamp; - - this.aggregatedOutput[locationKey].precipationProb[key]["val"] = probability; // probability of open weather - this.aggregatedOutput[locationKey].precipationProb[key]["time"] = currTimestamp; - - } - - //remove dead info - if(Object.keys(this.aggregatedOutput[locationKey].precipationRaw[key]).length == 0 ){ - delete this.aggregatedOutput[locationKey].precipationRaw[key]; - }; - - if(Object.keys(this.aggregatedOutput[locationKey].precipationProb[key]).length == 0 ){ - delete this.aggregatedOutput[locationKey].precipationProb[key]; - }; - - }); - }); - - //total sum expected over time window (just for ref now not so important anymore) - this.sumRain = Object.values(totalProb).reduce((sum, value) => sum + value, 0); - this.avgRain = this.sumRain / numberOfLocations; - - //make average over prob - Object.entries(totalProb).forEach(([key, sum],index) => { - typeof totalAvg[key] === 'undefined' ? totalAvg[key] = 0 : null; - totalAvg[key] = sum / numberOfLocations; - }); - - //make new prediction - return this.aggregatedOutput; - } - - // for getting the day of the year (0-365) - getDayOfYear(ts){ - const start = new Date(ts.getFullYear(), 0, 1); - const diff = ts - start; - const oneDay = 1000 * 60 * 60 * 24; - return Math.floor(diff / oneDay); -} - - - get_model_prediction(){ - // Linear predictor based on rain index with flow bounds. - const samplingHours = Number(this.sampling_time) || 0; - const predictedRate = this.getPredictedFlowRate(); - const fallbackRate = this.getEffectiveFlow(); - const flowM3PerHour = predictedRate > 0 ? predictedRate : fallbackRate; - const fallback = Math.max(0, flowM3PerHour * samplingHours); - - this.predFlow = fallback; - return this.predFlow; -} - -// Legacy/experimental model-based prediction (kept for reference; not used by default). -get_model_prediction_from_rain(){ - - // combine 24 hourly predictions to make one daily prediction (for the next 24 hours including the current hour) - let inputs = []; - for (let predHour = 0; predHour <= 23; predHour++) { - - // select 48 timestamps based on hour te be predicted - let now = new Date(); - const lastHour = new Date(now.setHours(now.getHours() + predHour)); - let timestamps = this.rain_data[0].hourly.time.map(ts => new Date(ts)); - let timestamps_48 = timestamps.filter(ts => ts <= lastHour).slice(-48) - - // for each relevant hour calculate the mean precipitation across all areas - let precipitation = []; - for (let i = 0; i < timestamps.length; i++) { - - if(timestamps_48.includes(timestamps[i])) { - - let values = []; - for (let j = 0; j < this.rain_data.length; j++) { - - values.push(this.rain_data[j].hourly.precipitation[i]); - } - let mean = values.reduce((sum, value) => sum + value, 0) / this.rain_data.length; - precipitation.push(mean); - } - } - - // generate seasonal variables for model: hour of day, day of week, day of year (last 2 with sin cos transformation) - let hour = timestamps_48.map(ts => ts.getHours()); - let weekdayJS = timestamps_48.map(ts => ts.getDay()); // Javascript weekday - let weekdayPY = weekdayJS.map(weekdayJS => (weekdayJS + 6) % 7); // Python weekday - let weekdaySin = weekdayPY.map(weekdayPY => Math.sin(2 * Math.PI * weekdayPY / 7)); - let weekdayCos = weekdayPY.map(weekdayPY => Math.cos(2 * Math.PI * weekdayPY / 7)); - let dayOfYear = timestamps_48.map(ts => this.getDayOfYear(ts)); - let dayOfYearSin = dayOfYear.map(day => Math.sin(2 * Math.PI * day / 365)); - let dayOfYearCos = dayOfYear.map(day => Math.cos(2 * Math.PI * day / 365)); - - // standardize variables for prediction and 'zip' them - const scaling = [ - { - "hour mean": 11.504046716524947, - "weekdaySin mean": -0.00023422353487966347, - "weekdayCos mean": 0.0033714029956787715, - "dayOfYearSin mean": 0.06748893577363864, - "dayOfYearCos mean": -0.02137433139416939, - "precipitation mean": 0.0887225073082283 - }, - { - "hour scale": 6.92182769305216, - "weekdaySin scale": 0.7073194528907719, - "weekdayCos scale": 0.7068859670013796, - "dayOfYearSin scale": 0.701099604274817, - "dayOfYearCos scale": 0.7095405037003095, - "precipitation scale": 0.4505403578968155 - }, - { - "Flow (m3/h) mean": 1178.7800890533754 - }, - { - "Flow (m3/h) scale": 1025.3973622173557 - } - ] - const means = scaling[0]; - const scales = scaling[1]; - - let features = [hour, weekdaySin, weekdayCos, dayOfYearSin, dayOfYearCos, precipitation]; - const names = ["hour", "weekdaySin", "weekdayCos", "dayOfYearSin", "dayOfYearCos", "precipitation"] - - features = features.map((arr, i) => - arr.map(value => (value - means[`${names[i]} mean`]) / scales[`${names[i]} scale`])); - [hour, weekdaySin, weekdayCos, dayOfYearSin, dayOfYearCos, precipitation] = features; - - const zipped = this.zip(hour, weekdaySin, weekdayCos, dayOfYearSin, dayOfYearCos, precipitation); - - // collect inputdata for model - inputs.push(zipped); - - } - const output = this.model_loader(inputs); - console.log('Final output: ' + output); -} - -async model_loader(inputs){ - - let dailyPred = 0; - - try { - - - // Try loading with default input shape*/ - const path = 'nodes/generalFunctions/datasets/lstmData/tfjs_model/model.json'; - const model = await this.modelLoader.loadModelPath(path); - console.log('Model loaded successfully!'); - - // make predictions - for (const input of inputs) { - - const inputTensor = tf.tensor3d([input]); - const predict = model.predict(inputTensor); - let predictValue = await predict.data(); - - // back-transformation because of standardization of the response variable - predictValue = predictValue[0] * 1024.1940942 + 1188.0105115; - dailyPred += predictValue; - } - console.log('Daily prediction: ' + dailyPred); - } catch (error) { - console.error('Failed to load model:', error); - } - return dailyPred; -} - - sampling_program(){ - - // ------------------ Run once on conditions and start sampling - if( ( (this.i_start ) || ( Date.now() >= this.nextDate ) ) && !this.running ){ - - if (!this.validateFlowBounds()) { - this.running = false; - this.i_start = false; - return; - } - - this.running = true; - - // reset persistent vars - this.temp_pulse = 0; - this.pulse = false; - this.updateBucketVol(0); - this.sumPuls = 0; - this.m3Total = 0; - this.timePassed = 0; // time in seconds - this.timeLeft = 0; // time in seconds - this.predM3PerSec = 0; - - //run prediction to ensure its value is filled - this.get_model_prediction(); - - // define m3 per pulse for this run and round to int ! - this.m3PerPuls = Math.round(this.predFlow / this.targetPuls); - this.predM3PerSec = this.predFlow / this.sampling_time / 60 / 60; // predicted m3 per time - - // define start and stop time based on calender data - this.start_time = Date.now(); - this.stop_time = Date.now() + (this.sampling_time * 60 * 60 * 1000); // convert to milliseconds - - //reset parameters and look for next date - this.regNextDate(this.monsternametijden); - - // reset start - this.i_start = false; - } - - // ------------------ Run for as long as sampling time is not greater than stop time - if(this.stop_time > Date.now()){ - - // define time vars - this.timePassed = Math.round( ( Date.now() - this.start_time ) / 1000); - this.timeLeft = Math.round( ( this.stop_time - Date.now() ) / 1000); - - // calc temp pulse rate - let update = this.m3PerTick / this.m3PerPuls; - - // update values - this.temp_pulse += update; - this.m3Total += this.m3PerTick; - - // check if we need to send out a pulse (stop sending pulses if capacity is reached) - if(this.temp_pulse >= 1 && this.sumPuls < this.absMaxPuls){ - const now = Date.now(); - const cooldownMs = this.minSampleIntervalSec * 1000; - const blocked = this.lastSampleTime && (now - this.lastSampleTime) < cooldownMs; - - if (blocked) { - this.missedSamples++; - this.pulse = false; - this.temp_pulse = Math.min(this.temp_pulse, 1); - - if (!this.lastSampleWarnTime || (now - this.lastSampleWarnTime) > cooldownMs) { - this.lastSampleWarnTime = now; - this.logger.warn(`Sampling too fast. Cooldown active for ${Math.ceil((cooldownMs - (now - this.lastSampleTime)) / 1000)}s.`); - } - } else { - // reset - this.temp_pulse += -1; - // send out a pulse and add to count - this.pulse = true; - this.lastSampleTime = now; - // count pulses - this.sumPuls++; - // update bucket volume each pulse - this.updateBucketVol(Math.round(this.sumPuls * this.volume_pulse * 100) / 100); - } - - } - else{ - - if( this.sumPuls > this.absMaxPuls){ - - // find out how to reschedule sample automatically? - } - - //update pulse when its true - if(this.pulse){ - this.pulse = false; // continue but don't send out a pulse - } - - } - } - else - { - //after setting once dont do it again - if(this.running){ - // Vars can only be 0 if this is not running - this.m3PerPuls = 0; - this.temp_pulse = 0; - this.pulse = false; - this.updateBucketVol(0); - this.sumPuls = 0; - this.timePassed = 0; // time in seconds - this.timeLeft = 0; // time in seconds - this.predFlow = 0; - this.predM3PerSec = 0; - this.m3Total = 0; - this.running = false; // end of sampling program (stop_time reached) - - } - } - } - - flowCalc(){ - //reset timePassed - let timePassed = 0; - - // each tick calc flowtimepassed - this.flowTime > 0 ? timePassed = ( Date.now() - this.flowTime) / 1000 : timePassed = 0 ; - - //conver to m3 per tick - this.m3PerTick = this.q / 60 / 60 * timePassed ; - - // put new timestamp - this.flowTime = Date.now(); - - } - - //goes through time related functions - tick(){ - - // ------------------ 1.0 Main program loop ------------------ + // Public surface kept for legacy tests (sampling-guards, factories.js). + getSampleCooldownMs() { return params.getSampleCooldownMs(this); } + validateFlowBounds() { return params.validateFlowBounds(this); } + getRainIndex() { return params.getRainIndex(this); } + getPredictedFlowRate() { return params.getPredictedFlowRate(this); } + getMeasuredFlow() { return this.flowTracker.getMeasuredFlow(); } + getManualFlow() { return this.flowTracker.getManualFlow(); } + getEffectiveFlow() { return this.flowTracker.getEffectiveFlow(); } + get_model_prediction() { return sampling.getModelPrediction(this); } + flowCalc() { sampling.flowCalc(this); } + sampling_program() { sampling.samplingProgram(this); } + set_boundries_and_targets() { params.applyBoundsAndTargets(this); } + regNextDate(rows) { schedule.regNextDate(this, rows); } + updateMonsternametijden(v) { schedule.updateMonsternametijden(this, v); } + updatePredRain(v) { return this.rainAggregator.update(v); } + + tick() { this.logger.debug('Monster tick running'); - - //resolve effective flow in m3/h - this.q = this.getEffectiveFlow(); - - //calculate flow based on input + this.q = this.flowTracker.getEffectiveFlow(); this.flowCalc(); - - //run sampling program this.sampling_program(); - - //logQ for predictions / forecasts - this.logQoverTime(); + this.notifyOutputChanged(); } - regNextDate(monsternametijden){ - - let next_date = new Date(new Date().setFullYear(new Date().getFullYear() + 1)); - let n_days_remaining = 0; - - if(typeof monsternametijden !== 'undefined'){ - // loop through lines - Object.entries(monsternametijden).forEach(([key, line],index) => { - - //console.log(line.START_DATE); - //check if date is not null - if(line.START_DATE != "NULL"){ - let curr_date_conv = new Date(line.START_DATE); - let curr_date = curr_date_conv.getTime(); - - //check if sample name is this sample and if date is bigger than now. - if(line.SAMPLE_NAME == this.aquonSampleName && curr_date > Date.now() ){ - - //only keep date that is bigger than current but smaller than the ones that follow after it. - if(curr_date < next_date){ next_date = curr_date; } - - // check if its within this year only show those days as days remaining - if( new Date().getFullYear() == curr_date_conv.getFullYear() ){ n_days_remaining++; } - } - } - - }); - } - else{ - //this.warning.push(3); - } - - //store vars remaining - this.daysPerYear = n_days_remaining; - this.nextDate = next_date; - } - - logQoverTime(){ - - //store currHour in temp obj for easy ref - let h = this.currHour; - - // define rain hour of which the correlation is the biggest this doesnt belong in this section do this afterwards - // let rainH = h - this.calcTimeShift ; - - // how much rain fell on rainH (define category) - - // fetch current hour from actual time - const currentHour = new Date().getHours(); - - //on hour change begin log - if(h !== currentHour ){ - - //write current total to object - this.qLineRaw.h = this.tmpTotQ - - //reset tmpTotQ - - //set this.currHour to currentHour - } - - } - - //create objects where to push arrays in to keep track of data - createMinMaxSeen(){ - //check which hour it is , then make sum , after sum is complete check which hour it is - //loop over sampling time expressed in hours - for(let h = 1; h < this.sampling_time ; h++){ - this.minSeen = {}; - } - } - - -} // end of class + getOutput() { return buildOutput(this); } + getStatusBadge() { return buildStatusBadge(this); } +} module.exports = Monster; - - -const mConfig={ - general: { - name: "Monster", - logging:{ - logLevel: "debug", - enabled: true, - }, - }, - asset: { - emptyWeightBucket: 3, - }, - constraints: { - minVolume: 4, - maxWeight: 23, - }, -} - -if (require.main === module) { - const monster = new Monster(mConfig); - (async () => { - const intervalId = setInterval(() => { - monster.tick(); - }, 1000); - })(); -}