P6: convert valve to BaseDomain + BaseNodeAdapter + concern split

Refactor of valve to use the platform infrastructure (BaseDomain, BaseNodeAdapter,
ChildRouter, commandRegistry, statusBadge). Extracts concerns into
focused modules per .claude/refactor/MODULE_SPLIT.md generic template.
Tests stay green; CONTRACT.md generated; legacy aliases preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
znetsixe
2026-05-10 22:09:22 +02:00
parent ae5bc750cd
commit e27135bdc4
11 changed files with 984 additions and 1194 deletions

66
src/commands/handlers.js Normal file
View File

@@ -0,0 +1,66 @@
'use strict';
// Valve command handlers. Each receives (source, msg, ctx) where source is
// the domain instance, msg is the incoming Node-RED message, and ctx carries
// { node, RED, send, logger } per BaseNodeAdapter.
function _logger(source, ctx) { return ctx?.logger || source?.logger || null; }
function _send(ctx, ports) { if (typeof ctx?.send === 'function') ctx.send(ports); }
exports.setMode = (source, msg) => {
source.setMode(msg.payload);
};
exports.startup = async (source, msg) => {
const p = msg.payload || {};
await source.handleInput(p.source ?? 'parent', 'execSequence', 'startup');
};
exports.shutdown = async (source, msg) => {
const p = msg.payload || {};
await source.handleInput(p.source ?? 'parent', 'execSequence', 'shutdown');
};
exports.estop = async (source, msg) => {
const p = msg.payload || {};
await source.handleInput(p.source ?? 'parent', p.action ?? 'emergencystop');
};
// Legacy umbrella: payload.action selects the canonical verb.
exports.execSequenceAlias = async (source, msg, ctx) => {
const log = _logger(source, ctx);
const action = msg?.payload?.action;
if (action === 'startup') return exports.startup(source, msg, ctx);
if (action === 'shutdown') return exports.shutdown(source, msg, ctx);
if (action === 'emergencyStop' || action === 'emergencystop') {
return exports.estop(source, msg, ctx);
}
log?.warn?.(`execSequence: unsupported action '${action}'`);
};
exports.setPosition = async (source, msg) => {
const p = msg.payload || {};
const action = p.action ?? 'execMovement';
await source.handleInput(p.source ?? 'parent', action, Number(p.setpoint));
};
exports.dataFlow = (source, msg) => {
const p = msg.payload || {};
source.updateFlow(p.variant, p.value, p.position, p.unit || source.unitPolicyView?.output?.flow);
};
exports.queryCurve = (source, msg, ctx) => {
const reply = Object.assign({}, msg, { topic: 'Showing curve', payload: source.showCurve() });
_send(ctx, [reply, null, null]);
};
exports.registerChild = (source, msg, ctx) => {
const log = _logger(source, ctx);
const childId = msg.payload;
const childObj = ctx?.RED?.nodes?.getNode?.(childId);
if (!childObj || !childObj.source) {
log?.warn?.(`registerChild: child '${childId}' not found or has no .source`);
return;
}
source.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent);
};

20
src/commands/index.js Normal file
View File

@@ -0,0 +1,20 @@
'use strict';
// valve command registry — consumed by BaseNodeAdapter. Canonical topics
// follow CONTRACTS.md §1; legacy names are kept as aliases (one-time
// deprecation warning when fired).
const handlers = require('./handlers');
module.exports = [
{ topic: 'set.mode', aliases: ['setMode'], payloadSchema: { type: 'string' }, handler: handlers.setMode },
{ topic: 'cmd.startup', payloadSchema: { type: 'any' }, handler: handlers.startup },
{ topic: 'cmd.shutdown', payloadSchema: { type: 'any' }, handler: handlers.shutdown },
{ topic: 'cmd.estop', aliases: ['emergencystop', 'emergencyStop'], payloadSchema: { type: 'any' }, handler: handlers.estop },
// Content-based demux; behaviour matches cmd.startup/cmd.shutdown exactly.
{ topic: 'execSequence', payloadSchema: { type: 'object' }, handler: handlers.execSequenceAlias, _legacy: true },
{ topic: 'set.position', aliases: ['execMovement'], payloadSchema: { type: 'object' }, handler: handlers.setPosition },
{ topic: 'data.flow', aliases: ['updateFlow'], payloadSchema: { type: 'object' }, handler: handlers.dataFlow },
{ topic: 'query.curve', aliases: ['showcurve'], payloadSchema: { type: 'any' }, handler: handlers.queryCurve },
{ topic: 'child.register', aliases: ['registerChild'], payloadSchema: { type: 'string' }, handler: handlers.registerChild },
];

View File

