# ===========================================================================
# Copyright 2013 University of Limerick
#
# This file is part of DREAM.
#
# DREAM is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DREAM is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with DREAM. If not, see <http://www.gnu.org/licenses/>.
# ===========================================================================
"""
Created on 8 Nov 2012
@author: George
"""
"""
carries some global variables
"""
import warnings
import pandas as pd
import simpy
import xlwt
from manpy.simulation.core.Database import ManPyDatabase
from manpy.simulation.core.utils import info
from tqdm import tqdm
[docs]
class G:
    """Defines global properties for the whole simulation.

    Everything is stored in class attributes, so the state is shared by all
    code that accesses ``G.<name>``; the class is not meant to be instantiated.
    """

    ObjList = []  # a list that holds all the CoreObjects
    EntityList = []  # a list that holds all the Entities
    ObjectResourceList = []
    ObjectInterruptionList = []
    ObjectPropertyList = []
    RouterList = []
    simulation_snapshots = [pd.DataFrame()]

    numberOfReplications = 1  # the number of replications, default=1
    confidenceLevel = 0.9  # the confidence level, default=90%
    Base = 1  # the Base time unit. Default = 1 minute
    maxSimTime = 0  # the total simulation time

    # flag for printing in console
    console = ""

    # data for the trace output in excel
    trace = ""  # this is written from input. If it is "Yes" then you write to trace, else we do not
    snapshots = False
    traceIndex = 0  # index that shows in what row we are
    sheetIndex = 1  # index that shows in what sheet we are (shared by trace and output workbooks)
    traceFile = xlwt.Workbook()  # create excel file
    traceSheet = traceFile.add_sheet(
        "sheet " + str(sheetIndex), cell_overwrite_ok=True
    )  # create excel sheet
    trace_list = []

    # variables for excel output
    outputIndex = 0  # index that shows in what row we are
    outputFile = xlwt.Workbook()  # create excel file
    outputSheet = outputFile.add_sheet(
        "sheet " + str(sheetIndex), cell_overwrite_ok=True
    )  # create excel sheet

    # variables for json output
    outputJSON = {}
    outputJSONFile = None

    numberOfEntities = 0

    # define the lists of each object type
    SourceList = []
    MachineList = []
    ExitList = []
    QueueList = []
    RepairmanList = []
    AssemblyList = []
    DismantleList = []
    ConveyerList = []
    MachineJobShopList = []
    QueueJobShopList = []
    ExitJobShopList = []
    BatchDecompositionList = []
    BatchSourceList = []
    BatchReassemblyList = []
    LineClearanceList = []
    EventGeneratorList = []
    OperatorsList = []
    OperatorManagedJobsList = []
    OperatorPoolsList = []
    BrokersList = []
    OperatedMachineList = []
    BatchScrapMachineList = []
    OrderDecompositionList = []
    ConditionalBufferList = []
    MouldAssemblyBufferList = []
    MouldAssemblyList = []
    MachineManagedJobList = []
    QueueManagedJobList = []
    ModelResourceList = []
    FeatureList = []
    TimeSeriesList = []

    JobList = []
    WipList = []
    PartList = []
    OrderComponentList = []
    OrderList = []
    MouldList = []
    BatchList = []
    SubBatchList = []

    # entities that just finished processing in a station
    # and have to enter the next machine
    pendingEntities = []
    env = simpy.Environment()

    totalPulpTime = 0  # temporary to track how much time PuLP needs to run

    @staticmethod
    def get_simulation_results_dataframe() -> pd.DataFrame:
        """
        Collects the logs from the traces in the simulation into a pandas dataframe.

        The rows are taken from ``G.trace_list`` and labelled with the columns:
            - simulation_time
            - entity_name
            - entity_id
            - station_id
            - station_name
            - message

        :returns: Dataframe containing the described columns
        """
        return pd.DataFrame(
            G.trace_list,
            columns=[
                "simulation_time",
                "entity_name",
                "entity_id",
                "station_id",
                "station_name",
                "message",
            ],
        )

    @staticmethod
    def get_simulation_entities_history() -> pd.DataFrame:
        """
        Iterates through all entities in ``G.EntityList`` and collects their
        history (``entity.schedule``), i.e. all the objects they passed through
        and when they entered/left them.

        Each schedule becomes a block of rows indexed by the entity id; a
        ``station_id`` column is derived from the ``station`` column.

        :returns: Dataframe with the concatenated schedules of all entities; an
            empty dataframe when there are no entities.
        """
        dfs = []
        for entity in G.EntityList:
            history = entity.schedule
            en = [entity.id] * len(history)
            dfs.append(pd.DataFrame(history, index=en))

        # pd.concat raises ValueError on an empty sequence, so bail out early
        if not dfs:
            return pd.DataFrame()

        entity_hist = pd.concat(dfs, sort=False)
        # when every schedule is empty there is no "station" column to derive from
        if "station" in entity_hist.columns:
            entity_hist["station_id"] = entity_hist["station"].apply(lambda x: x.id)
        return entity_hist
