feat(mgc): rendezvous planner — same-time landing across all modes

Routes every dispatch through a tick-aware planner so all pumps reach
their setpoint at the same wall-clock instant t* = max(eta_i),
regardless of control strategy or per-pump reaction speed.

Architecture (src/movement/):
- machineProfile.js   – pure snapshot of a registered child (state,
                        position, velocityPctPerS, ladder timings,
                        flowAt / positionForFlow). Reads timings from
                        child.state.config.time (the actual storage
                        location — previous fallback paths silently
                        produced 0 s, collapsing every eta to ramp-only).
- moveTrajectory.js   – seconds-to-target per machine; handles
                        idle / starting / warmingup / operational / cooling.
- movementScheduler.js – t* = max eta over ALL non-noop moves. Every
                        command is delayed so its move finishes at t*.
                        Startup execsequence fires at 0; its flowmovement
                        is gated by max(ladderS, t* − rampS) so a fast
                        pump waits before ramping rather than landing
                        early. useRendezvous=false collapses to all
                        fireAtTickN=0 (legacy fire-and-forget).
- movementExecutor.js – wall-clock virtual cursor: each tick fires
                        every command whose fireAtTickN ≤ floor(elapsed/tickS).
                        tick() no longer awaits pending fireCommand
                        promises — the synchronous prologue of
                        handleInput claims the latest-wins gate, which
                        is what race-favouring relies on.

Shared dispatch path (src/specificClass.js):
- _dispatchFlowDistribution(distribution) — extracted from
  _optimalControl. Builds profiles, calls movementScheduler.plan,
  replans the executor, ticks once. Reads
  config.planner.useRendezvous (default true).
- _optimalControl computes its bestCombination and hands off.
- equalFlowControl (priorityControl mode) computes its
  flowDistribution and hands off via ctx.mgc._dispatchFlowDistribution.
  Same-time landing now applies in BOTH modes.

Editor toggle (mgc.html + src/nodeClass.js):
- New "Same-time landing" checkbox under Control Strategy.
- nodeClass.buildDomainConfig bridges uiConfig.useRendezvous →
  config.planner.useRendezvous. Default ON.

Tests:
- New: planner-convergence.integration.test.js (real-time end-to-end
  diagnostic — drives a 3-pump mixed-state dispatch and asserts both
  convergence to the demand setpoint AND same-time landing within
  one tick).
- New: planner-rendezvous.integration.test.js (schedule-shape
  assertions against real pump objects).
- New: movementScheduler.basic.test.js — includes a mixed-speed
  multi-startup case proving the fast pumps wait so all three land
  together (the regression that prompted this work).
- New: movementExecutor.basic.test.js + moveTrajectory.basic.test.js.
- Updated executor contract test: tick() must NOT await pending fires.

Commands + wiki:
- handlers.js: source/mode allow-list gate moved into a shared _gate()
  helper; every command now checks isValidActionForMode +
  isValidSourceForMode before dispatching. Status-level commands
  (set.mode, set.scaling) are allowed in every mode.
- commands.basic.test.js: coverage for the new gate behaviour.
- wiki regen: Home.md visual-first rewrite + Reference-{Architecture,
  Contracts,Examples,Limitations}.md split with _Sidebar.md index.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
znetsixe
2026-05-17 19:43:55 +02:00
parent 26e92b54f7
commit 472402c62d
26 changed files with 3048 additions and 280 deletions

View File