@@ -0,0 +1,95 @@
'use strict';
const { loadCurve, predict } = require('generalFunctions');
const FALLBACK_SUPPLIER_CURVE = Object.freeze({
'1.204': { '125': { x: [0, 100], y: [0, 1] } },
});
function isValidCurveData(curveData) {
if (!curveData || typeof curveData !== 'object') return false;
const dKeys = Object.keys(curveData);
if (!dKeys.length) return false;
for (const dk of dKeys) {
const diameters = curveData[dk];
if (!diameters || typeof diameters !== 'object') return false;
const diaKeys = Object.keys(diameters);
if (!diaKeys.length) return false;
for (const k of diaKeys) {
const c = diameters[k];
if (!Array.isArray(c?.x) || !Array.isArray(c?.y) || c.x.length < 2 || c.x.length !== c.y.length) return false;
}
}
return true;
}
function pickNearestNumericKey(keys, target) {
const numeric = keys.map((k) => Number(k)).filter((v) => Number.isFinite(v));
if (!numeric.length) return String(target);
let selected = numeric[0];
let dist = Math.abs(selected - target);
for (const k of numeric) {
const d = Math.abs(k - target);
if (d < dist) { selected = k; dist = d; }
}
return String(selected);
}
class SupplierCurvePredictor {
constructor({ logger, model, configCurve, defaultDensity, defaultTemperatureK, rho, temperatureK, valveDiameter }) {
this.logger = logger;
this.model = model;
this.curve = model ? loadCurve(model) : null;
this._configCurve = configCurve;
this.defaultDensity = defaultDensity;
this.defaultTemperatureK = defaultTemperatureK;
this.rho = Number.isFinite(rho) && rho > 0 ? rho : defaultDensity;
this.T = Number.isFinite(temperatureK) && temperatureK > 0 ? temperatureK : defaultTemperatureK;
this._init(valveDiameter);
}
_init(valveDiameter) {
const supplierCurve = this._resolveData();
const densityTarget = Number.isFinite(this.rho) && this.rho > 0 ? this.rho : this.defaultDensity;
const densityKey = pickNearestNumericKey(Object.keys(supplierCurve), densityTarget);
const densityCurveFamily = supplierCurve[densityKey];
const diaTarget = Number(valveDiameter);
const diameterKey = pickNearestNumericKey(
Object.keys(densityCurveFamily || {}),
Number.isFinite(diaTarget) && diaTarget > 0 ? diaTarget : 125
);
this.curveSelection = { densityKey: Number(densityKey), diameterKey: Number(diameterKey) };
this.predictKv = new predict({ curve: densityCurveFamily || FALLBACK_SUPPLIER_CURVE['1.204'] });
this.predictKv.fDimension = this.curveSelection.diameterKey;
}
_resolveData() {
if (isValidCurveData(this.curve)) return this.curve;
if (isValidCurveData(this._configCurve)) return this._configCurve;
this.logger.warn('No valid supplier curve data found, using fallback curve.');
return FALLBACK_SUPPLIER_CURVE;
}
predictKvForPosition(positionPercent) {
if (!this.predictKv) return 0.1;
try {
this.predictKv.fDimension = this.curveSelection?.diameterKey || this.predictKv.fDimension;
const kv = Number(this.predictKv.y(positionPercent));
if (!Number.isFinite(kv)) return 0.1;
return Math.max(0.1, kv);
} catch (error) {
this.logger.warn(`Failed to predict Kv for position=${positionPercent}: ${error.message}`);
return 0.1;
}
}
snapshot() {
return {
selectedDensity: this.curveSelection?.densityKey ?? null,
selectedDiameter: this.curveSelection?.diameterKey ?? null,
curve: this.predictKv?.currentFxyCurve?.[this.predictKv?.fDimension] || null,
};
}
}
module.exports = { SupplierCurvePredictor, FALLBACK_SUPPLIER_CURVE };

View File

@@ -0,0 +1,84 @@
'use strict';
// Sequence + setpoint execution. Mirrors the pre-refactor Valve.handleInput
// switch but delegates state transitions to host.state. Pre-shutdown ramp-down
// to 0 happens here so the existing test contract holds.
class FlowController {
constructor(host) {
this.host = host;
this.logger = host.logger;
}
isValidSourceForMode(source, mode) {
const allowed = this.host.config.mode.allowedSources[mode] || [];
return allowed.has(source);
}
async handleInput(source, action, parameter) {
if (!this.isValidSourceForMode(source, this.host.currentMode)) {
const msg = `Source '${source}' is not valid for mode '${this.host.currentMode}'.`;
this.logger.warn(msg);
return { status: false, feedback: msg };
}
this.logger.info(`Handling input from source '${source}' with action '${action}' in mode '${this.host.currentMode}'.`);
try {
switch (action) {
case 'execSequence':
await this.executeSequence(parameter);
break;
case 'execMovement':
await this.setpoint(parameter);
break;
case 'emergencyStop':
case 'emergencystop':
this.logger.warn(`Emergency stop activated by '${source}'.`);
await this.executeSequence('emergencystop');
break;
case 'statusCheck':
this.logger.info(`Status Check: Mode = '${this.host.currentMode}', Source = '${source}'.`);
break;
default:
this.logger.warn(`Action '${action}' is not implemented.`);
}
this.logger.debug(`Action '${action}' successfully executed`);
return { status: true, feedback: `Action '${action}' successfully executed.` };
} catch (error) {
this.logger.error(`Error handling input: ${error}`);
}
}
async executeSequence(sequenceName) {
const sequence = this.host.config.sequences[sequenceName];
if (!sequence || sequence.size === 0) {
this.logger.warn(`Sequence '${sequenceName}' not defined.`);
return;
}
if (this.host.state.getCurrentState() === 'operational' && sequenceName === 'shutdown') {
this.logger.info(`Machine will ramp down to position 0 before performing ${sequenceName} sequence`);
await this.setpoint(0);
}
this.logger.info(` --------- Executing sequence: ${sequenceName} -------------`);
for (const stateName of sequence) {
try {
await this.host.state.transitionToState(stateName);
} catch (error) {
this.logger.error(`Error during sequence '${sequenceName}': ${error}`);
break;
}
}
}
async setpoint(value) {
try {
if (typeof value !== 'number' || value < 0) {
throw new Error('Invalid setpoint: Setpoint must be a non-negative number.');
}
await this.host.state.moveTo(value);
} catch (error) {
this.logger.error(`Error setting setpoint: ${error}`);
}
}
}
module.exports = FlowController;