[docs]
def moveExcess(consumption=1, safetyStock=70, giverId=None, receiverId=None):
    """Move entities from a giver to a receiver once the giver exceeds a safety stock.

    :param consumption: number of ``getEntity`` calls issued on the receiver (cast to int)
    :param safetyStock: queue length of the giver that must be exceeded (cast to int)
    :param giverId: id of the object that hands over entities
    :param receiverId: id of the object that takes the entities
    """
    source = findObjectById(giverId)
    target = findObjectById(receiverId)
    stock_limit = int(safetyStock)
    transfers = int(consumption)

    # both ends of the transfer have to exist
    if not (source and target):
        print("Giver and/or Receiver not defined")
        return

    # nothing to do while the giver holds no more than the safety stock
    if len(source.getActiveObjectQueue()) <= stock_limit:
        return

    # link the two objects temporarily, pull the entities, then clear the routing
    source.receiver = target
    target.giver = source
    for _ in range(transfers):
        target.getEntity()
    source.next = []
    target.previous = []
[docs]
def getClassFromName(dotted_name):
    """Import a class from a dotted name used in json.

    Names starting with ``manpy`` are rewritten for backwards compatibility:
    the last segment is taken as the class name and the name becomes
    ``manpy.simulation.core.<Class>.<Class>`` when the string ``core`` occurs
    anywhere in the given name, otherwise ``manpy.simulation.<Class>.<Class>``.
    Any other name is resolved unchanged.

    :param dotted_name: dotted path of the class
    :returns: whatever ``zope.dottedname.resolve.resolve`` returns for the
        (possibly rewritten) name
    """
    from zope.dottedname.resolve import resolve

    # this is added for backwards compatibility
    if dotted_name.startswith("manpy"):
        class_name = dotted_name.rsplit(".", 1)[-1]
        package = "manpy.simulation.core" if "core" in dotted_name else "manpy.simulation"
        dotted_name = "%s.%s.%s" % (package, class_name, class_name)
    return resolve(dotted_name)
[docs]
def getMethodFromName(dotted_name):
    """Return a method by its name.

    The name should be given as ``manpy.ClassName.MethodName``. When one of the
    segments is ``Globals`` the segment following it is looked up among the
    module-level names of this file; otherwise everything before the last
    segment is resolved with ``getClassFromName`` and the last segment is
    fetched from that class.

    :param dotted_name: dotted path of the method
    :returns: the object found under that name
    :raises Exception: if the lookup yields nothing (or a falsy object)
    :raises AttributeError: if the resolved class has no such attribute
    """
    segments = dotted_name.split(".")

    if "Globals" in segments:
        # the method lives in this script: search module globals only, so that
        # local variables of this function can never be returned by accident
        methodName = segments[segments.index("Globals") + 1]
        method = globals().get(methodName)
    else:
        # the method lives in a class: split "<class path>.<method>"
        methodName = segments[-1]
        cls = getClassFromName(".".join(segments[:-1]))
        method = getattr(cls, methodName)

    if not method:
        raise Exception("Method %s not implemented" % methodName)
    return method
