chore: initial coresync scaffold

EVOLV CoreSync node — FROST/SensorThings handoff path. First version forwards FROST-ready HTTP request messages on the dbase output; a downstream Node-RED http-request node performs the actual POST and feeds responses back on msg.topic = "frost.response". Lazy stream resolver, latest-wins queue for unresolved/FROST-down streams (keep first + latest, drop middle), knot-emit on slope change, provenance preserved in Observation parameters.

Interview state + open Q20 (slope angle vs. relative delta) recorded in superproject CORESYNC_FROST_INTERVIEW_HANDOFF.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
znetsixe
2026-05-21 15:07:12 +02:00
commit aefec90485
9 changed files with 910 additions and 0 deletions

95
coresync.html Normal file
View File

@@ -0,0 +1,95 @@
<script type="text/javascript">
RED.nodes.registerType('coresync', {
category: 'EVOLV',
color: '#54647B',
defaults: {
name: { value: '' },
frostBaseUrl: { value: 'http://localhost:8080/FROST-Server', required: true },
serviceVersion: { value: 'v1.1', required: true },
dbaseFormat: { value: 'frost' },
assetTagOverride: { value: '' },
sensorTagOverride: { value: '' },
comparisonMode: { value: 'angle' },
angleToleranceDeg: { value: 5, required: true, validate: RED.validators.number() },
timeScaleMs: { value: 60000, required: true, validate: RED.validators.number() },
maxGapMs: { value: 300000, required: true, validate: RED.validators.number() },
minDeltaTimeMs: { value: 0, required: true, validate: RED.validators.number() },
minDeltaValue: { value: 0, required: true, validate: RED.validators.number() },
maxQueuedObservationsPerStream: { value: 2, required: true, validate: RED.validators.number() },
diagnosticsEnabled: { value: false },
},
inputs: 1,
outputs: 3,
inputLabels: ['EVOLV dbase / FROST response'],
outputLabels: ['process', 'dbase', 'parent'],
icon: 'font-awesome/fa-cloud-upload',
label: function() {
return this.name || 'CoreSync';
},
});
</script>
<script type="text/html" data-template-name="coresync">
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
<input type="text" id="node-input-name">
</div>
<div class="form-row">
<label for="node-input-frostBaseUrl"><i class="fa fa-globe"></i> FROST URL</label>
<input type="text" id="node-input-frostBaseUrl">
</div>
<div class="form-row">
<label for="node-input-serviceVersion"><i class="fa fa-code-fork"></i> Version</label>
<input type="text" id="node-input-serviceVersion">
</div>
<div class="form-row">
<label for="node-input-dbaseFormat"><i class="fa fa-database"></i> Output</label>
<select id="node-input-dbaseFormat" style="width:70%;">
<option value="frost">frost</option>
</select>
</div>
<div class="form-row">
<label for="node-input-assetTagOverride"><i class="fa fa-cube"></i> Thing Tag</label>
<input type="text" id="node-input-assetTagOverride" placeholder="optional override">
</div>
<div class="form-row">
<label for="node-input-sensorTagOverride"><i class="fa fa-bullseye"></i> Sensor Tag</label>
<input type="text" id="node-input-sensorTagOverride" placeholder="optional override">
</div>
<div class="form-row">
<label for="node-input-comparisonMode"><i class="fa fa-compress"></i> Reducer</label>
<select id="node-input-comparisonMode" style="width:70%;">
<option value="angle">Angle change</option>
<option value="relative-slope">Relative slope change</option>
</select>
</div>
<div class="form-row">
<label for="node-input-angleToleranceDeg"><i class="fa fa-arrows-h"></i> Angle Tol.</label>
<input type="number" id="node-input-angleToleranceDeg" style="width:70%;">
</div>
<div class="form-row">
<label for="node-input-timeScaleMs"><i class="fa fa-clock-o"></i> Time Scale</label>
<input type="number" id="node-input-timeScaleMs" style="width:70%;">
</div>
<div class="form-row">
<label for="node-input-maxGapMs"><i class="fa fa-hourglass-half"></i> Max Gap</label>
<input type="number" id="node-input-maxGapMs" style="width:70%;">
</div>
<div class="form-row">
<label for="node-input-minDeltaTimeMs"><i class="fa fa-filter"></i> Min dt</label>
<input type="number" id="node-input-minDeltaTimeMs" style="width:70%;">
</div>
<div class="form-row">
<label for="node-input-minDeltaValue"><i class="fa fa-filter"></i> Min dv</label>
<input type="number" id="node-input-minDeltaValue" style="width:70%;">
</div>
<div class="form-row">
<label for="node-input-maxQueuedObservationsPerStream"><i class="fa fa-list"></i> Queue</label>
<input type="number" id="node-input-maxQueuedObservationsPerStream" style="width:70%;">
</div>
</script>
<script type="text/html" data-help-name="coresync">
<p>Collects EVOLV dbase telemetry, reduces numeric streams, and emits FROST-ready HTTP request messages on the dbase output.</p>
<p>Feed HTTP responses back into this node with <code>msg.topic = "frost.response"</code> so metadata lookup/create state can advance.</p>
</script>

