#!/usr/bin/env python3
"""
Generate the multi-tab Node-RED flow for the
'pumpingstation-complete-example' end-to-end demo.

Stack
-----
  - 1 pumpingStation (basin model, levelbased control)
  - 1 machineGroupControl (orchestrates the 3 pumps)
  - 3 rotatingMachine pumps
  - 12 measurement nodes (4 per pump: upstream P, downstream P, flow, power)
  - All EVOLV node port-1 telemetry routed to InfluxDB via http request
  - FlowFuse dashboard (realtime + 1h trends)
  - Grafana dashboard (realtime gauges + historic graphs)

Tabs
----
  Tab 1  Process Plant   EVOLV nodes only — pumps, MGC, PS, measurements,
                         per-node output formatters and per-pump physics
                         feeders that drive the measurement nodes from
                         live plant state.
  Tab 2  Dashboard UI    only ui-* widgets. No business logic.
  Tab 3  Demo Drivers    inflow generator (Constant / Sine / Diurnal /
                         Storm scenarios chosen by buttons; baseline set
                         by slider).
  Tab 4  Setup & Init    one-shot deploy-time injects (MGC scaling/mode,
                         pumps mode = auto).
  Tab 5  Telemetry       collects port-1 InfluxDB payloads from every
                         EVOLV node, converts to line protocol, POSTs to
                         InfluxDB.

Cross-tab wiring is via NAMED link-out / link-in pairs only.

To regenerate:
    python3 build_flow.py > flow.json
"""

import json
import sys

# ---------------------------------------------------------------------------
# Tab IDs
# ---------------------------------------------------------------------------
TAB_PROCESS = "tab_process"
TAB_UI = "tab_ui"
TAB_DRIVERS = "tab_drivers"
TAB_SETUP = "tab_setup"
TAB_TLM = "tab_telemetry"

# ---------------------------------------------------------------------------
# Spacing constants
# ---------------------------------------------------------------------------
LANE_X = [120, 380, 640, 900, 1160, 1420]
ROW = 80
SECTION_GAP = 220

POSITION_ICON = {
    "upstream": "→",
    "downstream": "←",
    "atEquipment": "⊥",
}

# ---------------------------------------------------------------------------
# Cross-tab link channels — the wiring contract
# ---------------------------------------------------------------------------
CH_INFLOW_BASELINE = "cmd:inflow-baseline"  # m³/h baseline (slider)
CH_INFLOW_SCENARIO = "cmd:inflow-scenario"  # 'constant' | 'sine' | 'diurnal' | 'storm'
CH_QIN = "cmd:q_in"  # m³/s, generator → PS
CH_QD = "cmd:Qd"  # m³/h, slider → PS (manual mode only)
CH_PS_MODE = "cmd:ps-mode"  # 'levelbased' | 'manual'
CH_STATION_START = "cmd:station-startup"
CH_STATION_STOP = "cmd:station-shutdown"
CH_STATION_ESTOP = "cmd:station-estop"
CH_PUMP_SETPOINT = {
    "pump_a": "cmd:setpoint-A",
    "pump_b": "cmd:setpoint-B",
    "pump_c": "cmd:setpoint-C",
}
CH_PUMP_SEQUENCE = {
    "pump_a": "cmd:pump-A-seq",
    "pump_b": "cmd:pump-B-seq",
    "pump_c": "cmd:pump-C-seq",
}
CH_PUMP_EVT = {
    "pump_a": "evt:pump-A",
    "pump_b": "evt:pump-B",
    "pump_c": "evt:pump-C",
}
CH_MGC_EVT = "evt:mgc"
CH_PS_EVT = "evt:ps"
CH_INFLOW_EVT = "evt:inflow"
CH_TLM = "evt:tlm"

PUMPS = ["pump_a", "pump_b", "pump_c"]
PUMP_LABELS = {"pump_a": "Pump A", "pump_b": "Pump B", "pump_c": "Pump C"}
MGC_ID = "mgc_pumps"
PS_ID = "ps_basin"