[docs]
def findObjectById(id):
    """Look up a simulation object by its ``id`` attribute.

    The registries on ``G`` are searched in this order: core objects,
    resources, entities, interruptions, properties, orders.

    :param id: the identifier to look for
    :returns: the first object whose ``id`` matches, or None if there is none
    """
    registries = (
        G.ObjList,
        G.ObjectResourceList,
        G.EntityList,
        G.ObjectInterruptionList,
        G.ObjectPropertyList,
        G.OrderList,
    )
    for registry in registries:
        for candidate in registry:
            if candidate.id == id:
                return candidate
    return None
[docs]
class SetWipTypeError(Exception):
    """Signals a problem while setting up the work in progress (WIP)."""

    def __init__(self, setWipError):
        # forward the description to Exception so it ends up in ``args``/``str()``
        super().__init__(setWipError)
[docs]
def setWIP(entityList):
    """
    Set up the given entities as Work In Progress in their current stations.

    * Entities of type Part/Batch/SubBatch/CapacityEntity/Vehicle must already
      carry a ``currentStation``; they are appended to that station's queue.
    * Entities of type Job/OrderComponent/Order/OrderDesign/Mould are placed in
      the single station named by the first step of their ``remainingRoute``.
      If that step does not name exactly one station, the error is printed and
      the entity is skipped (it cannot be in more than one station).

    Afterwards, for entities that have a ``currentStation``, the entity is
    registered in ``G.pendingEntities`` when the station is not in
    ``G.MachineList``, and the ``canDispose`` / ``initialWIP`` signals of the
    station are triggered where the station expects them.

    :param entityList: the entities to place
    """
    # local import as in the rest of this module; done once instead of per entity
    from .Queue import Queue

    # station classes that receive the initialWIP signal at simulation start
    # TODO, maybe use 'class_family attribute here'
    server_classes = [
        "ProductionPoint",
        "ConveyorMachine",
        "ConveyorPoint",
        "ConditionalPoint",
        "Machine",
        "BatchScrapMachine",
        "MachineJobShop",
        "BatchDecomposition",
        "BatchReassembly",
        "M3",
        "MouldAssembly",
        "BatchReassemblyBlocking",
        "BatchDecompositionBlocking",
        "BatchScrapMachineAfterDecompose",
        "BatchDecompositionStartTime",
    ]

    for entity in entityList:
        if entity.type in ["Part", "Batch", "SubBatch", "CapacityEntity", "Vehicle"]:
            # these entities have to have a currentStation.
            # TODO apply a more generic approach so that all need to have
            if entity.currentStation:
                station = entity.currentStation
                station.getActiveObjectQueue().append(entity)
                # record the entrance so that it can be read in the result
                entity.schedule.append(
                    {"station": station, "entranceTime": G.env.now}
                )

        # XXX Orders do no more run in the system, instead we have OrderDesigns
        elif entity.type in ["Job", "OrderComponent", "Order", "OrderDesign", "Mould"]:
            # find the list of starting stations of the entity
            # XXX if the entity is in wip then the current station is already
            # defined and the remainingRoute has to be redefined
            currentObjectIds = entity.remainingRoute[0].get("stationIdsList", [])

            # the starting station must be unique; otherwise report the WIP
            # definition error and skip this entity instead of continuing with
            # an undefined (or stale) station id
            if len(currentObjectIds) != 1:
                print(
                    "WIP definition error: {0}".format(
                        "The starting station of the the entity is not defined uniquely"
                    )
                )
                continue
            objectId = currentObjectIds[0]

            # get the starting station of the entity and load it with it
            station = findObjectById(objectId)
            station.getActiveObjectQueue().append(entity)

            # an entity appended to a MouldAssemblyBuffer is ready for assembly
            if station.__class__.__name__ == "MouldAssemblyBuffer":
                entity.readyForAssembly = 1

            # read the IDs of the possible successors and resolve them
            nextObjectIds = entity.remainingRoute[1].get("stationIdsList", [])
            nextObjects = [findObjectById(nextObjectId) for nextObjectId in nextObjectIds]

            # update the next list of the station, avoiding duplicates
            for nextObject in nextObjects:
                if nextObject not in station.next:
                    station.next.append(nextObject)

            # the first step is now the current one; remove it from the remaining route
            entity.currentStep = entity.remainingRoute.pop(0)
            entity.schedule.append(
                {"station": station, "entranceTime": G.env.now}
            )
            # if the current step has a task_id then add it to the schedule entry
            if entity.currentStep:
                if entity.currentStep.get("task_id", None):
                    entity.schedule[-1]["task_id"] = entity.currentStep["task_id"]

        # if the currentStation of the entity is of type Machine then the entity
        # must be processed first and then added to the pendingEntities list
        # Its hot flag is not raised
        # the following to be performed only if there is a current station.
        # Orders, Projects e.t.c do not have
        # TODO, maybe we should loop in wiplist here
        if (not (entity.currentStation in G.MachineList)) and entity.currentStation:
            G.pendingEntities.append(entity)

        # if the station is a buffer then send the canDispose signal
        if entity.currentStation:
            if issubclass(entity.currentStation.__class__, Queue):
                # send the signal only if it is not already triggered
                if not entity.currentStation.canDispose.triggered:
                    if entity.currentStation.expectedSignals["canDispose"]:
                        succeedTuple = (G.env, G.env.now)
                        entity.currentStation.canDispose.succeed(succeedTuple)
                        entity.currentStation.expectedSignals["canDispose"] = 0

        # at the start of the simulation a station of server type gets the initialWIP signal
        if G.env.now == 0 and entity.currentStation:
            if entity.currentStation.class_name:
                stationClass = entity.currentStation.__class__.__name__
                if stationClass in server_classes:
                    entity.currentStation.currentEntity = entity
                    # trigger initialWIP event only if it has not been triggered. Otherwise
                    # if we set more than one entities (e.g. in reassembly) it will crash
                    if not (entity.currentStation.initialWIP.triggered):
                        if entity.currentStation.expectedSignals["initialWIP"]:
                            succeedTuple = (G.env, G.env.now)
                            entity.currentStation.initialWIP.succeed(succeedTuple)
                            entity.currentStation.expectedSignals["initialWIP"] = 0