51
coresync.js Normal file
View File

@@ -0,0 +1,51 @@
'use strict';
const { CoreSyncDomain } = require('./src/coreSyncDomain');
module.exports = function(RED) {
RED.nodes.registerType('coresync', function(config) {
RED.nodes.createNode(this, config);
const node = this;
const hub = new CoreSyncDomain({
frostBaseUrl: config.frostBaseUrl,
serviceVersion: config.serviceVersion,
dbaseFormat: config.dbaseFormat,
assetTagOverride: config.assetTagOverride,
sensorTagOverride: config.sensorTagOverride,
maxQueuedObservationsPerStream: config.maxQueuedObservationsPerStream,
reducer: {
angleToleranceDeg: Number(config.angleToleranceDeg),
timeScaleMs: Number(config.timeScaleMs),
maxGapMs: Number(config.maxGapMs),
minDeltaTimeMs: Number(config.minDeltaTimeMs),
minDeltaValue: Number(config.minDeltaValue),
comparisonMode: config.comparisonMode || 'angle',
},
});
node.on('input', (msg, send, done) => {
try {
const output = hub.handleMessage(msg);
const dbaseCount = Array.isArray(output[1]) ? output[1].length : (output[1] ? 1 : 0);
if (dbaseCount > 0) node.status({ fill: 'blue', shape: 'dot', text: `${dbaseCount} FROST request(s)` });
send(output);
} catch (error) {
node.status({ fill: 'red', shape: 'ring', text: error.message });
node.error(error, msg);
} finally {
if (typeof done === 'function') done();
}
});
node.on('close', (removed, done) => {
const closeDone = typeof removed === 'function' ? removed : done;
try {
const output = hub.flushAll('close');
if (output[1]) node.send(output);
node.status({});
} finally {
if (typeof closeDone === 'function') closeDone();
}
});
});
};

23
package.json Normal file
View File

@@ -0,0 +1,23 @@
{
"name": "coresync",
"version": "0.1.0",
"description": "EVOLV CoreSync collector for FROST/SensorThings handoff.",
"main": "coresync.js",
"scripts": {
"test": "jest test/basic/*.test.js"
},
"keywords": [
"iot",
"frost",
"sensorthings",
"node-red",
"EVOLV"
],
"author": "EVOLV",
"license": "SEE LICENSE",
"node-red": {
"nodes": {
"coresync": "coresync.js"
}
}
}

199
src/coreSyncDomain.js Normal file
View File

