Compare commits

..

3 Commits

Author SHA1 Message Date
Rene De Ren
ea2857fb25 fix: serialize per-pump shutdown + cancel deferred dispatch in turnOffAllMachines
PS calls turnOffAllMachines on every tick once level < stopLevel. Two
ways the pump could re-engage after we shut it down:

1. _delayedCall: a 1% dead-zone keep-alive parked in MGC's deferred
   dispatch fires from the in-flight handleInput's finally block AFTER
   the shutdown completes, dispatching flow + startup to a fresh pump.
   Clear _delayedCall at the top of turnOff.

2. Concurrent shutdown calls on the same pump interrupt each other
   before the sequence can transition past stopping. Track shutdown-
   in-flight per pump and skip if one is already underway.

Together with the rotatingMachine delayedMove-clearing fix, this lets
the level-based hysteresis cycle complete: pumps shut off cleanly at
stopLevel, basin reverses direction, refills to startLevel, repeat.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 18:17:55 +02:00
Rene De Ren
2651aaf409 abortActiveMovements: only WARN when actually aborting an in-flight move
In normal operation the _dispatchInFlight gate (handleInput)
guarantees no pump movement is in flight when a new dispatch starts,
so the per-machine abort call is a no-op. The previous unconditional
WARN flooded the log with one line per pump per tick (~3/s) for what
was actually a normal-path no-op.

Now the WARN fires ONLY when a pump's state is accelerating or
decelerating — i.e. the gate has been bypassed and we're force-
aborting an in-flight ramp. The wording reflects that:

  Force-aborting in-flight movement on pump_a (state=accelerating)
  due to: new demand received — _dispatchInFlight gate bypassed.

If you ever see this in production logs, the gate has a hole and
needs investigating.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 09:43:12 +02:00
Rene De Ren
df74ea0fac Serialize handleInput dispatches via _dispatchInFlight gate
Mirrors rotatingMachine state.delayedMove. PS ticks demand into MGC at
1 Hz but a real pump ramp takes several seconds; before this gate every
PS tick aborted the in-flight optimalControl and started a new one, so
pumps never reached their setpoint. Live observation: 120 aborts / 2
min, pump_a drifting to 138 m³/h while pump_b stayed clamped at minFlow
60 m³/h ("near_curve_edge").

While a dispatch is in flight, the latest {source, demand, powerCap,
priorityList} is parked in _delayedCall and the new call returns.
The in-flight dispatch's finally block picks up the latest delayed
value when it settles. Latest-wins — intermediate demands are stomped
because they were obsolete by the time the pumps were ready for them.

Regression test in superproject:
test/mgc-overactive-demand-serialization.integration.test.js
30 concurrent demand calls now produce ≤ 5 aborts (was 30).

All existing tests still pass: 21 MGC integration + 7 cross-node
integration (incl. realistic-startup-timing, inflow-overcapacity-
stability, ps-mgc-flow-contract, idle-startup-deadlock).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 09:14:59 +02:00
2 changed files with 203 additions and 2 deletions

View File