# Basin geometry — single source of truth.
# Realistic wet-well wastewater pumping station — pumps are oversized # ~5× nominal inflow for storm tolerance. Sized so: # - nominal inflow ~25 m³/h refills the dead-band [stopLvl, startLvl] # (~6.25 m³) in ~15 min while pumps are off # - one pump at minimum stable flow (~99 m³/h) drains the same band in # ~5 min once engaged via the stopLevel Schmitt trigger # - storm inflow ~250 m³/h pushes percControl up the ramp until all 3 # pumps are engaged at high flow (combined max ≈ 681 m³/h) # surfaceArea = 50 / 4 = 12.5 m²; band volume = 12.5 × 0.5 = 6.25 m³ BASIN_VOLUME = 50.0 BASIN_HEIGHT = 4.0 OUTFLOW_LEVEL = 0.3 OVERFLOW_LEVEL = 3.8 # --------------------------------------------------------------------------- # Generic node-builder helpers # --------------------------------------------------------------------------- def comment(node_id, tab, x, y, name, info=""): return {"id": node_id, "type": "comment", "z": tab, "name": name, "info": info, "x": x, "y": y, "wires": []} def inject(node_id, tab, x, y, name, topic, payload, payload_type="str", once=False, repeat="", once_delay="0.5", wires=None): return { "id": node_id, "type": "inject", "z": tab, "name": name, "props": [ {"p": "topic", "vt": "str"}, {"p": "payload", "v": str(payload), "vt": payload_type}, ], "topic": topic, "payload": str(payload), "payloadType": payload_type, "repeat": repeat, "crontab": "", "once": once, "onceDelay": once_delay, "x": x, "y": y, "wires": [wires or []], } def function_node(node_id, tab, x, y, name, code, outputs=1, wires=None): return { "id": node_id, "type": "function", "z": tab, "name": name, "func": code, "outputs": outputs, "noerr": 0, "initialize": "", "finalize": "", "libs": [], "x": x, "y": y, "wires": wires if wires is not None else [[] for _ in range(outputs)], } def link_out(node_id, tab, x, y, channel_name, target_in_ids): return { "id": node_id, "type": "link out", "z": tab, "name": channel_name, "mode": "link", "links": list(target_in_ids), "x": x, "y": y, "wires": [], 
} def link_in(node_id, tab, x, y, channel_name, source_out_ids, downstream): return { "id": node_id, "type": "link in", "z": tab, "name": channel_name, "links": list(source_out_ids), "x": x, "y": y, "wires": [downstream or []], } def debug_node(node_id, tab, x, y, name, target="payload", target_type="msg", active=False): return { "id": node_id, "type": "debug", "z": tab, "name": name, "active": active, "tosidebar": True, "console": False, "tostatus": False, "complete": target, "targetType": target_type, "x": x, "y": y, "wires": [], } # --------------------------------------------------------------------------- # Dashboard scaffolding # --------------------------------------------------------------------------- def dashboard_scaffold(): base = { "id": "ui_base", "type": "ui-base", "name": "EVOLV Pumping", "path": "/dashboard", "appIcon": "", "includeClientData": True, "acceptsClientConfig": ["ui-notification", "ui-control"], "showPathInSidebar": True, "headerContent": "page", "navigationStyle": "default", "titleBarStyle": "default", } theme = { "id": "ui_theme", "type": "ui-theme", "name": "EVOLV Theme", "colors": { "surface": "#ffffff", "primary": "#0c99d9", "bgPage": "#f4f6fa", "groupBg": "#ffffff", "groupOutline": "#cccccc", }, "sizes": { "density": "default", "pagePadding": "12px", "groupGap": "12px", "groupBorderRadius": "6px", "widgetGap": "8px", }, } page_realtime = { "id": "ui_page_realtime", "type": "ui-page", "name": "Realtime", "ui": "ui_base", "path": "/realtime", "icon": "speed", "layout": "grid", "theme": "ui_theme", "breakpoints": [{"name": "Default", "px": "0", "cols": "12"}], "order": 1, "className": "", } page_trends = { "id": "ui_page_trends", "type": "ui-page", "name": "Trends — 1 hour", "ui": "ui_base", "path": "/trends", "icon": "show_chart", "layout": "grid", "theme": "ui_theme", "breakpoints": [{"name": "Default", "px": "0", "cols": "12"}], "order": 2, "className": "", } return [base, theme, page_realtime, page_trends] def ui_group(group_id, 
name, page_id, width=6, order=1): return { "id": group_id, "type": "ui-group", "name": name, "page": page_id, "width": str(width), "height": "1", "order": order, "showTitle": True, "className": "", "groupType": "default", "disabled": False, "visible": True, } def ui_text(node_id, tab, x, y, group, name, label, fmt, layout="row-spread"): return { "id": node_id, "type": "ui-text", "z": tab, "group": group, "order": 1, "width": "0", "height": "0", "name": name, "label": label, "format": fmt, "layout": layout, "style": False, "font": "", "fontSize": 14, "color": "#000000", "x": x, "y": y, "wires": [], } def ui_button(node_id, tab, x, y, group, name, label, payload, payload_type, topic, color="#0c99d9", icon="play_arrow", wires=None): return { "id": node_id, "type": "ui-button", "z": tab, "group": group, "name": name, "label": label, "order": 1, "width": "0", "height": "0", "tooltip": "", "color": "#ffffff", "bgcolor": color, "className": "", "icon": icon, "iconPosition": "left", "payload": payload, "payloadType": payload_type, "topic": topic, "topicType": "str", "buttonType": "default", "x": x, "y": y, "wires": [wires or []], } def ui_slider(node_id, tab, x, y, group, name, label, mn, mx, step=1.0, topic="", wires=None): return { "id": node_id, "type": "ui-slider", "z": tab, "group": group, "name": name, "label": label, "tooltip": "", "order": 1, "width": "0", "height": "0", "passthru": True, "outs": "end", "topic": topic, "topicType": "str", "min": str(mn), "max": str(mx), "step": str(step), "showLabel": True, "showValue": True, "labelPosition": "top", "valuePosition": "left", "thumbLabel": False, "iconStart": "", "iconEnd": "", "x": x, "y": y, "wires": [wires or []], } def ui_switch(node_id, tab, x, y, group, name, label, on_value, off_value, topic, wires=None): return { "id": node_id, "type": "ui-switch", "z": tab, "group": group, "name": name, "label": label, "tooltip": "", "order": 1, "width": "0", "height": "0", "passthru": True, "decouple": "false", "topic": 
topic, "topicType": "str", "style": "", "className": "", "evaluate": "true", "onvalue": on_value, "onvalueType": "str", "onicon": "auto_mode", "oncolor": "#0c99d9", "offvalue": off_value, "offvalueType": "str", "officon": "back_hand", "offcolor": "#888888", "x": x, "y": y, "wires": [wires or []], } def ui_chart(node_id, tab, x, y, group, name, label, width=12, height=6, remove_older="60", remove_older_unit="60", remove_older_points="1800", y_axis_label="", ymin=None, ymax=None, order=1, interpolation="linear"): """FlowFuse ui-chart — full required field set per node-red-flow-layout.md.""" return { "id": node_id, "type": "ui-chart", "z": tab, "group": group, "name": name, "label": label, "order": order, "chartType": "line", "interpolation": interpolation, "category": "topic", "categoryType": "msg", "xAxisLabel": "", "xAxisType": "time", "xAxisProperty": "", "xAxisPropertyType": "timestamp", "xAxisFormat": "", "xAxisFormatType": "auto", "xmin": "", "xmax": "", "yAxisLabel": y_axis_label, "yAxisProperty": "payload", "yAxisPropertyType": "msg", "ymin": "" if ymin is None else str(ymin), "ymax": "" if ymax is None else str(ymax), "removeOlder": str(remove_older), "removeOlderUnit": str(remove_older_unit), "removeOlderPoints": str(remove_older_points), "action": "append", "stackSeries": False, "pointShape": "circle", "pointRadius": 4, "showLegend": True, "bins": 10, "colors": [ "#0095FF", "#FF0000", "#FF7F0E", "#2CA02C", "#A347E1", "#D62728", "#FF9896", "#9467BD", "#C5B0D5", ], "textColor": ["#666666"], "textColorDefault": True, "gridColor": ["#e5e5e5"], "gridColorDefault": True, "width": int(width), "height": int(height), "className": "", "x": x, "y": y, "wires": [[]], } def ui_gauge(node_id, tab, x, y, group, name, title, units, mn, mx, segments, gtype="gauge-34", suffix="", icon="", width=3, height=3, order=1): return { "id": node_id, "type": "ui-gauge", "z": tab, "group": group, "name": name, "gtype": gtype, "gstyle": "Rounded", "title": title, "units": units, 
"prefix": "", "suffix": suffix, "min": mn, "max": mx, "segments": segments, "width": width, "height": height, "order": order, "icon": icon, "sizeGauge": 20, "sizeGap": 2, "sizeSegments": 10, "x": x, "y": y, "wires": [], } # --------------------------------------------------------------------------- # Tab 1 — PROCESS PLANT # --------------------------------------------------------------------------- def build_process_tab(): nodes = [] nodes.append({ "id": TAB_PROCESS, "type": "tab", "label": "🏭 Process Plant", "disabled": False, "info": ( "EVOLV plant model: 3 rotatingMachines (each with 4 measurement " "nodes — upstream P, downstream P, flow, power), MGC, PS.\n\n" "Per pump there is a 'physics' function node that consumes the " "pump's own port-0 stream PLUS PS port-0 (basin level) and " "drives all 4 measurement nodes with physically-coupled values " "(upstream P from basin head; downstream P from pump state + " "flow; flow/power mirror predicted with Gaussian noise). This " "lives on this tab so the plant model is self-contained.\n\n" "All cross-tab wires use named link-in / link-out channels." ), }) nodes.append(comment("c_process_title", TAB_PROCESS, LANE_X[2], 20, "🏭 PROCESS PLANT — EVOLV nodes + per-pump physics feeders", "")) # ---------------- Per-pump rows ---------------- for i, pump in enumerate(PUMPS): label = PUMP_LABELS[pump] y_section = 80 + i * (SECTION_GAP + 60) nodes.append(comment(f"c_{pump}", TAB_PROCESS, LANE_X[2], y_section, f"── {label} ── (pump + 4 sensors + physics feeder)", "Up/Dn pressure + flow + power sensors register as children of " "the pump. The physics_ function takes the pump's own " "port-0 stream and PS port-0 (basin level) and drives all 4 " "sensors with physically-coupled values." 
)) # ---- 4 measurement nodes (driven via msg.topic='measurement') ---- SENSORS = [ ("u", "Up", "upstream", "mbar", "pressure", "vega", "vega-pressure-10"), ("d", "Dn", "downstream", "mbar", "pressure", "vega", "vega-pressure-10"), ("f", "Flow", "downstream", "m3/h", "flow", "endress", "endress-promag-50"), ("p", "Pwr", "atEquipment","kW", "power", "siemens", "siemens-sentron-pac4200"), ] for j, (suffix, lbl, pos, unit, asset_type, supplier, model) in enumerate(SENSORS): mid = f"meas_{pump}_{suffix}" mid_label = f"{label.split()[1]}-{lbl}" if asset_type == "pressure": o_min, o_max = 0, 4000 elif asset_type == "flow": o_min, o_max = 0, 250 else: # power o_min, o_max = 0, 30 nodes.append({ "id": mid, "type": "measurement", "z": TAB_PROCESS, "name": mid_label, "mode": "analog", "channels": "[]", "scaling": False, "i_min": 0, "i_max": 1, "i_offset": 0, "o_min": o_min, "o_max": o_max, "simulator": False, "smooth_method": "mean", "count": "3", "processOutputFormat": "process", "dbaseOutputFormat": "influxdb", "uuid": f"sensor-{pump}-{suffix}", "supplier": supplier, "category": "sensor", "assetType": asset_type, "model": model, "unit": unit, "assetTagNumber": f"{label.split()[1]}-{suffix.upper()}", "enableLog": False, "logLevel": "warn", "tickIntervalMs": 2000, "positionVsParent": pos, "positionIcon": POSITION_ICON.get(pos, ""), "hasDistance": False, "distance": 0, "distanceUnit": "m", "distanceDescription": "", "x": LANE_X[1], "y": y_section + 40 + j * 35, # Port 0 unused, port 1 → telemetry, port 2 → pump (registerChild) "wires": [[], [f"lout_tlm_{mid}"], [pump]], }) nodes.append(link_out( f"lout_tlm_{mid}", TAB_PROCESS, LANE_X[1] + 200, y_section + 40 + j * 35, CH_TLM, target_in_ids=["lin_tlm"], )) # ---- The pump itself ---- nodes.append({ "id": pump, "type": "rotatingMachine", "z": TAB_PROCESS, "name": label, # speed (movement units/s). 
The state machine doesn't auto- # return to 'operational' after a routine abort (avoids a # bounce loop), so any setpoint that arrives while still # accelerating gets deferred via delayedMove. With MGC # retargeting every PS tick (2 s) and a 0..100 position # range, speed must be high enough that the movement # finishes inside one tick — otherwise the FSM gets parked # in 'accelerating' and the badge stops advancing. 200 u/s # gives a worst-case 0..100 traversal of 0.5 s, well inside # the 2 s window. "speed": "200", "startup": "2", "warmup": "1", "shutdown": "2", "cooldown": "1", "movementMode": "staticspeed", "machineCurve": "", "uuid": f"pump-{pump}", "supplier": "hidrostal", "category": "pump", "assetType": "pump-centrifugal", "model": "hidrostal-H05K-S03R", "unit": "m3/h", "curvePressureUnit": "mbar", "curveFlowUnit": "m3/h", "curvePowerUnit": "kW", "curveControlUnit": "%", "enableLog": False, "logLevel": "warn", "tickIntervalMs": 2000, "positionVsParent": "atEquipment", "positionIcon": POSITION_ICON["atEquipment"], "hasDistance": False, "distance": 0, "distanceUnit": "m", "distanceDescription": "", "x": LANE_X[3], "y": y_section + 90, "wires": [ [f"format_{pump}", f"physics_{pump}"], [f"lout_tlm_{pump}"], [MGC_ID], ], }) nodes.append(link_out( f"lout_tlm_{pump}", TAB_PROCESS, LANE_X[3], y_section + 130, CH_TLM, target_in_ids=["lin_tlm"], )) # ---- Per-pump output formatter (for dashboard) ---- nodes.append(function_node( f"format_{pump}", TAB_PROCESS, LANE_X[4], y_section + 90, f"format {label} port 0", "const p = msg.payload || {};\n" "const c = context.get('c') || {};\n" "Object.assign(c, p);\n" "context.set('c', c);\n" "// Throttle dashboard fan-out to ≤ 2 Hz. 
The pump emits on\n" "// every state change (multiple per sec while cycling); the\n" "// dashboard doesn't need that resolution and the websocket\n" "// fan-out chokes the browser.\n" "const now = Date.now();\n" "const last = context.get('_lastEmit') || 0;\n" "if (now - last < 1000) return null;\n" "context.set('_lastEmit', now);\n" "function find(prefix) {\n" " for (const k in c) { if (k.indexOf(prefix) === 0) return c[k]; }\n" " return null;\n" "}\n" "const flow = find('flow.predicted.downstream.');\n" "const power = find('power.predicted.atequipment.');\n" "const ctrl = find('ctrl.predicted.atequipment.');\n" "const pUp = find('pressure.measured.upstream.');\n" "const pDn = find('pressure.measured.downstream.');\n" "msg.payload = {\n" " state: c.state || 'idle',\n" " mode: c.mode || 'auto',\n" " ctrl: ctrl != null ? Number(ctrl ).toFixed(1) + '%' : 'n/a',\n" " flow: flow != null ? Number(flow ).toFixed(1) + ' m³/h' : 'n/a',\n" " power: power != null ? Number(power).toFixed(2) + ' kW' : 'n/a',\n" " pUp: pUp != null ? Number(pUp ).toFixed(0) + ' mbar' : 'n/a',\n" " pDn: pDn != null ? Number(pDn ).toFixed(0) + ' mbar' : 'n/a',\n" " ctrlNum: ctrl != null ? Number(ctrl ) : null,\n" " flowNum: flow != null ? Number(flow ) : null,\n" " powerNum: power != null ? Number(power) : null,\n" " pUpNum: pUp != null ? Number(pUp ) : null,\n" " pDnNum: pDn != null ? Number(pDn ) : null,\n" " // Pump is moving water any time it's between startup and shutdown, not\n" " // just during steady operational. 
accelerate/decelerate/warmup count.\n" " isRunning: ['operational','starting','warmingup','accelerating','decelerating','stopping'].includes(c.state),\n" "};\n" "return msg;", outputs=1, wires=[[f"lout_evt_{pump}"]], )) nodes.append(link_out( f"lout_evt_{pump}", TAB_PROCESS, LANE_X[5], y_section + 90, CH_PUMP_EVT[pump], target_in_ids=[f"lin_evt_{pump}_dash"], )) # ---- Physics feeder ---- nodes.append(function_node( f"physics_{pump}", TAB_PROCESS, LANE_X[4], y_section + 160, f"physics {label} → 4 sensors", _physics_code(pump.split("_", 1)[1]), outputs=4, wires=[ [f"meas_{pump}_u"], [f"meas_{pump}_d"], [f"meas_{pump}_f"], [f"meas_{pump}_p"], ], )) # ---- Setpoint slider link-in ---- nodes.append(link_in( f"lin_setpoint_{pump}", TAB_PROCESS, LANE_X[0], y_section + 60, CH_PUMP_SETPOINT[pump], source_out_ids=[f"lout_setpoint_{pump}_dash"], downstream=[f"build_setpoint_{pump}"], )) nodes.append(function_node( f"build_setpoint_{pump}", TAB_PROCESS, LANE_X[1] + 220, y_section + 60, f"build setpoint cmd ({label})", "msg.topic = 'execMovement';\n" "msg.payload = { source: 'GUI', action: 'execMovement', " "setpoint: Number(msg.payload) };\n" "return msg;", outputs=1, wires=[[pump]], )) # ---- Per-pump start/stop link-in ---- nodes.append(link_in( f"lin_seq_{pump}", TAB_PROCESS, LANE_X[0], y_section + 110, CH_PUMP_SEQUENCE[pump], source_out_ids=[f"lout_seq_{pump}_dash"], downstream=[pump], )) # ---------------- MGC ---------------- y_mgc = 80 + len(PUMPS) * (SECTION_GAP + 60) nodes.append(comment("c_mgc", TAB_PROCESS, LANE_X[2], y_mgc, "── MGC ── (orchestrates the 3 pumps via optimalcontrol)", "")) nodes.append({ "id": MGC_ID, "type": "machineGroupControl", "z": TAB_PROCESS, "name": "MGC — Pump Group", "uuid": "mgc-pump-group", "category": "controller", "assetType": "machinegroupcontrol", "model": "default", "unit": "m3/h", "supplier": "evolv", "enableLog": True, "logLevel": "debug", "tickIntervalMs": 2000, "positionVsParent": "atEquipment", "positionIcon": 
POSITION_ICON["atEquipment"], "hasDistance": False, "distance": 0, "distanceUnit": "m", "distanceDescription": "", "processOutputFormat": "process", "dbaseOutputFormat": "influxdb", "x": LANE_X[3], "y": y_mgc + 80, "wires": [ ["format_mgc"], ["lout_tlm_mgc"], [PS_ID], ], }) nodes.append(link_out( "lout_tlm_mgc", TAB_PROCESS, LANE_X[3], y_mgc + 120, CH_TLM, target_in_ids=["lin_tlm"], )) nodes.append(function_node( "format_mgc", TAB_PROCESS, LANE_X[4], y_mgc + 80, "format MGC port 0", "const p = msg.payload || {};\n" "const c = context.get('c') || {};\n" "Object.assign(c, p);\n" "context.set('c', c);\n" "// Throttle: MGC fires on every distribution change.\n" "const now = Date.now();\n" "const last = context.get('_lastEmit') || 0;\n" "if (now - last < 1000) return null;\n" "context.set('_lastEmit', now);\n" "function find(prefix) {\n" " for (const k in c) { if (k.indexOf(prefix) === 0) return c[k]; }\n" " return null;\n" "}\n" "const totalFlow = find('flow.predicted.atequipment.') ?? " "find('downstream_predicted_flow');\n" "const totalPower = find('power.predicted.atequipment.') ?? " "find('atEquipment_predicted_power');\n" "const eff = find('efficiency.predicted.atequipment.');\n" "msg.payload = {\n" " totalFlow: totalFlow != null ? Number(totalFlow ).toFixed(1) + ' m³/h' : 'n/a',\n" " totalPower: totalPower != null ? Number(totalPower).toFixed(2) + ' kW' : 'n/a',\n" " efficiency: eff != null ? Number(eff).toFixed(3) : 'n/a',\n" " totalFlowNum: totalFlow != null ? Number(totalFlow ) : null,\n" " totalPowerNum: totalPower != null ? Number(totalPower) : null,\n" " efficiencyNum: eff != null ? 
Number(eff) : null,\n" "};\n" "return msg;", outputs=1, wires=[["lout_evt_mgc"]], )) nodes.append(link_out( "lout_evt_mgc", TAB_PROCESS, LANE_X[5], y_mgc + 80, CH_MGC_EVT, target_in_ids=["lin_evt_mgc_dash"], )) # ---------------- PS ---------------- y_ps = y_mgc + SECTION_GAP + 60 nodes.append(comment("c_ps", TAB_PROCESS, LANE_X[2], y_ps, "── Pumping Station ── (basin model, levelbased control)", "")) nodes.append(link_in( "lin_qin_at_ps", TAB_PROCESS, LANE_X[0], y_ps + 40, CH_QIN, source_out_ids=["lout_qin_drivers"], downstream=[PS_ID], )) nodes.append(link_in( "lin_qd_at_ps", TAB_PROCESS, LANE_X[0], y_ps + 80, CH_QD, source_out_ids=["lout_qd_dash"], downstream=["qd_to_ps_wrap"], )) nodes.append(function_node( "qd_to_ps_wrap", TAB_PROCESS, LANE_X[1], y_ps + 80, "wrap slider → PS Qd", "msg.topic = 'Qd';\n" "return msg;", outputs=1, wires=[[PS_ID]], )) nodes.append(link_in( "lin_ps_mode_at_ps", TAB_PROCESS, LANE_X[0], y_ps + 120, CH_PS_MODE, source_out_ids=["lout_ps_mode_dash"], downstream=[PS_ID], )) nodes.append({ "id": PS_ID, "type": "pumpingStation", "z": TAB_PROCESS, "name": "Pumping Station", "uuid": "ps-basin-1", "category": "station", "assetType": "pumpingstation", "model": "default", "unit": "m3/s", "supplier": "evolv", "enableLog": False, "logLevel": "warn", "tickIntervalMs": 2000, "positionVsParent": "atEquipment", "positionIcon": POSITION_ICON["atEquipment"], "hasDistance": False, "distance": 0, "distanceUnit": "m", "distanceDescription": "", "processOutputFormat": "process", "dbaseOutputFormat": "influxdb", "controlMode": "levelbased", "basinVolume": BASIN_VOLUME, "basinHeight": BASIN_HEIGHT, # inflowLevel = top of inlet pipe (geometry) AND foot of the # demand ramp (control). Setting it equal to maxLevel collapses # the ramp to a step function — the runtime cycles 0/100 % every # tick AND the editor's level-mode preview hides the diagonal # line (mode-preview.js refuses to draw a degenerate ramp). 
"inflowLevel": 2.5, "outflowLevel": OUTFLOW_LEVEL, "overflowLevel": OVERFLOW_LEVEL, "inletPipeDiameter": 0.3, "outletPipeDiameter": 0.3, "minLevel": 0.5, # startLevel — ramp foot AND rising-edge engage point. Demand # scales 0..100 % over [startLevel, maxLevel]. "startLevel": 2.5, # stopLevel — falling-edge disengage point. While engaged AND # level < startLevel (basin draining through the dead band), PS # emits the keep-alive percControl below so MGC keeps a single # pump running until level reaches stopLevel. "stopLevel": 2.0, # deadZoneKeepAlivePercent — % sent to MGC while engaged in the # dead band [stopLevel, startLevel). Mapped by MGC's normalized # scaling to flow.min — i.e., a single pump at minimum stable # speed. 1 % is small enough to round to flow.min. "deadZoneKeepAlivePercent": 1, "maxLevel": 3.5, "refHeight": "NAP", "minHeightBasedOn": "outlet", "basinBottomRef": 0, "staticHead": 12, "maxDischargeHead": 24, "pipelineLength": 80, "defaultFluid": "wastewater", "temperatureReferenceDegC": 15, "maxInflowRate": 200, "enableDryRunProtection": True, "enableOverfillProtection": True, "dryRunThresholdPercent": 5, "overfillThresholdPercent": 95, "timeleftToFullOrEmptyThresholdSeconds": 0, "x": LANE_X[3], "y": y_ps + 80, "wires": [ ["format_ps", "ps_to_physics"], ["lout_tlm_ps"], ], }) nodes.append(link_out( "lout_tlm_ps", TAB_PROCESS, LANE_X[3], y_ps + 120, CH_TLM, target_in_ids=["lin_tlm"], )) nodes.append(function_node( "ps_to_physics", TAB_PROCESS, LANE_X[4], y_ps + 130, "ps → fan basin level to 3 physics feeders", "const out = { from: 'ps', payload: msg.payload };\n" "return [out, out, out];", outputs=3, wires=[["physics_pump_a"], ["physics_pump_b"], ["physics_pump_c"]], )) nodes.append(function_node( "format_ps", TAB_PROCESS, LANE_X[4], y_ps + 80, "format PS port 0", "const p = msg.payload || {};\n" "const c = context.get('c') || {};\n" "Object.assign(c, p);\n" "context.set('c', c);\n" "// Throttle: PS emits frequently in levelbased mode.\n" "const now = 
Date.now();\n" "const last = context.get('_lastEmit') || 0;\n" "if (now - last < 1000) return null;\n" "context.set('_lastEmit', now);\n" "function find(prefix) {\n" " for (const k in c) { if (k.indexOf(prefix) === 0) return c[k]; }\n" " return null;\n" "}\n" f"const MAX_VOL = {BASIN_VOLUME};\n" "const lvl = find('level.predicted.');\n" "const vol = find('volume.predicted.');\n" "const qIn = find('flow.predicted.in.');\n" "const qOut = find('flow.predicted.out.');\n" "const netFlowRate = find('netFlowRate.predicted.');\n" "const fillPct = vol != null\n" " ? Math.min(100, Math.max(0, Math.round(Number(vol) / MAX_VOL * 100)))\n" " : null;\n" "const netM3h = netFlowRate != null ? Number(netFlowRate) * 3600 : null;\n" "const seconds = (c.timeleft != null && Number.isFinite(Number(c.timeleft)))\n" " ? Number(c.timeleft) : null;\n" "const timeStr = seconds != null\n" " ? (seconds > 60 ? Math.round(seconds/60) + ' min'\n" " : Math.round(seconds) + ' s')\n" " : 'n/a';\n" "msg.payload = {\n" " direction: c.direction || 'steady',\n" " level: lvl != null ? Number(lvl).toFixed(2) + ' m' : 'n/a',\n" " volume: vol != null ? Number(vol).toFixed(1) + ' m³' : 'n/a',\n" " fillPct: fillPct != null ? fillPct + '%' : 'n/a',\n" " netFlow: netM3h != null ? netM3h.toFixed(0) + ' m³/h' : 'n/a',\n" " timeLeft: timeStr,\n" " qIn: qIn != null ? (Number(qIn ) * 3600).toFixed(0) + ' m³/h' : 'n/a',\n" " qOut: qOut != null ? (Number(qOut) * 3600).toFixed(0) + ' m³/h' : 'n/a',\n" " levelNum: lvl != null ? Number(lvl) : null,\n" " volumeNum: vol != null ? Number(vol) : null,\n" " fillPctNum: fillPct,\n" " netFlowNum: netM3h,\n" " percControl: c.percControl != null ? Number(c.percControl) : null,\n" " qInNum: qIn != null ? Number(qIn ) * 3600 : null,\n" " qOutNum: qOut != null ? 
Number(qOut) * 3600 : null,\n" " safetyState: c.safetyState || 'normal',\n" "};\n" "return msg;", outputs=1, wires=[["lout_evt_ps"]], )) nodes.append(link_out( "lout_evt_ps", TAB_PROCESS, LANE_X[5], y_ps + 80, CH_PS_EVT, target_in_ids=["lin_evt_ps_dash"], )) # ---------------- Mode broadcast ---------------- y_mode = y_ps + SECTION_GAP nodes.append(comment("c_mode_bcast", TAB_PROCESS, LANE_X[2], y_mode, "── Mode broadcast ──", "")) nodes.append(link_in( "lin_mode", TAB_PROCESS, LANE_X[0], y_mode + 60, "cmd:mode", source_out_ids=["lout_mode_setup"], downstream=["fanout_mode"], )) nodes.append(function_node( "fanout_mode", TAB_PROCESS, LANE_X[1] + 220, y_mode + 60, "fan setMode → 3 pumps", "msg.topic = 'setMode';\n" "return [msg, msg, msg];", outputs=3, wires=[["pump_a"], ["pump_b"], ["pump_c"]], )) # ---------------- Station-wide commands ---------------- y_station = y_mode + 200 nodes.append(comment("c_station_cmds", TAB_PROCESS, LANE_X[2], y_station, "── Station-wide commands ──", "")) for k, (chan, link_id, fn_name, label_suffix) in enumerate([ (CH_STATION_START, "lin_station_start", "fan_station_start", "startup"), (CH_STATION_STOP, "lin_station_stop", "fan_station_stop", "shutdown"), (CH_STATION_ESTOP, "lin_station_estop", "fan_station_estop", "emergency stop"), ]): y = y_station + 60 + k * 60 slug = chan.replace(":", "_").replace("-", "_") nodes.append(link_in( link_id, TAB_PROCESS, LANE_X[0], y, chan, source_out_ids=[f"lout_{slug}_dash"], downstream=[fn_name], )) nodes.append(function_node( fn_name, TAB_PROCESS, LANE_X[1] + 220, y, f"fan {label_suffix} → 3 pumps", "return [msg, msg, msg];", outputs=3, wires=[["pump_a"], ["pump_b"], ["pump_c"]], )) # ---------------- Setup feeder link-in ---------------- y_setup_in = y_station + 280 nodes.append(comment("c_setup_at_mgc", TAB_PROCESS, LANE_X[2], y_setup_in, "── Setup feeders ──", "")) nodes.append(link_in( "lin_setup_at_mgc", TAB_PROCESS, LANE_X[0], y_setup_in + 60, "setup:to-mgc", 
def _physics_code(pump_letter):
    """Return the JavaScript body of one pump's physics-feeder function node.

    The generated script keeps two snapshots in node context: the latest
    pumping-station payload (messages with ``msg.from === 'ps'``) and the
    latest payload of its own pump.  At most once per second it emits four
    ``measurement`` messages: upstream pressure, downstream pressure, flow
    and power.

    Each feeder writes its pump's predicted flow to Node-RED ``flow``
    context under ``pump_flow_<pump_letter>``.  The script then sums the
    ``pump_flow_a`` / ``_b`` / ``_c`` entries and derives a single
    discharge-header pressure from that total, so all pumps share the same
    clean header value; Gaussian-like sensor noise is added per pump.

    ``pump_letter`` is interpolated into the flow-context key names; the
    module-level ``OUTFLOW_LEVEL`` is interpolated as the level subtracted
    from the basin level to obtain the suction head.
    """
    js_lines = [
        # --- helpers + node-context snapshot ---
        "const c = context.get('c') || {};",
        "function find(o, prefix) {",
        " for (const k in o) { if (k.indexOf(prefix) === 0) return o[k]; }",
        " return null;",
        "}",
        "function gauss(sigma) {",
        " let s = 0;",
        " for (let i = 0; i < 12; i++) s += Math.random();",
        " return (s - 6) * sigma;",
        "}",
        "",
        # --- pumping-station message: remember basin level, emit nothing ---
        "if (msg.from === 'ps') {",
        " const psSnap = c.ps || {};",
        " Object.assign(psSnap, msg.payload || {});",
        " c.ps = psSnap;",
        " const lvl = find(psSnap, 'level.predicted.atequipment.')",
        " ?? find(psSnap, 'level.measured.atequipment.');",
        " if (lvl != null) c.basinLevel = Number(lvl);",
        " context.set('c', c);",
        " return null;",
        "}",
        "",
        # --- pump message: merge snapshot, then rate-limit the output ---
        "const pumpSnap = c.pump || {};",
        "Object.assign(pumpSnap, msg.payload || {});",
        "c.pump = pumpSnap;",
        "context.set('c', c);",
        "// Throttle: 1 Hz sensor updates are plenty for the demo; the",
        "// pump emits on every state change (5+/sec while cycling).",
        "const _now = Date.now();",
        "const _last = context.get('_lastEmit') || 0;",
        "if (_now - _last < 1000) return null;",
        "context.set('_lastEmit', _now);",
        "",
        "const state = pumpSnap.state || 'idle';",
        "// 'isRunning' = the rotor is spinning (any non-idle, non-cooled state).",
        "// MGC retargets flow on every tick, so the pump spends most of its",
        "// time in 'accelerating' or 'decelerating', not 'operational'. Those",
        "// transient states are still moving water — flow/power sensors must",
        "// publish non-zero values during them or the measurement nodes go",
        "// quiet (formatMsg skips emits on no-diff).",
        "const isRunning = ['operational','starting','warmingup','accelerating','decelerating','stopping'].includes(state);",
        "// 'pumpFlow' (not 'flow') — `flow` is the Node-RED flow-context object.",
        "const pumpFlow = Number(find(pumpSnap, 'flow.predicted.downstream.'));",
        "const pumpPower = Number(find(pumpSnap, 'power.predicted.atequipment.'));",
        "const basinLevel = c.basinLevel != null ? Number(c.basinLevel) : 0;",
        "",
        # --- shared manifold flow via flow context ---
        "// Publish this pump's contribution to the flow-context shared",
        "// header so the other physics feeders can compute total flow.",
        f"flow.set('pump_flow_{pump_letter}', isRunning && Number.isFinite(pumpFlow) ? pumpFlow : 0);",
        f"flow.set('pump_flow_{pump_letter}_state', state);",
        "const flowA = Number(flow.get('pump_flow_a') || 0);",
        "const flowB = Number(flow.get('pump_flow_b') || 0);",
        "const flowC = Number(flow.get('pump_flow_c') || 0);",
        "const totalFlow = flowA + flowB + flowC;",
        "",
        # Hydrostatic head converted to mbar:
        #   Pa   = rho * g * h = 9810 * h   (rho = 1000, g = 9.81)
        #   mbar = Pa / 100    = 98.1 * h
        f"const HEAD_M = Math.max(0, basinLevel - {OUTFLOW_LEVEL});",
        "// Suction (basin) header pressure — same physical value for all",
        "// pumps; per-pump sensor noise added independently.",
        "const p_upstream_clean = 98.1 * HEAD_M;",
        "let p_upstream = Math.max(0, p_upstream_clean + gauss(2.5));",
        "",
        "// Discharge (header) pressure — driven by TOTAL flow leaving the",
        "// manifold, NOT this pump's individual flow. Static head 12 m",
        "// + quadratic system curve scaled so totalFlow=300 m³/h gives",
        "// ~full dynamic head.",
        "const STATIC_MBAR = 12 * 98.1;",
        "const DYN_MBAR_MAX = 12 * 98.1;",
        "const TOTAL_FLOW_MAX = 300;",
        "const ratio = Math.min(1, totalFlow / TOTAL_FLOW_MAX);",
        "const p_downstream_header = STATIC_MBAR + ratio * ratio * DYN_MBAR_MAX;",
        "// Publish the clean header value to flow context so the MGC's",
        "// header-pressure measurement child can read it.",
        "flow.set('header_p_downstream', p_downstream_header);",
        "flow.set('header_p_upstream', p_upstream_clean);",
        "// Per-pump downstream sensor: header value with local sensor noise.",
        "let p_downstream = Math.max(0, p_downstream_header + gauss(8));",
        "",
        # --- noisy flow / power readings (zero while not running) ---
        "const flowMeas = (isRunning && Number.isFinite(pumpFlow))",
        " ? Math.max(0, pumpFlow + gauss(Math.max(0.5, pumpFlow * 0.01)))",
        " : 0;",
        "",
        "const powerMeas = (isRunning && Number.isFinite(pumpPower))",
        " ? Math.max(0, pumpPower + gauss(Math.max(0.05, pumpPower * 0.005)))",
        " : 0;",
        "",
        # --- one output per measurement node ---
        "return [",
        " { topic: 'measurement', payload: p_upstream },",
        " { topic: 'measurement', payload: p_downstream },",
        " { topic: 'measurement', payload: flowMeas },",
        " { topic: 'measurement', payload: powerMeas },",
        "];",
    ]
    # Every JS line, including the last, is newline-terminated.
    return "\n".join(js_lines) + "\n"
# ---------------------------------------------------------------------------
# Tab 2 — DASHBOARD UI
# ---------------------------------------------------------------------------
def build_ui_tab():
    """Build every node that lives on the "Dashboard UI" tab.

    The tab holds the FlowFuse ``ui-*`` widgets plus the small function
    nodes that fan incoming events out to them:

    * operator inputs (inflow slider, scenario buttons, station mode,
      manual Qd, station-wide and per-pump commands) leave the tab through
      link-out nodes;
    * process state arrives through link-in nodes and is split by a
      ``dispatch_*`` function node into one output per text / gauge /
      chart series.

    Returns:
        list: the tab node followed by all nodes on the tab, in creation
        order, as produced by the module's node-builder helpers.
    """
    nodes = []
    nodes.append({
        "id": TAB_UI,
        "type": "tab",
        "label": "📊 Dashboard UI",
        "disabled": False,
        "info": (
            "All FlowFuse ui-* widgets. Two pages:\n"
            " /dashboard/realtime — gauges + per-pump status (no time history)\n"
            " /dashboard/trends — line charts, 1 hour rolling window\n\n"
            "All inputs leave via link-out; all process state arrives via link-in."
        ),
    })
    nodes += dashboard_scaffold()

    PG_RT = "ui_page_realtime"
    PG_TRENDS = "ui_page_trends"

    g_inflow = "ui_grp_inflow"
    g_station = "ui_grp_station"
    g_basin = "ui_grp_basin"
    g_mgc = "ui_grp_mgc"
    g_pump_a = "ui_grp_pump_a"
    g_pump_b = "ui_grp_pump_b"
    g_pump_c = "ui_grp_pump_c"
    g_tr_basin = "ui_grp_tr_basin"
    g_tr_demand = "ui_grp_tr_demand"
    g_tr_dq = "ui_grp_tr_dq"
    g_tr_states = "ui_grp_tr_states"
    g_tr_flow = "ui_grp_tr_flow"
    g_tr_power = "ui_grp_tr_power"
    g_tr_press = "ui_grp_tr_press"

    nodes += [
        ui_group(g_inflow, "1. Inflow (operator input)", PG_RT, width=12, order=1),
        ui_group(g_station, "2. Station Mode + Commands", PG_RT, width=12, order=2),
        ui_group(g_basin, "3. Basin Realtime", PG_RT, width=6, order=3),
        ui_group(g_mgc, "4. Pump Group (MGC)", PG_RT, width=6, order=4),
        ui_group(g_pump_a, "5a. Pump A", PG_RT, width=4, order=5),
        ui_group(g_pump_b, "5b. Pump B", PG_RT, width=4, order=6),
        ui_group(g_pump_c, "5c. Pump C", PG_RT, width=4, order=7),
        ui_group(g_tr_basin, "Basin level + fill (1h)", PG_TRENDS, width=12, order=1),
        ui_group(g_tr_demand, "Process demand — PS percControl (1h)", PG_TRENDS, width=12, order=2),
        ui_group(g_tr_dq, "ΔQ = inflow − outflow (m³/h, +fill / −drain)", PG_TRENDS, width=12, order=3),
        ui_group(g_tr_states, "Pump state timeline (gantt)", PG_TRENDS, width=12, order=4),
        ui_group(g_tr_flow, "Inflow / Outflow / Per-pump flow (1h)", PG_TRENDS, width=12, order=5),
        ui_group(g_tr_power, "Per-pump power (1h)", PG_TRENDS, width=12, order=6),
        ui_group(g_tr_press, "Per-pump pressures (1h)", PG_TRENDS, width=12, order=7),
    ]

    nodes.append(comment("c_ui_title", TAB_UI, LANE_X[2], 20,
                         "📊 DASHBOARD UI — only ui-* widgets here", ""))

    # ---------- INFLOW SECTION ----------
    y = 80
    nodes.append(comment("c_ui_inflow", TAB_UI, LANE_X[2], y,
                         "── Operator inflow input ──", ""))
    nodes.append(ui_slider(
        "ui_inflow_slider", TAB_UI, LANE_X[0], y + 40, g_inflow,
        "Inflow baseline",
        "Inflow baseline (m³/h) — scenarios modulate around this value",
        0, 250, 5.0, "inflowBaseline",
        wires=["lout_inflow_baseline"],
    ))
    nodes.append(link_out(
        "lout_inflow_baseline", TAB_UI, LANE_X[1], y + 40,
        CH_INFLOW_BASELINE, target_in_ids=["lin_inflow_baseline"],
    ))

    # One button per scenario; each feeds a tiny function node that pins
    # msg.payload to the scenario key before the shared link-out.
    SCENARIOS = [
        ("constant", "Constant", "#0c99d9", "horizontal_rule"),
        ("sine", "Sine wave", "#16a34a", "show_chart"),
        ("diurnal", "Diurnal", "#f59e0b", "schedule"),
        ("storm", "Storm", "#dc2626", "thunderstorm"),
    ]
    for k, (key, label, color, icon) in enumerate(SCENARIOS):
        ybtn = y + 100 + k * 50
        btn_id = f"btn_scn_{key}"
        wrap_id = f"wrap_scn_{key}"
        nodes.append(ui_button(
            btn_id, TAB_UI, LANE_X[0], ybtn, g_inflow,
            f"Scenario {label}", label, key, "str",
            topic="scenario", color=color, icon=icon,
            wires=[wrap_id],
        ))
        nodes.append(function_node(
            wrap_id, TAB_UI, LANE_X[1] + 100, ybtn,
            f"build scenario {key}",
            f"msg.payload = '{key}';\n"
            "return msg;",
            outputs=1, wires=[["lout_inflow_scenario"]],
        ))
    nodes.append(link_out(
        "lout_inflow_scenario", TAB_UI, LANE_X[2], y + 100,
        CH_INFLOW_SCENARIO, target_in_ids=["lin_inflow_scenario"],
    ))

    nodes.append(link_in(
        "lin_evt_inflow", TAB_UI, LANE_X[3], y + 40,
        CH_INFLOW_EVT, source_out_ids=["lout_evt_inflow"],
        downstream=["dispatch_inflow"],
    ))
    nodes.append(function_node(
        "dispatch_inflow", TAB_UI, LANE_X[4], y + 40,
        "dispatch inflow",
        "const p = msg.payload || {};\n"
        "const ts = Date.now();\n"
        "return [\n"
        " { payload: (p.scenario || 'constant').toUpperCase() },\n"
        " { payload: p.q_h != null ? Number(p.q_h).toFixed(1) + ' m³/h' : 'n/a' },\n"
        " p.q_h != null ? { topic: 'Inflow', payload: Number(p.q_h), timestamp: ts } : null,\n"
        "];",
        outputs=3,
        wires=[["ui_inflow_scn_text"], ["ui_inflow_value_text"], ["chart_trend_flow"]],
    ))
    nodes.append(ui_text(
        "ui_inflow_scn_text", TAB_UI, LANE_X[5], y + 40, g_inflow,
        "Active scenario", "Active scenario", "{{msg.payload}}",
    ))
    nodes.append(ui_text(
        "ui_inflow_value_text", TAB_UI, LANE_X[5], y + 80, g_inflow,
        "Live inflow", "Live inflow", "{{msg.payload}}",
    ))

    # ---------- MODE + STATION COMMANDS ----------
    y = 380
    nodes.append(comment("c_ui_station", TAB_UI, LANE_X[2], y,
                         "── Mode + Station-wide buttons ──", ""))
    nodes.append(ui_switch(
        "ui_mode_toggle", TAB_UI, LANE_X[0], y + 40, g_station,
        "Station mode",
        "Station mode (Auto = level-based · Manual = slider Qd)",
        on_value="levelbased", off_value="manual", topic="changemode",
        wires=["lout_ps_mode_dash"],
    ))
    nodes.append(link_out(
        "lout_ps_mode_dash", TAB_UI, LANE_X[1], y + 40,
        CH_PS_MODE, target_in_ids=["lin_ps_mode_at_ps"],
    ))
    nodes.append(ui_slider(
        "ui_qd_slider", TAB_UI, LANE_X[0], y + 90, g_station,
        "Manual Qd", "Manual Qd (m³/h, manual mode only)",
        0, 600, 5.0, "manualDemand",
        wires=["lout_qd_dash"],
    ))
    nodes.append(link_out(
        "lout_qd_dash", TAB_UI, LANE_X[1], y + 90,
        CH_QD, target_in_ids=["lin_qd_at_ps"],
    ))

    # Link-in id on the receiving side of each station-wide command channel.
    station_targets = {
        CH_STATION_START: "lin_station_start",
        CH_STATION_STOP: "lin_station_stop",
        CH_STATION_ESTOP: "lin_station_estop",
    }
    station_buttons = [
        ("Start all pumps", "#16a34a", "play_arrow",
         "lout_cmd_station_startup_dash", CH_STATION_START,
         "msg.topic = 'execSequence';\n"
         "msg.payload = { source:'GUI', action:'execSequence', "
         "parameter:'startup' };\n"
         "return msg;"),
        ("Stop all pumps", "#ea580c", "stop",
         "lout_cmd_station_shutdown_dash", CH_STATION_STOP,
         "msg.topic = 'execSequence';\n"
         "msg.payload = { source:'GUI', action:'execSequence', "
         "parameter:'shutdown' };\n"
         "return msg;"),
        ("EMERGENCY STOP", "#dc2626", "stop_circle",
         "lout_cmd_station_estop_dash", CH_STATION_ESTOP,
         "msg.topic = 'emergencystop';\n"
         "msg.payload = { source:'GUI', action:'emergencystop' };\n"
         "return msg;"),
    ]
    for k, (text, color, icon, lout_id, channel, wrap_code) in enumerate(station_buttons):
        yk = y + 150 + k * 50
        btn_id = f"btn_station_{k}"
        wrap_id = f"wrap_station_{k}"
        nodes.append(ui_button(
            btn_id, TAB_UI, LANE_X[0], yk, g_station,
            text, text, "fired", "str",
            topic=f"station_{k}", color=color, icon=icon,
            wires=[wrap_id],
        ))
        nodes.append(function_node(
            wrap_id, TAB_UI, LANE_X[1] + 100, yk,
            f"build cmd ({text})",
            wrap_code,
            outputs=1, wires=[[lout_id]],
        ))
        nodes.append(link_out(
            lout_id, TAB_UI, LANE_X[2], yk,
            channel, target_in_ids=[station_targets[channel]],
        ))

    # ---------- BASIN REALTIME ----------
    y = 700
    nodes.append(comment("c_ui_basin", TAB_UI, LANE_X[2], y,
                         "── Basin realtime (gauges + text) ──", ""))
    nodes.append(link_in(
        "lin_evt_ps_dash", TAB_UI, LANE_X[0], y + 40,
        CH_PS_EVT, source_out_ids=["lout_evt_ps"],
        downstream=["dispatch_ps"],
    ))
    # Output order: 10 text fields, 3 gauges, 5 chart series — must match
    # the `wires` list below one-to-one.
    nodes.append(function_node(
        "dispatch_ps", TAB_UI, LANE_X[1], y + 40,
        "dispatch PS",
        "const p = msg.payload || {};\n"
        "const ts = Date.now();\n"
        "// ΔQ = inflow − outflow in m³/h (positive = filling).\n"
        "const dQ = (p.qInNum != null && p.qOutNum != null)\n"
        " ? p.qInNum - p.qOutNum : null;\n"
        "// Demand text formatting.\n"
        "const demandStr = p.percControl != null\n"
        " ? Number(p.percControl).toFixed(0) + '%' : 'n/a';\n"
        "return [\n"
        " { payload: String(p.direction || 'steady') },\n"
        " { payload: String(p.level || 'n/a') },\n"
        " { payload: String(p.volume || 'n/a') },\n"
        " { payload: String(p.fillPct || 'n/a') },\n"
        " { payload: String(p.netFlow || 'n/a') },\n"
        " { payload: String(p.timeLeft || 'n/a') },\n"
        " { payload: String(p.qIn || 'n/a') },\n"
        " { payload: String(p.qOut || 'n/a') },\n"
        " { payload: String(p.safetyState || 'normal') },\n"
        " { payload: demandStr },\n"
        " p.levelNum != null ? { payload: p.levelNum } : null,\n"
        " p.fillPctNum != null ? { payload: p.fillPctNum } : null,\n"
        " p.percControl != null ? { payload: p.percControl } : null,\n"
        " p.levelNum != null ? { topic: 'Basin level', payload: p.levelNum, timestamp: ts } : null,\n"
        " p.fillPctNum != null ? { topic: 'Fill %', payload: p.fillPctNum, timestamp: ts } : null,\n"
        " p.qOutNum != null ? { topic: 'Outflow', payload: p.qOutNum, timestamp: ts } : null,\n"
        " p.percControl != null ? { topic: 'PS demand', payload: p.percControl, timestamp: ts } : null,\n"
        " dQ != null ? { topic: 'ΔQ', payload: dQ, timestamp: ts } : null,\n"
        "];",
        outputs=18,
        wires=[
            ["ui_ps_direction"], ["ui_ps_level"], ["ui_ps_volume"],
            ["ui_ps_fill"], ["ui_ps_netflow"], ["ui_ps_timeleft"],
            ["ui_ps_qin"], ["ui_ps_qout"], ["ui_ps_safety"],
            ["ui_ps_demand"],
            ["gauge_basin_level"], ["gauge_basin_fill"],
            ["gauge_ps_demand"],
            ["chart_trend_basin"], ["chart_trend_basin"],
            ["chart_trend_flow"],
            ["chart_trend_demand"],
            ["chart_trend_dq"],
        ],
    ))
    # (node id, node name, widget label) — stacked 30 px apart.
    ps_texts = [
        ("ui_ps_direction", "Direction", "Direction"),
        ("ui_ps_level", "Basin level", "Basin level"),
        ("ui_ps_volume", "Basin volume", "Basin volume"),
        ("ui_ps_fill", "Fill %", "Fill %"),
        ("ui_ps_netflow", "Net flow", "Net flow"),
        ("ui_ps_timeleft", "Time left", "Time to full/empty"),
        ("ui_ps_qin", "Inflow", "Inflow"),
        ("ui_ps_qout", "Outflow", "Outflow"),
        ("ui_ps_safety", "Safety", "Safety state"),
        ("ui_ps_demand", "PS demand", "Process demand"),
    ]
    for k, (node_id, name, widget_label) in enumerate(ps_texts):
        nodes.append(ui_text(node_id, TAB_UI, LANE_X[2], y + 40 + k * 30, g_basin,
                             name, widget_label, "{{msg.payload}}"))

    LEVEL_SEGMENTS = [
        {"color": "#f44336", "from": 0},
        {"color": "#ff9800", "from": 1.0},
        {"color": "#2196f3", "from": 2.0},
        {"color": "#ff9800", "from": 3.5},
        {"color": "#f44336", "from": 3.8},
    ]
    FILL_SEGMENTS = [
        {"color": "#f44336", "from": 0},
        {"color": "#ff9800", "from": 10},
        {"color": "#4caf50", "from": 30},
        {"color": "#ff9800", "from": 80},
        {"color": "#f44336", "from": 95},
    ]
    nodes.append(ui_gauge(
        "gauge_basin_level", TAB_UI, LANE_X[3], y + 40, g_basin,
        "Basin level gauge", "Level", "m", 0, BASIN_HEIGHT,
        LEVEL_SEGMENTS, gtype="gauge-tank", suffix=" m",
        width=3, height=4, order=10,
    ))
    nodes.append(ui_gauge(
        "gauge_basin_fill", TAB_UI, LANE_X[3], y + 100, g_basin,
        "Basin fill gauge", "Fill", "%", 0, 100,
        FILL_SEGMENTS, gtype="gauge-34", suffix="%",
        icon="water_drop", width=3, height=4, order=11,
    ))
    # PS process demand gauge — shows the % command PS sends to MGC.
    DEMAND_SEGMENTS = [
        {"color": "#cccccc", "from": 0},
        {"color": "#0c99d9", "from": 5},
        {"color": "#16a34a", "from": 30},
        {"color": "#f59e0b", "from": 70},
        {"color": "#dc2626", "from": 95},
    ]
    nodes.append(ui_gauge(
        "gauge_ps_demand", TAB_UI, LANE_X[3], y + 160, g_basin,
        "PS demand gauge", "PS demand", "%", 0, 100,
        DEMAND_SEGMENTS, gtype="gauge-34", suffix="%",
        icon="speed", width=3, height=4, order=12,
    ))

    # ---------- MGC REALTIME ----------
    y = 1080
    nodes.append(comment("c_ui_mgc", TAB_UI, LANE_X[2], y, "── MGC realtime ──", ""))
    nodes.append(link_in(
        "lin_evt_mgc_dash", TAB_UI, LANE_X[0], y + 40,
        CH_MGC_EVT, source_out_ids=["lout_evt_mgc"],
        downstream=["dispatch_mgc"],
    ))
    nodes.append(function_node(
        "dispatch_mgc", TAB_UI, LANE_X[1], y + 40,
        "dispatch MGC",
        "const p = msg.payload || {};\n"
        "return [\n"
        " { payload: String(p.totalFlow || 'n/a') },\n"
        " { payload: String(p.totalPower || 'n/a') },\n"
        " { payload: String(p.efficiency || 'n/a') },\n"
        " p.totalFlowNum != null ? { payload: p.totalFlowNum } : null,\n"
        " p.totalPowerNum != null ? { payload: p.totalPowerNum } : null,\n"
        "];",
        outputs=5,
        wires=[
            ["ui_mgc_total_flow"], ["ui_mgc_total_power"], ["ui_mgc_eff"],
            ["gauge_mgc_flow"], ["gauge_mgc_power"],
        ],
    ))
    mgc_texts = [
        ("ui_mgc_total_flow", "MGC total flow", "Total flow"),
        ("ui_mgc_total_power", "MGC total power", "Total power"),
        ("ui_mgc_eff", "MGC efficiency", "Group efficiency"),
    ]
    for k, (node_id, name, widget_label) in enumerate(mgc_texts):
        nodes.append(ui_text(node_id, TAB_UI, LANE_X[2], y + 40 + k * 30, g_mgc,
                             name, widget_label, "{{msg.payload}}"))
    nodes.append(ui_gauge(
        "gauge_mgc_flow", TAB_UI, LANE_X[3], y + 40, g_mgc,
        "MGC total flow gauge", "Total flow", "m³/h", 0, 600,
        [
            {"color": "#cccccc", "from": 0},
            {"color": "#0c99d9", "from": 50},
            {"color": "#16a34a", "from": 200},
            {"color": "#f59e0b", "from": 500},
        ],
        gtype="gauge-34", suffix=" m³/h", width=3, height=4, order=10,
    ))
    nodes.append(ui_gauge(
        "gauge_mgc_power", TAB_UI, LANE_X[3], y + 100, g_mgc,
        "MGC total power gauge", "Total power", "kW", 0, 30,
        [
            {"color": "#cccccc", "from": 0},
            {"color": "#0c99d9", "from": 1},
            {"color": "#16a34a", "from": 5},
            {"color": "#f59e0b", "from": 20},
        ],
        gtype="gauge-34", suffix=" kW", width=3, height=4, order=11,
    ))

    # ---------- PER-PUMP REALTIME PANELS ----------
    y_pumps_start = 1340
    PUMP_FIELDS = [
        ("State", "state", "{{msg.payload}}"),
        ("Mode", "mode", "{{msg.payload}}"),
        ("Controller %", "ctrl", "{{msg.payload}}"),
        ("Flow", "flow", "{{msg.payload}}"),
        ("Power", "power", "{{msg.payload}}"),
        ("p Upstream", "pUp", "{{msg.payload}}"),
        ("p Downstream", "pDn", "{{msg.payload}}"),
    ]
    pump_groups = {"pump_a": g_pump_a, "pump_b": g_pump_b, "pump_c": g_pump_c}
    for i, pump in enumerate(PUMPS):
        label = PUMP_LABELS[pump]
        g = pump_groups[pump]
        y_p = y_pumps_start + i * 480
        state_offset = i * 3  # A=0, B=3, C=6

        nodes.append(comment(f"c_ui_{pump}", TAB_UI, LANE_X[2], y_p,
                             f"── {label} ──", ""))
        nodes.append(link_in(
            f"lin_evt_{pump}_dash", TAB_UI, LANE_X[0], y_p + 40,
            CH_PUMP_EVT[pump], source_out_ids=[f"lout_evt_{pump}"],
            downstream=[f"dispatch_{pump}"],
        ))

        # 'accelerating' / 'decelerating' are plotted at the running level:
        # the pump is moving water in those states (the physics feeder
        # treats them as running too), so showing them as idle would make
        # the timeline read "off" for most of a duty cycle.
        dispatch_code = (
            "const p = msg.payload || {};\n"
            "const ts = Date.now();\n"
            f"const OFF = {state_offset};\n"
            "function stateNum(s) {\n"
            " switch (s) {\n"
            " case 'operational':\n"
            " case 'accelerating':\n"
            " case 'decelerating': return OFF + 2;\n"
            " case 'starting':\n"
            " case 'warmingup': return OFF + 1;\n"
            " case 'stopping': return OFF + 1.5;\n"
            " case 'coolingdown': return OFF + 0.5;\n"
            " default: return OFF;\n"
            " }\n"
            "}\n"
            "const sNum = p.state ? stateNum(p.state) : null;\n"
            "return [\n"
            " {payload: String(p.state || 'idle')},\n"
            " {payload: String(p.mode || 'auto')},\n"
            " {payload: String(p.ctrl || 'n/a')},\n"
            " {payload: String(p.flow || 'n/a')},\n"
            " {payload: String(p.power || 'n/a')},\n"
            " {payload: String(p.pUp || 'n/a')},\n"
            " {payload: String(p.pDn || 'n/a')},\n"
            " p.flowNum != null ? {topic: '" + label + "', payload: p.flowNum, timestamp: ts} : null,\n"
            " p.powerNum != null ? {topic: '" + label + "', payload: p.powerNum, timestamp: ts} : null,\n"
            " p.pUpNum != null ? {topic: '" + label + " up', payload: p.pUpNum, timestamp: ts} : null,\n"
            " p.pDnNum != null ? {topic: '" + label + " dn', payload: p.pDnNum, timestamp: ts} : null,\n"
            " sNum != null ? {topic: '" + label + " state', payload: sNum, timestamp: ts} : null,\n"
            "];"
        )
        # 7 text outputs (one per PUMP_FIELDS row) + 5 chart series = 12.
        nodes.append(function_node(
            f"dispatch_{pump}", TAB_UI, LANE_X[1], y_p + 40,
            f"dispatch {label}",
            dispatch_code,
            outputs=12,
            wires=[
                [f"ui_{pump}_{f}"] for _, f, _ in PUMP_FIELDS
            ] + [
                ["chart_trend_flow"],
                ["chart_trend_power"],
                ["chart_trend_pressure"],
                ["chart_trend_pressure"],
                ["chart_trend_states"],
            ],
        ))
        for k, (label_txt, field, fmt) in enumerate(PUMP_FIELDS):
            nodes.append(ui_text(
                f"ui_{pump}_{field}", TAB_UI, LANE_X[2], y_p + 40 + k * 30, g,
                f"{label} {label_txt}", label_txt, fmt,
            ))

        nodes.append(ui_slider(
            f"ui_{pump}_setpoint", TAB_UI, LANE_X[0], y_p + 280, g,
            f"{label} setpoint", "Setpoint % (manual mode)",
            0, 100, 5.0, f"setpoint_{pump}",
            wires=[f"lout_setpoint_{pump}_dash"],
        ))
        nodes.append(link_out(
            f"lout_setpoint_{pump}_dash", TAB_UI, LANE_X[1], y_p + 280,
            CH_PUMP_SETPOINT[pump], target_in_ids=[f"lin_setpoint_{pump}"],
        ))

        nodes.append(ui_button(
            f"btn_{pump}_start", TAB_UI, LANE_X[0], y_p + 330, g,
            f"{label} startup", "Startup", "fired", "str",
            topic=f"start_{pump}", color="#16a34a", icon="play_arrow",
            wires=[f"wrap_{pump}_start"],
        ))
        nodes.append(function_node(
            f"wrap_{pump}_start", TAB_UI, LANE_X[1] + 100, y_p + 330,
            f"build start ({label})",
            "msg.topic = 'execSequence';\n"
            "msg.payload = { source:'GUI', action:'execSequence', parameter:'startup' };\n"
            "return msg;",
            outputs=1, wires=[[f"lout_seq_{pump}_dash"]],
        ))
        nodes.append(ui_button(
            f"btn_{pump}_stop", TAB_UI, LANE_X[0], y_p + 380, g,
            f"{label} shutdown", "Shutdown", "fired", "str",
            topic=f"stop_{pump}", color="#ea580c", icon="stop",
            wires=[f"wrap_{pump}_stop"],
        ))
        nodes.append(function_node(
            f"wrap_{pump}_stop", TAB_UI, LANE_X[1] + 100, y_p + 380,
            f"build stop ({label})",
            "msg.topic = 'execSequence';\n"
            "msg.payload = { source:'GUI', action:'execSequence', parameter:'shutdown' };\n"
            "return msg;",
            outputs=1, wires=[[f"lout_seq_{pump}_dash"]],
        ))
        nodes.append(link_out(
            f"lout_seq_{pump}_dash", TAB_UI, LANE_X[2], y_p + 355,
            CH_PUMP_SEQUENCE[pump], target_in_ids=[f"lin_seq_{pump}"],
        ))

    # ---------- TREND CHARTS ----------
    y_trends = y_pumps_start + len(PUMPS) * 480 + 60
    nodes.append(comment("c_ui_trends", TAB_UI, LANE_X[2], y_trends,
                         "── Trend charts (1h rolling) ──", ""))
    # Retention settings shared by every trend chart.
    rolling_1h = {
        "remove_older": "60",
        "remove_older_unit": "60",
        "remove_older_points": "3600",
    }

    nodes.append(ui_chart(
        "chart_trend_basin", TAB_UI, LANE_X[3], y_trends + 40, g_tr_basin,
        "Basin level + fill %", "Basin level + fill",
        width=12, height=8, y_axis_label="m / %",
        order=1, **rolling_1h,
    ))
    nodes.append(ui_chart(
        "chart_trend_demand", TAB_UI, LANE_X[3], y_trends + 80, g_tr_demand,
        "PS process demand %", "PS demand",
        width=12, height=6, y_axis_label="%",
        ymin=0, ymax=110, order=1, **rolling_1h,
    ))
    nodes.append(ui_chart(
        "chart_trend_dq", TAB_UI, LANE_X[3], y_trends + 100, g_tr_dq,
        "ΔQ — inflow − outflow", "ΔQ",
        width=12, height=6, y_axis_label="m³/h",
        order=1, **rolling_1h,
    ))
    # State timeline: each pump has a Y-axis "track" (A=0..2, B=3..5, C=6..8)
    # with discrete values: 0/3/6 idle, 0.5/3.5/6.5 coolingdown,
    # 1/4/7 starting/warmingup, 1.5/4.5/7.5 stopping,
    # 2/5/8 operational/accelerating/decelerating.
    # Step interpolation so transitions are sharp.
    nodes.append(ui_chart(
        "chart_trend_states", TAB_UI, LANE_X[3], y_trends + 120, g_tr_states,
        "Pump state timeline", "Pump states (A=0-2, B=3-5, C=6-8)",
        width=12, height=6, y_axis_label="A B C tracks",
        ymin=-0.5, ymax=8.5, order=1,
        interpolation="step", **rolling_1h,
    ))
    # y + 160 (not + 120): keeps this node from sitting exactly on top of
    # the state-timeline chart in the editor.
    nodes.append(ui_chart(
        "chart_trend_flow", TAB_UI, LANE_X[3], y_trends + 160, g_tr_flow,
        "Inflow / Outflow / Per-pump flow", "Flows",
        width=12, height=8, y_axis_label="m³/h",
        order=1, **rolling_1h,
    ))
    nodes.append(ui_chart(
        "chart_trend_power", TAB_UI, LANE_X[3], y_trends + 200, g_tr_power,
        "Per-pump power", "Power",
        width=12, height=8, y_axis_label="kW",
        order=1, **rolling_1h,
    ))
    nodes.append(ui_chart(
        "chart_trend_pressure", TAB_UI, LANE_X[3], y_trends + 280, g_tr_press,
        "Per-pump up/dn pressure", "Pressure",
        width=12, height=8, y_axis_label="mbar",
        order=1, **rolling_1h,
    ))

    return nodes
# ---------------------------------------------------------------------------
# Tab 3 — DEMO DRIVERS (inflow generator)
# ---------------------------------------------------------------------------
def build_drivers_tab():
    """Assemble the "Demo Drivers" tab: the operator-driven inflow generator.

    Scenario and baseline commands arrive on two link-in nodes, a 1-second
    inject provides the tick, and all three feed one function node.  That
    node stores scenario/baseline in its context (returning nothing for
    those messages) and, on every other message, emits the current inflow
    in m³/s on output 1 and a status event on output 2.  Each output goes
    to its own link-out.

    Returns:
        list: the tab node followed by the tab's nodes, in creation order.
    """
    tab_node = {
        "id": TAB_DRIVERS,
        "type": "tab",
        "label": "🎛️ Demo Drivers",
        "disabled": False,
        "info": (
            "Inflow generator. The operator picks a SCENARIO (Constant / Sine /"
            " Diurnal / Storm) on the dashboard and sets a BASELINE m³/h value."
            " Every second this generator emits q_in to the PS based on the "
            "active scenario + baseline.\n\n"
            "Outflow is implicit: the pumps drain the basin via MGC."
        ),
    }

    # JavaScript for the scenario engine, one list entry per source line.
    engine_js = "\n".join([
        "let scenario = context.get('scenario') || 'constant';",
        "let baseline = context.get('baseline');",
        "if (baseline == null) baseline = 60;",
        "",
        # Configuration messages only update stored state.
        "if (msg.topic === 'inflowBaseline') {",
        " const v = Number(msg.payload);",
        " if (Number.isFinite(v) && v >= 0) {",
        " baseline = v;",
        " context.set('baseline', baseline);",
        " }",
        " return null;",
        "}",
        "if (msg.topic === 'scenario') {",
        " const s = String(msg.payload || '').toLowerCase();",
        " if (['constant','sine','diurnal','storm'].includes(s)) {",
        " scenario = s;",
        " context.set('scenario', scenario);",
        " }",
        " return null;",
        "}",
        # Anything else (the tick) produces a new inflow sample.
        "const t = Date.now() / 1000;",
        "let q_h;",
        "switch (scenario) {",
        " case 'sine': {",
        " q_h = baseline * (1 + 0.5 * Math.sin(2 * Math.PI * t / 240));",
        " break;",
        " }",
        " case 'diurnal': {",
        " q_h = baseline * (1 + 0.6 * Math.sin(2 * Math.PI * t / 480 - Math.PI/2));",
        " break;",
        " }",
        " case 'storm': {",
        " const phase = (t % 240) / 240;",
        " let factor;",
        " if (phase < 0.15) factor = 1 + (4 / 0.15) * phase;",
        " else factor = Math.max(1, 5 - (4 / 0.85) * (phase - 0.15));",
        " q_h = baseline * factor;",
        " break;",
        " }",
        " case 'constant':",
        " default:",
        " q_h = baseline;",
        "}",
        "q_h = Math.max(0, q_h);",
        "const q_s = q_h / 3600;",
        "return [",
        " { topic: 'q_in', payload: q_s, unit: 'm3/s', timestamp: Date.now() },",
        " { payload: { scenario, baseline, q_h, q_s, ts: Date.now() } },",
        "];",
    ])

    return [
        tab_node,
        comment(
            "c_drv_title", TAB_DRIVERS, LANE_X[2], 20,
            "🎛️ DEMO DRIVERS — operator-driven inflow generator", "",
        ),
        # Inputs: scenario + baseline (from the dashboard and from setup).
        link_in(
            "lin_inflow_scenario", TAB_DRIVERS, LANE_X[0], 100,
            CH_INFLOW_SCENARIO,
            source_out_ids=["lout_inflow_scenario", "lout_setup_inflow_scn"],
            downstream=["inflow_state"],
        ),
        link_in(
            "lin_inflow_baseline", TAB_DRIVERS, LANE_X[0], 140,
            CH_INFLOW_BASELINE,
            source_out_ids=["lout_inflow_baseline", "lout_setup_inflow_baseline"],
            downstream=["inflow_state"],
        ),
        inject(
            "inflow_tick", TAB_DRIVERS, LANE_X[0], 200,
            "tick (1 Hz)",
            topic="tick", payload="", payload_type="date",
            repeat="1",
            wires=["inflow_state"],
        ),
        function_node(
            "inflow_state", TAB_DRIVERS, LANE_X[2], 160,
            "inflow scenario engine",
            engine_js,
            outputs=2,
            wires=[["lout_qin_drivers"], ["lout_evt_inflow"]],
        ),
        # Outputs: q_in towards the PS, status event towards the dashboard.
        link_out(
            "lout_qin_drivers", TAB_DRIVERS, LANE_X[3], 140,
            CH_QIN, target_in_ids=["lin_qin_at_ps"],
        ),
        link_out(
            "lout_evt_inflow", TAB_DRIVERS, LANE_X[3], 180,
            CH_INFLOW_EVT, target_in_ids=["lin_evt_inflow"],
        ),
    ]
# ---------------------------------------------------------------------------
# Tab 4 — SETUP & INIT
# ---------------------------------------------------------------------------
def build_setup_tab():
    """Assemble the "Setup & Init" tab of one-shot configuration injects.

    Five inject nodes fire once after deploy, staggered between 1.5 s and
    2.7 s: MGC scaling, MGC mode, pump mode, initial inflow baseline and
    initial inflow scenario.  A sixth inject (basin-level calibration) is
    created with ``once=False`` and is therefore only sent when clicked in
    the editor.  Every inject is wired to a link-out that carries the
    message to another tab.

    Returns:
        list: the tab node followed by the tab's nodes, in creation order.
    """
    tab_node = {
        "id": TAB_SETUP,
        "type": "tab",
        "label": "⚙️ Setup & Init",
        "disabled": False,
        "info": (
            "One-shot deploy-time injects:\n"
            " • MGC scaling = normalized + mode = optimalcontrol\n"
            " • all pumps mode = auto\n"
            " • initial inflow baseline + scenario\n\n"
            "Disable this tab in production."
        ),
    }

    return [
        tab_node,
        comment(
            "c_setup_title", TAB_SETUP, LANE_X[2], 20,
            "⚙️ SETUP & INIT — one-shot deploy-time injects", "",
        ),

        # --- MGC: scaling first, then control mode; both share one link-out ---
        inject(
            "setup_mgc_scaling", TAB_SETUP, LANE_X[0], 100,
            "MGC scaling = normalized",
            topic="setScaling", payload="normalized", payload_type="str",
            once=True, once_delay="1.5",
            wires=["lout_setup_to_mgc"],
        ),
        inject(
            "setup_mgc_mode", TAB_SETUP, LANE_X[0], 160,
            "MGC mode = optimalcontrol",
            topic="setMode", payload="optimalcontrol", payload_type="str",
            once=True, once_delay="1.7",
            wires=["lout_setup_to_mgc"],
        ),
        link_out(
            "lout_setup_to_mgc", TAB_SETUP, LANE_X[1], 130,
            "setup:to-mgc", target_in_ids=["lin_setup_at_mgc"],
        ),

        # --- pumps: switch to auto mode ---
        inject(
            "setup_pumps_mode", TAB_SETUP, LANE_X[0], 240,
            "pumps mode = auto",
            topic="setMode", payload="auto", payload_type="str",
            once=True, once_delay="2.0",
            wires=["lout_mode_setup"],
        ),
        link_out(
            "lout_mode_setup", TAB_SETUP, LANE_X[1], 240,
            "cmd:mode", target_in_ids=["lin_mode"],
        ),

        # --- inflow generator: starting baseline and scenario ---
        inject(
            "setup_inflow_baseline", TAB_SETUP, LANE_X[0], 320,
            "inflow baseline = 25 m³/h (nominal)",
            topic="inflowBaseline", payload="25", payload_type="num",
            once=True, once_delay="2.5",
            wires=["lout_setup_inflow_baseline"],
        ),
        link_out(
            "lout_setup_inflow_baseline", TAB_SETUP, LANE_X[1], 320,
            CH_INFLOW_BASELINE, target_in_ids=["lin_inflow_baseline"],
        ),
        inject(
            "setup_inflow_scenario", TAB_SETUP, LANE_X[0], 380,
            "inflow scenario = sine",
            topic="scenario", payload="sine", payload_type="str",
            once=True, once_delay="2.7",
            wires=["lout_setup_inflow_scn"],
        ),
        link_out(
            "lout_setup_inflow_scn", TAB_SETUP, LANE_X[1], 380,
            CH_INFLOW_SCENARIO, target_in_ids=["lin_inflow_scenario"],
        ),

        # --- basin calibration: deliberately manual ---
        # Firing this on every deploy would overwrite the basin level
        # mid-cycle and restart the simulation, so it is only sent when the
        # operator clicks the inject's button in the editor (e.g. to begin
        # a fresh demo run).
        inject(
            "setup_calibrate_level", TAB_SETUP, LANE_X[0], 460,
            "[manual] calibrate basin = 1.0 m (click to reset)",
            topic="calibratePredictedLevel", payload="1.0", payload_type="num",
            once=False,  # never fire on deploy
            wires=["lout_setup_calibrate"],
        ),
        link_out(
            "lout_setup_calibrate", TAB_SETUP, LANE_X[1], 460,
            "setup:calibrate-ps", target_in_ids=["lin_setup_calibrate_ps"],
        ),
    ]
# ---------------------------------------------------------------------------
# Tab 5 — TELEMETRY (port 1 → InfluxDB line protocol → http POST)
# ---------------------------------------------------------------------------
def build_telemetry_tab():
    """Build the "Telemetry" tab: the InfluxDB writer pipeline.

    Messages arriving on the ``evt:tlm`` link channel are expected to carry
    ``payload.measurement``, ``payload.fields`` and optionally
    ``payload.tags``; anything else is dropped.  Each message becomes one
    line of InfluxDB line protocol, lines are batched by a join node, and
    every batch is POSTed to the InfluxDB v2 write endpoint.  A final
    function node keeps write/line/error counters in global context and
    shows them as node status.

    Returns:
        list: the tab node followed by the tab's nodes, in creation order.
    """
    nodes = []
    nodes.append({
        "id": TAB_TLM,
        "type": "tab",
        "label": "📈 Telemetry",
        "disabled": False,
        "info": (
            "InfluxDB writer: every EVOLV node's port-1 telemetry is fanned in "
            "via the evt:tlm link channel, converted to line protocol, and "
            "POSTed to InfluxDB v2 (org=evolv, bucket=telemetry).\n\n"
            "Pattern adapted from docker/demo-flow.json."
        ),
    })
    nodes.append(comment("c_tlm_title", TAB_TLM, LANE_X[2], 20,
                         "📈 TELEMETRY — InfluxDB writer", ""))

    nodes.append(link_in(
        "lin_tlm", TAB_TLM, LANE_X[0], 100,
        CH_TLM,
        source_out_ids=_all_tlm_lout_ids(),
        downstream=["fn_tlm_to_lp"],
    ))

    # ── Pipeline ──
    # link in → fn_tlm_to_lp (one line / msg)
    #         → join (string mode, joiner=\n, count=200 OR timeout 2s)
    #         → fn_tlm_post (set headers/url/method)
    #         → http request → fn_tlm_count
    #
    # Line-protocol escaping rules applied below:
    #   * measurement / tag keys / tag values / field keys: `esc`
    #     backslash-escapes comma, space and equals;
    #   * string field values: `escStr` backslash-escapes backslash and
    #     double quote, and flattens CR/LF to a space because a raw newline
    #     would split the point in the newline-joined batch;
    #   * NaN / ±Infinity numbers are dropped instead of being written as
    #     the strings "NaN" / "Infinity", which would clash with the
    #     field's numeric type.
    nodes.append(function_node(
        "fn_tlm_to_lp", TAB_TLM, LANE_X[2], 100,
        "→ InfluxDB line protocol",
        "const p = msg.payload;\n"
        "if (!p || !p.measurement || !p.fields) return null;\n"
        "const esc = (s) => String(s)\n"
        " .replace(/,/g, '\\\\,').replace(/ /g, '\\\\ ').replace(/=/g, '\\\\=');\n"
        "const escStr = (s) => String(s)\n"
        " .replace(/\\\\/g, '\\\\\\\\').replace(/\"/g, '\\\\\"').replace(/[\\r\\n]+/g, ' ');\n"
        "const tags = Object.entries(p.tags || {})\n"
        " .filter(([k, v]) => v !== undefined && v !== null && v !== '')\n"
        " .map(([k, v]) => `${esc(k)}=${esc(v)}`).join(',');\n"
        "const fieldPairs = Object.entries(p.fields)\n"
        " .filter(([k, v]) => v !== undefined && v !== null\n"
        " && !(typeof v === 'number' && !Number.isFinite(v)))\n"
        " .map(([k, v]) => {\n"
        " if (typeof v === 'number') return `${esc(k)}=${v}`;\n"
        " if (typeof v === 'boolean') return `${esc(k)}=${v}`;\n"
        " return `${esc(k)}=\"${escStr(v)}\"`;\n"
        " });\n"
        "if (fieldPairs.length === 0) return null;\n"
        "const ts = Date.now() * 1000000;\n"
        "msg.payload = `${esc(p.measurement)}${tags ? ',' + tags : ''} `\n"
        " + `${fieldPairs.join(',')} ${ts}`;\n"
        "// Hint the join node to fire on size or timeout.\n"
        "msg.topic = 'tlm';\n"
        "return msg;",
        outputs=1, wires=[["join_tlm"]],
    ))

    # Idiomatic Node-RED batching: join collects messages into a single
    # newline-joined string, flushed every `count` messages OR `timeout`
    # seconds, whichever fires first.
    nodes.append({
        "id": "join_tlm", "type": "join", "z": TAB_TLM,
        "name": "batch (200 lines / 2 s)",
        "mode": "custom",
        "build": "string",
        "property": "payload",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": False,
        "timeout": "2",
        "count": "200",
        "reduceRight": False,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "",
        "reduceFixup": "",
        "x": LANE_X[3], "y": 100,
        "wires": [["fn_tlm_post"]],
    })

    nodes.append(function_node(
        "fn_tlm_post", TAB_TLM, LANE_X[3] + 200, 100,
        "wrap as InfluxDB POST",
        "// Count lines for status reporting.\n"
        "const body = String(msg.payload || '');\n"
        "const lineCount = body ? body.split('\\n').length : 0;\n"
        "if (lineCount === 0) return null;\n"
        "msg.lineCount = lineCount;\n"
        "msg.headers = {\n"
        " 'Authorization': 'Token evolv-dev-token',\n"
        " 'Content-Type': 'text/plain'\n"
        "};\n"
        "msg.url = 'http://influxdb:8086/api/v2/write?org=evolv&bucket=telemetry&precision=ns';\n"
        "msg.method = 'POST';\n"
        "return msg;",
        outputs=1, wires=[["http_tlm"]],
    ))

    # method "use" + empty url: the request node takes both from the
    # message properties set by fn_tlm_post.
    nodes.append({
        "id": "http_tlm", "type": "http request", "z": TAB_TLM,
        "name": "Write InfluxDB",
        "method": "use", "ret": "txt", "paytoqs": "ignore",
        "url": "", "tls": "", "persist": False, "proxy": "",
        "authType": "", "senderr": False,
        "x": LANE_X[4] + 80, "y": 100,
        "wires": [["fn_tlm_count"]],
    })

    nodes.append(function_node(
        "fn_tlm_count", TAB_TLM, LANE_X[5], 100,
        "Count writes",
        "const lines = Number(msg.lineCount) || 0;\n"
        "const writes = (global.get('influx_writes') || 0) + 1;\n"
        "const totalLines = (global.get('influx_lines') || 0) + lines;\n"
        "global.set('influx_writes', writes);\n"
        "global.set('influx_lines', totalLines);\n"
        "const errors = global.get('influx_errors') || 0;\n"
        "if (msg.statusCode && msg.statusCode >= 400) {\n"
        " global.set('influx_errors', errors + 1);\n"
        " node.status({fill:'red', shape:'ring',\n"
        " text:`ERR ${errors+1}: ${msg.statusCode}`});\n"
        "} else {\n"
        " node.status({fill:'green', shape:'dot',\n"
        " text:`${writes} POSTs · ${totalLines} lines (${errors} err)`});\n"
        "}\n"
        "return null;",
        outputs=1, wires=[[]],
    ))

    return nodes
def _all_tlm_lout_ids():
    """Return the ids of all link-out nodes that publish on ``evt:tlm``.

    For each pump in ``PUMPS`` the list holds the pump's own link-out
    followed by its four measurement link-outs (suffixes ``u``, ``d``,
    ``f``, ``p``); the MGC and PS link-outs come last.  The ids are spelled
    out here so the telemetry link-in has a stable, explicit source list.
    """
    meas_suffixes = ("u", "d", "f", "p")
    per_pump_ids = [
        link_id
        for pump in PUMPS
        for link_id in (
            f"lout_tlm_{pump}",
            *(f"lout_tlm_meas_{pump}_{suffix}" for suffix in meas_suffixes),
        )
    ]
    return per_pump_ids + ["lout_tlm_mgc", "lout_tlm_ps"]


# ---------------------------------------------------------------------------
# Assemble + emit
# ---------------------------------------------------------------------------
def main():
    """Build all five tabs and write the combined flow to stdout as JSON."""
    tab_builders = (
        build_process_tab,
        build_ui_tab,
        build_drivers_tab,
        build_setup_tab,
        build_telemetry_tab,
    )
    flow_nodes = []
    for build_tab in tab_builders:
        # Tabs are concatenated in this fixed order.
        flow_nodes = flow_nodes + build_tab()
    json.dump(flow_nodes, sys.stdout, indent=2)
    sys.stdout.write("\n")


if __name__ == "__main__":
    main()