Full examples

Here, we show you full code examples in our recommended order. The files can also be found in the following project directory: manpy/simulation/examples/

Quality_Control.py

This file demonstrates custom quality control in machines.

from manpy.simulation.imports import Machine, Source, Exit, Feature
from manpy.simulation.core.Globals import runSimulation, get_feature_values_by_id, get_feature_labels_by_id


# Any function can be employed as the condition to control the entity's quality before it exits the machine
# You can utilize any simulation values for quality control purposes
# Return True to reject/discard the entity, and False to allow it to proceed
def condition(machine):
    activeEntity = machine.getActiveEntity()
    # Access first element since function returns a list
    feature_value = get_feature_values_by_id(activeEntity, ["Ftr1"])[0]
    labels = get_feature_labels_by_id(activeEntity, ["Ftr1"])[0]

    if feature_value > 7 or feature_value < 3:
        return True
    else:
        return False

# Objects
S = Source("S1", "Source", interArrivalTime={"Fixed": {"mean": 0.1}}, entity="manpy.Part")

# Assign the condition as the "control" parameter for any machine
M1 = Machine("M1", "Machine1", processingTime={"Normal": {"mean": 0.2, "stdev": 0.1, "min": 0.08, "max": 0.34}},
             control=condition)

E1 = Exit("E1", "Exit1")

# ObjectProperty
Ftr1 = Feature("Ftr1", "Feature1", victim=M1,
               distribution={"Feature": {"Normal": {"mean": 5, "stdev": 2, "min": 1, "max": 9}}})

# Routing
S.defineRouting([M1])
M1.defineRouting([S], [E1])
E1.defineRouting([M1])


def main(test=0):
    maxSimTime = 480

    runSimulation([S, M1, E1, Ftr1], maxSimTime)

    print("""
            Discards: {}
            Produced: {}
            """.format(len(M1.discards), E1.numOfExits))

if __name__ == "__main__":
    main()

Dependency.py

This file demonstrates functional dependencies between features.

from manpy.simulation.imports import Machine, Source, Exit, Failure, Feature, Queue
from manpy.simulation.core.Globals import runSimulation

# condition for quality control
def condition(self):
    activeEntity = self.Res.users[0]
    means = [1.6, 3500, 450, 180, 400, 50, 190, 400]
    stdevs = [0.2, 200, 50, 30, 50, 5, 10, 50]
    for idx, feature in enumerate(activeEntity.features):
        if feature != None:
            min = means[idx] - 2 * stdevs[idx]
            max = means[idx] + 2 * stdevs[idx]
            if feature < min or feature > max:
                return True
    return False


# Objects
S = Source("S1", "Source", interArrivalTime={"Fixed": {"mean": 0.4}}, entity="manpy.Part", capacity=1)
Soldering = Machine("M0", "Löten", processingTime={"Normal": {"mean": 0.8, "stdev": 0.075, "min": 0.425, "max": 1.175}})
Q = Queue("Q", "Queue")
Gluing = Machine("M1", "Kleben", processingTime={"Fixed": {"mean": 0.8, "stdev": 0.075, "min": 0.425, "max": 1.175}}, control=condition)
E1 = Exit("E1", "Exit1")


# ObjectProperty

# With the "dependent" parameter, you can create Features based on the values of other features
# Dependent takes a function and populates variables with the last feature value of the corresponding Feature whenever a feature value is generated
# It is also possible to choose a distribution on top of a dependency (see Temperature example)
# Always use x1, x2, ... or similar variables to avoid complications

# Soldering
Voltage = Feature("Ftr0", "Feature0", victim=Soldering, distribution={"Feature": {"Normal": {"mean": 1.6, "stdev": 0.2}}})
Current = Feature("Ftr1", "Feature1", victim=Soldering, dependent={"Function" : "1000*x1 + 1900", "x1" : Voltage})
Resistance = Feature("Ftr2", "Feature2", victim=Soldering, dependent={"Function" : "(x1/x2)*1000000", "x1" : Voltage, "x2" : Current})
Pressure = Feature("Ftr3", "Feature3", victim=Soldering, distribution={"Feature": {"Normal": {"mean": 180, "stdev": 30}}})
Insertion_depth = Feature("Ftr4", "Feature4", victim=Soldering, distribution={"Feature": {"Normal": {"mean": 400, "stdev": 50}}})

