使用类 island#

../_images/island_no_text.png

island 类是 pagmo 的并行化单元块。其理念是,在 pygmo 中,island 是一个可以物理上位于任何位置的计算单元,因此可以用来外包对 evolve() 的调用,从而将主计算单元从任务中卸载。因此,它能够根据 UDI(用户定义岛屿)的实现细节,在单独的线程、进程或远程机器上运行进化,其类型被擦除。类似于 algorithm 根据 UDA 进化 population,以及问题根据 UDP 计算其适应度。

非高级用户不需要实现自己的UDI,因为我们已经在pygmo提供的UDI中编码了最流行的并行任务执行范式,可以在岛屿列表中找到这些UDI的列表。

注意

一组pygmo.island形成一个archipelago,如果你对pygmo为你通过archipelago并行化任务的默认选择感到满意,你可以跳过本教程,直接跟随教程“使用类archipelago”。

我们首先实例化一个岛屿。

>>> import pygmo as pg
>>> isl = pg.island(algo = pg.de(10), prob = pg.ackley(5), size=20, udi=pg.thread_island())

无论udi参数如何,如果构建成功,将打开一个新线程并委托其运行evolve()方法。在这种情况下,我们构建了指定udi的岛屿,特别是我们选择了thread_island,因此种群进化将直接在打开的线程上进行。

注意

由于进化(优化任务)需要评估problem适应度函数和执行algorithm逻辑,因此这两者都必须标记为线程安全,以便thread_island能够运行进化。

>>> isl.evolve()

通过这个简单的命令,我们运行进化过程,由于任务被卸载到不同的线程,我们当前的python会话或脚本不受影响,因此我们可以同时运行其他事情。

考虑以下脚本,例如:

>>> islands = [pg.island(algo = pg.de(gen = 1000, F=effe, CR=cross), prob = pg.rosenbrock(10), size=20, seed=32) for effe in [0.3,0.5,0.7,0.9] for cross in [0.3,0.5,0.7,0.9]]
>>> _ = [isl.evolve() for isl in islands] 
>>> _ = [isl.wait() for isl in islands]
../_images/ros_10_on_16_isl.png

在上述三行代码中,我们并行运行了相同的优化任务,使用了16个差分进化算法(de),并为其交叉和F参数设置了不同的值。由于在岛屿构造函数中指定了seed,相同的种群将被输入到不同的算法实例中。

注意

由于没有指定udi参数,island构造函数将使用其内部启发式方法为我们选择岛屿类型(参见文档)。在这种情况下,由于问题和算法都是线程安全的,将选择thread_island(可以通过调用__repr__()方法来验证)。

警告

这种使用island的方式,本质上是在复制archipelago的一些功能。虽然了解pygmo机制很有趣,因此在本教程的背景下非常有用,但我们不鼓励这种类型的脚本编写,并鼓励使用archipelago(无迁移)来代替。

重复相同的计算数百次,我们可以得到右边的箱线图,其中找到了算法和此任务的最佳参数。id0: F = 0.3, CR = 0.3, id11: F = 0.7, CR = 0.9, id15: F = 0.9, CR = 0.9。为了方便起见,生成了右边图的脚本。

>>> res = []
>>> for i in range(100):
...      islands = [pg.island(algo = pg.de(gen = 1000, F=effe, CR=cross), prob = pg.rosenbrock(10), size=20, seed=32) for effe in [0.3,0.5,0.7,0.9] for cross in [0.3,0.5,0.7,0.9]]
...      _ = [isl.evolve() for isl in islands] 
...      _ = [isl.wait() for isl in islands]
...      res.append([isl.get_population().champion_f[0] for isl in islands])
>>> import seaborn as sns 
>>> import pandas as pd 
>>> import matplotlib.pyplot as plt 
>>> sns.boxplot(pd.DataFrame(res)) 
>>> ylim = plt.ylim([0,20]) 
>>> plt.title("16 instances of DE on the Rosenbrock 10 problem") 
>>> plt.ylabel("obj. fun.") 

管理异常#

如果在发送到island的优化任务期间发生异常,会发生什么?island有可能通过调用其方法wait_check()在主线程中重新抛出第一个遇到的异常。例如,假设你的问题或算法以某种方式抛出异常(我知道,这有多大可能,对吧?)。为了说明这种情况,我们将使用以下UDP:

>>> class raise_exception:
...     def __init__(self):
...         self.counter=0;
...     def fitness(self,dv):
...         if self.counter == 300:
...             raise ValueError("Ops!")
...         self.counter += 1
...         return [self.counter]
...     def get_bounds(self):
...         return ([0],[1])
...     def get_name(self):
...         return "A throwing UDP"

我们构建了一个岛屿:

>>> isl = pg.island(algo = pg.de(100), prob = raise_exception(), size=20, udi=pg.ipyparallel_island())

注意

我们明确地将udi参数传递给island构造函数,并选择了一个ipyparallel岛。 这需要启动一个集群,例如输入命令ipcluster start

这种构造将触发\(20\)次函数评估,因此不会抛出异常。现在让我们运行一个进化过程:

>>> isl.evolve()
>>> isl.wait()

一切看起来都很好:在我们的线程中,没有真正发生任何事情,也没有抛出任何异常。但是,如果我们检查岛屿,我们会得到:

>>> print(isl) 
Island name: Ipyparallel island
    Status: idle - **error occurred**

Extra info:
    Queue status:

    (unassigned, 0)
    (0, {'queue': 0, 'completed': ..., 'tasks': 0})
    (1, {'queue': 0, 'completed': ..., 'tasks': 0})
    (2, {'queue': 0, 'completed': ..., 'tasks': 0})
    (3, {'queue': 0, 'completed': ..., 'tasks': 0})
    (4, {'queue': 0, 'completed': ..., 'tasks': 0})
    (5, {'queue': 0, 'completed': ..., 'tasks': 0})
    (6, {'queue': 0, 'completed': ..., 'tasks': 0})
    (7, {'queue': 0, 'completed': ..., 'tasks': 0})

Algorithm: Differential Evolution

Problem: A throwing UDP

Population size: 20
    Champion decision vector: ...
    Champion fitness: ...

发生了什么?我需要找回那条消息!

>>> isl.wait_check() 
Traceback (most recent call last):
  File "/Users/darioizzo/miniconda3/envs/pagmo/lib/python3.6/doctest.py", line 1330, in __run
    compileflags, 1), test.globs)
  File "<doctest default[0]>", line 1, in <module>
    isl.wait_check()
RuntimeError: The asynchronous evolution of a pythonic island of type 'Ipyparallel island' raised an error:
Traceback (most recent call last):
  File "/Users/darioizzo/.local/lib/python3.6/site-packages/pygmo/_py_islands.py", line 403, in run_evolve
    return ret.get()
  File "/Users/darioizzo/miniconda3/envs/pagmo/lib/python3.6/site-packages/ipyparallel/client/asyncresult.py", line 167, in get
    raise self.exception()
  File "/Users/darioizzo/miniconda3/envs/pagmo/lib/python3.6/site-packages/ipyparallel/client/asyncresult.py", line 224, in _resolve_result
    raise r
ipyparallel.error.RemoteError: ValueError(Ops!)