@@ -16,11 +16,31 @@ function _logger(source, ctx) {
return ctx?.logger || source?.logger || null;
}
// Gate one command against the mode-allowed action and source allow-lists.
// Returns true if both gates pass (or if the source lacks the gate methods —
// keeps backward compat with fakes/specifics that haven't adopted the pattern
// yet). When a gate fails the source already warn-logs; we just bail out.
function _gate(source, action, msg) {
if (typeof source?.isValidActionForMode === 'function') {
if (!source.isValidActionForMode(action, source.mode)) return false;
}
if (typeof source?.isValidSourceForMode === 'function') {
const src = (typeof msg?.source === 'string' && msg.source) ? msg.source : 'parent';
if (!source.isValidSourceForMode(src, source.mode)) return false;
}
return true;
}
exports.setMode = (source, msg) => {
// set.mode is a status-level operation — allowed in every mode by the
// default schema (incl. maintenance). The gate still fires so an
// unauthorised source is rejected even for mode switching.
if (!_gate(source, 'statusCheck', msg)) return;
source.setMode(msg.payload);
};
exports.registerChild = (source, msg, ctx) => {
if (!_gate(source, 'statusCheck', msg)) return;
const log = _logger(source, ctx);
const childId = msg.payload;
const childObj = ctx?.RED?.nodes?.getNode?.(childId);
@@ -58,6 +78,16 @@ exports.setDemand = async (source, msg, ctx) => {
log?.error?.(`set.demand: invalid Qd value '${JSON.stringify(msg?.payload)}'`);
return;
}
// Gate the demand against the current mode. Action kind depends on whether
// this is a stop-all (negative) or a dispatch — the schema declares which
// are accepted per mode (maintenance gets neither). Done after numeric
// parse so an unparseable payload is still surfaced as an error, not a
// silent mode-rejection.
let action;
if (value < 0) action = 'emergencyStop';
else if (source?.mode === 'priorityControl') action = 'execSequentialControl';
else action = 'execOptimalCombination';
if (!_gate(source, action, msg)) return;
// Negative is the operator's "stop all" signal regardless of unit.
if (value < 0) {
try {

View File

@@ -162,18 +162,15 @@ async function equalFlowControl(ctx, Qd, _powerCap = Infinity, priorityList = nu
}
mgc.measurements.type('Ncog').variant('predicted').position(POSITIONS.AT_EQUIPMENT).value(totalCog);
await Promise.all(flowDistribution.map(async ({ machineId, flow }) => {
const machine = mgc.machines[machineId];
const currentState = machine.state.getCurrentState();
if (flow > 0) {
await machine.handleInput('parent', 'flowmovement', mgc._canonicalToOutputFlow(flow));
if (currentState === 'idle') {
await machine.handleInput('parent', 'execsequence', 'startup');
}
} else if (currentState === 'operational' || currentState === 'accelerating' || currentState === 'decelerating') {
await machine.handleInput('parent', 'execsequence', 'shutdown');
}
}));
// Route the chosen distribution through the shared planner/executor
// path. With planner.useRendezvous=true (the default) all pumps
// reach their per-pump flow target at the same wall-clock instant;
// with it false, every command fires at tick 0 — same effect as
// the legacy Promise.all dispatch but with correct startup/shutdown
// ordering (the planner emits execsequence BEFORE flowmovement for
// idle pumps, where the legacy code emitted them in the opposite
// order and relied on the pump's delayedMove queue to recover).
await mgc._dispatchFlowDistribution(flowDistribution);
} catch (err) {
mgc.logger?.error?.(err);
}

34
src/editor/index.js Normal file
View File

@@ -0,0 +1,34 @@
// machineGroupControl editor — namespace bootstrap.
//
// Attaches the editor's submodule registry to the shared
// window.EVOLV.nodes.machineGroupControl namespace (same one the menuManager
// and configManager endpoints populate). Each sibling module in this
// directory (mode-cards.js, demand-contract.js, oneditprepare.js) registers
// itself by writing additional members onto this namespace.
//
// Loaded first by mgc.html — must not depend on any other src/editor module.
(function () {
const root = window.EVOLV = window.EVOLV || {};
const nodes = root.nodes = root.nodes || {};
const ns = nodes.machineGroupControl = nodes.machineGroupControl || {};
const editor = ns.editor = ns.editor || {};
// Pub/sub for mode changes — mode-cards.js fires, anything that wants to
// re-render on mode change subscribes. Keep it tiny; no third-party emitter.
const modeListeners = [];
editor.onModeChange = (cb) => { if (typeof cb === 'function') modeListeners.push(cb); };
editor.emitModeChange = (newMode) => {
for (const cb of modeListeners) {
try { cb(newMode); } catch (e) { /* swallow — UI helper */ }
}
};
// Read the currently selected mode from the hidden input that mode-cards.js
// keeps in sync with the active card. Falls back to optimalControl if the
// input isn't on the page yet (race against oneditprepare).
editor.getMode = () => {
const el = document.getElementById('node-input-mode');
return (el && el.value) || 'optimalControl';
};
})();

142
src/editor/mode-cards.js Normal file
View File

@@ -0,0 +1,142 @@
// mode-cards.js — visual radio picker for the three control-strategy modes.
//
// Replaces the plain <select id="node-input-mode"> with three illustrated
// cards. The original <input> stays in the DOM but is hidden — Node-RED reads
// its value on save, exactly as before. Clicking a card sets that value and
// fires editor.emitModeChange so downstream UI (none today, future widgets
// such as a parameter panel) can re-render.
//
// Three cards: optimalControl (BEP-curve), priorityControl (flow ladder),
// maintenance (status-only badge). SVGs are inline so the editor doesn't
// need to fetch additional assets.
(function () {
const editor = window.EVOLV?.nodes?.machineGroupControl?.editor;
if (!editor) return;
const MODES = [
{
value: 'optimalControl',
label: 'optimalControl',
caption: 'Picks the pump combination whose BEP sits closest to current demand.',
svg: `
<svg viewBox="0 0 160 90" xmlns="http://www.w3.org/2000/svg" aria-hidden="true">
<text x="6" y="14" font-size="9" fill="#444">η</text>
<line x1="14" y1="78" x2="154" y2="78" stroke="#444" stroke-width="1"/>
<line x1="14" y1="78" x2="14" y2="14" stroke="#444" stroke-width="1"/>
<text x="118" y="88" font-size="8" fill="#666">demand →</text>
<!-- Three pump-combination efficiency humps. Each combination has
its own BEP (peak). The optimizer "gravitates" toward whichever
peak sits closest to the current demand. Quadratic-Bezier peak
formula: peak_y = (y0 + 2*cy + y1)/4 — so for y0=y1=78 (foot on
x-axis), cy=22 → peak_y=50, cy=-26 → peak_y=26, cy=10 → peak_y=44. -->
<path d="M 16 78 Q 32 22 50 78" fill="none" stroke="#888" stroke-width="1.1"/>
<path d="M 44 78 Q 72 -26 100 78" fill="none" stroke="#1E8449" stroke-width="2"/>
<path d="M 92 78 Q 122 10 152 78" fill="none" stroke="#888" stroke-width="1.1"/>
<!-- BEP markers sit ON each hump's apex — small grey for unpicked
combos, large red for the selected (winner) combination. -->
<circle cx="33" cy="50" r="2" fill="#888"/>
<circle cx="72" cy="26" r="3.2" fill="#C0392B" stroke="#fff" stroke-width="1"/>
<circle cx="122" cy="44" r="2" fill="#888"/>
<!-- Current demand (dashed line) lines up with combo #2's BEP, so
combo #2 wins — drawn thicker/green above. -->
<line x1="72" y1="14" x2="72" y2="78" stroke="#1F4E79" stroke-dasharray="2 2" stroke-width="0.9"/>
<text x="46" y="20" font-size="7" fill="#1F4E79">demand</text>
<text x="80" y="22" font-size="7" fill="#C0392B" font-weight="bold">BEP</text>
<!-- Combination labels under each curve. -->
<text x="33" y="86" font-size="6" fill="#666" text-anchor="middle">P1</text>
<text x="72" y="86" font-size="6" fill="#1E8449" text-anchor="middle" font-weight="bold">P1+P2</text>
<text x="122" y="86" font-size="6" fill="#666" text-anchor="middle">P1+P2+P3</text>
</svg>`,
},
{
value: 'priorityControl',
label: 'priorityControl',
caption: 'Sequential equal-flow ramp — fill pumps one-by-one in priority order.',
svg: `
<svg viewBox="0 0 160 90" xmlns="http://www.w3.org/2000/svg" aria-hidden="true">
<text x="6" y="14" font-size="9" fill="#444">flow</text>
<line x1="14" y1="78" x2="154" y2="78" stroke="#444" stroke-width="1"/>
<line x1="14" y1="78" x2="14" y2="14" stroke="#444" stroke-width="1"/>
<text x="118" y="88" font-size="8" fill="#666">demand →</text>
<polyline points="14,72 50,72 50,52 86,52 86,32 122,32 122,16 154,16"
fill="none" stroke="#1F4E79" stroke-width="2"/>
<text x="28" y="86" font-size="7" fill="#666">P1</text>
<text x="64" y="86" font-size="7" fill="#666">P2</text>
<text x="100" y="86" font-size="7" fill="#666">P3</text>
</svg>`,
},
{
value: 'maintenance',
label: 'maintenance',
caption: 'Monitor only. Dispatch and stop-all commands are rejected; status messages still flow.',
svg: `
<svg viewBox="0 0 160 90" xmlns="http://www.w3.org/2000/svg" aria-hidden="true">
<circle cx="80" cy="42" r="22" fill="none" stroke="#888" stroke-width="2"/>
<circle cx="80" cy="42" r="8" fill="#888"/>
<g stroke="#888" stroke-width="3" stroke-linecap="round">
<line x1="80" y1="14" x2="80" y2="24"/>
<line x1="80" y1="60" x2="80" y2="70"/>
<line x1="52" y1="42" x2="62" y2="42"/>
<line x1="98" y1="42" x2="108" y2="42"/>
<line x1="60" y1="22" x2="67" y2="29"/>
<line x1="93" y1="55" x2="100" y2="62"/>
<line x1="60" y1="62" x2="67" y2="55"/>
<line x1="93" y1="29" x2="100" y2="22"/>
</g>
<text x="80" y="84" text-anchor="middle" font-size="8" fill="#888">monitor only</text>
</svg>`,
},
];
// Render the three cards into the placeholder div. The hidden <select> stays
// intact — the card click handler writes its value back to that <select> so
// Node-RED's save path is unchanged.
function init(/* node */) {
const placeholder = document.getElementById('mgc-mode-cards');
const hidden = document.getElementById('node-input-mode');
if (!placeholder || !hidden) return;
placeholder.innerHTML = MODES.map((m) => `
<div class="mgc-mode-card" data-mode="${m.value}" role="radio" tabindex="0" aria-checked="false">
<div class="mgc-mode-card-svg">${m.svg}</div>
<div class="mgc-mode-card-label">${m.label}</div>
<div class="mgc-mode-card-caption">${m.caption}</div>
</div>
`).join('');
const cards = Array.from(placeholder.querySelectorAll('.mgc-mode-card'));
function syncHighlight() {
const current = hidden.value || 'optimalControl';
for (const c of cards) {
const on = c.getAttribute('data-mode') === current;
c.classList.toggle('mgc-mode-card-on', on);
c.setAttribute('aria-checked', String(on));
}
}
function pick(mode) {
hidden.value = mode;
// Fire change so any other listener bound to the input (Node-RED's
// dirty-tracker, plus our pub/sub) sees the update.
hidden.dispatchEvent(new Event('change', { bubbles: true }));
syncHighlight();
editor.emitModeChange(mode);
}
for (const c of cards) {
c.addEventListener('click', () => pick(c.getAttribute('data-mode')));
c.addEventListener('keydown', (e) => {
if (e.key === ' ' || e.key === 'Enter') {
e.preventDefault();
pick(c.getAttribute('data-mode'));
}
});
}
syncHighlight();
}
editor.modeCards = { init };
})();

View File

@@ -0,0 +1,16 @@
// oneditprepare.js — initialise the editor's visual modules.
//
// Called from mgc.html's oneditprepare alongside the existing menuManager
// initialiser (logger/position dropdowns). Each module is responsible for
// its own placeholder; we just kick them off in dependency order.
(function () {
const ns = window.EVOLV?.nodes?.machineGroupControl;
if (!ns || !ns.editor) return;
ns.editor.initVisuals = function (node) {
if (ns.editor.modeCards && typeof ns.editor.modeCards.init === 'function') {
ns.editor.modeCards.init(node);
}
};
})();

View File

@@ -0,0 +1,90 @@
'use strict';
// Builds a plain-object snapshot of a registered child machine for the
// movement planner. Pure read — no contract changes to the parent/child
// registration handshake, no mutation of the child.
function buildProfile(child) {
if (!child) throw new TypeError('buildProfile: child is required');
const id = child?.config?.general?.id ?? null;
const state = typeof child.state?.getCurrentState === 'function'
? child.state.getCurrentState()
: null;
const position = typeof child.state?.getCurrentPosition === 'function'
? child.state.getCurrentPosition()
: null;
const mm = child.state?.movementManager;
const minPosition = Number(mm?.minPosition);
const maxPosition = Number(mm?.maxPosition);
const velocityPctPerS = (() => {
if (typeof mm?.getNormalizedSpeed === 'function' && Number.isFinite(maxPosition) && Number.isFinite(minPosition)) {
return mm.getNormalizedSpeed() * (maxPosition - minPosition);
}
const s = Number(mm?.speed);
return Number.isFinite(s) ? s : 0;
})();
// Source of truth for ladder durations is the child state's config.time
// (state.js stores the merged stateConfig there). Older fallbacks
// (child.config.stateConfig, child.stateConfig) are kept for callers
// that pre-populate them, but rotatingMachine doesn't — it stores
// timings under state.config.time. Reading the wrong path is silent:
// every duration defaults to 0, the planner thinks startup is
// instantaneous, tStar collapses to the ramp time, and same-time
// landing breaks.
const t = child.state?.config?.time
?? child.config?.stateConfig?.time
?? child.stateConfig?.time
?? {};
const timings = {
startingS: Number(t.starting) || 0,
warmingupS: Number(t.warmingup) || 0,
stoppingS: Number(t.stopping) || 0,
coolingdownS: Number(t.coolingdown) || 0,
};
const remainingTransitionS = typeof child.state?.stateManager?.getRemainingTransitionS === 'function'
? child.state.stateManager.getRemainingTransitionS()
: null;
const flowAt = (pos, pressure) => {
if (typeof child.predictFlow?.evaluate === 'function') {
return child.predictFlow.evaluate(pos, pressure);
}
return null;
};
// Inverse curve: target flow (canonical m³/s, in the child's output unit
// since predictCtrl was built from the same curve units) → control %.
// Mirrors the conversion the pump performs in flowController on a
// `flowmovement` command (rotatingMachine/src/flow/flowController.js:52).
// Returns null when the child has no curve loaded so the scheduler can
// fall back gracefully.
const positionForFlow = (flow) => {
if (!Number.isFinite(flow)) return null;
if (typeof child.predictCtrl?.y !== 'function') return null;
try {
const v = child.predictCtrl.y(flow);
return Number.isFinite(v) ? v : null;
} catch (_) {
return null;
}
};
return {
id,
state,
position,
minPosition,
maxPosition,
velocityPctPerS,
timings,
remainingTransitionS,
flowAt,
positionForFlow,
};
}
module.exports = { buildProfile };

View File

@@ -0,0 +1,86 @@
'use strict';
// Per-machine time-parameterised plan. Pure: given a MachineProfile
// snapshot and a target position, computes how long the move will take.
//
// Cases by profile.state:
// idle / off startup ladder + ramp from min to target
// operational |target position| / velocity
// accelerating |
// decelerating post-abort residue, same as operational
// starting remaining-in-starting + full warmup + ramp from min
// warmingup remaining-in-warmingup + ramp from min
// stopping | coolingdown non-interruptible deload; cannot contribute flow
// in this dispatch — returns null so the scheduler
// can exclude the machine from "up" candidates.
//
// Velocity of 0 returns Infinity (misconfigured speed) so the scheduler
// can demote the machine without crashing.
const ACTIVE_OPERATIONAL = new Set(['operational', 'accelerating', 'decelerating']);
const STARTUP_LADDER = new Set(['starting', 'warmingup']);
const SHUTDOWN_LADDER = new Set(['stopping', 'coolingdown']);
class MoveTrajectory {
constructor(profile, { targetPosition } = {}) {
if (!profile || typeof profile !== 'object') {
throw new TypeError('MoveTrajectory: profile is required');
}
if (!Number.isFinite(targetPosition)) {
throw new TypeError('MoveTrajectory: targetPosition must be a finite number');
}
this.profile = profile;
this.targetPosition = this._clampToBounds(targetPosition);
}
_clampToBounds(p) {
const { minPosition, maxPosition } = this.profile;
if (Number.isFinite(minPosition) && p < minPosition) return minPosition;
if (Number.isFinite(maxPosition) && p > maxPosition) return maxPosition;
return p;
}
// Seconds from "fire" until the machine is delivering flow at
// targetPosition. Null when the machine is in a non-contributing
// (shutting-down) state.
etaToTargetS() {
const p = this.profile;
const v = p.velocityPctPerS;
const target = this.targetPosition;
if (SHUTDOWN_LADDER.has(p.state)) return null;
if (!Number.isFinite(v) || v <= 0) return Infinity;
if (p.state === 'operational' || ACTIVE_OPERATIONAL.has(p.state)) {
const dist = Math.abs(target - p.position);
return dist / v;
}
if (p.state === 'warmingup') {
// Remaining warmup, then ramp from minPosition to target.
// Ramp starts from minPosition because the pump is not moving
// during warmup — position is held at min.
const remW = p.remainingTransitionS ?? p.timings.warmingupS;
const rampDist = Math.max(0, target - p.minPosition);
return remW + rampDist / v;
}
if (p.state === 'starting') {
// Remaining-in-starting + full warmup duration + ramp from min.
const remS = p.remainingTransitionS ?? p.timings.startingS;
const rampDist = Math.max(0, target - p.minPosition);
return remS + p.timings.warmingupS + rampDist / v;
}
// idle / off / emergencystop / maintenance / any non-active state
// not in the ladders: full startup sequence to operational, then ramp.
const rampDist = Math.max(0, target - p.minPosition);
return p.timings.startingS + p.timings.warmingupS + rampDist / v;
}
}
MoveTrajectory.SHUTDOWN_LADDER = SHUTDOWN_LADDER;
MoveTrajectory.STARTUP_LADDER = STARTUP_LADDER;
module.exports = MoveTrajectory;

View File

@@ -0,0 +1,121 @@
'use strict';
// Tick-driven executor for the schedule produced by movementScheduler.plan.
//
// - Holds the current schedule + a cursor that advances one per tick().
// - Fires any unfired command whose fireAtTickN <= cursor.
// - replan(newSchedule) replaces the schedule and resets the cursor —
// already-fired commands stay fired (the pump's FSM is downstream and
// handles their consequences; the executor never tries to "undo" a
// fired startup, which keeps warmup/cooldown safety intact).
// - fireCommand is injected for unit-testability — production wires it to
// `machine.handleInput(...)`.
class MovementExecutor {
constructor({ fireCommand, logger } = {}) {
if (typeof fireCommand !== 'function') {
throw new TypeError('MovementExecutor: fireCommand callback is required');
}
this._fireCommand = fireCommand;
this._logger = logger || null;
this._schedule = null;
this._cursor = 0;
this._firedIdx = new Set();
// Wall-clock anchor for the active schedule. Each tick recomputes
// a "virtual cursor" from elapsed time so the schedule survives a
// blocking first tick (e.g. an awaited startup sequence that takes
// multiple seconds to settle).
this._dispatchT0 = null;
}
// Replace the active schedule. Cursor starts at 0 (new dispatch is
// anchored to "now"). The previous schedule's unfired commands are
// dropped; already-fired commands are not retracted.
replan(schedule) {
this._schedule = schedule || { commands: [] };
this._cursor = 0;
this._firedIdx = new Set();
this._dispatchT0 = Date.now();
if (this._logger?.debug) {
const cmds = this._schedule.commands || [];
this._logger.debug(`MovementExecutor.replan: ${cmds.length} commands, tStar=${this._schedule.tStarS ?? '?'}s`);
}
}
// Advance one tick. Returns a Promise resolving to the list of
// commands fired this tick once their async work settles. Awaiting
// the FIRST tick from within a dispatch is what gives the new move
// priority over an in-flight shutdown sequence — fire-and-forget
// gives the shutdown's for-loop a window to progress through state
// transitions before the new move's residue handler claims the FSM.
async tick() {
// Virtual cursor = max(advanced cursor, elapsed wall-clock ticks).
// If a previous tick blocked on a long await, elapsed time has
// already passed and we should fire every command whose
// fireAtTickN now lies in the past — not wait another N timer
// cycles to catch up. tickS is stamped on the schedule by the
// planner (defaults to 1 s).
const tickS = Number.isFinite(this._schedule?.tickS) && this._schedule.tickS > 0
? this._schedule.tickS
: 1;
const elapsedS = this._dispatchT0 != null ? (Date.now() - this._dispatchT0) / 1000 : 0;
const wallTick = Math.floor(elapsedS / tickS);
const virtCursor = Math.max(this._cursor, wallTick);
const fired = [];
const cmds = this._schedule?.commands || [];
for (let i = 0; i < cmds.length; i++) {
if (this._firedIdx.has(i)) continue;
const c = cmds[i];
if (c.fireAtTickN <= virtCursor) {
this._firedIdx.add(i);
try {
// Fire-and-forget. The synchronous prologue of
// handleInput claims the latest-wins gate before
// returning its promise — that's enough for race
// favouring. AWAITing the returned promise here
// would block the executor for the entire ladder +
// ramp duration of a flowmovement-after-startup
// (because the pump's delayedMove only resolves
// when the ramp completes), preventing the
// wall-clock timer from starting and dragging every
// delayed command in the schedule forward by that
// amount.
const r = this._fireCommand(c);
if (r && typeof r.then === 'function') {
r.catch((e) => {
if (this._logger?.error) {
this._logger.error(`MovementExecutor: fireCommand rejected for ${c.machineId}/${c.action}: ${e?.message || e}`);
}
});
}
fired.push(c);
} catch (e) {
if (this._logger?.error) {
this._logger.error(`MovementExecutor: fireCommand failed for ${c.machineId}/${c.action}: ${e?.message || e}`);
}
}
}
}
this._cursor = virtCursor + 1;
return fired;
}
// Telemetry — number of commands not yet fired.
pending() {
const cmds = this._schedule?.commands || [];
return cmds.length - this._firedIdx.size;
}
// Telemetry — current tick cursor.
cursor() {
return this._cursor;
}
// Telemetry — the live schedule (read-only view).
schedule() {
return this._schedule;
}
}
module.exports = MovementExecutor;

View File

@@ -0,0 +1,245 @@
'use strict';
// Pure movement planner. Given a set of machine profile snapshots and the
// optimizer's chosen flow combination, returns a tick-indexed schedule of
// commands that minimises flow disruption during the transition.
//
// Algorithm — rendezvous-on-demand-at-current-pressure:
//
// 1. For each machine, classify the move it needs (startup, flow-move
// up, flow-move down, shutdown, no-op) based on its current FSM state
// and the optimizer's target flow for it.
// 2. Compute eta_i (seconds-to-target-flow) per machine via
// MoveTrajectory. Machines that can't contribute on this dispatch
// (stopping / coolingdown / unknown) are skipped.
// 3. Rendezvous time t* = max(eta_i over ALL non-noop moves). The
// slowest move (typically a startup ladder + ramp) sets the deadline.
// 4. Every command is delayed by (t* eta_j) so it FINISHES at t*.
// Exception: a startup's `execsequence` command must fire NOW so the
// ladder can begin — its own duration is what defines eta and thus
// t* — but the startup's queued flowmovement (held in the pump's
// delayedMove) lands at t* by construction.
//
// Net effect: ALL pumps reach their per-pump flow target at the same
// wall-clock instant t*. Sum-of-flows is monotonic during the transition
// (no overshoot from a fast in-flight retarget arriving before the
// startup pumps catch up).
//
// The pump's flow→position conversion (via predictCtrl.y) lives in the
// profile so this module is pure: no Node-RED calls, no live child reads.
const MoveTrajectory = require('./moveTrajectory');
const ACTIVE_STATES = new Set(['operational', 'accelerating', 'decelerating']);
const STARTUP_LADDER = new Set(['starting', 'warmingup']);
const SHUTDOWN_LADDER = new Set(['stopping', 'coolingdown']);
// Tick cadence — MGC main loop is 1 Hz per .claude/rules tick convention.
const DEFAULT_TICK_S = 1;
function isOn(state) {
return ACTIVE_STATES.has(state) || STARTUP_LADDER.has(state);
}
// Classify the action a machine needs. The optimizer's combination is a
// canonical statement of "what flow should this machine deliver now."
// `targetFlow == 0` (or absence from combination) means "this machine is
// not part of the new combination."
function classify(profile, targetFlow) {
const isOff = !isOn(profile.state) && !SHUTDOWN_LADDER.has(profile.state);
if (targetFlow > 0) {
if (isOff) return 'startup';
return 'flowmove'; // up or down depending on current vs target
}
// targetFlow <= 0
if (ACTIVE_STATES.has(profile.state) || STARTUP_LADDER.has(profile.state)) {
return 'shutdown';
}
return 'noop';
}
// Direction in flow-space: increasing, decreasing, or unchanged. Drives
// rendezvous: t* is the max eta over INCREASING moves; DECREASING moves
// get delayed to land at t*.
function directionOf(profile, targetFlow) {
if (!isOn(profile.state)) return targetFlow > 0 ? 'increasing' : 'unchanged';
const currentFlow = Number.isFinite(profile.flowAt?.(profile.position, profile._pressureForClassification))
? profile.flowAt(profile.position, profile._pressureForClassification)
: null;
if (currentFlow == null) {
// Without a current-flow read, assume increasing iff target > 0.
return targetFlow > 0 ? 'increasing' : 'decreasing';
}
if (targetFlow > currentFlow) return 'increasing';
if (targetFlow < currentFlow) return 'decreasing';
return 'unchanged';
}
// Plan the schedule.
//
// profiles — array from buildProfile(child)
// combination — array of {machineId, flow} from optimizer
// currentPressure — Pa, for flow→flow and flow→position conversions
// options — { tickS?: 1, useRendezvous?: true }
//
// useRendezvous=false collapses the schedule to "all commands fire at
// tick 0" — every pump moves at its own speed and lands at its own eta.
// Used when the operator explicitly opts out of same-time landing.
function plan(profiles, combination, currentPressure, options = {}) {
const tickS = Number.isFinite(options.tickS) && options.tickS > 0 ? options.tickS : DEFAULT_TICK_S;
const useRendezvous = options.useRendezvous !== false;
const targets = new Map();
for (const item of combination || []) {
if (item && item.machineId != null) targets.set(String(item.machineId), Number(item.flow) || 0);
}
// First pass: classify + compute eta per machine.
const plans = [];
for (const p of profiles) {
const id = String(p.id);
const targetFlow = targets.get(id) ?? 0;
// Stash pressure on a copy of the profile so directionOf can read it
// without changing the public profile shape. Non-mutating: classify
// only needs the value during this pass.
const probeProfile = Object.assign({}, p, { _pressureForClassification: currentPressure });
const action = classify(p, targetFlow);
const direction = directionOf(probeProfile, targetFlow);
if (action === 'noop') {
plans.push({ machineId: id, action, direction, eta: 0, targetFlow, skip: true });
continue;
}
// Convert target flow to target position using the pump's inverse
// curve (lives on the profile). Fallback: linear interpolation
// across [min,max] using the curve domain we know.
let targetPosition = null;
if (action !== 'shutdown' && typeof p.positionForFlow === 'function') {
targetPosition = p.positionForFlow(targetFlow);
}
if (targetPosition == null) {
// Shutdown: target is the minimum position.
targetPosition = action === 'shutdown' ? (Number.isFinite(p.minPosition) ? p.minPosition : 0) : p.position;
}
let eta;
// Per-pump ladder duration; used to gate the flowmovement so it
// can't fire before warmup completes (the pump won't accept it).
const ladderS = action === 'startup'
? ((Number(p.timings?.startingS) || 0) + (Number(p.timings?.warmingupS) || 0))
: 0;
// Ramp-only portion of the eta. For startup this is eta ladder.
// For flow-move or shutdown the entire eta IS the ramp.
let rampS = 0;
if (action === 'shutdown') {
// Time for flow to reach zero = position ramp from current
// position to minPosition. stoppingS / coolingdownS happen
// AFTER flow is zero; they don't affect rendezvous.
const v = Number(p.velocityPctPerS) > 0 ? p.velocityPctPerS : Infinity;
const dist = Math.max(0, p.position - (p.minPosition ?? 0));
eta = v === Infinity ? 0 : dist / v;
rampS = eta;
} else {
const traj = new MoveTrajectory(p, { targetPosition });
eta = traj.etaToTargetS();
if (eta == null) eta = Infinity; // shouldn't happen for non-shutdown actions, but defensive
rampS = Math.max(0, Number.isFinite(eta) ? eta - ladderS : 0);
}
plans.push({ machineId: id, action, direction, eta, ladderS, rampS, targetFlow, targetPosition, skip: false });
}
// Rendezvous: t* = max eta over ALL non-noop moves. Includes
// increasing AND decreasing flow-moves so the slowest mover sets the
// deadline for everyone. When useRendezvous=false, tStar is forced
// to 0 so every command's delay collapses to 0 (legacy behaviour).
const allEtas = plans
.filter((q) => !q.skip && Number.isFinite(q.eta))
.map((q) => q.eta);
const tStar = useRendezvous && allEtas.length > 0 ? Math.max(...allEtas) : 0;
// Second pass: assign fireAtTickN. Every command is delayed so its
// move finishes at t*; the lone exception is the startup ladder's
// execsequence (the ladder must begin now because eta == ladder + ramp).
const commands = [];
for (const q of plans) {
if (q.skip) continue;
// Delay-to-rendezvous: fire (t* eta) seconds from now so the
// move FINISHES at t*. Clamped to >= 0 (the eta == t* mover fires
// immediately).
const fireAtSDelayed = Math.max(0, tStar - q.eta);
const fireAtTickNDelayed = Math.round(fireAtSDelayed / tickS);
// Unchanged moves are no-ops; fire at 0 for simplicity (the pump
// ignores them and we don't pollute the schedule with delays).
const isUnchanged = q.direction === 'unchanged';
if (q.action === 'startup') {
// execsequence MUST begin NOW — the ladder duration is
// baked into eta and can't be compressed.
commands.push({
machineId: q.machineId,
action: 'execsequence',
sequence: 'startup',
fireAtTickN: 0,
eta: q.eta,
});
// flowmovement timing.
//
// Default behaviour: queue it at tick 0; the pump's
// delayedMove holds it until warmup completes, after which
// the pump ramps at its own velocity. That ramp finishes at
// ladderS + rampS = eta. For a single pump (eta == tStar)
// this naturally lands at tStar — no extra delay needed.
//
// Mixed-speed multi-startup: if this pump is FASTER than
// the slowest one, its natural landing (at its own eta)
// is EARLIER than tStar. Delay the flowmovement so the
// ramp starts at (tStar rampS), making the ramp finish
// at tStar regardless of per-pump speed.
const naturalRampStartS = q.ladderS;
const rendezvousRampStartS = tStar - q.rampS;
const flowMoveFireAtS = rendezvousRampStartS > naturalRampStartS
? rendezvousRampStartS
: 0;
commands.push({
machineId: q.machineId,
action: 'flowmovement',
flow: q.targetFlow,
fireAtTickN: Math.max(0, Math.round(flowMoveFireAtS / tickS)),
eta: q.eta,
});
} else if (q.action === 'flowmove') {
commands.push({
machineId: q.machineId,
action: 'flowmovement',
flow: q.targetFlow,
// Unchanged moves are no-ops; fire immediately so we
// don't park them behind a long startup ladder for no
// reason. Up/down moves both delay so they land at t*.
fireAtTickN: isUnchanged ? 0 : fireAtTickNDelayed,
eta: q.eta,
});
} else if (q.action === 'shutdown') {
commands.push({
machineId: q.machineId,
action: 'execsequence',
sequence: 'shutdown',
fireAtTickN: fireAtTickNDelayed,
eta: q.eta,
});
}
}
return {
tStarS: tStar,
tickS,
commands,
// Debugging telemetry — kept in the output so tests can introspect.
_plans: plans,
};
}
module.exports = { plan, DEFAULT_TICK_S };

View File

@@ -19,6 +19,9 @@ class nodeClass extends BaseNodeAdapter {
const out = {};
if (uiConfig.mode) out.mode = { current: uiConfig.mode };
if (uiConfig.scaling) out.scaling = { current: uiConfig.scaling };
if (uiConfig.useRendezvous !== undefined) {
out.planner = { useRendezvous: uiConfig.useRendezvous };
}
return out;
}
}

View File

@@ -21,9 +21,23 @@ const GroupEfficiency = require('./efficiency/groupEfficiency');
const control = require('./control/strategies');
const io = require('./io/output');
const DemandDispatcher = require('./dispatch/demandDispatcher');
const { buildProfile } = require('./movement/machineProfile');
const movementScheduler = require('./movement/movementScheduler');
const MovementExecutor = require('./movement/movementExecutor');
const ACTIVE_STATES = new Set(['operational', 'accelerating', 'decelerating']);
// Canonical mode names (camelCase). The dispatcher already lowercases for its
// switch, but we normalise at setMode so this.mode is always in the canonical
// form — keeps allowedActions/allowedSources lookups (which key on the
// canonical form) honest. Module-level so tests can import without spinning
// up a full MachineGroup instance.
const ALLOWED_MODES = ['optimalControl', 'priorityControl', 'maintenance'];
function _normaliseMode(input) {
const lc = String(input || '').toLowerCase();
return ALLOWED_MODES.find((m) => m.toLowerCase() === lc) || null;
}
class MachineGroup extends BaseDomain {
static name = 'machineGroupControl';
@@ -41,7 +55,12 @@ class MachineGroup extends BaseDomain {
// tests still write directly (matches the pumpingStation pattern).
this.machines = {};
this.mode = this.config.mode.current;
// Persisted flows may have stored the mode in lowercase (legacy editor
// behaviour); normalise at construction so allow-list lookups against
// the schema's camelCase keys work consistently. Fallback to
// optimalControl if the persisted value is missing/garbage so a typo
// doesn't quietly disable dispatch.
this.mode = _normaliseMode(this.config.mode.current) || 'optimalControl';
this.absDistFromPeak = 0;
this.relDistFromPeak = 0;
this.dynamicTotals = { flow: { min: Infinity, max: 0 }, power: { min: Infinity, max: 0 }, NCog: 0 };
@@ -56,6 +75,16 @@ class MachineGroup extends BaseDomain {
);
this._shutdownInFlight = new Set();
// Tick-driven executor for the movement schedule produced by the
// planner. MGC owns the wall-clock setInterval that calls tick();
// the executor itself is pure (testable without timers).
this.movementExecutor = new MovementExecutor({
logger: this.logger,
fireCommand: (cmd) => this._fireSchedulerCommand(cmd),
});
this._executorTimer = null;
this._executorIntervalMs = 1000;
this.operatingPoint = new GroupOperatingPoint({
measurements: this.measurements,
machines: this.machines,
@@ -119,7 +148,31 @@ class MachineGroup extends BaseDomain {
}
// ── Surface kept for tests + commands ──────────────────────────────
setMode(mode) { this.mode = mode; this.notifyOutputChanged(); }
// Mirror of rotatingMachine/src/specificClass.js:329-339 — same pattern,
// mode/source allow-lists live in this.config.mode (loaded from the
// schema as Set instances). Anything not declared in the schema is
// dropped silently with a warn-level log.
isValidActionForMode(action, mode) {
const ok = !!this.config?.mode?.allowedActions?.[mode]?.has?.(action);
if (ok) this.logger.debug(`action '${action}' allowed in mode '${mode}'`);
else this.logger.warn(`action '${action}' not allowed in mode '${mode}'`);
return ok;
}
isValidSourceForMode(source, mode) {
const ok = !!this.config?.mode?.allowedSources?.[mode]?.has?.(source);
if (ok) this.logger.debug(`source '${source}' allowed in mode '${mode}'`);
else this.logger.warn(`source '${source}' not allowed in mode '${mode}'`);
return ok;
}
setMode(mode) {
const canonical = _normaliseMode(mode);
if (!canonical) {
this.logger.warn(`Invalid mode '${mode}'. Allowed: ${ALLOWED_MODES.join(', ')}`);
return;
}
this.mode = canonical;
this.notifyOutputChanged();
}
isMachineActive(id) {
const s = this.machines[id]?.state?.getCurrentState?.();
return ACTIVE_STATES.has(s);
@@ -223,20 +276,80 @@ class MachineGroup extends BaseDomain {
}
this.measurements.type('Ncog').variant('predicted').position(POSITIONS.AT_EQUIPMENT).value(bestResult.bestCog);
await Promise.all(Object.entries(this.machines).map(async ([id, machine]) => {
const pumpInfo = bestResult.bestCombination.find(it => it.machineId == id);
const flow = pumpInfo ? pumpInfo.flow : 0;
const state = machineStates[id];
// flowmovement BEFORE startup so concurrent retargets update
// delayedMove without a stale chained flowmovement landing
// post-startup — see idle-startup-deadlock Scenario 4.
if (flow > 0) {
await machine.handleInput('parent', 'flowmovement', this._canonicalToOutputFlow(flow));
if (state === 'idle') await machine.handleInput('parent', 'execsequence', 'startup');
} else if (ACTIVE_STATES.has(state)) {
await machine.handleInput('parent', 'execsequence', 'shutdown');
const distribution = bestResult.bestCombination.map((it) => ({ machineId: String(it.machineId), flow: it.flow }));
await this._dispatchFlowDistribution(distribution);
}
// Shared dispatch path used by every control strategy. Takes a flow
// distribution {machineId, flow}[] and routes it through the planner
// and executor. Same-time-landing (rendezvous) is the default and can
// be turned off via config.planner.useRendezvous, in which case every
// command fires at tick 0 (legacy fire-and-forget behaviour, like the
// pre-planner equalFlowControl).
async _dispatchFlowDistribution(distribution) {
const profiles = Object.values(this.machines).map((m) => buildProfile(m));
const headerPa = Number.isFinite(this.operatingPoint.headerDiffPa) ? this.operatingPoint.headerDiffPa : 0;
const useRendezvous = this.config?.planner?.useRendezvous !== false; // default true
const schedule = movementScheduler.plan(profiles, distribution, headerPa, { tickS: 1, useRendezvous });
this.movementExecutor.replan(schedule);
// AWAIT the first tick to preserve the race-favouring behaviour
// of the original code. The new move's full chain (residue
// handler → operational → ramp) settles before _runDispatch
// returns; the in-flight shutdown sequence's for-loop runs on
// other microtasks but its invalid-transition exits truncate it.
await this.movementExecutor.tick();
this._ensureExecutorTimer();
if (this.logger?.debug) {
this.logger.debug(`MGC planner: ${schedule.commands.length} commands queued, tStar=${schedule.tStarS.toFixed(1)}s, rendezvous=${useRendezvous}`);
}
}
// Dispatch one scheduled command to the appropriate child. Returns
// synchronously — the underlying handleInput is fire-and-forget from
// the executor's perspective, mirroring the existing optimal-control
// behaviour where commands are scheduled, not awaited.
_fireSchedulerCommand(cmd) {
const machine = this.machines[cmd.machineId];
if (!machine) {
this.logger?.warn?.(`Scheduler fired ${cmd.action} for unknown machine ${cmd.machineId}`);
return undefined;
}
const handle = typeof machine.handleInput === 'function' ? machine.handleInput.bind(machine) : null;
if (!handle) return undefined;
if (cmd.action === 'execsequence') {
return Promise.resolve(handle('parent', 'execsequence', cmd.sequence))
.catch((e) => this.logger?.error?.(`execsequence ${cmd.sequence} on ${cmd.machineId} failed: ${e?.message || e}`));
}
if (cmd.action === 'flowmovement') {
const outFlow = this._canonicalToOutputFlow(cmd.flow);
return Promise.resolve(handle('parent', 'flowmovement', outFlow))
.catch((e) => this.logger?.error?.(`flowmovement on ${cmd.machineId} failed: ${e?.message || e}`));
}
return undefined;
}
// Wall-clock driver for the executor. Auto-stops when there's nothing
// pending so we don't burn a forever-running setInterval.
_ensureExecutorTimer() {
if (this._executorTimer) return;
this._executorTimer = setInterval(() => {
this.movementExecutor.tick();
if (this.movementExecutor.pending() === 0) {
clearInterval(this._executorTimer);
this._executorTimer = null;
}
}));
}, this._executorIntervalMs);
// Unref so the timer doesn't keep Node-RED alive on shutdown.
if (typeof this._executorTimer.unref === 'function') this._executorTimer.unref();
}
// Stop the executor's wall-clock driver. Called from teardown paths.
_stopExecutorTimer() {
if (this._executorTimer) {
clearInterval(this._executorTimer);
this._executorTimer = null;
}
}
// Returns when THIS call's dispatch settles. If overwritten by a later
@@ -311,3 +424,6 @@ class MachineGroup extends BaseDomain {
}
module.exports = MachineGroup;
// Module-level helpers exposed for unit tests.
module.exports._normaliseMode = _normaliseMode;
module.exports.ALLOWED_MODES = ALLOWED_MODES;