# Gluing
Flow_rate = Feature("Ftr5", "Feature5", victim=Gluing, distribution={"Feature": {"Normal": {"mean": 50, "stdev": 5}}})
# The calculated value from "dependent" becomes the distribution's mean, allowing you to apply any desired dispersion
Temperature = Feature("Ftr6", "Feature6", victim=Gluing, dependent={"Function" : "2*x3 + 90", "x3" : Flow_rate}, distribution={"Feature": {"Normal": {"stdev": 1}}})
Mass = Feature("Ftr7", "Feature7", victim=Gluing, distribution={"Feature": {"Normal": {"mean": 400, "stdev": 50}}})


# ObjectInterruption

# With the parameter entity=True, the Time-to-Failure (TTF) is calculated based on the processing time of the entity within the machine
# The time of failure can be modified on a scale from 0 to 1, where 0 represents the beginning of processing, and 1 represents the end
# By adding "probability" to TTR, the occurrence of failure will be probabilistic, determined by chance.
Stuck = Failure("Flr0", "Failure0", victim=Gluing, entity=True,
                distribution={"TTF": {"Fixed": {"mean": 0}}, "TTR": {"Normal": {"mean": 2,"stdev": 0.2, "min":0, "probability": 0.05}}})


# Routing
S.defineRouting([Soldering])
Soldering.defineRouting([S], [Q])
Q.defineRouting([Soldering], [Gluing])
Gluing.defineRouting([Q], [E1])
E1.defineRouting([Gluing])


def main(test=0):
    maxSimTime = 100
    objectList = [S, Soldering, Q, Gluing, E1, Stuck, Voltage, Current, Resistance, Pressure, Insertion_depth, Flow_rate, Temperature, Mass]

    runSimulation(objectList, maxSimTime)

    # show dependency
    print("Voltage:  {:.2f} V\nCurrent:  {:.2f} A\nResistance:  {:.2f} Ohm\nV calculated (I*R):  {:.2f} V\n".format(
        Soldering.entities[0].features[0],
        Soldering.entities[0].features[1]/1000,
        Soldering.entities[0].features[2]/1000,
        (Soldering.entities[0].features[1]/1000)*(Soldering.entities[0].features[2]/1000)))

    # stats
    print("""
            Discards:           {}
            Produced:           {}
            blocked for:        {:.2f}
            """.format(len(Gluing.discards), E1.numOfExits, Gluing.totalBlockageTime))

    # for unittest
    if test:
        result = {}
        result["Spannung"] = Voltage.featureHistory
        result["Strom"] = Current.featureHistory
        result["Widerstand"] = Resistance.featureHistory
        return result

if __name__ == "__main__":
    main()

ExampleTS.py

This file demonstrates the basic usage of the TimeSeries class.

from manpy.simulation.imports import Machine, Source, Exit, Failure, Feature, Queue, Timeseries
from manpy.simulation.core.Globals import runSimulation
import matplotlib.pyplot as plt


# Objects
S = Source("S1", "Source", interArrivalTime={"Fixed": {"mean": 0.4}}, entity="manpy.Part")
Soldering = Machine("M0", "Soldering", processingTime={"Normal": {"mean": 0.8, "stdev": 0.075, "min": 0.425, "max": 1.175}})
Q = Queue("Q", "Queue")
Gluing = Machine("M1", "Gluing", processingTime={"Fixed": {"mean": 0.8, "stdev": 0.075, "min": 0.425, "max": 1.175}})
E1 = Exit("E1", "Exit1")

# ObjectProperty

# In TimeSeries distribution, one or multiple functions with different intervals are used,
# to generate a certain amount of data points across the entire range from the lowest to the highest point in any interval
# The step_time can be manually set or dynamically calculated based on the entity's processing time
# Functions in TimeSeries distribution can utilize assigned variables, similar to how dependent variables work for Features

# Soldering
Voltage = Timeseries("TS0", "Voltage", victim=Soldering, no_negative=True, step_time=0.03,
                     distribution={"Function" : {(-1, 0) : "-1.6*x**2+1.6", (0, 1) : "-1.6*x**2+2"}, "DataPoints" : 20, "Feature": {"Normal": {"stdev": 0.02}}})