@@ -0,0 +1,199 @@
'use strict';
const { normalizeInput } = require('./normalizer');
const { PointReducer, DEFAULTS: REDUCER_DEFAULTS } = require('./reducer');
const {
METADATA_ORDER,
createRequest,
extractEntityId,
lookupRequest,
observationRequest,
} = require('./frostRequests');
const DEFAULT_VALUE_SCALE_BY_TYPE = {
pressure: 100000,
flow: 1,
power: 1000,
temperature: 1,
density: 1,
level: 1,
volume: 1,
control: 1,
percentage: 1,
efficiency: 1,
};
class CoreSyncDomain {
constructor(config = {}) {
this.config = {
frostBaseUrl: config.frostBaseUrl || 'http://localhost:8080/FROST-Server',
serviceVersion: config.serviceVersion || 'v1.1',
dbaseFormat: config.dbaseFormat || 'frost',
assetTagOverride: config.assetTagOverride || '',
sensorTagOverride: config.sensorTagOverride || '',
maxQueuedObservationsPerStream: Number(config.maxQueuedObservationsPerStream) || 2,
reducer: { ...REDUCER_DEFAULTS, ...(config.reducer || {}) },
};
this.streams = new Map();
}
handleMessage(msg) {
if (msg && msg.topic === 'frost.response') {
return this.handleFrostResponse(msg);
}
if (msg && msg.topic === 'coresync.flush') {
return this.flushAll('flush');
}
return this.handleTelemetry(msg);
}
handleTelemetry(msg) {
const requests = [];
const points = normalizeInput(msg, {
assetTagOverride: this.config.assetTagOverride,
sensorTagOverride: this.config.sensorTagOverride,
});
for (const point of points) {
const state = this._stateFor(point);
const candidates = state.reducer.offer(point);
for (const candidate of candidates) {
requests.push(...this._handleObservationCandidate(state, candidate));
}
}
return [null, requests.length ? requests : null, null];
}
handleFrostResponse(msg) {
const meta = msg._coreSync || {};
const state = meta.streamKey ? this.streams.get(meta.streamKey) : null;
if (!state || !meta.kind || !meta.action) return [null, null, null];
if (meta.kind === 'observation') {
state.inFlightObservations = Math.max(0, state.inFlightObservations - 1);
return [null, null, null];
}
const id = extractEntityId(msg.payload);
const statusCode = Number(msg.statusCode || msg.response?.statusCode || 0);
const requests = [];
if (id !== null && (statusCode === 0 || statusCode < 400)) {
state.ids[meta.kind] = id;
state.inFlightMetadata.delete(meta.kind);
requests.push(...this._continueMetadata(state));
requests.push(...this._drainPendingObservations(state));
return [null, requests.length ? requests : null, null];
}
if (meta.action === 'lookup' && (statusCode === 200 || statusCode === 0)) {
state.inFlightMetadata.delete(meta.kind);
requests.push(createRequest(this.config, state.meta, meta.kind, state.ids));
state.inFlightMetadata.add(meta.kind);
return [null, requests, null];
}
state.inFlightMetadata.delete(meta.kind);
state.backoffFailures += 1;
return [this._diagnostic('frost.error', state, msg), null, null];
}
flushAll(reason = 'flush') {
const requests = [];
for (const state of this.streams.values()) {
for (const candidate of state.reducer.flush(reason)) {
requests.push(...this._handleObservationCandidate(state, candidate));
}
}
return [null, requests.length ? requests : null, null];
}
_stateFor(point) {
let state = this.streams.get(point.streamKey);
if (!state) {
state = {
meta: point,
ids: {},
reducer: new PointReducer({
...this.config.reducer,
valueScale: this._valueScaleFor(point),
}),
pendingObservations: [],
inFlightMetadata: new Set(),
inFlightObservations: 0,
backoffFailures: 0,
};
this.streams.set(point.streamKey, state);
} else {
state.meta = { ...state.meta, ...point };
}
return state;
}
_handleObservationCandidate(state, candidate) {
if (this._isResolved(state)) {
state.inFlightObservations += 1;
return [observationRequest(this.config, candidate, state.ids.datastream, state.ids.featureOfInterest)];
}
this._enqueuePendingObservation(state, candidate);
return this._continueMetadata(state);
}
_continueMetadata(state) {
for (const kind of METADATA_ORDER) {
if (state.ids[kind]) continue;
if (state.inFlightMetadata.has(kind)) return [];
const request = lookupRequest(this.config, state.meta, kind);
state.inFlightMetadata.add(kind);
return [request];
}
return [];
}
_drainPendingObservations(state) {
if (!this._isResolved(state)) return [];
const pending = state.pendingObservations.splice(0);
state.inFlightObservations += pending.length;
return pending.map((candidate) => observationRequest(this.config, candidate, state.ids.datastream, state.ids.featureOfInterest));
}
_enqueuePendingObservation(state, candidate) {
if (state.pendingObservations.length === 0) {
state.pendingObservations.push(candidate);
return;
}
if (state.pendingObservations.length === 1) {
state.pendingObservations.push(candidate);
return;
}
state.pendingObservations[1] = candidate;
}
_isResolved(state) {
return METADATA_ORDER.every((kind) => state.ids[kind] !== undefined && state.ids[kind] !== null);
}
_valueScaleFor(point) {
const configured = this.config.reducer.valueScaleByType || DEFAULT_VALUE_SCALE_BY_TYPE;
return configured[point.type] || this.config.reducer.valueScale || 1;
}
_diagnostic(topic, state, msg) {
return {
topic,
payload: {
streamKey: state.meta.streamKey,
requestId: msg.requestId,
statusCode: msg.statusCode,
kind: msg._coreSync?.kind,
action: msg._coreSync?.action,
},
};
}
}
module.exports = {
DEFAULT_VALUE_SCALE_BY_TYPE,
CoreSyncDomain,
};

