#!/usr/bin/env node
/**
 * Monitor WWTP system health and process state.
 * Captures PS volume, flow rates, pump states, and control actions.
 *
 * How it works: every SAMPLE_INTERVAL milliseconds the script reads the tail
 * of `docker logs evolv-nodered`, sorts each log line into a category
 * (safety, pressure, control, state, errors, flow), prints a short report for
 * the sample, and after NUM_SAMPLES samples prints an overall summary.
 */

// NOTE(review): `http` and `NR_URL` are not referenced anywhere in the code
// visible here; they are kept in case something else relies on them.
const http = require('http');
const { execSync } = require('child_process');

const NR_URL = 'http://localhost:1880';
const SAMPLE_INTERVAL = 5000;
const NUM_SAMPLES = 20; // 100 seconds

// Longest excerpt of a log line that is kept for display.
const MAX_EXCERPT_LENGTH = 200;

// Matches lines carrying both a `vol=<number> m3` and a `remainingTime=<value>` field.
const VOLUME_PATTERN = /vol=([-\d.]+) m3.*remainingTime=([\w.]+)/;

/**
 * Read the most recent log lines of the `evolv-nodered` container.
 *
 * Failures are deliberately non-fatal (the monitor keeps sampling), but they
 * are reported on stderr so that a missing container or docker daemon is not
 * mistaken for a quiet, healthy system.
 *
 * @param {number|string} [lines=50] - Number of trailing lines to request
 *   (a non-negative integer, or the literal 'all').
 * @returns {string} Combined stdout/stderr of `docker logs`, or '' when the
 *   argument is invalid or the command fails or times out (5 s).
 */
function getLogs(lines = 50) {
  // `lines` is interpolated into a shell command, so accept only values that
  // cannot carry shell syntax.
  const tail = String(lines);
  if (tail !== 'all' && !/^\d+$/.test(tail)) {
    console.error(`getLogs: invalid line count "${tail}", skipping docker call`);
    return '';
  }

  try {
    // `2>&1` folds the container's stderr stream into the returned text.
    return execSync(`docker logs evolv-nodered --tail ${tail} 2>&1`, {
      encoding: 'utf8',
      timeout: 5000,
    });
  } catch (e) {
    console.error(`getLogs: docker logs failed: ${e.message}`);
    return '';
  }
}

/**
 * Classify raw log text into per-category buckets.
 *
 * Each non-blank line lands in at most one bucket; the first matching rule
 * wins, in this order: safety, pressure, control, state, errors, flow.
 * Consequently an error line that also contains a control or state keyword
 * is filed under control/state and is NOT counted as an error.
 *
 * @param {string} logs - Newline-separated log text.
 * @returns {{
 *   safety: Array<{vol: number, remaining: string}>,
 *   pressure: number,
 *   control: string[],
 *   state: string[],
 *   errors: string[],
 *   flow: string[]
 * }} Parsed safety entries, a count of pressure-change lines, and trimmed
 *   excerpts (at most 200 characters) for the remaining categories.
 */
function parseLogs(logs) {
  const result = { safety: [], pressure: 0, control: [], state: [], errors: [], flow: [] };
  const excerpt = (line) => line.trim().substring(0, MAX_EXCERPT_LENGTH);
  const containsAny = (line, needles) => needles.some((needle) => line.includes(needle));

  (logs || '').split('\n').forEach((line) => {
    if (!line.trim()) return;

    const volMatch = line.match(VOLUME_PATTERN);
    if (volMatch) {
      result.safety.push({ vol: parseFloat(volMatch[1]), remaining: volMatch[2] });
      return;
    }

    if (line.includes('Pressure change detected')) {
      result.pressure++;
      return;
    }

    if (containsAny(line, ['Controllevel', 'flowbased', 'control applying'])) {
      result.control.push(excerpt(line));
      return;
    }

    if (containsAny(line, [
      'startup',
      'shutdown',
      'machine state',
      'Handling input',
      'execSequence',
      'execsequence',
    ])) {
      result.state.push(excerpt(line));
      return;
    }

    if (containsAny(line, ['[ERROR]', 'Error'])) {
      result.errors.push(excerpt(line));
      return;
    }

    if (containsAny(line, ['netflow', 'Height', 'flow'])) {
      result.flow.push(excerpt(line));
    }
  });

  return result;
}