Current = Timeseries("TS1", "Current", victim=Soldering, step_time=0.03,
                     distribution={"Function" : {(-1, 1) : "1000*x1 + 1900"}, "x1" : Voltage, "DataPoints" : 20, "Feature": {"Normal": {"stdev": 20}}})
Resistance = Timeseries("TS2", "Resistance", victim=Soldering, step_time=0.03,
                        distribution={"Function" : {(-1, 1) : "(x1/x2)*1000000"}, "x1" : Voltage, "x2" : Current, "DataPoints" : 20})
Pressure = Feature("Ftr0", "Pressure", victim=Soldering,
                   distribution={"Feature": {"Normal": {"mean": 180, "stdev": 30}}})
Insertion_depth = Feature("Ftr1", "Insertion_depth", victim=Soldering,
                          distribution={"Feature": {"Normal": {"mean": 400, "stdev": 50}}})

# Gluing
Flow_rate = Feature("Ftr2", "Flow_rate", victim=Gluing,
                    distribution={"Feature": {"Normal": {"mean": 50, "stdev": 5}}})
Temperature = Feature("Ftr3", "Temperature", victim=Gluing,
                      distribution={"Feature": {"Normal": {"mean": 190, "stdev": 10}}})
Mass = Feature("Ftr4", "Mass", victim=Gluing,
               distribution={"Feature": {"Normal": {"mean": 400, "stdev": 50}}})

# ObjectInterruption
Stuck = Failure("Flr0", "Failure0", victim=Gluing, entity=True,
                distribution={"TTF": {"Fixed": {"mean": 0}}, "TTR": {"Normal": {"mean": 2,"stdev": 0.2, "min":0, "probability": 0.05}}})


# Routing
S.defineRouting([Soldering])
Soldering.defineRouting([S], [Q])
Q.defineRouting([Soldering], [Gluing])
Gluing.defineRouting([Q], [E1])
E1.defineRouting([Gluing])


def main(test=0):
    maxSimTime = 50
    objectList = [S, Soldering, Q, Gluing, E1, Voltage, Current, Resistance, Pressure, Insertion_depth, Flow_rate, Temperature, Mass]
    runSimulation(objectList, maxSimTime)

    # show dependency of TimeSeries
    plt.plot(E1.entities[0].timeseries_times[0], E1.entities[0].timeseries[0])
    plt.show()
    plt.plot(E1.entities[0].timeseries_times[1], E1.entities[0].timeseries[1], c="orange")
    plt.show()
    plt.plot(E1.entities[0].timeseries_times[2], E1.entities[0].timeseries[2], c="g")
    plt.show()

    #for unittest
    if test:
        return E1.entities[0]

if __name__ == "__main__":
    main()

Conditional_Failure.py

This file demonstrates failures that are triggered by conditions.

from manpy.simulation.imports import Machine, Source, Exit, Failure, Queue, Feature, SimpleStateController
from manpy.simulation.core.Globals import runSimulation, G

# Any function can be employed as the condition for a Failure to occur
# You can utilize any simulation values for the condition
# Return True to let the Failure occur


def condition(self):
    value_1 = Ftr1.get_feature_value()
    value_2 = Ftr2.get_feature_value()

    if (value_1 + 20 * value_2) > 200:
        return True
    else:
        return False


# Objects
S = Source("S1", "Source", interArrivalTime={"Fixed": {"mean": 0.1}}, entity="manpy.Part", capacity=1)
Q = Queue("Q1", "Queue")
M1 = Machine("M1", "Machine1", processingTime={"Normal": {"mean": 0.2, "stdev": 0.1, "min": 0.08, "max": 0.34}})
M2 = Machine("M2", "Machine2", processingTime={"Normal": {"mean": 0.3, "stdev": 0.1, "min": 0.1, "max": 0.4}})

E1 = Exit("E1", "Exit1")

# ObjectInterruption
# Assign the condition as the "conditional" parameter for any machine
F1 = Failure("CondFlr", "CondFailure", victim=M1, conditional=condition, waitOnTie=True,
             distribution={"TTF": {"Fixed": {"mean": 0}}, "TTR": {"Fixed": {"mean": 10}}})