201
src/frostRequests.js Normal file
View File

@@ -0,0 +1,201 @@
'use strict';
const METADATA_ORDER = ['thing', 'observedProperty', 'sensor', 'featureOfInterest', 'datastream'];
function trimBaseUrl(baseUrl) {
return String(baseUrl || '').replace(/\/+$/, '');
}
function pathUrl(config, path) {
const base = trimBaseUrl(config.frostBaseUrl);
const version = String(config.serviceVersion || 'v1.1').replace(/^\/+|\/+$/g, '');
return `${base}/${version}${path}`;
}
function quote(value) {
return `'${String(value).replace(/'/g, "''")}'`;
}
function requestEnvelope(config, meta, method, path, payload, extra = {}) {
const headers = {
Accept: 'application/json',
};
if (method === 'POST' || method === 'PATCH') {
headers['Content-Type'] = 'application/json';
headers.Prefer = 'return=representation';
}
return {
topic: extra.topic || `frost.metadata.${extra.action || 'request'}`,
requestId: extra.requestId,
_coreSync: {
kind: extra.kind,
action: extra.action,
externalKey: extra.externalKey,
streamKey: meta.streamKey,
nextKind: extra.nextKind,
},
method,
url: pathUrl(config, path),
headers,
payload: payload === undefined ? null : payload,
};
}
function unitOfMeasurement(unit) {
return {
name: unit,
symbol: unit,
definition: 'http://www.opengis.net/def/uom/UCUM/',
};
}
function lookupRequest(config, meta, kind) {
const externalKey = externalKeyFor(kind, meta);
const entitySet = entitySetFor(kind);
const filter = encodeURIComponent(`properties/externalKey eq ${quote(externalKey)}`);
return requestEnvelope(config, meta, 'GET', `/${entitySet}?$filter=${filter}&$top=1`, null, {
topic: 'frost.metadata.lookup',
requestId: `${externalKey}:lookup`,
kind,
action: 'lookup',
externalKey,
});
}
function createRequest(config, meta, kind, ids = {}) {
const externalKey = externalKeyFor(kind, meta);
return requestEnvelope(config, meta, 'POST', `/${entitySetFor(kind)}`, createPayload(kind, meta, ids), {
topic: 'frost.metadata.create',
requestId: `${externalKey}:create`,
kind,
action: 'create',
externalKey,
});
}
function observationRequest(config, candidate, datastreamId, foiId) {
const meta = candidate.point;
return requestEnvelope(
config,
meta,
'POST',
`/Datastreams(${datastreamId})/Observations`,
{
phenomenonTime: meta.phenomenonTime,
result: meta.value,
FeatureOfInterest: { '@iot.id': foiId },
parameters: {
reduction: 'knot',
reductionReason: candidate.reason,
evolvFieldKey: meta.fieldKey,
evolvStreamKey: meta.streamKey,
sourceMeasurement: meta.measurement,
},
},
{
topic: 'frost.observation.create',
requestId: `observation:${meta.streamKey}:${meta.phenomenonTime}`,
kind: 'observation',
action: 'create',
externalKey: `observation:${meta.streamKey}`,
},
);
}
function entitySetFor(kind) {
return {
thing: 'Things',
observedProperty: 'ObservedProperties',
sensor: 'Sensors',
featureOfInterest: 'FeaturesOfInterest',
datastream: 'Datastreams',
}[kind];
}
function externalKeyFor(kind, meta) {
return {
thing: meta.thingExternalKey,
observedProperty: meta.observedPropertyExternalKey,
sensor: meta.sensorExternalKey,
featureOfInterest: meta.featureOfInterestExternalKey,
datastream: meta.datastreamExternalKey,
}[kind];
}
function createPayload(kind, meta, ids) {
const builders = {
thing: () => ({
name: meta.thingTag,
description: `EVOLV Thing ${meta.thingTag}`,
properties: {
externalKey: meta.thingExternalKey,
source: 'EVOLV',
},
}),
observedProperty: () => ({
name: meta.type,
definition: `urn:evolv:observedProperty:${meta.type}`,
description: `EVOLV observed property ${meta.type}`,
properties: {
externalKey: meta.observedPropertyExternalKey,
},
}),
sensor: () => ({
name: meta.sensorTag,
encodingType: 'application/json',
metadata: JSON.stringify({ source: 'EVOLV', variant: meta.variant }),
properties: {
externalKey: meta.sensorExternalKey,
variant: meta.variant,
},
}),
featureOfInterest: () => ({
name: `${meta.thingTag}:${meta.position}`,
description: `EVOLV feature of interest ${meta.position}`,
encodingType: 'application/vnd.geo+json',
feature: { type: 'Point', coordinates: [0, 0] },
properties: {
externalKey: meta.featureOfInterestExternalKey,
position: meta.position,
thingTag: meta.thingTag,
},
}),
datastream: () => ({
name: `${meta.thingTag} ${meta.type} ${meta.variant} ${meta.position} ${meta.sensorTag}`,
description: `EVOLV stream ${meta.streamKey}`,
observationType: 'http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement',
unitOfMeasurement: unitOfMeasurement(meta.unit),
Thing: { '@iot.id': ids.thing },
ObservedProperty: { '@iot.id': ids.observedProperty },
Sensor: { '@iot.id': ids.sensor },
properties: {
externalKey: meta.datastreamExternalKey,
streamKey: meta.streamKey,
position: meta.position,
variant: meta.variant,
sensorTag: meta.sensorTag,
},
}),
};
return builders[kind]();
}
function extractEntityId(payload) {
if (!payload) return null;
if (payload['@iot.id'] !== undefined) return payload['@iot.id'];
if (Array.isArray(payload.value) && payload.value[0] && payload.value[0]['@iot.id'] !== undefined) {
return payload.value[0]['@iot.id'];
}
return null;
}
module.exports = {
METADATA_ORDER,
createRequest,
entitySetFor,
extractEntityId,
externalKeyFor,
lookupRequest,
observationRequest,
pathUrl,
};

