#!/usr/bin/env node /** * Add monitoring/debug nodes to the demo flow for process visibility. * Adds a function node per PS that logs volume, level, flow rate every 10 ticks. * Also adds a status debug node for the overall system. */ const fs = require('fs'); const path = require('path'); const flowPath = path.join(__dirname, '..', 'docker', 'demo-flow.json'); const flow = JSON.parse(fs.readFileSync(flowPath, 'utf8')); // Remove existing monitoring nodes const monitorIds = flow.filter(n => n.id && n.id.startsWith('demo_mon_')).map(n => n.id); if (monitorIds.length > 0) { console.log('Removing existing monitoring nodes:', monitorIds); for (const id of monitorIds) { const idx = flow.findIndex(n => n.id === id); if (idx !== -1) flow.splice(idx, 1); } // Also remove from wires flow.forEach(n => { if (n.wires) { n.wires = n.wires.map(portWires => Array.isArray(portWires) ? portWires.filter(w => !monitorIds.includes(w)) : portWires ); } }); } // Add monitoring function nodes for each PS const monitors = [ { id: 'demo_mon_west', name: 'Monitor PS West', ps: 'demo_ps_west', x: 800, y: 50, }, { id: 'demo_mon_north', name: 'Monitor PS North', ps: 'demo_ps_north', x: 800, y: 100, }, { id: 'demo_mon_south', name: 'Monitor PS South', ps: 'demo_ps_south', x: 800, y: 150, }, ]; // Each PS sends process data on port 0. Wire monitoring nodes to PS port 0. monitors.forEach(mon => { // Function node that extracts key metrics and logs them periodically const fnNode = { id: mon.id, type: 'function', z: 'demo_tab_wwtp', name: mon.name, func: `// Extract key metrics from PS process output const p = msg.payload || {}; // Keys have .default suffix in PS output format const vol = p["volume.predicted.atequipment.default"]; const level = p["level.predicted.atequipment.default"]; const netFlow = p["netFlowRate.predicted.atequipment.default"]; const volPct = p["volumePercent.predicted.atequipment.default"]; // Only log when we have volume data if (vol !== null && vol !== undefined) { const ctx = context.get("tickCount") || 0; context.set("tickCount", ctx + 1); // Log every 10 ticks if (ctx % 10 === 0) { const fmt = (v, dec) => typeof v === "number" ? v.toFixed(dec) : String(v); const parts = ["vol=" + fmt(vol, 1) + "m3"]; if (level !== null && level !== undefined) parts.push("lvl=" + fmt(level, 3) + "m"); if (volPct !== null && volPct !== undefined) parts.push("fill=" + fmt(volPct, 1) + "%"); if (netFlow !== null && netFlow !== undefined) parts.push("net=" + fmt(netFlow, 1) + "m3/h"); node.warn(parts.join(" | ")); } } return msg;`, outputs: 1, timeout: '', noerr: 0, initialize: '', finalize: '', libs: [], x: mon.x, y: mon.y, wires: [[]], }; flow.push(fnNode); // Wire PS port 0 to this monitor (append to existing wires) const psNode = flow.find(n => n.id === mon.ps); if (psNode && psNode.wires && psNode.wires[0]) { if (!psNode.wires[0].includes(mon.id)) { psNode.wires[0].push(mon.id); } } console.log(`Added ${mon.id}: ${mon.name} → wired to ${mon.ps} port 0`); }); fs.writeFileSync(flowPath, JSON.stringify(flow, null, 2) + '\n'); console.log(`\nDone. ${monitors.length} monitoring nodes added.`);