# ObjectProperty
# Link failures to "contribute" for a Feature when utilizing its values
Ftr1 = Feature("Ftr1", "Feature1", victim=M1, contribute=[F1], no_negative=True,
               distribution={"Feature": {"Normal": {"mean": 5, "stdev": 1, "min": 1, "max": 10}}}
               )

dists = [{"Feature": {"Normal": {"mean": 0.5, "stdev": 0.2, "min": 0.1, "max": 0.9}}},
         {"Feature": {"Normal": {"mean": 500, "stdev": 0.2, "min": 400, "max": 600}}}]
boundaries = {(0, 10): 0, (10, None): 1}
distribution_controller = SimpleStateController(states=dists, boundaries=boundaries, wear_per_step=1.0, reset_amount=None)

Ftr2 = Feature("Ftr2", "Feature2", victim=M1,
               contribute=[F1],
               distribution_state_controller=distribution_controller, reset_distributions=True
               )

# Routing
S.defineRouting([M1])
M1.defineRouting([S], [Q])
Q.defineRouting([M1], [M2])
M2.defineRouting([Q], [E1])
E1.defineRouting([M2])


def main(test=0):
    maxSimTime = 1000
    runSimulation([S, Q, M1, M2, E1,
                   F1,
                   Ftr1, Ftr2], maxSimTime)


if __name__ == "__main__":
    main()

Interpolation.py

This file demonstrates the creation of TimeSeries objects using interpolation.

from manpy.simulation.imports import Machine, Source, Exit, Feature, Timeseries
from manpy.simulation.core.Globals import runSimulation
import matplotlib.pyplot as plt


# Objects
S = Source("S1", "Source", interArrivalTime={"Fixed": {"mean": 2}}, entity="manpy.Part")
M1 = Machine("M1", "Machine1", processingTime={"Normal": {"mean": 5, "stdev": 0.5}})
E1 = Exit("E1", "Exit1")

# ObjectProperty
Ftr1 = Feature("Ftr1", "Feature1", victim=M1, distribution={"Feature": {"Normal": {"mean": 2.71828, "stdev": 0.3}}})
Ftr2 = Feature("Ftr2", "Feature2", victim=M1, distribution={"Feature": {"Normal": {"mean": 2.71828**2, "stdev": 0.3}}})
# Interpolation uses at least 4 points to interpolate a unique function for every entity
# It is possible to use feature values or even functions as points
TS = Timeseries("TS", "TimeSeries", victim=M1, no_negative=True,
                      distribution={"Function": {(0, 1): "-5*x**2+10*x",
                                                 (1, 4): [[1, 2, 3, 4], [5, "Ftr1", "Ftr2", "Ftr1**3"]]},
                                    "Ftr1": Ftr1, "Ftr2": Ftr2,
                                    "DataPoints": 100})

# Routing
S.defineRouting([M1])
M1.defineRouting([S], [E1])
E1.defineRouting([M1])


def main():
    maxSimTime = 20

    runSimulation([S, M1, E1, Ftr1, Ftr2, TS], maxSimTime)

    for i in M1.entities:
        plt.plot([2], [i.features[0]], "o", c="blue", label="Ftr1")
        plt.plot([3], [i.features[1]], "o", c="green", label="Ftr2")
        plt.plot(i.timeseries_times[0], i.timeseries[0], c="red", label="TS")
        plt.legend()
        plt.show()

if __name__ == "__main__":
    main()

Data_Extraction.py

This file demonstrates the various ways of exporting simulated data.

from manpy.simulation.imports import Machine, Source, Exit, Failure, Feature, Queue, Timeseries
from manpy.simulation.core.Globals import runSimulation, getTimeSeriesData, getFeatureData


# Objects
S = Source("S1", "Source", interArrivalTime={"Fixed": {"mean": 0.4}}, entity="manpy.Part")
# It is possible to add a cost to any step in the production line (CoreObject)
Soldering = Machine("M0", "Soldering", processingTime={"Normal": {"mean": 0.8, "stdev": 0.075, "min": 0.425, "max": 1.175}}, cost=25)
Q = Queue("Q", "Queue")
Gluing = Machine("M1", "Gluing", processingTime={"Fixed": {"mean": 0.8, "stdev": 0.075, "min": 0.425, "max": 1.175}})
E1 = Exit("E1", "Exit1", cost=5)

