270 lines
8.9 KiB
JavaScript
270 lines
8.9 KiB
JavaScript
#!/usr/bin/env node
|
|
/**
 * E2E reactor round-trip test:
 * Node-RED -> InfluxDB -> Grafana proxy query
 */
|
|
|
|
const fs = require('node:fs');
|
|
const path = require('node:path');
|
|
|
|
// Service endpoints. Each one falls back to a local default when the
// corresponding environment variable is unset or empty.
const NR_URL = process.env.NR_URL || 'http://localhost:1880';
const INFLUX_URL = process.env.INFLUX_URL || 'http://localhost:8086';
const GRAFANA_URL = process.env.GRAFANA_URL || 'http://localhost:3000';

// Grafana credentials, sent as HTTP Basic auth by the Grafana checks below.
const GRAFANA_USER = process.env.GRAFANA_USER || 'admin';
const GRAFANA_PASSWORD = process.env.GRAFANA_PASSWORD || 'evolv';

// InfluxDB organisation, bucket and API token used for the Flux queries.
const INFLUX_ORG = process.env.INFLUX_ORG || 'evolv';
const INFLUX_BUCKET = process.env.INFLUX_BUCKET || 'telemetry';
const INFLUX_TOKEN = process.env.INFLUX_TOKEN || 'evolv-dev-token';

// UID of the Grafana datasource that the proxy query goes through.
const GRAFANA_DS_UID = process.env.GRAFANA_DS_UID || 'cdzg44tv250jkd';

// Node-RED flow that is deployed before telemetry is checked.
const FLOW_FILE = path.join(__dirname, '..', 'docker', 'demo-flow.json');

// Only the exact value '1' makes missing Grafana dashboards a hard failure;
// otherwise main() merely warns about them.
const REQUIRE_GRAFANA_DASHBOARDS = process.env.REQUIRE_GRAFANA_DASHBOARDS === '1';

// One InfluxDB measurement per reactor zone.
const REACTOR_MEASUREMENTS = ['z1', 'z2', 'z3', 'z4'].map((zone) => `reactor_demo_reactor_${zone}`);

// Measurement used for the single query sent through the Grafana datasource (zone z4).
const REACTOR_MEASUREMENT = REACTOR_MEASUREMENTS[3];

// Polling budget and cadence, in milliseconds.
const QUERY_TIMEOUT_MS = 90 * 1000;
const POLL_INTERVAL_MS = 3 * 1000;

// Dashboard titles that must show up in the Grafana search results.
const REQUIRED_DASHBOARD_TITLES = [
  'Bioreactor Z1',
  'Bioreactor Z2',
  'Bioreactor Z3',
  'Bioreactor Z4',
  'Settler S1',
];
|
|
|
|
/**
 * Pause for the given number of milliseconds.
 *
 * @param {number} ms - Delay handed to `setTimeout`.
 * @returns {Promise<void>} Resolves with no value once the timer has fired.
 */
async function wait(ms) {
  await new Promise((done) => {
    setTimeout(done, ms);
  });
}
|
|
|
|
/**
 * Run a `fetch` request and return the response together with its body,
 * both as raw text and in decoded form.
 *
 * @param {string} url - Address to request.
 * @param {object} [options={}] - Options passed straight through to `fetch`.
 * @returns {Promise<{response: Response, body: *, text: string}>}
 *   `body` is `null` for an empty response body, the parsed value when the
 *   text is valid JSON, and the raw text otherwise.
 */
async function fetchJson(url, options = {}) {
  // Decode leniently: endpoints may answer with plain text instead of JSON.
  const decode = (raw) => {
    if (!raw) {
      return null;
    }
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  };

  const response = await fetch(url, options);
  const text = await response.text();
  return { response, body: decode(text), text };
}
|
|
|
|
/**
 * Verify that Node-RED, InfluxDB and Grafana each answer their
 * settings/health endpoint.
 *
 * The services are probed one after another and the first failure aborts the
 * check. Only the Grafana probe sends credentials (HTTP Basic auth).
 *
 * @returns {Promise<void>} Resolves once every service answered with an OK status.
 * @throws {Error} When a service cannot be contacted at all (the original
 *   transport error is attached as `cause`) or answers with a non-OK status.
 */