/**
 * Print the per-sample report for one parsed log snapshot.
 *
 * @param {ReturnType<typeof parseLogs>} parsed - Output of parseLogs().
 * @param {boolean} hasData - False when no log text was obtained for the sample.
 */
function printSample(parsed, hasData) {
  if (!hasData) {
    // Without log data nothing can be said about safety; do not print "OK".
    console.log(' ⚪ NO DATA: no log lines read from evolv-nodered');
    return;
  }

  // Safety status
  if (parsed.safety.length > 0) {
    const latest = parsed.safety[parsed.safety.length - 1];
    console.log(` ⚠️ SAFETY: ${parsed.safety.length} triggers, vol=${latest.vol} m3`);
  } else {
    console.log(' ✅ SAFETY: OK');
  }

  // Pressure changes
  if (parsed.pressure > 0) {
    console.log(` 📊 PRESSURE: ${parsed.pressure} changes (sim active)`);
  }

  // Only the most recent few control/state/flow lines are shown; all errors are shown.
  parsed.control.slice(-3).forEach((c) => console.log(` 🎛️ CONTROL: ${c}`));
  parsed.state.slice(-3).forEach((s) => console.log(` 🔄 STATE: ${s}`));
  parsed.flow.slice(-2).forEach((f) => console.log(` 💧 FLOW: ${f}`));
  parsed.errors.forEach((e) => console.log(` ❌ ERROR: ${e}`));
}

/**
 * Print the final summary computed from the per-sample history.
 *
 * NOTE(review): every sample re-reads the last lines of the log, so
 * consecutive samples overlap and the totals below count occurrences across
 * samples, not unique log lines. Only "zero vs. non-zero" is reliable.
 *
 * @param {Array<{safety: number, control: number, state: number,
 *   errors: number, hasData: boolean}>} history - One entry per sample.
 */
function printSummary(history) {
  const total = (key) => history.reduce((sum, sample) => sum + sample[key], 0);
  const totalSafety = total('safety');
  const totalErrors = total('errors');
  const totalControl = total('control');
  const totalState = total('state');

  console.log('\n=== Health Summary ===');
  console.log(`Safety triggers: ${totalSafety} ${totalSafety === 0 ? '✅' : '⚠️'}`);
  console.log(`Errors: ${totalErrors} ${totalErrors === 0 ? '✅' : '❌'}`);
  console.log(`Control actions: ${totalControl}`);
  console.log(`State changes: ${totalState}`);

  if (!history.some((sample) => sample.hasData)) {
    // Zero triggers and zero errors mean nothing if no logs were ever read.
    console.log('\n⚪ NO DATA (could not read any log lines from evolv-nodered)');
  } else if (totalSafety === 0 && totalErrors === 0) {
    console.log('\n🟢 SYSTEM HEALTHY');
  } else if (totalErrors > 0) {
    console.log('\n🔴 ERRORS DETECTED');
  } else {
    console.log('\n🟡 SAFETY ACTIVE (may be normal during startup)');
  }
}

/**
 * Entry point: take NUM_SAMPLES log samples, SAMPLE_INTERVAL ms apart,
 * reporting each one, then print the summary.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== WWTP Health Monitor ===');
  console.log(`Sampling every ${SAMPLE_INTERVAL / 1000}s for ${NUM_SAMPLES * SAMPLE_INTERVAL / 1000}s\n`);

  const history = [];

  for (let i = 0; i < NUM_SAMPLES; i++) {
    const elapsed = (i * SAMPLE_INTERVAL / 1000).toFixed(0);
    const logs = getLogs(40);
    const parsed = parseLogs(logs);
    const hasData = logs.trim() !== '';

    console.log(`--- Sample ${i + 1}/${NUM_SAMPLES} (t=${elapsed}s) ---`);
    printSample(parsed, hasData);

    history.push({
      t: Number.parseInt(elapsed, 10),
      safety: parsed.safety.length,
      pressure: parsed.pressure,
      control: parsed.control.length,
      state: parsed.state.length,
      errors: parsed.errors.length,
      hasData,
    });

    console.log('');

    // No need to wait after the final sample.
    if (i < NUM_SAMPLES - 1) {
      await new Promise((resolve) => setTimeout(resolve, SAMPLE_INTERVAL));
    }
  }

  printSummary(history);
}

main().catch((err) => {
  console.error('Monitor failed:', err);
  process.exit(1);
});