7 启发式求解器的软件结构和优化算法

在企业的实际生产中,我们的算法体系需要被设计成软件才能被使用。也就是说我在这里编写的所有代码,实际上是软件用户界面背后的逻辑。这一章我也只是按照UML类图整理一下业务逻辑,开发图形界面并不是我的专业。

可以看一下下图所示的求解器Solver的结构。要想利用Solver来获取解决方案,首先需要一个输入数据InputData,并且需要将这个输入数据经过合理的处理,将其转变成输出数据OutputData。有了数据以后,还需要利用算法将这些数据逐步分析和计算。

到目前为止,我们已经完成了数据模型的建立和处理,以及用算法构建(Konstruktiv)和评估(Evaluation)。前面的章节也提到过,启发式算法构建的解决方案未必是最优解,因此本章我们来讲解如何逐步优化启发式算法提供的解决方案,并把到目前为止已经涉及到的算法和结构体系化。

Solver结构
UML-类图

1 Solution的容器:SolutionPool

上一章我们已经利用启发式算法得出了不同的解决方案,例如NEH算法总能提供比较接近最优解的方案,但是NEH算法并没有顾及由于延期交货导致的罚款问题。如果考虑罚款带来的额外成本,NEH的方案未必比得上EDD。

因此我们需要把这些备选方案存放在一个类中,这个类就是SolutionPool。初步设想这个类应该有一个类型为列表的属性,专门用于存放Solution,还要有一个AddSolution成员函数,可以将Solution加到容器中。此外,我还希望SolutionPool能够告诉我,哪个方案能输出最短的生产时间。(当然也可以考虑再加入一个函数,用来处理延期罚款)。

class SolutionPool:
    def __init__(self):
        self.Solutions = []  # SolutionPool 有一个定语叫做Solutions,是个list,列表中的元素是Solution

    def AddSolution(self, newSolution): 
        self.Solutions.append(newSolution)

    def GetLowestMakespanSolution(self):
        self.Solutions.sort(key = lambda solution: solution.Makespan) # sort solutions according to makespan默认从小到大
        return self.Solutions[0]  # 首个元素是时长最短的


        # 存疑[已解决]:solution对象只有经过EvaluationLogic之后,solution的定语makespan才会被赋值!!!
        # 参见OutputData和EvaluationLogic文件

2 把所有启发式算法写入ConstructiveHeuristics

这个类存在的目的就是通过这个类构造出对象,再用对象调用成员函数(启发式算法),然后返回一个Solution。这个Solution在返回时已经执行了EvaluationLogic,也就是说StartTimes和EndTimes等属性已经被赋值。在类定义的最后还要把每种启发式算法生成的Solution加入到SOlutionPool中。
*注意:上一章强调过,类的成员函数定义时,一定要把self作为第一个参数,否则会输出参数数量不一致的错误

class ConstructiveHeuristics: # 由多个不同的启发式算法构成
    def __init__(self, evaluationLogic, solutionPool):
        self.RandomSeed = 2021
        self.RandomRetries = 10
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool
# FCFS
    def FirstComeFirstServe(self, jobList):
        tmpPermutation = [*range(len(jobList))] # *的作用是将range创建的可迭代对象进行解包。如果省略星号,会创建一个列表,只包含一个元素,该元素就是range对象

        tmpSolution = Solution(jobList, tmpPermutation) # 构造Solution对象。注意该构造方法的使用,参见OutputJobs中的必选参数(无默认值)
        self.EvaluationLogic.DefineStartEnd(tmpSolution) # 此处的EvaluationL是个对象  ??,所以才能调用EL类中的成员函数。这一步是为了计算该temSolution的总时间

        return tmpSolution