View File

@@ -0,0 +1,201 @@
'use strict';
const SERVICE_TYPES = new Set(['gas', 'liquid']);
const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({
machine: 'liquid',
rotatingmachine: 'liquid',
machinegroup: 'liquid',
machinegroupcontrol: 'liquid',
pumpingstation: 'liquid',
});
function normalizeOptional(value) {
const raw = String(value || '').trim().toLowerCase();
return SERVICE_TYPES.has(raw) ? raw : null;
}
function defaultForSoftwareType(softwareType) {
const key = String(softwareType || '').trim().toLowerCase();
return DEFAULT_SOURCE_SERVICE_TYPE[key] || null;
}
class FluidCompatibility {
constructor({ logger, emitter, expectedServiceType }) {
this.logger = logger;
this.emitter = emitter;
this.expectedServiceType = expectedServiceType || null;
this.upstreamFluidSources = new Map();
this._fluidContractListeners = new Map();
this.state = {
status: this.expectedServiceType ? 'pending' : 'unknown',
expectedServiceType: this.expectedServiceType,
receivedServiceType: null,
upstreamServiceTypes: [],
sourceCount: 0,
message: this.expectedServiceType
? `Waiting for upstream fluid contract (${this.expectedServiceType}).`
: 'No upstream fluid contract available.',
};
}
registerChild(child, softwareType) {
if (!child || typeof child !== 'object') {
this.logger.warn('registerChild skipped: invalid child payload');
return false;
}
const sourceType = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase();
const sourceId = child?.config?.general?.id
|| child?.config?.general?.name
|| `source-${this.upstreamFluidSources.size + 1}`;
const contract = this._extractContract(child, sourceType);
this.upstreamFluidSources.set(sourceId, { child, sourceType, contract });
this._bindListener(sourceId, child, sourceType);
this._updateState();
this.logger.info(`Source '${sourceId}' (${sourceType || 'unknown'}) registered for fluid contract.`);
return true;
}
_extractContract(child, softwareType) {
const sourceType = String(softwareType || child?.config?.functionality?.softwareType || '').trim().toLowerCase();
let fromChild = null;
if (typeof child?.getFluidContract === 'function') {
try { fromChild = child.getFluidContract(); }
catch (error) { this.logger.warn(`Failed to read child fluid contract: ${error.message}`); }
}
const status = String(fromChild?.status || '').trim().toLowerCase();
if (status === 'conflict') return { status: 'conflict', serviceType: null, sourceType };
const contractType = normalizeOptional(fromChild?.serviceType);
if (contractType) return { status: 'resolved', serviceType: contractType, sourceType };
const directType = normalizeOptional(
child?.serviceType || child?.expectedServiceType || child?.config?.asset?.serviceType
);
if (directType) return { status: 'resolved', serviceType: directType, sourceType };
const fallback = defaultForSoftwareType(sourceType);
if (fallback) return { status: 'inferred', serviceType: fallback, sourceType };
return { status: 'unknown', serviceType: null, sourceType };
}
_bindListener(sourceId, child, sourceType) {
if (!sourceId || this._fluidContractListeners.has(sourceId)) return;
if (!child?.emitter || typeof child.emitter.on !== 'function') return;
const handler = () => {
const latest = this._extractContract(child, sourceType);
const existing = this.upstreamFluidSources.get(sourceId) || {};
existing.contract = latest;
this.upstreamFluidSources.set(sourceId, existing);
this._updateState();
};
child.emitter.on('fluidContractChange', handler);
this._fluidContractListeners.set(sourceId, { emitter: child.emitter, handler });
}
_computeSnapshot() {
const expectedServiceType = this.expectedServiceType || null;
const contracts = Array.from(this.upstreamFluidSources.values())
.map((entry) => entry?.contract)
.filter(Boolean);
const upstreamServiceTypes = Array.from(new Set(
contracts.map((c) => normalizeOptional(c.serviceType)).filter(Boolean)
));
const hasConflict = contracts.some((c) => String(c.status || '').toLowerCase() === 'conflict');
const sourceCount = this.upstreamFluidSources.size;
if (hasConflict || upstreamServiceTypes.length > 1) {
return {
status: 'conflict', expectedServiceType,
receivedServiceType: upstreamServiceTypes.length === 1 ? upstreamServiceTypes[0] : null,
upstreamServiceTypes, sourceCount,
message: `Conflicting upstream fluids detected: ${upstreamServiceTypes.join(', ') || 'unknown'}.`,
};
}
if (upstreamServiceTypes.length === 1) {
const receivedServiceType = upstreamServiceTypes[0];
if (expectedServiceType && expectedServiceType !== receivedServiceType) {
return {
status: 'mismatch', expectedServiceType, receivedServiceType,
upstreamServiceTypes, sourceCount,
message: `Expected ${expectedServiceType}, received ${receivedServiceType}.`,
};
}
return {
status: expectedServiceType ? 'match' : 'inferred',
expectedServiceType, receivedServiceType,
upstreamServiceTypes, sourceCount,
message: expectedServiceType
? `Fluid contract validated: ${receivedServiceType}.`
: `Fluid inferred from upstream: ${receivedServiceType}.`,
};
}
return {
status: expectedServiceType ? 'pending' : 'unknown',
expectedServiceType, receivedServiceType: null,
upstreamServiceTypes: [], sourceCount,
message: expectedServiceType
? `Waiting for upstream fluid contract (${expectedServiceType}).`
: 'No upstream fluid contract available.',
};
}
_updateState() {
const next = this._computeSnapshot();
const prev = this.state || {};
const changed = (
prev.status !== next.status
|| prev.expectedServiceType !== next.expectedServiceType
|| prev.receivedServiceType !== next.receivedServiceType
|| prev.sourceCount !== next.sourceCount
|| (prev.message || '') !== (next.message || '')
);
this.state = next;
if (!changed) return;
if (next.status === 'mismatch' || next.status === 'conflict') {
this.logger.warn(`Fluid compatibility warning: ${next.message}`);
} else {
this.logger.info(`Fluid compatibility update: ${next.message}`);
}
this.emitter.emit('fluidCompatibilityChange', this.getCompatibility());
this.emitter.emit('fluidContractChange', this.getContract());
}
getCompatibility() {
const s = this.state || {};
return {
status: s.status || 'unknown',
expectedServiceType: s.expectedServiceType || null,
receivedServiceType: s.receivedServiceType || null,
upstreamServiceTypes: Array.isArray(s.upstreamServiceTypes) ? [...s.upstreamServiceTypes] : [],
sourceCount: Number(s.sourceCount) || 0,
message: s.message || '',
};
}
getContract() {
const c = this.getCompatibility();
if (c.status === 'conflict') {
return {
status: 'conflict', serviceType: null,
expectedServiceType: c.expectedServiceType,
observedServiceType: c.receivedServiceType,
source: 'valve',
};
}
const advertised = c.expectedServiceType || null;
return {
status: advertised ? 'resolved' : 'unknown',
serviceType: advertised,
expectedServiceType: c.expectedServiceType,
observedServiceType: c.receivedServiceType,
source: 'valve',
};
}
destroy() {
for (const { emitter, handler } of this._fluidContractListeners.values()) {
if (typeof emitter?.off === 'function') emitter.off('fluidContractChange', handler);
else if (typeof emitter?.removeListener === 'function') emitter.removeListener('fluidContractChange', handler);
}
this._fluidContractListeners.clear();
}
}
module.exports = { FluidCompatibility, normalizeOptional, defaultForSoftwareType };

