Support config-driven output formatting
This commit is contained in:
@@ -110,6 +110,10 @@ class ConfigManager {
|
|||||||
softwareType: nodeName.toLowerCase(),
|
softwareType: nodeName.toLowerCase(),
|
||||||
positionVsParent: uiConfig.positionVsParent || 'atEquipment',
|
positionVsParent: uiConfig.positionVsParent || 'atEquipment',
|
||||||
distance: uiConfig.hasDistance ? uiConfig.distance : undefined
|
distance: uiConfig.hasDistance ? uiConfig.distance : undefined
|
||||||
|
},
|
||||||
|
output: {
|
||||||
|
process: uiConfig.processOutputFormat || 'process',
|
||||||
|
dbase: uiConfig.dbaseOutputFormat || 'influxdb'
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -13,12 +13,14 @@
|
|||||||
const influxdbFormatter = require('./influxdbFormatter');
|
const influxdbFormatter = require('./influxdbFormatter');
|
||||||
const jsonFormatter = require('./jsonFormatter');
|
const jsonFormatter = require('./jsonFormatter');
|
||||||
const csvFormatter = require('./csvFormatter');
|
const csvFormatter = require('./csvFormatter');
|
||||||
|
const processFormatter = require('./processFormatter');
|
||||||
|
|
||||||
// Built-in registry
|
// Built-in registry
|
||||||
const registry = {
|
const registry = {
|
||||||
influxdb: influxdbFormatter,
|
influxdb: influxdbFormatter,
|
||||||
json: jsonFormatter,
|
json: jsonFormatter,
|
||||||
csv: csvFormatter,
|
csv: csvFormatter,
|
||||||
|
process: processFormatter,
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
9
src/helper/formatters/processFormatter.js
Normal file
9
src/helper/formatters/processFormatter.js
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
/**
|
||||||
|
* Process formatter
|
||||||
|
* Keeps the existing process-port behaviour: emit only changed fields as an object.
|
||||||
|
*/
|
||||||
|
function format(_measurement, metadata) {
|
||||||
|
return metadata.fields;
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = { format };
|
||||||
@@ -1,12 +1,13 @@
|
|||||||
|
const { getFormatter } = require('./formatters');
|
||||||
|
|
||||||
//this class will handle the output events for the node red node
|
//this class will handle the output events for the node red node
|
||||||
class OutputUtils {
|
class OutputUtils {
|
||||||
constructor() {
|
constructor() {
|
||||||
this.output = {};
|
this.output = {};
|
||||||
this.output['influxdb'] = {};
|
|
||||||
this.output['process'] = {};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
checkForChanges(output, format) {
|
checkForChanges(output, format) {
|
||||||
|
this.output[format] = this.output[format] || {};
|
||||||
const changedFields = {};
|
const changedFields = {};
|
||||||
for (const key in output) {
|
for (const key in output) {
|
||||||
if (Object.prototype.hasOwnProperty.call(output, key) && output[key] !== this.output[format][key]) {
|
if (Object.prototype.hasOwnProperty.call(output, key) && output[key] !== this.output[format][key]) {
|
||||||
@@ -27,56 +28,44 @@ class OutputUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
formatMsg(output, config, format) {
|
formatMsg(output, config, format) {
|
||||||
|
|
||||||
//define emtpy message
|
|
||||||
let msg = {};
|
let msg = {};
|
||||||
|
|
||||||
// Compare output with last output and only include changed values
|
// Compare output with last output and only include changed values
|
||||||
const changedFields = this.checkForChanges(output,format);
|
const changedFields = this.checkForChanges(output,format);
|
||||||
|
|
||||||
if (Object.keys(changedFields).length > 0) {
|
if (Object.keys(changedFields).length > 0) {
|
||||||
|
|
||||||
switch (format) {
|
|
||||||
case 'influxdb': {
|
|
||||||
// Extract the relevant config properties.
|
|
||||||
const relevantConfig = this.extractRelevantConfig(config);
|
|
||||||
// Flatten the tags so that no nested objects are passed on.
|
|
||||||
const flatTags = this.flattenTags(relevantConfig);
|
|
||||||
msg = this.influxDBFormat(changedFields, config, flatTags);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case 'process':
|
|
||||||
|
|
||||||
// Compare output with last output and only include changed values
|
|
||||||
msg = this.processFormat(changedFields,config);
|
|
||||||
//console.log(msg);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
console.log('Unknown format in output utils');
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
influxDBFormat(changedFields, config , flatTags) {
|
|
||||||
// Create the measurement and topic using softwareType and name config.functionality.softwareType + .
|
|
||||||
const measurement = config.general.name;
|
const measurement = config.general.name;
|
||||||
const payload = {
|
const flatTags = this.flattenTags(this.extractRelevantConfig(config));
|
||||||
measurement: measurement,
|
const formatterName = this.resolveFormatterName(config, format);
|
||||||
|
const formatter = getFormatter(formatterName);
|
||||||
|
const payload = formatter.format(measurement, {
|
||||||
fields: changedFields,
|
fields: changedFields,
|
||||||
tags: flatTags,
|
tags: flatTags,
|
||||||
timestamp: new Date(),
|
config,
|
||||||
};
|
channel: format,
|
||||||
|
});
|
||||||
const topic = measurement;
|
msg = this.wrapMessage(measurement, payload);
|
||||||
const msg = { topic: topic, payload: payload };
|
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
resolveFormatterName(config, channel) {
|
||||||
|
const outputConfig = config.output || {};
|
||||||
|
if (channel === 'process') {
|
||||||
|
return outputConfig.process || 'process';
|
||||||
|
}
|
||||||
|
if (channel === 'influxdb') {
|
||||||
|
return outputConfig.dbase || 'influxdb';
|
||||||
|
}
|
||||||
|
return outputConfig[channel] || channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
wrapMessage(measurement, payload) {
|
||||||
|
return {
|
||||||
|
topic: measurement,
|
||||||
|
payload,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
flattenTags(obj) {
|
flattenTags(obj) {
|
||||||
const result = {};
|
const result = {};
|
||||||
@@ -119,15 +108,6 @@ class OutputUtils {
|
|||||||
model: config.asset?.model,
|
model: config.asset?.model,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
processFormat(changedFields,config) {
|
|
||||||
// Create the measurement and topic using softwareType and name config.functionality.softwareType + .
|
|
||||||
const measurement = config.general.name;
|
|
||||||
const payload = changedFields;
|
|
||||||
const topic = measurement;
|
|
||||||
const msg = { topic: topic, payload: payload };
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = OutputUtils;
|
module.exports = OutputUtils;
|
||||||
|
|||||||
@@ -79,6 +79,7 @@ describe('ConfigManager', () => {
|
|||||||
const result = cm.buildConfig('measurement', uiConfig, 'node-id-1');
|
const result = cm.buildConfig('measurement', uiConfig, 'node-id-1');
|
||||||
expect(result).toHaveProperty('general');
|
expect(result).toHaveProperty('general');
|
||||||
expect(result).toHaveProperty('functionality');
|
expect(result).toHaveProperty('functionality');
|
||||||
|
expect(result).toHaveProperty('output');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should populate general.name from uiConfig.name', () => {
|
it('should populate general.name from uiConfig.name', () => {
|
||||||
@@ -168,6 +169,25 @@ describe('ConfigManager', () => {
|
|||||||
expect(result).toHaveProperty('general');
|
expect(result).toHaveProperty('general');
|
||||||
expect(result).toHaveProperty('functionality');
|
expect(result).toHaveProperty('functionality');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should default output formats to process and influxdb', () => {
|
||||||
|
const result = cm.buildConfig('measurement', {}, 'id-1');
|
||||||
|
expect(result.output).toEqual({
|
||||||
|
process: 'process',
|
||||||
|
dbase: 'influxdb',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should allow output format overrides from ui config', () => {
|
||||||
|
const result = cm.buildConfig('measurement', {
|
||||||
|
processOutputFormat: 'json',
|
||||||
|
dbaseOutputFormat: 'csv',
|
||||||
|
}, 'id-1');
|
||||||
|
expect(result.output).toEqual({
|
||||||
|
process: 'json',
|
||||||
|
dbase: 'csv',
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// ── createEndpoint() ─────────────────────────────────────────────────
|
// ── createEndpoint() ─────────────────────────────────────────────────
|
||||||
|
|||||||
69
test/outputUtils.test.js
Normal file
69
test/outputUtils.test.js
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
const OutputUtils = require('../src/helper/outputUtils');
|
||||||
|
|
||||||
|
describe('OutputUtils', () => {
|
||||||
|
let outputUtils;
|
||||||
|
let config;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
outputUtils = new OutputUtils();
|
||||||
|
config = {
|
||||||
|
general: {
|
||||||
|
name: 'Pump-1',
|
||||||
|
id: 'node-1',
|
||||||
|
unit: 'm3/h',
|
||||||
|
},
|
||||||
|
functionality: {
|
||||||
|
softwareType: 'pump',
|
||||||
|
role: 'test-role',
|
||||||
|
},
|
||||||
|
asset: {
|
||||||
|
supplier: 'EVOLV',
|
||||||
|
type: 'sensor',
|
||||||
|
},
|
||||||
|
output: {
|
||||||
|
process: 'process',
|
||||||
|
dbase: 'influxdb',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps legacy process output by default', () => {
|
||||||
|
const msg = outputUtils.formatMsg({ flow: 12.5 }, config, 'process');
|
||||||
|
expect(msg).toEqual({
|
||||||
|
topic: 'Pump-1',
|
||||||
|
payload: { flow: 12.5 },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps legacy influxdb output by default', () => {
|
||||||
|
const msg = outputUtils.formatMsg({ flow: 12.5 }, config, 'influxdb');
|
||||||
|
expect(msg.topic).toBe('Pump-1');
|
||||||
|
expect(msg.payload).toEqual(expect.objectContaining({
|
||||||
|
measurement: 'Pump-1',
|
||||||
|
fields: { flow: 12.5 },
|
||||||
|
tags: expect.objectContaining({
|
||||||
|
id: 'node-1',
|
||||||
|
name: 'Pump-1',
|
||||||
|
softwareType: 'pump',
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('supports config-driven json formatting on the process channel', () => {
|
||||||
|
config.output.process = 'json';
|
||||||
|
const msg = outputUtils.formatMsg({ flow: 12.5 }, config, 'process');
|
||||||
|
expect(msg.topic).toBe('Pump-1');
|
||||||
|
expect(typeof msg.payload).toBe('string');
|
||||||
|
expect(msg.payload).toContain('"measurement":"Pump-1"');
|
||||||
|
expect(msg.payload).toContain('"flow":12.5');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('supports config-driven csv formatting on the database channel', () => {
|
||||||
|
config.output.dbase = 'csv';
|
||||||
|
const msg = outputUtils.formatMsg({ flow: 12.5 }, config, 'influxdb');
|
||||||
|
expect(msg.topic).toBe('Pump-1');
|
||||||
|
expect(typeof msg.payload).toBe('string');
|
||||||
|
expect(msg.payload).toContain('Pump-1');
|
||||||
|
expect(msg.payload).toContain('flow=12.5');
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user