def countIntervalThroughput(**kw):
    """Append the throughput of the current interval to every Exit in ``G.ObjList``.

    For each Exit the units exited since the previous call (total exited minus
    the sum of its ``intervalThroughPutList``) are added to a running total,
    and that running total is appended to the Exit's list. The total is not
    reset between Exits. Keyword arguments are accepted but ignored.
    """
    from .Exit import Exit

    running_total = 0
    for exit_station in (item for item in G.ObjList if isinstance(item, Exit)):
        exited_so_far = exit_station.totalNumberOfUnitsExited
        already_recorded = sum(exit_station.intervalThroughPutList)
        running_total += exited_so_far - already_recorded
        exit_station.intervalThroughPutList.append(running_total)
# #===========================================================================
# # printTrace
# #===========================================================================
# def printTrace(entity='',station='', **kw):
# assert len(kw)==1, 'only one phrase per printTrace supported for the moment'
# from Globals import G
# time=G.env.now
# charLimit=60
# remainingChar=charLimit-len(entity)-len(str(time))
# if(G.console=='Yes'):
# print time,entity,
# for key in kw:
# if key not in getSupportedPrintKwrds():
# raise ValueError("Unsupported phrase %s for %s" % (key, station.id))
# element=getPhrase()[key]
# phrase=element['phrase']
# prefix=element.get('prefix',None)
# suffix=element.get('suffix',None)
# arg=kw[key]
# if prefix:
# print prefix*remainingChar,phrase,arg
# elif suffix:
# remainingChar-=len(phrase)+len(arg)
# suffix*=remainingChar
# if key=='enter':
# suffix=suffix+'>'
# print phrase,arg,suffix
# else:
# print phrase,arg
[docs]
def getSupportedPrintKwrds():
    """Return the tuple of keywords that are supported for trace printing."""
    keywords = """
        create
        signal signalReceiver signalGiver
        attemptSignal attemptSignalGiver attemptSignalReceiver
        preempt preempted
        startWork finishWork processEnd
        interrupted
        enter destroy
        waitEvent received
        isRequested canDispose
        interruptionEnd loadOperatorAvailable resourceAvailable entityRemoved
        conveyerEnd conveyerFull
        moveEnd
    """
    # whitespace-separated list -> tuple, order preserved
    return tuple(keywords.split())
