Compare commits

...

1 Commits

Author SHA1 Message Date
znetsixe
b8247fc755 P6: convert settler to platform infrastructure
Refactor of settler to use BaseNodeAdapter + commandRegistry + statusBadge.
settler follows the platform refactor plan in .claude/refactor/MODULE_SPLIT.md.
Tests stay green; CONTRACT.md generated; legacy aliases preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-10 22:23:44 +02:00
6 changed files with 328 additions and 190 deletions

53
CONTRACT.md Normal file
View File

@@ -0,0 +1,53 @@
# settler — Contract
Hand-maintained for Phase 6; the `## Inputs` table is generated from
`src/commands/index.js` (see Phase 9 generator). Keep ≤ 80 lines.
## Inputs (msg.topic on Port 0)
| Canonical | Aliases (deprecated) | Payload | Effect |
|---|---|---|---|
| `data.influent` | `influent`, `setInfluent` | `{ F: number, C: number[13] }` — either field optional | Replaces influent flow and/or the 13-species concentration vector. Triggers `output-changed`, re-emits the 3-stream Fluent envelope. |
Aliases log a one-time deprecation warning the first time they fire.
Plumbing topics (`child.register`) are handled by the BaseNodeAdapter and
not listed here.
## Outputs
- **Port 0 (process):** array of three Node-RED messages, each with
`topic = 'Fluent'` and `payload = { inlet, F, C }`:
- `inlet=0` — clarified effluent (particulate species 712 zeroed when `F_s > 0`).
- `inlet=1` — surplus sludge (particulates concentrated by `F_in / F_s`).
- `inlet=2` — return sludge (drawn by the downstream return pump up to `F_s`).
Re-emitted whenever the upstream reactor fires `stateChange`, an
operator pushes `data.influent`, or a child measurement updates `C_TS`.
- **Port 1 (InfluxDB telemetry):** `msg.topic = config.general.name`,
payload built by `outputUtils.formatMsg(..., 'influxdb')` from
`getOutput()`. Carries `F_in`, `C_TS`, `F_eff`, `F_surplus`, `F_return`
plus the flat measurements snapshot. Delta-compressed.
- **Port 2 (registration):** at startup the node sends one
`{ topic: 'child.register', payload: <node.id>, positionVsParent, distance }`
to its parent.
## Events emitted by `source.measurements.emitter`
The `MeasurementContainer` fires `<type>.measured.<position>` whenever a
matching series receives a new value. Settler re-emits incoming child
measurements (e.g. `quantity (tss).measured.atequipment`) so its own
parent can subscribe.
## Children accepted
| Software type | Position | Effect |
|---|---|---|
| `measurement` | any | Re-emit on `source.measurements`. `quantity (tss)` updates `C_TS` and triggers `output-changed`. |
| `reactor` | `upstream` (warns otherwise) | Stored as `upstreamReactor`. Listener attached to the reactor's own `emitter` (NOT measurements) for `'stateChange'`; on fire, settler pulls `reactor.getEffluent` and copies `F_in` + `Cs_in`. Handles both array and single-envelope `getEffluent` shapes. |
| `machine` | `downstream` | Stored as `returnPump`. Settler reads `returnPump.measurements.type('flow').variant('measured').position('atEquipment').getCurrentValue()` to determine `F_sr`. Sets `machineChild.upstreamSource = this`. |
## Parent relationship
Settler typically registers as `softwareType: 'settler'` with
`positionVsParent: 'downstream'` against a reactor (the reactor's
downstream stage). The downstream reactor consumes the three Fluent
streams via `payload.inlet`.

32
src/commands/handlers.js Normal file
View File