76
src/identity.js Normal file
View File

@@ -0,0 +1,76 @@
'use strict';
const DEFAULT_UNITS = {
pressure: 'Pa',
flow: 'm3/s',
power: 'W',
temperature: 'K',
density: 'kg/m3',
level: 'm',
volume: 'm3',
control: '1',
percentage: '1',
efficiency: '1',
};
function firstPresent(...values) {
for (const value of values) {
if (value !== undefined && value !== null && value !== '') return value;
}
return undefined;
}
function sanitizeToken(value, fallback) {
const raw = firstPresent(value, fallback);
return String(raw || '').trim();
}
function parseFieldKey(fieldKey) {
const parts = String(fieldKey || '').split('.').filter(Boolean);
return {
type: parts[0] || 'value',
variant: parts[1] || 'measured',
position: parts[2] || 'atEquipment',
sensorTag: parts[3] || undefined,
};
}
function resolveIdentity(input, options = {}) {
const tags = input.tags || {};
const parsed = parseFieldKey(input.fieldKey);
const thingTag = sanitizeToken(
options.assetTagOverride,
tags.tagcode || tags.tagCode || tags.asset_tagCode || tags.asset_tagcode || input.measurement,
);
const type = sanitizeToken(input.type, parsed.type).toLowerCase();
const variant = sanitizeToken(input.variant, parsed.variant).toLowerCase();
const position = sanitizeToken(input.position, parsed.position);
const sensorTag = sanitizeToken(
options.sensorTagOverride,
parsed.sensorTag || tags.sensorTag || tags.sensor_tagCode || `${variant.toUpperCase()}-${thingTag}`,
);
const unit = sanitizeToken(input.unit, tags.unit || input.source?.unit || DEFAULT_UNITS[type] || '1');
const streamKey = [thingTag, type, variant, position, sensorTag].join(':');
return {
thingTag,
type,
variant,
position,
sensorTag,
unit,
fieldKey: input.fieldKey,
streamKey,
thingExternalKey: `thing:${thingTag}`,
observedPropertyExternalKey: `observedProperty:${type}`,
sensorExternalKey: `sensor:${sensorTag}`,
featureOfInterestExternalKey: `foi:${thingTag}:${position}`,
datastreamExternalKey: `datastream:${streamKey}`,
};
}
module.exports = {
DEFAULT_UNITS,
parseFieldKey,
resolveIdentity,
};

