From e27135bdc441c86b73ff3c6aa665bf8bb12b7bc5 Mon Sep 17 00:00:00 2001 From: znetsixe Date: Sun, 10 May 2026 22:09:22 +0200 Subject: [PATCH] P6: convert valve to BaseDomain + BaseNodeAdapter + concern split Refactor of valve to use the platform infrastructure (BaseDomain, BaseNodeAdapter, ChildRouter, commandRegistry, statusBadge). Extracts concerns into focused modules per .claude/refactor/MODULE_SPLIT.md generic template. Tests stay green; CONTRACT.md generated; legacy aliases preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- CONTRACT.md | 95 +++ src/commands/handlers.js | 66 ++ src/commands/index.js | 20 + src/curve/supplierCurve.js | 95 +++ src/flow/flowController.js | 84 +++ src/fluid/fluidCompatibility.js | 201 +++++ src/io/output.js | 73 ++ src/measurement/measurementRouter.js | 120 +++ src/nodeClass.js | 370 +-------- src/specificClass.js | 1037 +++++--------------------- src/state/stateBindings.js | 17 + 11 files changed, 984 insertions(+), 1194 deletions(-) create mode 100644 CONTRACT.md create mode 100644 src/commands/handlers.js create mode 100644 src/commands/index.js create mode 100644 src/curve/supplierCurve.js create mode 100644 src/flow/flowController.js create mode 100644 src/fluid/fluidCompatibility.js create mode 100644 src/io/output.js create mode 100644 src/measurement/measurementRouter.js create mode 100644 src/state/stateBindings.js diff --git a/CONTRACT.md b/CONTRACT.md new file mode 100644 index 0000000..c21f989 --- /dev/null +++ b/CONTRACT.md @@ -0,0 +1,95 @@ +# valve — Contract + +Generated from `src/commands/index.js` (canonical topic + alias list) plus +the hand-written events section. Keep ≤ 100 lines. + +## Inputs (msg.topic on Port 0) + +| Canonical | Aliases (deprecated) | Payload | Effect | +|---|---|---|---| +| `set.mode` | `setMode` | `string` — one of the allowed mode names | Calls `source.setMode(payload)`. Invalid mode logs `warn` and is dropped. | +| `cmd.startup` | — | `{ source?: string }` | Calls `source.handleInput(payload.source ?? 'parent', 'execSequence', 'startup')`. | +| `cmd.shutdown` | — | `{ source?: string }` | Calls `source.handleInput(payload.source ?? 'parent', 'execSequence', 'shutdown')`. Pre-shutdown the valve ramps to position 0 if currently operational. | +| `cmd.estop` | `emergencystop`, `emergencyStop` | `{ source?: string, action?: string }` | Calls `source.handleInput(payload.source ?? 'parent', payload.action ?? 'emergencystop')`. | +| `execSequence` | — (legacy umbrella) | `{ source, action, parameter }` with `action ∈ {'startup','shutdown','emergencyStop','emergencystop'}` | Content-based router: forwards to canonical `cmd.startup` / `cmd.shutdown` / `cmd.estop` based on `payload.action`. Unknown action logs `warn`. Prefer the canonical `cmd.*` topics. | +| `set.position` | `execMovement` | `{ source, action, setpoint }` — setpoint coerced to `Number`; valve position percent in `[0, 100]` | Calls `source.handleInput(payload.source ?? 'parent', payload.action ?? 'execMovement', Number(payload.setpoint))`. | +| `data.flow` | `updateFlow` | `{ variant, value, position, unit? }` — `variant ∈ {'measured','predicted'}` | Pushes a flow value into the measurement container at `` and triggers a deltaP recompute through the hydraulic model. | +| `query.curve` | `showcurve` | none | Calls `source.showCurve()` and replies on **Port 0** with `{ topic: 'Showing curve', payload: }` via `ctx.send`. | +| `child.register` | `registerChild` | `string` — child Node-RED id; `msg.positionVsParent` carries position | Resolves child via `RED.nodes.getNode(payload)` and registers it through `childRegistrationUtils.registerChild(child.source, msg.positionVsParent)`. The valve's `registerChild` records the child for fluid-contract tracking. | + +Aliases log a one-time deprecation warning the first time they fire. + +### `execSequence` demux + +The pre-refactor topic `execSequence` carried `{ source, action, parameter }` +where `action` selected the verb. The command registry does not natively +dispatch by payload content, so `execSequence` keeps its own descriptor +whose handler forwards directly to the canonical `cmd.startup` / +`cmd.shutdown` / `cmd.estop` handler based on `payload.action`. The +deprecation warning fires once. Future-Phase-7 removal of `execSequence` +is a behavioural change — callers must migrate to the canonical topics. + +## Outputs (msg.topic on Port 0/1/2) + +- **Port 0 (process):** `msg.topic = config.general.name`. Payload built by + `outputUtils.formatMsg(..., 'process')` from `getOutput()` — delta-compressed + (only changed fields are emitted). On `query.curve` the node additionally + emits `{ topic: 'Showing curve', payload: }` as a synchronous + reply on Port 0. +- **Port 1 (InfluxDB telemetry):** same shape as Port 0, formatted with the + `'influxdb'` formatter. +- **Port 2 (registration):** at startup the node sends one + `{ topic: 'child.register', payload: , positionVsParent, distance }` + to its upstream parent (typically a `valveGroupControl`). + `positionVsParent` defaults to `'atEquipment'`. + +`getOutput()` keys per tick include: `__` slots +from the measurement container (e.g. `delta_predicted_pressure`, +`downstream_measured_flow`), plus `state`, `percentageOpen`, `moveTimeleft`, +`mode`. + +## Events emitted by `source.emitter` + +- `deltaPChange` — fires whenever the hydraulic model recomputes a finite + deltaP. Data: the deltaP value in `unitPolicy.output.pressure` (default + `mbar`). Consumed by `valveGroupControl` to update group totals. +- `fluidCompatibilityChange` — fires when the upstream fluid-contract + status changes (status / expected / received / sourceCount / message). + Data: `FluidCompatibility.getCompatibility()`. +- `fluidContractChange` — fires whenever the fluid contract that this valve + advertises downstream changes. Data: `FluidCompatibility.getContract()`. + +## Events emitted by `source.state.emitter` + +- `positionChange` — fires when the position percentage changes (per + movement tick). Data: `{ position, state, mode, timestamp }`. The valve + itself listens and triggers a Kv lookup + deltaP recompute. +- `stateChange` — fires on transitions of the operating state machine + (`idle → starting → warmingup → operational → accelerating → + decelerating → stopping → coolingdown → idle`, plus `off`). + +## Events emitted by `source.measurements.emitter` + +The `MeasurementContainer` fires `..` whenever +a series receives a new value. Parents subscribe via the generic +`child.measurements.emitter.on(eventName, ...)` handshake. valve +publishes: + +- `pressure.predicted.delta` — predicted pressure drop across the valve. +- `pressure.measured.`, `pressure.predicted.` — when + upstream pressure data arrives via `data.flow`-driven recompute or + direct measurement pushes. +- `flow.measured.`, `flow.predicted.` — mirrored from + upstream sources via `data.flow`. + +Position labels are normalised to lowercase in the event name. + +## Children registered by this node + +valve accepts upstream sources (`machine`, `rotatingmachine`, +`machinegroup`, `machinegroupcontrol`, `pumpingstation`, `valvegroupcontrol`, +…) via `child.register`. The handler records each child for fluid-contract +tracking: the valve reads either the child's `getFluidContract()` result, +its `asset.serviceType` field, or a default per software type +(`liquid` for the rotating-equipment family). It then subscribes to the +child's `fluidContractChange` so re-keyed contracts propagate. diff --git a/src/commands/handlers.js b/src/commands/handlers.js new file mode 100644 index 0000000..fe700c6 --- /dev/null +++ b/src/commands/handlers.js @@ -0,0 +1,66 @@ +'use strict'; + +// Valve command handlers. Each receives (source, msg, ctx) where source is +// the domain instance, msg is the incoming Node-RED message, and ctx carries +// { node, RED, send, logger } per BaseNodeAdapter. + +function _logger(source, ctx) { return ctx?.logger || source?.logger || null; } +function _send(ctx, ports) { if (typeof ctx?.send === 'function') ctx.send(ports); } + +exports.setMode = (source, msg) => { + source.setMode(msg.payload); +}; + +exports.startup = async (source, msg) => { + const p = msg.payload || {}; + await source.handleInput(p.source ?? 'parent', 'execSequence', 'startup'); +}; + +exports.shutdown = async (source, msg) => { + const p = msg.payload || {}; + await source.handleInput(p.source ?? 'parent', 'execSequence', 'shutdown'); +}; + +exports.estop = async (source, msg) => { + const p = msg.payload || {}; + await source.handleInput(p.source ?? 'parent', p.action ?? 'emergencystop'); +}; + +// Legacy umbrella: payload.action selects the canonical verb. +exports.execSequenceAlias = async (source, msg, ctx) => { + const log = _logger(source, ctx); + const action = msg?.payload?.action; + if (action === 'startup') return exports.startup(source, msg, ctx); + if (action === 'shutdown') return exports.shutdown(source, msg, ctx); + if (action === 'emergencyStop' || action === 'emergencystop') { + return exports.estop(source, msg, ctx); + } + log?.warn?.(`execSequence: unsupported action '${action}'`); +}; + +exports.setPosition = async (source, msg) => { + const p = msg.payload || {}; + const action = p.action ?? 'execMovement'; + await source.handleInput(p.source ?? 'parent', action, Number(p.setpoint)); +}; + +exports.dataFlow = (source, msg) => { + const p = msg.payload || {}; + source.updateFlow(p.variant, p.value, p.position, p.unit || source.unitPolicyView?.output?.flow); +}; + +exports.queryCurve = (source, msg, ctx) => { + const reply = Object.assign({}, msg, { topic: 'Showing curve', payload: source.showCurve() }); + _send(ctx, [reply, null, null]); +}; + +exports.registerChild = (source, msg, ctx) => { + const log = _logger(source, ctx); + const childId = msg.payload; + const childObj = ctx?.RED?.nodes?.getNode?.(childId); + if (!childObj || !childObj.source) { + log?.warn?.(`registerChild: child '${childId}' not found or has no .source`); + return; + } + source.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent); +}; diff --git a/src/commands/index.js b/src/commands/index.js new file mode 100644 index 0000000..c8d5ea1 --- /dev/null +++ b/src/commands/index.js @@ -0,0 +1,20 @@ +'use strict'; + +// valve command registry — consumed by BaseNodeAdapter. Canonical topics +// follow CONTRACTS.md §1; legacy names are kept as aliases (one-time +// deprecation warning when fired). + +const handlers = require('./handlers'); + +module.exports = [ + { topic: 'set.mode', aliases: ['setMode'], payloadSchema: { type: 'string' }, handler: handlers.setMode }, + { topic: 'cmd.startup', payloadSchema: { type: 'any' }, handler: handlers.startup }, + { topic: 'cmd.shutdown', payloadSchema: { type: 'any' }, handler: handlers.shutdown }, + { topic: 'cmd.estop', aliases: ['emergencystop', 'emergencyStop'], payloadSchema: { type: 'any' }, handler: handlers.estop }, + // Content-based demux; behaviour matches cmd.startup/cmd.shutdown exactly. + { topic: 'execSequence', payloadSchema: { type: 'object' }, handler: handlers.execSequenceAlias, _legacy: true }, + { topic: 'set.position', aliases: ['execMovement'], payloadSchema: { type: 'object' }, handler: handlers.setPosition }, + { topic: 'data.flow', aliases: ['updateFlow'], payloadSchema: { type: 'object' }, handler: handlers.dataFlow }, + { topic: 'query.curve', aliases: ['showcurve'], payloadSchema: { type: 'any' }, handler: handlers.queryCurve }, + { topic: 'child.register', aliases: ['registerChild'], payloadSchema: { type: 'string' }, handler: handlers.registerChild }, +]; diff --git a/src/curve/supplierCurve.js b/src/curve/supplierCurve.js new file mode 100644 index 0000000..ea792a7 --- /dev/null +++ b/src/curve/supplierCurve.js @@ -0,0 +1,95 @@ +'use strict'; + +const { loadCurve, predict } = require('generalFunctions'); + +const FALLBACK_SUPPLIER_CURVE = Object.freeze({ + '1.204': { '125': { x: [0, 100], y: [0, 1] } }, +}); + +function isValidCurveData(curveData) { + if (!curveData || typeof curveData !== 'object') return false; + const dKeys = Object.keys(curveData); + if (!dKeys.length) return false; + for (const dk of dKeys) { + const diameters = curveData[dk]; + if (!diameters || typeof diameters !== 'object') return false; + const diaKeys = Object.keys(diameters); + if (!diaKeys.length) return false; + for (const k of diaKeys) { + const c = diameters[k]; + if (!Array.isArray(c?.x) || !Array.isArray(c?.y) || c.x.length < 2 || c.x.length !== c.y.length) return false; + } + } + return true; +} + +function pickNearestNumericKey(keys, target) { + const numeric = keys.map((k) => Number(k)).filter((v) => Number.isFinite(v)); + if (!numeric.length) return String(target); + let selected = numeric[0]; + let dist = Math.abs(selected - target); + for (const k of numeric) { + const d = Math.abs(k - target); + if (d < dist) { selected = k; dist = d; } + } + return String(selected); +} + +class SupplierCurvePredictor { + constructor({ logger, model, configCurve, defaultDensity, defaultTemperatureK, rho, temperatureK, valveDiameter }) { + this.logger = logger; + this.model = model; + this.curve = model ? loadCurve(model) : null; + this._configCurve = configCurve; + this.defaultDensity = defaultDensity; + this.defaultTemperatureK = defaultTemperatureK; + this.rho = Number.isFinite(rho) && rho > 0 ? rho : defaultDensity; + this.T = Number.isFinite(temperatureK) && temperatureK > 0 ? temperatureK : defaultTemperatureK; + this._init(valveDiameter); + } + + _init(valveDiameter) { + const supplierCurve = this._resolveData(); + const densityTarget = Number.isFinite(this.rho) && this.rho > 0 ? this.rho : this.defaultDensity; + const densityKey = pickNearestNumericKey(Object.keys(supplierCurve), densityTarget); + const densityCurveFamily = supplierCurve[densityKey]; + const diaTarget = Number(valveDiameter); + const diameterKey = pickNearestNumericKey( + Object.keys(densityCurveFamily || {}), + Number.isFinite(diaTarget) && diaTarget > 0 ? diaTarget : 125 + ); + this.curveSelection = { densityKey: Number(densityKey), diameterKey: Number(diameterKey) }; + this.predictKv = new predict({ curve: densityCurveFamily || FALLBACK_SUPPLIER_CURVE['1.204'] }); + this.predictKv.fDimension = this.curveSelection.diameterKey; + } + + _resolveData() { + if (isValidCurveData(this.curve)) return this.curve; + if (isValidCurveData(this._configCurve)) return this._configCurve; + this.logger.warn('No valid supplier curve data found, using fallback curve.'); + return FALLBACK_SUPPLIER_CURVE; + } + + predictKvForPosition(positionPercent) { + if (!this.predictKv) return 0.1; + try { + this.predictKv.fDimension = this.curveSelection?.diameterKey || this.predictKv.fDimension; + const kv = Number(this.predictKv.y(positionPercent)); + if (!Number.isFinite(kv)) return 0.1; + return Math.max(0.1, kv); + } catch (error) { + this.logger.warn(`Failed to predict Kv for position=${positionPercent}: ${error.message}`); + return 0.1; + } + } + + snapshot() { + return { + selectedDensity: this.curveSelection?.densityKey ?? null, + selectedDiameter: this.curveSelection?.diameterKey ?? null, + curve: this.predictKv?.currentFxyCurve?.[this.predictKv?.fDimension] || null, + }; + } +} + +module.exports = { SupplierCurvePredictor, FALLBACK_SUPPLIER_CURVE }; diff --git a/src/flow/flowController.js b/src/flow/flowController.js new file mode 100644 index 0000000..0ae0699 --- /dev/null +++ b/src/flow/flowController.js @@ -0,0 +1,84 @@ +'use strict'; + +// Sequence + setpoint execution. Mirrors the pre-refactor Valve.handleInput +// switch but delegates state transitions to host.state. Pre-shutdown ramp-down +// to 0 happens here so the existing test contract holds. + +class FlowController { + constructor(host) { + this.host = host; + this.logger = host.logger; + } + + isValidSourceForMode(source, mode) { + const allowed = this.host.config.mode.allowedSources[mode] || []; + return allowed.has(source); + } + + async handleInput(source, action, parameter) { + if (!this.isValidSourceForMode(source, this.host.currentMode)) { + const msg = `Source '${source}' is not valid for mode '${this.host.currentMode}'.`; + this.logger.warn(msg); + return { status: false, feedback: msg }; + } + this.logger.info(`Handling input from source '${source}' with action '${action}' in mode '${this.host.currentMode}'.`); + try { + switch (action) { + case 'execSequence': + await this.executeSequence(parameter); + break; + case 'execMovement': + await this.setpoint(parameter); + break; + case 'emergencyStop': + case 'emergencystop': + this.logger.warn(`Emergency stop activated by '${source}'.`); + await this.executeSequence('emergencystop'); + break; + case 'statusCheck': + this.logger.info(`Status Check: Mode = '${this.host.currentMode}', Source = '${source}'.`); + break; + default: + this.logger.warn(`Action '${action}' is not implemented.`); + } + this.logger.debug(`Action '${action}' successfully executed`); + return { status: true, feedback: `Action '${action}' successfully executed.` }; + } catch (error) { + this.logger.error(`Error handling input: ${error}`); + } + } + + async executeSequence(sequenceName) { + const sequence = this.host.config.sequences[sequenceName]; + if (!sequence || sequence.size === 0) { + this.logger.warn(`Sequence '${sequenceName}' not defined.`); + return; + } + if (this.host.state.getCurrentState() === 'operational' && sequenceName === 'shutdown') { + this.logger.info(`Machine will ramp down to position 0 before performing ${sequenceName} sequence`); + await this.setpoint(0); + } + this.logger.info(` --------- Executing sequence: ${sequenceName} -------------`); + for (const stateName of sequence) { + try { + await this.host.state.transitionToState(stateName); + } catch (error) { + this.logger.error(`Error during sequence '${sequenceName}': ${error}`); + break; + } + } + } + + async setpoint(value) { + try { + if (typeof value !== 'number' || value < 0) { + throw new Error('Invalid setpoint: Setpoint must be a non-negative number.'); + } + await this.host.state.moveTo(value); + } catch (error) { + this.logger.error(`Error setting setpoint: ${error}`); + } + } +} + +module.exports = FlowController; diff --git a/src/fluid/fluidCompatibility.js b/src/fluid/fluidCompatibility.js new file mode 100644 index 0000000..8ed5f4a --- /dev/null +++ b/src/fluid/fluidCompatibility.js @@ -0,0 +1,201 @@ +'use strict'; + +const SERVICE_TYPES = new Set(['gas', 'liquid']); +const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({ + machine: 'liquid', + rotatingmachine: 'liquid', + machinegroup: 'liquid', + machinegroupcontrol: 'liquid', + pumpingstation: 'liquid', +}); + +function normalizeOptional(value) { + const raw = String(value || '').trim().toLowerCase(); + return SERVICE_TYPES.has(raw) ? raw : null; +} + +function defaultForSoftwareType(softwareType) { + const key = String(softwareType || '').trim().toLowerCase(); + return DEFAULT_SOURCE_SERVICE_TYPE[key] || null; +} + +class FluidCompatibility { + constructor({ logger, emitter, expectedServiceType }) { + this.logger = logger; + this.emitter = emitter; + this.expectedServiceType = expectedServiceType || null; + this.upstreamFluidSources = new Map(); + this._fluidContractListeners = new Map(); + this.state = { + status: this.expectedServiceType ? 'pending' : 'unknown', + expectedServiceType: this.expectedServiceType, + receivedServiceType: null, + upstreamServiceTypes: [], + sourceCount: 0, + message: this.expectedServiceType + ? `Waiting for upstream fluid contract (${this.expectedServiceType}).` + : 'No upstream fluid contract available.', + }; + } + + registerChild(child, softwareType) { + if (!child || typeof child !== 'object') { + this.logger.warn('registerChild skipped: invalid child payload'); + return false; + } + const sourceType = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase(); + const sourceId = child?.config?.general?.id + || child?.config?.general?.name + || `source-${this.upstreamFluidSources.size + 1}`; + const contract = this._extractContract(child, sourceType); + this.upstreamFluidSources.set(sourceId, { child, sourceType, contract }); + this._bindListener(sourceId, child, sourceType); + this._updateState(); + this.logger.info(`Source '${sourceId}' (${sourceType || 'unknown'}) registered for fluid contract.`); + return true; + } + + _extractContract(child, softwareType) { + const sourceType = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase(); + let fromChild = null; + if (typeof child?.getFluidContract === 'function') { + try { fromChild = child.getFluidContract(); } + catch (error) { this.logger.warn(`Failed to read child fluid contract: ${error.message}`); } + } + const status = String(fromChild?.status || '').trim().toLowerCase(); + if (status === 'conflict') return { status: 'conflict', serviceType: null, sourceType }; + const contractType = normalizeOptional(fromChild?.serviceType); + if (contractType) return { status: 'resolved', serviceType: contractType, sourceType }; + const directType = normalizeOptional( + child?.serviceType || child?.expectedServiceType || child?.config?.asset?.serviceType + ); + if (directType) return { status: 'resolved', serviceType: directType, sourceType }; + const fallback = defaultForSoftwareType(sourceType); + if (fallback) return { status: 'inferred', serviceType: fallback, sourceType }; + return { status: 'unknown', serviceType: null, sourceType }; + } + + _bindListener(sourceId, child, sourceType) { + if (!sourceId || this._fluidContractListeners.has(sourceId)) return; + if (!child?.emitter || typeof child.emitter.on !== 'function') return; + const handler = () => { + const latest = this._extractContract(child, sourceType); + const existing = this.upstreamFluidSources.get(sourceId) || {}; + existing.contract = latest; + this.upstreamFluidSources.set(sourceId, existing); + this._updateState(); + }; + child.emitter.on('fluidContractChange', handler); + this._fluidContractListeners.set(sourceId, { emitter: child.emitter, handler }); + } + + _computeSnapshot() { + const expectedServiceType = this.expectedServiceType || null; + const contracts = Array.from(this.upstreamFluidSources.values()) + .map((entry) => entry?.contract) + .filter(Boolean); + const upstreamServiceTypes = Array.from(new Set( + contracts.map((c) => normalizeOptional(c.serviceType)).filter(Boolean) + )); + const hasConflict = contracts.some((c) => String(c.status || '').toLowerCase() === 'conflict'); + const sourceCount = this.upstreamFluidSources.size; + + if (hasConflict || upstreamServiceTypes.length > 1) { + return { + status: 'conflict', expectedServiceType, + receivedServiceType: upstreamServiceTypes.length === 1 ? upstreamServiceTypes[0] : null, + upstreamServiceTypes, sourceCount, + message: `Conflicting upstream fluids detected: ${upstreamServiceTypes.join(', ') || 'unknown'}.`, + }; + } + if (upstreamServiceTypes.length === 1) { + const receivedServiceType = upstreamServiceTypes[0]; + if (expectedServiceType && expectedServiceType !== receivedServiceType) { + return { + status: 'mismatch', expectedServiceType, receivedServiceType, + upstreamServiceTypes, sourceCount, + message: `Expected ${expectedServiceType}, received ${receivedServiceType}.`, + }; + } + return { + status: expectedServiceType ? 'match' : 'inferred', + expectedServiceType, receivedServiceType, + upstreamServiceTypes, sourceCount, + message: expectedServiceType + ? `Fluid contract validated: ${receivedServiceType}.` + : `Fluid inferred from upstream: ${receivedServiceType}.`, + }; + } + return { + status: expectedServiceType ? 'pending' : 'unknown', + expectedServiceType, receivedServiceType: null, + upstreamServiceTypes: [], sourceCount, + message: expectedServiceType + ? `Waiting for upstream fluid contract (${expectedServiceType}).` + : 'No upstream fluid contract available.', + }; + } + + _updateState() { + const next = this._computeSnapshot(); + const prev = this.state || {}; + const changed = ( + prev.status !== next.status + || prev.expectedServiceType !== next.expectedServiceType + || prev.receivedServiceType !== next.receivedServiceType + || prev.sourceCount !== next.sourceCount + || (prev.message || '') !== (next.message || '') + ); + this.state = next; + if (!changed) return; + if (next.status === 'mismatch' || next.status === 'conflict') { + this.logger.warn(`Fluid compatibility warning: ${next.message}`); + } else { + this.logger.info(`Fluid compatibility update: ${next.message}`); + } + this.emitter.emit('fluidCompatibilityChange', this.getCompatibility()); + this.emitter.emit('fluidContractChange', this.getContract()); + } + + getCompatibility() { + const s = this.state || {}; + return { + status: s.status || 'unknown', + expectedServiceType: s.expectedServiceType || null, + receivedServiceType: s.receivedServiceType || null, + upstreamServiceTypes: Array.isArray(s.upstreamServiceTypes) ? [...s.upstreamServiceTypes] : [], + sourceCount: Number(s.sourceCount) || 0, + message: s.message || '', + }; + } + + getContract() { + const c = this.getCompatibility(); + if (c.status === 'conflict') { + return { + status: 'conflict', serviceType: null, + expectedServiceType: c.expectedServiceType, + observedServiceType: c.receivedServiceType, + source: 'valve', + }; + } + const advertised = c.expectedServiceType || null; + return { + status: advertised ? 'resolved' : 'unknown', + serviceType: advertised, + expectedServiceType: c.expectedServiceType, + observedServiceType: c.receivedServiceType, + source: 'valve', + }; + } + + destroy() { + for (const { emitter, handler } of this._fluidContractListeners.values()) { + if (typeof emitter?.off === 'function') emitter.off('fluidContractChange', handler); + else if (typeof emitter?.removeListener === 'function') emitter.removeListener('fluidContractChange', handler); + } + this._fluidContractListeners.clear(); + } +} + +module.exports = { FluidCompatibility, normalizeOptional, defaultForSoftwareType }; diff --git a/src/io/output.js b/src/io/output.js new file mode 100644 index 0000000..d0895dc --- /dev/null +++ b/src/io/output.js @@ -0,0 +1,73 @@ +'use strict'; + +const { statusBadge } = require('generalFunctions'); + +const STATE_SYMBOLS = { + off: '⬛', idle: '⏸️', operational: '⏵️', + starting: '⏯️', warmingup: '🔄', accelerating: '⏩', + stopping: '⏹️', coolingdown: '❄️', decelerating: '⏪', +}; + +const STATE_FILL = { + off: 'red', idle: 'blue', + operational: 'green', warmingup: 'green', + starting: 'yellow', accelerating: 'yellow', + stopping: 'yellow', coolingdown: 'yellow', decelerating: 'yellow', +}; + +const SHOW_METRICS = new Set(['operational', 'warmingup', 'accelerating', 'decelerating']); + +function buildOutput(host) { + const output = {}; + Object.entries(host.measurements.measurements || {}).forEach(([type, variants]) => { + Object.entries(variants || {}).forEach(([variant, positions]) => { + Object.keys(positions || {}).forEach((position) => { + const unit = host._outputUnitForType(type); + const value = host._readMeasurement(type, variant, position, unit); + if (value != null) output[`${position}_${variant}_${type}`] = value; + }); + }); + }); + output.state = host.state.getCurrentState(); + output.percentageOpen = host.state.getCurrentPosition(); + output.moveTimeleft = host.state.getMoveTimeLeft(); + output.mode = host.currentMode; + return output; +} + +function buildStatusBadge(host) { + try { + const mode = host.currentMode; + const stateName = host.state.getCurrentState(); + const flowUnit = host.unitPolicyView.output.flow || 'm3/h'; + const pressureUnit = host.unitPolicyView.output.pressure || 'mbar'; + const flow = Math.round(host.measurements.type('flow').variant('predicted').position('downstream').getCurrentValue(flowUnit)); + let deltaP = host.measurements.type('pressure').variant('predicted').position('delta').getCurrentValue(pressureUnit); + if (deltaP !== null && deltaP !== undefined) deltaP = parseFloat(deltaP.toFixed(0)); + if (Number.isNaN(deltaP)) deltaP = '∞'; + const pos = Math.round(host.state.getCurrentPosition() * 100) / 100; + const symbol = STATE_SYMBOLS[stateName] || '❔'; + const fill = STATE_FILL[stateName] || 'grey'; + + let badge; + if (SHOW_METRICS.has(stateName)) { + badge = statusBadge.compose( + [`${mode}: ${symbol}`, `${pos}%`, `💨${flow}${flowUnit}`, `ΔP${deltaP} ${pressureUnit}`], + { fill, shape: 'dot' } + ); + } else { + badge = statusBadge.compose([`${mode}: ${symbol}`], { fill, shape: 'dot' }); + } + + const fc = typeof host.getFluidCompatibility === 'function' ? host.getFluidCompatibility() : null; + if (fc && (fc.status === 'mismatch' || fc.status === 'conflict')) { + return { fill: 'yellow', shape: 'ring', text: `${badge.text} | ⚠ ${fc.message}` }; + } + return badge; + } catch (err) { + host.logger?.error?.(`getStatusBadge: ${err.message}`); + return statusBadge.error('Status Error'); + } +} + +module.exports = { buildOutput, buildStatusBadge }; diff --git a/src/measurement/measurementRouter.js b/src/measurement/measurementRouter.js new file mode 100644 index 0000000..5b5cfa0 --- /dev/null +++ b/src/measurement/measurementRouter.js @@ -0,0 +1,120 @@ +'use strict'; + +// Routes incoming pressure/flow measurement updates and triggers the +// hydraulic deltaP recompute. The formula path uses fixed FORMULA_UNITS +// (mbar / m3/h / K) — the hydraulic model multiplies q^2 with rho * T +// and divides by an absolute-pressure term, so unit choices are pinned. + +const FORMULA_UNITS = Object.freeze({ pressure: 'mbar', flow: 'm3/h', temperature: 'K' }); + +class MeasurementRouter { + constructor(host) { + this.host = host; + this.logger = host.logger; + } + + updatePressure(variant, value, position, unit) { + const h = this.host; + if (value === null || value === undefined) { + this.logger.warn(`Received null or undefined value for pressure update. Variant: ${variant}, Position: ${position}`); + return; + } + this.logger.debug(`Updating pressure: variant=${variant}, value=${value}, position=${position}`); + const u = unit || h.unitPolicyView.output.pressure; + + if (variant === 'measured') { + h._writeMeasurement('pressure', 'measured', position, Number(value), u); + const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure); + const measuredFlow = h._readMeasurement('flow', 'measured', 'downstream', FORMULA_UNITS.flow); + const predictedFlow = h._readMeasurement('flow', 'predicted', 'downstream', FORMULA_UNITS.flow); + const activeFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow; + this.updateDeltaP(activeFlow, h.kv, downstreamP); + return; + } + if (variant === 'predicted') { + h._writeMeasurement('pressure', 'predicted', position, Number(value), u); + const downstreamP = h._readMeasurement('pressure', 'predicted', 'downstream', FORMULA_UNITS.pressure); + const measuredFlow = h._readMeasurement('flow', 'measured', 'downstream', FORMULA_UNITS.flow); + const predictedFlow = h._readMeasurement('flow', 'predicted', 'downstream', FORMULA_UNITS.flow); + const activeFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow; + this.updateDeltaP(activeFlow, h.kv, downstreamP); + return; + } + this.logger.warn(`Unrecognized variant '${variant}' for flow update.`); + } + + updateFlow(variant, value, position, unit) { + const h = this.host; + if (value === null || value === undefined) { + this.logger.warn(`Received null or undefined value for flow update. Variant: ${variant}, Position: ${position}`); + return; + } + this.logger.debug(`Updating flow: variant=${variant}, value=${value}, position=${position}`); + const u = unit || h.unitPolicyView.output.flow; + + if (variant === 'measured') { + h._writeMeasurement('flow', 'measured', position, Number(value), u); + const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure); + const measuredFlow = h._readMeasurement('flow', 'measured', position, FORMULA_UNITS.flow); + this.updateDeltaP(measuredFlow, h.kv, downstreamP); + return; + } + if (variant === 'predicted') { + h._writeMeasurement('flow', 'predicted', position, Number(value), u); + const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure); + const predictedFlow = h._readMeasurement('flow', 'predicted', position, FORMULA_UNITS.flow); + this.updateDeltaP(predictedFlow, h.kv, downstreamP); + return; + } + this.logger.warn(`Unrecognized variant '${variant}' for flow update.`); + } + + updateMeasurement(variant, subType, value, position, unit) { + this.logger.debug(`---------------------- updating ${subType} ------------------ `); + switch (subType) { + case 'pressure': + this.updatePressure(variant, value, position, unit || this.host.unitPolicyView.output.pressure); + return; + case 'flow': + this.updateFlow(variant, value, position, unit || this.host.unitPolicyView.output.flow); + return; + case 'power': + return; + default: + this.logger.error(`Type '${subType}' not recognized for measured update.`); + } + } + + // q in m3/h, downstreamP in mbar(g), temp in K + updateDeltaP(q, kv, downstreamP) { + const h = this.host; + const result = h.hydraulicModel.calculateDeltaPMbar({ + qM3h: q, kv, downstreamGaugeMbar: downstreamP, rho: h.rho, tempK: h.T, + }); + if (!result || !Number.isFinite(result.deltaPMbar)) return; + const deltaP = result.deltaPMbar; + h.deltaPKlep = deltaP; + h.hydraulicDiagnostics = result.details || null; + h._writeMeasurement('pressure', 'predicted', 'delta', deltaP, h.unitPolicyView.output.pressure); + this.logger.info('DeltaP updated to: ' + deltaP); + h.emitter.emit('deltaPChange', deltaP); + this.logger.info('DeltaPChange emitted to valveGroupController'); + } + + updatePositionDependent() { + const h = this.host; + const s = h.state.getCurrentState(); + if (s !== 'operational' && s !== 'accelerating' && s !== 'decelerating') return; + this.logger.debug('Calculating new deltaP'); + const x = h.state.getCurrentPosition(); + const measuredFlow = h._readMeasurement('flow', 'measured', 'downstream', FORMULA_UNITS.flow); + const predictedFlow = h._readMeasurement('flow', 'predicted', 'downstream', FORMULA_UNITS.flow); + const currentFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow; + const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure); + h.kv = h.curvePredictor.predictKvForPosition(x); + this.logger.debug(`Kv value for position valve ${x} is ${h.kv}`); + this.updateDeltaP(currentFlow, h.kv, downstreamP); + } +} + +module.exports = { MeasurementRouter, FORMULA_UNITS }; diff --git a/src/nodeClass.js b/src/nodeClass.js index 26553d3..9b7d4f8 100644 --- a/src/nodeClass.js +++ b/src/nodeClass.js @@ -1,341 +1,47 @@ -/** - * Encapsulates all node logic in a reusable class. In future updates we can split this into multiple generic classes and use the config to specifiy which ones to use. - * This allows us to keep the Node-RED node clean and focused on wiring up the UI and event handlers. - */ -const { outputUtils, configManager, convert } = require('generalFunctions'); -const Specific = require("./specificClass"); +'use strict'; +const { BaseNodeAdapter, convert } = require('generalFunctions'); +const Valve = require('./specificClass'); +const commands = require('./commands'); -class nodeClass { - /** - * Create a MeasurementNode. - * @param {object} uiConfig - Node-RED node configuration. - * @param {object} RED - Node-RED runtime API. - * @param {object} nodeInstance - The Node-RED node instance. - * @param {string} nameOfNode - The name of the node, used for - */ - constructor(uiConfig, RED, nodeInstance, nameOfNode) { +class nodeClass extends BaseNodeAdapter { + static DomainClass = Valve; + static commands = commands; + static tickInterval = null; + static statusInterval = 1000; - // Preserve RED reference for HTTP endpoints if needed - this.node = nodeInstance; - this.RED = RED; - this.name = nameOfNode; - this.source = null; // Will hold the specific class instance - this.config = null; // Will hold the merged configuration - - // Load default & UI config - this._loadConfig(uiConfig,this.node); - - // Instantiate core Measurement class - this._setupSpecificClass(uiConfig); - - // Wire up event and lifecycle handlers - this._bindEvents(); - this._registerChild(); - this._startTickLoop(); - this._attachInputHandler(); - this._attachCloseHandler(); - } - - /** - * Load and merge default config with user-defined settings. - * @param {object} uiConfig - Raw config from Node-RED UI. - */ - _loadConfig(uiConfig,node) { - // Resolve flow unit with validation before building config - const flowUnit = this._resolveUnitOrFallback(uiConfig.unit, 'volumeFlowRate', 'm3/h', 'flow'); - const resolvedUiConfig = { ...uiConfig, unit: flowUnit }; - - // Build config: base sections handle general, asset, functionality - const cfgMgr = new configManager(); - this.config = cfgMgr.buildConfig(this.name, resolvedUiConfig, node.id); - - // Utility for formatting outputs - this._output = new outputUtils(); - } - - _resolveUnitOrFallback(candidate, expectedMeasure, fallbackUnit, label) { - const raw = typeof candidate === "string" ? candidate.trim() : ""; - const fallback = String(fallbackUnit || "").trim(); - if (!raw) { - return fallback; - } - try { - const desc = convert().describe(raw); - if (expectedMeasure && desc.measure !== expectedMeasure) { - throw new Error(`expected '${expectedMeasure}' but got '${desc.measure}'`); - } - return raw; - } catch (error) { - this.node?.warn?.(`Invalid ${label} unit '${raw}' (${error.message}). Falling back to '${fallback}'.`); - return fallback; - } - } - - /** - * Instantiate the core logic and store as source. - */ - _setupSpecificClass(uiConfig) { - const vconfig = this.config; - const asNumberOrUndefined = (value) => { - const parsed = Number(value); - return Number.isFinite(parsed) ? parsed : undefined; - }; - - // need extra state for this - const stateConfig = { - general: { - logging: { - enabled: vconfig.general.logging.enabled, - logLevel: vconfig.general.logging.logLevel - } - }, - movement: { - speed: asNumberOrUndefined(uiConfig.speed) - }, + buildDomainConfig(uiConfig) { + const flowUnit = _resolveUnit(uiConfig.unit, 'volumeFlowRate', 'm3/h'); + const asNum = (v) => { const n = Number(v); return Number.isFinite(n) ? n : undefined; }; + Valve._pendingExtras = { + stateConfig: { + general: { logging: { enabled: uiConfig.enableLog, logLevel: uiConfig.logLevel } }, + movement: { speed: asNum(uiConfig.speed) }, time: { - starting: asNumberOrUndefined(uiConfig.startup), - warmingup: asNumberOrUndefined(uiConfig.warmup), - stopping: asNumberOrUndefined(uiConfig.shutdown), - coolingdown: asNumberOrUndefined(uiConfig.cooldown) - } - }; - - const runtimeOptions = { - serviceType: uiConfig.serviceType, - fluidDensity: asNumberOrUndefined(uiConfig.fluidDensity), - fluidTemperatureK: asNumberOrUndefined(uiConfig.fluidTemperatureK), - gasChokedRatioLimit: asNumberOrUndefined(uiConfig.gasChokedRatioLimit), + starting: asNum(uiConfig.startup), warmingup: asNum(uiConfig.warmup), + stopping: asNum(uiConfig.shutdown), coolingdown: asNum(uiConfig.cooldown), + }, + }, + runtimeOptions: { + serviceType: uiConfig.serviceType, + fluidDensity: asNum(uiConfig.fluidDensity), + fluidTemperatureK: asNum(uiConfig.fluidTemperatureK), + gasChokedRatioLimit: asNum(uiConfig.gasChokedRatioLimit), + }, }; - - this.source = new Specific(vconfig, stateConfig, runtimeOptions); - - //store in node - this.node.source = this.source; // Store the source in the node instance for easy access - - } - - /** - * Bind Measurement events to Node-RED status updates. Using internal emitter. --> REMOVE LATER WE NEED ONLY COMPLETE CHILDS AND THEN CHECK FOR UPDATES - */ - _bindEvents() { - - } - - _updateNodeStatus() { - const v = this.source; - - try { - const mode = v.currentMode; - const state = v.state.getCurrentState(); - const fluidCompatibility = typeof v.getFluidCompatibility === "function" - ? v.getFluidCompatibility() - : null; - const fluidWarningText = ( - fluidCompatibility - && (fluidCompatibility.status === "mismatch" || fluidCompatibility.status === "conflict") - ) - ? fluidCompatibility.message - : ""; - const flowUnit = v?.unitPolicy?.output?.flow || this.config.general.unit || "m3/h"; - const pressureUnit = v?.unitPolicy?.output?.pressure || "mbar"; - const flow = Math.round(v.measurements.type("flow").variant("predicted").position("downstream").getCurrentValue(flowUnit)); - - let deltaP = v.measurements.type("pressure").variant("predicted").position("delta").getCurrentValue(pressureUnit); - if (deltaP !== null) { - deltaP = parseFloat(deltaP.toFixed(0)); - } - if(isNaN(deltaP)) { - deltaP = "∞"; - } - const roundedPosition = Math.round(v.state.getCurrentPosition() * 100) / 100; - let symbolState; - switch(state){ - case "off": - symbolState = "⬛"; - break; - case "idle": - symbolState = "⏸️"; - break; - case "operational": - symbolState = "⏵️"; - break; - case "starting": - symbolState = "⏯️"; - break; - case "warmingup": - symbolState = "🔄"; - break; - case "accelerating": - symbolState = "⏩"; - break; - case "stopping": - symbolState = "⏹️"; - break; - case "coolingdown": - symbolState = "❄️"; - break; - case "decelerating": - symbolState = "⏪"; - break; - } - - let status; - switch (state) { - case "off": - status = { fill: "red", shape: "dot", text: `${mode}: OFF` }; - break; - case "idle": - status = { fill: "blue", shape: "dot", text: `${mode}: ${symbolState}` }; - break; - case "operational": - status = { fill: "green", shape: "dot", text: `${mode}: ${symbolState} | ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}`}; - break; - case "starting": - status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState}` }; - break; - case "warmingup": - status = { fill: "green", shape: "dot", text: `${mode}: ${symbolState} | ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}`}; - break; - case "accelerating": - status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState} | ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}` }; - break; - case "stopping": - status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState}` }; - break; - case "coolingdown": - status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState}` }; - break; - case "decelerating": - status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState} - ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}`}; - break; - default: - status = { fill: "grey", shape: "dot", text: `${mode}: ${symbolState}` }; - } - if (fluidWarningText) { - status = { - fill: "yellow", - shape: "ring", - text: `${status.text} | ⚠ ${fluidWarningText}`, - }; - } - return status; - } catch (error) { - this.node.error("Error in updateNodeStatus: " + error.message); - return { fill: "red", shape: "ring", text: "Status Error" }; - } - } - - /** - * Register this node as a child upstream and downstream. - * Delayed to avoid Node-RED startup race conditions. - */ - _registerChild() { - setTimeout(() => { - this.node.send([ - null, - null, - { topic: 'registerChild', payload: this.node.id , positionVsParent: this.config?.functionality?.positionVsParent || 'atEquipment' }, - ]); - }, 100); - } - - /** - * Start the periodic tick loop. - */ - _startTickLoop() { - setTimeout(() => { - this._tickInterval = setInterval(() => this._tick(), 1000); - - // Update node status on nodered screen every second ( this is not the best way to do this, but it works for now) - this._statusInterval = setInterval(() => { - const status = this._updateNodeStatus(); - this.node.status(status); - }, 1000); - - }, 1000); - } - - /** - * Execute a single tick: update measurement, format and send outputs. - */ - _tick() { - //this.source.tick(); - - const raw = this.source.getOutput(); - const processMsg = this._output.formatMsg(raw, this.source.config, 'process'); - const influxMsg = this._output.formatMsg(raw, this.source.config, 'influxdb'); - - // Send only updated outputs on ports 0 & 1 - this.node.send([processMsg, influxMsg]); - } - - /** - * Attach the node's input handler, routing control messages to the class. - */ - _attachInputHandler() { - this.node.on('input', (msg, send, done) => { - const v = this.source; - try { - switch(msg.topic) { - case 'registerChild': { - const childId = msg.payload; - const childObj = this.RED.nodes.getNode(childId); - if (!childObj || !childObj.source) { - v.logger.warn(`registerChild skipped: missing child/source for id=${childId}`); - break; - } - v.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent); - break; - } - case 'setMode': - v.setMode(msg.payload); - break; - case 'execSequence': { - const { source: seqSource, action: seqAction, parameter } = msg.payload; - v.handleInput(seqSource, seqAction, parameter); - break; - } - case 'execMovement': { - const { source: mvSource, action: mvAction, setpoint } = msg.payload; - v.handleInput(mvSource, mvAction, Number(setpoint)); - break; - } - case 'emergencystop': - case 'emergencyStop': { - const payload = msg.payload || {}; - const esSource = payload.source || 'parent'; - v.handleInput(esSource, 'emergencystop'); - break; - } - case 'showcurve': - send({ topic: 'Showing curve', payload: v.showCurve() }); - break; - case 'updateFlow': - v.updateFlow(msg.payload.variant, msg.payload.value, msg.payload.position, msg.payload.unit || this.config.general.unit); - break; - default: - v.logger.warn(`Unknown topic: ${msg.topic}`); - } - } catch (error) { - v.logger.error(`Input handler failure: ${error.message}`); - } - - if (typeof done === 'function') done(); - }); - } - - /** - * Clean up timers and intervals when Node-RED stops the node. - */ - _attachCloseHandler() { - this.node.on('close', (done) => { - clearInterval(this._tickInterval); - clearInterval(this._statusInterval); - this.source?.destroy?.(); - if (typeof done === 'function') done(); - }); + return { general: { unit: flowUnit }, asset: { unit: flowUnit } }; } } +function _resolveUnit(candidate, expectedMeasure, fallback) { + const raw = typeof candidate === 'string' ? candidate.trim() : ''; + const fb = String(fallback || '').trim(); + if (!raw) return fb; + try { + const desc = convert().describe(raw); + if (expectedMeasure && desc.measure !== expectedMeasure) return fb; + return raw; + } catch (_) { return fb; } +} + module.exports = nodeClass; diff --git a/src/specificClass.js b/src/specificClass.js index 76e62a2..8460a5e 100644 --- a/src/specificClass.js +++ b/src/specificClass.js @@ -1,895 +1,208 @@ -/** - * @file valve.js - * - * Permission is hereby granted to any person obtaining a copy of this software - * and associated documentation files (the "Software"), to use it for personal - * or non-commercial purposes, with the following restrictions: - * - * 1. **No Copying or Redistribution**: The Software or any of its parts may not - * be copied, merged, distributed, sublicensed, or sold without explicit - * prior written permission from the author. - * - * 2. **Commercial Use**: Any use of the Software for commercial purposes requires - * a valid license, obtainable only with the explicit consent of the author. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE, AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT, OR OTHERWISE, ARISING FROM, - * OUT OF, OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - * - * Ownership of this code remains solely with the original author. Unauthorized - * use of this Software is strictly prohibited. - * - * Author: - * - Rene De Ren - * Email: - * - r.de.ren@brabantsedelta.nl - * - * Future Improvements: - * - Time-based stability checks - * - Warmup handling - * - Dynamic outlier detection thresholds - * - Dynamic smoothing window and methods - * - Alarm and threshold handling - * - Maintenance mode - * - Historical data and trend analysis - */ -/** - * @file valveClass.js - * - * Permission is hereby granted to any person obtaining a copy of this software - * and associated documentation files (the "Software"), to use it for personal -.... -*/ +'use strict'; -//load local dependencies -const EventEmitter = require('events'); -const { loadCurve, logger, configUtils, configManager, state, MeasurementContainer, predict, childRegistrationUtils, convert } = require('generalFunctions'); +// valve — S88 Equipment Module domain orchestrator. Concern modules under +// src/{fluid,curve,measurement,flow,state,io,commands} carry the bulk of +// the logic; this file wires them together and preserves the public surface +// the test suite + parents (VGC, MGC, pumpingStation) depend on. + +const { BaseDomain, UnitPolicy, state } = require('generalFunctions'); const { ValveHydraulicModel, normalizeServiceType } = require('./hydraulicModel'); +const { FluidCompatibility, normalizeOptional } = require('./fluid/fluidCompatibility'); +const { SupplierCurvePredictor } = require('./curve/supplierCurve'); +const { MeasurementRouter } = require('./measurement/measurementRouter'); +const FlowController = require('./flow/flowController'); +const { bindStateEvents } = require('./state/stateBindings'); +const io = require('./io/output'); -const SERVICE_TYPES = new Set(['gas', 'liquid']); -const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({ - machine: 'liquid', - rotatingmachine: 'liquid', - machinegroup: 'liquid', - machinegroupcontrol: 'liquid', - pumpingstation: 'liquid', -}); +class Valve extends BaseDomain { + static name = 'valve'; -const CANONICAL_UNITS = Object.freeze({ - pressure: 'Pa', - flow: 'm3/s', - temperature: 'K', -}); + static unitPolicy = UnitPolicy.declare({ + canonical: { pressure: 'Pa', flow: 'm3/s', temperature: 'K' }, + output: { pressure: 'mbar', flow: 'm3/h', temperature: 'C' }, + requireUnitForTypes: ['pressure', 'flow', 'temperature'], + }); -const DEFAULT_IO_UNITS = Object.freeze({ - pressure: 'mbar', - flow: 'm3/h', - temperature: 'C', -}); - -const FORMULA_UNITS = Object.freeze({ - pressure: 'mbar', - flow: 'm3/h', - temperature: 'K', -}); - -const FALLBACK_SUPPLIER_CURVE = Object.freeze({ - '1.204': { - '125': { - x: [0, 100], - y: [0, 1], - }, - }, -}); - -class Valve { - constructor(valveConfig = {}, stateConfig = {}, runtimeOptions = {}) { - //basic setup - this.emitter = new EventEmitter(); // nodig voor ontvangen en uitvoeren van events emit() --> Zien als internet berichten (niet bedraad in node-red) - - this.logger = new logger(valveConfig.general.logging.enabled,valveConfig.general.logging.logLevel, valveConfig.general.name); - this.configManager = new configManager(); - this.defaultConfig = this.configManager.getConfig('valve'); // Load default config for rotating machine ( use software type name ? ) - this.configUtils = new configUtils(this.defaultConfig); - - // Load supplier-specific curve data (if available for model) - this.model = valveConfig.asset.model; // Get the model from the valveConfig - this.curve = this.model ? loadCurve(this.model) : null; - - //Init config and check if it is valid - this.config = this.configUtils.initConfig(valveConfig); - this.unitPolicy = this._buildUnitPolicy(this.config); - this.config = this.configUtils.updateConfig(this.config, { - general: { unit: this.unitPolicy.output.flow }, - asset: { ...this.config.asset, unit: this.unitPolicy.output.flow }, - }); - - // Initialize measurements - this.measurements = new MeasurementContainer({ - autoConvert: true, - defaultUnits: { - pressure: this.unitPolicy.output.pressure, - flow: this.unitPolicy.output.flow, - temperature: this.unitPolicy.output.temperature, - }, - preferredUnits: { - pressure: this.unitPolicy.output.pressure, - flow: this.unitPolicy.output.flow, - temperature: this.unitPolicy.output.temperature, - }, - canonicalUnits: this.unitPolicy.canonical, - storeCanonical: true, - strictUnitValidation: true, - throwOnInvalidUnit: true, - requireUnitForTypes: ['pressure', 'flow', 'temperature'], - }, this.logger); - this.child = {}; // object to hold child information so we know on what to subscribe - - // Init after config is set - this.state = new state(stateConfig, this.logger); // Init State manager and pass logger - - this.state.stateManager.currentState = "operational"; // Set default state to operational - - this.kv = 0; // default - const configuredServiceType = this._normalizeOptionalServiceType(runtimeOptions?.serviceType || valveConfig?.asset?.serviceType); - this.expectedServiceType = configuredServiceType; - this.serviceType = configuredServiceType || normalizeServiceType(runtimeOptions?.serviceType || valveConfig?.asset?.serviceType); - this.upstreamFluidSources = new Map(); - this._fluidContractListeners = new Map(); - this.fluidCompatibility = { - status: configuredServiceType ? 'pending' : 'unknown', - expectedServiceType: configuredServiceType || null, - receivedServiceType: null, - upstreamServiceTypes: [], - sourceCount: 0, - message: configuredServiceType - ? `Waiting for upstream fluid contract (${configuredServiceType}).` - : 'No upstream fluid contract available.', - }; - this.hydraulicModel = new ValveHydraulicModel( - { - serviceType: this.serviceType, - gasChokedRatioLimit: runtimeOptions?.gasChokedRatioLimit ?? valveConfig?.asset?.gasChokedRatioLimit, - }, - this.logger - ); - this.rho = this._resolvePositiveNumber( - runtimeOptions?.fluidDensity, - valveConfig?.asset?.fluidDensity, - this.hydraulicModel.defaultDensity - ); - this.T = this._resolvePositiveNumber( - runtimeOptions?.fluidTemperatureK, - valveConfig?.asset?.fluidTemperatureK, - this.hydraulicModel.defaultTemperatureK - ); - this.currentMode = this.config.mode.current; - - // wanneer hij deze ontvangt is de positie van de klep verandererd en gaat hij de updateposition functie aanroepen wat dan alle metingen en standen gaat updaten - this._onPositionChange = (data) => { - this.logger.debug(`Position change detected: ${data}`); - this.updatePosition(); - }; - this.state.emitter.on("positionChange", this._onPositionChange); //To update deltaP - - - this.childRegistrationUtils = new childRegistrationUtils(this); // Child registration utility - this._initSupplierCurvePredictor(); - } - - // -------- Config -------- // - updateConfig(newConfig) { - this.config = this.configUtils.updateConfig(this.config, newConfig); - } - - isValidSourceForMode(source, mode) { - const allowedSourcesSet = this.config.mode.allowedSources[mode] || []; - return allowedSourcesSet.has(source); - } - - async handleInput(source, action, parameter) { - if (!this.isValidSourceForMode(source, this.currentMode)) { - let warningTxt = `Source '${source}' is not valid for mode '${this.currentMode}'.`; - this.logger.warn(warningTxt); - return {status : false , feedback: warningTxt}; - } - - this.logger.info(`Handling input from source '${source}' with action '${action}' in mode '${this.currentMode}'.`); - try { - switch (action) { - case "execSequence": - await this.executeSequence(parameter); - break; - case "execMovement": // past het setpoint aan - movement van klep stand - await this.setpoint(parameter); - break; - case "emergencyStop": - this.logger.warn(`Emergency stop activated by '${source}'.`); - await this.executeSequence("emergencystop"); - break; - case "emergencystop": - this.logger.warn(`Emergency stop activated by '${source}'.`); - await this.executeSequence("emergencystop"); - break; - case "statusCheck": - this.logger.info(`Status Check: Mode = '${this.currentMode}', Source = '${source }'.`); - break; - default: - this.logger.warn(`Action '${action}' is not implemented.`); - break; - } - this.logger.debug(`Action '${action}' successfully executed`); - return {status : true , feedback: `Action '${action}' successfully executed.`}; - } catch (error) { - this.logger.error(`Error handling input: ${error}`); - } - - } - - setMode(newMode) { - const availableModes = Array.isArray(this.defaultConfig?.mode?.current?.rules?.values) - ? this.defaultConfig.mode.current.rules.values.map(v => v.value) - : Object.keys(this.config?.mode?.allowedSources || {}); - if (!availableModes.includes(newMode)) { - this.logger.warn(`Invalid mode '${newMode}'. Allowed modes are: ${availableModes.join(', ')}`); - return; - } - - this.currentMode = newMode; - this.logger.info(`Mode successfully changed to '${newMode}'.`); - } - - _buildUnitPolicy(config = {}) { - const flowUnit = this._resolveUnitOrFallback( - config?.general?.unit || config?.asset?.unit, - 'volumeFlowRate', - DEFAULT_IO_UNITS.flow - ); - - return { - canonical: { ...CANONICAL_UNITS }, - output: { - flow: flowUnit, - pressure: DEFAULT_IO_UNITS.pressure, - temperature: DEFAULT_IO_UNITS.temperature, - } - }; - } - - _resolveUnitOrFallback(candidate, expectedMeasure, fallbackUnit) { - const fallback = String(fallbackUnit || '').trim(); - const raw = typeof candidate === 'string' ? candidate.trim() : ''; - if (!raw) { - return fallback; - } - try { - const desc = convert().describe(raw); - if (expectedMeasure && desc.measure !== expectedMeasure) { - throw new Error(`expected '${expectedMeasure}', got '${desc.measure}'`); - } - return raw; - } catch (error) { - this.logger?.warn?.(`Invalid unit '${raw}' (${error.message}); falling back to '${fallback}'.`); - return fallback; - } - } - - _outputUnitForType(type) { - switch (String(type || '').toLowerCase()) { - case 'flow': - return this.unitPolicy.output.flow; - case 'pressure': - return this.unitPolicy.output.pressure; - case 'temperature': - return this.unitPolicy.output.temperature; - default: - return null; - } - } - - _readMeasurement(type, variant, position, unit = null) { - const requestedUnit = unit || this._outputUnitForType(type); - return this.measurements - .type(type) - .variant(variant) - .position(position) - .getCurrentValue(requestedUnit || undefined); - } - - _writeMeasurement(type, variant, position, value, unit = null, timestamp = Date.now()) { - if (!Number.isFinite(value)) { - return; - } - this.measurements - .type(type) - .variant(variant) - .position(position) - .value(value, timestamp, unit || undefined); - } - - _resolvePositiveNumber(...candidates) { - for (const candidate of candidates) { - const parsed = Number(candidate); - if (Number.isFinite(parsed) && parsed > 0) { - return parsed; - } - } - return undefined; - } - - _normalizeOptionalServiceType(value) { - const raw = String(value || '').trim().toLowerCase(); - if (SERVICE_TYPES.has(raw)) { - return raw; - } - return null; - } - - _deriveDefaultServiceTypeForSoftwareType(softwareType) { - const key = String(softwareType || '').trim().toLowerCase(); - return DEFAULT_SOURCE_SERVICE_TYPE[key] || null; - } - - _extractFluidContractFromChild(child, softwareType) { - const sourceType = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase(); - let contractFromChild = null; - - if (typeof child?.getFluidContract === 'function') { - try { - contractFromChild = child.getFluidContract(); - } catch (error) { - this.logger.warn(`Failed to read child fluid contract: ${error.message}`); - } - } - - const contractStatus = String(contractFromChild?.status || '').trim().toLowerCase(); - if (contractStatus === 'conflict') { - return { - status: 'conflict', - serviceType: null, - sourceType, - }; - } - - const contractType = this._normalizeOptionalServiceType(contractFromChild?.serviceType); - if (contractType) { - return { - status: 'resolved', - serviceType: contractType, - sourceType, - }; - } - - const directType = this._normalizeOptionalServiceType( - child?.serviceType - || child?.expectedServiceType - || child?.config?.asset?.serviceType - ); - if (directType) { - return { - status: 'resolved', - serviceType: directType, - sourceType, - }; - } - - const fallbackType = this._deriveDefaultServiceTypeForSoftwareType(sourceType); - if (fallbackType) { - return { - status: 'inferred', - serviceType: fallbackType, - sourceType, - }; - } - - return { - status: 'unknown', - serviceType: null, - sourceType, - }; - } - - _bindFluidContractListener(sourceId, child, sourceType) { - if (!sourceId || this._fluidContractListeners.has(sourceId)) { - return; - } - if (!child?.emitter || typeof child.emitter.on !== 'function') { - return; - } - const handler = () => { - const latest = this._extractFluidContractFromChild(child, sourceType); - const existing = this.upstreamFluidSources.get(sourceId) || {}; - existing.contract = latest; - this.upstreamFluidSources.set(sourceId, existing); - this._updateFluidCompatibilityState(); - }; - child.emitter.on('fluidContractChange', handler); - this._fluidContractListeners.set(sourceId, { - emitter: child.emitter, - handler, - }); - } - - _computeFluidCompatibilitySnapshot() { - const expectedServiceType = this.expectedServiceType || null; - const contracts = Array.from(this.upstreamFluidSources.values()) - .map((entry) => entry?.contract) - .filter(Boolean); - const upstreamServiceTypes = Array.from(new Set( - contracts - .map((contract) => this._normalizeOptionalServiceType(contract.serviceType)) - .filter(Boolean) - )); - const hasConflict = contracts.some((contract) => String(contract.status || '').toLowerCase() === 'conflict'); - const sourceCount = this.upstreamFluidSources.size; - - if (hasConflict || upstreamServiceTypes.length > 1) { - return { - status: 'conflict', - expectedServiceType, - receivedServiceType: upstreamServiceTypes.length === 1 ? upstreamServiceTypes[0] : null, - upstreamServiceTypes, - sourceCount, - message: `Conflicting upstream fluids detected: ${upstreamServiceTypes.join(', ') || 'unknown'}.`, - }; - } - - if (upstreamServiceTypes.length === 1) { - const receivedServiceType = upstreamServiceTypes[0]; - if (expectedServiceType && expectedServiceType !== receivedServiceType) { - return { - status: 'mismatch', - expectedServiceType, - receivedServiceType, - upstreamServiceTypes, - sourceCount, - message: `Expected ${expectedServiceType}, received ${receivedServiceType}.`, - }; - } - return { - status: expectedServiceType ? 'match' : 'inferred', - expectedServiceType, - receivedServiceType, - upstreamServiceTypes, - sourceCount, - message: expectedServiceType - ? `Fluid contract validated: ${receivedServiceType}.` - : `Fluid inferred from upstream: ${receivedServiceType}.`, - }; - } - - return { - status: expectedServiceType ? 'pending' : 'unknown', - expectedServiceType, - receivedServiceType: null, - upstreamServiceTypes: [], - sourceCount, - message: expectedServiceType - ? `Waiting for upstream fluid contract (${expectedServiceType}).` - : 'No upstream fluid contract available.', - }; - } - - _updateFluidCompatibilityState() { - const next = this._computeFluidCompatibilitySnapshot(); - const previous = this.fluidCompatibility || {}; - const changed = ( - previous.status !== next.status - || previous.expectedServiceType !== next.expectedServiceType - || previous.receivedServiceType !== next.receivedServiceType - || previous.sourceCount !== next.sourceCount - || (previous.message || '') !== (next.message || '') - ); - this.fluidCompatibility = next; - if (!changed) { - return; - } - if (next.status === 'mismatch' || next.status === 'conflict') { - this.logger.warn(`Fluid compatibility warning: ${next.message}`); - } else { - this.logger.info(`Fluid compatibility update: ${next.message}`); - } - this.emitter.emit('fluidCompatibilityChange', this.getFluidCompatibility()); - this.emitter.emit('fluidContractChange', this.getFluidContract()); - } - - getFluidCompatibility() { - const state = this.fluidCompatibility || {}; - return { - status: state.status || 'unknown', - expectedServiceType: state.expectedServiceType || null, - receivedServiceType: state.receivedServiceType || null, - upstreamServiceTypes: Array.isArray(state.upstreamServiceTypes) ? [...state.upstreamServiceTypes] : [], - sourceCount: Number(state.sourceCount) || 0, - message: state.message || '', - }; - } - - getFluidContract() { - const compatibility = this.getFluidCompatibility(); - if (compatibility.status === 'conflict') { - return { - status: 'conflict', - serviceType: null, - expectedServiceType: compatibility.expectedServiceType, - observedServiceType: compatibility.receivedServiceType, - source: 'valve', - }; - } - - const advertisedServiceType = compatibility.expectedServiceType || null; - return { - status: advertisedServiceType ? 'resolved' : 'unknown', - serviceType: advertisedServiceType, - expectedServiceType: compatibility.expectedServiceType, - observedServiceType: compatibility.receivedServiceType, - source: 'valve', - }; - } - - registerChild(child, softwareType) { - if (!child || typeof child !== 'object') { - this.logger.warn('registerChild skipped: invalid child payload'); - return false; - } - const sourceType = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase(); - const sourceId = child?.config?.general?.id - || child?.config?.general?.name - || `source-${this.upstreamFluidSources.size + 1}`; - const contract = this._extractFluidContractFromChild(child, sourceType); - this.upstreamFluidSources.set(sourceId, { - child, - sourceType, - contract, - }); - this._bindFluidContractListener(sourceId, child, sourceType); - this._updateFluidCompatibilityState(); - this.logger.info(`Source '${sourceId}' (${sourceType || 'unknown'}) registered for fluid contract.`); - return true; - } - - _initSupplierCurvePredictor() { - const supplierCurve = this._resolveSupplierCurveData(); - const densityTarget = Number.isFinite(this.rho) && this.rho > 0 ? this.rho : this.hydraulicModel.defaultDensity; - const densityKey = this._pickNearestNumericKey(Object.keys(supplierCurve), densityTarget); - const densityCurveFamily = supplierCurve[densityKey]; - const diameterTarget = Number(this.config?.asset?.valveDiameter); - const diameterKey = this._pickNearestNumericKey( - Object.keys(densityCurveFamily || {}), - Number.isFinite(diameterTarget) && diameterTarget > 0 ? diameterTarget : 125 - ); - - this.curveSelection = { - densityKey: Number(densityKey), - diameterKey: Number(diameterKey), - }; - this.rho = Number.isFinite(this.rho) && this.rho > 0 ? this.rho : this.hydraulicModel.defaultDensity; - this.T = Number.isFinite(this.T) && this.T > 0 ? this.T : this.hydraulicModel.defaultTemperatureK; - - this.predictKv = new predict({ curve: densityCurveFamily || FALLBACK_SUPPLIER_CURVE['1.204'] }); - this.predictKv.fDimension = this.curveSelection.diameterKey; - - this.logger.info( - `Using supplier curve model='${this.model || "inline"}', densityCurve=${this.curveSelection.densityKey}, diameter=${this.curveSelection.diameterKey}, serviceType=${this.serviceType}` - ); - } - - _resolveSupplierCurveData() { - if (this._isValidSupplierCurveData(this.curve)) { - return this.curve; - } - if (this._isValidSupplierCurveData(this.config?.asset?.valveCurve)) { - return this.config.asset.valveCurve; - } - this.logger.warn("No valid supplier curve data found, using fallback curve."); - return FALLBACK_SUPPLIER_CURVE; - } - - _isValidSupplierCurveData(curveData) { - if (!curveData || typeof curveData !== "object") { - return false; - } - const densityKeys = Object.keys(curveData); - if (!densityKeys.length) { - return false; - } - for (const densityKey of densityKeys) { - const diameters = curveData[densityKey]; - if (!diameters || typeof diameters !== "object") { - return false; - } - const diameterKeys = Object.keys(diameters); - if (!diameterKeys.length) { - return false; - } - for (const diameterKey of diameterKeys) { - const curve = diameters[diameterKey]; - if (!Array.isArray(curve?.x) || !Array.isArray(curve?.y) || curve.x.length < 2 || curve.x.length !== curve.y.length) { - return false; - } - } - } - return true; - } - - _pickNearestNumericKey(keys, target) { - const numericKeys = keys.map((key) => Number(key)).filter((value) => Number.isFinite(value)); - if (!numericKeys.length) { - return String(target); - } - let selected = numericKeys[0]; - let selectedDistance = Math.abs(selected - target); - for (const key of numericKeys) { - const distance = Math.abs(key - target); - if (distance < selectedDistance) { - selected = key; - selectedDistance = distance; - } - } - return String(selected); - } - - _predictKvForPosition(positionPercent) { - if (!this.predictKv) { - return 0.1; - } - try { - this.predictKv.fDimension = this.curveSelection?.diameterKey || this.predictKv.fDimension; - const kv = Number(this.predictKv.y(positionPercent)); - if (!Number.isFinite(kv)) { - return 0.1; - } - return Math.max(0.1, kv); - } catch (error) { - this.logger.warn(`Failed to predict Kv for position=${positionPercent}: ${error.message}`); - return 0.1; - } - } - - // -------- Sequence Handlers -------- // - async executeSequence(sequenceName) { - - const sequence = this.config.sequences[sequenceName]; - - if (!sequence || sequence.size === 0) { - this.logger.warn(`Sequence '${sequenceName}' not defined.`); - return; - } - - if (this.state.getCurrentState() == "operational" && sequenceName == "shutdown") { - this.logger.info(`Machine will ramp down to position 0 before performing ${sequenceName} sequence`); - await this.setpoint(0); - } - - this.logger.info(` --------- Executing sequence: ${sequenceName} -------------`); - - for (const state of sequence) { - try { - await this.state.transitionToState(state); - // Update measurements after state change - - } catch (error) { - this.logger.error(`Error during sequence '${sequenceName}': ${error}`); - break; // Exit sequence execution on error - } - } - } - - async setpoint(setpoint) { - - try { - // Validate setpoint - if (typeof setpoint !== 'number' || setpoint < 0) { - throw new Error("Invalid setpoint: Setpoint must be a non-negative number."); - } - - // Move to the desired setpoint - await this.state.moveTo(setpoint); - - } catch (error) { - this.logger.error(`Error setting setpoint: ${error}`); - } + constructor(valveConfig = {}, stateConfig = {}, runtimeOptions = {}) { + Valve._pendingExtras = { stateConfig, runtimeOptions }; + super(valveConfig); } - updatePressure(variant,value,position,unit = this.unitPolicy.output.pressure) { - if( value === null || value === undefined) { - this.logger.warn(`Received null or undefined value for pressure update. Variant: ${variant}, Position: ${position}`); - return; - } - this.logger.debug(`Updating pressure: variant=${variant}, value=${value}, position=${position}`); + configure() { + const extras = Valve._pendingExtras || {}; + Valve._pendingExtras = null; + const stateConfig = extras.stateConfig || {}; + const runtimeOptions = extras.runtimeOptions || {}; - switch (variant) { - case ("measured"): { - // put value in measurements container - this._writeMeasurement("pressure", "measured", position, Number(value), unit); - // get latest downstream pressure measurement - const measuredDownStreamP = this._readMeasurement("pressure", "measured", "downstream", FORMULA_UNITS.pressure); - const measuredFlow = this._readMeasurement("flow", "measured", "downstream", FORMULA_UNITS.flow); - const predictedFlow = this._readMeasurement("flow", "predicted", "downstream", FORMULA_UNITS.flow); - const activeFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow; - // update predicted flow measurement - this.updateDeltaPKlep(activeFlow,this.kv,measuredDownStreamP,this.rho,this.T); - break; - } + this._unitPolicyInstance = this.unitPolicy; + this.unitPolicyView = this._freezeUnitView(this.unitPolicy); + this.unitPolicy = this.unitPolicyView; - case ("predicted"): { - // put value in measurements container - this._writeMeasurement("pressure", "predicted", position, Number(value), unit); - const predictedDownStreamP = this._readMeasurement("pressure", "predicted", "downstream", FORMULA_UNITS.pressure); - const measuredFlowFromPred = this._readMeasurement("flow", "measured", "downstream", FORMULA_UNITS.flow); - const predictedFlowFromPred = this._readMeasurement("flow", "predicted", "downstream", FORMULA_UNITS.flow); - const activeFlowFromPred = Number.isFinite(predictedFlowFromPred) ? predictedFlowFromPred : measuredFlowFromPred; - this.updateDeltaPKlep(activeFlowFromPred,this.kv,predictedDownStreamP,this.rho,this.T); - break; - } + this.config = this.configUtils.updateConfig(this.config, { + general: { unit: this.unitPolicyView.output.flow }, + asset: { ...this.config.asset, unit: this.unitPolicyView.output.flow }, + }); + this.child = {}; - default: - this.logger.warn(`Unrecognized variant '${variant}' for flow update.`); - break; - } + this.state = new state(stateConfig, this.logger); + this.state.stateManager.currentState = 'operational'; + + this.kv = 0; + this.currentMode = this.config.mode.current; + + const configuredServiceType = normalizeOptional(runtimeOptions.serviceType || this.config?.asset?.serviceType); + this.expectedServiceType = configuredServiceType; + this.serviceType = configuredServiceType + || normalizeServiceType(runtimeOptions.serviceType || this.config?.asset?.serviceType); + + this.hydraulicModel = new ValveHydraulicModel( + { serviceType: this.serviceType, gasChokedRatioLimit: runtimeOptions.gasChokedRatioLimit ?? this.config?.asset?.gasChokedRatioLimit }, + this.logger + ); + this.rho = _positive(runtimeOptions.fluidDensity, this.config?.asset?.fluidDensity, this.hydraulicModel.defaultDensity); + this.T = _positive(runtimeOptions.fluidTemperatureK, this.config?.asset?.fluidTemperatureK, this.hydraulicModel.defaultTemperatureK); + + this.fluid = new FluidCompatibility({ + logger: this.logger, emitter: this.emitter, expectedServiceType: configuredServiceType, + }); + + this.model = this.config.asset?.model; + this.curvePredictor = new SupplierCurvePredictor({ + logger: this.logger, + model: this.model, + configCurve: this.config?.asset?.valveCurve, + defaultDensity: this.hydraulicModel.defaultDensity, + defaultTemperatureK: this.hydraulicModel.defaultTemperatureK, + rho: this.rho, + temperatureK: this.T, + valveDiameter: this.config?.asset?.valveDiameter, + }); + this.rho = this.curvePredictor.rho; + this.T = this.curvePredictor.T; + this.curveSelection = this.curvePredictor.curveSelection; + this.predictKv = this.curvePredictor.predictKv; + this.curve = this.curvePredictor.curve; + + this.logger.info(`Using supplier curve model='${this.model || 'inline'}', densityCurve=${this.curveSelection.densityKey}, diameter=${this.curveSelection.diameterKey}, serviceType=${this.serviceType}`); + + this.measurementRouter = new MeasurementRouter(this); + this.flowController = new FlowController(this); + + // BaseDomain pre-installs a `registerChild` that routes through + // ChildRouter. Valve owns its own upstream-fluid tracking — override + // here so the parent-side registration falls into FluidCompatibility. + this.registerChild = (child, softwareType) => this.fluid.registerChild(child, softwareType); + + this._stateUnbind = bindStateEvents({ + state: this.state, + onPositionChange: () => this.updatePosition(), + }); } + _freezeUnitView(p) { + const slot = (m, k) => (typeof p[m] === 'function' ? p[m](k) : p[m]?.[k]); + return Object.freeze({ + canonical: Object.freeze({ + pressure: slot('canonical', 'pressure'), + flow: slot('canonical', 'flow'), + temperature: slot('canonical', 'temperature'), + }), + output: Object.freeze({ + pressure: slot('output', 'pressure'), + flow: slot('output', 'flow'), + temperature: slot('output', 'temperature'), + }), + }); + } + + // ── config + mode ────────────────────────────────────────────────── + updateConfig(newConfig) { + this.config = this.configUtils.updateConfig(this.config, newConfig); + } + + setMode(newMode) { + const available = Array.isArray(this.defaultConfig?.mode?.current?.rules?.values) + ? this.defaultConfig.mode.current.rules.values.map((v) => v.value) + : Object.keys(this.config?.mode?.allowedSources || {}); + if (!available.includes(newMode)) { + this.logger.warn(`Invalid mode '${newMode}'. Allowed modes are: ${available.join(', ')}`); + return; + } + this.currentMode = newMode; + this.logger.info(`Mode successfully changed to '${newMode}'.`); + } + + isValidSourceForMode(source, mode) { return this.flowController.isValidSourceForMode(source, mode); } + handleInput(source, action, parameter) { return this.flowController.handleInput(source, action, parameter); } + executeSequence(name) { return this.flowController.executeSequence(name); } + setpoint(value) { return this.flowController.setpoint(value); } + + // ── measurement helpers used by router + io ──────────────────────── + _outputUnitForType(type) { + switch (String(type || '').toLowerCase()) { + case 'flow': return this.unitPolicyView.output.flow; + case 'pressure': return this.unitPolicyView.output.pressure; + case 'temperature': return this.unitPolicyView.output.temperature; + default: return null; + } + } + _readMeasurement(type, variant, position, unit) { + const u = unit || this._outputUnitForType(type); + return this.measurements.type(type).variant(variant).position(position).getCurrentValue(u || undefined); + } + _writeMeasurement(type, variant, position, value, unit, timestamp = Date.now()) { + if (!Number.isFinite(value)) return; + this.measurements.type(type).variant(variant).position(position).value(value, timestamp, unit || undefined); + } + + updatePressure(variant, value, position, unit) { + return this.measurementRouter.updatePressure(variant, value, position, unit); + } + updateFlow(variant, value, position, unit) { + return this.measurementRouter.updateFlow(variant, value, position, unit); + } updateMeasurement(variant, subType, value, position, unit) { - this.logger.debug(`---------------------- updating ${subType} ------------------ `); - switch (subType) { - case "pressure": - // Update pressure measurement - this.updatePressure(variant,value,position, unit || this.unitPolicy.output.pressure); - break; - case "flow": - this.updateFlow(variant,value,position, unit || this.unitPolicy.output.flow); - break; - case "power": - // Update power measurement - break; - default: - this.logger.error(`Type '${subType}' not recognized for measured update.`); - return; - } + return this.measurementRouter.updateMeasurement(variant, subType, value, position, unit); } - - // NOTE: q in m3/h (normalized basis), downstreamP in mbar(g), temp in K - updateDeltaPKlep(q,kv,downstreamP,rho,temp){ - const result = this.hydraulicModel.calculateDeltaPMbar({ - qM3h: q, - kv, - downstreamGaugeMbar: downstreamP, - rho, - tempK: temp, - }); - if (!result || !Number.isFinite(result.deltaPMbar)) { - return; - } - - const deltaP = result.deltaPMbar; - this.deltaPKlep = deltaP; - this.hydraulicDiagnostics = result.details || null; - - this._writeMeasurement("pressure", "predicted", "delta", deltaP, this.unitPolicy.output.pressure); - this.logger.info('DeltaP updated to: ' + deltaP); - - this.emitter.emit('deltaPChange', deltaP); // Emit event to notify valveGroupController of deltaP change - this.logger.info('DeltaPChange emitted to valveGroupController'); + updateDeltaPKlep(q, kv, downstreamP /*, rho, T */) { + return this.measurementRouter.updateDeltaP(q, kv, downstreamP); } + updatePosition() { return this.measurementRouter.updatePositionDependent(); } - - // Als er een nieuwe flow door de klep komt doordat de machines harder zijn gaan werken, dan update deze functie dit ook in de valve attributes en measurements - updateFlow(variant,value,position,unit = this.unitPolicy.output.flow) { - if( value === null || value === undefined) { - this.logger.warn(`Received null or undefined value for flow update. Variant: ${variant}, Position: ${position}`); - return; - } - this.logger.debug(`Updating flow: variant=${variant}, value=${value}, position=${position}`); - - switch (variant) { - case ("measured"): { - // put value in measurements container - this._writeMeasurement("flow", "measured", position, Number(value), unit); - // get latest downstream pressure measurement - const measuredDownStreamP = this._readMeasurement("pressure", "measured", "downstream", FORMULA_UNITS.pressure); - const measuredFlow = this._readMeasurement("flow", "measured", position, FORMULA_UNITS.flow); - // update predicted flow measurement - this.updateDeltaPKlep(measuredFlow,this.kv,measuredDownStreamP,this.rho,this.T); - break; - } - - case ("predicted"): { - // put value in measurements container - this._writeMeasurement("flow", "predicted", position, Number(value), unit); - const predictedDownStreamP = this._readMeasurement("pressure", "measured", "downstream", FORMULA_UNITS.pressure); - const predictedFlow = this._readMeasurement("flow", "predicted", position, FORMULA_UNITS.flow); - this.updateDeltaPKlep(predictedFlow,this.kv,predictedDownStreamP,this.rho,this.T); - break; - } - - default: - this.logger.warn(`Unrecognized variant '${variant}' for flow update.`); - break; - } - } - - updatePosition() { //update alle parameters nadat er een verandering is geweest in stand van klep - if (this.state.getCurrentState() == "operational" || this.state.getCurrentState() == "accelerating" || this.state.getCurrentState() == "decelerating") { - - this.logger.debug('Calculating new deltaP'); - const currentPosition = this.state.getCurrentPosition(); - const measuredFlow = this._readMeasurement("flow", "measured", "downstream", FORMULA_UNITS.flow); - const predictedFlow = this._readMeasurement("flow", "predicted", "downstream", FORMULA_UNITS.flow); - const currentFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow; - - const downstreamP = this._readMeasurement("pressure", "measured", "downstream", FORMULA_UNITS.pressure); - const x = currentPosition; // dit is de positie van de klep waarvoor we delta P willen berekenen - const y = this._predictKvForPosition(x); // haal de waarde van kv op uit de supplierscurve - - this.kv = y; //update de kv waarde in de valve class - this.logger.debug(`Kv value for position valve ${x} is ${this.kv}`); // log de waarde van kv - - this.updateDeltaPKlep(currentFlow,this.kv,downstreamP,this.rho,this.T); //update deltaP - - } - } + // ── fluid contract delegates ─────────────────────────────────────── + getFluidCompatibility() { return this.fluid.getCompatibility(); } + getFluidContract() { return this.fluid.getContract(); } + // ── display + diagnostics ────────────────────────────────────────── showCurve() { return { model: this.model || null, serviceType: this.serviceType, expectedServiceType: this.expectedServiceType, gasChokedRatioLimit: this.hydraulicModel?.gasChokedRatioLimit, - selectedDensity: this.curveSelection?.densityKey ?? null, - selectedDiameter: this.curveSelection?.diameterKey ?? null, - curve: this.predictKv?.currentFxyCurve?.[this.predictKv?.fDimension] || null, + ...this.curvePredictor.snapshot(), hydraulics: this.hydraulicDiagnostics || null, }; } - destroy() { - if (this._onPositionChange && this.state?.emitter?.off) { - this.state.emitter.off("positionChange", this._onPositionChange); - } - for (const { emitter, handler } of this._fluidContractListeners.values()) { - if (typeof emitter?.off === 'function') { - emitter.off('fluidContractChange', handler); - } else if (typeof emitter?.removeListener === 'function') { - emitter.removeListener('fluidContractChange', handler); - } - } - this._fluidContractListeners.clear(); + getOutput() { return io.buildOutput(this); } + getStatusBadge() { return io.buildStatusBadge(this); } + + destroy() { this.close(); } + + close() { + this._stateUnbind?.(); + this.fluid?.destroy(); + super.close?.(); } - - getOutput() { +} - // Improved output object generation - const output = {}; - //build the output object - Object.entries(this.measurements.measurements || {}).forEach(([type, variants]) => { - Object.entries(variants || {}).forEach(([variant, positions]) => { - Object.keys(positions || {}).forEach((position) => { - const value = this._readMeasurement(type, variant, position, this._outputUnitForType(type)); - if (value != null) { - output[`${position}_${variant}_${type}`] = value; - } - }); - }); - }); - - //fill in the rest of the output object - output["state"] = this.state.getCurrentState(); - output["percentageOpen"] = this.state.getCurrentPosition(); - output["moveTimeleft"] = this.state.getMoveTimeLeft(); - output["mode"] = this.currentMode; - - //this.logger.debug(`Output: ${JSON.stringify(output)}`); - - return output; +function _positive(...candidates) { + for (const c of candidates) { + const n = Number(c); + if (Number.isFinite(n) && n > 0) return n; } - + return undefined; } module.exports = Valve; - - - diff --git a/src/state/stateBindings.js b/src/state/stateBindings.js new file mode 100644 index 0000000..c46e22c --- /dev/null +++ b/src/state/stateBindings.js @@ -0,0 +1,17 @@ +'use strict'; + +// Bind the underlying state machine's positionChange event to the host's +// updatePosition() hook. Returns an unbind function for clean teardown. + +function bindStateEvents({ state, onPositionChange }) { + const handler = (data) => onPositionChange?.(data); + state.emitter.on('positionChange', handler); + return () => { + if (typeof state.emitter.off === 'function') state.emitter.off('positionChange', handler); + else if (typeof state.emitter.removeListener === 'function') { + state.emitter.removeListener('positionChange', handler); + } + }; +} + +module.exports = { bindStateEvents };