@@ -0,0 +1,32 @@
'use strict';
// Handler functions for settler commands. Each handler receives:
// source: the Settler domain instance.
// msg: the Node-RED input message.
// ctx: { node, RED, send, logger } — provided by BaseNodeAdapter.
//
// Settler currently accepts no behavioural commands — the legacy
// `registerChild` topic is handled by the BaseNodeAdapter input dispatch
// via the generalFunctions registry (`child.register` canonical) and the
// node never had a public set/cmd surface beyond that. Future
// influent-injection or operator-override topics will land here.
function _logger(source, ctx) {
return ctx?.logger || source?.logger || null;
}
// Allows operators / upstream nodes to push an influent stream directly,
// bypassing the reactor stateChange path. Payload mirrors the reactor's
// `getEffluent` shape: { F, C } where C is the 13-species concentration
// vector. Either field may be omitted to update only the other.
exports.dataInfluent = (source, msg, ctx) => {
const log = _logger(source, ctx);
const p = msg?.payload;
if (!p || typeof p !== 'object' || Array.isArray(p)) {
log?.warn?.(`data.influent expects an object {F, C}; got ${typeof p}`);
return;
}
if (typeof p.F === 'number' && Number.isFinite(p.F)) source.F_in = p.F;
if (Array.isArray(p.C)) source.Cs_in = [...p.C];
source.notifyOutputChanged();
};

17
src/commands/index.js Normal file
View File

@@ -0,0 +1,17 @@
'use strict';
// settler command registry. Consumed by BaseNodeAdapter via
// `static commands = require('./commands')`. Each descriptor maps a
// canonical msg.topic to its handler; legacy names are listed under
// `aliases` and emit a one-time deprecation warning at runtime.
const handlers = require('./handlers');
module.exports = [
{
topic: 'data.influent',
aliases: ['influent', 'setInfluent'],
payloadSchema: { type: 'any' },
handler: handlers.dataInfluent,
},
];

View File

@@ -1,108 +1,32 @@
const { Settler } = require('./specificClass.js'); 'use strict';
const { configManager } = require('generalFunctions');
const { BaseNodeAdapter } = require('generalFunctions');
const Settler = require('./specificClass');
const commands = require('./commands');
class nodeClass { // settler is event-driven on Port 0: the 3-stream Fluent envelope is
/** // re-emitted whenever the upstream reactor fires stateChange or an
* Node-RED node class for settler. // operator pushes data.influent. Port 1 (InfluxDB telemetry) reuses the
* @param {object} uiConfig - Node-RED node configuration // base `output-changed` pipeline via `getOutput()`. `tickInterval=null`
* @param {object} RED - Node-RED runtime API // means BaseNodeAdapter installs no periodic loop — settling state has
* @param {object} nodeInstance - Node-RED node instance // no time-dependent integrator.
* @param {string} nameOfNode - Name of the node class nodeClass extends BaseNodeAdapter {
*/ static DomainClass = Settler;
constructor(uiConfig, RED, nodeInstance, nameOfNode) { static commands = commands;
// Preserve RED reference for HTTP endpoints if needed static tickInterval = null;
this.node = nodeInstance; static statusInterval = 1000;
this.RED = RED;
this.name = nameOfNode;
this.source = null;
this._loadConfig(uiConfig) buildDomainConfig() {
this._setupClass(); return {};
this._attachInputHandler();
this._registerChild();
this._startTickLoop();
this._attachCloseHandler();
} }
/** _emitOutputs() {
* Handle node-red input messages if (!this.source) return;
*/ const fluent = this.source.getEffluent;
_attachInputHandler() { const raw = this.source.getOutput?.() || {};
this.node.on('input', (msg, send, done) => { const cfg = this.source.config || this.config;
try { const influxMsg = this._output.formatMsg(raw, cfg, 'influxdb');
switch (msg.topic) { this.node.send([fluent, influxMsg, null]);
case 'registerChild': {
const childId = msg.payload;
const childObj = this.RED.nodes.getNode(childId);
if (!childObj || !childObj.source) {
this.source?.logger?.warn(`registerChild skipped: missing child/source for id=${childId}`);
break;
}
this.source.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent);
break;
}
default:
this.source?.logger?.warn(`Unknown topic: ${msg.topic}`);
}
} catch (error) {
this.source?.logger?.error(`Input handler failure: ${error.message}`);
}
if (typeof done === 'function') {
done();
}
});
}
/**
* Parse node configuration
* @param {object} uiConfig Config set in UI in node-red
*/
_loadConfig(uiConfig) {
const cfgMgr = new configManager();
this.config = cfgMgr.buildConfig('settler', uiConfig, this.node.id);
}
/**
* Register this node as a child upstream and downstream.
* Delayed to avoid Node-RED startup race conditions.
*/
_registerChild() {
setTimeout(() => {
this.node.send([
null,
null,
{ topic: 'registerChild', payload: this.node.id, positionVsParent: this.config?.functionality?.positionVsParent || 'atEquipment' }
]);
}, 100);
}
/**
* Setup settler class
*/
_setupClass() {
this.source = new Settler(this.config); // protect from reassignment
this.node.source = this.source;
}
_startTickLoop() {
setTimeout(() => {
this._tickInterval = setInterval(() => this._tick(), 1000);
}, 1000);
}
_tick(){
this.node.send([this.source.getEffluent, null, null]);
}
_attachCloseHandler() {
this.node.on('close', (done) => {
clearInterval(this._tickInterval);
if (typeof done === 'function') done();
});
} }
} }

