diff --git a/src/specificClass.js b/src/specificClass.js index d34170d..073d3f3 100644 --- a/src/specificClass.js +++ b/src/specificClass.js @@ -286,7 +286,7 @@ class PumpingStation extends BaseDomain { const measurementType = child.config.asset.type; const eventName = `${measurementType}.measured.${position}`; - child.measurements.emitter.on(eventName, (eventData = {}) => { + const handle = (eventData = {}) => { this.logger.debug( `Measurement update ${eventName} <- ${eventData.childName || child.config.general.name}: ${eventData.value} ${eventData.unit}` ); @@ -297,7 +297,21 @@ class PumpingStation extends BaseDomain { this.measurements.type(measurementType).variant('measured').position(position) .value(eventData.value, eventData.timestamp, eventData.unit); this.measurementRouter.route(measurementType, eventData.value, position, eventData); - }); + }; + + child.measurements.emitter.on(eventName, handle); + + // Seed from the child's current value. The emitter only delivers FUTURE + // updates, so a parent that registers after the child already emitted + // (e.g. a once-only inject that fired during startup before this + // subscription existed) would otherwise never see that value. Replaying + // the last sample makes a late subscriber pick up the present state. + const series = child.measurements + .type(measurementType).variant('measured').position(position).get?.(); + const sample = series?.getLaggedSample?.(0); + if (sample && sample.value != null) { + handle({ ...sample, childName: child.config.general.name }); + } } _subscribePredictedFlow(child) { diff --git a/test/basic/replay-on-subscribe.basic.test.js b/test/basic/replay-on-subscribe.basic.test.js new file mode 100644 index 0000000..7268d55 --- /dev/null +++ b/test/basic/replay-on-subscribe.basic.test.js @@ -0,0 +1,81 @@ +// Late-subscriber replay: a measurement child that already holds a value when +// the pumpingStation registers it (e.g. a once-only inject that fired during +// startup before the parent subscribed) must still surface on Port 0. The +// emitter only delivers future updates, so _subscribeMeasurement seeds from the +// child's current sample. + +const test = require('node:test'); +const assert = require('node:assert/strict'); +const EventEmitter = require('node:events'); + +const PumpingStation = require('../../src/specificClass'); +const { MeasurementContainer, configManager } = require('generalFunctions'); + +function makePsConfig() { + const cm = new configManager(); + return cm.buildConfig('pumpingStation', { name: 'PS' }, 'ps-replay', { + basin: { volume: 50, height: 5, inflowLevel: 3, outflowLevel: 0.2, overflowLevel: 4.5 }, + hydraulics: { minHeightBasedOn: 'outlet' }, + control: { + mode: 'levelbased', + allowedModes: new Set(['levelbased']), + levelbased: { minLevel: 1, startLevel: 2, maxLevel: 4, curveType: 'linear' }, + }, + safety: {}, + }); +} + +function makeFlowMeasurementChild(id = 'meas-replay') { + const measurements = new MeasurementContainer({ autoConvert: true, preferredUnits: { flow: 'm3/s' } }); + assert.ok(typeof measurements.emitter?.on === 'function'); + return { + id, + source: { + config: { + general: { id, name: id }, + functionality: { softwareType: 'measurement', positionVsParent: 'upstream' }, + asset: { type: 'flow' }, + }, + measurements, + }, + }; +} + +test('value written BEFORE registration is replayed on subscribe (once-inject timing)', () => { + const ps = new PumpingStation(makePsConfig()); + const child = makeFlowMeasurementChild(); + + // Child already holds a value — emitted into the void before the parent existed. + child.source.measurements + .type('flow').variant('measured').position('upstream') + .value(50, Date.now(), 'm3/h'); + + // Parent registers AFTER the value is present. Without replay it would only + // catch future emits and surface nothing. + ps.childRegistrationUtils.registerChild(child.source, 'upstream'); + + const out = ps.getOutput(); + const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream')); + assert.ok(upstreamKeys.length > 0, 'parent must surface flow.measured.upstream.* after late subscribe'); +}); + +test('no stored value → nothing replayed, no crash', () => { + const ps = new PumpingStation(makePsConfig()); + const child = makeFlowMeasurementChild('empty-child'); + // Register with an empty child container; replay must be a safe no-op. + assert.doesNotThrow(() => ps.childRegistrationUtils.registerChild(child.source, 'upstream')); + const out = ps.getOutput(); + const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream')); + assert.equal(upstreamKeys.length, 0, 'no upstream key when child has no value'); +}); + +test('future emits still delivered after subscribe (listener intact)', () => { + const ps = new PumpingStation(makePsConfig()); + const child = makeFlowMeasurementChild('streaming-child'); + ps.childRegistrationUtils.registerChild(child.source, 'upstream'); + // Emit AFTER registration — the normal streaming-sensor path. + child.source.measurements.type('flow').variant('measured').position('upstream').value(30, Date.now(), 'm3/h'); + const out = ps.getOutput(); + const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream')); + assert.ok(upstreamKeys.length > 0, 'normal post-subscribe emit still surfaces'); +});