diff --git a/CONTRACT.md b/CONTRACT.md new file mode 100644 index 0000000..56f4880 --- /dev/null +++ b/CONTRACT.md @@ -0,0 +1,53 @@ +# settler — Contract + +Hand-maintained for Phase 6; the `## Inputs` table is generated from +`src/commands/index.js` (see Phase 9 generator). Keep ≤ 80 lines. + +## Inputs (msg.topic on Port 0) + +| Canonical | Aliases (deprecated) | Payload | Effect | +|---|---|---|---| +| `data.influent` | `influent`, `setInfluent` | `{ F: number, C: number[13] }` — either field optional | Replaces influent flow and/or the 13-species concentration vector. Triggers `output-changed`, re-emits the 3-stream Fluent envelope. | + +Aliases log a one-time deprecation warning the first time they fire. +Plumbing topics (`child.register`) are handled by the BaseNodeAdapter and +not listed here. + +## Outputs + +- **Port 0 (process):** array of three Node-RED messages, each with + `topic = 'Fluent'` and `payload = { inlet, F, C }`: + - `inlet=0` — clarified effluent (particulate species 7–12 zeroed when `F_s > 0`). + - `inlet=1` — surplus sludge (particulates concentrated by `F_in / F_s`). + - `inlet=2` — return sludge (drawn by the downstream return pump up to `F_s`). + Re-emitted whenever the upstream reactor fires `stateChange`, an + operator pushes `data.influent`, or a child measurement updates `C_TS`. +- **Port 1 (InfluxDB telemetry):** `msg.topic = config.general.name`, + payload built by `outputUtils.formatMsg(..., 'influxdb')` from + `getOutput()`. Carries `F_in`, `C_TS`, `F_eff`, `F_surplus`, `F_return` + plus the flat measurements snapshot. Delta-compressed. +- **Port 2 (registration):** at startup the node sends one + `{ topic: 'child.register', payload: , positionVsParent, distance }` + to its parent. + +## Events emitted by `source.measurements.emitter` + +The `MeasurementContainer` fires `.measured.` whenever a +matching series receives a new value. Settler re-emits incoming child +measurements (e.g. `quantity (tss).measured.atequipment`) so its own +parent can subscribe. + +## Children accepted + +| Software type | Position | Effect | +|---|---|---| +| `measurement` | any | Re-emit on `source.measurements`. `quantity (tss)` updates `C_TS` and triggers `output-changed`. | +| `reactor` | `upstream` (warns otherwise) | Stored as `upstreamReactor`. Listener attached to the reactor's own `emitter` (NOT measurements) for `'stateChange'`; on fire, settler pulls `reactor.getEffluent` and copies `F_in` + `Cs_in`. Handles both array and single-envelope `getEffluent` shapes. | +| `machine` | `downstream` | Stored as `returnPump`. Settler reads `returnPump.measurements.type('flow').variant('measured').position('atEquipment').getCurrentValue()` to determine `F_sr`. Sets `machineChild.upstreamSource = this`. | + +## Parent relationship + +Settler typically registers as `softwareType: 'settler'` with +`positionVsParent: 'downstream'` against a reactor (the reactor's +downstream stage). The downstream reactor consumes the three Fluent +streams via `payload.inlet`. diff --git a/src/commands/handlers.js b/src/commands/handlers.js new file mode 100644 index 0000000..4386744 --- /dev/null +++ b/src/commands/handlers.js @@ -0,0 +1,32 @@ +'use strict'; + +// Handler functions for settler commands. Each handler receives: +// source: the Settler domain instance. +// msg: the Node-RED input message. +// ctx: { node, RED, send, logger } — provided by BaseNodeAdapter. +// +// Settler currently accepts no behavioural commands — the legacy +// `registerChild` topic is handled by the BaseNodeAdapter input dispatch +// via the generalFunctions registry (`child.register` canonical) and the +// node never had a public set/cmd surface beyond that. Future +// influent-injection or operator-override topics will land here. + +function _logger(source, ctx) { + return ctx?.logger || source?.logger || null; +} + +// Allows operators / upstream nodes to push an influent stream directly, +// bypassing the reactor stateChange path. Payload mirrors the reactor's +// `getEffluent` shape: { F, C } where C is the 13-species concentration +// vector. Either field may be omitted to update only the other. +exports.dataInfluent = (source, msg, ctx) => { + const log = _logger(source, ctx); + const p = msg?.payload; + if (!p || typeof p !== 'object' || Array.isArray(p)) { + log?.warn?.(`data.influent expects an object {F, C}; got ${typeof p}`); + return; + } + if (typeof p.F === 'number' && Number.isFinite(p.F)) source.F_in = p.F; + if (Array.isArray(p.C)) source.Cs_in = [...p.C]; + source.notifyOutputChanged(); +}; diff --git a/src/commands/index.js b/src/commands/index.js new file mode 100644 index 0000000..6a43d8b --- /dev/null +++ b/src/commands/index.js @@ -0,0 +1,17 @@ +'use strict'; + +// settler command registry. Consumed by BaseNodeAdapter via +// `static commands = require('./commands')`. Each descriptor maps a +// canonical msg.topic to its handler; legacy names are listed under +// `aliases` and emit a one-time deprecation warning at runtime. + +const handlers = require('./handlers'); + +module.exports = [ + { + topic: 'data.influent', + aliases: ['influent', 'setInfluent'], + payloadSchema: { type: 'any' }, + handler: handlers.dataInfluent, + }, +]; diff --git a/src/nodeClass.js b/src/nodeClass.js index c1c3526..2956333 100644 --- a/src/nodeClass.js +++ b/src/nodeClass.js @@ -1,108 +1,32 @@ -const { Settler } = require('./specificClass.js'); -const { configManager } = require('generalFunctions'); +'use strict'; +const { BaseNodeAdapter } = require('generalFunctions'); +const Settler = require('./specificClass'); +const commands = require('./commands'); -class nodeClass { - /** - * Node-RED node class for settler. - * @param {object} uiConfig - Node-RED node configuration - * @param {object} RED - Node-RED runtime API - * @param {object} nodeInstance - Node-RED node instance - * @param {string} nameOfNode - Name of the node - */ - constructor(uiConfig, RED, nodeInstance, nameOfNode) { - // Preserve RED reference for HTTP endpoints if needed - this.node = nodeInstance; - this.RED = RED; - this.name = nameOfNode; - this.source = null; +// settler is event-driven on Port 0: the 3-stream Fluent envelope is +// re-emitted whenever the upstream reactor fires stateChange or an +// operator pushes data.influent. Port 1 (InfluxDB telemetry) reuses the +// base `output-changed` pipeline via `getOutput()`. `tickInterval=null` +// means BaseNodeAdapter installs no periodic loop — settling state has +// no time-dependent integrator. +class nodeClass extends BaseNodeAdapter { + static DomainClass = Settler; + static commands = commands; + static tickInterval = null; + static statusInterval = 1000; - this._loadConfig(uiConfig) - this._setupClass(); - - this._attachInputHandler(); - this._registerChild(); - this._startTickLoop(); - this._attachCloseHandler(); + buildDomainConfig() { + return {}; } - /** - * Handle node-red input messages - */ - _attachInputHandler() { - this.node.on('input', (msg, send, done) => { - try { - switch (msg.topic) { - case 'registerChild': { - const childId = msg.payload; - const childObj = this.RED.nodes.getNode(childId); - if (!childObj || !childObj.source) { - this.source?.logger?.warn(`registerChild skipped: missing child/source for id=${childId}`); - break; - } - this.source.childRegistrationUtils.registerChild(childObj.source, msg.positionVsParent); - break; - } - default: - this.source?.logger?.warn(`Unknown topic: ${msg.topic}`); - } - } catch (error) { - this.source?.logger?.error(`Input handler failure: ${error.message}`); - } - - if (typeof done === 'function') { - done(); - } - }); - } - - /** - * Parse node configuration - * @param {object} uiConfig Config set in UI in node-red - */ - _loadConfig(uiConfig) { - const cfgMgr = new configManager(); - this.config = cfgMgr.buildConfig('settler', uiConfig, this.node.id); - } - - /** - * Register this node as a child upstream and downstream. - * Delayed to avoid Node-RED startup race conditions. - */ - _registerChild() { - setTimeout(() => { - this.node.send([ - null, - null, - { topic: 'registerChild', payload: this.node.id, positionVsParent: this.config?.functionality?.positionVsParent || 'atEquipment' } - ]); - }, 100); - } - - /** - * Setup settler class - */ - _setupClass() { - - this.source = new Settler(this.config); // protect from reassignment - this.node.source = this.source; - } - - _startTickLoop() { - setTimeout(() => { - this._tickInterval = setInterval(() => this._tick(), 1000); - }, 1000); - } - - _tick(){ - this.node.send([this.source.getEffluent, null, null]); - } - - _attachCloseHandler() { - this.node.on('close', (done) => { - clearInterval(this._tickInterval); - if (typeof done === 'function') done(); - }); + _emitOutputs() { + if (!this.source) return; + const fluent = this.source.getEffluent; + const raw = this.source.getOutput?.() || {}; + const cfg = this.source.config || this.config; + const influxMsg = this._output.formatMsg(raw, cfg, 'influxdb'); + this.node.send([fluent, influxMsg, null]); } } diff --git a/src/specificClass.js b/src/specificClass.js index 6886946..bc8503b 100644 --- a/src/specificClass.js +++ b/src/specificClass.js @@ -1,157 +1,146 @@ -const { childRegistrationUtils, logger, MeasurementContainer, POSITIONS } = require('generalFunctions'); -const EventEmitter = require('events'); +'use strict'; + +const { BaseDomain, POSITIONS, statusBadge } = require('generalFunctions'); -// Compatibility-safe array clone for Node runtimes without global structuredClone. function cloneArray(values) { - if (typeof structuredClone === 'function') { - return structuredClone(values); - } + if (typeof structuredClone === 'function') return structuredClone(values); return Array.isArray(values) ? [...values] : values; } -/** - * Settler domain model. - * Splits influent into effluent, sludge and return sludge based on solids balance. - */ -class Settler { - constructor(config) { - this.config = config; - // EVOLV stuff - this.logger = new logger(this.config.general.logging.enabled, this.config.general.logging.logLevel, config.general.name); - this.emitter = new EventEmitter(); - this.measurements = new MeasurementContainer(); - this.childRegistrationUtils = new childRegistrationUtils(this); // Child registration utility +// Settler — secondary clarifier / sludge separator (Unit level). +// Splits influent into effluent, surplus sludge and return sludge based +// on a TSS mass balance. State updates come from an upstream reactor +// (stateChange → pull `getEffluent`) or operator-supplied influent via +// the `data.influent` command. The 3-port Fluent stream is produced by +// `getEffluent` and pushed onto Port 0 by the nodeClass. +class Settler extends BaseDomain { + static name = 'settler'; + configure() { this.upstreamReactor = null; this.returnPump = null; - // state variables - this.F_in = 0; // debit in - this.Cs_in = new Array(13).fill(0); // Concentrations in - this.C_TS = 2500; // Total solids concentration sludge + this.F_in = 0; + this.Cs_in = new Array(13).fill(0); + this.C_TS = 2500; + + this.router + .onRegister('measurement', (child) => this._connectMeasurement(child)) + .onRegister('reactor', (child) => this._connectReactor(child)) + .onRegister('machine', (child) => this._connectMachine(child)); } + // Three-stream output: effluent (inlet=0), surplus sludge (inlet=1), + // return sludge (inlet=2). Downstream consumers (reactor inlets, + // returnPump) read these by `payload.inlet`. F_s is clamped to F_in + // to prevent negative effluent when X_TS_in/C_TS exceeds 1. get getEffluent() { - // constrain flow to prevent negatives const F_s = Math.min((this.F_in * this.Cs_in[12]) / this.C_TS, this.F_in); const F_eff = this.F_in - F_s; let F_sr = 0; if (this.returnPump) { - F_sr = Math.min(this.returnPump.measurements.type("flow").variant("measured").position(POSITIONS.AT_EQUIPMENT).getCurrentValue(), F_s); + F_sr = Math.min( + this.returnPump.measurements.type('flow').variant('measured').position(POSITIONS.AT_EQUIPMENT).getCurrentValue(), + F_s, + ); } const F_so = F_s - F_sr; - // effluent const Cs_eff = cloneArray(this.Cs_in); - if (F_s > 0) { - Cs_eff[7] = 0; - Cs_eff[8] = 0; - Cs_eff[9] = 0; - Cs_eff[10] = 0; - Cs_eff[11] = 0; - Cs_eff[12] = 0; - } + if (F_s > 0) for (let i = 7; i <= 12; i++) Cs_eff[i] = 0; - // sludge const Cs_s = cloneArray(this.Cs_in); - if (F_s > 0) { - Cs_s[7] = this.F_in * this.Cs_in[7] / F_s; - Cs_s[8] = this.F_in * this.Cs_in[8] / F_s; - Cs_s[9] = this.F_in * this.Cs_in[9] / F_s; - Cs_s[10] = this.F_in * this.Cs_in[10] / F_s; - Cs_s[11] = this.F_in * this.Cs_in[11] / F_s; - Cs_s[12] = this.F_in * this.Cs_in[12] / F_s; - } + if (F_s > 0) for (let i = 7; i <= 12; i++) Cs_s[i] = this.F_in * this.Cs_in[i] / F_s; + const ts = Date.now(); return [ - { topic: "Fluent", payload: { inlet: 0, F: F_eff, C: Cs_eff }, timestamp: Date.now() }, - { topic: "Fluent", payload: { inlet: 1, F: F_so, C: Cs_s }, timestamp: Date.now() }, - { topic: "Fluent", payload: { inlet: 2, F: F_sr, C: Cs_s }, timestamp: Date.now() } + { topic: 'Fluent', payload: { inlet: 0, F: F_eff, C: Cs_eff }, timestamp: ts }, + { topic: 'Fluent', payload: { inlet: 1, F: F_so, C: Cs_s }, timestamp: ts }, + { topic: 'Fluent', payload: { inlet: 2, F: F_sr, C: Cs_s }, timestamp: ts }, ]; } - registerChild(child, softwareType) { - if(!child) { - this.logger.error(`Invalid ${softwareType} child provided.`); - return; - } - - switch (softwareType) { - case "measurement": - this.logger.debug(`Registering measurement child...`); - this._connectMeasurement(child); - break; - case "reactor": - this.logger.debug(`Registering reactor child...`); - this._connectReactor(child); - break; - case "machine": - this.logger.debug(`Registering machine child...`); - this._connectMachine(child); - break; - - default: - this.logger.error(`Unrecognized softwareType: ${softwareType}`); - } - } - _connectMeasurement(measurementChild) { const position = measurementChild.config.functionality.positionVsParent; const measurementType = measurementChild.config.asset.type; - const eventName = `${measurementType}.measured.${position}`; + const eventName = `${measurementType}.measured.${String(position).toLowerCase()}`; - // Register event listener for measurement updates measurementChild.measurements.emitter.on(eventName, (eventData) => { this.logger.debug(`${position} ${measurementType} from ${eventData.childName}: ${eventData.value} ${eventData.unit}`); - - // Store directly in parent's measurement container this.measurements .type(measurementType) - .variant("measured") + .variant('measured') .position(position) .value(eventData.value, eventData.timestamp, eventData.unit); - this._updateMeasurement(measurementType, eventData.value, position, eventData); }); } + // Reactor → settler integration: the reactor pushes a `stateChange` event + // on its own emitter (NOT measurements.emitter), so router.onMeasurement + // can't subscribe — we wire the listener manually here, mirroring the + // pre-refactor `_connectReactor`. The settler pulls `getEffluent` rather + // than receiving it pushed; reactor.getEffluent may return an array or a + // single envelope (the 2026-03-02 bug fix preserved both shapes). _connectReactor(reactorChild) { - if (reactorChild.config.functionality.positionVsParent != POSITIONS.UPSTREAM) { - this.logger.warn("Reactor children of settlers should be upstream."); + if (reactorChild.config.functionality.positionVsParent !== POSITIONS.UPSTREAM) { + this.logger.warn('Reactor children of settlers should be upstream.'); } - this.upstreamReactor = reactorChild; - reactorChild.emitter.on("stateChange", (_eventData) => { - this.logger.debug(`State change of upstream reactor detected.`); + reactorChild.emitter.on('stateChange', () => { + this.logger.debug('State change of upstream reactor detected.'); const raw = this.upstreamReactor.getEffluent; const effluent = Array.isArray(raw) ? raw[0] : raw; this.F_in = effluent.payload.F; this.Cs_in = effluent.payload.C; + this.notifyOutputChanged(); }); } _connectMachine(machineChild) { - if (machineChild.config.functionality.positionVsParent == POSITIONS.DOWNSTREAM) { + if (machineChild.config.functionality.positionVsParent === POSITIONS.DOWNSTREAM) { machineChild.upstreamSource = this; this.returnPump = machineChild; return; } - this.logger.warn(`Failed to register machine child.`); + this.logger.warn('Failed to register machine child.'); } - _updateMeasurement(measurementType, value, _position, _context) { - switch(measurementType) { - case "quantity (tss)": + _updateMeasurement(measurementType, value /*, _position, _context */) { + switch (measurementType) { + case 'quantity (tss)': this.C_TS = value; - break; - + this.notifyOutputChanged(); + return; default: this.logger.error(`Type '${measurementType}' not recognized for measured update.`); - return; } } + + // Telemetry snapshot for Port 1 (InfluxDB). Port 0 carries the 3-message + // Fluent stream directly; this scalar view feeds dashboards. + getOutput() { + const streams = this.getEffluent; + return { + ...this.measurements.getFlattenedOutput?.(), + F_in: this.F_in, + C_TS: this.C_TS, + F_eff: streams[0].payload.F, + F_surplus: streams[1].payload.F, + F_return: streams[2].payload.F, + }; + } + + getStatusBadge() { + if (this.F_in <= 0) return statusBadge.idle('no influent'); + const streams = this.getEffluent; + const eff = streams[0].payload.F.toFixed(2); + const sur = streams[1].payload.F.toFixed(2); + return statusBadge.compose([`F_in=${this.F_in.toFixed(2)}`, `eff=${eff}`, `surplus=${sur}`], { fill: 'green', shape: 'dot' }); + } } -module.exports = { Settler }; +module.exports = Settler; +module.exports.Settler = Settler; diff --git a/test/basic/specificClass.basic.test.js b/test/basic/specificClass.basic.test.js new file mode 100644 index 0000000..0146ff9 --- /dev/null +++ b/test/basic/specificClass.basic.test.js @@ -0,0 +1,123 @@ +'use strict'; + +const test = require('node:test'); +const assert = require('node:assert/strict'); +const EventEmitter = require('events'); +const Settler = require('../../src/specificClass'); + +const NUM_SPECIES = 13; + +function makeSettler() { + return new Settler({ + general: { name: 'TestSettler', id: 'settler-test-1', logging: { enabled: false, logLevel: 'error' } }, + functionality: { softwareType: 'settler', positionVsParent: 'downstream' }, + }); +} + +test('constructor sets default state', () => { + const s = makeSettler(); + assert.equal(s.F_in, 0); + assert.deepEqual(s.Cs_in, new Array(NUM_SPECIES).fill(0)); + assert.equal(s.C_TS, 2500); + assert.equal(s.upstreamReactor, null); + assert.equal(s.returnPump, null); +}); + +test('getEffluent conserves total flow (mass balance)', () => { + const s = makeSettler(); + s.F_in = 200; + s.C_TS = 3000; + const C = new Array(NUM_SPECIES).fill(5); + C[12] = 2000; + s.Cs_in = C; + const [eff, sur, ret] = s.getEffluent; + assert.equal(eff.topic, 'Fluent'); + assert.ok(Math.abs(eff.payload.F + sur.payload.F + ret.payload.F - 200) < 1e-6); + for (let i = 7; i <= 12; i++) assert.equal(eff.payload.C[i], 0); +}); + +test('getEffluent clamps F_s to F_in when X_TS exceeds C_TS', () => { + const s = makeSettler(); + s.F_in = 100; + s.C_TS = 1000; + s.Cs_in = new Array(NUM_SPECIES).fill(10); + s.Cs_in[12] = 5000; + const [eff] = s.getEffluent; + assert.equal(eff.payload.F, 0); +}); + +test('reactor stateChange pulls effluent (preserves _connectReactor integration)', () => { + const s = makeSettler(); + let outputChanges = 0; + s.emitter.on('output-changed', () => outputChanges++); + + const reactor = { + config: { general: { name: 'r', id: 'r-1' }, functionality: { positionVsParent: 'upstream' } }, + emitter: new EventEmitter(), + measurements: { emitter: new EventEmitter() }, + // Mirror the array shape the reactor produces in production. + get getEffluent() { + const C = new Array(NUM_SPECIES).fill(2); + C[12] = 3500; + return [{ topic: 'Fluent', payload: { inlet: 0, F: 150, C } }]; + }, + }; + s.router.dispatchRegister(reactor, 'reactor'); + reactor.emitter.emit('stateChange'); + + assert.equal(s.upstreamReactor, reactor); + assert.equal(s.F_in, 150); + assert.equal(s.Cs_in[12], 3500); + assert.ok(outputChanges >= 1, 'reactor stateChange should trigger output-changed'); +}); + +test('reactor stateChange handles single-envelope getEffluent (not array)', () => { + const s = makeSettler(); + const reactor = { + config: { general: { name: 'r', id: 'r-1' }, functionality: { positionVsParent: 'upstream' } }, + emitter: new EventEmitter(), + measurements: { emitter: new EventEmitter() }, + get getEffluent() { + const C = new Array(NUM_SPECIES).fill(1); + C[12] = 800; + return { topic: 'Fluent', payload: { inlet: 0, F: 42, C } }; + }, + }; + s.router.dispatchRegister(reactor, 'reactor'); + reactor.emitter.emit('stateChange'); + + assert.equal(s.F_in, 42); + assert.equal(s.Cs_in[12], 800); +}); + +test('TSS measurement updates C_TS via _updateMeasurement', () => { + const s = makeSettler(); + s._updateMeasurement('quantity (tss)', 7000); + assert.equal(s.C_TS, 7000); +}); + +test('downstream machine becomes returnPump', () => { + const s = makeSettler(); + const pump = { + config: { general: { name: 'pump', id: 'p-1' }, functionality: { positionVsParent: 'downstream' } }, + measurements: { emitter: new EventEmitter() }, + }; + s.router.dispatchRegister(pump, 'machine'); + assert.equal(s.returnPump, pump); + assert.equal(pump.upstreamSource, s); +}); + +test('getStatusBadge returns idle when F_in=0, green when flowing', () => { + const s = makeSettler(); + const idle = s.getStatusBadge(); + assert.equal(idle.fill, 'blue'); + + s.F_in = 100; + s.C_TS = 5000; + const C = new Array(NUM_SPECIES).fill(10); + C[12] = 3000; + s.Cs_in = C; + const active = s.getStatusBadge(); + assert.equal(active.fill, 'green'); + assert.ok(active.text.includes('F_in')); +});