Gekko解算器是基于梯度的非线性规划(NLP)解算器,可以找到局部极小点.有几个策略可以帮助Gekko找到全局最优.
下面是一个例子,可以帮助解决局部最小值与全局最小值这一重要主题.下面的脚本生成目标为951.0的(7,0,0)的局部(非全局)解.
from gekko import GEKKO
m = GEKKO(remote=False)
x = m.Array(m.Var,3,lb=0)
x1,x2,x3 = x
m.Minimize(1000-x1**2-2*x2**2-x3**2-x1*x2-x1*x3)
m.Equations([8*x1+14*x2+7*x3==56,
x1**2+x2**2+x3**2>=25])
m.solve(disp=False)
res=[print(f'x{i+1}: {xi.value[0]}') for i,xi in enumerate(x)]
print(f'Objective: {m.options.objfcnval:.2f}')
有基于梯度的全局优化方法,如BARON,遗传算法,模拟退火,等等,一个简单的方法是执行多起点方法与不同的初始条件(猜测)在网格搜索或智能地与贝叶斯方法更智能地搜索,如果初始猜测的数量是小的.
Multi-Start with Parallel Threading个
网格搜索很容易并行化,可以同时从多个位置开始.这里是相同的优化问题,其中全局解是通过并行gekko优化找到的.
import numpy as np
import threading
import time, random
from gekko import GEKKO
class ThreadClass(threading.Thread):
def __init__(self, id, xg):
s = self
s.id = id
s.m = GEKKO(remote=False)
s.xg = xg
s.objective = float('NaN')
# initialize variables
s.m.x = s.m.Array(s.m.Var,3,lb=0)
for i in range(3):
s.m.x[i].value = xg[i]
s.m.x1,s.m.x2,s.m.x3 = s.m.x
# Equations
s.m.Equation(8*s.m.x1+14*s.m.x2+7*s.m.x3==56)
s.m.Equation(s.m.x1**2+s.m.x2**2+s.m.x3**2>=25)
# Objective
s.m.Minimize(1000-s.m.x1**2-2*s.m.x2**2-s.m.x3**2
-s.m.x1*s.m.x2-s.m.x1*s.m.x3)
# Set solver option
s.m.options.SOLVER = 1
threading.Thread.__init__(s)
def run(self):
print('Running application ' + str(self.id) + '\n')
self.m.solve(disp=False,debug=0) # solve
# Retrieve objective if successful
if (self.m.options.APPSTATUS==1):
self.objective = self.m.options.objfcnval
else:
self.objective = float('NaN')
self.m.cleanup()
# Optimize at mesh points
x1_ = np.arange(0.0, 10.0, 3.0)
x2_ = np.arange(0.0, 10.0, 3.0)
x3_ = np.arange(0.0, 10.0, 3.0)
x1,x2,x3 = np.meshgrid(x1_,x2_,x3_)
threads = [] # Array of threads
# Load applications
id = 0
for i in range(x1.shape[0]):
for j in range(x1.shape[1]):
for k in range(x1.shape[2]):
xg = (x1[i,j,k],x2[i,j,k],x3[i,j,k])
# Create new thread
threads.append(ThreadClass(id, xg))
# Increment ID
id += 1
# Run applications simultaneously as multiple threads
# Max number of threads to run at once
max_threads = 8
for t in threads:
while (threading.activeCount()>max_threads):
# check for additional threads every 0.01 sec
time.sleep(0.01)
# start the thread
t.start()
# Check for completion
mt = 10.0 # max time (sec)
it = 0.0 # time counter
st = 1.0 # sleep time (sec)
while (threading.active_count()>=3):
time.sleep(st)
it = it + st
print('Active Threads: ' + str(threading.active_count()))
# Terminate after max time
if (it>=mt):
break
# Initialize array for objective
obj = np.empty_like(x1)
# Retrieve objective results
id = 0
id_best = 0; obj_best = 1e10
for i in range(x1.shape[0]):
for j in range(x1.shape[1]):
for k in range(x1.shape[2]):
obj[i,j,k] = threads[id].objective
if obj[i,j,k]<obj_best:
id_best = id
obj_best = obj[i,j,k]
id += 1
print(obj)
print(f'Best objective {obj_best}')
print(f'Solution {threads[id_best].m.x}')
Bayesian Optimization
另一种方法是通过将初始条件映射到优化解决方案的性能来智能搜索.它搜索的领域是它期望最佳性能或尚未测试的领域,不确定性很高.
from gekko import GEKKO
from hyperopt import fmin, tpe, hp
from hyperopt import STATUS_OK, STATUS_FAIL
# Define the search space for the hyperparameters
space = {'x1': hp.quniform('x1', 0, 10, 3),
'x2': hp.quniform('x2', 0, 10, 3),
'x3': hp.quniform('x3', 0, 10, 3)}
def objective(params):
m = GEKKO(remote=False)
x = m.Array(m.Var,3,lb=0)
x1,x2,x3 = x
x1.value = params['x1']
x2.value = params['x2']
x3.value = params['x3']
m.Minimize(1000-x1**2-2*x2**2-x3**2-x1*x2-x1*x3)
m.Equations([8*x1+14*x2+7*x3==56,
x1**2+x2**2+x3**2>=25])
m.options.SOLVER = 1
m.solve(disp=False,debug=False)
obj = m.options.objfcnval
if m.options.APPSTATUS==1:
s=STATUS_OK
else:
s=STATUS_FAIL
m.cleanup()
return {'loss':obj, 'status': s, 'x':x}
best = fmin(objective, space, algo=tpe.suggest, max_evals=50)
sol = objective(best)
print(f"Solution Status: {sol['status']}")
print(f"Objective: {sol['loss']:.2f}")
print(f"Solution: {sol['x']}")
两种多起点方法都能找到全局解决方案:
Objective: 936.00
Solution: [[0.0] [0.0] [8.0]]
如果确定等式约束总是产生全局最优值,则还可以从不等式约束切换到强制约束边界.
关于这些多启动方法的更多信息请参见Global Optimization and Solver Tuning页上的Engineering Optimization course.