Extend Algorithms in OpenBox

This guide shows you how to extend the algorithms in OpenBox.

For the Bayesian optimization advisor, you can implement a new surrogate model, a new acquisition function, or a new acquisition function maximizer. Alternatively, you can implement an advisor based on an entirely new algorithm.

Extend the Bayesian Optimization Advisor

Workflow of the Bayesian Optimization Advisor

Let's start by understanding the workflow of the Bayesian optimization advisor.

In each iteration, Advisor.get_suggestion() is called to generate a new configuration. Here are the main steps in get_suggestion():

def get_suggestion(self, ...):
    self.surrogate_model.train(...)
    self.acquisition_function.update(...)
    challengers = self.acq_optimizer.maximize(...)
    ...

First, the surrogate model is trained on the latest data.

Second, the acquisition function is updated with the surrogate model and additional information.

Third, the acquisition function maximizer samples points to maximize the acquisition function.

Please refer to openbox/core/generic_advisor.py for more details.

Implement a New Surrogate Model

To implement a new surrogate model, inherit the AbstractModel class. The _train() and _predict() methods need to be implemented.

In _train(), the surrogate model should be updated with the latest data.

In _predict(), given a batch of new data, the method should return the predicted means and variances of that batch.

Please refer to openbox/surrogate/base/base_model.py for more details.

Here is an example of implementing a surrogate using RandomForest from scikit-learn:

import typing

import numpy as np
from sklearn.ensemble import RandomForestRegressor
import threading
from joblib import Parallel, delayed
from sklearn.utils.fixes import _joblib_parallel_args
from sklearn.utils.validation import check_is_fitted
from sklearn.ensemble._base import _partition_estimators

from openbox.surrogate.base.base_model import AbstractModel


def _collect_prediction(predict, X, out, lock):
    """
    This is a utility function for joblib's Parallel.

    It can't go locally in ForestClassifier or ForestRegressor, because joblib
    complains that it cannot pickle it when placed there.
    """
    prediction = predict(X, check_input=False)
    with lock:
        out.append(prediction)


class skRandomForestWithInstances(AbstractModel):
    def __init__(self, types: np.ndarray,
                 bounds: typing.List[typing.Tuple[float, float]],
                 num_trees: int=10,
                 do_bootstrapping: bool=True,
                 n_points_per_tree: int=-1,
                 ratio_features: float=5. / 6.,
                 min_samples_split: int=3,
                 min_samples_leaf: int=3,
                 max_depth: int=2**20,
                 eps_purity: float=1e-8,
                 max_num_nodes: int=2**20,
                 seed: int=42,
                 n_jobs: int=None,
                 **kwargs):
        """
        Parameters
        ----------
        types : np.ndarray (D)
            Specifies the number of categorical values of an input dimension where
            the i-th entry corresponds to the i-th input dimension. Let's say we
            have 2 dimensions where the first dimension consists of 3 different
            categorical choices and the second dimension is continuous, then we
            have to pass np.array([2, 0]). Note that we count starting from 0.
        bounds : list
            Specifies the bounds for continuous features.
        num_trees : int
            The number of trees in the random forest.
        do_bootstrapping : bool
            Turns on / off bootstrapping in the random forest.
        n_points_per_tree : int
            Number of points per tree. If <= 0 X.shape[0] will be used
            in _train(X, y) instead
        ratio_features : float
            The ratio of features that are considered for splitting.
        min_samples_split : int
            The minimum number of data points to perform a split.
        min_samples_leaf : int
            The minimum number of data points in a leaf.
        max_depth : int
            The maximum depth of a single tree.
        eps_purity : float
            The minimum difference between two target values to be considered
            different
        max_num_nodes : int
            The maximum total number of nodes in a tree
        seed : int
            The seed that is passed to the random_forest_run library.
        n_jobs : int, default=None
            The number of jobs to run in parallel. :meth:`fit`, :meth:`predict`,
            :meth:`decision_path` and :meth:`apply` are all parallelized over the
            trees. ``None`` means 1 unless in a :obj:`joblib.parallel_backend`
            context. ``-1`` means using all processors. See :term:`Glossary
            <n_jobs>` for more details.
        """
        super().__init__(types, bounds, **kwargs)

        self.rng = np.random.RandomState(seed)

        self.num_trees = num_trees
        self.do_bootstrapping = do_bootstrapping
        max_features = None if ratio_features > 1.0 else \
            int(max(1, types.shape[0] * ratio_features))
        self.max_features = max_features
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_depth = max_depth
        self.epsilon_purity = eps_purity
        self.max_num_nodes = max_num_nodes

        self.n_points_per_tree = n_points_per_tree
        self.n_jobs = n_jobs

        self.rf = None  # type: RandomForestRegressor

    def _train(self, X: np.ndarray, y: np.ndarray):
        """Trains the random forest on X and y.

        Parameters
        ----------
        X : np.ndarray [n_samples, n_features]
            Input data points.
        y : np.ndarray [n_samples, ]
            The corresponding target values.

        Returns
        -------
        self
        """

        self.X = X
        self.y = y.flatten()

        if self.n_points_per_tree <= 0:
            self.num_data_points_per_tree = self.X.shape[0]
        else:
            self.num_data_points_per_tree = self.n_points_per_tree
        self.rf = RandomForestRegressor(
            n_estimators=self.num_trees,
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            min_samples_leaf=self.min_samples_leaf,
            max_features=self.max_features,
            max_samples=self.num_data_points_per_tree,
            max_leaf_nodes=self.max_num_nodes,
            min_impurity_decrease=self.epsilon_purity,
            bootstrap=self.do_bootstrapping,
            n_jobs=self.n_jobs,
            random_state=self.rng,
        )
        self.rf.fit(self.X, self.y)
        return self

    def _predict(self, X: np.ndarray) -> typing.Tuple[np.ndarray, np.ndarray]:
        """Predict means and variances for given X.

        Parameters
        ----------
        X : np.ndarray of shape = [n_samples, n_features]

        Returns
        -------
        means : np.ndarray of shape = [n_samples, 1]
            Predictive mean
        vars : np.ndarray  of shape = [n_samples, 1]
            Predictive variance
        """
        if len(X.shape) != 2:
            raise ValueError(
                'Expected 2d array, got %dd array!' % len(X.shape))
        if X.shape[1] != self.types.shape[0]:
            raise ValueError('Rows in X should have %d entries but have %d!' % (self.types.shape[0], X.shape[1]))

        check_is_fitted(self.rf)
        # Check data
        if X.ndim == 1:
            X = X.reshape((1, -1))
        X = self.rf._validate_X_predict(X)

        # Assign chunk of trees to jobs
        n_jobs, _, _ = _partition_estimators(self.rf.n_estimators, self.rf.n_jobs)

        # collect the output of every estimator
        all_y_preds = list()

        # Parallel loop
        lock = threading.Lock()
        Parallel(n_jobs=n_jobs, verbose=self.rf.verbose,
                 **_joblib_parallel_args(require="sharedmem"))(
            delayed(_collect_prediction)(e.predict, X, all_y_preds, lock)
            for e in self.rf.estimators_)
        all_y_preds = np.asarray(all_y_preds, dtype=np.float64)

        means = np.mean(all_y_preds, axis=0)
        vars_ = np.var(all_y_preds, axis=0)

        return means.reshape((-1, 1)), vars_.reshape((-1, 1))

Implement a New Acquisition Function

To implement a new acquisition function, inherit the AbstractAcquisitionFunction class. The _compute() method, which computes the acquisition function values on a new batch of points, needs to be implemented. Call self.model.predict_marginalized_over_instances(X) to get the predicted means and variances from the surrogate model.

Please refer to openbox/acquisition_function/acquisition.py for more details.

Here is an example of the Probability of Improvement (PI) acquisition function:

import numpy as np
from scipy.stats import norm
from openbox.acquisition_function import AbstractAcquisitionFunction

class PI(AbstractAcquisitionFunction):
    def __init__(self,
                 model,
                 par: float = 0.0,
                 **kwargs):

        r"""Computes the probability of improvement for a given x over the best so far value as
        acquisition value.

        :math:`P(f_{t+1}(\mathbf{X})\geq f(\mathbf{X^+})) :=
        \Phi(\frac{\mu(\mathbf{X}) - f(\mathbf{X^+})}{\sigma(\mathbf{X})})`,
        with :math:`f(X^+)` as the incumbent and :math:`\Phi` the cdf of the standard normal

        Parameters
        ----------
        model : AbstractEPM
            A surrogate that implements at least
                 - predict_marginalized_over_instances(X)
        par : float, default=0.0
            Controls the balance between exploration and exploitation of the
            acquisition function.
        """
        super(PI, self).__init__(model)
        self.long_name = 'Probability of Improvement'
        self.par = par
        self.eta = None

    def _compute(self, X: np.ndarray, **kwargs):
        """Computes the PI value.

        Parameters
        ----------
        X: np.ndarray(N, D)
           Points to evaluate PI. N is the number of points and D the dimension for the points

        Returns
        -------
        np.ndarray(N, 1)
            PI of X
        """
        if self.eta is None:
            raise ValueError('No current best specified. Call update('
                             'eta=<float>) to inform the acquisition function '
                             'about the current best value.')

        if len(X.shape) == 1:
            X = X[:, np.newaxis]
        m, var_ = self.model.predict_marginalized_over_instances(X)
        std = np.sqrt(var_)
        return norm.cdf((self.eta - m - self.par) / std)

Implement a New Acquisition Function Maximizer

To implement a new acquisition function maximizer, inherit the AcquisitionFunctionMaximizer class. The _maximize() method needs to be implemented; it returns an iterable of tuples of acquisition function value and configuration.

Please refer to openbox/acq_optimizer/basic_maximizer.py for more details.
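
Below is a minimal, hedged sketch of a random-search maximizer. The import path follows the file referenced above; the exact signature of _maximize() (assumed here to receive the history and the number of points) and the attributes self.config_space and self.acquisition_function set by the base class may differ across OpenBox versions, so check the base class before relying on this sketch.

import numpy as np

from openbox.acq_optimizer.basic_maximizer import AcquisitionFunctionMaximizer


class RandomSearchMaximizer(AcquisitionFunctionMaximizer):
    """Return the best configurations among uniformly sampled candidates (sketch)."""

    def _maximize(self, runhistory, num_points, **kwargs):
        # Draw random candidates from the configuration space
        # (self.config_space is assumed to be set by the base class).
        candidates = self.config_space.sample_configuration(size=num_points)
        if num_points == 1:
            candidates = [candidates]
        # Score every candidate with the acquisition function.
        acq_values = np.asarray(self.acquisition_function(candidates)).flatten()
        # Return (acquisition value, configuration) pairs, best first.
        order = np.argsort(-acq_values)
        return [(acq_values[i], candidates[i]) for i in order]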

Modify the Advisor with Your Own Components

To use your own surrogate model / acquisition function / acquisition function maximizer, simply replace the attributes in the Advisor as follows:

from openbox import Advisor
advisor = Advisor(...)
advisor.surrogate_model = MySurrogateModel(...)
advisor.acquisition_function = MyAcquisitionFunction(...)
advisor.acq_optimizer = MyAcquisitionFunctionMaximizer(...)

This is the fastest way to use custom components. To integrate a new algorithm into the Bayesian optimization Advisor, modify the initialization process of the Advisor.
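
As a concrete illustration of attribute replacement, the following hedged sketch plugs the random forest surrogate and the PI acquisition function defined earlier in this guide into a default Advisor. The location of the get_types() helper is an assumption based on its use inside openbox/core/generic_advisor.py; verify the import path in your OpenBox version.

from openbox import Advisor, space as sp
from openbox.utils.util_funcs import get_types  # assumed helper location

# a toy 2-d continuous search space for demonstration
space = sp.Space()
space.add_variables([sp.Real('x1', -5.0, 5.0), sp.Real('x2', -5.0, 5.0)])

advisor = Advisor(space, task_id='custom_components')

# build the components defined earlier in this guide
# (skRandomForestWithInstances and PI are assumed to be importable from your code)
types, bounds = get_types(space)
advisor.surrogate_model = skRandomForestWithInstances(types=types, bounds=bounds, seed=1)
advisor.acquisition_function = PI(model=advisor.surrogate_model)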

Implement a New Advisor

An advisor is designed as a configuration generator. In each iteration, advisor.get_suggestion() is called to generate a new configuration. After evaluation, the Observation is updated into the advisor via advisor.update_observation().

To implement a new advisor, inherit the BaseAdvisor class. In most cases, implementing the get_suggestion() method is sufficient. This method should return a new configuration to be evaluated.

To perform additional operations while updating the history data, modify update_observation() in the advisor.

To suggest multiple configurations at a time in a parallel setting, implement get_suggestions() instead (a sketch follows the random advisor example below).

Here is an example of a simple random advisor without duplication checks:

from openbox.core.base_advisor import BaseAdvisor

class RandomAdvisor(BaseAdvisor):
    def get_suggestion(self):
        config = self.config_space.sample_configuration()
        return config
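
To return a batch of configurations for parallel evaluation, a hedged sketch extending the random advisor above could look like this. The batch_size parameter name is an assumption; check BaseAdvisor and the parallel optimizers for the exact interface they expect.

class ParallelRandomAdvisor(RandomAdvisor):
    def get_suggestions(self, batch_size=4, **kwargs):
        # return a batch of configurations to be evaluated in parallel
        return [self.get_suggestion() for _ in range(batch_size)]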

Here is an example of a multi-objective EHVI advisor implemented with BoTorch:

import warnings
from typing import List
import numpy as np
from ConfigSpace import UniformFloatHyperparameter, UniformIntegerHyperparameter, Configuration

import torch
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.model_list_gp_regression import ModelListGP
from botorch.models.transforms.outcome import Standardize
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood
from botorch.utils.transforms import unnormalize, normalize
from botorch.utils.sampling import draw_sobol_samples
from botorch.optim.optimize import optimize_acqf
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
    FastNondominatedPartitioning,
)
from botorch.acquisition.multi_objective.monte_carlo import (
    qExpectedHypervolumeImprovement,
)
from botorch import fit_gpytorch_mll
from botorch.exceptions import BadInitialCandidatesWarning
from botorch.sampling.normal import SobolQMCNormalSampler

from openbox import Observation, logger
from openbox.core.base_advisor import BaseAdvisor
from openbox.utils.config_space.space_utils import get_config_from_dict


class BoTorchEHVIAdvisor(BaseAdvisor):
    """
    An Advisor using BoTorch's qEHVI acquisition function.

    Caution: BoTorch maximizes the objectives.
    """
    def __init__(
            self,
            config_space,
            num_objectives: int,
            ref_point,
            init_num=-1,
            NUM_RESTARTS=10,
            RAW_SAMPLES=512,
            MC_SAMPLES=128,
            output_dir='logs',
            task_id='BoTorchEHVI',
            seed=None,
            logger_kwargs: dict = None,
    ):
        assert num_objectives > 1
        for hp in config_space.get_hyperparameters():
            assert isinstance(hp, (UniformFloatHyperparameter, UniformIntegerHyperparameter))
        assert ref_point is not None

        super().__init__(
            config_space=config_space,
            num_objectives=num_objectives,
            num_constraints=0,
            ref_point=ref_point,
            output_dir=output_dir,
            task_id=task_id,
            random_state=seed,
            logger_kwargs=logger_kwargs,
        )

        warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)
        warnings.filterwarnings("ignore", category=RuntimeWarning)

        # random seed
        np.random.seed(seed)
        torch.manual_seed(seed)

        self.tkwargs = {
            "dtype": torch.double,
            "device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
        }

        self.NUM_RESTARTS = NUM_RESTARTS
        self.RAW_SAMPLES = RAW_SAMPLES
        self.MC_SAMPLES = MC_SAMPLES

        self.problem_dim = len(config_space.get_hyperparameters())
        self.standard_bounds = torch.zeros(2, self.problem_dim, **self.tkwargs)
        self.standard_bounds[1] = 1
        self.problem_bounds = self.get_problem_bounds()

        # Caution: BoTorch maximizes the objectives
        self.botorch_ref_point = -1.0 * torch.tensor(ref_point, **self.tkwargs)

        # initial design
        self.init_num = init_num if init_num > 0 else 2 * (self.problem_dim + 1)
        logger.info(f'init_num: {self.init_num}')
        self.init_configs = self.generate_initial_configs(self.init_num)

    def get_suggestion(self):
        n_history = len(self.history)
        if n_history < self.init_num:
            logger.info(f'Initial iter {n_history + 1}/{self.init_num}')
            return self.init_configs[n_history]

        X = self.history.get_config_array(transform='numerical')
        Y = self.history.get_objectives(transform='failed')

        train_x = torch.tensor(X, **self.tkwargs)  # train_x is not normalized
        train_obj = -1.0 * torch.tensor(Y, **self.tkwargs)  # Caution: BoTorch maximizes the objectives

        # reinitialize the models so they are ready for fitting on next iteration
        # Note: we find improved performance from not warm starting the model hyperparameters
        # using the hyperparameters from the previous iteration
        mll, model = self.initialize_model(train_x, train_obj)

        # fit the models
        fit_gpytorch_mll(mll)

        # define the acquisition modules using a QMC sampler
        qehvi_sampler = SobolQMCNormalSampler(sample_shape=torch.Size([self.MC_SAMPLES]))

        # optimize acquisition functions and get new candidate
        new_x = self.optimize_qehvi(
            model, train_x, train_obj, qehvi_sampler
        )
        config = self.tensor2configs(new_x)[0]
        logger.info(f'Get suggestion. new_x: {new_x}, config: {config}')
        return config

    def update_observation(self, observation: Observation):
        logger.info(f'Update observation: {observation}')
        return super().update_observation(observation)

    def generate_initial_configs(self, init_num):
        x = draw_sobol_samples(bounds=self.problem_bounds, n=init_num, q=1).squeeze(1)
        configs = self.tensor2configs(x)
        return configs

    def initialize_model(self, train_x, train_obj):
        # define models for objective
        train_x = normalize(train_x, self.problem_bounds)
        models = []
        for i in range(train_obj.shape[-1]):
            train_y = train_obj[..., i: i + 1]
            model = SingleTaskGP(train_x, train_y, outcome_transform=Standardize(m=1))
            models.append(model)
        model = ModelListGP(*models)
        mll = SumMarginalLogLikelihood(model.likelihood, model)
        return mll, model

    def optimize_qehvi(self, model, train_x, train_obj, sampler):
        """Optimizes the qEHVI acquisition function, and returns a new candidate."""
        # partition non-dominated space into disjoint rectangles
        with torch.no_grad():
            pred = model.posterior(normalize(train_x, self.problem_bounds)).mean
        partitioning = FastNondominatedPartitioning(
            ref_point=self.botorch_ref_point,
            Y=pred,
        )
        acq_func = qExpectedHypervolumeImprovement(
            model=model,
            ref_point=self.botorch_ref_point,
            partitioning=partitioning,
            sampler=sampler,
        )
        # optimize
        candidates, _ = optimize_acqf(
            acq_function=acq_func,
            bounds=self.standard_bounds,
            q=1,
            num_restarts=self.NUM_RESTARTS,
            raw_samples=self.RAW_SAMPLES,  # used for initialization heuristic
            options={"batch_limit": 5, "maxiter": 200},
            sequential=True,
        )
        # observe new values
        new_x = unnormalize(candidates.detach(), bounds=self.problem_bounds)
        return new_x

    def get_problem_bounds(self):
        bounds = []
        for hp in self.config_space.get_hyperparameters():
            bounds.append((hp.lower, hp.upper))
        bounds = torch.tensor(bounds, **self.tkwargs).transpose(0, 1)
        return bounds

    def tensor2configs(self, x: torch.Tensor) -> List[Configuration]:
        """
        Convert x (torch tensor) to a list of Configurations
        x should be unnormalized.
        """
        assert x.dim() == 2, f'Expected 2-d tensor, got {x.dim()}'
        configs = []
        for i in range(x.shape[0]):
            config_dict = dict()
            for j, hp in enumerate(self.config_space.get_hyperparameters()):
                config_dict[hp.name] = x[i, j].item()
            config = get_config_from_dict(self.config_space, config_dict)
            configs.append(config)
        return configs


if __name__ == '__main__':
    import matplotlib.pyplot as plt
    from openbox.benchmark.objective_functions.synthetic import BraninCurrin

    problem = BraninCurrin()
    advisor = BoTorchEHVIAdvisor(
        config_space=problem.config_space,
        num_objectives=problem.num_objectives,
        ref_point=problem.ref_point,
        init_num=-1,
        output_dir='logs',
        task_id='BoTorchEHVI',
        seed=1234,
    )

    n_iter = 20
    for i in range(n_iter):
        logger.info(f'=== Iteration {i + 1}/{n_iter} ===')
        config = advisor.get_suggestion()
        result = problem(config)
        observation = Observation(config=config, objectives=result['objectives'])
        advisor.update_observation(observation)

    history = advisor.get_history()
    history.plot_hypervolumes(optimal_hypervolume=problem.max_hv, logy=True)
    plt.show()