from manpy.simulation.core.ObjectProperty import ObjectProperty
from manpy.simulation.RandomNumberGenerator import RandomNumberGenerator
from manpy.simulation.core.Globals import G
from manpy.simulation.core.utils import check_config_dict
from scipy import interpolate
import copy
[docs]
class Timeseries(ObjectProperty):
"""
The TimeSeries ObjectProperty generates TimeSeries for a Machine and stores them in Entities
:param id: The id of the Feature
:param name: The name of the Feature
:param victim: The machine to which the feature belongs
:param distribution: The statistical distribution of the value of the Datapoints
:param no_negative: If this value is true, returns 0 for values below 0 of the feature value
:param contribute: Needs Failures in a list as an input to contribute the TimeSeries value to conditions
:param start_value: The starting value of the TimeSeries
:param random_walk: If this is True, the TimeSeries will continuously take the previous Datapoint value into account
:param step_time: The time between each Datapoint for a TimeSeries
"""
def __init__(
self,
id="",
name="",
victim=None,
distribution={},
no_negative=False,
contribute=None,
start_value=0,
random_walk=False,
step_time=None,
):
ObjectProperty.__init__(self, id,
name,
victim=victim,
distribution=distribution,
no_negative=no_negative,
contribute=contribute,
start_value=start_value,
random_walk=random_walk,
steptime=step_time
)
G.TimeSeriesList.append(self)
self.step_time = step_time
[docs]
def initialize(self):
"""Initializes the object"""
ObjectProperty.initialize(self)
check_config_dict(self.distribution, ["Function", "DataPoints"], self.name)
# put all intervals into a sorted list
self.intervals = []
for i in self.distribution["Function"].keys():
self.intervals.append(i)
self.intervals.sort()
# check if intervals overlap
for i in range(len(self.intervals) - 1):
if self.intervals[i][1] > self.intervals[i+1][0]:
raise Exception("Intervals {} and {} from {} overlap".format(self.intervals[i], self.intervals[i+1], self.name))
# set stepsize
self.stepsize = (self.intervals[-1][1] - self.intervals[0][0]) / (self.distribution["DataPoints"] - 1)
# set events for a later date
self.victimIsInterrupted = self.env.event()
self.victimStartsProcessing = self.env.event()
self.victimEndsProcessing = self.env.event()
[docs]
def run(self):
"""Every Object has to have a run method. Simpy is mainly used in this function
:return: None
"""
while 1:
machineIsRunning = True
# wait for victim to start process if it is not already processing
if self.victim.operationNotFinished == True or self.env.now == 0:
self.expectedSignals["victimStartsProcessing"] = 1
yield self.victimStartsProcessing
self.victimStartsProcessing = self.env.event()
# set lists for value and time for current entity
self.featureHistory = []
self.timeHistory = []
# calculate step time if necessary
if self.step_time == None:
step_time = self.victim.tinM / self.distribution["DataPoints"]
else:
step_time = self.step_time
# set variables for the following loop
remainingTimeTillFeature = 0
steps = 0
interval = None
last_interval = None
f = None
while machineIsRunning:
# setup for signals
timeRestartedCounting = self.env.now
self.expectedSignals["victimEndsProcessing"] = 1
self.expectedSignals["victimIsInterrupted"] = 1
# waiting for a specific event
receivedEvent = yield self.env.any_of([
self.env.timeout(remainingTimeTillFeature),
self.victimIsInterrupted,
self.victimEndsProcessing
])
# if victim(Machine) has been interrupted
if self.victimIsInterrupted in receivedEvent:
# reset signal and recalculate remainingTimeTillFeature
self.victimIsInterrupted = self.env.event()
remainingTimeTillFeature = remainingTimeTillFeature - (self.env.now - timeRestartedCounting)
# state_controller
if self.distribution_state_controller and self.reset_distributions:
self.distribution_state_controller.reset()
# wait for victim to start processing again and reset signal afterwards
self.expectedSignals["victimResumesProcessing"] = 1
yield self.victimResumesProcessing
self.victimResumesProcessing = self.env.event()
# if victim finishes processing of current entity
elif self.victimEndsProcessing in receivedEvent:
# reset signal and recalculate remainingTimeTillFeature
self.victimEndsProcessing = self.env.event()
remainingTimeTillFeature = remainingTimeTillFeature - (self.env.now - timeRestartedCounting)
# state_controller
if self.distribution_state_controller:
self.distribution = self.distribution_state_controller.get_and_update()
self.rngFeature = RandomNumberGenerator(self, self.distribution.get("Feature"))
# if nothing interrupted, the datapoint can be generated
else:
self.label = None
# set other features as variables that are used in this timeseries
for key in list(self.distribution.keys()):
if key not in ["Function", "DataPoints", "Feature"]:
locals()[key] = self.distribution.get(key).featureValue
# set the interval that is currently being used
x = self.intervals[0][0] + (self.stepsize * steps)
for idx, i in enumerate(self.intervals):
if i[0] <= x <= i[1]:
interval = self.intervals[idx]
break
# check if the interval changed
if last_interval != interval:
f = None
# interpolate or not
if type(self.distribution["Function"][interval]) == list:
# set f for interpolation
if f == None:
data = copy.deepcopy(self.distribution["Function"][interval])
for i, axes in enumerate(data):
for j, coord in enumerate(axes):
if type(coord) == str:
data[i][j] = eval(coord)
xs = data[0]
ys = data[1]
f = interpolate.UnivariateSpline(xs, ys)
# calculate mean for interpolation
try :
if min(xs) <= x <= max(xs):
self.distribution["Feature"][list(self.distribution["Feature"].keys())[0]]["mean"] = f(x)
else:
self.distribution["Feature"][list(self.distribution["Feature"].keys())[0]]["mean"] = 0
except:
print("Interpolation needs at least 4 values")
else:
self.distribution["Feature"][list(self.distribution["Feature"].keys())[0]]["mean"] = eval(self.distribution["Function"][interval])
# generate datapoint
self.rngFeature = RandomNumberGenerator(self, self.distribution.get("Feature"))
value = self.rngFeature.generateNumber()
# check random walk
if self.random_walk == True:
self.featureValue += value
else:
self.featureValue = value
# check no_negative
if self.no_negative == True:
if self.featureValue < 0:
self.featureValue = 0
# add datapoint and time to corresponding lists
self.featureHistory.append(self.featureValue)
self.timeHistory.append(x)
# send data to QuestDB
from manpy.simulation.core.Globals import G
try:
if G.db:
G.db.insert(self.name, {"time": float(self.env.now), "value": float(self.featureValue)})
G.db.commit()
except:
print("Quest-DB error: TimeSeries")
# add datapoint value and time to Entity
ent = self.victim.Res.users[0]
ent.set_timeseries(self.featureHistory, self.label, self.timeHistory,
(self.id, self.victim.id))
self.outputTrace(ent.name, ent.id, str(self.featureValue))
# check contribution
if self.contribute != None:
for c in self.contribute:
if c.expectedSignals["contribution"]:
self.sendSignal(receiver=c, signal=c.contribution)
# set parameters for next loop
remainingTimeTillFeature = step_time
steps += 1
last_interval = interval
# check if it was the last step
if steps == self.distribution["DataPoints"]:
self.expectedSignals["victimEndsProcessing"] = 0
self.expectedSignals["victimIsInterrupted"] = 0
remainingTimeTillFeature = None
machineIsRunning = False