@@ -74,6 +74,16 @@ class MachineGroup {
// Combination curve data // Combination curve data
this.dynamicTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 } , NCog : 0}; this.dynamicTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 } , NCog : 0};
this.absoluteTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 }}; this.absoluteTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 }};
// Dispatch serialization. PS ticks demand into MGC at 1 Hz, but
// a real pump ramp takes several seconds — without this gate
// every PS tick aborts the in-flight dispatch and starts a new
// one, so pumps never reach their setpoint. Mirrors
// rotatingMachine state.delayedMove: while a dispatch is in
// flight the latest demand is parked here for pickup when the
// current dispatch settles. Latest-wins.
this._dispatchInFlight = false;
this._delayedCall = null;
//this always last in the constructor //this always last in the constructor
this.childRegistrationUtils = new childRegistrationUtils(this); this.childRegistrationUtils = new childRegistrationUtils(this);
@@ -709,8 +719,17 @@ class MachineGroup {
} }
async abortActiveMovements(reason = "new demand") { async abortActiveMovements(reason = "new demand") {
// Safety net: in normal operation the _dispatchInFlight gate
// (handleInput) ensures no pump movement is in flight when a
// new dispatch starts, so this is a no-op. If a pump IS still
// moving here, the gate was bypassed (direct call to
// abortActiveMovements, mode change racing a dispatch, etc.) —
// surface that loudly so the bypass can be diagnosed.
const movementStates = new Set(['accelerating', 'decelerating']);
await Promise.all(Object.values(this.machines).map(async machine => { await Promise.all(Object.values(this.machines).map(async machine => {
this.logger.warn(`Aborting active movements for machine ${machine.config.general.id} due to: ${reason}`); const state = machine.state?.getCurrentState?.();
if (!movementStates.has(state)) return;
this.logger.warn(`Force-aborting in-flight movement on ${machine.config.general.id} (state=${state}) due to: ${reason} — _dispatchInFlight gate bypassed.`);
if (typeof machine.abortMovement === "function") { if (typeof machine.abortMovement === "function") {
await machine.abortMovement(reason); await machine.abortMovement(reason);
} }
@@ -1274,6 +1293,37 @@ class MachineGroup {
async handleInput(source, demand, powerCap = Infinity, priorityList = null) { async handleInput(source, demand, powerCap = Infinity, priorityList = null) {
// Serialize dispatches: if a previous handleInput is still
// awaiting pump movements, park the latest demand and return.
// The in-flight dispatch's `finally` block will pick it up.
// See rotatingMachine state.delayedMove for the analogous
// pattern at the pump level.
if (this._dispatchInFlight) {
this._delayedCall = { source, demand, powerCap, priorityList };
this.logger.debug(`Dispatch in flight; deferring demand=${demand} until current pump moves complete.`);
return;
}
this._dispatchInFlight = true;
try {
return await this._runDispatch(source, demand, powerCap, priorityList);
} finally {
this._dispatchInFlight = false;
// Pick up the latest deferred call (intermediate values were
// stomped while we were busy — only the last one matters).
if (this._delayedCall) {
const next = this._delayedCall;
this._delayedCall = null;
this.logger.debug(`Dispatch finished; picking up deferred demand=${next.demand}.`);
// Recursive call re-enters the gate; safe because
// _dispatchInFlight has been reset to false above.
await this.handleInput(next.source, next.demand, next.powerCap, next.priorityList);
}
}
}
async _runDispatch(source, demand, powerCap = Infinity, priorityList = null) {
const demandQ = parseFloat(demand); const demandQ = parseFloat(demand);
if(!Number.isFinite(demandQ)){ if(!Number.isFinite(demandQ)){
@@ -1371,8 +1421,28 @@ class MachineGroup {
} }
async turnOffAllMachines(){ async turnOffAllMachines(){
// Cancel any deferred dispatch — turnOff is the latest user intent,
// a stale 1% keep-alive must not re-engage a pump after we shut down.
this._delayedCall = null;
// Per-pump shutdown serialization: PS calls turnOffAllMachines on
// every tick (every 2 s) once level < stopLevel. Without this guard,
// each new shutdown call hits the still-running prior shutdown's
// movement transitions and triggers abortCurrentMovement, which
// bounces the pump back to 'operational' before the sequence loop
// can reach stopping/coolingdown/idle. Net effect: pump never
// actually shuts down. Track shutdown-in-flight per pump and skip
// if already underway.
if (!this._shutdownInFlight) this._shutdownInFlight = new Set();
await Promise.all(Object.entries(this.machines).map(async ([machineId, machine]) => { await Promise.all(Object.entries(this.machines).map(async ([machineId, machine]) => {
if (this.isMachineActive(machineId)) { await machine.handleInput("parent", "execsequence", "shutdown"); } if (this._shutdownInFlight.has(machineId)) return;
if (this.isMachineActive(machineId)) {
this._shutdownInFlight.add(machineId);
try {
await machine.handleInput("parent", "execsequence", "shutdown");
} finally {
this._shutdownInFlight.delete(machineId);
}
}
})); }));
// Update measurements to zero so the parent (PS) sees the // Update measurements to zero so the parent (PS) sees the
// outflow drop immediately — without this the PS keeps the // outflow drop immediately — without this the PS keeps the

View File

@@ -0,0 +1,131 @@
// Regression: pump A in pumpingstation-complete-example demo got stuck
// running at minimum flow while basin level dropped past stopLevel and
// kept dropping all the way to dry-run threshold.
//
// Root cause (two parts):
//
// 1. rotatingMachine.executeSequence on shutdown went through an
// interruptible-abort path that returned the FSM to 'operational',
// triggering state.transitionToState's auto-pickup of the queued
// delayedMove — re-engaging the pump before the shutdown sequence
// could reach stopping/coolingdown/idle. Fix: clear delayedMove at
// the top of shutdown/emergencystop sequences.
//
// 2. PS calls turnOffAllMachines on every tick (every 2 s) while
// level < stopLevel. Each call interrupted the still-running prior
// shutdown's transitions, resetting the FSM to 'accelerating'. The
// pump bounced accelerating ↔ decelerating forever and the actual
// shutdown sequence transitions never ran. Fix: serialize per-pump
// shutdown calls in turnOffAllMachines so concurrent invocations
// are no-ops while a shutdown is already in flight.
//
// This test exercises part 2 — the per-pump serialization at the MGC
// level — by hammering turnOffAllMachines from a tight loop, mirroring
// the live tick cadence.
const test = require('node:test');
const assert = require('node:assert/strict');
const MachineGroup = require('../../src/specificClass');
const Machine = require('../../../rotatingMachine/src/specificClass');
const logCfg = { enabled: false, logLevel: 'error' };
const stateConfig = {
general: { logging: logCfg },
state: { current: 'idle' },
movement: { mode: 'staticspeed', speed: 50, maxSpeed: 100, interval: 10 },
// Non-zero shutdown timing so a shutdown takes long enough that a
// concurrent turnOff call lands mid-sequence — exactly the live race.
time: { starting: 0, warmingup: 0, stopping: 1, coolingdown: 1 },
};
function machineConfig(id) {
return {
general: { logging: logCfg, name: id, id, unit: 'm3/h' },
functionality: { softwareType: 'machine', role: 'rotationaldevicecontroller' },
asset: { category: 'pump', type: 'centrifugal', model: 'hidrostal-H05K-S03R', supplier: 'hidrostal' },
mode: {
current: 'auto',
allowedActions: { auto: ['execsequence', 'execmovement', 'flowmovement', 'statuscheck'] },
allowedSources: { auto: ['parent', 'GUI'] },
},
sequences: {
startup: ['starting', 'warmingup', 'operational'],
shutdown: ['stopping', 'coolingdown', 'idle'],
emergencystop: ['emergencystop', 'off'],
},
};
}
function groupConfig() {
return {
general: { logging: logCfg, name: 'mgc', id: 'mgc' },
functionality: { softwareType: 'machinegroup', role: 'groupcontroller', positionVsParent: 'atEquipment' },
scaling: { current: 'normalized' },
mode: { current: 'optimalcontrol' },
};
}
function buildGroup() {
const mgc = new MachineGroup(groupConfig());
const ids = ['pump_a', 'pump_b', 'pump_c'];
const pumps = ids.map(id => new Machine(machineConfig(id), stateConfig));
for (const m of pumps) {
m.updateMeasuredPressure(0, 'upstream', { timestamp: Date.now(), unit: 'mbar', childName: 'up', childId: `up-${m.config.general.id}` });
m.updateMeasuredPressure(1100, 'downstream', { timestamp: Date.now(), unit: 'mbar', childName: 'dn', childId: `dn-${m.config.general.id}` });
mgc.childRegistrationUtils.registerChild(m, 'downstream');
}
mgc.calcAbsoluteTotals();
mgc.calcDynamicTotals();
return { mgc, pumps };
}
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
test('repeated turnOffAllMachines reaches idle (serializes concurrent shutdowns)', async () => {
const { mgc, pumps } = buildGroup();
const pumpA = pumps[0];
// Start pump A and queue a delayedMove the way MGC's optimalControl
// would when PS sends a 1% dead-zone keep-alive.
await pumpA.handleInput('parent', 'execsequence', 'startup');
assert.equal(pumpA.state.getCurrentState(), 'operational');
pumpA.setpoint(80); // start a slow move (not awaited)
await sleep(50);
assert.equal(pumpA.state.getCurrentState(), 'accelerating');
pumpA.state.delayedMove = 75;
// Mimic PS's tick loop: fire turnOffAllMachines on a tight cadence
// without awaiting. Without the per-pump serialization in
// turnOffAllMachines, each call hits the still-running prior shutdown
// and bounces the pump back to accelerating — the live deadlock.
const ticks = [];
for (let i = 0; i < 6; i++) {
ticks.push(mgc.turnOffAllMachines());
await sleep(80); // half the realtime tick — tighter race
}
await Promise.all(ticks);
// Allow the (single) in-flight shutdown to finish its 1+1 s timed
// transitions through stopping → coolingdown → idle.
await sleep(2500);
assert.equal(pumpA.state.getCurrentState(), 'idle',
`pump must reach idle under repeated turnOff calls; got ${pumpA.state.getCurrentState()} (delayedMove=${pumpA.state.delayedMove})`);
assert.equal(pumpA.state.delayedMove, null,
'delayedMove must be cleared after shutdown');
});
test('turnOffAllMachines clears MGC._delayedCall to cancel any deferred dispatch', async () => {
// PS sends a 1% keep-alive while MGC is mid-dispatch. MGC parks it in
// _delayedCall. PS then crosses stopLevel and calls turnOffAllMachines.
// Without clearing _delayedCall, MGC's finally block fires the parked
// 1% call AFTER the shutdown — re-engaging the pump.
const { mgc } = buildGroup();
mgc._delayedCall = { source: 'parent', demand: 1, powerCap: Infinity, priorityList: null };
await mgc.turnOffAllMachines();
assert.equal(mgc._delayedCall, null,
'turnOff must cancel any deferred dispatch so it cannot re-engage pumps post-shutdown');
});