Source code for manpy.simulation.core.Queue

# ===========================================================================
# Copyright 2013 University of Limerick
#
# This file is part of DREAM.
#
# DREAM is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DREAM is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with DREAM.  If not, see <http://www.gnu.org/licenses/>.
# ===========================================================================
"""
Created on 8 Nov 2012

@author: George
"""



import simpy
from manpy.simulation.core.CoreObject import CoreObject

# ===========================================================================
#                            the Queue object
# ===========================================================================
class Queue(CoreObject):
    """
    Models a FIFO queue where entities can wait in order to get into a server.

    The entities currently held are the ``users`` list of the internal
    ``simpy.Resource`` (``self.Res``), which is created in ``initialize``.
    Besides plain FIFO, the queue can re-order its entities according to the
    rules returned by ``getSupportedSchedulingRules`` (see ``activeQSorter``).
    """

    family = "Buffer"

    # ===========================================================================
    # the __init__ method of the Queue
    # ===========================================================================
    def __init__(
        self,
        id="",
        name="",
        capacity=5,
        isDummy=False,
        schedulingRule="FIFO",
        level=None,
        gatherWipStat=False,
        cost=0,
        **kw
    ):
        """
        Create the Queue and register it in ``G.QueueList``.

        :param id: identifier, forwarded to ``CoreObject.__init__``
        :param name: display name, forwarded to ``CoreObject.__init__``
        :param capacity: maximum number of entities; a negative or infinite
            value means unlimited (stored as ``float("inf")``), anything else
            is truncated to ``int``
        :param isDummy: flag showing if this is the dummy first Queue; it is
            normalised with ``bool(int(isDummy))``
        :param schedulingRule: one of ``getSupportedSchedulingRules()``, or a
            multi-criteria rule written as ``"MC-<rule>-<rule>-..."``
        :param level: trigger level for the reallocation of operators; when
            given it must not exceed the capacity
        :param gatherWipStat: when truthy, ``outputResultsJSON`` also emits
            ``self.WipStat``
        :param cost: forwarded to ``CoreObject.__init__``
        :param kw: extra keyword arguments; accepted but not used here
        :raises ValueError: if a scheduling rule is not supported
        :raises AssertionError: if ``level`` is bigger than the capacity
        """
        self.type = "Queue"  # String that shows the type of object
        CoreObject.__init__(self, id, name, cost)

        capacity = float(capacity)
        if capacity < 0 or capacity == float("inf"):
            self.capacity = float("inf")
        else:
            self.capacity = int(capacity)
        print(f"INFO: Queue {name} has capacity {self.capacity}.")

        # Boolean that shows if it is the dummy first Queue
        self.isDummy = bool(int(isDummy))
        # the scheduling rule that the Queue follows
        self.schedulingRule = schedulingRule
        # list with the criteria used to sort the Entities in the Queue
        self.multipleCriterionList = []

        SRlist = [schedulingRule]
        # if the first criterion is MC aka multiple criteria
        if schedulingRule.startswith("MC"):
            # split the string of the criteria (delimiter -)
            SRlist = schedulingRule.split("-")
            # take the first criterion of the list ("MC" itself) ...
            self.schedulingRule = SRlist.pop(0)
            # ... and hold the remaining criteria in multipleCriterionList
            self.multipleCriterionList = SRlist

        supportedRules = self.getSupportedSchedulingRules()
        for scheduling_rule in SRlist:
            if scheduling_rule not in supportedRules:
                raise ValueError(
                    "Unknown scheduling rule %s for %s" % (scheduling_rule, id)
                )

        self.gatherWipStat = gatherWipStat

        # trigger level for the reallocation of operators
        if level:
            assert (
                level <= self.capacity
            ), "the level cannot be bigger than the capacity of the queue"
        self.level = level
        # one (time, queue id, entity ids, number of entities) tuple is
        # appended on every getEntity / removeEntity call
        self.level_history = []

        # imported here rather than at module level, as in the original code
        from manpy.simulation.core.Globals import G

        G.QueueList.append(self)

    @staticmethod
    def getSupportedSchedulingRules():
        """Return the tuple of scheduling rule names that the Queue accepts."""
        return (
            "FIFO",
            "Priority",
            "EDD",
            "EOD",
            "NumStages",
            "RPC",
            "LPT",
            "SPT",
            "MS",
            "WINQ",
        )

    def _expectAllSignals(self):
        """Mark the three signals that the Queue waits on as expected."""
        self.expectedSignals["isRequested"] = 1
        self.expectedSignals["canDispose"] = 1
        self.expectedSignals["loadOperatorAvailable"] = 1

    def initialize(self):
        """the initialize method of the Queue class"""
        CoreObject.initialize(self)
        # initialise the internal Queue (type Resource) of the Queue object
        self.Res = simpy.Resource(self.env, self.capacity)
        # event used by router
        self.loadOperatorAvailable = self.env.event()
        self._expectAllSignals()

    def run(self):
        """
        run method of the queue

        Generator: on every iteration it waits until one of ``isRequested``,
        ``canDispose`` or ``loadOperatorAvailable`` is triggered, re-arms the
        events that fired and then signals the receiver and/or the giver.
        """
        activeObjectQueue = self.Res.users
        # check if there is WIP and signal receiver
        self.initialSignalReceiver()
        while True:
            self.printTrace(self.id, waitEvent="")
            # wait until the Queue can accept an entity and one predecessor
            # requests it
            self._expectAllSignals()
            receivedEvent = yield self.env.any_of(
                [self.isRequested, self.canDispose, self.loadOperatorAvailable]
            )
            self.printTrace(self.id, received="")

            # if the event that activated the thread is isRequested then getEntity
            if self.isRequested in receivedEvent:
                transmitter, eventTime = self.isRequested.value
                self.printTrace(self.id, isRequested=transmitter.id)
                # reset the isRequested signal parameter
                self.isRequested = self.env.event()
                self.getEntity()
                # if entity just got to the dummyQ set its startTime as the
                # current time
                if self.isDummy:
                    activeObjectQueue[0].startTime = self.env.now

            # if the queue received a loadOperatorAvailable (from Router)
            # with signalparam time
            if self.loadOperatorAvailable in receivedEvent:
                transmitter, eventTime = self.loadOperatorAvailable.value
                self.loadOperatorAvailable = self.env.event()

            # if the queue received a canDispose with signalparam time, this
            # means that the signal was sent from a MouldAssemblyBuffer
            if self.canDispose in receivedEvent:
                transmitter, eventTime = self.canDispose.value
                self.printTrace(self.id, canDispose="")
                self.canDispose = self.env.event()

            # if there is something to dispose then signalReceiver
            if self.haveToDispose():
                if self.receiver:
                    if not self.receiver.entryIsAssignedTo():
                        # try to signal receiver. In case of failure signal
                        # giver (for synchronization issues)
                        if not self.signalReceiver():
                            self.signalGiver()
                    continue
                self.signalReceiver()
            # signal the giver (for synchronization issues)
            self.signalGiver()

    def canAccept(self, callerObject=None):
        """
        checks if the Queue can accept an entity

        Without a caller only the free capacity is checked; with a caller the
        Queue must additionally be in the route of that caller
        (``self.isInRouteOf(callerObject)``).
        """
        hasRoom = len(self.Res.users) < self.capacity
        # if we have only one predecessor just check if there is a place
        # available; this is done to achieve better (cpu) processing time
        if callerObject is None:
            return hasRoom
        return hasRoom and self.isInRouteOf(callerObject)

    def haveToDispose(self, callerObject=None):
        """
        checks if the Queue can dispose an entity to the following object

        Without a caller it is enough that the Queue holds one or more
        entities; with a caller, ``callerObject.isInRouteOf(self)`` must hold
        as well.
        """
        holdsEntities = len(self.Res.users) > 0
        if callerObject is None:
            return holdsEntities
        return holdsEntities and callerObject.isInRouteOf(self)

    def _recordLevel(self):
        """Append the current content of the Queue to ``level_history``."""
        held = self.Res.users
        self.level_history.append(
            (self.env.now, self.id, [ent.id for ent in held], len(held))
        )

    def _requestAllocationIf(self, levelCondition):
        """
        Best-effort request for operator reallocation.

        Does nothing unless ``self.level`` is set. ``levelCondition`` is a
        callable evaluated lazily on the current queue length. Errors are
        deliberately swallowed, as in the original code, but only
        ``Exception`` subclasses: KeyboardInterrupt, SystemExit and
        GeneratorExit are no longer silenced.
        """
        try:
            if (
                self.level
                and levelCondition(len(self.getActiveObjectQueue()))
                and self.checkForDedicatedOperators()
            ):
                self.requestAllocation()
        except Exception:
            pass

    def removeEntity(self, entity=None):
        """removes an entity from the Object and returns it"""
        self._recordLevel()
        activeEntity = CoreObject.removeEntity(self, entity)  # run the default method
        if self.canAccept():
            self.signalGiver()
        # TODO: disable that for the mouldAssemblyBuffer
        if self.__class__.__name__ != "MouldAssemblyBufferManaged":
            if self.haveToDispose():
                self.signalReceiver()
        # reset the signals for the Queue. It is done at the start of the run
        # loop as well
        # xxx consider to do this in all CoreObjects
        self._expectAllSignals()
        # check if the queue is empty, if yes then try to signal the router,
        # operators may need reallocation
        self._requestAllocationIf(lambda length: not length)
        return activeEntity

    def canAcceptAndIsRequested(self, callerObject=None):
        """
        checks if the Queue can accept an entity and the caller has one to
        dispose to it

        :raises AssertionError: if no caller is given
        """
        giverObject = callerObject
        assert giverObject, "there must be a caller for canAcceptAndIsRequested"
        return len(self.Res.users) < self.capacity and giverObject.haveToDispose(
            self
        )

    def getEntity(self):
        """gets an entity via ``CoreObject.getEntity`` and returns it"""
        self._recordLevel()
        activeEntity = CoreObject.getEntity(self)  # run the default behavior
        # if the level is reached then try to signal the Router to reallocate
        # the operators
        self._requestAllocationIf(lambda length: length == self.level)
        return activeEntity

    def canDeliver(self, entity=None):
        """
        checks whether the entity can proceed to a successor object

        Every object of ``self.next`` that can accept the entity is appended
        to ``entity.candidateReceivers``, and ``entity.proceed`` is set to
        True if there is at least one.

        :return: True if at least one successor can accept the entity
        :raises AssertionError: if the entity is not held by this Queue
        """
        assert self.isInActiveQueue(entity), (
            entity.id + " not in the internalQueue of" + self.id
        )
        # evaluate all possible receivers first, then update the entity
        receivers = [
            candidate for candidate in self.next if candidate.canAcceptEntity(entity)
        ]
        for nextObject in receivers:
            entity.proceed = True
            entity.candidateReceivers.append(nextObject)
        return bool(receivers)

    def sortEntities(self):
        """sorts the Entities of the Queue according to the scheduling rule"""
        if self.schedulingRule == "MC":
            # the sort is applied once per criterion, least significant
            # first, so that the first criterion of the list dominates
            for criterion in reversed(self.multipleCriterionList):
                self.activeQSorter(criterion=criterion)
        else:
            # else we just use the default scheduling rule
            self.activeQSorter()

    @staticmethod
    def _fixedMeanOf(step):
        """Return the ``Fixed`` mean processing time of a route step, or 0."""
        processingTime = step.get("processingTime", None)
        if processingTime:
            return float(processingTime.get("Fixed", {}).get("mean", 0))
        return 0

    @classmethod
    def _remainingProcessingTime(cls, entity):
        """Sum the ``Fixed`` mean processing times over the remaining route."""
        return sum(cls._fixedMeanOf(step) for step in entity.remainingRoute)

    @classmethod
    def _nextStationProcessingTime(cls, entity):
        """``Fixed`` mean processing time of the first remaining route step."""
        return cls._fixedMeanOf(entity.remainingRoute[0])

    def activeQSorter(self, criterion=None):
        """
        sorts the Entities of the Queue in place according to one criterion

        :param criterion: scheduling rule name; ``self.schedulingRule`` is
            used when it is None
        :raises AssertionError: if the criterion is unknown
        """
        activeObjectQ = self.Res.users
        if criterion is None:
            criterion = self.schedulingRule

        # first in first out: keep the arrival order
        if criterion == "FIFO":
            pass
        # pre-defined priority
        elif criterion == "Priority":
            activeObjectQ.sort(key=lambda x: x.priority)
        # earliest due date
        elif criterion == "EDD":
            activeObjectQ.sort(key=lambda x: x.dueDate)
        # earliest order date
        elif criterion == "EOD":
            activeObjectQ.sort(key=lambda x: x.orderDate)
        # number of stations the Entities still have to visit, most first
        elif criterion == "NumStages":
            activeObjectQ.sort(key=lambda x: len(x.remainingRoute), reverse=True)
        # remaining processing time in the system, longest first
        elif criterion == "RPC":
            for entity in activeObjectQ:
                entity.totalRemainingProcessingTime = self._remainingProcessingTime(
                    entity
                )
            activeObjectQ.sort(
                key=lambda x: x.totalRemainingProcessingTime, reverse=True
            )
        # longest processing time first in the next station
        elif criterion == "LPT":
            for entity in activeObjectQ:
                entity.processingTimeInNextStation = self._nextStationProcessingTime(
                    entity
                )
            activeObjectQ.sort(
                key=lambda x: x.processingTimeInNextStation, reverse=True
            )
        # shortest processing time first in the next station
        elif criterion == "SPT":
            for entity in activeObjectQ:
                entity.processingTimeInNextStation = self._nextStationProcessingTime(
                    entity
                )
            activeObjectQ.sort(key=lambda x: x.processingTimeInNextStation)
        # minimum slackness: due date minus remaining processing time
        elif criterion == "MS":
            for entity in activeObjectQ:
                entity.totalRemainingProcessingTime = self._remainingProcessingTime(
                    entity
                )
            activeObjectQ.sort(
                key=lambda x: (x.dueDate - x.totalRemainingProcessingTime)
            )
        # length of the following Queue
        elif criterion == "WINQ":
            from manpy.simulation.core.Globals import G

            for entity in activeObjectQ:
                # reset per entity: without this an entity whose next station
                # is not found would raise NameError or silently reuse the
                # station of the previous entity
                nextObject = None
                if len(entity.remainingRoute) > 1:
                    nextObjIds = entity.remainingRoute[1].get("stationIdsList", [])
                    for obj in G.ObjList:
                        if obj.id in nextObjIds:
                            nextObject = obj  # the last match wins
                if nextObject is None:
                    entity.nextQueueLength = 0
                else:
                    entity.nextQueueLength = len(nextObject.Res.users)
            activeObjectQ.sort(key=lambda x: x.nextQueueLength)
        else:
            assert False, "Unknown scheduling criterion %r" % (criterion,)

    def outputResultsJSON(self):
        """Appends the results of the Queue to ``G.outputJSON["elementList"]``."""
        from manpy.simulation.core.Globals import G

        element = {
            "_class": "manpy.%s" % self.__class__.__name__,
            "id": str(self.id),
            "family": self.family,
            "results": {},
        }
        if self.gatherWipStat:
            element["results"]["wip_stat_list"] = self.WipStat
        G.outputJSON["elementList"].append(element)