Files
EVOLV/scripts/e2e-reactor-roundtrip.js

270 lines
8.9 KiB
JavaScript

#!/usr/bin/env node
/**
* E2E reactor round-trip test:
* Node-RED -> InfluxDB -> Grafana proxy query
*/
const fs = require('node:fs');
const path = require('node:path');
const NR_URL = process.env.NR_URL || 'http://localhost:1880';
const INFLUX_URL = process.env.INFLUX_URL || 'http://localhost:8086';
const GRAFANA_URL = process.env.GRAFANA_URL || 'http://localhost:3000';
const GRAFANA_USER = process.env.GRAFANA_USER || 'admin';
const GRAFANA_PASSWORD = process.env.GRAFANA_PASSWORD || 'evolv';
const INFLUX_ORG = process.env.INFLUX_ORG || 'evolv';
const INFLUX_BUCKET = process.env.INFLUX_BUCKET || 'telemetry';
const INFLUX_TOKEN = process.env.INFLUX_TOKEN || 'evolv-dev-token';
const GRAFANA_DS_UID = process.env.GRAFANA_DS_UID || 'cdzg44tv250jkd';
const FLOW_FILE = path.join(__dirname, '..', 'docker', 'demo-flow.json');
const REQUIRE_GRAFANA_DASHBOARDS = process.env.REQUIRE_GRAFANA_DASHBOARDS === '1';
const REACTOR_MEASUREMENTS = [
'reactor_demo_reactor_z1',
'reactor_demo_reactor_z2',
'reactor_demo_reactor_z3',
'reactor_demo_reactor_z4',
];
const REACTOR_MEASUREMENT = REACTOR_MEASUREMENTS[3];
const QUERY_TIMEOUT_MS = 90000;
const POLL_INTERVAL_MS = 3000;
const REQUIRED_DASHBOARD_TITLES = ['Bioreactor Z1', 'Bioreactor Z2', 'Bioreactor Z3', 'Bioreactor Z4', 'Settler S1'];
async function wait(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function fetchJson(url, options = {}) {
const response = await fetch(url, options);
const text = await response.text();
let body = null;
if (text) {
try {
body = JSON.parse(text);
} catch {
body = text;
}
}
return { response, body, text };
}
async function assertReachable() {
const checks = [
[`${NR_URL}/settings`, 'Node-RED'],
[`${INFLUX_URL}/health`, 'InfluxDB'],
[`${GRAFANA_URL}/api/health`, 'Grafana'],
];
for (const [url, label] of checks) {
const { response, text } = await fetchJson(url, {
headers: label === 'Grafana'
? { Authorization: `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}` }
: undefined,
});
if (!response.ok) {
throw new Error(`${label} not reachable at ${url} (${response.status}): ${text}`);
}
console.log(`PASS: ${label} reachable`);
}
}
async function deployDemoFlow() {
const flow = JSON.parse(fs.readFileSync(FLOW_FILE, 'utf8'));
const { response, text } = await fetchJson(`${NR_URL}/flows`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Node-RED-Deployment-Type': 'full',
},
body: JSON.stringify(flow),
});
if (!(response.status === 200 || response.status === 204)) {
throw new Error(`Flow deploy failed (${response.status}): ${text}`);
}
console.log(`PASS: Demo flow deployed (${response.status})`);
}
async function queryInfluxCsv(query) {
const response = await fetch(`${INFLUX_URL}/api/v2/query?org=${encodeURIComponent(INFLUX_ORG)}`, {
method: 'POST',
headers: {
Authorization: `Token ${INFLUX_TOKEN}`,
'Content-Type': 'application/json',
Accept: 'application/csv',
},
body: JSON.stringify({ query }),
});
const text = await response.text();
if (!response.ok) {
throw new Error(`Influx query failed (${response.status}): ${text}`);
}
return text;
}
function countCsvDataRows(csvText) {
return csvText
.split('\n')
.map((line) => line.trim())
.filter((line) => line && !line.startsWith('#') && line.includes(','))
.length;
}
async function waitForReactorTelemetry() {
const deadline = Date.now() + QUERY_TIMEOUT_MS;
while (Date.now() < deadline) {
const counts = {};
for (const measurement of REACTOR_MEASUREMENTS) {
const query = `
from(bucket: "${INFLUX_BUCKET}")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "${measurement}")
|> limit(n: 20)
`.trim();
counts[measurement] = countCsvDataRows(await queryInfluxCsv(query));
}
const missing = Object.entries(counts)
.filter(([, rows]) => rows === 0)
.map(([measurement]) => measurement);
if (missing.length === 0) {
const summary = Object.entries(counts)
.map(([measurement, rows]) => `${measurement}=${rows}`)
.join(', ');
console.log(`PASS: Reactor telemetry reached InfluxDB (${summary})`);
return;
}
console.log(`WAIT: reactor telemetry not yet present in InfluxDB for ${missing.join(', ')}`);
await wait(POLL_INTERVAL_MS);
}
throw new Error(`Timed out waiting for reactor telemetry measurements ${REACTOR_MEASUREMENTS.join(', ')}`);
}
async function assertGrafanaDatasource() {
const auth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`;
const { response, body, text } = await fetchJson(`${GRAFANA_URL}/api/datasources/uid/${GRAFANA_DS_UID}`, {
headers: { Authorization: auth },
});
if (!response.ok) {
throw new Error(`Grafana datasource lookup failed (${response.status}): ${text}`);
}
if (body?.uid !== GRAFANA_DS_UID) {
throw new Error(`Grafana datasource UID mismatch: expected ${GRAFANA_DS_UID}, got ${body?.uid}`);
}
console.log(`PASS: Grafana datasource ${GRAFANA_DS_UID} is present`);
}
async function queryGrafanaDatasource() {
const auth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`;
const response = await fetch(`${GRAFANA_URL}/api/ds/query`, {
method: 'POST',
headers: {
Authorization: auth,
'Content-Type': 'application/json',
},
body: JSON.stringify({
from: 'now-15m',
to: 'now',
queries: [
{
refId: 'A',
datasource: { uid: GRAFANA_DS_UID, type: 'influxdb' },
query: `
from(bucket: "${INFLUX_BUCKET}")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "${REACTOR_MEASUREMENT}" and r._field == "S_O")
|> last()
`.trim(),
rawQuery: true,
intervalMs: 1000,
maxDataPoints: 100,
}
],
}),
});
const text = await response.text();
if (!response.ok) {
throw new Error(`Grafana datasource query failed (${response.status}): ${text}`);
}
const body = JSON.parse(text);
const frames = body?.results?.A?.frames || [];
if (frames.length === 0) {
throw new Error('Grafana datasource query returned no reactor frames');
}
console.log(`PASS: Grafana can query reactor telemetry through datasource (${frames.length} frame(s))`);
}
async function waitForGrafanaDashboards(timeoutMs = QUERY_TIMEOUT_MS) {
const deadline = Date.now() + timeoutMs;
const auth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`;
while (Date.now() < deadline) {
const response = await fetch(`${GRAFANA_URL}/api/search?query=`, {
headers: { Authorization: auth },
});
const text = await response.text();
if (!response.ok) {
throw new Error(`Grafana dashboard search failed (${response.status}): ${text}`);
}
const results = JSON.parse(text);
const titles = new Set(results.map((item) => item.title));
const missing = REQUIRED_DASHBOARD_TITLES.filter((title) => !titles.has(title));
const pumpingStationCount = results.filter((item) => item.title === 'pumpingStation').length;
if (missing.length === 0 && pumpingStationCount >= 3) {
console.log(`PASS: Grafana dashboards created (${REQUIRED_DASHBOARD_TITLES.join(', ')} + ${pumpingStationCount} pumpingStation dashboards)`);
return;
}
const missingParts = [];
if (missing.length > 0) {
missingParts.push(`missing titled dashboards: ${missing.join(', ')}`);
}
if (pumpingStationCount < 3) {
missingParts.push(`pumpingStation dashboards=${pumpingStationCount}`);
}
console.log(`WAIT: Grafana dashboards not ready: ${missingParts.join(' | ')}`);
await wait(POLL_INTERVAL_MS);
}
throw new Error(`Timed out waiting for Grafana dashboards: ${REQUIRED_DASHBOARD_TITLES.join(', ')} and >=3 pumpingStation dashboards`);
}
async function main() {
console.log('=== EVOLV Reactor E2E Round Trip ===');
await assertReachable();
await deployDemoFlow();
console.log('WAIT: allowing Node-RED inject/tick loops to populate telemetry');
await wait(12000);
await waitForReactorTelemetry();
await assertGrafanaDatasource();
await queryGrafanaDatasource();
if (REQUIRE_GRAFANA_DASHBOARDS) {
await waitForGrafanaDashboards();
console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for reactor telemetry and dashboard generation');
return;
}
try {
await waitForGrafanaDashboards(15000);
console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for reactor telemetry and dashboard generation');
} catch (error) {
console.warn(`WARN: Grafana dashboard auto-generation is not ready yet: ${error.message}`);
console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for live reactor telemetry');
}
}
main().catch((error) => {
console.error(`FAIL: ${error.message}`);
process.exit(1);
});