DOWNSTREAM is the live aggregate; AT_EQUIPMENT is the optimizer's intent

handlePressureChange writes the live aggregate (sum of every pump's
current predicted-flow measurement) to flow.predicted.downstream — that
is the channel PS subscribes to for its outflow estimate, and it must
reflect what pumps are actually delivering.

optimalControl + equalFlowControl + prioPercentageControl were also
writing to DOWNSTREAM with the optimizer's TARGET (bestFlow / totalFlow).
That's a planned setpoint, not an achieved aggregate, and it was
clobbering the live value every handleInput tick — leaving PS reading
e.g. 105 m³/h while the real aggregate was 681 m³/h. Test
ps-mgc-flow-contract caught this deterministically.

Move all the optimizer-target writes to AT_EQUIPMENT (the "what we
commanded the equipment to do" channel). DOWNSTREAM is now
single-writer (handlePressureChange) and faithfully tracks reality.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Rene De Ren
2026-05-08 18:32:58 +02:00
parent dc27a569d9
commit 69bdf11fc4

View File

@@ -784,9 +784,15 @@ class MachineGroup {
const debugInfo = bestResult.bestCombination.map(({ machineId, flow }) => `${machineId}: ${flow.toFixed(2)} units`).join(" | ");
this.logger.debug(`Moving to demand: ${Qd.toFixed(2)} -> Pumps: [${debugInfo}] => Total Power: ${bestResult.bestPower.toFixed(2)}`);
//store the total delivered power
// Store the optimizer's INTENT on AT_EQUIPMENT (what we
// commanded). DOWNSTREAM is reserved for the live aggregate
// written by handlePressureChange — PS subscribes to that
// for net-flow computation and must see what pumps are
// actually delivering, not the planned target. Writing
// bestFlow to DOWNSTREAM here would clobber the live value
// every handleInput tick (see ps-mgc-flow-contract test).
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, bestResult.bestPower, this.unitPolicy.canonical.power);
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, bestResult.bestFlow, this.unitPolicy.canonical.flow);
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, bestResult.bestFlow, this.unitPolicy.canonical.flow);
this.measurements.type("efficiency").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(bestResult.bestFlow / bestResult.bestPower);
this.measurements.type("Ncog").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(bestResult.bestCog);
@@ -1120,9 +1126,11 @@ class MachineGroup {
this.logger.debug(`Priority control for demand: ${totalFlow.toFixed(2)} -> Active pumps: [${debugInfo}] => Total Power: ${totalPower.toFixed(2)}`);
// Store measurements
// Store the planned distribution as INTENT on AT_EQUIPMENT.
// DOWNSTREAM (live aggregate) is owned by handlePressureChange.
// Writing the plan here would clobber PS's outflow signal.
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, totalPower, this.unitPolicy.canonical.power);
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, totalFlow, this.unitPolicy.canonical.flow);
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, totalFlow, this.unitPolicy.canonical.flow);
this.measurements.type("efficiency").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(totalFlow / totalPower);
this.measurements.type("Ncog").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(totalCog);
@@ -1247,8 +1255,12 @@ class MachineGroup {
}
});
// Write to AT_EQUIPMENT not DOWNSTREAM. handlePressureChange
// is the canonical writer of DOWNSTREAM (the live aggregate
// that PS subscribes to for outflow). See optimalControl
// comment above.
this._writeMeasurement("power", "predicted", POSITIONS.AT_EQUIPMENT, totalPower.reduce((a, b) => a + b, 0), this.unitPolicy.canonical.power);
this._writeMeasurement("flow", "predicted", POSITIONS.DOWNSTREAM, totalFlow.reduce((a, b) => a + b, 0), this.unitPolicy.canonical.flow);
this._writeMeasurement("flow", "predicted", POSITIONS.AT_EQUIPMENT, totalFlow.reduce((a, b) => a + b, 0), this.unitPolicy.canonical.flow);
if(totalPower.reduce((a, b) => a + b, 0) > 0){
this.measurements.type("efficiency").variant("predicted").position(POSITIONS.AT_EQUIPMENT).value(totalFlow.reduce((a, b) => a + b, 0) / totalPower.reduce((a, b) => a + b, 0));