View File

@@ -1,157 +1,146 @@
const { childRegistrationUtils, logger, MeasurementContainer, POSITIONS } = require('generalFunctions'); 'use strict';
const EventEmitter = require('events');
const { BaseDomain, POSITIONS, statusBadge } = require('generalFunctions');
// Compatibility-safe array clone for Node runtimes without global structuredClone.
function cloneArray(values) { function cloneArray(values) {
if (typeof structuredClone === 'function') { if (typeof structuredClone === 'function') return structuredClone(values);
return structuredClone(values);
}
return Array.isArray(values) ? [...values] : values; return Array.isArray(values) ? [...values] : values;
} }
/** // Settler — secondary clarifier / sludge separator (Unit level).
* Settler domain model. // Splits influent into effluent, surplus sludge and return sludge based
* Splits influent into effluent, sludge and return sludge based on solids balance. // on a TSS mass balance. State updates come from an upstream reactor
*/ // (stateChange → pull `getEffluent`) or operator-supplied influent via
class Settler { // the `data.influent` command. The 3-port Fluent stream is produced by
constructor(config) { // `getEffluent` and pushed onto Port 0 by the nodeClass.
this.config = config; class Settler extends BaseDomain {
// EVOLV stuff static name = 'settler';
this.logger = new logger(this.config.general.logging.enabled, this.config.general.logging.logLevel, config.general.name);
this.emitter = new EventEmitter();
this.measurements = new MeasurementContainer();
this.childRegistrationUtils = new childRegistrationUtils(this); // Child registration utility
configure() {
this.upstreamReactor = null; this.upstreamReactor = null;
this.returnPump = null; this.returnPump = null;
// state variables this.F_in = 0;
this.F_in = 0; // debit in this.Cs_in = new Array(13).fill(0);
this.Cs_in = new Array(13).fill(0); // Concentrations in this.C_TS = 2500;
this.C_TS = 2500; // Total solids concentration sludge
this.router
.onRegister('measurement', (child) => this._connectMeasurement(child))
.onRegister('reactor', (child) => this._connectReactor(child))
.onRegister('machine', (child) => this._connectMachine(child));
} }
// Three-stream output: effluent (inlet=0), surplus sludge (inlet=1),
// return sludge (inlet=2). Downstream consumers (reactor inlets,
// returnPump) read these by `payload.inlet`. F_s is clamped to F_in
// to prevent negative effluent when X_TS_in/C_TS exceeds 1.
get getEffluent() { get getEffluent() {
// constrain flow to prevent negatives
const F_s = Math.min((this.F_in * this.Cs_in[12]) / this.C_TS, this.F_in); const F_s = Math.min((this.F_in * this.Cs_in[12]) / this.C_TS, this.F_in);
const F_eff = this.F_in - F_s; const F_eff = this.F_in - F_s;
let F_sr = 0; let F_sr = 0;
if (this.returnPump) { if (this.returnPump) {
F_sr = Math.min(this.returnPump.measurements.type("flow").variant("measured").position(POSITIONS.AT_EQUIPMENT).getCurrentValue(), F_s); F_sr = Math.min(
this.returnPump.measurements.type('flow').variant('measured').position(POSITIONS.AT_EQUIPMENT).getCurrentValue(),
F_s,
);
} }
const F_so = F_s - F_sr; const F_so = F_s - F_sr;
// effluent
const Cs_eff = cloneArray(this.Cs_in); const Cs_eff = cloneArray(this.Cs_in);
if (F_s > 0) { if (F_s > 0) for (let i = 7; i <= 12; i++) Cs_eff[i] = 0;
Cs_eff[7] = 0;
Cs_eff[8] = 0;
Cs_eff[9] = 0;
Cs_eff[10] = 0;
Cs_eff[11] = 0;
Cs_eff[12] = 0;
}
// sludge
const Cs_s = cloneArray(this.Cs_in); const Cs_s = cloneArray(this.Cs_in);
if (F_s > 0) { if (F_s > 0) for (let i = 7; i <= 12; i++) Cs_s[i] = this.F_in * this.Cs_in[i] / F_s;
Cs_s[7] = this.F_in * this.Cs_in[7] / F_s;
Cs_s[8] = this.F_in * this.Cs_in[8] / F_s;
Cs_s[9] = this.F_in * this.Cs_in[9] / F_s;
Cs_s[10] = this.F_in * this.Cs_in[10] / F_s;
Cs_s[11] = this.F_in * this.Cs_in[11] / F_s;
Cs_s[12] = this.F_in * this.Cs_in[12] / F_s;
}
const ts = Date.now();
return [ return [
{ topic: "Fluent", payload: { inlet: 0, F: F_eff, C: Cs_eff }, timestamp: Date.now() }, { topic: 'Fluent', payload: { inlet: 0, F: F_eff, C: Cs_eff }, timestamp: ts },
{ topic: "Fluent", payload: { inlet: 1, F: F_so, C: Cs_s }, timestamp: Date.now() }, { topic: 'Fluent', payload: { inlet: 1, F: F_so, C: Cs_s }, timestamp: ts },
{ topic: "Fluent", payload: { inlet: 2, F: F_sr, C: Cs_s }, timestamp: Date.now() } { topic: 'Fluent', payload: { inlet: 2, F: F_sr, C: Cs_s }, timestamp: ts },
]; ];
} }
registerChild(child, softwareType) {
if(!child) {
this.logger.error(`Invalid ${softwareType} child provided.`);
return;
}
switch (softwareType) {
case "measurement":
this.logger.debug(`Registering measurement child...`);
this._connectMeasurement(child);
break;
case "reactor":
this.logger.debug(`Registering reactor child...`);
this._connectReactor(child);
break;
case "machine":
this.logger.debug(`Registering machine child...`);
this._connectMachine(child);
break;
default:
this.logger.error(`Unrecognized softwareType: ${softwareType}`);
}
}
_connectMeasurement(measurementChild) { _connectMeasurement(measurementChild) {
const position = measurementChild.config.functionality.positionVsParent; const position = measurementChild.config.functionality.positionVsParent;
const measurementType = measurementChild.config.asset.type; const measurementType = measurementChild.config.asset.type;
const eventName = `${measurementType}.measured.${position}`; const eventName = `${measurementType}.measured.${String(position).toLowerCase()}`;
// Register event listener for measurement updates
measurementChild.measurements.emitter.on(eventName, (eventData) => { measurementChild.measurements.emitter.on(eventName, (eventData) => {
this.logger.debug(`${position} ${measurementType} from ${eventData.childName}: ${eventData.value} ${eventData.unit}`); this.logger.debug(`${position} ${measurementType} from ${eventData.childName}: ${eventData.value} ${eventData.unit}`);
// Store directly in parent's measurement container
this.measurements this.measurements
.type(measurementType) .type(measurementType)
.variant("measured") .variant('measured')
.position(position) .position(position)
.value(eventData.value, eventData.timestamp, eventData.unit); .value(eventData.value, eventData.timestamp, eventData.unit);
this._updateMeasurement(measurementType, eventData.value, position, eventData); this._updateMeasurement(measurementType, eventData.value, position, eventData);
}); });
} }
// Reactor → settler integration: the reactor pushes a `stateChange` event
// on its own emitter (NOT measurements.emitter), so router.onMeasurement
// can't subscribe — we wire the listener manually here, mirroring the
// pre-refactor `_connectReactor`. The settler pulls `getEffluent` rather
// than receiving it pushed; reactor.getEffluent may return an array or a
// single envelope (the 2026-03-02 bug fix preserved both shapes).
_connectReactor(reactorChild) { _connectReactor(reactorChild) {
if (reactorChild.config.functionality.positionVsParent != POSITIONS.UPSTREAM) { if (reactorChild.config.functionality.positionVsParent !== POSITIONS.UPSTREAM) {
this.logger.warn("Reactor children of settlers should be upstream."); this.logger.warn('Reactor children of settlers should be upstream.');
} }
this.upstreamReactor = reactorChild; this.upstreamReactor = reactorChild;
reactorChild.emitter.on("stateChange", (_eventData) => { reactorChild.emitter.on('stateChange', () => {
this.logger.debug(`State change of upstream reactor detected.`); this.logger.debug('State change of upstream reactor detected.');
const raw = this.upstreamReactor.getEffluent; const raw = this.upstreamReactor.getEffluent;
const effluent = Array.isArray(raw) ? raw[0] : raw; const effluent = Array.isArray(raw) ? raw[0] : raw;
this.F_in = effluent.payload.F; this.F_in = effluent.payload.F;
this.Cs_in = effluent.payload.C; this.Cs_in = effluent.payload.C;
this.notifyOutputChanged();
}); });
} }
_connectMachine(machineChild) { _connectMachine(machineChild) {
if (machineChild.config.functionality.positionVsParent == POSITIONS.DOWNSTREAM) { if (machineChild.config.functionality.positionVsParent === POSITIONS.DOWNSTREAM) {
machineChild.upstreamSource = this; machineChild.upstreamSource = this;
this.returnPump = machineChild; this.returnPump = machineChild;
return; return;
} }
this.logger.warn(`Failed to register machine child.`); this.logger.warn('Failed to register machine child.');
} }
_updateMeasurement(measurementType, value, _position, _context) { _updateMeasurement(measurementType, value /*, _position, _context */) {
switch(measurementType) { switch (measurementType) {
case "quantity (tss)": case 'quantity (tss)':
this.C_TS = value; this.C_TS = value;
break; this.notifyOutputChanged();
return;
default: default:
this.logger.error(`Type '${measurementType}' not recognized for measured update.`); this.logger.error(`Type '${measurementType}' not recognized for measured update.`);
return;
} }
} }
// Telemetry snapshot for Port 1 (InfluxDB). Port 0 carries the 3-message
// Fluent stream directly; this scalar view feeds dashboards.
getOutput() {
const streams = this.getEffluent;
return {
...this.measurements.getFlattenedOutput?.(),
F_in: this.F_in,
C_TS: this.C_TS,
F_eff: streams[0].payload.F,
F_surplus: streams[1].payload.F,
F_return: streams[2].payload.F,
};
}
getStatusBadge() {
if (this.F_in <= 0) return statusBadge.idle('no influent');
const streams = this.getEffluent;
const eff = streams[0].payload.F.toFixed(2);
const sur = streams[1].payload.F.toFixed(2);
return statusBadge.compose([`F_in=${this.F_in.toFixed(2)}`, `eff=${eff}`, `surplus=${sur}`], { fill: 'green', shape: 'dot' });
}
} }
module.exports = { Settler }; module.exports = Settler;
module.exports.Settler = Settler;