73
src/io/output.js Normal file
View File

@@ -0,0 +1,73 @@
'use strict';
const { statusBadge } = require('generalFunctions');
const STATE_SYMBOLS = {
off: '⬛', idle: '⏸️', operational: '⏵️',
starting: '⏯️', warmingup: '🔄', accelerating: '⏩',
stopping: '⏹️', coolingdown: '❄️', decelerating: '⏪',
};
const STATE_FILL = {
off: 'red', idle: 'blue',
operational: 'green', warmingup: 'green',
starting: 'yellow', accelerating: 'yellow',
stopping: 'yellow', coolingdown: 'yellow', decelerating: 'yellow',
};
const SHOW_METRICS = new Set(['operational', 'warmingup', 'accelerating', 'decelerating']);
function buildOutput(host) {
const output = {};
Object.entries(host.measurements.measurements || {}).forEach(([type, variants]) => {
Object.entries(variants || {}).forEach(([variant, positions]) => {
Object.keys(positions || {}).forEach((position) => {
const unit = host._outputUnitForType(type);
const value = host._readMeasurement(type, variant, position, unit);
if (value != null) output[`${position}_${variant}_${type}`] = value;
});
});
});
output.state = host.state.getCurrentState();
output.percentageOpen = host.state.getCurrentPosition();
output.moveTimeleft = host.state.getMoveTimeLeft();
output.mode = host.currentMode;
return output;
}
function buildStatusBadge(host) {
try {
const mode = host.currentMode;
const stateName = host.state.getCurrentState();
const flowUnit = host.unitPolicyView.output.flow || 'm3/h';
const pressureUnit = host.unitPolicyView.output.pressure || 'mbar';
const flow = Math.round(host.measurements.type('flow').variant('predicted').position('downstream').getCurrentValue(flowUnit));
let deltaP = host.measurements.type('pressure').variant('predicted').position('delta').getCurrentValue(pressureUnit);
if (deltaP !== null && deltaP !== undefined) deltaP = parseFloat(deltaP.toFixed(0));
if (Number.isNaN(deltaP)) deltaP = '∞';
const pos = Math.round(host.state.getCurrentPosition() * 100) / 100;
const symbol = STATE_SYMBOLS[stateName] || '❔';
const fill = STATE_FILL[stateName] || 'grey';
let badge;
if (SHOW_METRICS.has(stateName)) {
badge = statusBadge.compose(
[`${mode}: ${symbol}`, `${pos}%`, `💨${flow}${flowUnit}`, `ΔP${deltaP} ${pressureUnit}`],
{ fill, shape: 'dot' }
);
} else {
badge = statusBadge.compose([`${mode}: ${symbol}`], { fill, shape: 'dot' });
}
const fc = typeof host.getFluidCompatibility === 'function' ? host.getFluidCompatibility() : null;
if (fc && (fc.status === 'mismatch' || fc.status === 'conflict')) {
return { fill: 'yellow', shape: 'ring', text: `${badge.text} | ⚠ ${fc.message}` };
}
return badge;
} catch (err) {
host.logger?.error?.(`getStatusBadge: ${err.message}`);
return statusBadge.error('Status Error');
}
}
module.exports = { buildOutput, buildStatusBadge };