# SPT
    def ShortestProcessingTime(self, jobList, allMachines = False): # 注意只有类的成员变量才需要self,函数的本地变量不需要
        jobPool = []
        if allMachines:
            for i in range(len(jobList)):
                jobPool.append((i, sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
        else:
            for i in range(len(jobList)):
                jobPool.append((i, jobList[i].ProcessingTime(0)))
            
        jobPool.sort(key= lambda t: t[1])
        tmpPermutation = [x[0] for x in jobPool]

        tmpSolution = Solution(jobList, tmpPermutation)
        self.EvaluationLogic.DefineStartEnd(tmpSolution)

        return tmpSolution

# LPT 最费时间的先做
    def LongestProcessingTime(self, jobList, allMachines = False):
        jobPool = []
        if allMachines:
            for i in range(len(jobList)):
                jobPool.append((i, sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
        else:
            for i in range(len(jobList)):
                jobPool.append((i, jobList[i].ProcessingTime(0)))
            
        jobPool.sort(key= lambda t: -t[1]) # 按照jobPool中,元组的第二个元素的相反数进行排列。排在前面的绝对值更大,也就是时间更长
        tmpPermutation = [x[0] for x in jobPool] # 这一步是把jobPool中每个元组的首位元素,即jobid 放在列表中作为Permutation

        tmpSolution = Solution(jobList, tmpPermutation)
        self.EvaluationLogic.DefineStartEnd(tmpSolution) # 现在已经生成了solution,需要用EvaluationLogic类来调用EL类中的成员函数,以计算该solution的总时长

        return tmpSolution

# ROS 随机完成job
    def ROS(self, jobList, x, seed):
        np.random.seed(seed) ###
        tmpSolution = Solution(jobList, 0)
        bestCmax = np.inf # unendless 无穷大的浮点数

        for i in range(x):
            tmpPermuation = np.random.permutation(len(jobList)) #np.random.permutation生成随机数列,长度是len()
            # tmpSolution.SetPermutation(tmpPermuation) # SetPermutation是Solution类的成员函数
            tmpSolution.Permutation = tmpPermuation # 这样比上一行更好

            self.EvaluationLogic.DefineStartEnd(tmpSolution)

            if (tmpSolution.Makespan < bestCmax):
                bestCmax = tmpSolution.Makespan
                bestPerm = tmpPermuation
    
        bestSol = Solution(jobList, bestPerm)  # ROS中还要筛选出最好的哪个随机数列
        self.EvaluationLogic.DefineStartEnd(bestSol)

        return bestSol    

# NEH(要首先确定最佳插入顺序)  ### NEH的插入函数写在EvaluationLogic类中。这样EL类既可以确定起止时间也能提供最佳插入顺序
    # def DetermineBestInsertion(solution, jobToInsert): #jobToInsert是指需要被插入的那个job
    #     ###
    #     # insert job at front of permutation
    #     solution.Permutation.insert(0, jobToInsert)  # insert函数,在索引位置插入数据,索引后面的往后顺移一位。初始肯定在第0位插入
    #     bestPermutation = deepcopy(solution.Permutation)
    
    #     EvaluationLogic().DefineStartEnd(solution)
    #     bestCmax = solution.Makespan

    #     ###
    #     # swap job i to each position and check for improvement
    #     lengthPermutation = len(solution.Permutation) - 1 #减去1是为了得到使用insert时的索引号
    #     for j in range(0, lengthPermutation):
    #         solution.Permutation[j], solution.Permutation[j + 1] = solution.Permutation[j+1], solution.Permutation[j]
    #         EvaluationLogic().DefineStartEnd(solution)
    #         if(solution.Makespan < bestCmax):
    #             bestCmax = solution.Makespan
    #             bestPermutation = [x for x in solution.Permutation]

    #     solution.Makespan = bestCmax
    #     solution.Permutation = bestPermutation

    def NEH(self, jobList):
        jobPool = []
        tmpPerm = []
        bestCmax = 0
        # Calculate sum of processing times and sort
        for i in range(len(jobList)):
            jobPool.append((i,sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
        jobPool.sort(key=lambda x: x[1], reverse=True)

        # Initalize input
        tmpNEHOrder = [x[0] for x in jobPool]
        tmpPerm.append(tmpNEHOrder[0])
        tmpSolution = Solution(jobList,tmpPerm)

        # Add next jobs in a loop and check all permutations
        for i in range(1,len(tmpNEHOrder)):
            # add next job to end and calculate makespan
            self.EvaluationLogic.DetermineBestInsertion(tmpSolution, tmpNEHOrder[i])
    
        return tmpSolution


    def Run(self, inputData, solutionMethod):
        print('Generating an initial solution according to ' + solutionMethod + '.') # 能直接用加号连结说明solutionMethod是字符串,即算法的名字

        solution = None

        if solutionMethod == 'FCFS':
            solution = self.FirstComeFirstServe(inputData.InputJobs)
        elif solutionMethod == 'SPT':
            solution = self.ShortestProcessingTime(inputData.InputJobs)
        elif solutionMethod == 'LPT':
            solution = self.LeastProcessingTime(inputData.InputJobs)
        elif solutionMethod == 'ROS':
            solution = self.ROS(inputData.InputJobs, self.RandomRetries, self.RandomSeed)
        elif solutionMethod == 'NEH':
            solution = self.NEH(inputData.InputJobs)
        else:
            print('Unkown constructive solution method: ' + solutionMethod + '.')

        self.SolutionPool.AddSolution(solution) #这里的定语solution Pool属于solutionPool类,因此可用该类的成员函数

3 创建Solver类

Solver求解器位于类图的最顶端,因此这个类至关重要。通过观察类图我们得知,Solver应该有以下三个函数:

  • ConstructionPhase(constructiveSolutionMethod)
  • ImprovementPhase(startSolution, algorithm)
  • RunLocalSearch(constructiveSolutionMethod, algorithm)

由于现在我们还处在构建阶段ConstructionPhase,现在只讨论这一个函数。
在构建阶段,我们只需要往成员函数ConstructionPhase中输入一个启发式算法的名称,然后利用ConstructiveHeuristic的实例去调用Run函数,当然我们可以使用不同的算法多次使用Run写入Solver的SolutionPool,最后输出那个总时长最短的方案

class Solver:
    def __init__(self, inputData, seed):
        self.InputData = inputData
        self.Seed = seed
        self.RNG = numpy.random.default_rng(seed)  #随机数生成器

        self.EvaluationLogic = EvaluationLogic(inputData) # 定语是El对象,可以调用类中的方法??
        self.SolutionPool = SolutionPool() # 该构造方法没有必选参数
        
        self.ConstructiveHeuristic = ConstructiveHeuristics(self.EvaluationLogic, self.SolutionPool)  # 

    def ConstructionPhase(self, constructiveSolutionMethod):
        self.ConstructiveHeuristic.Run(self.InputData, constructiveSolutionMethod) #ConstructiveHeuristic实际上就是一个solution

        bestInitalSolution = self.SolutionPool.GetLowestMakespanSolution()  # solutionPool中,最短时长即为最优解 -->每次调用这个成员函数,SP中只有一个SOlution?

        print("Constructive solution found.")
        print(bestInitalSolution)

        return bestInitalSolution
# 这里我从外部导包
from Solver import *

# 实例化输入数据
data  = InputData("InputFlowshopSIST.json")

# 实例化Solver类
solver  = Solver(data, 2048) #创建一个solver对象

# 用Solver对象调用ConstructionPhase
solver.ConstructionPhase('FCFS')
solver.ConstructionPhase('NEH')
solver.ConstructionPhase('LPT')
启发式算法得出的初始最优解

4 优化算法:Neighbourhoods and exchanges

启发式算法得出的初始最优解很有可能与真实的最优解有很大的偏差,因此我们需要对其进行优化。我们首选的优化算法是邻域和交换,原因是这种方法最简单。

邻域包含一个或多个解决方案(邻域解决方案),这些解决方案可以通过交换从一个给定的解决方案到达。

在详细讲述邻域交换之前,先来用两种方式评估一下一个FCFS方案。

假定现在我们有一个FCFS的方案,有下面两种方法来评估这个方案的好坏:

1

2

*注意:方法1中,可以直接用之前创建的Solver对象来调用评估函数,也可以像方法2那样,直接调用评估函数,但是评估函数需要一个必选参数——InputJobs类的对象。

现在考虑FCFS前两个元素互换位置的情况:

swapMovePermutation = list(permutationFCFS) #create a copy of the permutation
swapMovePermutation[0] = permutationFCFS[1]  # 交换顺序可以通过元素的赋值来实现,这点在讲NEH插入的时候用到过
swapMovePermutation[1] = permutationFCFS[0]

swapMoveSolution = Solution(data.InputJobs, swapMovePermutation)


solver.EvaluationLogic.DefineStartEnd(swapMoveSolution)
print(swapMoveSolution)

前两个元素互换

*通过对比,仅仅将前两个元素互换,总时长跟之前相比缩短了。由此可见,互换确实能够产生有意义的结果。

为了找到进一步的改进,有必要更广泛地探索邻域的情况。初始解决方案的交换邻域中的所有交换都应该被创建和评估。为了这个目的,我们创建了新的类SwapMove

交换公式

class SwapMove: # 跟上一个单元格原理类似,列表元素的互换
# 注意这个类中的数据类型,swapMove对象有个Permutation属性,我们实际上交换的也正是这个属性。后面调用的时候会用到
    def __init__(self, initialPermutation, indexA, indexB):
        self.Permutation = list(initialPermutation) # create a copy of the permutation
        self.IndexA = indexA
        self.IndexB = indexB    #先初始化成员变量(定语),这是面向对象必须的一步

        # 对成员变量进行修改,因此不需要返回值
        self.Permutation[indexA] = initialPermutation[indexB]
        self.Permutation[indexB] = initialPermutation[indexA]

*注意:在我们定义类的时候,通常越普遍越好,例如这个SwapMove类,它的作用在于借助initialPermutation,把属性Permutation中位于indexA和B的两个元素互换。这两个index可以随意指定,至于指定哪些值,则需要按照具体需求给出参数

现在我们来把FCFS中的排列进行换位,这里需要注意对称性,因为我们把1,2两个元素换位后,再换回来是完全没有意义的。

swapMoves = []
for i in range(len(permutationFCFS)):
    for j in range(len(permutationFCFS)):
        if i < j:
            swapMove = SwapMove(permutationFCFS, i, j)
            swapMoves.append(swapMove)

print(len(swapMoves))  # 输出共有多少种交换的情况
# 注意这里swapmoves不能直接打印,因为里面装的是类SwapMove的对象

答案是55种。
简单分析一下,这个问题相当于从n=11个订单中随机不重复选出两个订单进行换位



现在再来用math模块检验一下:

import math # 这一步的目的是输出一共有几次换位

expectedNumberOfMoves = math.comb(len(data.InputJobs), 2) #binomial coefficient "n choose k" 直接输出换位几次
actualNumberOfMoves = len(swapMoves) #swapMoves中容纳了所有换位的可能情况

print(f'there schould exist {expectedNumberOfMoves} moves , and this is {actualNumberOfMoves == expectedNumberOfMoves}')
结果正确

新的交换已被创建,因此有55个新的候选解决方案。现在必须对新的候选解决方案进行评估,以确定是否其中至少有一个解决方案带来了改进。我们将解决方案保存在一个列表中,以便我们以后可以访问它们,并确定找到的最佳候选解决方案。

# 把交换后的解决方案放进列表中并输出55个方案及其总时长
swapMoveSolutions = []
for move in swapMoves:

    # 使用SwapMove类的Permutation属性来构建Solution对象
    swapMoveSolution = Solution(data.InputJobs, move.Permutation)
    solver.EvaluationLogic.DefineStartEnd(swapMoveSolution)
    
    swapMoveSolutions.append(swapMoveSolution)

print('initial solution: ' + str(permutationFCFS))

for solution in swapMoveSolutions:
    print(solution)
55个方案

我们现在想要找到备选方案中的最佳解决方案,并检查它是否比FCFS解决方案更好。


可见,换位后我们得出了比FCFS更好的结果。那么这个更好的结果究竟在哪些位置发生了变化?试着输出有差异的元素的序号:


判断两个list有差别的元素索引

5 创建SwapNeighborhood类

我们现在把这个逻辑构建到一个上位的 SwapNeighborhood类中,代表一个邻域的互换N({swap}, s)。我们应如何进行?SwapNeighborhood类的各个组成部分是什么?需要执行哪些逻辑步骤?

"""这个类包括了所有从n个订单中选两个的可能排列"""
class SwapNeighborhood:
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        self.InputData = inputData
        self.Permutation = initialPermutation
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool  ## 跟下面的SwapMoveSolutions区分开!!!!!!

        self.Moves = []
        self.SwapMoveSolutions = []

    """生成所有可能的交换排列"""
    def DiscoverMoves(self):
        for i in range(len(self.Permutation)):
            for j in range(len(self.Permutation)):
                if i < j:
                    swapMove = SwapMove(self.Permutation, i, j)
                    self.Moves.append(swapMove)  # Self.Moves列表,里面装的是SwapMove对象

    """把生成的所有排列self.Moves中的每次交换,都进行实例化后评估。这里注意SwapMove类的属性。找出总时长最短"""
    def EvaluateMovesBestImprovement(self):
        for move in self.Moves:
            swapMoveSolution = Solution(self.InputData.InputJobs, move.Permutation)
            self.EvaluationLogic.DefineStartEnd(swapMoveSolution)

            self.SwapMoveSolutions.append(swapMoveSolution)


    """另一种评估方式:只评估到第一个优化现有方案的交换排列"""
    def EvaluateMovesFirstImprovement(self):
        bestObjective = self.SolutionPool.GetLowestMakespanSolution().Makespan

        for move in self.Moves:
            swapMoveSolution = Solution(self.InputData.InputJobs, move.Permutation)

            self.EvaluationLogic.DefineStartEnd(swapMoveSolution)
            self.SwapMoveSolutions.append(swapMoveSolution)

            if swapMoveSolution.Makespan < bestObjective:
                # 终止邻域评估,因为已经找到了第一个优化的结果:比pool中的最优结果还要好
                return

    """定义了两个评估函数,现在还需要定义一个接口,利用这个接口去调用两个评估函数"""
    def EvaluateMoves(self, evaluationStrategy):
        if evaluationStrategy == 'BestImprovement':
            self.EvaluateMovesBestImprovement()
        elif evaluationStrategy == 'FirstImprovement':
            self.EvaluateMovesFirstImprovement()
        else:
            print(f'Evaluation strategy {evaluationStrategy} not implemented! ')
            # 这个函数不涉及赋值操作,也不用返回。每次调用这个函数,要么调用其他函数,要么打印提示。因此用print就行不用return


    """两种评估方案各自产生的结果都被存放在self.SwapMoveSolutions中。现在要通过MakeBestMove函数来输出最优解"""
    def MakeBestMove(self):
        self.SwapMoveSolutions.sort(key = lambda solution: solution.Makespan) # 这里的solution是可以直接调用Makespan的,因为在两个评估方案中都是用了EvaluationLogic
        bestNeighborhoodSolution = self.SwapMoveSolutions[0]

        return bestNeighborhoodSolution

    """使用clear函数清空列表等容器。猜想:如果我们更换另一种排列方式,那么之前保存的moves和SwapMoveSolutions就没用了"""
    def Update(self, permutation):

            self.Permutation = permutation # 修改类的属性,修改后前面的函数执行都会改变,因此要清空两个列表属性

            self.Moves.clear()
            self.SwapMoveSolutions.clear()


    """进行本地搜索locaksearch,用手头现有的解决方案去跟经过交换的方案进行比较,默认交换后会有更好的方案(True)"""
    def LocalSearch(self, neighborhoodEvaluationStrategy, solution):
        hasSolutionImproved = True

        while hasSolutionImproved:
            self.Update(solution.Permutation)  # 清空两个列表
            self.DiscoverMoves() # 填充Moves列表
            self.EvaluateMoves(neighborhoodEvaluationStrategy) # 填充SwapMoveSolutions列表

            bestNeighborhoodSolution = self.MakeBestMove()

            if bestNeighborhoodSolution.Makespan < solution.Makespan:
                print('New best solution has been found!!!')
                print(bestNeighborhoodSolution)  # 对Solution类的对象执行print,会执行__str__

                self.SolutionPool.AddSolution(bestNeighborhoodSolution)

                solution.Permutation = bestNeighborhoodSolution.Permutation
                solution.Makespan = bestNeighborhoodSolution.Makespan

            else:
                print(f'Reached local optimum of {self.Type} neighborhood. Stop local search. ')
                hasSolutionImproved = False

我们将逻辑外包给Neighborhood.py模块,并将邻域嵌入ImprovementAlgorithm类中,这个类控制优化程序的逻辑。

Neighborhood.py文件中,已经包含了上一节写的代码。不过略有优化,SwapMove被定义成BaseNeighborhood的子类,这样SwapMove可以继承所有成员函数并根据子类的特有需求重写了父类的成员函数

from OutputData import *
from copy import deepcopy
class BaseNeighborhood:
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        self.InputData = inputData
        self.Permutation = initialPermutation
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool

        self.Moves = []
        self.MoveSolutions = []

        self.Type = 'None'

    def DiscoverMoves(self):
        raise Exception('DiscoverMoves() is not implemented for the abstract BaseNeighborhood class.')

    def EvaluateMoves(self, evaluationStrategy):
        if evaluationStrategy == 'BestImprovement':
            self.EvaluateMovesBestImprovement()
        elif evaluationStrategy == 'FirstImprovement':
            self.EvaluateMovesFirstImprovement()
        else:
            raise Exception(f'Evaluation strategy {evaluationStrategy} not implemented.')

    """ Evaluate all moves. """
    def EvaluateMovesBestImprovement(self):
        for move in self.Moves:
            moveSolution = Solution(self.InputData.InputJobs, move.Permutation)

            self.EvaluationLogic.DefineStartEnd(moveSolution)
            
            self.MoveSolutions.append(moveSolution)

    """ Evaluate all moves until the first one is found that improves the best solution found so far. """
    def EvaluateMovesFirstImprovement(self):
        bestObjective = self.SolutionPool.GetLowestMakespanSolution().Makespan

        for move in self.Moves:
            moveSolution = Solution(self.InputData.InputJobs, move.Permutation)

            if self.Type == 'BestInsertion':
                self.EvaluationLogic.DetermineBestInsertionAccelerated(moveSolution, move.removedJob)
            else:
                self.EvaluationLogic.DefineStartEnd(moveSolution)

            self.MoveSolutions.append(moveSolution)

            if moveSolution.Makespan < bestObjective:
                # abort neighborhood evaluation because an improvement has been found
                return

    def MakeBestMove(self):
        self.MoveSolutions.sort(key = lambda solution: solution.Makespan) # sort solutions according to makespan

        bestNeighborhoodSolution = self.MoveSolutions[0]

        return bestNeighborhoodSolution

    def Update(self, permutation):
        self.Permutation = permutation

        self.Moves.clear()
        self.MoveSolutions.clear()

    def LocalSearch(self, neighborhoodEvaluationStrategy, solution):        
        hasSolutionImproved = True

        while hasSolutionImproved:
            self.Update(solution.Permutation)
            self.DiscoverMoves()
            self.EvaluateMoves(neighborhoodEvaluationStrategy)

            bestNeighborhoodSolution = self.MakeBestMove()

            if bestNeighborhoodSolution.Makespan < solution.Makespan:
                # print("New best solution has been found!")
                print(bestNeighborhoodSolution)

                self.SolutionPool.AddSolution(bestNeighborhoodSolution)

                solution.Permutation = bestNeighborhoodSolution.Permutation
                solution.Makespan = bestNeighborhoodSolution.Makespan
            else:
                print(f"Reached local optimum of {self.Type} neighborhood. Stop local search.")
                hasSolutionImproved = False        

""" Represents the swap of the element at IndexA with the element at IndexB for a given permutation (= solution). """
class SwapMove:
    def __init__(self, initialPermutation, indexA, indexB):
        self.Permutation = list(initialPermutation) # create a copy of the permutation
        self.IndexA = indexA
        self.IndexB = indexB

        self.Permutation[indexA] = initialPermutation[indexB]
        self.Permutation[indexB] = initialPermutation[indexA]
        
""" Contains all $n choose 2$ swap moves for a given permutation (= solution). """
class SwapNeighborhood(BaseNeighborhood):
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        super().__init__(inputData, initialPermutation, evaluationLogic, solutionPool)

        self.Type = 'Swap'

    """ Generate all $n choose 2$ moves. """
    def DiscoverMoves(self):
        for i in range(len(self.Permutation)):
            for j in range(len(self.Permutation)):
                if i < j:
                    swapMove = SwapMove(self.Permutation, i, j)
                    self.Moves.append(swapMove)

""" Represents the insertion of the element at IndexA at the new position IndexB for a given permutation (= solution). """
class InsertionMove:
    def __init__(self, initialPermutation, indexA, indexB):
        self.Permutation = [] # create a copy of the permutation
        self.IndexA = indexA
        self.IndexB = indexB

        for k in range(len(initialPermutation)):
            if k == indexA:
                continue

            self.Permutation.append(initialPermutation[k])

        self.Permutation.insert(indexB, initialPermutation[indexA])

""" Contains all $(n - 1)^2$ insertion moves for a given permutation (= solution). """
class InsertionNeighborhood(BaseNeighborhood):
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        super().__init__(inputData, initialPermutation, evaluationLogic, solutionPool)

        self.Type = 'Insertion'

    def DiscoverMoves(self):
        for i in range(len(self.Permutation)):
            for j in range(len(self.Permutation)):
                if i == j or i == j + 1:
                    continue

                insertionMove = InsertionMove(self.Permutation, i, j)
                self.Moves.append(insertionMove)
                
BaseNeighborhood类图

6 优化程序的流程逻辑

ImprovementAlgorithm类图

我们的优化程序是基于邻域的,目前我只介绍了SwapNeighborhood这一中邻域类型。而在ImprovementAlgorithmus类中,我们可以使用多种邻域类型来创建邻域Neighborhood,并把这些邻域存入字典,键是邻域的类型,值是邻域的对象。最后在ImprovementAlgorithmus的子类IterativeImprovement中,用键把字典遍历即可得到solution。

"""这是多种不同优化算法的基本类,特定的优化算法可以继承这个基本类"""
class ImprovementAlgorithmus:
    def __init__(self, inputData, neighborhoodEvaluationStrategy = 'BestImprovement', neighborhoodTypes = ['Swap']):
        self.InputData = inputData  # neighborhoodTypes实际上有多种,也可以是Insertion,TwoEdgeExchange。。。
        self.EvaluationLogic = None
        self.SolutionPool = None
        self.RNG = None # 表示随机数

        self.NeighborhoodEvaluationStrategy = neighborhoodEvaluationStrategy
        self.NeighborhoodTypes = neighborhoodTypes
        self.Neighborhoods = {}

    def Initialize(self, evaluationLogic, solutionPool, rng = None):
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool
        self.RNG = rng

    def InitializeNeighborhoods(self, solution): # 字典Neighborhood中,可能有两个键Insertion和Swap,添加新数据时,需要用到相应的构造器
        self.Neighborhoods['Swap'] = SwapNeighborhood(self.InputData, solution.Permutation, self.EvaluationLogic, self.SolutionPool)
        # add further neighborhoods 详见class ImprovementAlgorithm文件

现在,优化算法 ImprovementAlgorithm类包含了基于邻域的优化程序的所有基本组件。这些可以在不同的算法中使用。在这里,我们创建了一个 "迭代优化 "类IterativeImprovement,通过连续的局部搜索来迭代改进解决方案。

""" Iterative improvement algorithm through sequential variable neighborhood descent. """
class IterativeImprovement(ImprovementAlgorithm): # einzelne Nachbaren
    def __init__(self, inputData, neighborhoodEvaluationStrategy = 'BestImprovement', neighborhoodTypes = ['Swap']):
        super().__init__(inputData, neighborhoodEvaluationStrategy, neighborhoodTypes)

    def Run(self, solution): #startSolution ?
        self.InitializeNeighborhoods(solution)  # 初始化即构造对象  

        # According to "Hansen et al. (2017): Variable neighorhood search", this is equivalent to the 
        # sequential variable neighborhood descent with a pipe neighborhood change step.
        for neighborhoodType in self.NeighborhoodTypes:
            neighborhood = self.Neighborhoods[neighborhoodType]

            neighborhood.LocalSearch(self.NeighborhoodEvaluationStrategy, solution)
        
        return solution

这里Run函数的用法可以自己思考,前面讲过。InitializeNeighborhoods实际上会用当前参数中的solution往Neighborhoods字典中填充邻域对象。字典中有了内容我们才能针对不同类型的邻域执行localsearch

现在对Solver类进行扩展,增加ImprovementPhase()RunLocalSearch()两个函数

class Solver:
    def __init__(self, inputData, seed):
        
        self.InputData = inputData
        self.Seed = seed
        self.RNG = numpy.random.default_rng(seed)  #随机数生成器

        self.EvaluationLogic = EvaluationLogic(inputData) # 定语是El对象,可以调用类中的方法
        self.SolutionPool = SolutionPool()
        
        self.ConstructiveHeuristic = ConstructiveHeuristics(self.EvaluationLogic, self.SolutionPool)

    def ConstructionPhase(self, constructiveSolutionMethod):
        self.ConstructiveHeuristic.Run(self.InputData, constructiveSolutionMethod)

        bestInitalSolution = self.SolutionPool.GetLowestMakespanSolution()

        print("Constructive solution found.")
        print(bestInitalSolution)

        return bestInitalSolution


    def ImprovementPhase(self, startSolution, algorithm):
        algorithm.Initialize(self.EvaluationLogic, self.SolutionPool, self.RNG)
        bestSolution = algorithm.Run(startSolution)

        print("Best found Solution.")
        print(bestSolution)

    def RunLocalSearch(self, constructiveSolutionMethod, algorithm):
        startSolution = self.ConstructionPhase(constructiveSolutionMethod)

        self.ImprovementPhase(startSolution, algorithm)

*注意:

  • 在ImprovementPhase中,有个Initialize函数,这里对应的是IterativeImprovement类或其他子类的对象,可以调用他们父类的Initialize函数和子类中的Run函数。子类Run函数实质上就是把邻域进行局部搜索,并返回最优解。--> 其实这里已经是在RunLocalSearch了,那么为什么还要定义一个叫做RunLocalSearch的函数,并在这个函数中调用ImprovementPhase?
  • 原因在于,这样做我们在调用函数的时候能少写参数。例如,在RunLocalSearch中我们会首先构建startSolution,也就是启发式算法的方案,然后把这个方案作为参数输入到优化步骤--ImprovementPhase。编程要理解这种思想。

*总结:到目前为止,solver类的作用就是通过ConstructionPhase创建初始解决方案,然后把这个方案进行评估。评估时要用到两个函数,但是我们只需要调用RunLocalSearch这一个函数就够了

7 运行实例

from Solver import *
from ImprovementAlgorithm import *

data = InputData("InputFlowshopSIST.JSON")

solver = Solver(data, 2048)

improvementAlgorithm = IterativeImprovement(data)

solver.RunLocalSearch('FCFS', improvementAlgorithm)

*注意:操作流程就是先构建一个solver对象,然后用这个solver对象调用RunLocalSearch,第一个参数是启发式算法的名字,之前我们也是这样定义的;第二个参数是优化算法的对象或实例。在构造IterativeImprovement对象时,我们只有一个必选参数,也就是data(inputData),可选参数的值取决于我们想用哪种优化方式(best还是first),以及哪种邻域类型


*上图就是用FCFS产生初始方案,再用SwapNeighborhood和BestImprovement得出最优解的过程。

问题

到目前为止,我们已经了解了两个邻域的评估策略。

  • Best Improvement(广泛搜索):对邻域的所有交换进行评估。有最大改进的交换被执行。
  • First Improvement(选择性搜索):对一个邻域的交换进行评估,直到发现优化。一旦发现有改进,就会中断评估,并立即执行发现的交换。因此,更好的交换方案被忽略

你认为,采用哪种评价策略能取得更好的效果?两种评价策略的优势和劣势是什么?

from Solver import *

data = InputData("InputFlowshopSIST.json")

constructiveSolutionMethod = 'FCFS'

solverBestImprovement = Solver(data, 2048)
bestImprovementAlgorithm = IterativeImprovement(data, 'BestImprovement')
solverBestImprovement.RunLocalSearch(constructiveSolutionMethod, bestImprovementAlgorithm)

solverFirstImprovement = Solver(data, 2048)
bestImprovementAlgorithm = IterativeImprovement(data, 'FirstImprovement')
solverFirstImprovement.RunLocalSearch(constructiveSolutionMethod, bestImprovementAlgorithm)

localOptimumBestImprovement = solverBestImprovement.SolutionPool.GetLowestMakespanSolution()
localOptimumFirstImprovement = solverFirstImprovement.SolutionPool.GetLowestMakespanSolution()

print(f"Best improvement makespan: {localOptimumBestImprovement.Makespan}")
print(f"First improvement makespan: {localOptimumFirstImprovement.Makespan}")

isBestImprovementMakespanLower = localOptimumBestImprovement.Makespan < localOptimumFirstImprovement.Makespan

print(f"The best improvement strategy achieves a lower makespan: {isBestImprovementMakespanLower}")
对比两种评估方案
from Solver import *

data = InputData("InputFlowshopSIST.json")

improvementAlgorithm = IterativeImprovement(data, 'BestImprovement', ['Swap', 'Insertion'])

solver = Solver(data, 1024)

solver.RunLocalSearch('LPT', improvementAlgorithm)
对比Insertion和Swap
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容