#!/usr/bin/env node
/**
 * Step 2: Merge Collection Point
 *
 * Patches docker/demo-flow.json in place:
 * - Adds a link-out node on each pumping-station (PS) tab and appends it to
 *   output 0 of that tab's PS node.
 * - Adds link-in, tag, merge-collector, dashboard link-out and comment nodes
 *   on the treatment tab.
 * - Warns about wires and links that reference unknown node ids, then writes
 *   the flow back to disk.
 *
 * The script is safe to re-run: nodes are replaced by id and wires are only
 * added when missing, so a second run does not create duplicates.
 */

const fs = require('fs');
const path = require('path');

const FLOW_PATH = path.join(__dirname, '..', 'docker', 'demo-flow.json');
const flow = JSON.parse(fs.readFileSync(FLOW_PATH, 'utf8'));
if (!Array.isArray(flow)) {
  throw new Error(`Expected ${FLOW_PATH} to contain a JSON array of nodes`);
}

/** Returns the node with the given id, or undefined when there is none. */
const byId = (id) => flow.find((n) => n.id === id);

/**
 * Adds each node to the flow, replacing (in place) any existing node that has
 * the same id so that re-running the script never produces duplicate ids.
 */
const upsertNodes = (...nodes) => {
  for (const node of nodes) {
    const index = flow.findIndex((n) => n.id === node.id);
    if (index === -1) {
      flow.push(node);
    } else {
      flow[index] = node;
    }
  }
};

/**
 * Appends targetId to output 0 of the node sourceId unless it is already wired.
 * Throws a descriptive error when the source node or its first output is
 * missing, instead of failing with a TypeError on undefined.
 */
const connectFirstOutput = (sourceId, targetId) => {
  const source = byId(sourceId);
  if (!source) {
    throw new Error(`Node "${sourceId}" not found in ${FLOW_PATH}`);
  }
  if (!Array.isArray(source.wires) || !Array.isArray(source.wires[0])) {
    throw new Error(`Node "${sourceId}" has no output 0 to wire from`);
  }
  if (!source.wires[0].includes(targetId)) {
    source.wires[0].push(targetId);
  }
};

// One entry per pumping station: id suffix, display label, and the canvas
// y-coordinates of its link-out (PS tab) and link-in/tag row (treatment tab).
const STATIONS = [
  { key: 'west', label: 'West', outY: 360, inY: 920 },
  { key: 'north', label: 'North', outY: 340, inY: 980 },
  { key: 'south', label: 'South', outY: 340, inY: 1040 },
];

// =============================================
// 2a. Link-out nodes on each PS tab
// =============================================
upsertNodes(
  ...STATIONS.map(({ key, label, outY }) => ({
    id: `demo_link_merge_${key}_out`,
    type: 'link out',
    z: `demo_tab_ps_${key}`,
    name: `→ Merge (${label})`,
    mode: 'link',
    links: [`demo_link_merge_${key}_in`],
    x: 1080,
    y: outY,
  })),
);

// Add merge link-outs to each PS node's wires[0]
for (const { key } of STATIONS) {
  connectFirstOutput(`demo_ps_${key}`, `demo_link_merge_${key}_out`);
}

// =============================================
// 2b. Merge nodes on Treatment tab
// =============================================

// Link-in nodes
upsertNodes(
  ...STATIONS.map(({ key, label, inY }) => ({
    id: `demo_link_merge_${key}_in`,
    type: 'link in',
    z: 'demo_tab_treatment',
    name: `← PS ${label}`,
    links: [`demo_link_merge_${key}_out`],
    x: 100,
    y: inY,
    wires: [[`demo_fn_tag_${key}`]],
  })),
);

// Tag functions: stamp each message with the station it came from so the
// collector can tell the three inputs apart.
upsertNodes(
  ...STATIONS.map(({ key, inY }) => ({
    id: `demo_fn_tag_${key}`,
    type: 'function',
    z: 'demo_tab_treatment',
    name: `Tag: ${key}`,
    func: `msg._psSource = '${key}';\nreturn msg;`,
    outputs: 1,
    x: 280,
    y: inY,
    wires: [['demo_fn_merge_collect']],
  })),
);