54
src/normalizer.js Normal file
View File

@@ -0,0 +1,54 @@
'use strict';
const { resolveIdentity } = require('./identity');
function normalizeTimestamp(value) {
if (value instanceof Date) return value;
if (typeof value === 'number') return new Date(value);
if (typeof value === 'string' && value.trim()) return new Date(value);
return new Date();
}
function unwrapPayload(msg) {
if (msg && msg.payload !== undefined) return msg.payload;
return msg;
}
function normalizeOne(payload, options = {}) {
if (!payload || typeof payload !== 'object') return [];
if (!payload.fields || typeof payload.fields !== 'object') return [];
const timestamp = normalizeTimestamp(payload.timestamp || payload.time || payload.ts);
const tags = payload.tags || {};
return Object.entries(payload.fields)
.filter(([, value]) => typeof value === 'number' && Number.isFinite(value))
.map(([fieldKey, value]) => {
const identity = resolveIdentity({
measurement: payload.measurement,
fieldKey,
tags,
source: payload.source,
}, options);
return {
...identity,
value,
time: timestamp,
phenomenonTime: timestamp.toISOString(),
measurement: payload.measurement,
tags,
source: payload.source || {},
};
});
}
function normalizeInput(msg, options = {}) {
const payload = unwrapPayload(msg);
const payloads = Array.isArray(payload) ? payload : [payload];
return payloads.flatMap((item) => normalizeOne(item, options));
}
module.exports = {
normalizeInput,
normalizeOne,
normalizeTimestamp,
};

106
src/reducer.js Normal file
View File

@@ -0,0 +1,106 @@
'use strict';
const DEFAULTS = {
angleToleranceDeg: 5,
timeScaleMs: 60000,
valueScale: 1,
maxGapMs: 300000,
minDeltaTimeMs: 0,
minDeltaValue: 0,
comparisonMode: 'angle',
relativeSlopeTolerance: 0.1,
};
function toMs(point) {
return point.time instanceof Date ? point.time.getTime() : new Date(point.time).getTime();
}
function angleDeg(from, to, options) {
const dt = toMs(to) - toMs(from);
const dx = dt / Math.max(Number(options.timeScaleMs) || DEFAULTS.timeScaleMs, 1);
const dy = (Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON);
return Math.atan2(dy, dx) * 180 / Math.PI;
}
function angleDiff(a, b) {
let diff = Math.abs(a - b) % 360;
if (diff > 180) diff = 360 - diff;
return diff;
}
function relativeSlope(from, to, options) {
const dt = Math.max(toMs(to) - toMs(from), 1);
return ((Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON)) / dt;
}
class PointReducer {
constructor(options = {}) {
this.options = { ...DEFAULTS, ...options };
this.anchor = null;
this.previous = null;
}
offer(point) {
const output = [];
if (!point || typeof point.value !== 'number' || !Number.isFinite(point.value)) return output;
if (!this.anchor) {
this.anchor = point;
this.previous = point;
output.push({ point, reason: 'first' });
return output;
}
const minDeltaTimeMs = Number(this.options.minDeltaTimeMs) || 0;
const minDeltaValue = Number(this.options.minDeltaValue) || 0;
const dtFromPrevious = toMs(point) - toMs(this.previous);
const dvFromPrevious = Math.abs(Number(point.value) - Number(this.previous.value));
if (dtFromPrevious < minDeltaTimeMs && dvFromPrevious < minDeltaValue) {
this.previous = point;
return output;
}
const maxGapMs = Number(this.options.maxGapMs) || 0;
if (maxGapMs > 0 && this.previous !== this.anchor && toMs(point) - toMs(this.anchor) >= maxGapMs) {
output.push({ point: this.previous, reason: 'max-gap' });
this.anchor = this.previous;
}
if (this.previous !== this.anchor && this._changedDirection(point)) {
output.push({ point: this.previous, reason: this.options.comparisonMode === 'relative-slope' ? 'slope-change' : 'angle-change' });
this.anchor = this.previous;
}
this.previous = point;
return output;
}
flush(reason = 'flush') {
if (this.previous && this.previous !== this.anchor) {
const point = this.previous;
this.anchor = point;
return [{ point, reason }];
}
return [];
}
_changedDirection(point) {
if (this.options.comparisonMode === 'relative-slope') {
const left = relativeSlope(this.anchor, this.previous, this.options);
const right = relativeSlope(this.previous, point, this.options);
const denominator = Math.max(Math.abs(left), Number.EPSILON);
return Math.abs(right - left) / denominator > Number(this.options.relativeSlopeTolerance);
}
const left = angleDeg(this.anchor, this.previous, this.options);
const right = angleDeg(this.previous, point, this.options);
return angleDiff(left, right) > Number(this.options.angleToleranceDeg);
}
}
module.exports = {
DEFAULTS,
PointReducer,
angleDeg,
angleDiff,
};