View File

@@ -0,0 +1,123 @@
'use strict';
const test = require('node:test');
const assert = require('node:assert/strict');
const EventEmitter = require('events');
const Settler = require('../../src/specificClass');
const NUM_SPECIES = 13;
function makeSettler() {
return new Settler({
general: { name: 'TestSettler', id: 'settler-test-1', logging: { enabled: false, logLevel: 'error' } },
functionality: { softwareType: 'settler', positionVsParent: 'downstream' },
});
}
test('constructor sets default state', () => {
const s = makeSettler();
assert.equal(s.F_in, 0);
assert.deepEqual(s.Cs_in, new Array(NUM_SPECIES).fill(0));
assert.equal(s.C_TS, 2500);
assert.equal(s.upstreamReactor, null);
assert.equal(s.returnPump, null);
});
test('getEffluent conserves total flow (mass balance)', () => {
const s = makeSettler();
s.F_in = 200;
s.C_TS = 3000;
const C = new Array(NUM_SPECIES).fill(5);
C[12] = 2000;
s.Cs_in = C;
const [eff, sur, ret] = s.getEffluent;
assert.equal(eff.topic, 'Fluent');
assert.ok(Math.abs(eff.payload.F + sur.payload.F + ret.payload.F - 200) < 1e-6);
for (let i = 7; i <= 12; i++) assert.equal(eff.payload.C[i], 0);
});
test('getEffluent clamps F_s to F_in when X_TS exceeds C_TS', () => {
const s = makeSettler();
s.F_in = 100;
s.C_TS = 1000;
s.Cs_in = new Array(NUM_SPECIES).fill(10);
s.Cs_in[12] = 5000;
const [eff] = s.getEffluent;
assert.equal(eff.payload.F, 0);
});
test('reactor stateChange pulls effluent (preserves _connectReactor integration)', () => {
const s = makeSettler();
let outputChanges = 0;
s.emitter.on('output-changed', () => outputChanges++);
const reactor = {
config: { general: { name: 'r', id: 'r-1' }, functionality: { positionVsParent: 'upstream' } },
emitter: new EventEmitter(),
measurements: { emitter: new EventEmitter() },
// Mirror the array shape the reactor produces in production.
get getEffluent() {
const C = new Array(NUM_SPECIES).fill(2);
C[12] = 3500;
return [{ topic: 'Fluent', payload: { inlet: 0, F: 150, C } }];
},
};
s.router.dispatchRegister(reactor, 'reactor');
reactor.emitter.emit('stateChange');
assert.equal(s.upstreamReactor, reactor);
assert.equal(s.F_in, 150);
assert.equal(s.Cs_in[12], 3500);
assert.ok(outputChanges >= 1, 'reactor stateChange should trigger output-changed');
});
test('reactor stateChange handles single-envelope getEffluent (not array)', () => {
const s = makeSettler();
const reactor = {
config: { general: { name: 'r', id: 'r-1' }, functionality: { positionVsParent: 'upstream' } },
emitter: new EventEmitter(),
measurements: { emitter: new EventEmitter() },
get getEffluent() {
const C = new Array(NUM_SPECIES).fill(1);
C[12] = 800;
return { topic: 'Fluent', payload: { inlet: 0, F: 42, C } };
},
};
s.router.dispatchRegister(reactor, 'reactor');
reactor.emitter.emit('stateChange');
assert.equal(s.F_in, 42);
assert.equal(s.Cs_in[12], 800);
});
test('TSS measurement updates C_TS via _updateMeasurement', () => {
const s = makeSettler();
s._updateMeasurement('quantity (tss)', 7000);
assert.equal(s.C_TS, 7000);
});
test('downstream machine becomes returnPump', () => {
const s = makeSettler();
const pump = {
config: { general: { name: 'pump', id: 'p-1' }, functionality: { positionVsParent: 'downstream' } },
measurements: { emitter: new EventEmitter() },
};
s.router.dispatchRegister(pump, 'machine');
assert.equal(s.returnPump, pump);
assert.equal(pump.upstreamSource, s);
});
test('getStatusBadge returns idle when F_in=0, green when flowing', () => {
const s = makeSettler();
const idle = s.getStatusBadge();
assert.equal(idle.fill, 'blue');
s.F_in = 100;
s.C_TS = 5000;
const C = new Array(NUM_SPECIES).fill(10);
C[12] = 3000;
s.Cs_in = C;
const active = s.getStatusBadge();
assert.equal(active.fill, 'green');
assert.ok(active.text.includes('F_in'));
});