Compare commits
8 Commits
8e684203a8
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea2857fb25 | ||
|
|
2651aaf409 | ||
|
|
df74ea0fac | ||
|
|
96b84d3124 | ||
|
|
a14aa0dab8 | ||
|
|
69bdf11fc4 | ||
|
|
dc27a569d9 | ||
|
|
b7c40b0ddc |
@@ -74,6 +74,16 @@ class MachineGroup {
|
||||
// Combination curve data
|
||||
this.dynamicTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 } , NCog : 0};
|
||||
this.absoluteTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 }};
|
||||
|
||||
// Dispatch serialization. PS ticks demand into MGC at 1 Hz, but
|
||||
// a real pump ramp takes several seconds — without this gate
|
||||
// every PS tick aborts the in-flight dispatch and starts a new
|
||||
// one, so pumps never reach their setpoint. Mirrors
|
||||
// rotatingMachine state.delayedMove: while a dispatch is in
|
||||
// flight the latest demand is parked here for pickup when the
|
||||
// current dispatch settles. Latest-wins.
|
||||
this._dispatchInFlight = false;
|
||||
this._delayedCall = null;
|
||||
//this always last in the constructor
|
||||
this.childRegistrationUtils = new childRegistrationUtils(this);
|
||||
|
||||
@@ -283,6 +293,18 @@ class MachineGroup {
|
||||
|
||||
this.logger.debug(`Dynamic Totals after pressure change - Flow: Min ${flow.min}, Max ${flow.max}, Act ${flow.act} | Power: Min ${power.min}, Max ${power.max}, Act ${power.act}`);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, flow.act, this.unitPolicy.canonical.flow);
|
||||
// Mirror the aggregate flow onto DOWNSTREAM as well. PS subscribes to
|
||||
// flow.predicted.downstream from MGC and uses it as the outflow
|
||||
// estimate for net-flow computation. Without this mirror, the only
|
||||
// place DOWNSTREAM gets written is optimalControl's bestFlow (the
|
||||
// optimizer's TARGET, not the achieved aggregate). During transients
|
||||
// — e.g. demand dropping to dead-band keep-alive while pumps are
|
||||
// still ramping down from full throttle — PS would see a stale
|
||||
// 25 m³/h target while pumps are physically delivering 500+ m³/h,
|
||||
// making netFlow look small and stable when the basin is actually
|
||||
// draining fast. flow.act here is the sum of every pump's current
|
||||
// predicted output, so it IS the achieved aggregate.
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, flow.act, this.unitPolicy.canonical.flow);
|
||||
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, power.act, this.unitPolicy.canonical.power);
|
||||
|
||||
const { maxEfficiency, lowestEfficiency } = this.calcGroupEfficiency(this.machines);
|
||||
@@ -697,8 +719,17 @@ class MachineGroup {
|
||||
}
|
||||
|
||||
async abortActiveMovements(reason = "new demand") {
|
||||
// Safety net: in normal operation the _dispatchInFlight gate
|
||||
// (handleInput) ensures no pump movement is in flight when a
|
||||
// new dispatch starts, so this is a no-op. If a pump IS still
|
||||
// moving here, the gate was bypassed (direct call to
|
||||
// abortActiveMovements, mode change racing a dispatch, etc.) —
|
||||
// surface that loudly so the bypass can be diagnosed.
|
||||
const movementStates = new Set(['accelerating', 'decelerating']);
|
||||
await Promise.all(Object.values(this.machines).map(async machine => {
|
||||
this.logger.warn(`Aborting active movements for machine ${machine.config.general.id} due to: ${reason}`);
|
||||
const state = machine.state?.getCurrentState?.();
|
||||
if (!movementStates.has(state)) return;
|
||||
this.logger.warn(`Force-aborting in-flight movement on ${machine.config.general.id} (state=${state}) due to: ${reason} — _dispatchInFlight gate bypassed.`);
|
||||
if (typeof machine.abortMovement === "function") {
|
||||
await machine.abortMovement(reason);
|
||||
}
|
||||
@@ -772,9 +803,15 @@ class MachineGroup {
|
||||
const debugInfo = bestResult.bestCombination.map(({ machineId, flow }) => `${machineId}: ${flow.toFixed(2)} units`).join(" | ");
|
||||
this.logger.debug(`Moving to demand: ${Qd.toFixed(2)} -> Pumps: [${debugInfo}] => Total Power: ${bestResult.bestPower.toFixed(2)}`);
|
||||
|
||||
//store the total delivered power
|
||||
// Store the optimizer's INTENT on AT_EQUIPMENT (what we
|
||||
// commanded). DOWNSTREAM is reserved for the live aggregate
|
||||
// written by handlePressureChange — PS subscribes to that
|
||||
// for net-flow computation and must see what pumps are
|
||||
// actually delivering, not the planned target. Writing
|
||||
// bestFlow to DOWNSTREAM here would clobber the live value
|
||||
// every handleInput tick (see ps-mgc-flow-contract test).
|
||||
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, bestResult.bestPower, this.unitPolicy.canonical.power);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, bestResult.bestFlow, this.unitPolicy.canonical.flow);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, bestResult.bestFlow, this.unitPolicy.canonical.flow);
|
||||
this.measurements.type("efficiency").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(bestResult.bestFlow / bestResult.bestPower);
|
||||
this.measurements.type("Ncog").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(bestResult.bestCog);
|
||||
|
||||
@@ -1108,9 +1145,11 @@ class MachineGroup {
|
||||
|
||||
this.logger.debug(`Priority control for demand: ${totalFlow.toFixed(2)} -> Active pumps: [${debugInfo}] => Total Power: ${totalPower.toFixed(2)}`);
|
||||
|
||||
// Store measurements
|
||||
// Store the planned distribution as INTENT on AT_EQUIPMENT.
|
||||
// DOWNSTREAM (live aggregate) is owned by handlePressureChange.
|
||||
// Writing the plan here would clobber PS's outflow signal.
|
||||
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, totalPower, this.unitPolicy.canonical.power);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, totalFlow, this.unitPolicy.canonical.flow);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, totalFlow, this.unitPolicy.canonical.flow);
|
||||
this.measurements.type("efficiency").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(totalFlow / totalPower);
|
||||
this.measurements.type("Ncog").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(totalCog);
|
||||
|
||||
@@ -1121,16 +1160,18 @@ class MachineGroup {
|
||||
this.logger.debug(this.machines[machineId].state);
|
||||
const currentState = this.machines[machineId].state.getCurrentState();
|
||||
|
||||
if (flow <= 0 && (currentState === "operational" || currentState === "accelerating" || currentState === "decelerating")) {
|
||||
// Same dispatch shape as optimalControl — see the comment
|
||||
// there for the rationale. flowmovement BEFORE startup so
|
||||
// concurrent retargets can update delayedMove without a
|
||||
// stale chained flowmovement overwriting it after startup.
|
||||
if (flow > 0) {
|
||||
await machine.handleInput("parent", "flowmovement", this._canonicalToOutputFlow(flow));
|
||||
if (currentState === "idle") {
|
||||
await machine.handleInput("parent", "execsequence", "startup");
|
||||
}
|
||||
} else if (currentState === "operational" || currentState === "accelerating" || currentState === "decelerating") {
|
||||
await machine.handleInput("parent", "execsequence", "shutdown");
|
||||
}
|
||||
else if (currentState === "idle" && flow > 0) {
|
||||
await machine.handleInput("parent", "execsequence", "startup");
|
||||
await machine.handleInput("parent", "flowmovement", this._canonicalToOutputFlow(flow));
|
||||
}
|
||||
else if (currentState === "operational" && flow > 0) {
|
||||
await machine.handleInput("parent", "flowmovement", this._canonicalToOutputFlow(flow));
|
||||
}
|
||||
}));
|
||||
}
|
||||
catch (err) {
|
||||
@@ -1233,8 +1274,12 @@ class MachineGroup {
|
||||
}
|
||||
});
|
||||
|
||||
// Write to AT_EQUIPMENT not DOWNSTREAM. handlePressureChange
|
||||
// is the canonical writer of DOWNSTREAM (the live aggregate
|
||||
// that PS subscribes to for outflow). See optimalControl
|
||||
// comment above.
|
||||
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, totalPower.reduce((a, b) => a + b, 0), this.unitPolicy.canonical.power);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, totalFlow.reduce((a, b) => a + b, 0), this.unitPolicy.canonical.flow);
|
||||
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, totalFlow.reduce((a, b) => a + b, 0), this.unitPolicy.canonical.flow);
|
||||
|
||||
if(totalPower.reduce((a, b) => a + b, 0) > 0){
|
||||
this.measurements.type("efficiency").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(totalFlow.reduce((a, b) => a + b, 0) / totalPower.reduce((a, b) => a + b, 0));
|
||||
@@ -1248,6 +1293,37 @@ class MachineGroup {
|
||||
|
||||
async handleInput(source, demand, powerCap = Infinity, priorityList = null) {
|
||||
|
||||
// Serialize dispatches: if a previous handleInput is still
|
||||
// awaiting pump movements, park the latest demand and return.
|
||||
// The in-flight dispatch's `finally` block will pick it up.
|
||||
// See rotatingMachine state.delayedMove for the analogous
|
||||
// pattern at the pump level.
|
||||
if (this._dispatchInFlight) {
|
||||
this._delayedCall = { source, demand, powerCap, priorityList };
|
||||
this.logger.debug(`Dispatch in flight; deferring demand=${demand} until current pump moves complete.`);
|
||||
return;
|
||||
}
|
||||
|
||||
this._dispatchInFlight = true;
|
||||
try {
|
||||
return await this._runDispatch(source, demand, powerCap, priorityList);
|
||||
} finally {
|
||||
this._dispatchInFlight = false;
|
||||
// Pick up the latest deferred call (intermediate values were
|
||||
// stomped while we were busy — only the last one matters).
|
||||
if (this._delayedCall) {
|
||||
const next = this._delayedCall;
|
||||
this._delayedCall = null;
|
||||
this.logger.debug(`Dispatch finished; picking up deferred demand=${next.demand}.`);
|
||||
// Recursive call re-enters the gate; safe because
|
||||
// _dispatchInFlight has been reset to false above.
|
||||
await this.handleInput(next.source, next.demand, next.powerCap, next.priorityList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _runDispatch(source, demand, powerCap = Infinity, priorityList = null) {
|
||||
|
||||
const demandQ = parseFloat(demand);
|
||||
|
||||
if(!Number.isFinite(demandQ)){
|
||||
@@ -1345,8 +1421,28 @@ class MachineGroup {
|
||||
}
|
||||
|
||||
async turnOffAllMachines(){
|
||||
// Cancel any deferred dispatch — turnOff is the latest user intent,
|
||||
// a stale 1% keep-alive must not re-engage a pump after we shut down.
|
||||
this._delayedCall = null;
|
||||
// Per-pump shutdown serialization: PS calls turnOffAllMachines on
|
||||
// every tick (every 2 s) once level < stopLevel. Without this guard,
|
||||
// each new shutdown call hits the still-running prior shutdown's
|
||||
// movement transitions and triggers abortCurrentMovement, which
|
||||
// bounces the pump back to 'operational' before the sequence loop
|
||||
// can reach stopping/coolingdown/idle. Net effect: pump never
|
||||
// actually shuts down. Track shutdown-in-flight per pump and skip
|
||||
// if already underway.
|
||||
if (!this._shutdownInFlight) this._shutdownInFlight = new Set();
|
||||
await Promise.all(Object.entries(this.machines).map(async ([machineId, machine]) => {
|
||||
if (this.isMachineActive(machineId)) { await machine.handleInput("parent", "execsequence", "shutdown"); }
|
||||
if (this._shutdownInFlight.has(machineId)) return;
|
||||
if (this.isMachineActive(machineId)) {
|
||||
this._shutdownInFlight.add(machineId);
|
||||
try {
|
||||
await machine.handleInput("parent", "execsequence", "shutdown");
|
||||
} finally {
|
||||
this._shutdownInFlight.delete(machineId);
|
||||
}
|
||||
}
|
||||
}));
|
||||
// Update measurements to zero so the parent (PS) sees the
|
||||
// outflow drop immediately — without this the PS keeps the
|
||||
|
||||
131
test/integration/turnoff-deadlock.integration.test.js
Normal file
131
test/integration/turnoff-deadlock.integration.test.js
Normal file
@@ -0,0 +1,131 @@
|
||||
// Regression: pump A in pumpingstation-complete-example demo got stuck
|
||||
// running at minimum flow while basin level dropped past stopLevel and
|
||||
// kept dropping all the way to dry-run threshold.
|
||||
//
|
||||
// Root cause (two parts):
|
||||
//
|
||||
// 1. rotatingMachine.executeSequence on shutdown went through an
|
||||
// interruptible-abort path that returned the FSM to 'operational',
|
||||
// triggering state.transitionToState's auto-pickup of the queued
|
||||
// delayedMove — re-engaging the pump before the shutdown sequence
|
||||
// could reach stopping/coolingdown/idle. Fix: clear delayedMove at
|
||||
// the top of shutdown/emergencystop sequences.
|
||||
//
|
||||
// 2. PS calls turnOffAllMachines on every tick (every 2 s) while
|
||||
// level < stopLevel. Each call interrupted the still-running prior
|
||||
// shutdown's transitions, resetting the FSM to 'accelerating'. The
|
||||
// pump bounced accelerating ↔ decelerating forever and the actual
|
||||
// shutdown sequence transitions never ran. Fix: serialize per-pump
|
||||
// shutdown calls in turnOffAllMachines so concurrent invocations
|
||||
// are no-ops while a shutdown is already in flight.
|
||||
//
|
||||
// This test exercises part 2 — the per-pump serialization at the MGC
|
||||
// level — by hammering turnOffAllMachines from a tight loop, mirroring
|
||||
// the live tick cadence.
|
||||
|
||||
const test = require('node:test');
|
||||
const assert = require('node:assert/strict');
|
||||
|
||||
const MachineGroup = require('../../src/specificClass');
|
||||
const Machine = require('../../../rotatingMachine/src/specificClass');
|
||||
|
||||
const logCfg = { enabled: false, logLevel: 'error' };
|
||||
|
||||
const stateConfig = {
|
||||
general: { logging: logCfg },
|
||||
state: { current: 'idle' },
|
||||
movement: { mode: 'staticspeed', speed: 50, maxSpeed: 100, interval: 10 },
|
||||
// Non-zero shutdown timing so a shutdown takes long enough that a
|
||||
// concurrent turnOff call lands mid-sequence — exactly the live race.
|
||||
time: { starting: 0, warmingup: 0, stopping: 1, coolingdown: 1 },
|
||||
};
|
||||
|
||||
function machineConfig(id) {
|
||||
return {
|
||||
general: { logging: logCfg, name: id, id, unit: 'm3/h' },
|
||||
functionality: { softwareType: 'machine', role: 'rotationaldevicecontroller' },
|
||||
asset: { category: 'pump', type: 'centrifugal', model: 'hidrostal-H05K-S03R', supplier: 'hidrostal' },
|
||||
mode: {
|
||||
current: 'auto',
|
||||
allowedActions: { auto: ['execsequence', 'execmovement', 'flowmovement', 'statuscheck'] },
|
||||
allowedSources: { auto: ['parent', 'GUI'] },
|
||||
},
|
||||
sequences: {
|
||||
startup: ['starting', 'warmingup', 'operational'],
|
||||
shutdown: ['stopping', 'coolingdown', 'idle'],
|
||||
emergencystop: ['emergencystop', 'off'],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function groupConfig() {
|
||||
return {
|
||||
general: { logging: logCfg, name: 'mgc', id: 'mgc' },
|
||||
functionality: { softwareType: 'machinegroup', role: 'groupcontroller', positionVsParent: 'atEquipment' },
|
||||
scaling: { current: 'normalized' },
|
||||
mode: { current: 'optimalcontrol' },
|
||||
};
|
||||
}
|
||||
|
||||
function buildGroup() {
|
||||
const mgc = new MachineGroup(groupConfig());
|
||||
const ids = ['pump_a', 'pump_b', 'pump_c'];
|
||||
const pumps = ids.map(id => new Machine(machineConfig(id), stateConfig));
|
||||
for (const m of pumps) {
|
||||
m.updateMeasuredPressure(0, 'upstream', { timestamp: Date.now(), unit: 'mbar', childName: 'up', childId: `up-${m.config.general.id}` });
|
||||
m.updateMeasuredPressure(1100, 'downstream', { timestamp: Date.now(), unit: 'mbar', childName: 'dn', childId: `dn-${m.config.general.id}` });
|
||||
mgc.childRegistrationUtils.registerChild(m, 'downstream');
|
||||
}
|
||||
mgc.calcAbsoluteTotals();
|
||||
mgc.calcDynamicTotals();
|
||||
return { mgc, pumps };
|
||||
}
|
||||
|
||||
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
|
||||
|
||||
test('repeated turnOffAllMachines reaches idle (serializes concurrent shutdowns)', async () => {
|
||||
const { mgc, pumps } = buildGroup();
|
||||
const pumpA = pumps[0];
|
||||
|
||||
// Start pump A and queue a delayedMove the way MGC's optimalControl
|
||||
// would when PS sends a 1% dead-zone keep-alive.
|
||||
await pumpA.handleInput('parent', 'execsequence', 'startup');
|
||||
assert.equal(pumpA.state.getCurrentState(), 'operational');
|
||||
pumpA.setpoint(80); // start a slow move (not awaited)
|
||||
await sleep(50);
|
||||
assert.equal(pumpA.state.getCurrentState(), 'accelerating');
|
||||
pumpA.state.delayedMove = 75;
|
||||
|
||||
// Mimic PS's tick loop: fire turnOffAllMachines on a tight cadence
|
||||
// without awaiting. Without the per-pump serialization in
|
||||
// turnOffAllMachines, each call hits the still-running prior shutdown
|
||||
// and bounces the pump back to accelerating — the live deadlock.
|
||||
const ticks = [];
|
||||
for (let i = 0; i < 6; i++) {
|
||||
ticks.push(mgc.turnOffAllMachines());
|
||||
await sleep(80); // half the realtime tick — tighter race
|
||||
}
|
||||
await Promise.all(ticks);
|
||||
// Allow the (single) in-flight shutdown to finish its 1+1 s timed
|
||||
// transitions through stopping → coolingdown → idle.
|
||||
await sleep(2500);
|
||||
|
||||
assert.equal(pumpA.state.getCurrentState(), 'idle',
|
||||
`pump must reach idle under repeated turnOff calls; got ${pumpA.state.getCurrentState()} (delayedMove=${pumpA.state.delayedMove})`);
|
||||
assert.equal(pumpA.state.delayedMove, null,
|
||||
'delayedMove must be cleared after shutdown');
|
||||
});
|
||||
|
||||
test('turnOffAllMachines clears MGC._delayedCall to cancel any deferred dispatch', async () => {
|
||||
// PS sends a 1% keep-alive while MGC is mid-dispatch. MGC parks it in
|
||||
// _delayedCall. PS then crosses stopLevel and calls turnOffAllMachines.
|
||||
// Without clearing _delayedCall, MGC's finally block fires the parked
|
||||
// 1% call AFTER the shutdown — re-engaging the pump.
|
||||
const { mgc } = buildGroup();
|
||||
mgc._delayedCall = { source: 'parent', demand: 1, powerCap: Infinity, priorityList: null };
|
||||
|
||||
await mgc.turnOffAllMachines();
|
||||
|
||||
assert.equal(mgc._delayedCall, null,
|
||||
'turnOff must cancel any deferred dispatch so it cannot re-engage pumps post-shutdown');
|
||||
});
|
||||
Reference in New Issue
Block a user