第二次习题会将重点放在前面8章的课程内容上,需要熟练掌握课程中提到的所有.py文件之间的内部联系,尤其是各个类之间的相互调用。这一点同样适用于习题3和4.
本章用到的类有:EvaluationLogic(class EvaluationLogic),InputData(class DataMachine, class DataJob, class InputJob)和OutputData(class OutputJob, class Solution)。我在这里按照业务流程附上源代码,供复习用。
*注意:整个优化流程是一步步建立起来的,从导入数据,到启发式算法,再到优化算法。理解背后的逻辑关系非常重要
- 首先是
InputData.py
:
import json
# 机器类,功能很简单,构造函数只需要机器的id。__str__()函数用于输出 Machine...。需要注意被修饰器修饰的函数,这个函数可以当作属性来调用(调用函数时的括号可以省略)
class DataMachine:
def __init__(self, machineId):
self.__machId = machineId
def __str__(self):
result = "Machine " + str(self.__machId)
return result
@property
def MachineId(self):
return self.__machId
# 这个类用于构造一个订单Job对象,构造函数的参数表来自json文件中的key
class DataJob:
def __init__(self, idJob, processingTimes, setupTimes, dueDate, tardinessCost):
self.__jobId = idJob
self.__processingTimes = processingTimes
self.__dueDate = dueDate
self.__setupTimes = setupTimes
self.__tardCost = tardinessCost
# 用于打印对象时自动调用,表达某个订单需要在多少台机器上处理(operations),以及每台机器处理该订单的时长
def __str__(self):
result = f"Job {self.__jobId} with {len(self.__processingTimes)} Operations:\n" # 一个订单共需要几次操作??
for opId, processingTime in enumerate(self.__processingTimes):
result += f"Operation {opId} with Processingtime: {processingTime} \n" # 每台机器上各需要加工多久???
return result # __str__函数一定要有返回值!!
# DataJob的5个属性再加上一个Operation(操作次数)都以成员函数的形式给出。注意这里分两种情况,
# 1.成员函数的参数表只有self,不需要额外的参数。也就是说一个DataJob对象完全可以利用自身属性作为该成员函数的返回值。
# 这种情况需要使用修饰器,这样我们一方面可以像调用属性那样调用函数,比较方便。另一方面,我们的属性是以私有private形式定义的,在其他类中无法调用,
# 因此需要把属性定义成函数的形式,在使用修饰器的情况下,在调用的时候就可以假装属性是公有的
# 2.成员函数有其他的参数,这种情况下就不适合用修饰器,而是要定义成普通的成员函数,这样这个类的对象就可以随时调用该函数。
# 比如我想知道一个订单job在第一台机器上的加工时间,可以用job.ProcessingTime(1)
@property
def JobId(self):
return self.__jobId
@property
def TardCost(self):
return self.__tardCost
@property
def DueDate(self):
return self.__dueDate
@property
def Operations(self): ## 注意operations这个特殊的形式,opID就是机器id
return [(opId, processingTime) for opId, processingTime in enumerate(self.__processingTimes)]
def SetupTime(self, position):
return self.__setupTimes[position]
def ProcessingTime(self, position):
return self.__processingTimes[position]
# 这个类用于导入json数据,json中包含项目名称,机器数量m,订单数量n,所有订单的详细信息InputJobs。注意类的属性,尤其是InputJobs
class InputData:
def __init__(self, path):
self.__path = path
self.DataLoad() # 构造方法中调用成员函数,类似习题1,即在构造对象时完成成员函数的调用。对象不需要再次调用成员函数
def DataLoad(self):
with open(self.__path, "r") as inputFile:
inputData = json.load(inputFile) # inputData是个字典!!
self.n = inputData['nJobs']
self.m = inputData['nMachines']
self.InputJobs = list()
self.InputMachines = list()
for job in inputData['Jobs']: # inputData['Jobs']是一个列表,其元素job是字典。
# 下一行构造DataJob对象,并把对象加到InputJobs中 --> InputJobs是个由DataJob对象组成的list,而DataJob类有__str__函数,因此InputJobs中的元素可以遍历后打印
self.InputJobs.append(DataJob(job['Id'], job['ProcessingTimes'], job["SetupTimes"], job['DueDate'], job['TardCosts']))
for k in range(self.m):
self.InputMachines.append(DataMachine(k))
- 在
InputData.py
的基础上,我们稍微进行处理,增加几个必要的属性,得到OutPutJob
。再在OutputJob
的基础上加上排列组合Permutation,形成Solution
,即解决方案:
from InputData import *
# 定义一个DataJob类的子类
class OutputJob(DataJob):
# 该子类的构造函数需要DataJob类的对象作为参数,并用这个对象的属性进行计算
def __init__(self, dataJob):
# 调用父类的构造函数构造出DataJob对象
super().__init__(dataJob.JobId, [dataJob.ProcessingTime(i) for i in range(len(dataJob.Operations))],
[dataJob.SetupTime(i) for i in range(len(dataJob.Operations))], dataJob.DueDate, dataJob.TardCost)
# 子类的对象在父类基础上多了下面几个属性, 全部初始化为0,若属性为列表,则初始化为有0组成的列表,列表共有n个元素。
# 记住这些属性!别跟Solution类的属性混淆。
self.StartSetups = [0]*len(self.Operations) # 全部元素被初始化为0.Operations是每个job被处理的时间长度组成的list
self.EndSetups = [0]*len(self.Operations)
self.StartTimes = [0]*len(self.Operations)
self.EndTimes = [0]*len(self.Operations)
self.Tardiness = 0
# 看构造函数的参数表,需要一个订单列表和所有订单的排列,才能构造出一个solution对象
class Solution:
def __init__(self, jobList, permutation): # jobList实际上是InputData类的对象的InputJobs属性,是个列表
self.OutputJobs = {}
# 遍历jobList,jobId是列表的索引,job是一个DataJob类的订单
for jobId, job in enumerate(jobList):
# 用DataJob类的对象作为参数,构造出OutputJob的对象,并把OutputJob对象加入字典中
self.OutputJobs[jobId] = OutputJob(job)
self.Permutation = permutation
self.Makespan = -1
self.TotalTardiness = -1
self.TotalWeightedTardiness = -1
# Solution对象的输出,重点在订单排列和总时长Makespan。Makespan需要经过EvaluationLogic才能被赋值。详情见前八章
def __str__(self):
return "The permutation " + str(self.Permutation) + " results in a Makespan of " + str(self.Makespan)
# 这个成员函数用于自定一个排列方式
def SetPermutation(self, permutation):
self.Permutation = permutation
def WriteSolToCsv(self, fileName):
csv_file = open(fileName, "w")
csv_file.write('Machine,Job,Start_Setup,End_Setup,Start,End\n')
for job in self.OutputJobs.values():
for i in range(len(job.EndTimes)):
csv_file.write(str(i + 1) + "," + str(job.JobId) + "," + str(job.StartSetups[i]) + ","
+ str(job.EndSetups[i]) + "," + str(job.StartTimes[i]) + "," + str(job.EndTimes[i]) + "\n")
- 通过观察可以发现,此时
Solution
类中的3个默认值为-1的属性还没赋值,因此这里再引入一个我写的库:EvaluationLogic.py
(注意:在这次练习题中,EL类的功能并没有写完整,详情见前8章):
import numpy
"""注意:这个类没有构造函数,因此在构造EvaluationLogic对象的时候,没有任何函数会自动执行!!!"""
# 没有构造函数意味着,每当我们构造这个类的对象时,要想实现某种功能,就必须调用成员函数
# # 无论是currentJob还是firstJob,都是outputJob类。
# 这个类的最终目的就是计算当前方案current Solution的总时长Makespan!!!
# 前8章还将了怎么在这个类中计算订单延期
class EvaluationLogic:
# 这个成员函数需要一个Solution类的对象作为参数
def DefineStartEnd(self, currentSolution):
#####
# schedule first job: starts when finished at previous stage
firstJob = currentSolution.OutputJobs[currentSolution.Permutation[0]]
# firstJob.EndTimes= 是在往0组成的list中填入累积时间
firstJob.EndTimes = numpy.cumsum([firstJob.ProcessingTime(x) for x in range(len(firstJob.EndTimes))]) # 后半部分的len(.endtimes)调用的还是由0组成的list,此处仅仅计算元素个数
firstJob.StartTimes[1:] = firstJob.EndTimes[:-1]
# firstjob开始时间默认是0,因此starttimes从第二项开始赋值,第二个机器上开始工作的时间是第一个机器结束的时间
#####
# schedule further jobs: starts when finished at previous stage and the predecessor is no longer on the considered machine
for j in range(1,len(currentSolution.Permutation)): # 上面几行把0号也就是第一个job的开始时间和结束时间列表做好了,下面从1号第二个job开始指定开始结束时间
currentJob = currentSolution.OutputJobs[currentSolution.Permutation[j]]# 当前job是permutation中的第j个
previousJob = currentSolution.OutputJobs[currentSolution.Permutation[j-1]]# j-1没有顾虑,因为j从1 开始
# first machine
currentJob.StartTimes[0] = previousJob.EndTimes[0] # Starttimes的首个元素就是在第一个机器上开始处理的时间
currentJob.EndTimes[0] = currentJob.StartTimes[0] + currentJob.ProcessingTime(0)
# other machines
for i in range(1,len(currentJob.StartTimes)):
currentJob.StartTimes[i] = max(previousJob.EndTimes[i], currentJob.EndTimes[i-1]) # 当前job在下一台机器的开始时间取决于两方面,1:当前job在上一台机器的结束时间和2: 前一个job在当前机器的结束时间
currentJob.EndTimes[i] = currentJob.StartTimes[i] + currentJob.ProcessingTime(i) # 当前job的结束时间等于当前job在当前机器的开始时间加上处理时间
#####
# Save Makespan and return Solution
currentSolution.Makespan = currentSolution.OutputJobs[currentSolution.Permutation[-1]].EndTimes[-1]
def CalcuateTardyJobs(solution):
late_jobs = []
for job in solution.OutputJobs.values():
finish = job.EndTimes[-1]
if finish > job.DueDate:
late_jobs.append(job)
return len(late_jobs)
再介绍一下怎么把OutputJobs的几个属性初始化成由n个0组成的list:
下面开始进入正题:
1 流程优化问题的第一步:Solution
a)首先从提供的.py文件中导入所有的类和对象。然后创建对象data作为类InputData的实例。提供的.json文件(VFR10_10_10_SIST.json)中的数据作为输入数据。对于这个样本数据,我们从生产中向你推荐了7-6-8-10-2-9-1-4-3-5的序列。为此,创建一个Solution类的对象,并将其命名为recommendedSolution。如果忽略了整备时间,这个解决方案的Makespan是多少?输出Solution。
from InputData import *
from OutputData import *
from EvaluationLogic import *
data = InputData('VFR10_10_10_SIST.json')
permList = [7,6,8,10,2,9,1,4,3,5]
perm = [x-1 for x in permList]
recommendedSolution = Solution(data.InputJobs, perm)
print(recommendedSolution)
EvaluationLogic().DefineStartEnd(recommendedSolution)
print(recommendedSolution)
*注意:在使用Solution构造对象时,排列permutation会在构造函数调用时被指定,但是makespan只有在调用EvaluationLogic().DefineStartEnd()之后才会被赋值
b)创建另一个Solution类的对象simpleSolution,其中的订单是根据FCFS规则安排的。计算两个解决方案recommendedSolution和simpleSolution*之间的delta,并输出它。
simpleSolution = Solution(data.InputJobs, range(len(data.InputJobs)))
EvaluationLogic().DefineStartEnd(simpleSolution)
# 1 Punkt: Berechnung Delta und Ausgabe
delta = recommendedSolution.Makespan - simpleSolution.Makespan
print(f'The objective delta is {delta}')
simpleSolution = Solution(data.InputJobs, [x for x in range(len(data.InputJobs))])
EvaluationLogic().DefineStartEnd(simpleSolution)
print(f'The objective delta is {simpleSolution.Makespan - recommendedSolution.Makespan}')
*注意:两种方法都可行
c) 编写一个函数CalcuateTardyJobs,计算一个解决方案Solution有多少个订单被延期。然后将这个函数添加到EvaluationLogic类中,并确定两个解决方案recommendedSolution和simpleSolution的延期订单的数量。
这里提供两种解题方法,思路是一样的,不过第一个方法是对字典的键进行遍历,第二种是对值进行遍历;
def CalculateTardyJobs(solution):
delayed = 0
for id in solution.OutputJobs: ### 对字典的key进行遍历,比较麻烦
if solution.OutputJobs[id].EndTimes[-1] > solution.OutputJobs[id].DueDate:
delayed += 1
return delayed
setattr(EvaluationLogic, 'CalculateTardyJobs', CalcuateTardyJobs)
print(f'Tardy jobs of simpleSolution: {CalcuateTardyJobs(simpleSolution)}')
print(f'Tardy jobs of recommendedSolution: {CalcuateTardyJobs(recommendedSolution)}')
def CalculateTardyJobs(solution):
delayed = 0
for job in solution.OutputJobs.values():
finish = job.EndTImes[-1]
if finish > job.DueDate:
delayed += 1
return delayed
setattr(EvaluationLogic, 'CalculateTardyJobs', CalcuateTardyJobs)
print(f'Tardy jobs of simpleSolution: {CalcuateTardyJobs(simpleSolution)}')
print(f'Tardy jobs of recommendedSolution: {CalcuateTardyJobs(recommendedSolution)}')
*注意:这两种方法没有绝对意义的好坏,使用时要分场景。此外这里的两种方法都是采用计数器的思路,我们也可以用一个空列表替换计数器,每当有订单延期,就把这个延期的订单append到空列表中,然后按照题目要求输出列表的length
2 可接受性检查
到目前为止,一些规划数据有可能因为硬件问题被损坏或销毁。
编写一个函数ValidateSolution,检查一个解决方案是否有效。应考虑以下情况。
- 在一个解决方案的表述中,有太少/太多的订单号。
- 一个解决方案包含重复的订单号。
- 一个订单在连续的机器上的开始和结束时间是有重叠的。
为每种可能的数据损坏创建适当的输出。函数ValidateSolution应该输出一个解决方案的全部错误,而不是发现一个错误就停止验证。如果没有发现错误,也应该输出没有错误的信息反馈。
还是两种思路:
def ValidateSol(dataSet, solution):
title = 'Reported Corruptions: \n'
if len(solution.Permutation) != len(dataSet.InputJobs):
title += '\t - Detected wrong number of IDs in solution representation.\n'
if len(set(solution.Permutation)) != len(solution.Permutation):
title += '\t - Detected duplicate IDs in solution representation.\n'
if True:
for job in solution.OutputJobs.values():
for i in range(len(job.EndTimes)):
if job.EndTimes[i] > job.StartTimes[i+1]:
title += '\t Detected intersected start- and endtimes. \n'
if title == 'Reported Corruptions: \n':
title += '\t - No corruptions detected.\n'
print(ValidateSolution(simpleSolution))
print(ValidateSolution(Solution(data.InputJobs, [0, 3, 2, 3, 1, 4, 5, 8])))
print(ValidateSolution(Solution(data.InputJobs, [0, 3, 2, 1, 4, 5, 8, 7, 10, 9, 11, 12])))
def ValidateSolution(solution):
intersect = False
title = 'Reported Corruptions: \n' ##
if len(solution.Permutation) != 10:
title += '\t- Detected wrong number of IDs in solution representation.\n'
if len(solution.Permutation) != len(set(solution.Permutation)):
title += '\t- Detected duplicate IDs in solution representation.\n'
if intersect == False:
for job in solution.OutputJobs.values():
for i in range(1, len(job.EndTimes)):
if job.EndTimes[i] < job.StartTimes[i - 1]:
title += '\t- Die Start- und Endzeiten eines Auftrags auf aufeinanderfolgenden Maschinen überschneiden sich.\n'
intersect == True
break
if (len(solution.Permutation) == 10) and (len(solution.Permutation) == len(set(solution.Permutation))) and intersect == False:
title += '\t- No corruptions detected.\n'
return title
3 CDS Heuristic
def CampellDudekSmith(inputJobs):
solutionPool = {}
nMachines = len(inputJobs[0].Operations)
for i in range(nMachines-1):
minimumFirstMachine = []
notMinimumFirstMachine = []
for j, job in enumerate(inputJobs):
thetaFirst = sum(job.ProcessingTime(x) for x in range(0,i + 1))
thetaSecond = sum(job.ProcessingTime(x) for x in range(nMachines-i-1,nMachines))
if(thetaFirst < thetaSecond):
minimumFirstMachine.append((j,thetaFirst))
else:
notMinimumFirstMachine.append((j,thetaSecond))
minimumFirstMachine.sort(key=lambda x: x[1])
notMinimumFirstMachine.sort(key=lambda x: x[1], reverse = True)
tmpPermutation = [x[0] for x in minimumFirstMachine]
tmpPermutation.extend([x[0] for x in notMinimumFirstMachine])
tmpSolution = Solution(inputJobs, tmpPermutation)
EvaluationLogic().DefineStartEnd(tmpSolution)
solutionPool[i] = tmpSolution
bestSolution = solutionPool[0]
for sol in solutionPool.values():
if bestSolution.Makespan > sol.Makespan:
bestSolution = sol
EvaluationLogic().DefineStartEnd(bestSolution)
return bestSolution
CDSSol = CampellDudekSmith(data.InputJobs)
print(CDSSol)