// Merge collect function. Inside `func`, `flow` is the function node's
// flow-context object, not the node array this script manipulates.
upsertNodes({
  id: 'demo_fn_merge_collect',
  type: 'function',
  z: 'demo_tab_treatment',
  name: 'Merge Collector',
  func: `// Cache each PS output by _psSource tag, compute totals
const p = msg.payload || {};
const ps = msg._psSource;
const cache = flow.get('merge_cache') || { west: {}, north: {}, south: {} };
const keys = Object.keys(p);
const pick = (prefix) => { const k = keys.find(k => k.startsWith(prefix)); return k ? Number(p[k]) : null; };
if (ps && cache[ps]) {
  const nf = pick('netFlowRate.predicted');
  if (nf !== null) cache[ps].netFlow = nf;
  const fp = pick('volumePercent.predicted');
  if (fp !== null) cache[ps].fillPct = fp;
  cache[ps].direction = p.direction || cache[ps].direction;
  cache[ps].ts = Date.now();
}
flow.set('merge_cache', cache);
const totalFlow = (cache.west.netFlow||0) + (cache.north.netFlow||0) + (cache.south.netFlow||0);
const avgFill = ((cache.west.fillPct||0) + (cache.north.fillPct||0) + (cache.south.fillPct||0)) / 3;
return {
  topic: 'merge_combined_influent',
  payload: {
    totalInfluentFlow: +totalFlow.toFixed(1),
    avgFillPercent: +avgFill.toFixed(1),
    west: cache.west,
    north: cache.north,
    south: cache.south
  }
};`,
  outputs: 1,
  x: 480,
  y: 980,
  wires: [['demo_link_merge_dash']],
});

// Dashboard link-out for merge data. Its target, demo_link_merge_dash_in, is
// not created by this script; the link check below warns if it is absent.
upsertNodes({
  id: 'demo_link_merge_dash',
  type: 'link out',
  z: 'demo_tab_treatment',
  name: '→ Merge Dashboard',
  mode: 'link',
  links: ['demo_link_merge_dash_in'],
  x: 680,
  y: 980,
});

// Create a comment for the merge section
upsertNodes({
  id: 'demo_comment_merge',
  type: 'comment',
  z: 'demo_tab_treatment',
  name: '=== MERGE COLLECTION POINT ===',
  info: 'Combines output from all 3 pumping stations',
  x: 200,
  y: 880,
});

// =============================================
// Validate
// =============================================
const allIds = new Set(flow.map((n) => n.id));

// Every wire target must be the id of a node in the flow.
let brokenWires = 0;
for (const n of flow) {
  if (!n.wires) continue;
  for (const port of n.wires) {
    for (const target of port) {
      if (!allIds.has(target)) {
        console.warn(`BROKEN WIRE: ${n.id} → ${target}`);
        brokenWires++;
      }
    }
  }
}

// Link nodes reference their peers through `links` rather than `wires`.
// Broken links are reported but are not counted in brokenWires.
for (const n of flow) {
  if (n.type === 'link out' && n.links) {
    for (const lt of n.links) {
      if (!allIds.has(lt)) {
        console.warn(`BROKEN LINK: ${n.id} links to missing ${lt}`);
      }
    }
  }
  if (n.type === 'link in' && n.links) {
    for (const ls of n.links) {
      if (!allIds.has(ls)) {
        console.warn(`BROKEN LINK: ${n.id} expects link from missing ${ls}`);
      }
    }
  }
}

if (brokenWires === 0) console.log('All wire references valid ✓');
console.log('Total nodes:', flow.length);

// Write
fs.writeFileSync(FLOW_PATH, JSON.stringify(flow, null, 2) + '\n');
console.log(`Wrote ${FLOW_PATH}`);