[docs]
def getPhrase():
    """Return the mapping from print keyword to the phrase that is printed.

    Every value is a dict with a ``"phrase"`` entry and, for some keywords, a
    ``"prefix"`` or ``"suffix"`` entry.
    """

    def entry(phrase, **decoration):
        # "phrase" first, then the optional prefix/suffix
        return {"phrase": phrase, **decoration}

    return {
        "create": entry("created an entity"),
        "destroy": entry("destroyed at", suffix=" * "),
        "signal": entry("signalling"),
        "signalGiver": entry("signalling giver", prefix="_"),
        "signalReceiver": entry("signalling receiver", prefix="_"),
        "attemptSignal": entry("will try to signal"),
        "attemptSignalGiver": entry("will try to signal a giver"),
        "attemptSignalReceiver": entry("will try to signal a receiver"),
        "preempt": entry("preempts", suffix=" ."),
        "preempted": entry("is being preempted", suffix=". "),
        "startWork": entry("started working in"),
        "finishWork": entry("finished working in"),
        "processEnd": entry("ended processing in"),
        "interrupted": entry("interrupted at", suffix=" ."),
        "enter": entry("got into", suffix="="),
        "waitEvent": entry("will wait for event"),
        "received": entry("received event"),
        "isRequested": entry("received an isRequested event from"),
        "canDispose": entry("received an canDispose event"),
        "interruptionEnd": entry("received an interruptionEnd event at"),
        "loadOperatorAvailable": entry("received a loadOperatorAvailable event at"),
        "resourceAvailable": entry("received a resourceAvailable event"),
        "entityRemoved": entry("received an entityRemoved event from"),
        "moveEnd": entry("received a moveEnd event"),
        "conveyerEnd": entry("has reached conveyer End", suffix=".!"),
        "conveyerFull": entry("is now Full, No of units:", suffix="(*)"),
    }
