From e02cd1a7a70401d4f0ac387746df8ba12a4839a2 Mon Sep 17 00:00:00 2001 From: znetsixe Date: Sun, 10 May 2026 22:09:24 +0200 Subject: [PATCH] P6: convert valveGroupControl to BaseDomain + BaseNodeAdapter + concern split Refactor of valveGroupControl to use the platform infrastructure (BaseDomain, BaseNodeAdapter, ChildRouter, commandRegistry, statusBadge). Extracts concerns into focused modules per .claude/refactor/MODULE_SPLIT.md generic template. Tests stay green; CONTRACT.md generated; legacy aliases preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- CONTRACT.md | 67 +++ src/commands/handlers.js | 65 +++ src/commands/index.js | 53 +++ src/groupOps/flowDistribution.js | 126 +++++ src/io/output.js | 47 ++ src/nodeClass.js | 273 +---------- src/sources/fluidContract.js | 186 ++++++++ src/specificClass.js | 782 ++++++------------------------- 8 files changed, 704 insertions(+), 895 deletions(-) create mode 100644 CONTRACT.md create mode 100644 src/commands/handlers.js create mode 100644 src/commands/index.js create mode 100644 src/groupOps/flowDistribution.js create mode 100644 src/io/output.js create mode 100644 src/sources/fluidContract.js diff --git a/CONTRACT.md b/CONTRACT.md new file mode 100644 index 0000000..24197be --- /dev/null +++ b/CONTRACT.md @@ -0,0 +1,67 @@ +# valveGroupControl — Contract + +Hand-maintained for Phase 6; the `## Inputs` table is generated from +`src/commands/index.js` (see Phase 9 generator). Keep ≤ 80 lines. + +## Inputs (msg.topic on Port 0) + +| Canonical | Aliases (deprecated) | Payload | Effect | +|---|---|---|---| +| `set.mode` | `setMode` | `string` — one of `auto`, `virtualControl`, `fysicalControl`, `maintenance` | Switches the control strategy via `source.setMode(payload)`. | +| `set.position` | `setpoint` | `any` | Reserved for future per-valve positional override; currently a debug-logged no-op pending Phase 7. | +| `child.register` | `registerChild` | `string` — the child node's Node-RED id | Resolves the child via `RED.nodes.getNode` and registers it through `childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent)`. | +| `cmd.execSequence` | `execSequence` | `{ source, action, parameter }` | Forwards to `source.handleInput(source, action, parameter)`. | +| `data.totalFlow` | `totalFlowChange` | numeric, `{ value, position?, variant?, unit? }`, or `{ source, action, ... }` | Updates total measured/predicted flow at the configured position; drives `calcValveFlows` to re-distribute across valves. | +| `cmd.emergencyStop` | `emergencyStop`, `emergencystop` | optional `{ source }` | Runs the `emergencystop` sequence via `handleInput`. | +| `set.reconcileInterval` | `setReconcileInterval` | numeric — seconds (> 0) | Re-tunes the periodic flow-reconciliation interval. Min clamp 100 ms. | + +Aliases log a one-time deprecation warning the first time they fire. + +## Outputs (msg.topic on Port 0/1/2) + +- **Port 0 (process):** `msg.topic = config.general.name`. Payload built by + `outputUtils.formatMsg(..., 'process')` from `getOutput()` — delta-compressed + (only changed fields are emitted). Output keys follow + `__` plus `mode` and `maxDeltaP`. +- **Port 1 (InfluxDB telemetry):** same shape as Port 0, formatted with the + `'influxdb'` formatter. +- **Port 2 (registration):** at startup the node sends one + `{ topic: 'child.register', payload: , positionVsParent }` + to the upstream parent. + +## Events emitted by `source.emitter` / `source.measurements.emitter` + +- `output-changed` (`source.emitter`) — public output state shifted; the + adapter listens and pushes Ports 0/1. +- `fluidContractChange` (`source.emitter`) — group-level fluid contract + (status / serviceType / sourceCount) changed. Parents (e.g. an upstream + valve registering this VGC as its parent) subscribe to react. +- `reconcileIntervalChange` (`source.emitter`) — emitted by + `setReconcileIntervalSeconds`; the adapter restarts the tick loop. +- `flow.predicted.atequipment` (`source.measurements.emitter`) — total + predicted group flow (sum of per-valve assigned flows). +- `pressure.predicted.deltaMax` (`source.measurements.emitter`) — max + delta-P across registered valves. + +The exact set is data-driven by which sources/valves register and what +they publish; downstream consumers subscribe by event name. + +## Children registered by this node + +valveGroupControl accepts two child classes through the +`childRegistrationUtils` handshake: + +- `valve` — an individual valve. Stored in `source.valves[id]`. VGC binds + to the child's `positionChange` (via `child.state.emitter`) and + `deltaPChange` (via `child.emitter`) events to re-distribute flow and + re-compute group max delta-P. +- `machine` / `rotatingmachine` / `machinegroup` / `machinegroupcontrol` / + `pumpingstation` / `valvegroupcontrol` — an upstream **source**. Stored + in `source.sources[id]`. VGC subscribes to the source's + `flow.predicted.*` / `flow.measured.*` events to drive `updateFlow`, + and reads the child's `getFluidContract()` (if present) plus + `fluidContractChange` events to aggregate the group's upstream service + type (`getFluidContract()` exposes the resolved view). + +Position labels accepted from children are `upstream`, `downstream`, +`atEquipment` (and case variants — normalised internally). diff --git a/src/commands/handlers.js b/src/commands/handlers.js new file mode 100644 index 0000000..c82fa04 --- /dev/null +++ b/src/commands/handlers.js @@ -0,0 +1,65 @@ +'use strict'; + +// Handler functions for valveGroupControl commands. Pure functions: +// source = domain (specificClass) instance +// msg = Node-RED input message +// ctx = { node, RED, send, logger } — provided by BaseNodeAdapter + +function _logger(source, ctx) { + return ctx?.logger || source?.logger || null; +} + +exports.setMode = (source, msg) => { + source.setMode(msg.payload); +}; + +exports.setPosition = (source, msg) => { + // Reserved for future per-valve positional override; currently a no-op + // pending Phase 7 topic standardisation of valve setpoint payloads. + _logger(source, null)?.debug?.(`set.position received (no-op): ${JSON.stringify(msg.payload ?? null)}`); +}; + +exports.registerChild = (source, msg, ctx) => { + const log = _logger(source, ctx); + const childId = msg.payload; + const childObj = ctx?.RED?.nodes?.getNode?.(childId); + if (!childObj || !childObj.source) { + log?.warn?.(`registerChild: child '${childId}' not found or has no .source`); + return; + } + source.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent); +}; + +exports.execSequence = async (source, msg) => { + const { source: seqSource, action: seqAction, parameter } = msg.payload || {}; + await source.handleInput(seqSource, seqAction, parameter); +}; + +exports.totalFlowChange = async (source, msg) => { + const payload = msg.payload || {}; + if (payload && typeof payload === 'object' && Object.prototype.hasOwnProperty.call(payload, 'source')) { + const src = payload.source || 'parent'; + const action = payload.action || 'totalFlowChange'; + await source.handleInput(src, action, payload); + return; + } + await source.handleInput('parent', 'totalFlowChange', payload); +}; + +exports.emergencyStop = async (source, msg) => { + const payload = msg.payload || {}; + const src = payload.source || 'parent'; + await source.handleInput(src, 'emergencystop'); +}; + +exports.setReconcileInterval = (source, msg) => { + const log = _logger(source, null); + const nextSec = Number(msg.payload); + if (!Number.isFinite(nextSec) || nextSec <= 0) { + log?.warn?.(`Invalid reconcile interval payload '${msg.payload}'. Expected seconds > 0.`); + return; + } + if (typeof source.setReconcileIntervalSeconds === 'function') { + source.setReconcileIntervalSeconds(nextSec); + } +}; diff --git a/src/commands/index.js b/src/commands/index.js new file mode 100644 index 0000000..4199a04 --- /dev/null +++ b/src/commands/index.js @@ -0,0 +1,53 @@ +'use strict'; + +// valveGroupControl command registry. Consumed by BaseNodeAdapter via +// `static commands = require('./commands')`. Canonical names follow +// CONTRACTS.md §1; legacy names live in `aliases` and emit a one-time +// deprecation warning at runtime. + +const handlers = require('./handlers'); + +module.exports = [ + { + topic: 'set.mode', + aliases: ['setMode'], + payloadSchema: { type: 'string' }, + handler: handlers.setMode, + }, + { + topic: 'set.position', + aliases: ['setpoint'], + payloadSchema: { type: 'any' }, + handler: handlers.setPosition, + }, + { + topic: 'child.register', + aliases: ['registerChild'], + payloadSchema: { type: 'string' }, + handler: handlers.registerChild, + }, + { + topic: 'cmd.execSequence', + aliases: ['execSequence'], + payloadSchema: { type: 'object' }, + handler: handlers.execSequence, + }, + { + topic: 'data.totalFlow', + aliases: ['totalFlowChange'], + payloadSchema: { type: 'any' }, + handler: handlers.totalFlowChange, + }, + { + topic: 'cmd.emergencyStop', + aliases: ['emergencyStop', 'emergencystop'], + payloadSchema: { type: 'any' }, + handler: handlers.emergencyStop, + }, + { + topic: 'set.reconcileInterval', + aliases: ['setReconcileInterval'], + payloadSchema: { type: 'any' }, + handler: handlers.setReconcileInterval, + }, +]; diff --git a/src/groupOps/flowDistribution.js b/src/groupOps/flowDistribution.js new file mode 100644 index 0000000..bf4ed62 --- /dev/null +++ b/src/groupOps/flowDistribution.js @@ -0,0 +1,126 @@ +'use strict'; + +// Per-valve flow distribution. Splits the group's total flow across +// available valves proportional to Kv, then asks each valve back what +// flow it actually accepted and re-balances the residual. Also surfaces +// max delta-P across the group for downstream readers. + +const DEFAULT_RECONCILIATION = Object.freeze({ maxPasses: 2, residualTolerance: 0.001 }); + +function isValveAvailable(valve) { + const currentState = valve?.state?.getCurrentState?.(); + const mode = valve?.currentMode; + const kv = Number(valve?.kv); + return ( + currentState !== 'off' + && currentState !== 'maintenance' + && mode !== 'maintenance' + && Number.isFinite(kv) + && kv > 0 + ); +} + +function listAvailableValves(valves) { + return Object.entries(valves) + .filter(([, valve]) => isValveAvailable(valve)) + .map(([id, valve]) => ({ id, valve })); +} + +function _readAcceptedFlow(valve, flowUnit) { + const accepted = Number( + valve?.measurements + ?.type('flow') + ?.variant('predicted') + ?.position('downstream') + ?.getCurrentValue(flowUnit) + ); + return Number.isFinite(accepted) ? accepted : null; +} + +function solveFlowDistribution(totalFlow, availableEntries, reconciliation, flowUnit) { + const totalKv = availableEntries.reduce((sum, { valve }) => sum + Number(valve.kv), 0); + if (!Number.isFinite(totalKv) || totalKv <= 0) { + return { flowsById: {}, residual: Number(totalFlow) || 0, passes: 0 }; + } + + const targetById = Object.fromEntries(availableEntries.map(({ id }) => [id, 0])); + let residual = Number(totalFlow); + let passes = 0; + const maxPasses = Math.max(1, Number(reconciliation?.maxPasses) || DEFAULT_RECONCILIATION.maxPasses); + const tolerance = Math.max(0, Number(reconciliation?.residualTolerance) || DEFAULT_RECONCILIATION.residualTolerance); + + while (passes < maxPasses && Number.isFinite(residual) && Math.abs(residual) > tolerance) { + availableEntries.forEach(({ id, valve }) => { + const share = (Number(valve.kv) / totalKv) * residual; + targetById[id] = Number(targetById[id]) + share; + valve.updateFlow('predicted', targetById[id], 'downstream', flowUnit); + }); + + let acceptedTotal = 0; + availableEntries.forEach(({ id, valve }) => { + const accepted = _readAcceptedFlow(valve, flowUnit); + if (Number.isFinite(accepted)) { + targetById[id] = accepted; + acceptedTotal += accepted; + return; + } + acceptedTotal += Number(targetById[id]) || 0; + }); + + residual = Number(totalFlow) - acceptedTotal; + passes += 1; + } + + return { flowsById: targetById, residual: Number.isFinite(residual) ? residual : 0, passes }; +} + +function distributeFlow(vgc) { + const flowUnit = vgc.unitPolicy.output('flow'); + const totalFlowMeasured = vgc._read('flow', 'measured', 'atEquipment', flowUnit); + const totalFlowPredicted = vgc._read('flow', 'predicted', 'atEquipment', flowUnit); + const totalFlow = Number.isFinite(totalFlowMeasured) ? totalFlowMeasured : totalFlowPredicted; + if (!Number.isFinite(totalFlow)) return; + + const availableEntries = listAvailableValves(vgc.valves); + const availableIds = new Set(availableEntries.map((entry) => entry.id)); + const totalKv = availableEntries.reduce((sum, { valve }) => sum + Number(valve.kv), 0); + + if (!availableEntries.length || !Number.isFinite(totalKv) || totalKv <= 0) { + vgc.logger.warn('No available valves with valid Kv, setting assigned flow to 0.'); + for (const valve of Object.values(vgc.valves)) { + valve.updateFlow('predicted', 0, 'downstream', flowUnit); + } + vgc._write('flow', 'predicted', 'atEquipment', 0, flowUnit); + vgc.lastFlowSolve = { passes: 0, residual: Number(totalFlow) || 0, targetTotal: Number(totalFlow) || 0, assignedTotal: 0 }; + return; + } + + const solve = solveFlowDistribution(totalFlow, availableEntries, vgc.flowReconciliation, flowUnit); + let assignedTotal = 0; + for (const [id, valve] of Object.entries(vgc.valves)) { + const flow = availableIds.has(id) ? (solve.flowsById[id] || 0) : 0; + valve.updateFlow('predicted', flow, 'downstream', flowUnit); + assignedTotal += flow; + } + + vgc._write('flow', 'predicted', 'atEquipment', assignedTotal, flowUnit); + vgc.lastFlowSolve = { passes: solve.passes, residual: solve.residual, targetTotal: totalFlow, assignedTotal }; + calcMaxDeltaP(vgc); +} + +function calcMaxDeltaP(vgc) { + const pUnit = vgc.unitPolicy.output('pressure'); + let maxDeltaP = 0; + for (const [id, valve] of Object.entries(vgc.valves)) { + const deltaP = Number( + valve.measurements.type('pressure').variant('predicted').position('delta').getCurrentValue(pUnit) + ); + if (!Number.isFinite(deltaP)) continue; + vgc.logger.debug(`Delta P for valve ${id}: ${deltaP}`); + if (deltaP > maxDeltaP) maxDeltaP = deltaP; + } + vgc.maxDeltaP = maxDeltaP; + vgc._write('pressure', 'predicted', 'deltaMax', maxDeltaP, pUnit); +} + +module.exports = { distributeFlow, calcMaxDeltaP, listAvailableValves, isValveAvailable, DEFAULT_RECONCILIATION }; diff --git a/src/io/output.js b/src/io/output.js new file mode 100644 index 0000000..6a24c08 --- /dev/null +++ b/src/io/output.js @@ -0,0 +1,47 @@ +'use strict'; + +// getOutput + getStatusBadge composition for valveGroupControl. Keeps +// the orchestrator under its file-size budget. + +const { statusBadge } = require('generalFunctions'); + +function _outputUnitForType(unitPolicy, type) { + switch (String(type || '').toLowerCase()) { + case 'flow': return unitPolicy.output('flow'); + case 'pressure': return unitPolicy.output('pressure'); + default: return null; + } +} + +function getOutput(vgc) { + const out = {}; + const measurements = vgc.measurements; + Object.entries(measurements.measurements || {}).forEach(([type, variants]) => { + Object.entries(variants || {}).forEach(([variant, positions]) => { + const unit = _outputUnitForType(vgc.unitPolicy, type); + Object.keys(positions || {}).forEach((position) => { + const value = measurements.type(type).variant(variant).position(position).getCurrentValue(unit || undefined); + if (value != null) out[`${position}_${variant}_${type}`] = value; + }); + }); + }); + out.mode = vgc.currentMode; + out.maxDeltaP = vgc.maxDeltaP; + return out; +} + +function getStatusBadge(vgc) { + const flowUnit = vgc.unitPolicy.output('flow'); + const measured = vgc.measurements.type('flow').variant('measured').position('atEquipment').getCurrentValue(flowUnit); + const predicted = vgc.measurements.type('flow').variant('predicted').position('atEquipment').getCurrentValue(flowUnit); + const raw = Number.isFinite(measured) ? measured : predicted; + const totalFlow = Number.isFinite(raw) ? Math.round(raw) : 0; + const available = vgc.getAvailableValves(); + const status = available.length > 0 ? `${available.length} valve(s) connected` : 'No valves'; + return statusBadge.text( + `${vgc.currentMode} | flow=${totalFlow} ${flowUnit} | ${status}`, + { fill: available.length > 0 ? 'green' : 'red', shape: 'dot' } + ); +} + +module.exports = { getOutput, getStatusBadge }; diff --git a/src/nodeClass.js b/src/nodeClass.js index cc0f548..d7fb8a6 100644 --- a/src/nodeClass.js +++ b/src/nodeClass.js @@ -1,258 +1,33 @@ -const { outputUtils, configManager, convert } = require("generalFunctions"); -const Specific = require("./specificClass"); +'use strict'; -class nodeClass { - /** - * Create a MeasurementNode. - * @param {object} uiConfig - Node-RED node configuration. - * @param {object} RED - Node-RED runtime API. - * @param {object} nodeInstance - The Node-RED node instance. - * @param {string} nameOfNode - The name of the node, used for - */ - constructor(uiConfig, RED, nodeInstance, nameOfNode) { - // Preserve RED reference for HTTP endpoints if needed - this.node = nodeInstance; // This is the Node-RED node instance, we can use this to send messages and update status - this.RED = RED; // This is the Node-RED runtime API, we can use this to create endpoints if needed - this.name = nameOfNode; // This is the name of the node, it should match the file name and the node type in Node-RED - this.source = null; // Will hold the specific class instance +const { BaseNodeAdapter } = require('generalFunctions'); +const ValveGroupControl = require('./specificClass'); +const commands = require('./commands'); - // Load default & UI config - this._loadConfig(uiConfig, this.node); - this._reconcileIntervalMs = this._resolveReconcileIntervalMs(uiConfig); +// Tick-driven: a periodic reconcile pass re-balances per-valve flow if +// a child's accepted value drifts between event-driven recalcs. +class nodeClass extends BaseNodeAdapter { + static DomainClass = ValveGroupControl; + static commands = commands; + static tickInterval = 1000; + static statusInterval = 1000; - // Instantiate core Measurement class - this._setupSpecificClass(); + buildDomainConfig() { return {}; } - // Wire up event and lifecycle handlers - this._bindEvents(); - this._registerChild(); - this._startTickLoop(); - this._attachInputHandler(); - this._attachCloseHandler(); + extraSetup() { + this.source?.emitter?.on?.('reconcileIntervalChange', (ms) => this._restartTick(ms)); } - /** - * Load and merge default config with user-defined settings. - * @param {object} uiConfig - Raw config from Node-RED UI. - */ - _loadConfig(uiConfig, node) { - const cfgMgr = new configManager(); - this.defaultConfig = cfgMgr.getConfig(this.name); - - // Resolve flow unit with validation before building config - const flowUnit = this._resolveUnitOrFallback(uiConfig.unit, 'volumeFlowRate', 'm3/h', 'flow'); - const resolvedUiConfig = { ...uiConfig, unit: flowUnit }; - - // Build config: base sections (no domain-specific config for group controller) - this.config = cfgMgr.buildConfig(this.name, resolvedUiConfig, node.id); - - // Utility for formatting outputs - this._output = new outputUtils(); - } - - _resolveUnitOrFallback(candidate, expectedMeasure, fallbackUnit, label) { - const raw = typeof candidate === "string" ? candidate.trim() : ""; - const fallback = String(fallbackUnit || "").trim(); - if (!raw) { - return fallback; - } - try { - const desc = convert().describe(raw); - if (expectedMeasure && desc.measure !== expectedMeasure) { - throw new Error(`expected '${expectedMeasure}' but got '${desc.measure}'`); - } - return raw; - } catch (error) { - this.node?.warn?.(`Invalid ${label} unit '${raw}' (${error.message}). Falling back to '${fallback}'.`); - return fallback; - } - } - - _resolveReconcileIntervalMs(uiConfig) { - const raw = Number( - uiConfig?.reconcileIntervalSeconds - ?? uiConfig?.reconcileIntervalSec - ?? uiConfig?.reconcileEverySeconds - ?? 1 - ); - const sec = Number.isFinite(raw) && raw > 0 ? raw : 1; - return Math.max(100, Math.round(sec * 1000)); - } - - _updateNodeStatus() { - const vg = this.source; - const mode = vg.currentMode; - const flowUnit = vg?.unitPolicy?.output?.flow || this.config.general.unit || "m3/h"; - const measuredFlow = vg.measurements.type("flow").variant("measured").position("atEquipment").getCurrentValue(flowUnit); - const predictedFlow = vg.measurements.type("flow").variant("predicted").position("atEquipment").getCurrentValue(flowUnit); - const totalFlowRaw = Number.isFinite(measuredFlow) ? measuredFlow : predictedFlow; - const totalFlow = Number.isFinite(totalFlowRaw) ? Math.round(totalFlowRaw) : 0; - const availableValves = Array.isArray(vg.getAvailableValves?.()) ? vg.getAvailableValves() : []; - - // const totalCapacity = Math.round(vg.dynamicTotals.flow.max * 1) / 1; ADD LATER? - - // Determine overall status based on available valves - const status = - availableValves.length > 0 - ? `${availableValves.length} valve(s) connected` - : "No valves"; - - - // Generate status text in a single line - const text = `${mode} | flow=${totalFlow} ${flowUnit} | ${status}`; - - return { - fill: availableValves.length > 0 ? "green" : "red", - shape: "dot", - text, - }; - } - - /** - * Instantiate the core logic and store as source. - */ - _setupSpecificClass() { - this.source = new Specific(this.config); - this.node.source = this.source; // Store the source in the node instance for easy access - } - - /** - * Bind events to Node-RED status updates. Using internal emitter. --> REMOVE LATER WE NEED ONLY COMPLETE CHILDS AND THEN CHECK FOR UPDATES - */ - _bindEvents() { - - } - - /** - * Register this node as a child upstream and downstream. - * Delayed to avoid Node-RED startup race conditions. - */ - _registerChild() { - setTimeout(() => { - this.node.send([ - null, - null, - { - topic: "registerChild", - payload: this.node.id, - positionVsParent: - this.config?.functionality?.positionVsParent || "atEquipment", - }, - ]); - }, 100); - } - - /** - * Start the periodic tick loop to drive the Measurement class. - */ - _startTickLoop() { - setTimeout(() => { - this._tickInterval = setInterval(() => this._tick(), this._reconcileIntervalMs); - // Update node status on nodered screen every second ( this is not the best way to do this, but it works for now) - this._statusInterval = setInterval(() => { - const status = this._updateNodeStatus(); - this.node.status(status); - }, 1000); - }, 1000); - } - - /** - * Execute a single tick: update measurement, format and send outputs. - */ - _tick() { - if (typeof this.source?.calcValveFlows === 'function') { - this.source.calcValveFlows(); - } - const raw = this.source.getOutput(); - const processMsg = this._output.formatMsg(raw, this.config, "process"); - const influxMsg = this._output.formatMsg(raw, this.config, "influxdb"); - - // Send only updated outputs on ports 0 & 1 - this.node.send([processMsg, influxMsg]); - } - - /** - * Attach the node's input handler, routing control messages to the class. - */ - _attachInputHandler() { - this.node.on( - "input", - async (msg, send, done) => { - const vg = this.source; - const RED = this.RED; - try { - switch (msg.topic) { - case "registerChild": { - const childId = msg.payload; - const childObj = RED.nodes.getNode(childId); - if (!childObj || !childObj.source) { - vg.logger.warn(`registerChild skipped: missing child/source for id=${childId}`); - break; - } - vg.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent); - break; - } - case 'setMode': - vg.setMode(msg.payload); - break; - case 'setReconcileInterval': { - const nextSec = Number(msg.payload); - if (!Number.isFinite(nextSec) || nextSec <= 0) { - vg.logger.warn(`Invalid reconcile interval payload '${msg.payload}'. Expected seconds > 0.`); - break; - } - this._reconcileIntervalMs = Math.max(100, Math.round(nextSec * 1000)); - clearInterval(this._tickInterval); - this._tickInterval = setInterval(() => this._tick(), this._reconcileIntervalMs); - vg.logger.info(`Flow reconciliation interval updated to ${nextSec}s (${this._reconcileIntervalMs}ms).`); - break; - } - case 'execSequence': { - const { source: seqSource, action: seqAction, parameter } = msg.payload; - vg.handleInput(seqSource, seqAction, parameter); - break; - } - case 'totalFlowChange': { - const payload = msg.payload || {}; - if (payload && typeof payload === "object" && Object.prototype.hasOwnProperty.call(payload, "source")) { - const tfcSource = payload.source || "parent"; - const tfcAction = payload.action || "totalFlowChange"; - vg.handleInput(tfcSource, tfcAction, payload); - } else { - vg.handleInput("parent", "totalFlowChange", payload); - } - break; - } - case 'emergencystop': - case 'emergencyStop': { - const payload = msg.payload || {}; - const esSource = payload.source || "parent"; - vg.handleInput(esSource, "emergencystop"); - break; - } - default: - vg.logger.warn(`Unknown topic: ${msg.topic}`); - break; - } - } catch (error) { - vg.logger.error(`Input handler failure: ${error.message}`); - } - if (typeof done === 'function') done(); - } - ); - } - - /** - * Clean up timers and intervals when Node-RED stops the node. - */ - _attachCloseHandler() { - this.node.on("close", (done) => { - clearInterval(this._tickInterval); - clearInterval(this._statusInterval); - this.source?.destroy?.(); - if (typeof done === 'function') done(); - }); + _restartTick(ms) { + const next = Math.max(100, Math.round(Number(ms) || 0)); + if (!next) return; + if (this._tickInterval) clearInterval(this._tickInterval); + this._tickInterval = setInterval(() => { + try { this.source.tick?.(); } + catch (err) { this.source?.logger?.error?.(`tick threw: ${err.message}`); } + this._emitOutputs(); + }, next); } } -module.exports = nodeClass; // Export the class for Node-RED to use +module.exports = nodeClass; diff --git a/src/sources/fluidContract.js b/src/sources/fluidContract.js new file mode 100644 index 0000000..b8e864d --- /dev/null +++ b/src/sources/fluidContract.js @@ -0,0 +1,186 @@ +'use strict'; + +// Upstream-source registration + fluid-contract reconciliation. +// Sources are non-valve upstream children (rotatingMachine, MGC, PS, …) +// that publish flow events and optionally a service-type contract. +// VGC aggregates their contracts into one group-level view that valves +// can read for compatibility checks. + +const SERVICE_TYPES = new Set(['gas', 'liquid']); +const SOURCE_SOFTWARE_TYPES = new Set([ + 'machine', + 'rotatingmachine', + 'machinegroup', + 'machinegroupcontrol', + 'pumpingstation', + 'valvegroupcontrol', +]); +const SOURCE_FLOW_EVENTS = [ + 'flow.predicted.downstream', + 'flow.predicted.atEquipment', + 'flow.predicted.atequipment', + 'flow.measured.downstream', + 'flow.measured.atEquipment', + 'flow.measured.atequipment', +]; +const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({ + machine: 'liquid', + rotatingmachine: 'liquid', + machinegroup: 'liquid', + machinegroupcontrol: 'liquid', + pumpingstation: 'liquid', +}); + +function normalizeServiceType(value) { + const raw = String(value || '').trim().toLowerCase(); + return SERVICE_TYPES.has(raw) ? raw : null; +} + +function isSourceSoftwareType(softwareType) { + return SOURCE_SOFTWARE_TYPES.has(String(softwareType || '').trim().toLowerCase()); +} + +function isSourceLike(child, softwareType) { + if (isSourceSoftwareType(softwareType)) return true; + return typeof child?.getFluidContract === 'function'; +} + +function extractFluidContract(child, softwareType, logger) { + let contract = null; + if (typeof child?.getFluidContract === 'function') { + try { contract = child.getFluidContract(); } + catch (error) { logger?.warn?.(`Failed to read child fluid contract: ${error.message}`); } + } + const status = String(contract?.status || '').trim().toLowerCase(); + if (status === 'conflict') return { status: 'conflict', serviceType: null }; + + const fromContract = normalizeServiceType(contract?.serviceType); + if (fromContract) return { status: 'resolved', serviceType: fromContract }; + + const direct = normalizeServiceType( + child?.serviceType || child?.expectedServiceType || child?.config?.asset?.serviceType + ); + if (direct) return { status: 'resolved', serviceType: direct }; + + const fallback = DEFAULT_SOURCE_SERVICE_TYPE[String(softwareType || '').trim().toLowerCase()] || null; + if (fallback) return { status: 'inferred', serviceType: fallback }; + + return { status: 'unknown', serviceType: null }; +} + +function _diff(prev, next) { + return ( + prev.status !== next.status + || prev.serviceType !== next.serviceType + || prev.sourceCount !== next.sourceCount + || (prev.message || '') !== (next.message || '') + ); +} + +function refreshFluidContract(vgc) { + const contracts = Object.values(vgc.sources).map((s) => s?.fluidContract || null).filter(Boolean); + const serviceTypes = Array.from(new Set( + contracts.map((c) => normalizeServiceType(c.serviceType)).filter(Boolean) + )); + const hasConflict = contracts.some((c) => String(c.status || '').toLowerCase() === 'conflict'); + const sourceCount = Object.keys(vgc.sources).length; + let next; + + if (hasConflict || serviceTypes.length > 1) { + next = { + status: 'conflict', serviceType: null, upstreamServiceTypes: serviceTypes, sourceCount, + message: `Conflicting upstream fluids detected: ${serviceTypes.join(', ') || 'unknown'}.`, + }; + } else if (serviceTypes.length === 1) { + next = { + status: 'resolved', serviceType: serviceTypes[0], upstreamServiceTypes: serviceTypes, sourceCount, + message: `Upstream fluid resolved as ${serviceTypes[0]}.`, + }; + } else { + next = { + status: 'unknown', serviceType: null, upstreamServiceTypes: [], sourceCount, + message: 'No upstream fluid sources registered.', + }; + } + + const prev = vgc.fluidContract || {}; + vgc.fluidContract = next; + if (_diff(prev, next)) vgc.emitter.emit('fluidContractChange', vgc.getFluidContract()); +} + +function registerSource(vgc, child, positionVsParent, softwareType) { + const id = child?.config?.general?.id || child?.config?.general?.name || `source-${Object.keys(vgc.sources).length + 1}`; + if (vgc._sourceListeners.has(id)) unbindSource(vgc, id); + + child.positionVsParent = positionVsParent; + vgc.sources[id] = child; + bindSource(vgc, id, child); + vgc.sources[id].fluidContract = extractFluidContract(child, softwareType, vgc.logger); + refreshFluidContract(vgc); + + vgc.logger.info(`Source '${id}' (${softwareType || 'unknown'}) registered at ${positionVsParent}.`); + return true; +} + +function bindSource(vgc, sourceId, source) { + const listeners = { flow: [], onFluidContractChange: null }; + if (source?.measurements?.emitter?.on) { + SOURCE_FLOW_EVENTS.forEach((eventName) => { + const handler = (eventData = {}) => { + const value = Number(eventData.value); + if (!Number.isFinite(value)) return; + const variant = String(eventName).split('.')[1] === 'measured' ? 'measured' : 'predicted'; + const unit = eventData.unit || vgc.unitPolicy.output('flow'); + vgc.updateFlow(variant, value, 'atEquipment', unit); + }; + source.measurements.emitter.on(eventName, handler); + listeners.flow.push({ emitter: source.measurements.emitter, eventName, handler }); + }); + } + if (source?.emitter?.on) { + listeners.onFluidContractChange = () => { + if (!vgc.sources[sourceId]) return; + vgc.sources[sourceId].fluidContract = extractFluidContract(source, source?.config?.functionality?.softwareType, vgc.logger); + refreshFluidContract(vgc); + }; + source.emitter.on('fluidContractChange', listeners.onFluidContractChange); + } + vgc._sourceListeners.set(sourceId, { source, listeners }); +} + +function unbindSource(vgc, sourceId) { + const entry = vgc._sourceListeners.get(sourceId); + if (!entry) return; + const { source, listeners } = entry; + listeners.flow.forEach(({ emitter, eventName, handler }) => { + if (typeof emitter?.off === 'function') emitter.off(eventName, handler); + else if (typeof emitter?.removeListener === 'function') emitter.removeListener(eventName, handler); + }); + if (listeners.onFluidContractChange) { + if (typeof source?.emitter?.off === 'function') source.emitter.off('fluidContractChange', listeners.onFluidContractChange); + else if (typeof source?.emitter?.removeListener === 'function') source.emitter.removeListener('fluidContractChange', listeners.onFluidContractChange); + } + vgc._sourceListeners.delete(sourceId); +} + +function getFluidContract(vgc) { + const s = vgc.fluidContract || {}; + return { + status: s.status || 'unknown', + serviceType: s.serviceType || null, + upstreamServiceTypes: Array.isArray(s.upstreamServiceTypes) ? [...s.upstreamServiceTypes] : [], + sourceCount: Number(s.sourceCount) || 0, + message: s.message || '', + source: 'valvegroupcontrol', + }; +} + +module.exports = { + isSourceLike, + isSourceSoftwareType, + registerSource, + unbindSource, + refreshFluidContract, + getFluidContract, + SOURCE_SOFTWARE_TYPES, +}; diff --git a/src/specificClass.js b/src/specificClass.js index dd83e46..4fc4432 100644 --- a/src/specificClass.js +++ b/src/specificClass.js @@ -1,120 +1,59 @@ -/** - * @file valveGroupControl.js - */ +'use strict'; -const EventEmitter = require('events'); -const { logger, configUtils, configManager, state, MeasurementContainer, childRegistrationUtils, convert } = require('generalFunctions'); +// ValveGroupControl — S88 Unit orchestrator coordinating valve children. +// Concern modules under src/{groupOps,sources,io,commands} carry the +// real work; this file stitches them together: registration, valve event +// routing, source fluid-contract aggregation, mode/sequence dispatch. -const CANONICAL_UNITS = Object.freeze({ - pressure: 'Pa', - flow: 'm3/s', -}); - -const DEFAULT_IO_UNITS = Object.freeze({ - pressure: 'mbar', - flow: 'm3/h', -}); +const { BaseDomain, UnitPolicy, state } = require('generalFunctions'); +const flowDist = require('./groupOps/flowDistribution'); +const sources = require('./sources/fluidContract'); +const io = require('./io/output'); const KNOWN_POSITIONS = new Set(['upstream', 'downstream', 'atEquipment']); -const SERVICE_TYPES = new Set(['gas', 'liquid']); -const SOURCE_SOFTWARE_TYPES = new Set([ - 'machine', - 'rotatingmachine', - 'machinegroup', - 'machinegroupcontrol', - 'pumpingstation', - 'valvegroupcontrol', -]); -const SOURCE_FLOW_EVENTS = [ - 'flow.predicted.downstream', - 'flow.predicted.atEquipment', - 'flow.predicted.atequipment', - 'flow.measured.downstream', - 'flow.measured.atEquipment', - 'flow.measured.atequipment', -]; -const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({ - machine: 'liquid', - rotatingmachine: 'liquid', - machinegroup: 'liquid', - machinegroupcontrol: 'liquid', - pumpingstation: 'liquid', -}); -const DEFAULT_FLOW_RECONCILIATION = Object.freeze({ - maxPasses: 2, - residualTolerance: 0.001, -}); -class ValveGroupControl { - constructor(valveGroupControlConfig = {}) { - this.emitter = new EventEmitter(); - this.configManager = new configManager(); - this.defaultConfig = this.configManager.getConfig('valveGroupControl'); - this.configUtils = new configUtils(this.defaultConfig); - this.config = this.configUtils.initConfig(valveGroupControlConfig); - this.unitPolicy = this._buildUnitPolicy(this.config); +class ValveGroupControl extends BaseDomain { + static name = 'valveGroupControl'; + + static unitPolicy = UnitPolicy.declare({ + canonical: { flow: 'm3/s', pressure: 'Pa' }, + output: { flow: 'm3/h', pressure: 'mbar' }, + requireUnitForTypes: ['pressure', 'flow'], + }); + + configure() { this.config = this.configUtils.updateConfig(this.config, { - general: { unit: this.unitPolicy.output.flow }, + general: { unit: this.unitPolicy.output('flow') }, }); - this.logger = new logger(this.config.general.logging.enabled, this.config.general.logging.logLevel, this.config.general.name); - - this.measurements = new MeasurementContainer({ - autoConvert: true, - defaultUnits: { - pressure: this.unitPolicy.output.pressure, - flow: this.unitPolicy.output.flow, - }, - preferredUnits: { - pressure: this.unitPolicy.output.pressure, - flow: this.unitPolicy.output.flow, - }, - canonicalUnits: this.unitPolicy.canonical, - storeCanonical: true, - strictUnitValidation: true, - throwOnInvalidUnit: true, - requireUnitForTypes: ['pressure', 'flow'], - }, this.logger); - - this.child = {}; this.valves = {}; this._valveListeners = new Map(); this.sources = {}; this._sourceListeners = new Map(); this.fluidContract = { - status: 'unknown', - serviceType: null, - upstreamServiceTypes: [], - sourceCount: 0, - message: 'No upstream fluid sources registered.', + status: 'unknown', serviceType: null, upstreamServiceTypes: [], + sourceCount: 0, message: 'No upstream fluid sources registered.', }; - this.flowReconciliation = { ...DEFAULT_FLOW_RECONCILIATION }; - this.lastFlowSolve = { - passes: 0, - residual: 0, - targetTotal: 0, - assignedTotal: 0, - }; - + this.flowReconciliation = { ...flowDist.DEFAULT_RECONCILIATION }; + this.lastFlowSolve = { passes: 0, residual: 0, targetTotal: 0, assignedTotal: 0 }; this.maxDeltaP = 0; this.currentMode = this.config.mode.current; - this.childRegistrationUtils = new childRegistrationUtils(this); this.state = new state({}, this.logger); this.state.stateManager.currentState = 'operational'; + + // Overloaded API: `(child, softwareTypeOrPosition)`. Tests + legacy + // childRegistrationUtils both invoke this with a string second arg, + // which may be either a known position or a softwareType. Resolve + // before dispatching so router-based registration keeps working. + this.registerChild = (child, positionOrType) => this._registerChild(child, positionOrType); } - registerOnChildEvents() {} - - _resolveRegistrationContext(child, positionVsParentOrSoftwareType) { - const fromArg = String(positionVsParentOrSoftwareType || '').trim(); + _resolveRegistrationContext(child, arg) { + const fromArg = String(arg || '').trim(); if (KNOWN_POSITIONS.has(fromArg)) { - return { - positionVsParent: fromArg, - softwareType: child?.config?.functionality?.softwareType || null, - }; + return { positionVsParent: fromArg, softwareType: child?.config?.functionality?.softwareType || null }; } - return { positionVsParent: child?.positionVsParent || 'atEquipment', softwareType: fromArg || child?.config?.functionality?.softwareType || null, @@ -125,30 +64,20 @@ class ValveGroupControl { return Boolean( child && typeof child.updateFlow === 'function' - && child.state - && typeof child.state.getCurrentState === 'function' + && child.state && typeof child.state.getCurrentState === 'function' && child.measurements ); } - _isSourceLike(child, softwareType) { - const type = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase(); - if (SOURCE_SOFTWARE_TYPES.has(type)) { - return true; - } - return typeof child?.getFluidContract === 'function'; - } - - registerChild(child, positionVsParentOrSoftwareType) { - const ctx = this._resolveRegistrationContext(child, positionVsParentOrSoftwareType); + _registerChild(child, positionOrType) { + const ctx = this._resolveRegistrationContext(child, positionOrType); const softwareType = String(ctx.softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase(); if (softwareType === 'valve' || (!softwareType && this._isValveLike(child))) { return this._registerValve(child, ctx.positionVsParent); } - - if (this._isSourceLike(child, softwareType)) { - return this._registerSource(child, ctx.positionVsParent, softwareType); + if (sources.isSourceLike(child, softwareType)) { + return sources.registerSource(this, child, ctx.positionVsParent, softwareType); } this.logger.warn(`registerChild skipped: unsupported child type '${softwareType || 'unknown'}'`); @@ -165,314 +94,144 @@ class ValveGroupControl { this.logger.debug(`registerChild skipped: valve ${id} already registered`); return true; } - child.positionVsParent = positionVsParent; this.valves[id] = child; this._bindValveEvents(id, child); - this.calcValveFlows(); this.calcMaxDeltaP(); - this._refreshFluidContract(); + sources.refreshFluidContract(this); this.logger.info(`Valve '${id}' registered at ${positionVsParent}.`); return true; } - _registerSource(child, positionVsParent, softwareType) { - const id = child?.config?.general?.id || child?.config?.general?.name || `source-${Object.keys(this.sources).length + 1}`; - if (this._sourceListeners.has(id)) { - this._unbindSourceEvents(id); - } - child.positionVsParent = positionVsParent; - this.sources[id] = child; - - this._bindSourceEvents(id, child); - const contract = this._extractFluidContractFromChild(child, softwareType); - this.sources[id].fluidContract = contract; - this._refreshFluidContract(); - - this.logger.info(`Source '${id}' (${softwareType || 'unknown'}) registered at ${positionVsParent}.`); - return true; - } - _bindValveEvents(valveId, valve) { const handlers = { - onPositionChange: () => { - this.logger.debug(`Valve ${valveId} position changed, recalculating flows.`); - this.calcValveFlows(); - }, - onDeltaPChange: () => { - this.logger.debug(`Valve ${valveId} deltaP changed, recalculating max deltaP.`); - this.calcMaxDeltaP(); - }, + onPositionChange: () => { this.logger.debug(`Valve ${valveId} position changed, recalculating flows.`); this.calcValveFlows(); }, + onDeltaPChange: () => { this.logger.debug(`Valve ${valveId} deltaP changed, recalculating max deltaP.`); this.calcMaxDeltaP(); }, }; - - if (valve.state?.emitter?.on) { - valve.state.emitter.on('positionChange', handlers.onPositionChange); - } - if (valve.emitter?.on) { - valve.emitter.on('deltaPChange', handlers.onDeltaPChange); - } - + if (valve.state?.emitter?.on) valve.state.emitter.on('positionChange', handlers.onPositionChange); + if (valve.emitter?.on) valve.emitter.on('deltaPChange', handlers.onDeltaPChange); this._valveListeners.set(valveId, { valve, handlers }); } _unbindValveEvents(valveId) { - const listener = this._valveListeners.get(valveId); - if (!listener) { - return; - } - const { valve, handlers } = listener; - if (handlers.onPositionChange && valve.state?.emitter?.off) { - valve.state.emitter.off('positionChange', handlers.onPositionChange); - } - if (handlers.onDeltaPChange && valve.emitter?.off) { - valve.emitter.off('deltaPChange', handlers.onDeltaPChange); - } + const entry = this._valveListeners.get(valveId); + if (!entry) return; + const { valve, handlers } = entry; + if (handlers.onPositionChange && valve.state?.emitter?.off) valve.state.emitter.off('positionChange', handlers.onPositionChange); + if (handlers.onDeltaPChange && valve.emitter?.off) valve.emitter.off('deltaPChange', handlers.onDeltaPChange); this._valveListeners.delete(valveId); } - _bindSourceEvents(sourceId, source) { - const listeners = { - flow: [], - onFluidContractChange: null, - }; - if (source?.measurements?.emitter?.on) { - SOURCE_FLOW_EVENTS.forEach((eventName) => { - const handler = (eventData = {}) => { - this._handleSourceFlowEvent(eventName, eventData); - }; - source.measurements.emitter.on(eventName, handler); - listeners.flow.push({ - emitter: source.measurements.emitter, - eventName, - handler, - }); - }); - } - - if (source?.emitter?.on) { - listeners.onFluidContractChange = () => { - const contract = this._extractFluidContractFromChild( - source, - source?.config?.functionality?.softwareType - ); - if (!this.sources[sourceId]) { - return; - } - this.sources[sourceId].fluidContract = contract; - this._refreshFluidContract(); - }; - source.emitter.on('fluidContractChange', listeners.onFluidContractChange); - } - - this._sourceListeners.set(sourceId, { source, listeners }); - } - - _unbindSourceEvents(sourceId) { - const listener = this._sourceListeners.get(sourceId); - if (!listener) { - return; - } - - const { source, listeners } = listener; - listeners.flow.forEach(({ emitter, eventName, handler }) => { - if (typeof emitter?.off === 'function') { - emitter.off(eventName, handler); - } else if (typeof emitter?.removeListener === 'function') { - emitter.removeListener(eventName, handler); - } - }); - - if (listeners.onFluidContractChange) { - if (typeof source?.emitter?.off === 'function') { - source.emitter.off('fluidContractChange', listeners.onFluidContractChange); - } else if (typeof source?.emitter?.removeListener === 'function') { - source.emitter.removeListener('fluidContractChange', listeners.onFluidContractChange); - } - } - - this._sourceListeners.delete(sourceId); - } - - _handleSourceFlowEvent(eventName, eventData = {}) { - const value = Number(eventData.value); - if (!Number.isFinite(value)) { - return; - } - const eventParts = String(eventName || '').split('.'); - const variant = eventParts[1] === 'measured' ? 'measured' : 'predicted'; - const unit = eventData.unit || this.unitPolicy.output.flow; - this.updateFlow(variant, value, 'atEquipment', unit); - } - - _normalizeOptionalServiceType(value) { - const raw = String(value || '').trim().toLowerCase(); - if (SERVICE_TYPES.has(raw)) { - return raw; - } - return null; - } - - _deriveDefaultServiceTypeForSoftwareType(softwareType) { - const key = String(softwareType || '').trim().toLowerCase(); - return DEFAULT_SOURCE_SERVICE_TYPE[key] || null; - } - - _extractFluidContractFromChild(child, softwareType) { - let contract = null; - if (typeof child?.getFluidContract === 'function') { - try { - contract = child.getFluidContract(); - } catch (error) { - this.logger.warn(`Failed to read child fluid contract: ${error.message}`); - } - } - const contractStatus = String(contract?.status || '').trim().toLowerCase(); - if (contractStatus === 'conflict') { - return { status: 'conflict', serviceType: null }; - } - - const serviceTypeFromContract = this._normalizeOptionalServiceType(contract?.serviceType); - if (serviceTypeFromContract) { - return { status: 'resolved', serviceType: serviceTypeFromContract }; - } - - const directType = this._normalizeOptionalServiceType( - child?.serviceType - || child?.expectedServiceType - || child?.config?.asset?.serviceType - ); - if (directType) { - return { status: 'resolved', serviceType: directType }; - } - - const fallbackType = this._deriveDefaultServiceTypeForSoftwareType(softwareType); - if (fallbackType) { - return { status: 'inferred', serviceType: fallbackType }; - } - - return { status: 'unknown', serviceType: null }; - } - - _refreshFluidContract() { - const contracts = Object.values(this.sources) - .map((source) => source?.fluidContract || null) - .filter(Boolean); - const serviceTypes = Array.from(new Set( - contracts - .map((contract) => this._normalizeOptionalServiceType(contract.serviceType)) - .filter(Boolean) - )); - const hasConflict = contracts.some((contract) => String(contract.status || '').toLowerCase() === 'conflict'); - let next = null; - - if (hasConflict || serviceTypes.length > 1) { - next = { - status: 'conflict', - serviceType: null, - upstreamServiceTypes: serviceTypes, - sourceCount: Object.keys(this.sources).length, - message: `Conflicting upstream fluids detected: ${serviceTypes.join(', ') || 'unknown'}.`, - }; - } else if (serviceTypes.length === 1) { - next = { - status: 'resolved', - serviceType: serviceTypes[0], - upstreamServiceTypes: serviceTypes, - sourceCount: Object.keys(this.sources).length, - message: `Upstream fluid resolved as ${serviceTypes[0]}.`, - }; - } else { - next = { - status: 'unknown', - serviceType: null, - upstreamServiceTypes: [], - sourceCount: Object.keys(this.sources).length, - message: 'No upstream fluid sources registered.', - }; - } - - const prev = this.fluidContract || {}; - const changed = ( - prev.status !== next.status - || prev.serviceType !== next.serviceType - || prev.sourceCount !== next.sourceCount - || (prev.message || '') !== (next.message || '') - ); - this.fluidContract = next; - if (changed) { - this.emitter.emit('fluidContractChange', this.getFluidContract()); - } - } - - getFluidContract() { - const state = this.fluidContract || {}; - return { - status: state.status || 'unknown', - serviceType: state.serviceType || null, - upstreamServiceTypes: Array.isArray(state.upstreamServiceTypes) ? [...state.upstreamServiceTypes] : [], - sourceCount: Number(state.sourceCount) || 0, - message: state.message || '', - source: 'valvegroupcontrol', - }; - } + registerOnChildEvents() {} destroy() { - for (const valveId of this._valveListeners.keys()) { - this._unbindValveEvents(valveId); - } - for (const sourceId of this._sourceListeners.keys()) { - this._unbindSourceEvents(sourceId); + for (const id of this._valveListeners.keys()) this._unbindValveEvents(id); + for (const id of this._sourceListeners.keys()) sources.unbindSource(this, id); + } + + // ── measurement read/write helpers used by concern modules ───────── + _outputUnitForType(type) { + switch (String(type || '').toLowerCase()) { + case 'flow': return this.unitPolicy.output('flow'); + case 'pressure': return this.unitPolicy.output('pressure'); + default: return null; } } - _isValveAvailable(valve) { - const currentState = valve.state.getCurrentState(); - const mode = valve.currentMode; - const kv = Number(valve.kv); - return ( - currentState !== 'off' - && currentState !== 'maintenance' - && mode !== 'maintenance' - && Number.isFinite(kv) - && kv > 0 - ); + _read(type, variant, position, unit = null) { + const u = unit || this._outputUnitForType(type); + return this.measurements.type(type).variant(variant).position(position).getCurrentValue(u || undefined); } - getAvailableValves() { - return Object.entries(this.valves) - .filter(([, valve]) => this._isValveAvailable(valve)) - .map(([id, valve]) => ({ id, valve })); + _write(type, variant, position, value, unit = null, timestamp = Date.now()) { + const v = Number(value); + if (!Number.isFinite(v)) return; + this.measurements.type(type).variant(variant).position(position).value(v, timestamp, unit || undefined); } + // ── public surface used by adapter, tests, commands, valves ──────── + getAvailableValves() { return flowDist.listAvailableValves(this.valves); } + calcValveFlows() { flowDist.distributeFlow(this); this.notifyOutputChanged(); } + calcMaxDeltaP() { flowDist.calcMaxDeltaP(this); } + getFluidContract() { return sources.getFluidContract(this); } + isValidSourceForMode(source, mode) { const allowedSourcesSet = this.config.mode.allowedSources[mode] || []; return allowedSourcesSet.has(source); } + setMode(newMode) { + const availableModes = Array.isArray(this.defaultConfig?.mode?.current?.rules?.values) + ? this.defaultConfig.mode.current.rules.values.map((m) => m.value) + : Object.keys(this.config?.mode?.allowedSources || {}); + if (!availableModes.includes(newMode)) { + this.logger.warn(`Invalid mode '${newMode}'. Allowed modes are: ${availableModes.join(', ')}`); + return; + } + this.currentMode = newMode; + this.logger.info(`Mode successfully changed to '${newMode}'.`); + this.notifyOutputChanged(); + } + + async executeSequence(sequenceName) { + const sequence = this.config.sequences[sequenceName]; + if (!sequence || sequence.size === 0) { + this.logger.warn(`Sequence '${sequenceName}' not defined.`); + return; + } + this.logger.info(` --------- Executing sequence: ${sequenceName} -------------`); + for (const stateName of sequence) { + try { await this.state.transitionToState(stateName); } + catch (error) { this.logger.error(`Error during sequence '${sequenceName}': ${error}`); break; } + } + } + + updateFlow(variant, value, position, unit = this.unitPolicy.output('flow')) { + if (value === null || value === undefined) { + this.logger.warn(`Received null or undefined value for flow update. Variant: ${variant}, Position: ${position}`); + return; + } + if (variant !== 'measured' && variant !== 'predicted') { + this.logger.warn(`Unrecognized variant '${variant}' for flow update.`); + return; + } + this.logger.debug(`Updating ${variant} flow for position ${position} with value ${value}`); + this._write('flow', variant, position, value, unit); + this.calcValveFlows(); + } + + updateMeasurement(variant, subType, value, position, unit) { + this.logger.debug(`---------------------- updating ${subType} ------------------ `); + if (subType === 'flow') { + this.updateFlow(variant, value, position, unit || this.unitPolicy.output('flow')); + return; + } + this.logger.error(`Type '${subType}' not recognized for measured update.`); + } + async handleInput(source, action, parameter) { if (!this.isValidSourceForMode(source, this.currentMode)) { const warningTxt = `Source '${source}' is not valid for mode '${this.currentMode}'.`; this.logger.warn(warningTxt); return { status: false, feedback: warningTxt }; } - this.logger.info(`Handling input from source '${source}' with action '${action}' in mode '${this.currentMode}'.`); try { + const flowUnit = this.unitPolicy.output('flow'); switch (action) { case 'execSequence': await this.executeSequence(parameter); break; - case 'totalFlowChange': { + case 'totalFlowChange': if (parameter && typeof parameter === 'object' && Object.prototype.hasOwnProperty.call(parameter, 'value')) { - await this.updateFlow(parameter.variant || 'measured', parameter.value, parameter.position || 'atEquipment', parameter.unit || this.unitPolicy.output.flow); + await this.updateFlow(parameter.variant || 'measured', parameter.value, parameter.position || 'atEquipment', parameter.unit || flowUnit); } else if (parameter && typeof parameter === 'object' && Object.prototype.hasOwnProperty.call(parameter, 'q')) { - await this.updateFlow('measured', Number(parameter.q), 'atEquipment', parameter.unit || this.unitPolicy.output.flow); + await this.updateFlow('measured', Number(parameter.q), 'atEquipment', parameter.unit || flowUnit); } else { - await this.updateFlow('measured', Number(parameter), 'atEquipment', this.unitPolicy.output.flow); + await this.updateFlow('measured', Number(parameter), 'atEquipment', flowUnit); } break; - } case 'emergencyStop': case 'emergencystop': this.logger.warn(`Emergency stop activated by '${source}'.`); @@ -493,288 +252,19 @@ class ValveGroupControl { } } - setMode(newMode) { - const availableModes = Array.isArray(this.defaultConfig?.mode?.current?.rules?.values) - ? this.defaultConfig.mode.current.rules.values.map((vgc) => vgc.value) - : Object.keys(this.config?.mode?.allowedSources || {}); - if (!availableModes.includes(newMode)) { - this.logger.warn(`Invalid mode '${newMode}'. Allowed modes are: ${availableModes.join(', ')}`); - return; - } - - this.currentMode = newMode; - this.logger.info(`Mode successfully changed to '${newMode}'.`); + setReconcileIntervalSeconds(sec) { + const ms = Math.max(100, Math.round(Number(sec) * 1000)); + this.emitter.emit('reconcileIntervalChange', ms); + this.logger.info(`Flow reconciliation interval updated to ${sec}s (${ms}ms).`); } - _buildUnitPolicy(config = {}) { - const flowUnit = this._resolveUnitOrFallback( - config?.general?.unit, - 'volumeFlowRate', - DEFAULT_IO_UNITS.flow - ); + // Periodic reconciliation — adapter fires this each tickInterval. Keeps + // per-valve assigned flow in sync if a child's accepted value drifts + // between event-driven recalcs. + tick() { this.calcValveFlows(); } - return { - canonical: { ...CANONICAL_UNITS }, - output: { - flow: flowUnit, - pressure: DEFAULT_IO_UNITS.pressure, - }, - }; - } - - _resolveUnitOrFallback(candidate, expectedMeasure, fallbackUnit) { - const fallback = String(fallbackUnit || '').trim(); - const raw = typeof candidate === 'string' ? candidate.trim() : ''; - if (!raw) { - return fallback; - } - try { - const desc = convert().describe(raw); - if (expectedMeasure && desc.measure !== expectedMeasure) { - throw new Error(`expected '${expectedMeasure}', got '${desc.measure}'`); - } - return raw; - } catch (error) { - this.logger?.warn?.(`Invalid unit '${raw}' (${error.message}); falling back to '${fallback}'.`); - return fallback; - } - } - - _outputUnitForType(type) { - switch (String(type || '').toLowerCase()) { - case 'flow': - return this.unitPolicy.output.flow; - case 'pressure': - return this.unitPolicy.output.pressure; - default: - return null; - } - } - - _readMeasurement(type, variant, position, unit = null) { - const requestedUnit = unit || this._outputUnitForType(type); - return this.measurements - .type(type) - .variant(variant) - .position(position) - .getCurrentValue(requestedUnit || undefined); - } - - _writeMeasurement(type, variant, position, value, unit = null, timestamp = Date.now()) { - const valueNum = Number(value); - if (!Number.isFinite(valueNum)) { - return; - } - this.measurements - .type(type) - .variant(variant) - .position(position) - .value(valueNum, timestamp, unit || undefined); - } - - async executeSequence(sequenceName) { - const sequence = this.config.sequences[sequenceName]; - - if (!sequence || sequence.size === 0) { - this.logger.warn(`Sequence '${sequenceName}' not defined.`); - return; - } - - this.logger.info(` --------- Executing sequence: ${sequenceName} -------------`); - - for (const stateName of sequence) { - try { - await this.state.transitionToState(stateName); - } catch (error) { - this.logger.error(`Error during sequence '${sequenceName}': ${error}`); - break; - } - } - } - - updateFlow(variant, value, position, unit = this.unitPolicy.output.flow) { - if (value === null || value === undefined) { - this.logger.warn(`Received null or undefined value for flow update. Variant: ${variant}, Position: ${position}`); - return; - } - - switch (variant) { - case 'measured': - this.logger.debug(`Updating measured flow for position ${position} with value ${value}`); - this._writeMeasurement('flow', 'measured', position, value, unit); - this.calcValveFlows(); - break; - - case 'predicted': - this.logger.debug(`Updating predicted flow for position ${position} with value ${value}`); - this._writeMeasurement('flow', 'predicted', position, value, unit); - this.calcValveFlows(); - break; - - default: - this.logger.warn(`Unrecognized variant '${variant}' for flow update.`); - break; - } - } - - updateMeasurement(variant, subType, value, position, unit) { - this.logger.debug(`---------------------- updating ${subType} ------------------ `); - switch (subType) { - case 'flow': - this.updateFlow(variant, value, position, unit || this.unitPolicy.output.flow); - break; - default: - this.logger.error(`Type '${subType}' not recognized for measured update.`); - break; - } - } - - calcValveFlows() { - const totalFlowMeasured = this._readMeasurement('flow', 'measured', 'atEquipment', this.unitPolicy.output.flow); - const totalFlowPredicted = this._readMeasurement('flow', 'predicted', 'atEquipment', this.unitPolicy.output.flow); - const totalFlow = Number.isFinite(totalFlowMeasured) ? totalFlowMeasured : totalFlowPredicted; - - if (!Number.isFinite(totalFlow)) { - return; - } - - const availableEntries = this.getAvailableValves(); - const availableIds = new Set(availableEntries.map((entry) => entry.id)); - const totalKv = availableEntries.reduce((sum, { valve }) => sum + Number(valve.kv), 0); - - if (!availableEntries.length || !Number.isFinite(totalKv) || totalKv <= 0) { - this.logger.warn('No available valves with valid Kv, setting assigned flow to 0.'); - for (const valve of Object.values(this.valves)) { - valve.updateFlow('predicted', 0, 'downstream', this.unitPolicy.output.flow); - } - this._writeMeasurement('flow', 'predicted', 'atEquipment', 0, this.unitPolicy.output.flow); - this.lastFlowSolve = { - passes: 0, - residual: Number(totalFlow) || 0, - targetTotal: Number(totalFlow) || 0, - assignedTotal: 0, - }; - return; - } - - const solve = this._solveFlowDistribution(totalFlow, availableEntries); - let assignedTotal = 0; - for (const [id, valve] of Object.entries(this.valves)) { - const flow = availableIds.has(id) ? (solve.flowsById[id] || 0) : 0; - valve.updateFlow('predicted', flow, 'downstream', this.unitPolicy.output.flow); - assignedTotal += flow; - } - - this._writeMeasurement('flow', 'predicted', 'atEquipment', assignedTotal, this.unitPolicy.output.flow); - this.lastFlowSolve = { - passes: solve.passes, - residual: solve.residual, - targetTotal: totalFlow, - assignedTotal, - }; - this.calcMaxDeltaP(); - } - - _readValveAcceptedFlow(valve) { - const accepted = Number( - valve?.measurements - ?.type('flow') - ?.variant('predicted') - ?.position('downstream') - ?.getCurrentValue(this.unitPolicy.output.flow) - ); - return Number.isFinite(accepted) ? accepted : null; - } - - _solveFlowDistribution(totalFlow, availableEntries) { - const totalKv = availableEntries.reduce((sum, { valve }) => sum + Number(valve.kv), 0); - if (!Number.isFinite(totalKv) || totalKv <= 0) { - return { flowsById: {}, residual: Number(totalFlow) || 0, passes: 0 }; - } - - const targetById = {}; - availableEntries.forEach(({ id }) => { - targetById[id] = 0; - }); - - let residual = Number(totalFlow); - let passes = 0; - const maxPasses = Math.max(1, Number(this.flowReconciliation?.maxPasses) || DEFAULT_FLOW_RECONCILIATION.maxPasses); - const tolerance = Math.max(0, Number(this.flowReconciliation?.residualTolerance) || DEFAULT_FLOW_RECONCILIATION.residualTolerance); - - while (passes < maxPasses && Number.isFinite(residual) && Math.abs(residual) > tolerance) { - availableEntries.forEach(({ id, valve }) => { - const kv = Number(valve.kv); - const share = (kv / totalKv) * residual; - const nextTarget = Number(targetById[id]) + share; - targetById[id] = nextTarget; - valve.updateFlow('predicted', nextTarget, 'downstream', this.unitPolicy.output.flow); - }); - - let acceptedTotal = 0; - availableEntries.forEach(({ id, valve }) => { - const accepted = this._readValveAcceptedFlow(valve); - if (Number.isFinite(accepted)) { - targetById[id] = accepted; - acceptedTotal += accepted; - return; - } - acceptedTotal += Number(targetById[id]) || 0; - }); - - residual = Number(totalFlow) - acceptedTotal; - passes += 1; - } - - return { - flowsById: targetById, - residual: Number.isFinite(residual) ? residual : 0, - passes, - }; - } - - calcMaxDeltaP() { - let maxDeltaP = 0; - for (const [id, valve] of Object.entries(this.valves)) { - const deltaP = Number( - valve.measurements - .type('pressure') - .variant('predicted') - .position('delta') - .getCurrentValue(this.unitPolicy.output.pressure) - ); - - if (!Number.isFinite(deltaP)) { - continue; - } - this.logger.debug(`Delta P for valve ${id}: ${deltaP}`); - if (deltaP > maxDeltaP) { - maxDeltaP = deltaP; - } - } - - this.maxDeltaP = maxDeltaP; - this._writeMeasurement('pressure', 'predicted', 'deltaMax', maxDeltaP, this.unitPolicy.output.pressure); - } - - getOutput() { - const output = {}; - Object.entries(this.measurements.measurements || {}).forEach(([type, variants]) => { - Object.entries(variants || {}).forEach(([variant, positions]) => { - Object.keys(positions || {}).forEach((position) => { - const value = this._readMeasurement(type, variant, position, this._outputUnitForType(type)); - if (value != null) { - output[`${position}_${variant}_${type}`] = value; - } - }); - }); - }); - - output.mode = this.currentMode; - output.maxDeltaP = this.maxDeltaP; - - return output; - } + getOutput() { return io.getOutput(this); } + getStatusBadge() { return io.getStatusBadge(this); } } module.exports = ValveGroupControl;