// MachineGroup — S88 Unit orchestrator coordinating rotatingMachine children. // // All real work lives in the concern modules under src/{groupOps,totals, // combinatorics,optimizer,efficiency,dispatch,control}. This file stitches // them together: child-event routing, demand serialization, mode selection, // and the per-mode dispatch switch. // // Operator demand is always passed in here as a canonical m³/s number. The // set.demand handler resolves units (%, m³/h, l/s, etc.) before calling // handleInput, so this orchestrator has no scaling state and no unit logic. 'use strict'; const { BaseDomain, UnitPolicy, POSITIONS, interpolation } = require('generalFunctions'); const GroupOperatingPoint = require('./groupOps/groupOperatingPoint'); const groupCurves = require('./groupOps/groupCurves'); const TotalsCalculator = require('./totals/totalsCalculator'); const { validPumpCombinations } = require('./combinatorics/pumpCombinations'); const optimizer = require('./optimizer'); const GroupEfficiency = require('./efficiency/groupEfficiency'); const control = require('./control/strategies'); const io = require('./io/output'); const DemandDispatcher = require('./dispatch/demandDispatcher'); const { buildProfile } = require('./movement/machineProfile'); const movementScheduler = require('./movement/movementScheduler'); const MovementExecutor = require('./movement/movementExecutor'); const ACTIVE_STATES = new Set(['operational', 'accelerating', 'decelerating']); // Canonical mode names (camelCase). The dispatcher already lowercases for its // switch, but we normalise at setMode so this.mode is always in the canonical // form — keeps allowedActions/allowedSources lookups (which key on the // canonical form) honest. Module-level so tests can import without spinning // up a full MachineGroup instance. const ALLOWED_MODES = ['optimalControl', 'priorityControl', 'maintenance']; function _normaliseMode(input) { const lc = String(input || '').toLowerCase(); return ALLOWED_MODES.find((m) => m.toLowerCase() === lc) || null; } class MachineGroup extends BaseDomain { static name = 'machineGroupControl'; static unitPolicy = UnitPolicy.declare({ canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' }, output: { flow: 'm3/h', pressure: 'mbar', power: 'kW', temperature: 'C' }, requireUnitForTypes: ['pressure', 'flow', 'power', 'temperature'], }); configure() { this.interpolation = new interpolation(); // Plain id-keyed maps so tests + tight-loop iteration stay readable. // The router populates them via the onRegister handlers below; legacy // tests still write directly (matches the pumpingStation pattern). this.machines = {}; // Persisted flows may have stored the mode in lowercase (legacy editor // behaviour); normalise at construction so allow-list lookups against // the schema's camelCase keys work consistently. Fallback to // optimalControl if the persisted value is missing/garbage so a typo // doesn't quietly disable dispatch. this.mode = _normaliseMode(this.config.mode.current) || 'optimalControl'; this.absDistFromPeak = 0; this.relDistFromPeak = 0; this.dynamicTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 }, NCog: 0 }; this.absoluteTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 } }; // Latest-wins demand gate. Awaiting handleInput resolves when THIS // call's dispatch settles (LatestWinsGate.fireAndWait); a parked // call that is later superseded resolves with { superseded: true }. this._demandDispatcher = new DemandDispatcher( { logger: this.logger }, (payload) => this._runDispatch(payload.source, payload.demand, payload.powerCap, payload.priorityList), ); this._shutdownInFlight = new Set(); // Tick-driven executor for the movement schedule produced by the // planner. MGC owns the wall-clock setInterval that calls tick(); // the executor itself is pure (testable without timers). this.movementExecutor = new MovementExecutor({ logger: this.logger, fireCommand: (cmd) => this._fireSchedulerCommand(cmd), }); this._executorTimer = null; this._executorIntervalMs = 1000; this.operatingPoint = new GroupOperatingPoint({ measurements: this.measurements, machines: this.machines, unitPolicy: this.unitPolicy, logger: this.logger, }); this.totals = new TotalsCalculator({ machines: this.machines, unitPolicy: this.unitPolicy, logger: this.logger, operatingPoint: this.operatingPoint, isMachineActive: (id) => this.isMachineActive(id), }); this.efficiency = new GroupEfficiency({ logger: this.logger, interpolation: this.interpolation, measurements: this.measurements, machines: this.machines, }); this.router .onRegister('machine', (child) => { const id = child.config.general.id; if (this.machines[id]) { this.logger.warn(`Machine ${id} is already registered.`); return; } this.machines[id] = child; }) .onMeasurement('machine', { type: 'pressure', position: POSITIONS.DOWNSTREAM }, () => this.handlePressureChange()) .onMeasurement('machine', { type: 'pressure', position: 'differential' }, () => this.handlePressureChange()) .onPrediction('machine', { type: 'flow', position: POSITIONS.DOWNSTREAM }, () => this.handlePressureChange()) .onRegister('measurement', (child) => { const position = child.config?.functionality?.positionVsParent || child.config?.general?.positionVsParent; const measurementType = child.config?.asset?.type; if (!measurementType || !position) { this.logger.warn(`Measurement child ${child.config?.general?.id} missing asset.type or positionVsParent — skipping`); return; } const eventName = `${measurementType}.measured.${String(position).toLowerCase()}`; child.measurements.emitter.on(eventName, (eventData = {}) => { this.measurements .type(measurementType).variant('measured').position(position) .value(eventData.value, eventData.timestamp, eventData.unit); if (measurementType === 'pressure') this.handlePressureChange(); }); }); this.logger.info('MachineGroup initialized.'); } context() { return Object.freeze({ ...super.context(), mgc: this, machines: this.machines, groupCurves, readChildMeasurement: (m, t, v, p, u) => this.operatingPoint.readChild(m, t, v, p, u), POSITIONS, }); } // ── Surface kept for tests + commands ────────────────────────────── // Mirror of rotatingMachine/src/specificClass.js:329-339 — same pattern, // mode/source allow-lists live in this.config.mode (loaded from the // schema as Set instances). Anything not declared in the schema is // dropped silently with a warn-level log. isValidActionForMode(action, mode) { const ok = !!this.config?.mode?.allowedActions?.[mode]?.has?.(action); if (ok) this.logger.debug(`action '${action}' allowed in mode '${mode}'`); else this.logger.warn(`action '${action}' not allowed in mode '${mode}'`); return ok; } isValidSourceForMode(source, mode) { const ok = !!this.config?.mode?.allowedSources?.[mode]?.has?.(source); if (ok) this.logger.debug(`source '${source}' allowed in mode '${mode}'`); else this.logger.warn(`source '${source}' not allowed in mode '${mode}'`); return ok; } setMode(mode) { const canonical = _normaliseMode(mode); if (!canonical) { this.logger.warn(`Invalid mode '${mode}'. Allowed: ${ALLOWED_MODES.join(', ')}`); return; } this.mode = canonical; this.notifyOutputChanged(); } isMachineActive(id) { const s = this.machines[id]?.state?.getCurrentState?.(); return ACTIVE_STATES.has(s); } equalizePressure() { this.operatingPoint.equalize(); } calcAbsoluteTotals() { return (this.absoluteTotals = this.totals.calcAbsoluteTotals()); } calcDynamicTotals() { return (this.dynamicTotals = this.totals.calcDynamicTotals()); } activeTotals() { return this.totals.activeTotals(); } calcGroupEfficiency(machines) { return this.efficiency.calcGroupEfficiency(machines); } calcDistanceBEP(eff, max, min) { const d = this.efficiency.calcDistanceBEP(eff, max, min); this.absDistFromPeak = d.absDistFromPeak; this.relDistFromPeak = d.relDistFromPeak; return d; } validPumpCombinations(machines, Qd, powerCap = Infinity) { return validPumpCombinations(machines, Qd, this.context(), powerCap); } calcBestCombination(combinations, Qd) { return optimizer.calcBestCombination(combinations, Qd, this.context()); } calcBestCombinationBEPGravitation(combinations, Qd, method = 'BEP-Gravitation-Directional') { return optimizer.calcBestCombinationBEPGravitation(combinations, Qd, this.context(), method); } handlePressureChange() { this.operatingPoint.equalize(); const totals = this.calcDynamicTotals(); const fUnit = this.unitPolicy.canonical.flow; const pUnit = this.unitPolicy.canonical.power; this.operatingPoint.writeOwn('flow', 'predicted', POSITIONS.AT_EQUIPMENT, totals.flow.act, fUnit); // Mirror live aggregate onto DOWNSTREAM — PS subscribes here for the // outflow signal. See preserve-tests/ps-mgc-flow-contract regression. this.operatingPoint.writeOwn('flow', 'predicted', POSITIONS.DOWNSTREAM, totals.flow.act, fUnit); this.operatingPoint.writeOwn('power', 'predicted', POSITIONS.AT_EQUIPMENT, totals.power.act, pUnit); const { maxEfficiency, lowestEfficiency } = this.efficiency.calcGroupEfficiency(this.machines); const eff = this.measurements.type('efficiency').variant('predicted').position(POSITIONS.AT_EQUIPMENT).getCurrentValue() ?? null; this.calcDistanceBEP(eff, maxEfficiency, lowestEfficiency); this.notifyOutputChanged(); } async abortActiveMovements(reason = 'new demand') { await Promise.all(Object.values(this.machines).map(async machine => { const state = machine.state?.getCurrentState?.(); if (state !== 'accelerating' && state !== 'decelerating') return; this.logger.warn(`Force-aborting in-flight movement on ${machine.config.general.id} (state=${state}) due to: ${reason}.`); if (typeof machine.abortMovement === 'function') await machine.abortMovement(reason); })); } async _optimalControl(Qd, powerCap = Infinity) { if (Object.keys(this.machines).length === 0) { this.logger.warn('No machines registered. Cannot execute optimal control.'); return; } this.operatingPoint.equalize(); const dt = this.dynamicTotals; const machineStates = Object.entries(this.machines).reduce((acc, [id, m]) => { acc[id] = m.state.getCurrentState(); return acc; }, {}); if (Qd <= 0) { await this.turnOffAllMachines(); return; } if (Qd < dt.flow.min) Qd = dt.flow.min; else if (Qd > dt.flow.max) Qd = dt.flow.max; const combinations = this.validPumpCombinations(this.machines, Qd, powerCap); if (!combinations || combinations.length === 0) { this.logger.warn(`Demand: ${Qd.toFixed(2)} -> No valid combination found (empty set).`); return; } const method = this.config.optimization?.method || 'BEP-Gravitation-Directional'; const ctx = this.context(); let bestResult; if (method === 'NCog') { bestResult = optimizer.calcBestCombination(combinations, Qd, ctx); } else if (method === 'BEP-Gravitation' || method === 'BEP-Gravitation-Directional') { bestResult = optimizer.calcBestCombinationBEPGravitation(combinations, Qd, ctx, method); } else { this.logger.warn(`Unknown optimization method '${method}', falling back to BEP-Gravitation-Directional.`); bestResult = optimizer.calcBestCombinationBEPGravitation(combinations, Qd, ctx, 'BEP-Gravitation-Directional'); } if (bestResult.bestCombination === null) { this.logger.warn(`Demand: ${Qd.toFixed(2)} -> No valid combination found => not updating control.`); return; } // INTENT lands on AT_EQUIPMENT only; DOWNSTREAM is the live aggregate. this.operatingPoint.writeOwn('power', 'predicted', POSITIONS.AT_EQUIPMENT, bestResult.bestPower, this.unitPolicy.canonical.power); this.operatingPoint.writeOwn('flow', 'predicted', POSITIONS.AT_EQUIPMENT, bestResult.bestFlow, this.unitPolicy.canonical.flow); // Hydraulic efficiency η = (Q·ΔP)/P_shaft — a dimensionless 0..1 // ratio in the same scale as each child rotatingMachine's `cog`. // Keeps `calcDistanceBEP(eff, maxEfficiency, lowestEfficiency)` in // handlePressureChange comparing apples to apples. const dP = this.operatingPoint.headerDiffPa; if (Number.isFinite(dP) && dP > 0 && bestResult.bestPower > 0) { this.measurements.type('efficiency').variant('predicted').position(POSITIONS.AT_EQUIPMENT) .value((bestResult.bestFlow * dP) / bestResult.bestPower); } this.measurements.type('Ncog').variant('predicted').position(POSITIONS.AT_EQUIPMENT).value(bestResult.bestCog); const distribution = bestResult.bestCombination.map((it) => ({ machineId: String(it.machineId), flow: it.flow })); await this._dispatchFlowDistribution(distribution); } // Shared dispatch path used by every control strategy. Takes a flow // distribution {machineId, flow}[] and routes it through the planner // and executor. Same-time-landing (rendezvous) is the default and can // be turned off via config.planner.useRendezvous, in which case every // command fires at tick 0 (legacy fire-and-forget behaviour, like the // pre-planner equalFlowControl). async _dispatchFlowDistribution(distribution) { const profiles = Object.values(this.machines).map((m) => buildProfile(m)); const headerPa = Number.isFinite(this.operatingPoint.headerDiffPa) ? this.operatingPoint.headerDiffPa : 0; const useRendezvous = this.config?.planner?.useRendezvous !== false; // default true const schedule = movementScheduler.plan(profiles, distribution, headerPa, { tickS: 1, useRendezvous }); this.movementExecutor.replan(schedule); // AWAIT the first tick to preserve the race-favouring behaviour // of the original code. The new move's full chain (residue // handler → operational → ramp) settles before _runDispatch // returns; the in-flight shutdown sequence's for-loop runs on // other microtasks but its invalid-transition exits truncate it. await this.movementExecutor.tick(); this._ensureExecutorTimer(); if (this.logger?.debug) { this.logger.debug(`MGC planner: ${schedule.commands.length} commands queued, tStar=${schedule.tStarS.toFixed(1)}s, rendezvous=${useRendezvous}`); } } // Dispatch one scheduled command to the appropriate child. Returns // synchronously — the underlying handleInput is fire-and-forget from // the executor's perspective, mirroring the existing optimal-control // behaviour where commands are scheduled, not awaited. _fireSchedulerCommand(cmd) { const machine = this.machines[cmd.machineId]; if (!machine) { this.logger?.warn?.(`Scheduler fired ${cmd.action} for unknown machine ${cmd.machineId}`); return undefined; } const handle = typeof machine.handleInput === 'function' ? machine.handleInput.bind(machine) : null; if (!handle) return undefined; if (cmd.action === 'execsequence') { return Promise.resolve(handle('parent', 'execsequence', cmd.sequence)) .catch((e) => this.logger?.error?.(`execsequence ${cmd.sequence} on ${cmd.machineId} failed: ${e?.message || e}`)); } if (cmd.action === 'flowmovement') { const outFlow = this._canonicalToOutputFlow(cmd.flow); return Promise.resolve(handle('parent', 'flowmovement', outFlow)) .catch((e) => this.logger?.error?.(`flowmovement on ${cmd.machineId} failed: ${e?.message || e}`)); } return undefined; } // Wall-clock driver for the executor. Auto-stops when there's nothing // pending so we don't burn a forever-running setInterval. _ensureExecutorTimer() { if (this._executorTimer) return; this._executorTimer = setInterval(() => { this.movementExecutor.tick(); if (this.movementExecutor.pending() === 0) { clearInterval(this._executorTimer); this._executorTimer = null; } }, this._executorIntervalMs); // Unref so the timer doesn't keep Node-RED alive on shutdown. if (typeof this._executorTimer.unref === 'function') this._executorTimer.unref(); } // Stop the executor's wall-clock driver. Called from teardown paths. _stopExecutorTimer() { if (this._executorTimer) { clearInterval(this._executorTimer); this._executorTimer = null; } } // Returns when THIS call's dispatch settles. If overwritten by a later // handleInput() while parked behind an in-flight dispatch, resolves // with the LatestWinsGate.SUPERSEDED sentinel ({ superseded: true }). async handleInput(source, demand, powerCap = Infinity, priorityList = null) { return this._demandDispatcher.fireAndWait({ source, demand, powerCap, priorityList }); } // Operator-style entry point: accepts a (value, unit) pair and resolves // to canonical m³/s before delegating to handleInput. Single source of // truth for the unit math shared by the set.demand command handler and // by parent nodes (e.g. pumpingStation level-based control) that hold a // direct reference to this specificClass and need to push a % demand // without re-implementing the interpolation. Negative value is the // stop-all signal regardless of unit. async setDemand(value, unit = '%') { const v = Number(value); if (!Number.isFinite(v)) { this.logger?.error?.(`setDemand: invalid value '${value}'`); return undefined; } if (v < 0) { await this.turnOffAllMachines(); return undefined; } let canonical; if (unit === '%') { const dt = this.calcDynamicTotals(); canonical = this.interpolation.interpolate_lin_single_point( v, 0, 100, dt.flow.min, dt.flow.max); } else { try { canonical = this.unitPolicy.convert(v, unit, 'm3/s', 'setDemand absolute flow'); } catch (err) { this.logger?.error?.(`setDemand: cannot convert ${v} ${unit} -> m3/s: ${err?.message || err}`); return undefined; } } return this.handleInput('parent', canonical); } async _runDispatch(source, demand, powerCap, priorityList) { const demandQ = parseFloat(demand); if (!Number.isFinite(demandQ)) { this.logger.error(`Invalid flow demand input: ${demand}.`); return; } // Demand is canonical m³/s (the handler has already resolved units). // The handler routes negatives directly to turnOffAllMachines, but // keep a defensive check in case turnOff-state arrives some other way. if (demandQ <= 0) { await this.turnOffAllMachines(); return; } await this.abortActiveMovements('new demand received'); const dt = this.calcDynamicTotals(); // Clamp against the current-pressure envelope. let demandQout = demandQ; if (demandQout < dt.flow.min) demandQout = dt.flow.min; else if (demandQout > dt.flow.max) demandQout = dt.flow.max; // Normalize for the switch — schema enum values use camelCase // (optimalControl, priorityControl) while legacy callers send // lowercase. Accept both rather than silently falling through. const ctx = { mgc: this }; switch (String(this.mode || '').toLowerCase()) { case 'prioritycontrol': await control.equalFlowControl(ctx, demandQout, powerCap, priorityList); break; case 'optimalcontrol': await this._optimalControl(demandQout, powerCap); break; default: this.logger.warn(`${this.mode} is not a valid mode.`); } const { maxEfficiency, lowestEfficiency } = this.efficiency.calcGroupEfficiency(this.machines); const eff = this.measurements.type('efficiency').variant('predicted').position(POSITIONS.AT_EQUIPMENT).getCurrentValue(); this.calcDistanceBEP(eff, maxEfficiency, lowestEfficiency); this.notifyOutputChanged(); } async turnOffAllMachines() { // Cancel any parked demand — turnOff is latest user intent so a // pending fireAndWait must not re-engage pumps post-shutdown. this._demandDispatcher.cancelPending(); await Promise.all(Object.entries(this.machines).map(async ([id, machine]) => { if (this._shutdownInFlight.has(id)) return; if (this.isMachineActive(id)) { this._shutdownInFlight.add(id); try { await machine.handleInput('parent', 'execsequence', 'shutdown'); } finally { this._shutdownInFlight.delete(id); } } })); const fUnit = this.unitPolicy.canonical.flow; const pUnit = this.unitPolicy.canonical.power; this.operatingPoint.writeOwn('flow', 'predicted', POSITIONS.DOWNSTREAM, 0, fUnit); this.operatingPoint.writeOwn('flow', 'predicted', POSITIONS.AT_EQUIPMENT, 0, fUnit); this.operatingPoint.writeOwn('power', 'predicted', POSITIONS.AT_EQUIPMENT, 0, pUnit); this.notifyOutputChanged(); } _canonicalToOutputFlow(value) { return this.unitPolicy.convert( value, this.unitPolicy.canonical.flow, this.unitPolicy.output.flow, 'canonical->output flow', ); } getOutput() { return io.getOutput(this); } getStatusBadge() { return io.getStatusBadge(this); } } module.exports = MachineGroup; // Module-level helpers exposed for unit tests. module.exports._normaliseMode = _normaliseMode; module.exports.ALLOWED_MODES = ALLOWED_MODES;