#!/usr/bin/env node /** * E2E reactor round-trip test: * Node-RED -> InfluxDB -> Grafana proxy query */ const fs = require('node:fs'); const path = require('node:path'); const NR_URL = process.env.NR_URL || 'http://localhost:1880'; const INFLUX_URL = process.env.INFLUX_URL || 'http://localhost:8086'; const GRAFANA_URL = process.env.GRAFANA_URL || 'http://localhost:3000'; const GRAFANA_USER = process.env.GRAFANA_USER || 'admin'; const GRAFANA_PASSWORD = process.env.GRAFANA_PASSWORD || 'evolv'; const INFLUX_ORG = process.env.INFLUX_ORG || 'evolv'; const INFLUX_BUCKET = process.env.INFLUX_BUCKET || 'telemetry'; const INFLUX_TOKEN = process.env.INFLUX_TOKEN || 'evolv-dev-token'; const GRAFANA_DS_UID = process.env.GRAFANA_DS_UID || 'cdzg44tv250jkd'; const FLOW_FILE = path.join(__dirname, '..', 'docker', 'demo-flow.json'); const REQUIRE_GRAFANA_DASHBOARDS = process.env.REQUIRE_GRAFANA_DASHBOARDS === '1'; const REACTOR_MEASUREMENTS = [ 'reactor_demo_reactor_z1', 'reactor_demo_reactor_z2', 'reactor_demo_reactor_z3', 'reactor_demo_reactor_z4', ]; const REACTOR_MEASUREMENT = REACTOR_MEASUREMENTS[3]; const QUERY_TIMEOUT_MS = 90000; const POLL_INTERVAL_MS = 3000; const REQUIRED_DASHBOARD_TITLES = ['Bioreactor Z1', 'Bioreactor Z2', 'Bioreactor Z3', 'Bioreactor Z4', 'Settler S1']; async function wait(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } async function fetchJson(url, options = {}) { const response = await fetch(url, options); const text = await response.text(); let body = null; if (text) { try { body = JSON.parse(text); } catch { body = text; } } return { response, body, text }; } async function assertReachable() { const checks = [ [`${NR_URL}/settings`, 'Node-RED'], [`${INFLUX_URL}/health`, 'InfluxDB'], [`${GRAFANA_URL}/api/health`, 'Grafana'], ]; for (const [url, label] of checks) { const { response, text } = await fetchJson(url, { headers: label === 'Grafana' ? { Authorization: `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}` } : undefined, }); if (!response.ok) { throw new Error(`${label} not reachable at ${url} (${response.status}): ${text}`); } console.log(`PASS: ${label} reachable`); } } async function deployDemoFlow() { const flow = JSON.parse(fs.readFileSync(FLOW_FILE, 'utf8')); const { response, text } = await fetchJson(`${NR_URL}/flows`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Node-RED-Deployment-Type': 'full', }, body: JSON.stringify(flow), }); if (!(response.status === 200 || response.status === 204)) { throw new Error(`Flow deploy failed (${response.status}): ${text}`); } console.log(`PASS: Demo flow deployed (${response.status})`); } async function queryInfluxCsv(query) { const response = await fetch(`${INFLUX_URL}/api/v2/query?org=${encodeURIComponent(INFLUX_ORG)}`, { method: 'POST', headers: { Authorization: `Token ${INFLUX_TOKEN}`, 'Content-Type': 'application/json', Accept: 'application/csv', }, body: JSON.stringify({ query }), }); const text = await response.text(); if (!response.ok) { throw new Error(`Influx query failed (${response.status}): ${text}`); } return text; } function countCsvDataRows(csvText) { return csvText .split('\n') .map((line) => line.trim()) .filter((line) => line && !line.startsWith('#') && line.includes(',')) .length; } async function waitForReactorTelemetry() { const deadline = Date.now() + QUERY_TIMEOUT_MS; while (Date.now() < deadline) { const counts = {}; for (const measurement of REACTOR_MEASUREMENTS) { const query = ` from(bucket: "${INFLUX_BUCKET}") |> range(start: -15m) |> filter(fn: (r) => r._measurement == "${measurement}") |> limit(n: 20) `.trim(); counts[measurement] = countCsvDataRows(await queryInfluxCsv(query)); } const missing = Object.entries(counts) .filter(([, rows]) => rows === 0) .map(([measurement]) => measurement); if (missing.length === 0) { const summary = Object.entries(counts) .map(([measurement, rows]) => `${measurement}=${rows}`) .join(', '); console.log(`PASS: Reactor telemetry reached InfluxDB (${summary})`); return; } console.log(`WAIT: reactor telemetry not yet present in InfluxDB for ${missing.join(', ')}`); await wait(POLL_INTERVAL_MS); } throw new Error(`Timed out waiting for reactor telemetry measurements ${REACTOR_MEASUREMENTS.join(', ')}`); } async function assertGrafanaDatasource() { const auth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`; const { response, body, text } = await fetchJson(`${GRAFANA_URL}/api/datasources/uid/${GRAFANA_DS_UID}`, { headers: { Authorization: auth }, }); if (!response.ok) { throw new Error(`Grafana datasource lookup failed (${response.status}): ${text}`); } if (body?.uid !== GRAFANA_DS_UID) { throw new Error(`Grafana datasource UID mismatch: expected ${GRAFANA_DS_UID}, got ${body?.uid}`); } console.log(`PASS: Grafana datasource ${GRAFANA_DS_UID} is present`); } async function queryGrafanaDatasource() { const auth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`; const response = await fetch(`${GRAFANA_URL}/api/ds/query`, { method: 'POST', headers: { Authorization: auth, 'Content-Type': 'application/json', }, body: JSON.stringify({ from: 'now-15m', to: 'now', queries: [ { refId: 'A', datasource: { uid: GRAFANA_DS_UID, type: 'influxdb' }, query: ` from(bucket: "${INFLUX_BUCKET}") |> range(start: -15m) |> filter(fn: (r) => r._measurement == "${REACTOR_MEASUREMENT}" and r._field == "S_O") |> last() `.trim(), rawQuery: true, intervalMs: 1000, maxDataPoints: 100, } ], }), }); const text = await response.text(); if (!response.ok) { throw new Error(`Grafana datasource query failed (${response.status}): ${text}`); } const body = JSON.parse(text); const frames = body?.results?.A?.frames || []; if (frames.length === 0) { throw new Error('Grafana datasource query returned no reactor frames'); } console.log(`PASS: Grafana can query reactor telemetry through datasource (${frames.length} frame(s))`); } async function waitForGrafanaDashboards(timeoutMs = QUERY_TIMEOUT_MS) { const deadline = Date.now() + timeoutMs; const auth = `Basic ${Buffer.from(`${GRAFANA_USER}:${GRAFANA_PASSWORD}`).toString('base64')}`; while (Date.now() < deadline) { const response = await fetch(`${GRAFANA_URL}/api/search?query=`, { headers: { Authorization: auth }, }); const text = await response.text(); if (!response.ok) { throw new Error(`Grafana dashboard search failed (${response.status}): ${text}`); } const results = JSON.parse(text); const titles = new Set(results.map((item) => item.title)); const missing = REQUIRED_DASHBOARD_TITLES.filter((title) => !titles.has(title)); const pumpingStationCount = results.filter((item) => item.title === 'pumpingStation').length; if (missing.length === 0 && pumpingStationCount >= 3) { console.log(`PASS: Grafana dashboards created (${REQUIRED_DASHBOARD_TITLES.join(', ')} + ${pumpingStationCount} pumpingStation dashboards)`); return; } const missingParts = []; if (missing.length > 0) { missingParts.push(`missing titled dashboards: ${missing.join(', ')}`); } if (pumpingStationCount < 3) { missingParts.push(`pumpingStation dashboards=${pumpingStationCount}`); } console.log(`WAIT: Grafana dashboards not ready: ${missingParts.join(' | ')}`); await wait(POLL_INTERVAL_MS); } throw new Error(`Timed out waiting for Grafana dashboards: ${REQUIRED_DASHBOARD_TITLES.join(', ')} and >=3 pumpingStation dashboards`); } async function main() { console.log('=== EVOLV Reactor E2E Round Trip ==='); await assertReachable(); await deployDemoFlow(); console.log('WAIT: allowing Node-RED inject/tick loops to populate telemetry'); await wait(12000); await waitForReactorTelemetry(); await assertGrafanaDatasource(); await queryGrafanaDatasource(); if (REQUIRE_GRAFANA_DASHBOARDS) { await waitForGrafanaDashboards(); console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for reactor telemetry and dashboard generation'); return; } try { await waitForGrafanaDashboards(15000); console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for reactor telemetry and dashboard generation'); } catch (error) { console.warn(`WARN: Grafana dashboard auto-generation is not ready yet: ${error.message}`); console.log('PASS: Node-RED -> InfluxDB -> Grafana round trip is working for live reactor telemetry'); } } main().catch((error) => { console.error(`FAIL: ${error.message}`); process.exit(1); });