View File

@@ -0,0 +1,105 @@
const assert = require('node:assert/strict');
const { normalizeInput } = require('../../src/normalizer');
const { PointReducer } = require('../../src/reducer');
const { CoreSyncDomain } = require('../../src/coreSyncDomain');
function telemetry(timestamp, value) {
return {
payload: {
measurement: 'P-1',
fields: {
'pressure.measured.upstream.PT-1': value,
},
tags: {
tagcode: 'P-1',
},
timestamp,
},
};
}
test('normalizer maps EVOLV dbase payloads to stable stream identities', () => {
const points = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345));
assert.equal(points.length, 1);
assert.equal(points[0].thingTag, 'P-1');
assert.equal(points[0].type, 'pressure');
assert.equal(points[0].variant, 'measured');
assert.equal(points[0].position, 'upstream');
assert.equal(points[0].sensorTag, 'PT-1');
assert.equal(points[0].streamKey, 'P-1:pressure:measured:upstream:PT-1');
assert.equal(points[0].unit, 'Pa');
});
test('point reducer keeps first point and previous point on angle change', () => {
const reducer = new PointReducer({
angleToleranceDeg: 5,
timeScaleMs: 60000,
valueScale: 1,
maxGapMs: 0,
});
const p1 = { time: new Date('2026-05-19T10:00:00.000Z'), value: 0 };
const p2 = { time: new Date('2026-05-19T10:01:00.000Z'), value: 1 };
const p3 = { time: new Date('2026-05-19T10:02:00.000Z'), value: 1 };
assert.deepEqual(reducer.offer(p1).map((x) => x.reason), ['first']);
assert.deepEqual(reducer.offer(p2), []);
const changed = reducer.offer(p3);
assert.equal(changed.length, 1);
assert.equal(changed[0].reason, 'angle-change');
assert.equal(changed[0].point, p2);
});
test('coresync emits metadata lookup first and drains observations after resolver completes', () => {
const hub = new CoreSyncDomain({
frostBaseUrl: 'http://frost.example/FROST-Server',
serviceVersion: 'v1.1',
});
const first = hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345));
assert.equal(first[1].length, 1);
assert.equal(first[1][0].topic, 'frost.metadata.lookup');
assert.equal(first[1][0]._coreSync.kind, 'thing');
const streamKey = 'P-1:pressure:measured:upstream:PT-1';
const kinds = [
['thing', 1],
['observedProperty', 2],
['sensor', 3],
['featureOfInterest', 4],
['datastream', 5],
];
let output;
for (const [kind, id] of kinds) {
output = hub.handleMessage({
topic: 'frost.response',
statusCode: 200,
payload: { value: [{ '@iot.id': id }] },
_coreSync: { kind, action: 'lookup', streamKey },
});
}
assert.equal(output[1].length, 1);
assert.equal(output[1][0].topic, 'frost.observation.create');
assert.equal(output[1][0].method, 'POST');
assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations');
assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4);
assert.equal(output[1][0].payload.parameters.reductionReason, 'first');
});
test('pending observation queue keeps first and latest unresolved points', () => {
const hub = new CoreSyncDomain({});
hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 1));
hub.handleMessage(telemetry('2026-05-19T10:01:00.000Z', 2));
hub.handleMessage({ topic: 'coresync.flush' });
hub.handleMessage(telemetry('2026-05-19T10:02:00.000Z', 3));
hub.handleMessage({ topic: 'coresync.flush' });
const state = hub.streams.get('P-1:pressure:measured:upstream:PT-1');
assert.equal(state.pendingObservations.length, 2);
assert.equal(state.pendingObservations[0].point.value, 1);
assert.equal(state.pendingObservations[1].point.value, 3);
});