def run(env, objectList):
    """Execute ``G.numberOfReplications`` replications of the simulation.

    Per replication: pick the environment, collect the entities, initialize
    and activate all registered objects, set the WIP, run until
    ``G.maxSimTime`` and finally call ``postProcessing`` on the core objects
    and resources.

    :param env: environment to use; a new ``simpy.Environment`` is created when falsy
    :param objectList: the objects of the model; the Entity instances among
        them become ``G.EntityList``
    """
    from manpy.simulation.core.Entity import Entity

    for _ in range(G.numberOfReplications):
        # this is where all the simulation objects 'live'
        G.env = env if env else simpy.Environment()

        G.EntityList = [item for item in objectList if isinstance(item, Entity)]

        # initialize all the objects
        to_initialize = (
            G.ObjList
            + G.ObjectInterruptionList
            + G.ObjectResourceList
            + G.EntityList
            + G.ObjectPropertyList
        )
        for item in to_initialize:
            item.initialize()

        # activate the objects: interruptions first, then properties, then core objects
        for group in (G.ObjectInterruptionList, G.ObjectPropertyList, G.ObjList):
            for item in group:
                G.env.process(item.run())

        # set the WIP
        setWIP(G.EntityList)

        info("Config finished. Starting simulation...")
        G.env.run(until=G.maxSimTime)  # run the simulation

        # collect from the exits the time at which their last entity left
        from manpy.simulation.core.Exit import Exit

        endList = [
            item.timeLastEntityLeft for item in G.ObjList if isinstance(item, Exit)
        ]

        # if the clock ran to infinity, use the time of the last exit instead
        if G.env.now == float("inf"):
            G.maxSimTime = float(max(endList))

        # do not let G.maxSimTime=0 so that there will be no crash
        if G.maxSimTime == 0:
            print("simulation ran for 0 time, something may have gone wrong")
            import sys

            sys.exit()

        # carry on the post processing operations for every object in the topology
        for item in G.ObjList + G.ObjectResourceList:
            item.postProcessing()
[docs]
def runSimulation(
    objectList=None,
    maxSimTime=100,
    numberOfReplications=1,
    trace=False,
    snapshots=False,
    seed=1,
    env=None,
    data="No",
    db: ManPyDatabase = None
):
    """
    Starts the simulation.

    Stores the settings on ``G``, sorts the given objects into the registries
    of ``G`` by type, builds the feature/timeseries lookup tables and then
    calls ``run``.

    :param objectList: Objects for the simulation (defaults to an empty list)
    :param maxSimTime: Timespan that's simulated; stored as float in ``G.maxSimTime``
    :param numberOfReplications: Number of replications, stored in ``G.numberOfReplications``
    :param trace: Stored in ``G.trace``
    :param snapshots: Stored in ``G.snapshots``
    :param seed: Stored in ``G.seed``
    :param env: Environment handed to ``run``; ``run`` creates one when this is falsy
    :param data: Stored in ``G.data``
    :param db: Database object. Optional. If passed, a connection is established
        before the run, the results are committed after a successful run and
        the connection is closed in any case
    """
    # avoid sharing one default list between calls (it is stored on G below)
    if objectList is None:
        objectList = []

    G.numberOfReplications = numberOfReplications
    G.trace = trace
    G.snapshots = snapshots
    G.maxSimTime = float(maxSimTime)
    G.seed = seed
    G.data = data

    G.ObjList = []
    G.ObjectInterruptionList = []
    G.ObjectResourceList = []
    G.trace_list = []

    G.ftr_st = []  # list of (feature, corresponding station)
    G.feature_indices = {}
    G.ts_st = []  # list of (timeseries, corresponding station)
    G.timeseries_indices = {}

    G.db = db
    G.objectList = objectList

    from manpy.simulation.core.CoreObject import CoreObject
    from .ObjectInterruption import ObjectInterruption
    from .ObjectProperty import ObjectProperty
    from .ObjectResource import ObjectResource

    # sort the objects into the registries; the first matching type wins
    for item in objectList:
        if isinstance(item, CoreObject):
            G.ObjList.append(item)
        elif isinstance(item, ObjectInterruption):
            G.ObjectInterruptionList.append(item)
        elif isinstance(item, ObjectProperty):
            G.ObjectPropertyList.append(item)
        elif isinstance(item, ObjectResource):
            G.ObjectResourceList.append(item)

    # set ftr_st: (feature id, id of its victim or None) plus position lookup
    for f in G.FeatureList:
        victim_id = None if f.victim is None else f.victim.id
        G.ftr_st.append((f.id, victim_id))
        G.feature_indices[f.id] = len(G.ftr_st) - 1

    # set ts_st: same structure for the timeseries
    for ts in G.TimeSeriesList:
        victim_id = None if ts.victim is None else ts.victim.id
        G.ts_st.append((ts.id, victim_id))
        G.timeseries_indices[ts.id] = len(G.ts_st) - 1

    database = G.db
    if database:
        # connect to the database; make sure the connection is released even
        # if the run fails, and commit only after a successful run
        database.establish_connection()
        try:
            run(env, objectList)
            database.commit()
        finally:
            database.close_connection()
    else:
        run(env, objectList)