async function assertReachable() {
  const grafanaAuth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`;
  const checks = [
    { label: 'Node-RED', url: `${NR_URL}/settings` },
    { label: 'InfluxDB', url: `${INFLUX_URL}/health` },
    { label: 'Grafana', url: `${GRAFANA_URL}/api/health`, headers: { Authorization: grafanaAuth } },
  ];

  for (const { label, url, headers } of checks) {
    let result;
    try {
      result = await fetchJson(url, { headers });
    } catch (error) {
      // fetch() rejects instead of returning a response when the connection
      // itself fails (service down, DNS error, ...). Without this wrapper the
      // failure would surface as a bare "fetch failed" that names no service.
      const detail = error?.cause?.message ?? error?.message ?? String(error);
      throw new Error(`${label} not reachable at ${url}: ${detail}`, { cause: error });
    }

    const { response, text } = result;
    if (!response.ok) {
      throw new Error(`${label} not reachable at ${url} (${response.status}): ${text}`);
    }
    console.log(`PASS: ${label} reachable`);
  }
}
|
|
|
|
/**
 * Deploy the demo flow stored in FLOW_FILE to Node-RED as a full deployment.
 *
 * The file is parsed and re-serialised before sending, so a flow file that is
 * not valid JSON fails here rather than on the server.
 *
 * @returns {Promise<void>} Resolves once Node-RED answered with 200 or 204.
 * @throws {Error} When Node-RED answers with any other status.
 */
async function deployDemoFlow() {
  const flowJson = fs.readFileSync(FLOW_FILE, 'utf8');
  const request = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Node-RED-Deployment-Type': 'full',
    },
    body: JSON.stringify(JSON.parse(flowJson)),
  };

  const { response, text } = await fetchJson(`${NR_URL}/flows`, request);
  const accepted = [200, 204].includes(response.status);
  if (!accepted) {
    throw new Error(`Flow deploy failed (${response.status}): ${text}`);
  }
  console.log(`PASS: Demo flow deployed (${response.status})`);
}
|
|
|
|
/**
 * Run a Flux query against the InfluxDB v2 query API and return the CSV reply.
 *
 * @param {string} query - Flux query text, sent as `{ "query": ... }`.
 * @returns {Promise<string>} Raw response body of a successful query.
 * @throws {Error} When InfluxDB answers with a non-OK status.
 */
async function queryInfluxCsv(query) {
  const endpoint = `${INFLUX_URL}/api/v2/query?org=${encodeURIComponent(INFLUX_ORG)}`;
  const request = {
    method: 'POST',
    headers: {
      Authorization: `Token ${INFLUX_TOKEN}`,
      'Content-Type': 'application/json',
      Accept: 'application/csv',
    },
    body: JSON.stringify({ query }),
  };

  const response = await fetch(endpoint, request);
  const csv = await response.text();
  if (response.ok) {
    return csv;
  }
  throw new Error(`Influx query failed (${response.status}): ${csv}`);
}
|
|
|
|
/**
 * Count the data records in an InfluxDB CSV query response.
 *
 * Blank lines, annotation lines (starting with `#`) and lines without a comma
 * are ignored. Header rows are ignored as well: InfluxDB emits one per table
 * schema, recognisable by `result` and `table` in the second and third
 * column, and counting them would inflate the result by one per table.
 *
 * @param {string} csvText - Response body; `null`/`undefined`/`''` count as empty.
 * @returns {number} Number of data rows found.
 */
function countCsvDataRows(csvText) {
  if (!csvText) {
    return 0;
  }

  let dataRows = 0;
  for (const rawLine of csvText.split('\n')) {
    // trim() also strips the '\r' of CRLF line endings.
    const line = rawLine.trim();
    if (!line || line.startsWith('#') || !line.includes(',')) {
      continue;
    }

    // Header row shape: ",result,table,<column names...>". A data row cannot
    // match because its table column holds a numeric table index.
    const [, resultColumn, tableColumn] = line.split(',');
    if (resultColumn === 'result' && tableColumn === 'table') {
      continue;
    }

    dataRows += 1;
  }
  return dataRows;
}
|
|
|
|
/**
 * Poll InfluxDB until every measurement in REACTOR_MEASUREMENTS has at least
 * one row within the last 15 minutes.
 *
 * Measurements are queried one after another; between rounds the function
 * sleeps for POLL_INTERVAL_MS. Polling stops once QUERY_TIMEOUT_MS has elapsed.
 *
 * @returns {Promise<void>} Resolves as soon as no measurement is missing.
 * @throws {Error} When the time budget runs out, or when a query itself fails.
 */
async function waitForReactorTelemetry() {
  const deadline = Date.now() + QUERY_TIMEOUT_MS;

  // Flux query returning up to 20 recent rows of a single measurement.
  const buildQuery = (measurement) => [
    `from(bucket: "${INFLUX_BUCKET}")`,
    '|> range(start: -15m)',
    `|> filter(fn: (r) => r._measurement == "${measurement}")`,
    '|> limit(n: 20)',
  ].join('\n');

  while (Date.now() < deadline) {
    const rowsByMeasurement = new Map();
    for (const measurement of REACTOR_MEASUREMENTS) {
      const csv = await queryInfluxCsv(buildQuery(measurement));
      rowsByMeasurement.set(measurement, countCsvDataRows(csv));
    }

    const missing = [...rowsByMeasurement]
      .filter(([, rows]) => rows === 0)
      .map(([measurement]) => measurement);

    if (missing.length > 0) {
      console.log(`WAIT: reactor telemetry not yet present in InfluxDB for ${missing.join(', ')}`);
      await wait(POLL_INTERVAL_MS);
      continue;
    }

    const summary = [...rowsByMeasurement]
      .map(([measurement, rows]) => `${measurement}=${rows}`)
      .join(', ');
    console.log(`PASS: Reactor telemetry reached InfluxDB (${summary})`);
    return;
  }

  throw new Error(`Timed out waiting for reactor telemetry measurements ${REACTOR_MEASUREMENTS.join(', ')}`);
}
|
|
|
|
/**
 * Check that Grafana knows the datasource identified by GRAFANA_DS_UID.
 *
 * @returns {Promise<void>} Resolves when the lookup succeeds and the returned
 *   datasource carries the expected UID.
 * @throws {Error} When the lookup answers with a non-OK status or the UID in
 *   the response differs from GRAFANA_DS_UID.
 */
async function assertGrafanaDatasource() {
  const credentials = Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64');
  const lookupUrl = `${GRAFANA_URL}/api/datasources/uid/${GRAFANA_DS_UID}`;

  const { response, body, text } = await fetchJson(lookupUrl, {
    headers: { Authorization: `Basic ${credentials}` },
  });
  if (!response.ok) {
    throw new Error(`Grafana datasource lookup failed (${response.status}): ${text}`);
  }

  const actualUid = body?.uid;
  if (actualUid !== GRAFANA_DS_UID) {
    throw new Error(`Grafana datasource UID mismatch: expected ${GRAFANA_DS_UID}, got ${actualUid}`);
  }

  console.log(`PASS: Grafana datasource ${GRAFANA_DS_UID} is present`);
}
|
|
|
|
/**
 * Query reactor telemetry through Grafana's datasource query endpoint.
 *
 * Sends one raw Flux query (the last `S_O` value of REACTOR_MEASUREMENT within
 * the past 15 minutes) to `/api/ds/query` and expects at least one frame back
 * under `results.A.frames`.
 *
 * @returns {Promise<void>} Resolves when the response contains one or more frames.
 * @throws {Error} When Grafana answers with a non-OK status or returns no frames.
 */
async function queryGrafanaDatasource() {
  const credentials = Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64');

  const fluxQuery = [
    `from(bucket: "${INFLUX_BUCKET}")`,
    '|> range(start: -15m)',
    `|> filter(fn: (r) => r._measurement == "${REACTOR_MEASUREMENT}" and r._field == "S_O")`,
    '|> last()',
  ].join('\n');

  const payload = {
    from: 'now-15m',
    to: 'now',
    queries: [
      {
        refId: 'A',
        datasource: { uid: GRAFANA_DS_UID, type: 'influxdb' },
        query: fluxQuery,
        rawQuery: true,
        intervalMs: 1000,
        maxDataPoints: 100,
      },
    ],
  };

  const response = await fetch(`${GRAFANA_URL}/api/ds/query`, {
    method: 'POST',
    headers: {
      Authorization: `Basic ${credentials}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payload),
  });
  const text = await response.text();
  if (!response.ok) {
    throw new Error(`Grafana datasource query failed (${response.status}): ${text}`);
  }

  // Frames are keyed by the refId of the query sent above.
  const frames = JSON.parse(text)?.results?.A?.frames || [];
  if (frames.length === 0) {
    throw new Error('Grafana datasource query returned no reactor frames');
  }

  console.log(`PASS: Grafana can query reactor telemetry through datasource (${frames.length} frame(s))`);
}
|
|
|
|
/**
 * Poll Grafana's search API until the expected dashboards exist.
 *
 * "Ready" means every title in REQUIRED_DASHBOARD_TITLES is present and at
 * least three search results are titled `pumpingStation`. Between attempts
 * the function sleeps for POLL_INTERVAL_MS.
 *
 * @param {number} [timeoutMs=QUERY_TIMEOUT_MS] - Polling budget in milliseconds.
 * @returns {Promise<void>} Resolves once all dashboards are found.
 * @throws {Error} When a search request answers with a non-OK status, or when
 *   the budget elapses before the dashboards are ready.
 */
async function waitForGrafanaDashboards(timeoutMs = QUERY_TIMEOUT_MS) {
  const deadline = Date.now() + timeoutMs;
  const credentials = Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64');
  const headers = { Authorization: `Basic ${credentials}` };

  while (Date.now() < deadline) {
    const response = await fetch(`${GRAFANA_URL}/api/search?query=`, { headers });
    const text = await response.text();
    if (!response.ok) {
      throw new Error(`Grafana dashboard search failed (${response.status}): ${text}`);
    }

    const results = JSON.parse(text);
    const knownTitles = new Set(results.map(({ title }) => title));
    const missing = REQUIRED_DASHBOARD_TITLES.filter((title) => !knownTitles.has(title));
    const pumpingStationCount = results.filter(({ title }) => title === 'pumpingStation').length;

    // Collect everything that is still outstanding; an empty list means ready.
    const missingParts = [];
    if (missing.length > 0) {
      missingParts.push(`missing titled dashboards: ${missing.join(', ')}`);
    }
    if (pumpingStationCount < 3) {
      missingParts.push(`pumpingStation dashboards=${pumpingStationCount}`);
    }

    if (missingParts.length === 0) {
      console.log(`PASS: Grafana dashboards created (${REQUIRED_DASHBOARD_TITLES.join(', ')} + ${pumpingStationCount} pumpingStation dashboards)`);
      return;
    }

    console.log(`WAIT: Grafana dashboards not ready: ${missingParts.join(' | ')}`);
    await wait(POLL_INTERVAL_MS);
  }

  throw new Error(`Timed out waiting for Grafana dashboards: ${REQUIRED_DASHBOARD_TITLES.join(', ')} and >=3 pumpingStation dashboards`);
}
|
|
|
|
/**
 * Run the whole round trip: reachability checks, flow deployment, a fixed
 * 12 s warm-up, the InfluxDB telemetry check, then the Grafana datasource
 * lookup and query.
 *
 * Dashboard generation is checked last. With REQUIRE_GRAFANA_DASHBOARDS set
 * it uses the default polling budget and a failure propagates; otherwise it
 * gets 15 s and a failure is only reported as a warning.
 *
 * @returns {Promise<void>} Resolves when all mandatory steps passed.
 * @throws {Error} Whatever a mandatory step throws.
 */
async function main() {
  const fullRoundTripPass = 'PASS: Node-RED -> InfluxDB -> Grafana round trip is working for reactor telemetry and dashboard generation';

  console.log('=== EVOLV Reactor E2E Round Trip ===');
  await assertReachable();
  await deployDemoFlow();

  console.log('WAIT: allowing Node-RED inject/tick loops to populate telemetry');
  await wait(12000);

  await waitForReactorTelemetry();
  await assertGrafanaDatasource();
  await queryGrafanaDatasource();

  if (!REQUIRE_GRAFANA_DASHBOARDS) {
    // Dashboards are optional here: give them a short grace period and
    // downgrade a miss to a warning.
    try {
      await waitForGrafanaDashboards(15000);
      console.log(fullRoundTripPass);
    } catch (error) {
      console.warn(`WARN: Grafana dashboard auto-generation is not ready yet: ${error.message}`);
      console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for live reactor telemetry');
    }
    return;
  }

  await waitForGrafanaDashboards();
  console.log(fullRoundTripPass);
}
|
|
|
|
// Script entry point: run the round trip, report the first failure on stderr
// and exit with a non-zero status.
main().catch((failure) => {
  console.error(`FAIL: ${failure.message}`);
  process.exit(1);
});
|