#!/usr/bin/env python3
"""
Dual-curve E2E prediction benchmark for rotatingMachine.

Deploys a Node-RED flow containing TWO rotatingMachine nodes, one per pump
curve shipped in generalFunctions/datasets/assetData/curves/. For each curve
we run a controlled ctrl x pressure sweep and record the predicted flow and
power, plus the efficiency / CoG metrics. Output is a table the team can
compare against supplier data sheets.

This is a live-deploy benchmark (not a unit test) — it exercises the full
Node-RED runtime path including delta compression on port 0, curve loading
via generalFunctions, and output formatting.
"""
import copy
import json
import os
import re
import sys
import threading
import time
import uuid

import requests
import websocket

BASE = "http://localhost:1880"
WS = "ws://localhost:1880/comms"
CURVES_DIR = "/mnt/d/gitea/EVOLV/nodes/generalFunctions/datasets/assetData/curves"

PUMPS = [
    {
        "id": "H05K",
        "model": "hidrostal-H05K-S03R",
    },
    {
        "id": "C5",
        "model": "hidrostal-C5-D03R-SHN1",
    },
]

# Control setpoints (%) exercised at every pressure. The flow builder and the
# sweep loop both derive inject-node ids from these values, so they must come
# from a single definition.
SETPOINTS = (20, 40, 60, 80)

# Id of the single Node-RED tab that holds every benchmark node.
TAB_ID = "curve_bench_tab"

# Seconds to wait for the Node-RED admin API to accept a full deploy.
DEPLOY_TIMEOUT_S = 30

# Where the JSON report is written.
RESULTS_PATH = "/tmp/rm_curve_bench.json"

# Shared state between the websocket thread and the main thread.
events = []                 # captured debug messages, oldest first
start = None                # wall-clock origin for event timestamps (set in main)
lock = threading.Lock()     # guards `events`
ready = threading.Event()   # set once the websocket has subscribed to "debug"


def on_message(ws, msg):
    """WebSocket callback: store every debug message in ``events``.

    A frame may carry a single object or a list of objects. Only items whose
    ``topic`` starts with ``"debug"`` are recorded; frames that are not JSON
    and items that are not objects are ignored.
    """
    try:
        data = json.loads(msg)
    except (TypeError, ValueError):
        return
    for item in (data if isinstance(data, list) else [data]):
        # Skip anything that is not an object: calling .get on it would raise
        # inside the websocket callback.
        if not isinstance(item, dict):
            continue
        if not str(item.get("topic", "")).startswith("debug"):
            continue
        d = item.get("data", {}) or {}
        if not isinstance(d, dict):
            continue
        with lock:
            events.append({
                "t": round(time.time() - start, 3),
                "name": d.get("name"),
                "msg": d.get("msg"),
            })


def on_open(ws):
    """WebSocket callback: subscribe to debug messages and signal readiness."""
    ws.send(json.dumps({"subscribe": "debug"}))
    ready.set()


def ws_thread():
    """Run the websocket client until the connection ends (blocking)."""
    websocket.WebSocketApp(WS, on_message=on_message, on_open=on_open).run_forever()


def deploy(flow):
    """POST ``flow`` to Node-RED as a full deployment and return the response text.

    Raises ``requests.HTTPError`` on a non-2xx response and a
    ``requests.exceptions.RequestException`` subclass if the server cannot be
    reached or does not answer within ``DEPLOY_TIMEOUT_S`` seconds.
    """
    r = requests.post(
        f"{BASE}/flows",
        headers={
            "Content-Type": "application/json",
            "Node-RED-Deployment-Type": "full",
        },
        data=json.dumps(flow),
        # Without a timeout an unresponsive server would block forever.
        timeout=DEPLOY_TIMEOUT_S,
    )
    r.raise_for_status()
    return r.text


def inject(node_id):
    """Trigger the inject node ``node_id`` and return the HTTP status code."""
    r = requests.post(f"{BASE}/inject/{node_id}", timeout=5)
    return r.status_code


def port0(node_tag):
    """Return the most recent parsed port-0 payload for a given pump tag.

    Returns ``None`` when no matching debug message has been captured yet or
    when the newest matching message is not valid JSON.
    """
    debug_name = f"P0-{node_tag}"
    with lock:
        for e in reversed(events):
            if e["name"] == debug_name:
                try:
                    return json.loads(e["msg"])
                except (TypeError, ValueError):
                    return None
    return None