[docs]
def ExcelPrinter(df, filename):
    """
    Prints a dataframe to excel.

    A dataframe with at most 65535 rows is written to ``<filename>.xls``.
    Larger dataframes are split into consecutive chunks of 65535 rows that are
    written to ``<filename>(0).xls``, ``<filename>(1).xls``, ... No empty
    trailing file is produced when the row count is an exact multiple of the
    chunk size.

    :param df: The dataframe to export
    :param filename: Filename for export (without extension)
    """
    max_rows = 65535  # rows per file
    total_rows = df.shape[0]

    if total_rows <= max_rows:
        df.to_excel("{}.xls".format(filename))
        return

    for part, start in enumerate(range(0, total_rows, max_rows)):
        # iloc slices by position regardless of the index labels
        chunk = df.iloc[start:start + max_rows]
        chunk.to_excel("{}({}).xls".format(filename, part))
[docs]
def getFeatureData(objectList=None, time=False, price=False) -> pd.DataFrame:
    """
    getFeatureData returns feature data of specific machines as dataframes.

    One row is produced per distinct entity found in ``entities`` and
    ``discards`` of the given machines. The columns are ``ID``, the features
    whose station (see ``G.ftr_st``) is one of the given machines, ``Result``
    and optionally ``Price``. The ``Result`` column is dropped when it contains
    neither "Success" nor "Fail".

    :param objectList: a list of machines that will be included in the dataframe
    :param time: boolean, should timestamps be included or not; if True every
        feature yields a ``<station>_<feature>_v`` and a ``<station>_<feature>_t``
        column instead of a single ``<station>_<feature>`` column
    :param price: boolean, if True a ``Price`` column holding ``entity.cost`` is appended
    :return: dataframe sorted by ``ID``
    """
    if objectList is None:
        objectList = []

    columns = ["ID"]  # name of columns
    df_list = []  # rows for the DataFrame
    feature_list = []  # positions (in G.ftr_st) of the included features

    # set columns
    for position, (feature_id, station_id) in enumerate(G.ftr_st):
        for o in objectList:
            if station_id == o.id:
                if time:
                    columns.append("{}_{}_v".format(station_id, feature_id))
                    columns.append("{}_{}_t".format(station_id, feature_id))
                else:
                    columns.append("{}_{}".format(station_id, feature_id))
                feature_list.append(position)

    columns.append("Result")
    if price:
        columns.append("Price")

    # set df_list; every entity contributes at most one row
    unique = []
    for o in objectList:
        for entity in o.entities + o.discards:
            if entity in unique:
                continue
            unique.append(entity)

            # NOTE(review): assumes entity ids are a 4-character prefix
            # followed by an integer -- confirm against the Entity class
            row = [int(entity.id[4:])]
            for f in feature_list:
                row.append(entity.features[f])
                if time:
                    row.append(entity.feature_times[f])
            row.append(entity.result)
            if price:
                row.append(entity.cost)
            df_list.append(row)

    # return result
    result = pd.DataFrame(df_list, columns=columns).sort_values("ID")
    outcomes = result["Result"].unique()
    if "Success" in outcomes or "Fail" in outcomes:
        return result
    return result.drop("Result", axis=1)
