883 lines
32 KiB
JavaScript
883 lines
32 KiB
JavaScript
|
|
const EventEmitter = require('events');
|
|
const {logger,configUtils,configManager, MeasurementContainer, predict, interpolation, childRegistrationUtils} = require('generalFunctions');
|
|
|
|
class Monster{
|
|
/*------------------- Construct and set vars -------------------*/
|
|
constructor(config={}) {
|
|
|
|
//init
|
|
this.init = false; // keep track of init
|
|
|
|
//basic setup
|
|
this.emitter = new EventEmitter(); // Own EventEmitter
|
|
|
|
this.logger = new logger(config.general.logging.enabled,config.general.logging.logLevel, config.general.name);
|
|
this.configManager = new configManager();
|
|
this.defaultConfig = this.configManager.getConfig('monster'); // Load default config for rotating machine ( use software type name ? )
|
|
this.configUtils = new configUtils(this.defaultConfig);
|
|
|
|
// -------------------------------------- fetch dependencies --------------------------
|
|
//this.math = require('mathjs');
|
|
|
|
//measurements
|
|
this.measurements = new MeasurementContainer({
|
|
autoConvert: true,
|
|
windowSize: 50,
|
|
defaultUnits: {
|
|
flow: 'm3/h',
|
|
volume: 'm3'
|
|
}
|
|
}, this.logger);
|
|
|
|
//child registration
|
|
this.child = {} ; // register childs
|
|
this.childRegistrationUtils = new childRegistrationUtils(this);
|
|
|
|
//Specific object info
|
|
this.aquonSampleName = "112100" ; // aquon sample name to start automatic sampling on the basis of the document
|
|
this.monsternametijden = {} ; // json monsternametijden file?
|
|
this.rain_data = {} ; // precipitation data
|
|
this.aggregatedOutput = {} ; // object that does not contain momentary values but a combination of all kinds of data over a fixed period of time
|
|
this.sumRain = 0 ; // total sum of rain over time window + n hours and - n hours
|
|
this.avgRain = 0 ; // total divided by number of locations to get average over total time
|
|
this.daysPerYear = 0 ; // how many days remaining for this year
|
|
this.lastRainUpdate = 0 ; // timestamp of last rain data update
|
|
this.rainMaxRef = 10 ; // mm reference for scaling linear prediction
|
|
this.rainStaleMs = 2 * 60 * 60 * 1000; // 2 hours
|
|
|
|
// outputs
|
|
this.pulse = false; // output pulse to sampling machine
|
|
this.bucketVol = 0; // how full is the sample?
|
|
this.sumPuls = 0; // number of pulses so far
|
|
this.predFlow = 0; // predicted flow over sampling time in hours, expressed in m3
|
|
this.bucketWeight = 0; // actual weight of bucket
|
|
|
|
//inputs
|
|
this.q = 0; // influent flow in m3/h (effective)
|
|
this.manualFlow = null; // manual flow override value in m3/h
|
|
this.i_start = false // when true, the program gets kicked off calculating what it needs to take samples
|
|
this.sampling_time = config.constraints.samplingtime; // time expressed in hours over which the sampling will run (currently 24)
|
|
this.emptyWeightBucket = config.asset.emptyWeightBucket; // empty weight of the bucket
|
|
this.nominalFlowMin = config.constraints.nominalFlowMin; // nominal dry-day flow in m3/h
|
|
this.flowMax = config.constraints.flowMax; // max inflow in m3/h
|
|
this.minSampleIntervalSec = config.constraints.minSampleIntervalSec || 60; // min seconds between samples
|
|
|
|
// internal vars
|
|
this.temp_pulse = 0; // each interval pulses send out 1 and then reset
|
|
this.volume_pulse = 0.05; // define volume pulse expressed in L
|
|
this.minVolume = config.constraints.minVolume;// define min volume in a sampling cabinet before a sample is declared valid expressed in L
|
|
this.maxVolume = 0; // calculated maxvolume depending on own weight
|
|
this.maxWeight = config.constraints.maxWeight;// define max volume in a sampling cabinet before a sample is declared invalid expressed in L
|
|
this.cap_volume = 55; // abs max capacity of bucket (volume) in liters
|
|
this.targetVolume = 0; // volume of sampling cabinet that model aims for
|
|
this.minPuls = 0; // calculates the min pulses depending on min_vol and max_vol
|
|
this.maxPuls = 0; // calculates the max pulses depending on min_vol and max_vol
|
|
this.absMaxPuls = 0; // capacity of sampling cabinet (number of pulses)
|
|
this.targetPuls = 0; // keeps track of the desired amount of pulses (+- 50% tolerance), based on aimed volume
|
|
this.m3PerPuls = 0; // each pulse is equal to a number of m3
|
|
this.predM3PerSec = 0; // predicted flow in m3 per second
|
|
this.m3PerTick = 0; // actual measured flow in m3 per second
|
|
this.m3Total = 0; // total measured flow over sampling time in m3
|
|
this.running = false; // define if sampling is running or not
|
|
this.invalidFlowBounds = false; // whether nominalFlowMin/flowMax are invalid
|
|
this.lastSampleTime = 0; // last sample (pulse) timestamp
|
|
this.lastSampleWarnTime = 0; // last warning timestamp for cooldown
|
|
this.missedSamples = 0; // count blocked samples due to cooldown
|
|
|
|
this.qLineRaw = {}; // see example
|
|
this.minSeen = {}; // keeps track of minimum ever seen so far in a time period for each hour (over totals not every value)
|
|
this.maxSeen = {}; // keeps track of maximum ever seen so far in a time period for each hour (over totals not every value)
|
|
this.qLineRefined = {}; // this should be the ( quantiles? ) classified in the datasets
|
|
this.calcTimeShiftDry = 0; // What is the delay after a dry period of minimum n hours
|
|
this.calcTimeShiftWet = 0;
|
|
this.calcCapacitySewer = 0;
|
|
// how much rain goes to the sewage ? -> calculate surface area of hardend / sewage.
|
|
|
|
this.minDryHours = 0; // what is the minimum of dry hours before we can calculate timeshift? spot this with moving average?
|
|
this.minWetHours = 0; // how long does it take to remove all the rain?
|
|
this.resolution = 0; // Number of chunks in qLineRaw / define how big the window is to sum all values ( for now we need to take 1 hour or bigger resolutions but in the future smaller is better to see more accurate correlations)
|
|
this.tmpTotQ = 0; // keep track of sum of q within resolution window
|
|
|
|
//old prediction factor
|
|
this.predFactor = 0.7; // define factor as multiplier for prediction
|
|
|
|
//track program start and stop
|
|
this.start_time = Date.now(); // default start time
|
|
this.stop_time = Date.now(); // default stop time
|
|
this.flowTime = 0; //keep track in detail how much time between 2 ticks for more accurate flow measurement
|
|
this.timePassed = 0; // time in seconds
|
|
this.timeLeft = 0; // time in seconds
|
|
this.currHour = new Date().getHours(); // on init define in which hour we are 0 - 23
|
|
|
|
if (Number.isFinite(config?.constraints?.maxRainRef)) {
|
|
this.rainMaxRef = config.constraints.maxRainRef;
|
|
}
|
|
|
|
this.init = true; // end of constructor
|
|
|
|
//set boundries and targets after init based on above settings
|
|
this.set_boundries_and_targets();
|
|
|
|
|
|
}
|
|
|
|
/*------------------- INPUT HANDLING -------------------*/
|
|
handleInput(topic, payload) {
|
|
switch (topic) {
|
|
case 'i_start':
|
|
this.i_start = Boolean(payload);
|
|
break;
|
|
case 'monsternametijden':
|
|
this.updateMonsternametijden(payload);
|
|
break;
|
|
case 'rain_data':
|
|
this.updateRainData(payload);
|
|
break;
|
|
case 'input_q':
|
|
this.updateManualFlow(payload);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
updateMonsternametijden(value) {
|
|
if (!this.init || !value || Object.keys(value).length === 0) {
|
|
return;
|
|
}
|
|
|
|
if (
|
|
typeof value[0]?.SAMPLE_NAME !== 'undefined' &&
|
|
typeof value[0]?.DESCRIPTION !== 'undefined' &&
|
|
typeof value[0]?.SAMPLED_DATE !== 'undefined' &&
|
|
typeof value[0]?.START_DATE !== 'undefined' &&
|
|
typeof value[0]?.END_DATE !== 'undefined'
|
|
) {
|
|
this.monsternametijden = value;
|
|
this.regNextDate(value);
|
|
}
|
|
}
|
|
|
|
updateRainData(value) {
|
|
this.rain_data = value;
|
|
this.lastRainUpdate = Date.now();
|
|
|
|
if (this.init && !this.running) {
|
|
this.updatePredRain(value);
|
|
}
|
|
}
|
|
|
|
updateBucketVol(val) {
|
|
this.bucketVol = val;
|
|
this.bucketWeight = val + this.emptyWeightBucket;
|
|
}
|
|
|
|
getSampleCooldownMs() {
|
|
if (!this.lastSampleTime) {
|
|
return 0;
|
|
}
|
|
const remaining = (this.minSampleIntervalSec * 1000) - (Date.now() - this.lastSampleTime);
|
|
return Math.max(0, remaining);
|
|
}
|
|
|
|
validateFlowBounds() {
|
|
const min = Number(this.nominalFlowMin);
|
|
const max = Number(this.flowMax);
|
|
const valid = Number.isFinite(min) && Number.isFinite(max) && min >= 0 && max > 0 && min < max;
|
|
this.invalidFlowBounds = !valid;
|
|
if (!valid) {
|
|
this.logger.warn(`Invalid flow bounds. nominalFlowMin=${this.nominalFlowMin}, flowMax=${this.flowMax}`);
|
|
}
|
|
return valid;
|
|
}
|
|
|
|
getRainIndex() {
|
|
if (!this.lastRainUpdate) {
|
|
return 0;
|
|
}
|
|
if (Date.now() - this.lastRainUpdate > this.rainStaleMs) {
|
|
return 0;
|
|
}
|
|
return Number.isFinite(this.avgRain) ? this.avgRain : 0;
|
|
}
|
|
|
|
getPredictedFlowRate() {
|
|
const min = Number(this.nominalFlowMin);
|
|
const max = Number(this.flowMax);
|
|
if (!Number.isFinite(min) || !Number.isFinite(max) || min < 0 || max <= 0 || min >= max) {
|
|
return 0;
|
|
}
|
|
const rainIndex = this.getRainIndex();
|
|
const scale = Math.max(0, Math.min(1, this.rainMaxRef > 0 ? rainIndex / this.rainMaxRef : 0));
|
|
return min + (max - min) * scale;
|
|
}
|
|
|
|
|
|
updateManualFlow(payload = {}) {
|
|
const value = Number(payload.value);
|
|
if (!Number.isFinite(value)) {
|
|
return;
|
|
}
|
|
|
|
const unit = payload.unit || 'm3/h';
|
|
this.manualFlow = value;
|
|
this.measurements
|
|
.type('flow')
|
|
.variant('manual')
|
|
.position('atequipment')
|
|
.value(value, Date.now(), unit);
|
|
}
|
|
|
|
handleMeasuredFlow(eventData) {
|
|
const value = Number(eventData?.value);
|
|
if (!Number.isFinite(value)) {
|
|
return;
|
|
}
|
|
|
|
const position = String(eventData.position || 'atequipment').toLowerCase();
|
|
const unit = eventData.unit || 'm3/h';
|
|
this.measurements
|
|
.type('flow')
|
|
.variant('measured')
|
|
.position(position)
|
|
.value(value, eventData.timestamp || Date.now(), unit);
|
|
}
|
|
|
|
getMeasuredFlow() {
|
|
const positions = ['upstream', 'downstream', 'atequipment'];
|
|
const values = [];
|
|
|
|
positions.forEach((position) => {
|
|
const measured = this.measurements
|
|
.type('flow')
|
|
.variant('measured')
|
|
.position(position)
|
|
.getCurrentValue();
|
|
|
|
if (Number.isFinite(measured)) {
|
|
values.push(measured);
|
|
}
|
|
});
|
|
|
|
if (!values.length) {
|
|
return null;
|
|
}
|
|
|
|
const sum = values.reduce((total, curr) => total + curr, 0);
|
|
return sum / values.length;
|
|
}
|
|
|
|
getManualFlow() {
|
|
const manual = this.measurements
|
|
.type('flow')
|
|
.variant('manual')
|
|
.position('atequipment')
|
|
.getCurrentValue();
|
|
|
|
return Number.isFinite(manual) ? manual : null;
|
|
}
|
|
|
|
getEffectiveFlow() {
|
|
const measured = this.getMeasuredFlow();
|
|
const manual = this.getManualFlow();
|
|
|
|
if (measured != null && manual != null) {
|
|
return (measured + manual) / 2;
|
|
}
|
|
|
|
if (measured != null) {
|
|
return measured;
|
|
}
|
|
|
|
if (manual != null) {
|
|
return manual;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
registerChild(child, softwareType) {
|
|
if (softwareType !== 'measurement' || !child?.measurements?.emitter) {
|
|
return;
|
|
}
|
|
|
|
const childType = child?.config?.asset?.type;
|
|
if (childType && childType !== 'flow') {
|
|
return;
|
|
}
|
|
|
|
const handler = (eventData) => this.handleMeasuredFlow(eventData);
|
|
child.measurements.emitter.on('flow.measured.upstream', handler);
|
|
child.measurements.emitter.on('flow.measured.downstream', handler);
|
|
child.measurements.emitter.on('flow.measured.atequipment', handler);
|
|
}
|
|
|
|
getOutput() {
|
|
const output = this.measurements.getFlattenedOutput();
|
|
const flowRate = Number(this.q) || 0;
|
|
const m3PerPulse = Number(this.m3PerPuls) || 0;
|
|
const pulseFraction = Number(this.temp_pulse) || 0;
|
|
const targetVolumeM3 = Number(this.targetVolume) > 0 ? this.targetVolume / 1000 : 0;
|
|
const flowToNextPulseM3 = m3PerPulse > 0 ? Math.max(0, (1 - pulseFraction) * m3PerPulse) : 0;
|
|
const timeToNextPulseSec = flowRate > 0 && flowToNextPulseM3 > 0
|
|
? Math.round((flowToNextPulseM3 / (flowRate / 3600)) * 100) / 100
|
|
: 0;
|
|
const targetProgressPct = targetVolumeM3 > 0
|
|
? Math.round((this.m3Total / targetVolumeM3) * 10000) / 100
|
|
: 0;
|
|
const targetDeltaM3 = targetVolumeM3 > 0
|
|
? Math.round((this.m3Total - targetVolumeM3) * 10000) / 10000
|
|
: 0;
|
|
|
|
output.pulse = this.pulse;
|
|
output.running = this.running;
|
|
output.bucketVol = this.bucketVol;
|
|
output.bucketWeight = this.bucketWeight;
|
|
output.sumPuls = this.sumPuls;
|
|
output.predFlow = this.predFlow;
|
|
output.predM3PerSec = this.predM3PerSec;
|
|
output.timePassed = this.timePassed;
|
|
output.timeLeft = this.timeLeft;
|
|
output.m3Total = this.m3Total;
|
|
output.q = this.q;
|
|
output.nominalFlowMin = this.nominalFlowMin;
|
|
output.flowMax = this.flowMax;
|
|
output.invalidFlowBounds = this.invalidFlowBounds;
|
|
output.minSampleIntervalSec = this.minSampleIntervalSec;
|
|
output.missedSamples = this.missedSamples;
|
|
output.sampleCooldownMs = this.getSampleCooldownMs();
|
|
output.maxVolume = this.maxVolume;
|
|
output.minVolume = this.minVolume;
|
|
output.nextDate = this.nextDate;
|
|
output.daysPerYear = this.daysPerYear;
|
|
output.m3PerPuls = this.m3PerPuls;
|
|
output.m3PerPulse = this.m3PerPuls;
|
|
output.pulsesRemaining = Math.max(0, (this.targetPuls || 0) - (this.sumPuls || 0));
|
|
output.pulseFraction = pulseFraction;
|
|
output.flowToNextPulseM3 = flowToNextPulseM3;
|
|
output.timeToNextPulseSec = timeToNextPulseSec;
|
|
output.targetVolumeM3 = targetVolumeM3;
|
|
output.targetProgressPct = targetProgressPct;
|
|
output.targetDeltaM3 = targetDeltaM3;
|
|
output.predictedRateM3h = this.getPredictedFlowRate();
|
|
|
|
return output;
|
|
}
|
|
|
|
/*------------------- FUNCTIONS -------------------*/
|
|
|
|
set_boundries_and_targets(){
|
|
|
|
// define boundries for algorithm
|
|
this.maxVolume = this.maxWeight - this.emptyWeightBucket ; // substract bucket weight of max volume assuming they are both on a 1 to 1 ratio
|
|
this.minPuls = Math.round(this.minVolume / this.volume_pulse); // minimum pulses we want before we have a valid sample
|
|
this.maxPuls = Math.round(this.maxVolume / this.volume_pulse); // maximum pulses we can handle (otherwise sample is too heavy)
|
|
this.absMaxPuls = Math.round(this.cap_volume / this.volume_pulse); // number of pulses a sample can contain before overflowing
|
|
// define target values
|
|
this.targetVolume = this.minVolume * Math.sqrt(this.maxVolume / this.minVolume);
|
|
//old way
|
|
//this.targetVolume = Math.round( ( ( (this.maxVolume - this.minVolume) / 2 ) + this.minVolume ) * 100) / 100; // calculate middle between min and max
|
|
// correct target values
|
|
this.targetPuls = Math.round(this.targetVolume / this.volume_pulse) ; // define desired amount of pulses (in this case our prediction can deviate 50% up and 50% down without a problem)
|
|
}
|
|
|
|
|
|
updatePredRain(value){
|
|
//make date objects to define relative time window
|
|
let now = new Date(Date.now());
|
|
let past = new Date(Date.now());
|
|
let future = new Date(Date.now());
|
|
let totalRaw = {};
|
|
let totalProb = {};
|
|
let totalAvg = {};
|
|
|
|
//refine object with different values
|
|
let rain = {};
|
|
rain.hourly = {}; // an object with timestamps and aggreated over all locations summed precipation in mm
|
|
rain.hourly.time = [];
|
|
rain.hourly.precipationRaw = [];
|
|
rain.hourly.precipationProb = [];
|
|
|
|
let numberOfLocations = 0;
|
|
|
|
//Make timestamp + 24 hours
|
|
future.setHours(now.getHours() + 24);
|
|
|
|
//Make timestamp - 24hours
|
|
past.setHours(now.getHours() - 24);
|
|
|
|
//go through all locations and sum up the average precipation of each location so we have summed precipation over every hour
|
|
Object.entries(value).forEach(([locationKey, location],locationindex) => {
|
|
|
|
//number of locations
|
|
numberOfLocations++;
|
|
|
|
// make an object to keep track of the dataset we load
|
|
this.aggregatedOutput[locationKey] = {};
|
|
this.aggregatedOutput[locationKey].tag = {};
|
|
this.aggregatedOutput[locationKey].tag.latitude = location.latitude;
|
|
this.aggregatedOutput[locationKey].tag.longitude = location.longitude;
|
|
this.aggregatedOutput[locationKey].precipationRaw = {};
|
|
this.aggregatedOutput[locationKey].precipationProb = {};
|
|
|
|
|
|
//loop through object for each location over all hourlys
|
|
Object.entries(location.hourly.time).forEach(([key, time], index) => {
|
|
|
|
this.aggregatedOutput[locationKey].precipationRaw[key] = {};
|
|
this.aggregatedOutput[locationKey].precipationProb[key] = {};
|
|
|
|
//convert string output to a date object
|
|
let checkdate = new Date(time);
|
|
|
|
//convert date to milliseconds timestamps
|
|
let currTimestamp = checkdate.getTime();
|
|
let probability = 100; //default probility unless otherwise defined
|
|
|
|
if(typeof location.hourly.precipitation_probability !== 'undefined'){
|
|
probability = location.hourly.precipitation_probability[key];
|
|
|
|
}
|
|
|
|
if(probability > 0){
|
|
probability /= 100;
|
|
}
|
|
|
|
// only interested in dates before timeframe and after to make use of
|
|
// ( currTimestamp >= now && currTimestamp < future) || ( currTimestamp < now && currTimestamp > past )
|
|
if( true ){
|
|
|
|
typeof totalRaw[currTimestamp] === 'undefined' ? totalRaw[currTimestamp] = 0 : null;
|
|
typeof totalProb[currTimestamp] === 'undefined' ? totalProb[currTimestamp] = 0 : null;
|
|
|
|
//placed probability into the equation
|
|
totalRaw[currTimestamp] += location.hourly.precipitation[key] ;
|
|
totalProb[currTimestamp] += ( location.hourly.precipitation[key] * probability ) ;
|
|
|
|
//keep track of all requested data
|
|
this.aggregatedOutput[locationKey].precipationRaw[key]["val"] = location.hourly.precipitation[key]; // raw data from open weather data
|
|
this.aggregatedOutput[locationKey].precipationRaw[key]["time"] = currTimestamp;
|
|
|
|
this.aggregatedOutput[locationKey].precipationProb[key]["val"] = probability; // probability of open weather
|
|
this.aggregatedOutput[locationKey].precipationProb[key]["time"] = currTimestamp;
|
|
|
|
}
|
|
|
|
//remove dead info
|
|
if(Object.keys(this.aggregatedOutput[locationKey].precipationRaw[key]).length == 0 ){
|
|
delete this.aggregatedOutput[locationKey].precipationRaw[key];
|
|
};
|
|
|
|
if(Object.keys(this.aggregatedOutput[locationKey].precipationProb[key]).length == 0 ){
|
|
delete this.aggregatedOutput[locationKey].precipationProb[key];
|
|
};
|
|
|
|
});
|
|
});
|
|
|
|
//total sum expected over time window (just for ref now not so important anymore)
|
|
this.sumRain = Object.values(totalProb).reduce((sum, value) => sum + value, 0);
|
|
this.avgRain = this.sumRain / numberOfLocations;
|
|
|
|
//make average over prob
|
|
Object.entries(totalProb).forEach(([key, sum],index) => {
|
|
typeof totalAvg[key] === 'undefined' ? totalAvg[key] = 0 : null;
|
|
totalAvg[key] = sum / numberOfLocations;
|
|
});
|
|
|
|
//make new prediction
|
|
return this.aggregatedOutput;
|
|
}
|
|
|
|
// for getting the day of the year (0-365)
|
|
getDayOfYear(ts){
|
|
const start = new Date(ts.getFullYear(), 0, 1);
|
|
const diff = ts - start;
|
|
const oneDay = 1000 * 60 * 60 * 24;
|
|
return Math.floor(diff / oneDay);
|
|
}
|
|
|
|
|
|
get_model_prediction(){
|
|
// Linear predictor based on rain index with flow bounds.
|
|
const samplingHours = Number(this.sampling_time) || 0;
|
|
const predictedRate = this.getPredictedFlowRate();
|
|
const fallbackRate = this.getEffectiveFlow();
|
|
const flowM3PerHour = predictedRate > 0 ? predictedRate : fallbackRate;
|
|
const fallback = Math.max(0, flowM3PerHour * samplingHours);
|
|
|
|
this.predFlow = fallback;
|
|
return this.predFlow;
|
|
}
|
|
|
|
// Legacy/experimental model-based prediction (kept for reference; not used by default).
|
|
get_model_prediction_from_rain(){
|
|
|
|
// combine 24 hourly predictions to make one daily prediction (for the next 24 hours including the current hour)
|
|
let inputs = [];
|
|
for (let predHour = 0; predHour <= 23; predHour++) {
|
|
|
|
// select 48 timestamps based on hour te be predicted
|
|
let now = new Date();
|
|
const lastHour = new Date(now.setHours(now.getHours() + predHour));
|
|
let timestamps = this.rain_data[0].hourly.time.map(ts => new Date(ts));
|
|
let timestamps_48 = timestamps.filter(ts => ts <= lastHour).slice(-48)
|
|
|
|
// for each relevant hour calculate the mean precipitation across all areas
|
|
let precipitation = [];
|
|
for (let i = 0; i < timestamps.length; i++) {
|
|
|
|
if(timestamps_48.includes(timestamps[i])) {
|
|
|
|
let values = [];
|
|
for (let j = 0; j < this.rain_data.length; j++) {
|
|
|
|
values.push(this.rain_data[j].hourly.precipitation[i]);
|
|
}
|
|
let mean = values.reduce((sum, value) => sum + value, 0) / this.rain_data.length;
|
|
precipitation.push(mean);
|
|
}
|
|
}
|
|
|
|
// generate seasonal variables for model: hour of day, day of week, day of year (last 2 with sin cos transformation)
|
|
let hour = timestamps_48.map(ts => ts.getHours());
|
|
let weekdayJS = timestamps_48.map(ts => ts.getDay()); // Javascript weekday
|
|
let weekdayPY = weekdayJS.map(weekdayJS => (weekdayJS + 6) % 7); // Python weekday
|
|
let weekdaySin = weekdayPY.map(weekdayPY => Math.sin(2 * Math.PI * weekdayPY / 7));
|
|
let weekdayCos = weekdayPY.map(weekdayPY => Math.cos(2 * Math.PI * weekdayPY / 7));
|
|
let dayOfYear = timestamps_48.map(ts => this.getDayOfYear(ts));
|
|
let dayOfYearSin = dayOfYear.map(day => Math.sin(2 * Math.PI * day / 365));
|
|
let dayOfYearCos = dayOfYear.map(day => Math.cos(2 * Math.PI * day / 365));
|
|
|
|
// standardize variables for prediction and 'zip' them
|
|
const scaling = [
|
|
{
|
|
"hour mean": 11.504046716524947,
|
|
"weekdaySin mean": -0.00023422353487966347,
|
|
"weekdayCos mean": 0.0033714029956787715,
|
|
"dayOfYearSin mean": 0.06748893577363864,
|
|
"dayOfYearCos mean": -0.02137433139416939,
|
|
"precipitation mean": 0.0887225073082283
|
|
},
|
|
{
|
|
"hour scale": 6.92182769305216,
|
|
"weekdaySin scale": 0.7073194528907719,
|
|
"weekdayCos scale": 0.7068859670013796,
|
|
"dayOfYearSin scale": 0.701099604274817,
|
|
"dayOfYearCos scale": 0.7095405037003095,
|
|
"precipitation scale": 0.4505403578968155
|
|
},
|
|
{
|
|
"Flow (m3/h) mean": 1178.7800890533754
|
|
},
|
|
{
|
|
"Flow (m3/h) scale": 1025.3973622173557
|
|
}
|
|
]
|
|
const means = scaling[0];
|
|
const scales = scaling[1];
|
|
|
|
let features = [hour, weekdaySin, weekdayCos, dayOfYearSin, dayOfYearCos, precipitation];
|
|
const names = ["hour", "weekdaySin", "weekdayCos", "dayOfYearSin", "dayOfYearCos", "precipitation"]
|
|
|
|
features = features.map((arr, i) =>
|
|
arr.map(value => (value - means[`${names[i]} mean`]) / scales[`${names[i]} scale`]));
|
|
[hour, weekdaySin, weekdayCos, dayOfYearSin, dayOfYearCos, precipitation] = features;
|
|
|
|
const zipped = this.zip(hour, weekdaySin, weekdayCos, dayOfYearSin, dayOfYearCos, precipitation);
|
|
|
|
// collect inputdata for model
|
|
inputs.push(zipped);
|
|
|
|
}
|
|
const output = this.model_loader(inputs);
|
|
console.log('Final output: ' + output);
|
|
}
|
|
|
|
async model_loader(inputs){
|
|
|
|
let dailyPred = 0;
|
|
|
|
try {
|
|
|
|
|
|
// Try loading with default input shape*/
|
|
const path = 'nodes/generalFunctions/datasets/lstmData/tfjs_model/model.json';
|
|
const model = await this.modelLoader.loadModelPath(path);
|
|
console.log('Model loaded successfully!');
|
|
|
|
// make predictions
|
|
for (const input of inputs) {
|
|
|
|
const inputTensor = tf.tensor3d([input]);
|
|
const predict = model.predict(inputTensor);
|
|
let predictValue = await predict.data();
|
|
|
|
// back-transformation because of standardization of the response variable
|
|
predictValue = predictValue[0] * 1024.1940942 + 1188.0105115;
|
|
dailyPred += predictValue;
|
|
}
|
|
console.log('Daily prediction: ' + dailyPred);
|
|
} catch (error) {
|
|
console.error('Failed to load model:', error);
|
|
}
|
|
return dailyPred;
|
|
}
|
|
|
|
sampling_program(){
|
|
|
|
// ------------------ Run once on conditions and start sampling
|
|
if( ( (this.i_start ) || ( Date.now() >= this.nextDate ) ) && !this.running ){
|
|
|
|
if (!this.validateFlowBounds()) {
|
|
this.running = false;
|
|
this.i_start = false;
|
|
return;
|
|
}
|
|
|
|
this.running = true;
|
|
|
|
// reset persistent vars
|
|
this.temp_pulse = 0;
|
|
this.pulse = false;
|
|
this.updateBucketVol(0);
|
|
this.sumPuls = 0;
|
|
this.m3Total = 0;
|
|
this.timePassed = 0; // time in seconds
|
|
this.timeLeft = 0; // time in seconds
|
|
this.predM3PerSec = 0;
|
|
|
|
//run prediction to ensure its value is filled
|
|
this.get_model_prediction();
|
|
|
|
// define m3 per pulse for this run and round to int !
|
|
this.m3PerPuls = Math.round(this.predFlow / this.targetPuls);
|
|
this.predM3PerSec = this.predFlow / this.sampling_time / 60 / 60; // predicted m3 per time
|
|
|
|
// define start and stop time based on calender data
|
|
this.start_time = Date.now();
|
|
this.stop_time = Date.now() + (this.sampling_time * 60 * 60 * 1000); // convert to milliseconds
|
|
|
|
//reset parameters and look for next date
|
|
this.regNextDate(this.monsternametijden);
|
|
|
|
// reset start
|
|
this.i_start = false;
|
|
}
|
|
|
|
// ------------------ Run for as long as sampling time is not greater than stop time
|
|
if(this.stop_time > Date.now()){
|
|
|
|
// define time vars
|
|
this.timePassed = Math.round( ( Date.now() - this.start_time ) / 1000);
|
|
this.timeLeft = Math.round( ( this.stop_time - Date.now() ) / 1000);
|
|
|
|
// calc temp pulse rate
|
|
let update = this.m3PerTick / this.m3PerPuls;
|
|
|
|
// update values
|
|
this.temp_pulse += update;
|
|
this.m3Total += this.m3PerTick;
|
|
|
|
// check if we need to send out a pulse (stop sending pulses if capacity is reached)
|
|
if(this.temp_pulse >= 1 && this.sumPuls < this.absMaxPuls){
|
|
const now = Date.now();
|
|
const cooldownMs = this.minSampleIntervalSec * 1000;
|
|
const blocked = this.lastSampleTime && (now - this.lastSampleTime) < cooldownMs;
|
|
|
|
if (blocked) {
|
|
this.missedSamples++;
|
|
this.pulse = false;
|
|
this.temp_pulse = Math.min(this.temp_pulse, 1);
|
|
|
|
if (!this.lastSampleWarnTime || (now - this.lastSampleWarnTime) > cooldownMs) {
|
|
this.lastSampleWarnTime = now;
|
|
this.logger.warn(`Sampling too fast. Cooldown active for ${Math.ceil((cooldownMs - (now - this.lastSampleTime)) / 1000)}s.`);
|
|
}
|
|
} else {
|
|
// reset
|
|
this.temp_pulse += -1;
|
|
// send out a pulse and add to count
|
|
this.pulse = true;
|
|
this.lastSampleTime = now;
|
|
// count pulses
|
|
this.sumPuls++;
|
|
// update bucket volume each pulse
|
|
this.updateBucketVol(Math.round(this.sumPuls * this.volume_pulse * 100) / 100);
|
|
}
|
|
|
|
}
|
|
else{
|
|
|
|
if( this.sumPuls > this.absMaxPuls){
|
|
|
|
// find out how to reschedule sample automatically?
|
|
}
|
|
|
|
//update pulse when its true
|
|
if(this.pulse){
|
|
this.pulse = false; // continue but don't send out a pulse
|
|
}
|
|
|
|
}
|
|
}
|
|
else
|
|
{
|
|
//after setting once dont do it again
|
|
if(this.running){
|
|
// Vars can only be 0 if this is not running
|
|
this.m3PerPuls = 0;
|
|
this.temp_pulse = 0;
|
|
this.pulse = false;
|
|
this.updateBucketVol(0);
|
|
this.sumPuls = 0;
|
|
this.timePassed = 0; // time in seconds
|
|
this.timeLeft = 0; // time in seconds
|
|
this.predFlow = 0;
|
|
this.predM3PerSec = 0;
|
|
this.m3Total = 0;
|
|
this.running = false; // end of sampling program (stop_time reached)
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
flowCalc(){
|
|
//reset timePassed
|
|
let timePassed = 0;
|
|
|
|
// each tick calc flowtimepassed
|
|
this.flowTime > 0 ? timePassed = ( Date.now() - this.flowTime) / 1000 : timePassed = 0 ;
|
|
|
|
//conver to m3 per tick
|
|
this.m3PerTick = this.q / 60 / 60 * timePassed ;
|
|
|
|
// put new timestamp
|
|
this.flowTime = Date.now();
|
|
|
|
}
|
|
|
|
//goes through time related functions
|
|
tick(){
|
|
|
|
// ------------------ 1.0 Main program loop ------------------
|
|
this.logger.debug('Monster tick running');
|
|
|
|
//resolve effective flow in m3/h
|
|
this.q = this.getEffectiveFlow();
|
|
|
|
//calculate flow based on input
|
|
this.flowCalc();
|
|
|
|
//run sampling program
|
|
this.sampling_program();
|
|
|
|
//logQ for predictions / forecasts
|
|
this.logQoverTime();
|
|
}
|
|
|
|
regNextDate(monsternametijden){
|
|
|
|
let next_date = new Date(new Date().setFullYear(new Date().getFullYear() + 1));
|
|
let n_days_remaining = 0;
|
|
|
|
if(typeof monsternametijden !== 'undefined'){
|
|
// loop through lines
|
|
Object.entries(monsternametijden).forEach(([key, line],index) => {
|
|
|
|
//console.log(line.START_DATE);
|
|
//check if date is not null
|
|
if(line.START_DATE != "NULL"){
|
|
let curr_date_conv = new Date(line.START_DATE);
|
|
let curr_date = curr_date_conv.getTime();
|
|
|
|
//check if sample name is this sample and if date is bigger than now.
|
|
if(line.SAMPLE_NAME == this.aquonSampleName && curr_date > Date.now() ){
|
|
|
|
//only keep date that is bigger than current but smaller than the ones that follow after it.
|
|
if(curr_date < next_date){ next_date = curr_date; }
|
|
|
|
// check if its within this year only show those days as days remaining
|
|
if( new Date().getFullYear() == curr_date_conv.getFullYear() ){ n_days_remaining++; }
|
|
}
|
|
}
|
|
|
|
});
|
|
}
|
|
else{
|
|
//this.warning.push(3);
|
|
}
|
|
|
|
//store vars remaining
|
|
this.daysPerYear = n_days_remaining;
|
|
this.nextDate = next_date;
|
|
}
|
|
|
|
logQoverTime(){
|
|
|
|
//store currHour in temp obj for easy ref
|
|
let h = this.currHour;
|
|
|
|
// define rain hour of which the correlation is the biggest this doesnt belong in this section do this afterwards
|
|
// let rainH = h - this.calcTimeShift ;
|
|
|
|
// how much rain fell on rainH (define category)
|
|
|
|
// fetch current hour from actual time
|
|
const currentHour = new Date().getHours();
|
|
|
|
//on hour change begin log
|
|
if(h !== currentHour ){
|
|
|
|
//write current total to object
|
|
this.qLineRaw.h = this.tmpTotQ
|
|
|
|
//reset tmpTotQ
|
|
|
|
//set this.currHour to currentHour
|
|
}
|
|
|
|
}
|
|
|
|
//create objects where to push arrays in to keep track of data
|
|
createMinMaxSeen(){
|
|
//check which hour it is , then make sum , after sum is complete check which hour it is
|
|
//loop over sampling time expressed in hours
|
|
for(let h = 1; h < this.sampling_time ; h++){
|
|
this.minSeen = {};
|
|
}
|
|
}
|
|
|
|
|
|
} // end of class
|
|
|
|
module.exports = Monster;
|
|
|
|
|
|
const mConfig={
|
|
general: {
|
|
name: "Monster",
|
|
logging:{
|
|
logLevel: "debug",
|
|
enabled: true,
|
|
},
|
|
},
|
|
asset: {
|
|
emptyWeightBucket: 3,
|
|
},
|
|
constraints: {
|
|
minVolume: 4,
|
|
maxWeight: 23,
|
|
},
|
|
}
|
|
|
|
if (require.main === module) {
|
|
const monster = new Monster(mConfig);
|
|
(async () => {
|
|
const intervalId = setInterval(() => {
|
|
monster.tick();
|
|
}, 1000);
|
|
})();
|
|
}
|