Extending Algorithms in OpenBox¶
This guide shows how to extend algorithms in OpenBox.
You can implement a new surrogate model, a new acquisition function, or a new acquisition function maximizer for the Bayesian optimization advisor, or implement an advisor with an entirely new algorithm.
Extending the Bayesian Optimization Advisor¶
Workflow of the Bayesian Optimization Advisor¶
Let's start by understanding the workflow of the Bayesian optimization advisor.
In each iteration, Advisor.get_suggestion() is called to generate a new configuration.
The main steps in get_suggestion() are as follows:
def get_suggestion(self, ...):
self.surrogate_model.train(...)
self.acquisition_function.update(...)
challengers = self.acq_optimizer.maximize(...)
...
First, the surrogate model is trained on the latest data.
Second, the acquisition function is updated with the surrogate model and additional information.
Third, the acquisition function maximizer samples points that maximize the acquisition function.
Please refer to openbox/core/generic_advisor.py for more details.
Implementing a New Surrogate Model¶
To implement a new surrogate model, inherit the AbstractModel class.
The _train() and _predict() methods need to be implemented.
In _train(), the surrogate model should be updated with the latest data.
In _predict(), given a batch of new data, the method should return the predicted means and variances for the batch.
Please refer to openbox/surrogate/base/base_model.py for more details.
Here is an example that implements a surrogate using RandomForest from scikit-learn:
import typing
import numpy as np
from sklearn.ensemble import RandomForestRegressor
import threading
from joblib import Parallel, delayed
from sklearn.utils.fixes import _joblib_parallel_args
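# Note: _joblib_parallel_args is a private scikit-learn helper and may be
# unavailable in recent scikit-learn versions; with newer versions, you can
# pass require="sharedmem" to joblib's Parallel directly instead.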
from sklearn.utils.validation import check_is_fitted
from sklearn.ensemble._base import _partition_estimators
from openbox.surrogate.base.base_model import AbstractModel
def _collect_prediction(predict, X, out, lock):
"""
This is a utility function for joblib's Parallel.
It can't go locally in ForestClassifier or ForestRegressor, because joblib
complains that it cannot pickle it when placed there.
"""
prediction = predict(X, check_input=False)
with lock:
out.append(prediction)
class skRandomForestWithInstances(AbstractModel):
def __init__(self, types: np.ndarray,
bounds: typing.List[typing.Tuple[float, float]],
num_trees: int=10,
do_bootstrapping: bool=True,
n_points_per_tree: int=-1,
ratio_features: float=5. / 6.,
min_samples_split: int=3,
min_samples_leaf: int=3,
max_depth: int=2**20,
eps_purity: float=1e-8,
max_num_nodes: int=2**20,
seed: int=42,
n_jobs: int=None,
**kwargs):
"""
Parameters
----------
types : np.ndarray (D)
Specifies the number of categorical values of an input dimension where
the i-th entry corresponds to the i-th input dimension. Let's say we
            have 2 dimensions where the first dimension consists of 3 different
            categorical choices and the second dimension is continuous; then we
have to pass np.array([2, 0]). Note that we count starting from 0.
bounds : list
Specifies the bounds for continuous features.
num_trees : int
The number of trees in the random forest.
do_bootstrapping : bool
Turns on / off bootstrapping in the random forest.
n_points_per_tree : int
            Number of points per tree. If <= 0, X.shape[0] will be used
            in _train(X, y) instead.
ratio_features : float
The ratio of features that are considered for splitting.
min_samples_split : int
The minimum number of data points to perform a split.
min_samples_leaf : int
The minimum number of data points in a leaf.
max_depth : int
The maximum depth of a single tree.
eps_purity : float
The minimum difference between two target values to be considered
            different.
        max_num_nodes : int
            The maximum total number of nodes in a tree.
seed : int
            The seed of the random number generator.
n_jobs : int, default=None
The number of jobs to run in parallel. :meth:`fit`, :meth:`predict`,
:meth:`decision_path` and :meth:`apply` are all parallelized over the
trees. ``None`` means 1 unless in a :obj:`joblib.parallel_backend`
context. ``-1`` means using all processors. See :term:`Glossary
<n_jobs>` for more details.
"""
super().__init__(types, bounds, **kwargs)
self.rng = np.random.RandomState(seed)
self.num_trees = num_trees
self.do_bootstrapping = do_bootstrapping
max_features = None if ratio_features > 1.0 else \
int(max(1, types.shape[0] * ratio_features))
self.max_features = max_features
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_depth = max_depth
self.epsilon_purity = eps_purity
self.max_num_nodes = max_num_nodes
self.n_points_per_tree = n_points_per_tree
self.n_jobs = n_jobs
self.rf = None # type: RandomForestRegressor
def _train(self, X: np.ndarray, y: np.ndarray):
"""Trains the random forest on X and y.
Parameters
----------
X : np.ndarray [n_samples, n_features]
Input data points.
        y : np.ndarray [n_samples, ]
The corresponding target values.
Returns
-------
self
"""
self.X = X
self.y = y.flatten()
if self.n_points_per_tree <= 0:
self.num_data_points_per_tree = self.X.shape[0]
else:
self.num_data_points_per_tree = self.n_points_per_tree
self.rf = RandomForestRegressor(
n_estimators=self.num_trees,
max_depth=self.max_depth,
min_samples_split=self.min_samples_split,
min_samples_leaf=self.min_samples_leaf,
max_features=self.max_features,
            # max_samples may only be set when bootstrapping is enabled
            max_samples=self.num_data_points_per_tree if self.do_bootstrapping else None,
max_leaf_nodes=self.max_num_nodes,
min_impurity_decrease=self.epsilon_purity,
bootstrap=self.do_bootstrapping,
n_jobs=self.n_jobs,
random_state=self.rng,
)
self.rf.fit(self.X, self.y)
return self
def _predict(self, X: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray]:
"""Predict means and variances for given X.
Parameters
----------
X : np.ndarray of shape = [n_samples, n_features]
Returns
-------
means : np.ndarray of shape = [n_samples, 1]
Predictive mean
vars : np.ndarray of shape = [n_samples, 1]
Predictive variance
"""
if len(X.shape) != 2:
raise ValueError(
'Expected 2d array, got %dd array!' % len(X.shape))
if X.shape[1] != self.types.shape[0]:
raise ValueError('Rows in X should have %d entries but have %d!' % (self.types.shape[0], X.shape[1]))
check_is_fitted(self.rf)
# Check data
if X.ndim == 1:
X = X.reshape((1, -1))
X = self.rf._validate_X_predict(X)
# Assign chunk of trees to jobs
n_jobs, _, _ = _partition_estimators(self.rf.n_estimators, self.rf.n_jobs)
# collect the output of every estimator
all_y_preds = list()
# Parallel loop
lock = threading.Lock()
Parallel(n_jobs=n_jobs, verbose=self.rf.verbose,
**_joblib_parallel_args(require="sharedmem"))(
delayed(_collect_prediction)(e.predict, X, all_y_preds, lock)
for e in self.rf.estimators_)
all_y_preds = np.asarray(all_y_preds, dtype=np.float64)
means = np.mean(all_y_preds, axis=0)
vars_ = np.var(all_y_preds, axis=0)
return means.reshape((-1, 1)), vars_.reshape((-1, 1))
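The surrogate can be exercised on its own before plugging it into an advisor. The following minimal sketch assumes that AbstractModel exposes public train() / predict() wrappers that dispatch to _train() / _predict() (see openbox/surrogate/base/base_model.py); the synthetic data is purely illustrative.
import numpy as np

# Two continuous dimensions: types entries are 0, bounds give the value ranges.
types = np.array([0, 0])
bounds = [(0.0, 1.0), (0.0, 1.0)]
model = skRandomForestWithInstances(types=types, bounds=bounds, num_trees=20)

# Fit on synthetic data, then predict means / variances for new points.
X_train = np.random.rand(50, 2)
y_train = (np.sin(6 * X_train[:, 0]) + X_train[:, 1] ** 2).reshape(-1, 1)
model.train(X_train, y_train)
mean, var = model.predict(np.random.rand(5, 2))  # each of shape (5, 1)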
Implementing a New Acquisition Function¶
To implement a new acquisition function, inherit the AbstractAcquisitionFunction class.
The _compute() method needs to be implemented; it computes the acquisition values for a new batch of points.
Call self.model.predict_marginalized_over_instances(X) to obtain the predicted means and variances from the surrogate model.
Please refer to openbox/acquisition_function/acquisition.py for more details.
Here is an example of the Probability of Improvement (PI) acquisition function:
import numpy as np
from scipy.stats import norm
from openbox.acquisition_function import AbstractAcquisitionFunction
class PI(AbstractAcquisitionFunction):
def __init__(self,
model,
par: float = 0.0,
**kwargs):
r"""Computes the probability of improvement for a given x over the best so far value as
acquisition value.
:math:`P(f_{t+1}(\mathbf{X})\geq f(\mathbf{X^+})) :=
\Phi(\frac{\mu(\mathbf{X}) - f(\mathbf{X^+})}{\sigma(\mathbf{X})})`,
with :math:`f(X^+)` as the incumbent and :math:`\Phi` the cdf of the standard normal
Parameters
----------
model : AbstractEPM
A surrogate that implements at least
- predict_marginalized_over_instances(X)
par : float, default=0.0
Controls the balance between exploration and exploitation of the
acquisition function.
"""
super(PI, self).__init__(model)
self.long_name = 'Probability of Improvement'
self.par = par
self.eta = None
def _compute(self, X: np.ndarray, **kwargs):
"""Computes the PI value.
Parameters
----------
X: np.ndarray(N, D)
Points to evaluate PI. N is the number of points and D the dimension for the points
Returns
-------
np.ndarray(N, 1)
PI of X
"""
if self.eta is None:
raise ValueError('No current best specified. Call update('
'eta=<float>) to inform the acquisition function '
'about the current best value.')
if len(X.shape) == 1:
X = X[:, np.newaxis]
m, var_ = self.model.predict_marginalized_over_instances(X)
std = np.sqrt(var_)
return norm.cdf((self.eta - m - self.par) / std)
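As a rough usage sketch: the advisor first calls update() to pass the trained surrogate and the current incumbent value eta, and the maximizer then queries the function on candidate configurations. For illustration, the sketch below calls the internal _compute() directly on a design matrix; `model` is assumed to be an already-trained surrogate such as the random forest from the previous section.
import numpy as np

# Hypothetical usage: eta is the best (smallest) observed objective so far.
acq_func = PI(model=model, par=0.0)
acq_func.update(model=model, eta=0.05)
acq_values = acq_func._compute(np.random.rand(5, 2))  # shape (5, 1)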
Implementing a New Acquisition Function Maximizer¶
To implement a new acquisition function maximizer, inherit the AcquisitionFunctionMaximizer class.
The _maximize() method needs to be implemented; it returns an iterable of tuples of acquisition values and configurations.
Please refer to openbox/acq_optimizer/basic_maximizer.py for more details.
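As a concrete illustration, here is a minimal sketch of a random-search maximizer. It assumes the base class stores acquisition_function and config_space and that _maximize() receives the history and a number of points, following the interface in openbox/acq_optimizer/basic_maximizer.py; the exact signature may differ across versions.
import numpy as np
from openbox.acq_optimizer.basic_maximizer import AcquisitionFunctionMaximizer

class RandomSearchMaximizer(AcquisitionFunctionMaximizer):
    """Rank uniformly sampled configurations by their acquisition value."""

    def _maximize(self, runhistory, num_points, **kwargs):
        # Sample candidate configurations uniformly from the space.
        configs = self.config_space.sample_configuration(size=num_points)
        if num_points == 1:
            configs = [configs]
        # Evaluate the acquisition function on all candidates at once.
        acq_values = self.acquisition_function(configs).flatten()
        # Return (acquisition value, configuration) pairs, best first.
        order = np.argsort(acq_values)[::-1]
        return [(float(acq_values[i]), configs[i]) for i in order]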
Modifying the Advisor with Your Own Components¶
To use your own surrogate model, acquisition function, or acquisition function maximizer, simply replace the corresponding attributes of the Advisor as follows:
from openbox import Advisor
advisor = Advisor(...)
advisor.surrogate_model = MySurrogateModel(...)
advisor.acquisition_function = MyAcquisitionFunction(...)
advisor.acq_optimizer = MyAcquisitionFunctionMaximizer(...)
This is the fastest way to use custom components.
To integrate a new algorithm into the Bayesian optimization Advisor,
modify the initialization process of the Advisor.
Implementing a New Advisor¶
The advisor is designed as a configuration generator.
In each iteration, advisor.get_suggestion() is called to generate a new configuration.
After evaluation, the Observation is passed back to the advisor via advisor.update_observation().
To implement a new advisor, inherit the BaseAdvisor class.
In most cases, implementing the get_suggestion() method is sufficient.
This method should return a new configuration to be evaluated.
To perform extra operations while updating the history data, override update_observation() in the advisor; see the sketch after the random advisor example below.
To suggest multiple configurations at once in a parallel setting, implement get_suggestions() instead.
Here is an example of a naive random advisor without duplicate checking:
from openbox.core.base_advisor import BaseAdvisor
class RandomAdvisor(BaseAdvisor):
def get_suggestion(self):
config = self.config_space.sample_configuration()
return config
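If you need extra bookkeeping when results arrive, override update_observation() and delegate to the base class. Below is a minimal sketch extending the random advisor above; the BoTorch example later in this guide uses the same pattern.
from openbox import Observation, logger
from openbox.core.base_advisor import BaseAdvisor

class LoggingRandomAdvisor(BaseAdvisor):
    def get_suggestion(self):
        # Sample a random configuration to evaluate next.
        return self.config_space.sample_configuration()

    def update_observation(self, observation: Observation):
        # Extra bookkeeping before storing the result in the history.
        logger.info(f'Observed: {observation}')
        return super().update_observation(observation)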
Here is an example of a multi-objective EHVI advisor implemented with BoTorch:
import warnings
from typing import List
import numpy as np
from ConfigSpace import UniformFloatHyperparameter, UniformIntegerHyperparameter, Configuration
import torch
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.model_list_gp_regression import ModelListGP
from botorch.models.transforms.outcome import Standardize
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood
from botorch.utils.transforms import unnormalize, normalize
from botorch.utils.sampling import draw_sobol_samples
from botorch.optim.optimize import optimize_acqf
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
FastNondominatedPartitioning,
)
from botorch.acquisition.multi_objective.monte_carlo import (
qExpectedHypervolumeImprovement,
)
from botorch import fit_gpytorch_mll
from botorch.exceptions import BadInitialCandidatesWarning
from botorch.sampling.normal import SobolQMCNormalSampler
from openbox import Observation, logger
from openbox.core.base_advisor import BaseAdvisor
from openbox.utils.config_space.space_utils import get_config_from_dict
class BoTorchEHVIAdvisor(BaseAdvisor):
"""
An Advisor using BoTorch's qEHVI acquisition function.
Caution: BoTorch maximizes the objectives.
"""
def __init__(
self,
config_space,
num_objectives: int,
ref_point,
init_num=-1,
NUM_RESTARTS=10,
RAW_SAMPLES=512,
MC_SAMPLES=128,
output_dir='logs',
task_id='BoTorchEHVI',
seed=None,
logger_kwargs: dict = None,
):
assert num_objectives > 1
for hp in config_space.get_hyperparameters():
assert isinstance(hp, (UniformFloatHyperparameter, UniformIntegerHyperparameter))
assert ref_point is not None
super().__init__(
config_space=config_space,
num_objectives=num_objectives,
num_constraints=0,
ref_point=ref_point,
output_dir=output_dir,
task_id=task_id,
random_state=seed,
logger_kwargs=logger_kwargs,
)
warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
# random seed
np.random.seed(seed)
torch.manual_seed(seed)
self.tkwargs = {
"dtype": torch.double,
"device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
}
self.NUM_RESTARTS = NUM_RESTARTS
self.RAW_SAMPLES = RAW_SAMPLES
self.MC_SAMPLES = MC_SAMPLES
self.problem_dim = len(config_space.get_hyperparameters())
self.standard_bounds = torch.zeros(2, self.problem_dim, **self.tkwargs)
self.standard_bounds[1] = 1
self.problem_bounds = self.get_problem_bounds()
# Caution: BoTorch maximizes the objectives
self.botorch_ref_point = -1.0 * torch.tensor(ref_point, **self.tkwargs)
# initial design
self.init_num = init_num if init_num > 0 else 2 * (self.problem_dim + 1)
logger.info(f'init_num: {self.init_num}')
self.init_configs = self.generate_initial_configs(self.init_num)
def get_suggestion(self):
n_history = len(self.history)
if n_history < self.init_num:
logger.info(f'Initial iter {n_history + 1}/{self.init_num}')
return self.init_configs[n_history]
X = self.history.get_config_array(transform='numerical')
Y = self.history.get_objectives(transform='failed')
train_x = torch.tensor(X, **self.tkwargs) # train_x is not normalized
train_obj = -1.0 * torch.tensor(Y, **self.tkwargs) # Caution: BoTorch maximizes the objectives
# reinitialize the models so they are ready for fitting on next iteration
# Note: we find improved performance from not warm starting the model hyperparameters
# using the hyperparameters from the previous iteration
mll, model = self.initialize_model(train_x, train_obj)
# fit the models
fit_gpytorch_mll(mll)
# define the acquisition modules using a QMC sampler
qehvi_sampler = SobolQMCNormalSampler(sample_shape=torch.Size([self.MC_SAMPLES]))
# optimize acquisition functions and get new candidate
new_x = self.optimize_qehvi(
model, train_x, train_obj, qehvi_sampler
)
config = self.tensor2configs(new_x)[0]
logger.info(f'Get suggestion. new_x: {new_x}, config: {config}')
return config
def update_observation(self, observation: Observation):
logger.info(f'Update observation: {observation}')
return super().update_observation(observation)
def generate_initial_configs(self, init_num):
x = draw_sobol_samples(bounds=self.problem_bounds, n=init_num, q=1).squeeze(1)
configs = self.tensor2configs(x)
return configs
def initialize_model(self, train_x, train_obj):
# define models for objective
train_x = normalize(train_x, self.problem_bounds)
models = []
for i in range(train_obj.shape[-1]):
train_y = train_obj[..., i: i + 1]
model = SingleTaskGP(train_x, train_y, outcome_transform=Standardize(m=1))
models.append(model)
model = ModelListGP(*models)
mll = SumMarginalLogLikelihood(model.likelihood, model)
return mll, model
def optimize_qehvi(self, model, train_x, train_obj, sampler):
"""Optimizes the qEHVI acquisition function, and returns a new candidate."""
# partition non-dominated space into disjoint rectangles
with torch.no_grad():
pred = model.posterior(normalize(train_x, self.problem_bounds)).mean
partitioning = FastNondominatedPartitioning(
ref_point=self.botorch_ref_point,
Y=pred,
)
acq_func = qExpectedHypervolumeImprovement(
model=model,
ref_point=self.botorch_ref_point,
partitioning=partitioning,
sampler=sampler,
)
# optimize
candidates, _ = optimize_acqf(
acq_function=acq_func,
bounds=self.standard_bounds,
q=1,
num_restarts=self.NUM_RESTARTS,
            raw_samples=self.RAW_SAMPLES,  # used for initialization heuristic
options={"batch_limit": 5, "maxiter": 200},
sequential=True,
)
# observe new values
new_x = unnormalize(candidates.detach(), bounds=self.problem_bounds)
return new_x
def get_problem_bounds(self):
bounds = []
for hp in self.config_space.get_hyperparameters():
bounds.append((hp.lower, hp.upper))
bounds = torch.tensor(bounds, **self.tkwargs).transpose(0, 1)
return bounds
def tensor2configs(self, x: torch.Tensor) -> List[Configuration]:
"""
Convert x (torch tensor) to a list of Configurations
x should be unnormalized.
"""
assert x.dim() == 2, f'Expected 2-d tensor, got {x.dim()}'
configs = []
for i in range(x.shape[0]):
config_dict = dict()
for j, hp in enumerate(self.config_space.get_hyperparameters()):
config_dict[hp.name] = x[i, j].item()
config = get_config_from_dict(self.config_space, config_dict)
configs.append(config)
return configs
if __name__ == '__main__':
import matplotlib.pyplot as plt
from openbox.benchmark.objective_functions.synthetic import BraninCurrin
problem = BraninCurrin()
advisor = BoTorchEHVIAdvisor(
config_space=problem.config_space,
num_objectives=problem.num_objectives,
ref_point=problem.ref_point,
init_num=-1,
output_dir='logs',
task_id='BoTorchEHVI',
seed=1234,
)
n_iter = 20
for i in range(n_iter):
logger.info(f'=== Iteration {i + 1}/{n_iter} ===')
config = advisor.get_suggestion()
result = problem(config)
observation = Observation(config=config, objectives=result['objectives'])
advisor.update_observation(observation)
history = advisor.get_history()
history.plot_hypervolumes(optimal_hypervolume=problem.max_hv, logy=True)
plt.show()