View File

@@ -0,0 +1,120 @@
'use strict';
// Routes incoming pressure/flow measurement updates and triggers the
// hydraulic deltaP recompute. The formula path uses fixed FORMULA_UNITS
// (mbar / m3/h / K) — the hydraulic model multiplies q^2 with rho * T
// and divides by an absolute-pressure term, so unit choices are pinned.
const FORMULA_UNITS = Object.freeze({ pressure: 'mbar', flow: 'm3/h', temperature: 'K' });
class MeasurementRouter {
constructor(host) {
this.host = host;
this.logger = host.logger;
}
updatePressure(variant, value, position, unit) {
const h = this.host;
if (value === null || value === undefined) {
this.logger.warn(`Received null or undefined value for pressure update. Variant: ${variant}, Position: ${position}`);
return;
}
this.logger.debug(`Updating pressure: variant=${variant}, value=${value}, position=${position}`);
const u = unit || h.unitPolicyView.output.pressure;
if (variant === 'measured') {
h._writeMeasurement('pressure', 'measured', position, Number(value), u);
const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure);
const measuredFlow = h._readMeasurement('flow', 'measured', 'downstream', FORMULA_UNITS.flow);
const predictedFlow = h._readMeasurement('flow', 'predicted', 'downstream', FORMULA_UNITS.flow);
const activeFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow;
this.updateDeltaP(activeFlow, h.kv, downstreamP);
return;
}
if (variant === 'predicted') {
h._writeMeasurement('pressure', 'predicted', position, Number(value), u);
const downstreamP = h._readMeasurement('pressure', 'predicted', 'downstream', FORMULA_UNITS.pressure);
const measuredFlow = h._readMeasurement('flow', 'measured', 'downstream', FORMULA_UNITS.flow);
const predictedFlow = h._readMeasurement('flow', 'predicted', 'downstream', FORMULA_UNITS.flow);
const activeFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow;
this.updateDeltaP(activeFlow, h.kv, downstreamP);
return;
}
this.logger.warn(`Unrecognized variant '${variant}' for flow update.`);
}
updateFlow(variant, value, position, unit) {
const h = this.host;
if (value === null || value === undefined) {
this.logger.warn(`Received null or undefined value for flow update. Variant: ${variant}, Position: ${position}`);
return;
}
this.logger.debug(`Updating flow: variant=${variant}, value=${value}, position=${position}`);
const u = unit || h.unitPolicyView.output.flow;
if (variant === 'measured') {
h._writeMeasurement('flow', 'measured', position, Number(value), u);
const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure);
const measuredFlow = h._readMeasurement('flow', 'measured', position, FORMULA_UNITS.flow);
this.updateDeltaP(measuredFlow, h.kv, downstreamP);
return;
}
if (variant === 'predicted') {
h._writeMeasurement('flow', 'predicted', position, Number(value), u);
const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure);
const predictedFlow = h._readMeasurement('flow', 'predicted', position, FORMULA_UNITS.flow);
this.updateDeltaP(predictedFlow, h.kv, downstreamP);
return;
}
this.logger.warn(`Unrecognized variant '${variant}' for flow update.`);
}
updateMeasurement(variant, subType, value, position, unit) {
this.logger.debug(`---------------------- updating ${subType} ------------------ `);
switch (subType) {
case 'pressure':
this.updatePressure(variant, value, position, unit || this.host.unitPolicyView.output.pressure);
return;
case 'flow':
this.updateFlow(variant, value, position, unit || this.host.unitPolicyView.output.flow);
return;
case 'power':
return;
default:
this.logger.error(`Type '${subType}' not recognized for measured update.`);
}
}
// q in m3/h, downstreamP in mbar(g), temp in K
updateDeltaP(q, kv, downstreamP) {
const h = this.host;
const result = h.hydraulicModel.calculateDeltaPMbar({
qM3h: q, kv, downstreamGaugeMbar: downstreamP, rho: h.rho, tempK: h.T,
});
if (!result || !Number.isFinite(result.deltaPMbar)) return;
const deltaP = result.deltaPMbar;
h.deltaPKlep = deltaP;
h.hydraulicDiagnostics = result.details || null;
h._writeMeasurement('pressure', 'predicted', 'delta', deltaP, h.unitPolicyView.output.pressure);
this.logger.info('DeltaP updated to: ' + deltaP);
h.emitter.emit('deltaPChange', deltaP);
this.logger.info('DeltaPChange emitted to valveGroupController');
}
updatePositionDependent() {
const h = this.host;
const s = h.state.getCurrentState();
if (s !== 'operational' && s !== 'accelerating' && s !== 'decelerating') return;
this.logger.debug('Calculating new deltaP');
const x = h.state.getCurrentPosition();
const measuredFlow = h._readMeasurement('flow', 'measured', 'downstream', FORMULA_UNITS.flow);
const predictedFlow = h._readMeasurement('flow', 'predicted', 'downstream', FORMULA_UNITS.flow);
const currentFlow = Number.isFinite(predictedFlow) ? predictedFlow : measuredFlow;
const downstreamP = h._readMeasurement('pressure', 'measured', 'downstream', FORMULA_UNITS.pressure);
h.kv = h.curvePredictor.predictKvForPosition(x);
this.logger.debug(`Kv value for position valve ${x} is ${h.kv}`);
this.updateDeltaP(currentFlow, h.kv, downstreamP);
}
}
module.exports = { MeasurementRouter, FORMULA_UNITS };