[docs]
def getTimeSeriesData(ts) -> pd.DataFrame:
    """
    Collect the recorded data of one timeseries.

    The entities are taken from ``entities`` and ``discards`` of the
    timeseries' victim; the position of *ts* in ``G.TimeSeriesList`` selects
    the series stored on each entity.

    :param ts: the timeseries you want the data of
    :return: dataframe with entity-ID|time|value as columns
    """
    station = ts.victim
    entity_ids = []
    timestamps = []
    readings = []

    for entity in station.entities + station.discards:
        entity_number = int(entity.id[4:])
        slot = G.TimeSeriesList.index(ts)
        series = entity.timeseries[slot]
        # one id per recorded value of this entity
        entity_ids.extend([entity_number] * len(series))
        timestamps.extend(entity.timeseries_times[slot])
        readings.extend(series)

    rows = list(zip(entity_ids, timestamps, readings))
    return pd.DataFrame(rows, columns=["ID", "Time", "Value"])
[docs]
def get_feature_values_by_id(entity, feature_ids):
    """
    Look up the feature values of an entity for the given feature ids.

    The ids are translated to positions through ``G.feature_indices`` and the
    positions are then read from ``entity.features``.

    :param entity: The entity of which the feature values should be retrieved.
    :param feature_ids: List containing the IDs of the features (as string) that should be retrieved.
    :returns: list with one value per requested id, in the requested order
    :raises KeyError: if a lookup fails with a KeyError
    """
    try:
        positions = []
        for feature_id in feature_ids:
            positions.append(G.feature_indices[feature_id])
        values = []
        for position in positions:
            values.append(entity.features[position])
    except KeyError:
        raise KeyError(f"Attempting to access a non-existent feature id for entity {entity.name}.")
    else:
        return values
[docs]
def get_feature_labels_by_id(entity, feature_ids):
    """
    Look up the feature labels of an entity for the given feature ids.

    The ids are translated to positions through ``G.feature_indices`` and the
    positions are then read from ``entity.labels``.

    :param entity: The entity of which the feature labels should be retrieved.
    :param feature_ids: List containing the IDs of the features (as string) that should be retrieved.
    :returns: list with one label per requested id, in the requested order
    :raises KeyError: if a lookup fails with a KeyError
    """
    try:
        positions = []
        for feature_id in feature_ids:
            positions.append(G.feature_indices[feature_id])
        labels = []
        for position in positions:
            labels.append(entity.labels[position])
    except KeyError:
        raise KeyError(f"Attempting to access a non-existent feature id for entity {entity.name}.")
    else:
        return labels
def resetSimulation():
    """Reset all global parameters of the simulation in order to start a clean new one.

    Every non-dunder attribute of ``G`` whose value is exactly a list, dict or
    bool is replaced by ``[]``, ``{}`` or ``False``; the scalar settings,
    workbooks, database handle and environment are then set explicitly.
    """
    # exact type -> factory producing the empty replacement ([], {}, False)
    empty_value_for = {list: list, dict: dict, bool: bool}

    for attribute, current in list(vars(G).items()):
        if attribute.startswith("__"):
            continue
        make_empty = empty_value_for.get(type(current))
        if make_empty is not None:
            # NOTE(review): this also turns simulation_snapshots into [] although
            # the class default is [pd.DataFrame()] -- confirm this is intended
            setattr(G, attribute, make_empty())

    # scalar settings
    G.numberOfReplications = 1
    G.confidenceLevel = 0.9
    G.Base = 1
    G.maxSimTime = 0
    G.seed = 1
    G.console = ""
    G.numberOfEntities = 0
    G.totalPulpTime = 0

    # row/sheet counters and fresh excel workbooks
    G.traceIndex = 0
    G.outputIndex = 0
    G.sheetIndex = 1
    sheet_name = "sheet " + str(G.sheetIndex)
    G.traceFile = xlwt.Workbook()
    G.traceSheet = G.traceFile.add_sheet(sheet_name, cell_overwrite_ok=True)
    G.outputFile = xlwt.Workbook()
    G.outputSheet = G.outputFile.add_sheet(sheet_name, cell_overwrite_ok=True)

    # external handles
    G.outputJSONFile = None
    G.db = None
    G.env = simpy.Environment()