def curve_envelope(model):
    """Load the curve file for ``model`` and summarise its operating envelope.

    Only keys of the ``"nq"`` table that look like integers are treated as
    pressures. Returns a dict with the sorted pressures, the low / mid / high
    pressure picks, and the (min, max) of all flow (``"nq"``) and power
    (``"np"``) ``"y"`` values at those pressures.

    Raises ``ValueError`` if the curve has no integer pressure keys.
    """
    path = os.path.join(CURVES_DIR, f"{model}.json")
    with open(path, encoding="utf-8") as fh:
        d = json.load(fh)
    pressures = sorted(int(k) for k in d["nq"].keys() if re.fullmatch(r"-?\d+", k))
    if not pressures:
        raise ValueError(f"curve {path} has no integer pressure keys under 'nq'")
    flow_vals = [v for p in pressures for v in d["nq"][str(p)]["y"]]
    power_vals = [v for p in pressures for v in d["np"][str(p)]["y"]]
    return {
        "pressures": pressures,
        "p_low": pressures[0],
        "p_mid": pressures[len(pressures) // 2],
        "p_high": pressures[-1],
        "flow_range": (min(flow_vals), max(flow_vals)),
        "power_range": (min(power_vals), max(power_vals)),
    }


def _pump_base_y(index):
    """Return the editor y-coordinate of the first node row for pump ``index``."""
    return 100 + index * 400


def _inject_node(node_id, name, topic, payload, y, target_id, *,
                 payload_type="json", once=False, once_delay=""):
    """Return the config dict of an inject node wired to ``target_id``."""
    return {
        "id": node_id,
        "type": "inject",
        "z": TAB_ID,
        "name": name,
        "props": [
            {"p": "topic", "vt": "str"},
            {"p": "payload"},
        ],
        "topic": topic,
        "payload": payload,
        "payloadType": payload_type,
        "repeat": "",
        "crontab": "",
        "once": once,
        "onceDelay": once_delay,
        "x": 200,
        "y": y,
        "wires": [[target_id]],
    }


def build_flow():
    """Construct a Node-RED flow with one tab holding both pumps + injects + function nodes.

    Per pump the flow contains: the rotatingMachine node (``rm_<id>``), a
    function node that merges port-0 deltas into a full snapshot
    (``fmt_<id>``), a debug node named ``P0-<id>`` and two manual injects
    (``inj_<id>_setMode-virtual`` and ``inj_<id>_Startup``).
    """
    flow = [{"id": TAB_ID, "type": "tab", "label": "Curve Benchmark", "disabled": False}]

    for index, pump in enumerate(PUMPS):
        pid = pump["id"]
        rm_id = f"rm_{pid}"
        fmt_id = f"fmt_{pid}"
        dbg_id = f"dbg_{pid}"
        base_y = _pump_base_y(index)

        # rotatingMachine node
        flow.append({
            "id": rm_id,
            "type": "rotatingMachine",
            "z": TAB_ID,
            "name": f"Pump-{pid}",
            "speed": "50",  # fast ramp for benchmark
            "startup": "0",
            "warmup": "0",
            "shutdown": "0",
            "cooldown": "0",
            "movementMode": "staticspeed",
            "machineCurve": "",
            "uuid": f"bench-{pid}",
            "supplier": "hidrostal",
            "category": "pump",
            "assetType": "pump-centrifugal",
            "model": pump["model"],
            "unit": "m3/h",
            "curvePressureUnit": "mbar",
            "curveFlowUnit": "m3/h",
            "curvePowerUnit": "kW",
            "curveControlUnit": "%",
            "enableLog": False,
            "logLevel": "error",
            "positionVsParent": "atEquipment",
            "positionIcon": "",
            "hasDistance": False,
            "distance": 0,
            "distanceUnit": "m",
            "distanceDescription": "",
            "x": 500,
            "y": base_y,
            "wires": [[fmt_id], [], []],
        })

        # function node to merge deltas: port 0 only carries changed keys, so
        # the node accumulates them in its context and re-emits a snapshot.
        flow.append({
            "id": fmt_id,
            "type": "function",
            "z": TAB_ID,
            "name": f"merge-{pid}",
            "func": (
                "const p = msg.payload || {};\n"
                "const c = context.get('c') || {};\n"
                "Object.assign(c, p);\n"
                "context.set('c', c);\n"
                "function find(prefix) {\n"
                " for (var k in c) if (k.indexOf(prefix) === 0) return c[k];\n"
                " return null;\n"
                "}\n"
                "msg.payload = {\n"
                " state: c.state || 'idle',\n"
                " mode: c.mode || 'auto',\n"
                " ctrl: c.ctrl != null ? Number(c.ctrl) : null,\n"
                " flow: find('flow.predicted.downstream.'),\n"
                " power: find('power.predicted.atequipment.'),\n"
                " NCog: c.NCog != null ? Number(c.NCog) : null,\n"
                " cog: c.cog != null ? Number(c.cog) : null,\n"
                " pU: find('pressure.measured.upstream.'),\n"
                " pD: find('pressure.measured.downstream.')\n"
                "};\n"
                "return msg;"
            ),
            "outputs": 1,
            "x": 760,
            "y": base_y,
            "wires": [[dbg_id]],
        })

        # debug node; port0() looks its messages up by the "P0-<id>" name
        flow.append({
            "id": dbg_id,
            "type": "debug",
            "z": TAB_ID,
            "name": f"P0-{pid}",
            "active": True,
            "tosidebar": True,
            "console": False,
            "tostatus": False,
            "complete": "payload",
            "targetType": "msg",
            "x": 1000,
            "y": base_y,
            "wires": [],
        })

        # injects (fired on demand through the admin API, see inject())
        flow.append(_inject_node(
            f"inj_{pid}_setMode-virtual", "setMode-virtual", "setMode",
            "virtualControl", base_y + 40, rm_id,
            payload_type="str",
        ))
        flow.append(_inject_node(
            f"inj_{pid}_Startup", "Startup", "execSequence",
            json.dumps({"source": "GUI", "action": "execSequence", "parameter": "startup"}),
            base_y + 80, rm_id,
        ))

    return flow


def run_sweep(pump_id, model, envelope):
    """For one pump, sweep (pressure, ctrl) and collect predictions.

    Placeholder: this function performs no work and always returns an empty
    list. A dynamic per-pressure sweep against a static flow is not possible
    because the admin API can only trigger pre-wired inject nodes, so the
    benchmark instead redeploys a flow per pressure (see build_sweep_flow and
    main).
    """
    return []


def build_sweep_flow(pressure):
    """Build a flow where pressures for both pumps are pinned to `pressure`.

    Extends build_flow() with, per pump, two fire-once ``simulateMeasurement``
    injects (upstream pressure 0, downstream pressure ``pressure``, both in
    mbar) and one manual ``execMovement`` inject per entry of ``SETPOINTS``
    (id ``mv_<pump id>_<setpoint>``).
    """
    flow = build_flow()
    for index, pump in enumerate(PUMPS):
        pid = pump["id"]
        rm_id = f"rm_{pid}"
        base_y = _pump_base_y(index)

        for name, position, value, y in (
            ("sim-pU", "upstream", 0, base_y + 160),
            ("sim-pD", "downstream", pressure, base_y + 200),
        ):
            flow.append(_inject_node(
                f"sim_{pid}_{name}", name, "simulateMeasurement",
                json.dumps({"type": "pressure", "position": position, "value": value, "unit": "mbar"}),
                y, rm_id,
                once=True, once_delay="1",  # fires by itself shortly after deploy
            ))

        # Setpoint injects (20/40/60/80)
        for k, val in enumerate(SETPOINTS):
            flow.append(_inject_node(
                f"mv_{pid}_{val}", f"Set {val}%", "execMovement",
                json.dumps({"source": "GUI", "action": "execMovement", "setpoint": val}),
                base_y + 240 + k * 40, rm_id,
            ))
    return flow


def _inject_checked(node_id):
    """Fire an inject node and warn on stderr if Node-RED rejects the request."""
    status = inject(node_id)
    if status >= 400:
        print(f"warning: inject {node_id} returned HTTP {status}", file=sys.stderr)
    return status


def _pressure_plan(results_by_pump):
    """Map each pressure to the pump ids that are swept at it.

    Each pump contributes the low / mid / high pressure of its own envelope;
    pumps that share a pressure value share one deployment.
    """
    allowed_at = {}
    for pump in PUMPS:
        env = results_by_pump[pump["id"]]["envelope"]
        for val in (env["p_low"], env["p_mid"], env["p_high"]):
            ids = allowed_at.setdefault(val, [])
            # low/mid/high can coincide for curves with few pressure rows
            if pump["id"] not in ids:
                ids.append(pump["id"])
    return allowed_at


def _run_pressure_sweep(pressure, allowed, results_by_pump):
    """Deploy the flow for ``pressure`` and record one sample per pump and setpoint.

    Only pumps listed in ``allowed`` are moved and sampled; samples are
    appended to ``results_by_pump[<pump id>]["sweeps"]``.
    """
    flow = build_sweep_flow(pressure)
    print(f"\n=== Deploying sweep at pressure={pressure} mbar (pumps in range: {allowed}) ===")
    with lock:
        events.clear()
    deploy(flow)
    # allow pumps to register and reach operational
    time.sleep(4)

    # startup both pumps
    for pump in PUMPS:
        pid = pump["id"]
        _inject_checked(f"inj_{pid}_setMode-virtual")
        time.sleep(0.2)
        _inject_checked(f"inj_{pid}_Startup")
    time.sleep(3)  # reach operational (startup=0, warmup=0 -> immediate)

    # pressure injects were set to once=True so they fire on deploy. Wait.
    time.sleep(2)

    for val in SETPOINTS:
        for pump in PUMPS:
            if pump["id"] in allowed:
                _inject_checked(f"mv_{pump['id']}_{val}")
        # ramp takes (val)/(speed=50) = val/50 s; plus a safety tick
        time.sleep(max(2.5, val / 50 + 1.5))

        for pump in PUMPS:
            pid = pump["id"]
            if pid not in allowed:
                continue
            data = port0(pid)
            if not data:
                continue
            entry = {
                "pressure": pressure,
                "setpoint": val,
                "state": data.get("state"),
                "ctrl": data.get("ctrl"),
                "flow": data.get("flow"),
                "power": data.get("power"),
                "NCog": data.get("NCog"),
                "cog": data.get("cog"),
            }
            results_by_pump[pid]["sweeps"].append(entry)
            print(f" [{pid}] p={pressure} setpoint={val} ctrl={entry['ctrl']} flow={entry['flow']} power={entry['power']} NCog={entry['NCog']}")


def _assess_sweeps(env, sweeps):
    """Classify sweep samples and check flow monotonicity in the setpoint.

    Returns ``(good, bad, mono_ok, notes)``. A sample is bad when flow or
    power is missing, when either is below -1, or when flow exceeds twice the
    envelope maximum. ``mono_ok`` is False if, at any pressure, a flow value
    is below 95 % of the flow at the previous setpoint.
    """
    good = 0
    bad = 0
    notes = []
    for row in sweeps:
        if row["flow"] is None or row["power"] is None:
            bad += 1
        elif row["flow"] < -1:
            bad += 1
            notes.append(f"negative flow: {row}")
        elif row["power"] < -1:
            bad += 1
            notes.append(f"negative power: {row}")
        elif row["flow"] > env["flow_range"][1] * 2:
            bad += 1
            notes.append(f"flow above envelope {env['flow_range'][1]}: {row}")
        else:
            good += 1

    # monotonicity in ctrl at fixed pressure
    by_p = {}
    for row in sweeps:
        by_p.setdefault(row["pressure"], []).append(row)
    mono_ok = True
    for p, rows in by_p.items():
        ordered = sorted(rows, key=lambda r: r["setpoint"])
        flows = [r["flow"] for r in ordered if r["flow"] is not None]
        # one note per pressure is enough, hence any() rather than a full scan
        if any(cur < prev * 0.95 for prev, cur in zip(flows, flows[1:])):
            mono_ok = False
            notes.append(f"flow drops at p={p}: {flows}")
    return good, bad, mono_ok, notes


def main():
    """Run the benchmark: sweep every planned pressure, print and save a report."""
    global start
    start = time.time()
    threading.Thread(target=ws_thread, daemon=True).start()
    if not ready.wait(5):
        # Without the debug subscription no samples can be captured; keep
        # going (the socket may still connect) but make the cause visible.
        print(f"warning: websocket {WS} not connected after 5 s; debug output may be missed",
              file=sys.stderr)

    results_by_pump = {
        p["id"]: {"model": p["model"], "envelope": curve_envelope(p["model"]), "sweeps": []}
        for p in PUMPS
    }

    # Per-pump pressure plan: each pump sees only pressures inside its own
    # curve envelope. Out-of-range extrapolation is a known limitation
    # (see rm memory / known-issues).
    # Group by pressure so both pumps share a sweep when pressures overlap.
    pump_allowed_at = _pressure_plan(results_by_pump)
    for pressure in sorted(pump_allowed_at):
        _run_pressure_sweep(pressure, pump_allowed_at[pressure], results_by_pump)

    # Envelope sanity check
    print("\n======== SUMMARY ========")
    out = {}
    for pid, info in results_by_pump.items():
        env = info["envelope"]
        good, bad, mono_ok, notes = _assess_sweeps(env, info["sweeps"])

        print(f"\n[{pid}] model={info['model']}")
        print(f" envelope flow {env['flow_range']} power {env['power_range']} pressures {env['p_low']}..{env['p_high']} mbar")
        print(f" sweep samples: good={good} bad={bad}")
        print(f" ctrl-monotonic: {mono_ok}")
        if notes:
            print(f" notes: {notes[:3]}")
        out[pid] = {
            "model": info["model"],
            "envelope": env,
            "samples": info["sweeps"],
            "good": good,
            "bad": bad,
            "mono_ok": mono_ok,
        }

    with open(RESULTS_PATH, "w", encoding="utf-8") as fh:
        json.dump(out, fh, indent=2, default=str)
    print(f"\nfull results -> {RESULTS_PATH}")


if __name__ == "__main__":
    main()