View File

@@ -1,341 +1,47 @@
/**
* Encapsulates all node logic in a reusable class. In future updates we can split this into multiple generic classes and use the config to specifiy which ones to use.
* This allows us to keep the Node-RED node clean and focused on wiring up the UI and event handlers.
*/
const { outputUtils, configManager, convert } = require('generalFunctions');
const Specific = require("./specificClass");
'use strict';
const { BaseNodeAdapter, convert } = require('generalFunctions');
const Valve = require('./specificClass');
const commands = require('./commands');
class nodeClass {
/**
* Create a MeasurementNode.
* @param {object} uiConfig - Node-RED node configuration.
* @param {object} RED - Node-RED runtime API.
* @param {object} nodeInstance - The Node-RED node instance.
* @param {string} nameOfNode - The name of the node, used for
*/
constructor(uiConfig, RED, nodeInstance, nameOfNode) {
class nodeClass extends BaseNodeAdapter {
static DomainClass = Valve;
static commands = commands;
static tickInterval = null;
static statusInterval = 1000;
// Preserve RED reference for HTTP endpoints if needed
this.node = nodeInstance;
this.RED = RED;
this.name = nameOfNode;
this.source = null; // Will hold the specific class instance
this.config = null; // Will hold the merged configuration
// Load default & UI config
this._loadConfig(uiConfig,this.node);
// Instantiate core Measurement class
this._setupSpecificClass(uiConfig);
// Wire up event and lifecycle handlers
this._bindEvents();
this._registerChild();
this._startTickLoop();
this._attachInputHandler();
this._attachCloseHandler();
}
/**
* Load and merge default config with user-defined settings.
* @param {object} uiConfig - Raw config from Node-RED UI.
*/
_loadConfig(uiConfig,node) {
// Resolve flow unit with validation before building config
const flowUnit = this._resolveUnitOrFallback(uiConfig.unit, 'volumeFlowRate', 'm3/h', 'flow');
const resolvedUiConfig = { ...uiConfig, unit: flowUnit };
// Build config: base sections handle general, asset, functionality
const cfgMgr = new configManager();
this.config = cfgMgr.buildConfig(this.name, resolvedUiConfig, node.id);
// Utility for formatting outputs
this._output = new outputUtils();
}
_resolveUnitOrFallback(candidate, expectedMeasure, fallbackUnit, label) {
const raw = typeof candidate === "string" ? candidate.trim() : "";
const fallback = String(fallbackUnit || "").trim();
if (!raw) {
return fallback;
}
try {
const desc = convert().describe(raw);
if (expectedMeasure && desc.measure !== expectedMeasure) {
throw new Error(`expected '${expectedMeasure}' but got '${desc.measure}'`);
}
return raw;
} catch (error) {
this.node?.warn?.(`Invalid ${label} unit '${raw}' (${error.message}). Falling back to '${fallback}'.`);
return fallback;
}
}
/**
* Instantiate the core logic and store as source.
*/
_setupSpecificClass(uiConfig) {
const vconfig = this.config;
const asNumberOrUndefined = (value) => {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : undefined;
};
// need extra state for this
const stateConfig = {
general: {
logging: {
enabled: vconfig.general.logging.enabled,
logLevel: vconfig.general.logging.logLevel
}
},
movement: {
speed: asNumberOrUndefined(uiConfig.speed)
},
buildDomainConfig(uiConfig) {
const flowUnit = _resolveUnit(uiConfig.unit, 'volumeFlowRate', 'm3/h');
const asNum = (v) => { const n = Number(v); return Number.isFinite(n) ? n : undefined; };
Valve._pendingExtras = {
stateConfig: {
general: { logging: { enabled: uiConfig.enableLog, logLevel: uiConfig.logLevel } },
movement: { speed: asNum(uiConfig.speed) },
time: {
starting: asNumberOrUndefined(uiConfig.startup),
warmingup: asNumberOrUndefined(uiConfig.warmup),
stopping: asNumberOrUndefined(uiConfig.shutdown),
coolingdown: asNumberOrUndefined(uiConfig.cooldown)
}
};
const runtimeOptions = {
serviceType: uiConfig.serviceType,
fluidDensity: asNumberOrUndefined(uiConfig.fluidDensity),
fluidTemperatureK: asNumberOrUndefined(uiConfig.fluidTemperatureK),
gasChokedRatioLimit: asNumberOrUndefined(uiConfig.gasChokedRatioLimit),
starting: asNum(uiConfig.startup), warmingup: asNum(uiConfig.warmup),
stopping: asNum(uiConfig.shutdown), coolingdown: asNum(uiConfig.cooldown),
},
},
runtimeOptions: {
serviceType: uiConfig.serviceType,
fluidDensity: asNum(uiConfig.fluidDensity),
fluidTemperatureK: asNum(uiConfig.fluidTemperatureK),
gasChokedRatioLimit: asNum(uiConfig.gasChokedRatioLimit),
},
};
this.source = new Specific(vconfig, stateConfig, runtimeOptions);
//store in node
this.node.source = this.source; // Store the source in the node instance for easy access
}
/**
* Bind Measurement events to Node-RED status updates. Using internal emitter. --> REMOVE LATER WE NEED ONLY COMPLETE CHILDS AND THEN CHECK FOR UPDATES
*/
_bindEvents() {
}
_updateNodeStatus() {
const v = this.source;
try {
const mode = v.currentMode;
const state = v.state.getCurrentState();
const fluidCompatibility = typeof v.getFluidCompatibility === "function"
? v.getFluidCompatibility()
: null;
const fluidWarningText = (
fluidCompatibility
&& (fluidCompatibility.status === "mismatch" || fluidCompatibility.status === "conflict")
)
? fluidCompatibility.message
: "";
const flowUnit = v?.unitPolicy?.output?.flow || this.config.general.unit || "m3/h";
const pressureUnit = v?.unitPolicy?.output?.pressure || "mbar";
const flow = Math.round(v.measurements.type("flow").variant("predicted").position("downstream").getCurrentValue(flowUnit));
let deltaP = v.measurements.type("pressure").variant("predicted").position("delta").getCurrentValue(pressureUnit);
if (deltaP !== null) {
deltaP = parseFloat(deltaP.toFixed(0));
}
if(isNaN(deltaP)) {
deltaP = "∞";
}
const roundedPosition = Math.round(v.state.getCurrentPosition() * 100) / 100;
let symbolState;
switch(state){
case "off":
symbolState = "⬛";
break;
case "idle":
symbolState = "⏸️";
break;
case "operational":
symbolState = "⏵️";
break;
case "starting":
symbolState = "⏯️";
break;
case "warmingup":
symbolState = "🔄";
break;
case "accelerating":
symbolState = "⏩";
break;
case "stopping":
symbolState = "⏹️";
break;
case "coolingdown":
symbolState = "❄️";
break;
case "decelerating":
symbolState = "⏪";
break;
}
let status;
switch (state) {
case "off":
status = { fill: "red", shape: "dot", text: `${mode}: OFF` };
break;
case "idle":
status = { fill: "blue", shape: "dot", text: `${mode}: ${symbolState}` };
break;
case "operational":
status = { fill: "green", shape: "dot", text: `${mode}: ${symbolState} | ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}`};
break;
case "starting":
status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState}` };
break;
case "warmingup":
status = { fill: "green", shape: "dot", text: `${mode}: ${symbolState} | ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}`};
break;
case "accelerating":
status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState} | ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}` };
break;
case "stopping":
status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState}` };
break;
case "coolingdown":
status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState}` };
break;
case "decelerating":
status = { fill: "yellow", shape: "dot", text: `${mode}: ${symbolState} - ${roundedPosition}% | 💨${flow}${flowUnit} | ΔP${deltaP} ${pressureUnit}`};
break;
default:
status = { fill: "grey", shape: "dot", text: `${mode}: ${symbolState}` };
}
if (fluidWarningText) {
status = {
fill: "yellow",
shape: "ring",
text: `${status.text} | ⚠ ${fluidWarningText}`,
};
}
return status;
} catch (error) {
this.node.error("Error in updateNodeStatus: " + error.message);
return { fill: "red", shape: "ring", text: "Status Error" };
}
}
/**
* Register this node as a child upstream and downstream.
* Delayed to avoid Node-RED startup race conditions.
*/
_registerChild() {
setTimeout(() => {
this.node.send([
null,
null,
{ topic: 'registerChild', payload: this.node.id , positionVsParent: this.config?.functionality?.positionVsParent || 'atEquipment' },
]);
}, 100);
}
/**
* Start the periodic tick loop.
*/
_startTickLoop() {
setTimeout(() => {
this._tickInterval = setInterval(() => this._tick(), 1000);
// Update node status on nodered screen every second ( this is not the best way to do this, but it works for now)
this._statusInterval = setInterval(() => {
const status = this._updateNodeStatus();
this.node.status(status);
}, 1000);
}, 1000);
}
/**
* Execute a single tick: update measurement, format and send outputs.
*/
_tick() {
//this.source.tick();
const raw = this.source.getOutput();
const processMsg = this._output.formatMsg(raw, this.source.config, 'process');
const influxMsg = this._output.formatMsg(raw, this.source.config, 'influxdb');
// Send only updated outputs on ports 0 & 1
this.node.send([processMsg, influxMsg]);
}
/**
* Attach the node's input handler, routing control messages to the class.
*/
_attachInputHandler() {
this.node.on('input', (msg, send, done) => {
const v = this.source;
try {
switch(msg.topic) {
case 'registerChild': {
const childId = msg.payload;
const childObj = this.RED.nodes.getNode(childId);
if (!childObj || !childObj.source) {
v.logger.warn(`registerChild skipped: missing child/source for id=${childId}`);
break;
}
v.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent);
break;
}
case 'setMode':
v.setMode(msg.payload);
break;
case 'execSequence': {
const { source: seqSource, action: seqAction, parameter } = msg.payload;
v.handleInput(seqSource, seqAction, parameter);
break;
}
case 'execMovement': {
const { source: mvSource, action: mvAction, setpoint } = msg.payload;
v.handleInput(mvSource, mvAction, Number(setpoint));
break;
}
case 'emergencystop':
case 'emergencyStop': {
const payload = msg.payload || {};
const esSource = payload.source || 'parent';
v.handleInput(esSource, 'emergencystop');
break;
}
case 'showcurve':
send({ topic: 'Showing curve', payload: v.showCurve() });
break;
case 'updateFlow':
v.updateFlow(msg.payload.variant, msg.payload.value, msg.payload.position, msg.payload.unit || this.config.general.unit);
break;
default:
v.logger.warn(`Unknown topic: ${msg.topic}`);
}
} catch (error) {
v.logger.error(`Input handler failure: ${error.message}`);
}
if (typeof done === 'function') done();
});
}
/**
* Clean up timers and intervals when Node-RED stops the node.
*/
_attachCloseHandler() {
this.node.on('close', (done) => {
clearInterval(this._tickInterval);
clearInterval(this._statusInterval);
this.source?.destroy?.();
if (typeof done === 'function') done();
});
return { general: { unit: flowUnit }, asset: { unit: flowUnit } };
}
}
function _resolveUnit(candidate, expectedMeasure, fallback) {
const raw = typeof candidate === 'string' ? candidate.trim() : '';
const fb = String(fallback || '').trim();
if (!raw) return fb;
try {
const desc = convert().describe(raw);
if (expectedMeasure && desc.measure !== expectedMeasure) return fb;
return raw;
} catch (_) { return fb; }
}
module.exports = nodeClass;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,17 @@
'use strict';
// Bind the underlying state machine's positionChange event to the host's
// updatePosition() hook. Returns an unbind function for clean teardown.
function bindStateEvents({ state, onPositionChange }) {
const handler = (data) => onPositionChange?.(data);
state.emitter.on('positionChange', handler);
return () => {
if (typeof state.emitter.off === 'function') state.emitter.off('positionChange', handler);
else if (typeof state.emitter.removeListener === 'function') {
state.emitter.removeListener('positionChange', handler);
}
};
}
module.exports = { bindStateEvents };