B2.3: migrate MGC to LatestWinsGate.fireAndWait
specificClass.js 319 → 311 lines. Removed inline _dispatchInFlight +
_delayedCall + finally block. handleInput is now a 1-line delegate
to DemandDispatcher.fireAndWait({source, demand, ...}).
turnOffAllMachines calls _demandDispatcher.cancelPending().
DemandDispatcher 39 → 53 lines. One integration test rewritten to
use the new sentinel-resolution semantics. 77/77 tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -26,10 +26,25 @@ class DemandDispatcher {
|
||||
this._gate.fire(demand);
|
||||
}
|
||||
|
||||
// Returns a promise that resolves when THIS demand's dispatch settles.
|
||||
// If superseded by a later fireAndWait while parked, the promise
|
||||
// resolves with the LatestWinsGate SUPERSEDED sentinel
|
||||
// ({ superseded: true }) — callers can branch on it without try/catch.
|
||||
fireAndWait(demand) {
|
||||
return this._gate.fireAndWait(demand);
|
||||
}
|
||||
|
||||
drain() {
|
||||
return this._gate.drain();
|
||||
}
|
||||
|
||||
// Cancels any parked pending value so it cannot run. The currently
|
||||
// in-flight dispatch (if any) still runs to completion. A parked
|
||||
// fireAndWait promise resolves with the SUPERSEDED sentinel.
|
||||
cancelPending() {
|
||||
if (this._gate._pending) this._gate._supersedePending();
|
||||
}
|
||||
|
||||
get inFlight() {
|
||||
return this._gate.size > 0;
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ const optimizer = require('./optimizer');
|
||||
const GroupEfficiency = require('./efficiency/groupEfficiency');
|
||||
const control = require('./control/strategies');
|
||||
const io = require('./io/output');
|
||||
const DemandDispatcher = require('./dispatch/demandDispatcher');
|
||||
|
||||
const ACTIVE_STATES = new Set(['operational', 'accelerating', 'decelerating']);
|
||||
|
||||
@@ -43,11 +44,13 @@ class MachineGroup extends BaseDomain {
|
||||
this.dynamicTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 }, NCog: 0 };
|
||||
this.absoluteTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 } };
|
||||
|
||||
// Latest-wins gate kept inline (not DemandDispatcher) so awaiting
|
||||
// handleInput in tests blocks until dispatch completes. See
|
||||
// turnoff-deadlock.integration.test.js — _delayedCall is pinned.
|
||||
this._dispatchInFlight = false;
|
||||
this._delayedCall = null;
|
||||
// Latest-wins demand gate. Awaiting handleInput resolves when THIS
|
||||
// call's dispatch settles (LatestWinsGate.fireAndWait); a parked
|
||||
// call that is later superseded resolves with { superseded: true }.
|
||||
this._demandDispatcher = new DemandDispatcher(
|
||||
{ logger: this.logger },
|
||||
(payload) => this._runDispatch(payload.source, payload.demand, payload.powerCap, payload.priorityList),
|
||||
);
|
||||
this._shutdownInFlight = new Set();
|
||||
|
||||
this.operatingPoint = new GroupOperatingPoint({
|
||||
@@ -230,22 +233,11 @@ class MachineGroup extends BaseDomain {
|
||||
}));
|
||||
}
|
||||
|
||||
// Returns when THIS call's dispatch settles. If overwritten by a later
|
||||
// handleInput() while parked behind an in-flight dispatch, resolves
|
||||
// with the LatestWinsGate.SUPERSEDED sentinel ({ superseded: true }).
|
||||
async handleInput(source, demand, powerCap = Infinity, priorityList = null) {
|
||||
if (this._dispatchInFlight) {
|
||||
this._delayedCall = { source, demand, powerCap, priorityList };
|
||||
return;
|
||||
}
|
||||
this._dispatchInFlight = true;
|
||||
try {
|
||||
return await this._runDispatch(source, demand, powerCap, priorityList);
|
||||
} finally {
|
||||
this._dispatchInFlight = false;
|
||||
if (this._delayedCall) {
|
||||
const next = this._delayedCall;
|
||||
this._delayedCall = null;
|
||||
await this.handleInput(next.source, next.demand, next.powerCap, next.priorityList);
|
||||
}
|
||||
}
|
||||
return this._demandDispatcher.fireAndWait({ source, demand, powerCap, priorityList });
|
||||
}
|
||||
|
||||
async _runDispatch(source, demand, powerCap, priorityList) {
|
||||
@@ -286,8 +278,9 @@ class MachineGroup extends BaseDomain {
|
||||
}
|
||||
|
||||
async turnOffAllMachines() {
|
||||
// Cancel any deferred dispatch — turnOff is latest user intent.
|
||||
this._delayedCall = null;
|
||||
// Cancel any parked demand — turnOff is latest user intent so a
|
||||
// pending fireAndWait must not re-engage pumps post-shutdown.
|
||||
this._demandDispatcher.cancelPending();
|
||||
await Promise.all(Object.entries(this.machines).map(async ([id, machine]) => {
|
||||
if (this._shutdownInFlight.has(id)) return;
|
||||
if (this.isMachineActive(id)) {
|
||||
|
||||
Reference in New Issue
Block a user