# ObjectProperty
# Soldering
Voltage = Timeseries("TS0", "Voltage", victim=Soldering, no_negative=True, step_time=0.03,
                     distribution={"Function" : {(-1, 0) : "-1.6*x**2+1.6", (0, 1) : "-1.6*x**2+2"}, "DataPoints" : 20, "Feature": {"Normal": {"stdev": 0.02}}})
Current = Timeseries("TS1", "Current", victim=Soldering, step_time=0.03,
                     distribution={"Function" : {(-1, 1) : "1000*x1 + 1900"}, "x1" : Voltage, "DataPoints" : 20, "Feature": {"Normal": {"stdev": 20}}})
Resistance = Timeseries("TS2", "Resistance", victim=Soldering, step_time=0.03,
                        distribution={"Function" : {(-1, 1) : "(x1/x2)*1000000"}, "x1" : Voltage, "x2" : Current, "DataPoints" : 20})
Pressure = Feature("Ftr0", "Pressure", victim=Soldering,
                   distribution={"Feature": {"Normal": {"mean": 180, "stdev": 30}}})
Insertion_depth = Feature("Ftr1", "Insertion_depth", victim=Soldering,
                          distribution={"Feature": {"Normal": {"mean": 400, "stdev": 50}}})

# Gluing
Flow_rate = Feature("Ftr2", "Flow_rate", victim=Gluing,
                    distribution={"Feature": {"Normal": {"mean": 50, "stdev": 5}}})
Temperature = Feature("Ftr3", "Temperature", victim=Gluing,
                      distribution={"Feature": {"Normal": {"mean": 190, "stdev": 10}}})
Mass = Feature("Ftr4", "Mass", victim=Gluing,
               distribution={"Feature": {"Normal": {"mean": 400, "stdev": 50}}})

# ObjectInterruption
# It is possible to add cost to any Failure
# Failures can remove the current entity from a machine and mark it as Fail by setting 'remove' to True
Stuck = Failure("Flr0", "Failure0", victim=Gluing, entity=True, cost=10, remove=True,
                distribution={"TTF": {"Fixed": {"mean": 0}}, "TTR": {"Normal": {"mean": 1,"stdev": 0.1, "min":0, "probability": 0.5}}})

# Routing
S.defineRouting([Soldering])
Soldering.defineRouting([S], [Q])
Q.defineRouting([Soldering], [Gluing])
Gluing.defineRouting([Q], [E1])
E1.defineRouting([Gluing])


def main(test=0):
    maxSimTime = 7
    objectList = [S, Soldering, Q, Gluing, E1, Voltage, Current, Resistance, Pressure, Insertion_depth, Flow_rate, Temperature, Mass, Stuck]


    # To utilize a database, you have two options:
    # 1. Import a pre-existing `DataBase` class from `DataBase.py`
    # 2. Easily set up your own database using the `ManPyDatabase` interface
    # from manpy.simulation.core.Database import ManPyQuestDBDatabase
    # db = ManPyQuestDBDatabase()
    # runSimulation(objectList, maxSimTime, db=db)
    runSimulation(objectList, maxSimTime)


    # To retrieve feature data from the simulation, utilize the getFeatureData function
    # The function accepts a list of machines and produces a DataFrame with all of their occurring features
    solder = getFeatureData([Soldering])
    print(solder.to_string(index=False), "\n")

    # With 'time=True', timestamps of the feature values are included in the DataFrame
    # With 'price=True', the price of the entities are included in the DataFrame
    solder_time = getFeatureData([Soldering], time=True, price=True)
    print(solder_time.to_string(index=False), "\n")

    # The function supports multiple machines
    both = getFeatureData([Soldering, Gluing])
    print(both.to_string(index=False), "\n")

    # To retrieve timeseries data from the simulation, utilize the getTimeSeriesData function
    # The function accepts a timeSeries and returns a DataFrame representing that timeseries
    vol = getTimeSeriesData(Voltage)
    print(vol)


if __name__ == "__main__":
    main()