From aefec90485b1d4438360c67a5fc160e89e67db7e Mon Sep 17 00:00:00 2001 From: znetsixe Date: Thu, 21 May 2026 15:07:12 +0200 Subject: [PATCH] chore: initial coresync scaffold MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit EVOLV CoreSync node — FROST/SensorThings handoff path. First version forwards FROST-ready HTTP request messages on the dbase output; a downstream Node-RED http-request node performs the actual POST and feeds responses back on msg.topic = "frost.response". Lazy stream resolver, latest-wins queue for unresolved/FROST-down streams (keep first + latest, drop middle), knot-emit on slope change, provenance preserved in Observation parameters. Interview state + open Q20 (slope angle vs. relative delta) recorded in superproject CORESYNC_FROST_INTERVIEW_HANDOFF.md. Co-Authored-By: Claude Opus 4.7 (1M context) --- coresync.html | 95 ++++++++++++++ coresync.js | 51 ++++++++ package.json | 23 ++++ src/coreSyncDomain.js | 199 +++++++++++++++++++++++++++++ src/frostRequests.js | 201 ++++++++++++++++++++++++++++++ src/identity.js | 76 +++++++++++ src/normalizer.js | 54 ++++++++ src/reducer.js | 106 ++++++++++++++++ test/basic/coresync.basic.test.js | 105 ++++++++++++++++ 9 files changed, 910 insertions(+) create mode 100644 coresync.html create mode 100644 coresync.js create mode 100644 package.json create mode 100644 src/coreSyncDomain.js create mode 100644 src/frostRequests.js create mode 100644 src/identity.js create mode 100644 src/normalizer.js create mode 100644 src/reducer.js create mode 100644 test/basic/coresync.basic.test.js diff --git a/coresync.html b/coresync.html new file mode 100644 index 0000000..ef2f0a1 --- /dev/null +++ b/coresync.html @@ -0,0 +1,95 @@ + + + + + diff --git a/coresync.js b/coresync.js new file mode 100644 index 0000000..0e3285a --- /dev/null +++ b/coresync.js @@ -0,0 +1,51 @@ +'use strict'; + +const { CoreSyncDomain } = require('./src/coreSyncDomain'); + +module.exports = function(RED) { + RED.nodes.registerType('coresync', function(config) { + RED.nodes.createNode(this, config); + const node = this; + const hub = new CoreSyncDomain({ + frostBaseUrl: config.frostBaseUrl, + serviceVersion: config.serviceVersion, + dbaseFormat: config.dbaseFormat, + assetTagOverride: config.assetTagOverride, + sensorTagOverride: config.sensorTagOverride, + maxQueuedObservationsPerStream: config.maxQueuedObservationsPerStream, + reducer: { + angleToleranceDeg: Number(config.angleToleranceDeg), + timeScaleMs: Number(config.timeScaleMs), + maxGapMs: Number(config.maxGapMs), + minDeltaTimeMs: Number(config.minDeltaTimeMs), + minDeltaValue: Number(config.minDeltaValue), + comparisonMode: config.comparisonMode || 'angle', + }, + }); + + node.on('input', (msg, send, done) => { + try { + const output = hub.handleMessage(msg); + const dbaseCount = Array.isArray(output[1]) ? output[1].length : (output[1] ? 1 : 0); + if (dbaseCount > 0) node.status({ fill: 'blue', shape: 'dot', text: `${dbaseCount} FROST request(s)` }); + send(output); + } catch (error) { + node.status({ fill: 'red', shape: 'ring', text: error.message }); + node.error(error, msg); + } finally { + if (typeof done === 'function') done(); + } + }); + + node.on('close', (removed, done) => { + const closeDone = typeof removed === 'function' ? removed : done; + try { + const output = hub.flushAll('close'); + if (output[1]) node.send(output); + node.status({}); + } finally { + if (typeof closeDone === 'function') closeDone(); + } + }); + }); +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..917caa0 --- /dev/null +++ b/package.json @@ -0,0 +1,23 @@ +{ + "name": "coresync", + "version": "0.1.0", + "description": "EVOLV CoreSync collector for FROST/SensorThings handoff.", + "main": "coresync.js", + "scripts": { + "test": "jest test/basic/*.test.js" + }, + "keywords": [ + "iot", + "frost", + "sensorthings", + "node-red", + "EVOLV" + ], + "author": "EVOLV", + "license": "SEE LICENSE", + "node-red": { + "nodes": { + "coresync": "coresync.js" + } + } +} diff --git a/src/coreSyncDomain.js b/src/coreSyncDomain.js new file mode 100644 index 0000000..e82fb14 --- /dev/null +++ b/src/coreSyncDomain.js @@ -0,0 +1,199 @@ +'use strict'; + +const { normalizeInput } = require('./normalizer'); +const { PointReducer, DEFAULTS: REDUCER_DEFAULTS } = require('./reducer'); +const { + METADATA_ORDER, + createRequest, + extractEntityId, + lookupRequest, + observationRequest, +} = require('./frostRequests'); + +const DEFAULT_VALUE_SCALE_BY_TYPE = { + pressure: 100000, + flow: 1, + power: 1000, + temperature: 1, + density: 1, + level: 1, + volume: 1, + control: 1, + percentage: 1, + efficiency: 1, +}; + +class CoreSyncDomain { + constructor(config = {}) { + this.config = { + frostBaseUrl: config.frostBaseUrl || 'http://localhost:8080/FROST-Server', + serviceVersion: config.serviceVersion || 'v1.1', + dbaseFormat: config.dbaseFormat || 'frost', + assetTagOverride: config.assetTagOverride || '', + sensorTagOverride: config.sensorTagOverride || '', + maxQueuedObservationsPerStream: Number(config.maxQueuedObservationsPerStream) || 2, + reducer: { ...REDUCER_DEFAULTS, ...(config.reducer || {}) }, + }; + this.streams = new Map(); + } + + handleMessage(msg) { + if (msg && msg.topic === 'frost.response') { + return this.handleFrostResponse(msg); + } + if (msg && msg.topic === 'coresync.flush') { + return this.flushAll('flush'); + } + return this.handleTelemetry(msg); + } + + handleTelemetry(msg) { + const requests = []; + const points = normalizeInput(msg, { + assetTagOverride: this.config.assetTagOverride, + sensorTagOverride: this.config.sensorTagOverride, + }); + + for (const point of points) { + const state = this._stateFor(point); + const candidates = state.reducer.offer(point); + for (const candidate of candidates) { + requests.push(...this._handleObservationCandidate(state, candidate)); + } + } + + return [null, requests.length ? requests : null, null]; + } + + handleFrostResponse(msg) { + const meta = msg._coreSync || {}; + const state = meta.streamKey ? this.streams.get(meta.streamKey) : null; + if (!state || !meta.kind || !meta.action) return [null, null, null]; + + if (meta.kind === 'observation') { + state.inFlightObservations = Math.max(0, state.inFlightObservations - 1); + return [null, null, null]; + } + + const id = extractEntityId(msg.payload); + const statusCode = Number(msg.statusCode || msg.response?.statusCode || 0); + const requests = []; + + if (id !== null && (statusCode === 0 || statusCode < 400)) { + state.ids[meta.kind] = id; + state.inFlightMetadata.delete(meta.kind); + requests.push(...this._continueMetadata(state)); + requests.push(...this._drainPendingObservations(state)); + return [null, requests.length ? requests : null, null]; + } + + if (meta.action === 'lookup' && (statusCode === 200 || statusCode === 0)) { + state.inFlightMetadata.delete(meta.kind); + requests.push(createRequest(this.config, state.meta, meta.kind, state.ids)); + state.inFlightMetadata.add(meta.kind); + return [null, requests, null]; + } + + state.inFlightMetadata.delete(meta.kind); + state.backoffFailures += 1; + return [this._diagnostic('frost.error', state, msg), null, null]; + } + + flushAll(reason = 'flush') { + const requests = []; + for (const state of this.streams.values()) { + for (const candidate of state.reducer.flush(reason)) { + requests.push(...this._handleObservationCandidate(state, candidate)); + } + } + return [null, requests.length ? requests : null, null]; + } + + _stateFor(point) { + let state = this.streams.get(point.streamKey); + if (!state) { + state = { + meta: point, + ids: {}, + reducer: new PointReducer({ + ...this.config.reducer, + valueScale: this._valueScaleFor(point), + }), + pendingObservations: [], + inFlightMetadata: new Set(), + inFlightObservations: 0, + backoffFailures: 0, + }; + this.streams.set(point.streamKey, state); + } else { + state.meta = { ...state.meta, ...point }; + } + return state; + } + + _handleObservationCandidate(state, candidate) { + if (this._isResolved(state)) { + state.inFlightObservations += 1; + return [observationRequest(this.config, candidate, state.ids.datastream, state.ids.featureOfInterest)]; + } + this._enqueuePendingObservation(state, candidate); + return this._continueMetadata(state); + } + + _continueMetadata(state) { + for (const kind of METADATA_ORDER) { + if (state.ids[kind]) continue; + if (state.inFlightMetadata.has(kind)) return []; + const request = lookupRequest(this.config, state.meta, kind); + state.inFlightMetadata.add(kind); + return [request]; + } + return []; + } + + _drainPendingObservations(state) { + if (!this._isResolved(state)) return []; + const pending = state.pendingObservations.splice(0); + state.inFlightObservations += pending.length; + return pending.map((candidate) => observationRequest(this.config, candidate, state.ids.datastream, state.ids.featureOfInterest)); + } + + _enqueuePendingObservation(state, candidate) { + if (state.pendingObservations.length === 0) { + state.pendingObservations.push(candidate); + return; + } + if (state.pendingObservations.length === 1) { + state.pendingObservations.push(candidate); + return; + } + state.pendingObservations[1] = candidate; + } + + _isResolved(state) { + return METADATA_ORDER.every((kind) => state.ids[kind] !== undefined && state.ids[kind] !== null); + } + + _valueScaleFor(point) { + const configured = this.config.reducer.valueScaleByType || DEFAULT_VALUE_SCALE_BY_TYPE; + return configured[point.type] || this.config.reducer.valueScale || 1; + } + + _diagnostic(topic, state, msg) { + return { + topic, + payload: { + streamKey: state.meta.streamKey, + requestId: msg.requestId, + statusCode: msg.statusCode, + kind: msg._coreSync?.kind, + action: msg._coreSync?.action, + }, + }; + } +} + +module.exports = { + DEFAULT_VALUE_SCALE_BY_TYPE, + CoreSyncDomain, +}; diff --git a/src/frostRequests.js b/src/frostRequests.js new file mode 100644 index 0000000..d0c606e --- /dev/null +++ b/src/frostRequests.js @@ -0,0 +1,201 @@ +'use strict'; + +const METADATA_ORDER = ['thing', 'observedProperty', 'sensor', 'featureOfInterest', 'datastream']; + +function trimBaseUrl(baseUrl) { + return String(baseUrl || '').replace(/\/+$/, ''); +} + +function pathUrl(config, path) { + const base = trimBaseUrl(config.frostBaseUrl); + const version = String(config.serviceVersion || 'v1.1').replace(/^\/+|\/+$/g, ''); + return `${base}/${version}${path}`; +} + +function quote(value) { + return `'${String(value).replace(/'/g, "''")}'`; +} + +function requestEnvelope(config, meta, method, path, payload, extra = {}) { + const headers = { + Accept: 'application/json', + }; + if (method === 'POST' || method === 'PATCH') { + headers['Content-Type'] = 'application/json'; + headers.Prefer = 'return=representation'; + } + return { + topic: extra.topic || `frost.metadata.${extra.action || 'request'}`, + requestId: extra.requestId, + _coreSync: { + kind: extra.kind, + action: extra.action, + externalKey: extra.externalKey, + streamKey: meta.streamKey, + nextKind: extra.nextKind, + }, + method, + url: pathUrl(config, path), + headers, + payload: payload === undefined ? null : payload, + }; +} + +function unitOfMeasurement(unit) { + return { + name: unit, + symbol: unit, + definition: 'http://www.opengis.net/def/uom/UCUM/', + }; +} + +function lookupRequest(config, meta, kind) { + const externalKey = externalKeyFor(kind, meta); + const entitySet = entitySetFor(kind); + const filter = encodeURIComponent(`properties/externalKey eq ${quote(externalKey)}`); + return requestEnvelope(config, meta, 'GET', `/${entitySet}?$filter=${filter}&$top=1`, null, { + topic: 'frost.metadata.lookup', + requestId: `${externalKey}:lookup`, + kind, + action: 'lookup', + externalKey, + }); +} + +function createRequest(config, meta, kind, ids = {}) { + const externalKey = externalKeyFor(kind, meta); + return requestEnvelope(config, meta, 'POST', `/${entitySetFor(kind)}`, createPayload(kind, meta, ids), { + topic: 'frost.metadata.create', + requestId: `${externalKey}:create`, + kind, + action: 'create', + externalKey, + }); +} + +function observationRequest(config, candidate, datastreamId, foiId) { + const meta = candidate.point; + return requestEnvelope( + config, + meta, + 'POST', + `/Datastreams(${datastreamId})/Observations`, + { + phenomenonTime: meta.phenomenonTime, + result: meta.value, + FeatureOfInterest: { '@iot.id': foiId }, + parameters: { + reduction: 'knot', + reductionReason: candidate.reason, + evolvFieldKey: meta.fieldKey, + evolvStreamKey: meta.streamKey, + sourceMeasurement: meta.measurement, + }, + }, + { + topic: 'frost.observation.create', + requestId: `observation:${meta.streamKey}:${meta.phenomenonTime}`, + kind: 'observation', + action: 'create', + externalKey: `observation:${meta.streamKey}`, + }, + ); +} + +function entitySetFor(kind) { + return { + thing: 'Things', + observedProperty: 'ObservedProperties', + sensor: 'Sensors', + featureOfInterest: 'FeaturesOfInterest', + datastream: 'Datastreams', + }[kind]; +} + +function externalKeyFor(kind, meta) { + return { + thing: meta.thingExternalKey, + observedProperty: meta.observedPropertyExternalKey, + sensor: meta.sensorExternalKey, + featureOfInterest: meta.featureOfInterestExternalKey, + datastream: meta.datastreamExternalKey, + }[kind]; +} + +function createPayload(kind, meta, ids) { + const builders = { + thing: () => ({ + name: meta.thingTag, + description: `EVOLV Thing ${meta.thingTag}`, + properties: { + externalKey: meta.thingExternalKey, + source: 'EVOLV', + }, + }), + observedProperty: () => ({ + name: meta.type, + definition: `urn:evolv:observedProperty:${meta.type}`, + description: `EVOLV observed property ${meta.type}`, + properties: { + externalKey: meta.observedPropertyExternalKey, + }, + }), + sensor: () => ({ + name: meta.sensorTag, + encodingType: 'application/json', + metadata: JSON.stringify({ source: 'EVOLV', variant: meta.variant }), + properties: { + externalKey: meta.sensorExternalKey, + variant: meta.variant, + }, + }), + featureOfInterest: () => ({ + name: `${meta.thingTag}:${meta.position}`, + description: `EVOLV feature of interest ${meta.position}`, + encodingType: 'application/vnd.geo+json', + feature: { type: 'Point', coordinates: [0, 0] }, + properties: { + externalKey: meta.featureOfInterestExternalKey, + position: meta.position, + thingTag: meta.thingTag, + }, + }), + datastream: () => ({ + name: `${meta.thingTag} ${meta.type} ${meta.variant} ${meta.position} ${meta.sensorTag}`, + description: `EVOLV stream ${meta.streamKey}`, + observationType: 'http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement', + unitOfMeasurement: unitOfMeasurement(meta.unit), + Thing: { '@iot.id': ids.thing }, + ObservedProperty: { '@iot.id': ids.observedProperty }, + Sensor: { '@iot.id': ids.sensor }, + properties: { + externalKey: meta.datastreamExternalKey, + streamKey: meta.streamKey, + position: meta.position, + variant: meta.variant, + sensorTag: meta.sensorTag, + }, + }), + }; + return builders[kind](); +} + +function extractEntityId(payload) { + if (!payload) return null; + if (payload['@iot.id'] !== undefined) return payload['@iot.id']; + if (Array.isArray(payload.value) && payload.value[0] && payload.value[0]['@iot.id'] !== undefined) { + return payload.value[0]['@iot.id']; + } + return null; +} + +module.exports = { + METADATA_ORDER, + createRequest, + entitySetFor, + extractEntityId, + externalKeyFor, + lookupRequest, + observationRequest, + pathUrl, +}; diff --git a/src/identity.js b/src/identity.js new file mode 100644 index 0000000..84f9a07 --- /dev/null +++ b/src/identity.js @@ -0,0 +1,76 @@ +'use strict'; + +const DEFAULT_UNITS = { + pressure: 'Pa', + flow: 'm3/s', + power: 'W', + temperature: 'K', + density: 'kg/m3', + level: 'm', + volume: 'm3', + control: '1', + percentage: '1', + efficiency: '1', +}; + +function firstPresent(...values) { + for (const value of values) { + if (value !== undefined && value !== null && value !== '') return value; + } + return undefined; +} + +function sanitizeToken(value, fallback) { + const raw = firstPresent(value, fallback); + return String(raw || '').trim(); +} + +function parseFieldKey(fieldKey) { + const parts = String(fieldKey || '').split('.').filter(Boolean); + return { + type: parts[0] || 'value', + variant: parts[1] || 'measured', + position: parts[2] || 'atEquipment', + sensorTag: parts[3] || undefined, + }; +} + +function resolveIdentity(input, options = {}) { + const tags = input.tags || {}; + const parsed = parseFieldKey(input.fieldKey); + const thingTag = sanitizeToken( + options.assetTagOverride, + tags.tagcode || tags.tagCode || tags.asset_tagCode || tags.asset_tagcode || input.measurement, + ); + const type = sanitizeToken(input.type, parsed.type).toLowerCase(); + const variant = sanitizeToken(input.variant, parsed.variant).toLowerCase(); + const position = sanitizeToken(input.position, parsed.position); + const sensorTag = sanitizeToken( + options.sensorTagOverride, + parsed.sensorTag || tags.sensorTag || tags.sensor_tagCode || `${variant.toUpperCase()}-${thingTag}`, + ); + const unit = sanitizeToken(input.unit, tags.unit || input.source?.unit || DEFAULT_UNITS[type] || '1'); + const streamKey = [thingTag, type, variant, position, sensorTag].join(':'); + + return { + thingTag, + type, + variant, + position, + sensorTag, + unit, + fieldKey: input.fieldKey, + streamKey, + thingExternalKey: `thing:${thingTag}`, + observedPropertyExternalKey: `observedProperty:${type}`, + sensorExternalKey: `sensor:${sensorTag}`, + featureOfInterestExternalKey: `foi:${thingTag}:${position}`, + datastreamExternalKey: `datastream:${streamKey}`, + }; +} + +module.exports = { + DEFAULT_UNITS, + parseFieldKey, + resolveIdentity, +}; diff --git a/src/normalizer.js b/src/normalizer.js new file mode 100644 index 0000000..a62e3dd --- /dev/null +++ b/src/normalizer.js @@ -0,0 +1,54 @@ +'use strict'; + +const { resolveIdentity } = require('./identity'); + +function normalizeTimestamp(value) { + if (value instanceof Date) return value; + if (typeof value === 'number') return new Date(value); + if (typeof value === 'string' && value.trim()) return new Date(value); + return new Date(); +} + +function unwrapPayload(msg) { + if (msg && msg.payload !== undefined) return msg.payload; + return msg; +} + +function normalizeOne(payload, options = {}) { + if (!payload || typeof payload !== 'object') return []; + if (!payload.fields || typeof payload.fields !== 'object') return []; + + const timestamp = normalizeTimestamp(payload.timestamp || payload.time || payload.ts); + const tags = payload.tags || {}; + return Object.entries(payload.fields) + .filter(([, value]) => typeof value === 'number' && Number.isFinite(value)) + .map(([fieldKey, value]) => { + const identity = resolveIdentity({ + measurement: payload.measurement, + fieldKey, + tags, + source: payload.source, + }, options); + return { + ...identity, + value, + time: timestamp, + phenomenonTime: timestamp.toISOString(), + measurement: payload.measurement, + tags, + source: payload.source || {}, + }; + }); +} + +function normalizeInput(msg, options = {}) { + const payload = unwrapPayload(msg); + const payloads = Array.isArray(payload) ? payload : [payload]; + return payloads.flatMap((item) => normalizeOne(item, options)); +} + +module.exports = { + normalizeInput, + normalizeOne, + normalizeTimestamp, +}; diff --git a/src/reducer.js b/src/reducer.js new file mode 100644 index 0000000..890043c --- /dev/null +++ b/src/reducer.js @@ -0,0 +1,106 @@ +'use strict'; + +const DEFAULTS = { + angleToleranceDeg: 5, + timeScaleMs: 60000, + valueScale: 1, + maxGapMs: 300000, + minDeltaTimeMs: 0, + minDeltaValue: 0, + comparisonMode: 'angle', + relativeSlopeTolerance: 0.1, +}; + +function toMs(point) { + return point.time instanceof Date ? point.time.getTime() : new Date(point.time).getTime(); +} + +function angleDeg(from, to, options) { + const dt = toMs(to) - toMs(from); + const dx = dt / Math.max(Number(options.timeScaleMs) || DEFAULTS.timeScaleMs, 1); + const dy = (Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON); + return Math.atan2(dy, dx) * 180 / Math.PI; +} + +function angleDiff(a, b) { + let diff = Math.abs(a - b) % 360; + if (diff > 180) diff = 360 - diff; + return diff; +} + +function relativeSlope(from, to, options) { + const dt = Math.max(toMs(to) - toMs(from), 1); + return ((Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON)) / dt; +} + +class PointReducer { + constructor(options = {}) { + this.options = { ...DEFAULTS, ...options }; + this.anchor = null; + this.previous = null; + } + + offer(point) { + const output = []; + if (!point || typeof point.value !== 'number' || !Number.isFinite(point.value)) return output; + + if (!this.anchor) { + this.anchor = point; + this.previous = point; + output.push({ point, reason: 'first' }); + return output; + } + + const minDeltaTimeMs = Number(this.options.minDeltaTimeMs) || 0; + const minDeltaValue = Number(this.options.minDeltaValue) || 0; + const dtFromPrevious = toMs(point) - toMs(this.previous); + const dvFromPrevious = Math.abs(Number(point.value) - Number(this.previous.value)); + if (dtFromPrevious < minDeltaTimeMs && dvFromPrevious < minDeltaValue) { + this.previous = point; + return output; + } + + const maxGapMs = Number(this.options.maxGapMs) || 0; + if (maxGapMs > 0 && this.previous !== this.anchor && toMs(point) - toMs(this.anchor) >= maxGapMs) { + output.push({ point: this.previous, reason: 'max-gap' }); + this.anchor = this.previous; + } + + if (this.previous !== this.anchor && this._changedDirection(point)) { + output.push({ point: this.previous, reason: this.options.comparisonMode === 'relative-slope' ? 'slope-change' : 'angle-change' }); + this.anchor = this.previous; + } + + this.previous = point; + return output; + } + + flush(reason = 'flush') { + if (this.previous && this.previous !== this.anchor) { + const point = this.previous; + this.anchor = point; + return [{ point, reason }]; + } + return []; + } + + _changedDirection(point) { + if (this.options.comparisonMode === 'relative-slope') { + const left = relativeSlope(this.anchor, this.previous, this.options); + const right = relativeSlope(this.previous, point, this.options); + const denominator = Math.max(Math.abs(left), Number.EPSILON); + return Math.abs(right - left) / denominator > Number(this.options.relativeSlopeTolerance); + } + + const left = angleDeg(this.anchor, this.previous, this.options); + const right = angleDeg(this.previous, point, this.options); + return angleDiff(left, right) > Number(this.options.angleToleranceDeg); + } +} + +module.exports = { + DEFAULTS, + PointReducer, + angleDeg, + angleDiff, +}; diff --git a/test/basic/coresync.basic.test.js b/test/basic/coresync.basic.test.js new file mode 100644 index 0000000..317fd50 --- /dev/null +++ b/test/basic/coresync.basic.test.js @@ -0,0 +1,105 @@ +const assert = require('node:assert/strict'); + +const { normalizeInput } = require('../../src/normalizer'); +const { PointReducer } = require('../../src/reducer'); +const { CoreSyncDomain } = require('../../src/coreSyncDomain'); + +function telemetry(timestamp, value) { + return { + payload: { + measurement: 'P-1', + fields: { + 'pressure.measured.upstream.PT-1': value, + }, + tags: { + tagcode: 'P-1', + }, + timestamp, + }, + }; +} + +test('normalizer maps EVOLV dbase payloads to stable stream identities', () => { + const points = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345)); + + assert.equal(points.length, 1); + assert.equal(points[0].thingTag, 'P-1'); + assert.equal(points[0].type, 'pressure'); + assert.equal(points[0].variant, 'measured'); + assert.equal(points[0].position, 'upstream'); + assert.equal(points[0].sensorTag, 'PT-1'); + assert.equal(points[0].streamKey, 'P-1:pressure:measured:upstream:PT-1'); + assert.equal(points[0].unit, 'Pa'); +}); + +test('point reducer keeps first point and previous point on angle change', () => { + const reducer = new PointReducer({ + angleToleranceDeg: 5, + timeScaleMs: 60000, + valueScale: 1, + maxGapMs: 0, + }); + const p1 = { time: new Date('2026-05-19T10:00:00.000Z'), value: 0 }; + const p2 = { time: new Date('2026-05-19T10:01:00.000Z'), value: 1 }; + const p3 = { time: new Date('2026-05-19T10:02:00.000Z'), value: 1 }; + + assert.deepEqual(reducer.offer(p1).map((x) => x.reason), ['first']); + assert.deepEqual(reducer.offer(p2), []); + const changed = reducer.offer(p3); + assert.equal(changed.length, 1); + assert.equal(changed[0].reason, 'angle-change'); + assert.equal(changed[0].point, p2); +}); + +test('coresync emits metadata lookup first and drains observations after resolver completes', () => { + const hub = new CoreSyncDomain({ + frostBaseUrl: 'http://frost.example/FROST-Server', + serviceVersion: 'v1.1', + }); + + const first = hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345)); + assert.equal(first[1].length, 1); + assert.equal(first[1][0].topic, 'frost.metadata.lookup'); + assert.equal(first[1][0]._coreSync.kind, 'thing'); + + const streamKey = 'P-1:pressure:measured:upstream:PT-1'; + const kinds = [ + ['thing', 1], + ['observedProperty', 2], + ['sensor', 3], + ['featureOfInterest', 4], + ['datastream', 5], + ]; + + let output; + for (const [kind, id] of kinds) { + output = hub.handleMessage({ + topic: 'frost.response', + statusCode: 200, + payload: { value: [{ '@iot.id': id }] }, + _coreSync: { kind, action: 'lookup', streamKey }, + }); + } + + assert.equal(output[1].length, 1); + assert.equal(output[1][0].topic, 'frost.observation.create'); + assert.equal(output[1][0].method, 'POST'); + assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations'); + assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4); + assert.equal(output[1][0].payload.parameters.reductionReason, 'first'); +}); + +test('pending observation queue keeps first and latest unresolved points', () => { + const hub = new CoreSyncDomain({}); + + hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 1)); + hub.handleMessage(telemetry('2026-05-19T10:01:00.000Z', 2)); + hub.handleMessage({ topic: 'coresync.flush' }); + hub.handleMessage(telemetry('2026-05-19T10:02:00.000Z', 3)); + hub.handleMessage({ topic: 'coresync.flush' }); + + const state = hub.streams.get('P-1:pressure:measured:upstream:PT-1'); + assert.equal(state.pendingObservations.length, 2); + assert.equal(state.pendingObservations[0].point.value, 1); + assert.equal(state.pendingObservations[1].point.value, 3); +});