Skip to content

估计器工具

该文件是TPOT库的一部分。

当前版本的TPOT是由以下人员在Cedars-Sinai开发的: - Pedro Henrique Ribeiro (https://github.com/perib, https://www.linkedin.com/in/pedro-ribeiro/) - Anil Saini (anil.saini@cshs.org) - Jose Hernandez (jgh9094@gmail.com) - Jay Moran (jay.moran@cshs.org) - Nicholas Matsumoto (nicholas.matsumoto@cshs.org) - Hyunjun Choi (hyunjun.choi@cshs.org) - Miguel E. Hernandez (miguel.e.hernandez@cshs.org) - Jason Moore (moorejh28@gmail.com)

TPOT的原始版本主要由宾夕法尼亚大学的以下人员开发: - Randal S. Olson (rso@randalolson.com) - Weixuan Fu (weixuanf@upenn.edu) - Daniel Angell (dpa34@drexel.edu) - Jason Moore (moorejh28@gmail.com) - 以及许多慷慨的开源贡献者

TPOT 是免费软件:您可以根据自由软件基金会发布的 GNU 宽通用公共许可证的条款重新分发和/或修改它,许可证的版本可以是第 3 版,或者(根据您的选择)任何以后的版本。

TPOT 的发布是希望它能有用, 但没有任何保证;甚至没有对 适销性或特定用途适用性的暗示保证。更多详情请参阅 GNU 较宽松通用公共许可证。

您应该已经收到了一份GNU较宽松通用公共许可证的副本,随TPOT一起提供。如果没有,请参见http://www.gnu.org/licenses/

apply_make_pipeline(ind, preprocessing_pipeline=None, export_graphpipeline=False, **pipeline_kwargs)

辅助函数,用于从tpot2个体类创建sklearn管道的列。

参数:

名称 类型 描述 默认值
ind

要转换为管道的个体。

required
preprocessing_pipeline

在个体的管道之前包含的预处理管道。

None
export_graphpipeline

强制将管道导出为图形管道。将所有嵌套的管道、FeatureUnions 和 GraphPipelines 展平为单个 GraphPipeline。

False
pipeline_kwargs

传递给 export_pipeline 或 export_flattened_graphpipeline 方法的关键字参数。

{}

返回:

类型 描述
sklearn estimator
Source code in tpot2/tpot_estimator/estimator_utils.py
def apply_make_pipeline(ind, preprocessing_pipeline=None, export_graphpipeline=False, **pipeline_kwargs):
    """
    Helper function to create a column of sklearn pipelines from the tpot2 individual class.

    Parameters
    ----------
    ind: tpot2.SklearnIndividual
        The individual to convert to a pipeline.
    preprocessing_pipeline: sklearn.pipeline.Pipeline, optional
        The preprocessing pipeline to include before the individual's pipeline.
    export_graphpipeline: bool, default=False
        Force the pipeline to be exported as a graph pipeline. Flattens all nested pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
    pipeline_kwargs: dict
        Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

    Returns
    -------
    sklearn estimator
    """

    try:

        if export_graphpipeline:
            est = ind.export_flattened_graphpipeline(**pipeline_kwargs)
        else:
            est = ind.export_pipeline(**pipeline_kwargs)


        if preprocessing_pipeline is None:
            return est
        else:
            return sklearn.pipeline.make_pipeline(sklearn.base.clone(preprocessing_pipeline), est)
    except:
        return None

check_if_y_is_encoded(y)

检查目标 y 是否由从 0 到 N 的连续整数组成。 XGBoost 要求目标以这种方式编码。

参数:

名称 类型 描述 默认值
y

目标向量。

必填

返回:

类型 描述
bool

如果目标被编码为从0到N的顺序整数,则为True,否则为False

Source code in tpot2/tpot_estimator/estimator_utils.py
def check_if_y_is_encoded(y):
    '''
    Checks if the target y is composed of sequential ints from 0 to N.
    XGBoost requires the target to be encoded in this way.

    Parameters
    ----------
    y: np.ndarray
        The target vector.

    Returns
    -------
    bool
        True if the target is encoded as sequential ints from 0 to N, False otherwise
    '''
    y = sorted(set(y))
    return all(i == j for i, j in enumerate(y))

convert_parents_tuples_to_integers(row, object_to_int)

辅助函数,用于将父行转换为表示父代在种群中索引的整数。

使用自定义索引的原始pandas数据框。此函数将自定义索引转换为整数索引,以便最终用户更容易操作。

参数:

名称 类型 描述 默认值
row

要转换的行。

必填
object_to_int

一个将对象映射到整数索引的字典。

required
Returns

元组 带有自定义索引的行已转换为整数索引。

Source code in tpot2/tpot_estimator/estimator_utils.py
def convert_parents_tuples_to_integers(row, object_to_int):
    """
    Helper function to convert the parent rows into integers representing the index of the parent in the population.

    Original pandas dataframe using a custom index for the parents. This function converts the custom index to an integer index for easier manipulation by end users.

    Parameters
    ----------
    row: list, np.ndarray, tuple
        The row to convert.
    object_to_int: dict
        A dictionary mapping the object to an integer index.

    Returns 
    -------
    tuple
        The row with the custom index converted to an integer index.
    """
    if type(row) == list or type(row) == np.ndarray or type(row) == tuple:
        return tuple(object_to_int[obj] for obj in row)
    else:
        return np.nan

objective_function_generator(pipeline, x, y, scorers, cv, other_objective_functions, step=None, budget=None, is_classification=True, export_graphpipeline=False, **pipeline_kwargs)

使用交叉验证来评估管道,使用评分器,并将结果与其他独立目标函数的分数连接起来。

参数:

名称 类型 描述 默认值
pipeline

要评估的个体。

required
x

特征矩阵。

必填
y

目标向量。

必填
scorers

用于交叉验证的评分器。

必填
cv

使用的交叉验证器。例如,sklearn.model_selection.KFold 或 sklearn.model_selection.StratifiedKFold。 如果是一个整数,将使用 sklearn.model_selection.KFold 并设置 n_splits=cv。

required
other_objective_functions

用于评估管道的独立目标函数列表。签名格式为 obj(pipeline) -> float 或 obj(pipeline) -> np.ndarray 这些函数接收未拟合的估计器。

required
step

要返回分数的折叠。如果为None,将返回所有分数的平均值(每个评分器)。默认值为None。

None
budget

用于对数据进行子采样的预算。如果为None,将使用完整的数据集。默认值为None。 将子采样budget*len(x)个样本。

None
is_classification

如果为True,将进行分层子采样。默认为True。

True
export_graphpipeline

强制将管道导出为图形管道。将所有嵌套的sklearn管道、FeatureUnions和GraphPipelines展平为单个GraphPipeline。

False
pipeline_kwargs

传递给 export_pipeline 或 export_flattened_graphpipeline 方法的关键字参数。

{}

返回:

类型 描述
ndarray

管道的连接分数。前 len(scorers) 个元素是交叉验证分数,其余元素是独立的目标函数。

Source code in tpot2/tpot_estimator/estimator_utils.py
def objective_function_generator(pipeline, x,y, scorers, cv, other_objective_functions, step=None, budget=None, is_classification=True, export_graphpipeline=False, **pipeline_kwargs):
    """
    Uses cross validation to evaluate the pipeline using the scorers, and concatenates results with scores from standalone other objective functions.

    Parameters
    ----------
    pipeline: tpot2.SklearnIndividual
        The individual to evaluate.
    x: np.ndarray
        The feature matrix.
    y: np.ndarray
        The target vector.
    scorers: list
        The scorers to use for cross validation. 
    cv: int, float, or sklearn cross-validator
        The cross-validator to use. For example, sklearn.model_selection.KFold or sklearn.model_selection.StratifiedKFold.
        If an int, will use sklearn.model_selection.KFold with n_splits=cv.
    other_objective_functions: list
        A list of standalone objective functions to evaluate the pipeline. With signature obj(pipeline) -> float. or obj(pipeline) -> np.ndarray
        These functions take in the unfitted estimator.
    step: int, optional
        The fold to return the scores for. If None, will return the mean of all the scores (per scorer). Default is None.
    budget: float, optional
        The budget to subsample the data. If None, will use the full dataset. Default is None.
        Will subsample budget*len(x) samples.
    is_classification: bool, default=True
        If True, will stratify the subsampling. Default is True.
    export_graphpipeline: bool, default=False
        Force the pipeline to be exported as a graph pipeline. Flattens all nested sklearn pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
    pipeline_kwargs: dict
        Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

    Returns
    -------
    np.ndarray
        The concatenated scores for the pipeline. The first len(scorers) elements are the cross validation scores, and the remaining elements are the standalone objective functions.

    """

    if export_graphpipeline:
        pipeline = pipeline.export_flattened_graphpipeline(**pipeline_kwargs)
    else:
        pipeline = pipeline.export_pipeline(**pipeline_kwargs)

    if budget is not None and budget < 1:
        if is_classification:
            x,y = sklearn.utils.resample(x,y, stratify=y, n_samples=int(budget*len(x)), replace=False, random_state=1)
        else:
            x,y = sklearn.utils.resample(x,y, n_samples=int(budget*len(x)), replace=False, random_state=1)

        if isinstance(cv, int) or isinstance(cv, float):
            n_splits = cv
        else:
            n_splits = cv.n_splits

    if len(scorers) > 0:
        cv_obj_scores = cross_val_score_objective(sklearn.base.clone(pipeline),x,y,scorers=scorers, cv=cv , fold=step)
    else:
        cv_obj_scores = []

    if other_objective_functions is not None and len(other_objective_functions) >0:
        other_scores = [obj(sklearn.base.clone(pipeline)) for obj in other_objective_functions]
        #flatten
        other_scores = np.array(other_scores).flatten().tolist()
    else:
        other_scores = []

    return np.concatenate([cv_obj_scores,other_scores])

remove_underrepresented_classes(x, y, min_count)

辅助函数,用于从数据集中移除样本数少于min_count的类别。

参数:

名称 类型 描述 默认值
x

特征矩阵。

必填
y

目标向量。

必填
min_count

保留一个类别所需的最小样本数。

required

返回:

类型 描述
(ndarray, ndarray)

特征矩阵和目标向量,其中移除了样本数少于min_count的类别的行。

Source code in tpot2/tpot_estimator/estimator_utils.py
def remove_underrepresented_classes(x, y, min_count):
    """
    Helper function to remove classes with less than min_count samples from the dataset.

    Parameters
    ----------
    x: np.ndarray or pd.DataFrame
        The feature matrix.
    y: np.ndarray or pd.Series
        The target vector.
    min_count: int
        The minimum number of samples to keep a class.

    Returns
    -------
    np.ndarray, np.ndarray
        The feature matrix and target vector with rows from classes with less than min_count samples removed.
    """
    if isinstance(y, (np.ndarray, pd.Series)):
        unique, counts = np.unique(y, return_counts=True)
        if min(counts) >= min_count:
            return x, y
        keep_classes = unique[counts >= min_count]
        mask = np.isin(y, keep_classes)
        x = x[mask]
        y = y[mask]
    elif isinstance(y, pd.DataFrame):
        counts = y.apply(pd.Series.value_counts)
        if min(counts) >= min_count:
            return x, y
        keep_classes = counts.index[counts >= min_count].tolist()
        mask = y.isin(keep_classes).all(axis=1)
        x = x[mask]
        y = y[mask]
    else:
        raise TypeError("y must be a numpy array or a pandas Series/DataFrame")
    return x, y

val_objective_function_generator(pipeline, X_train, y_train, X_test, y_test, scorers, other_objective_functions, export_graphpipeline=False, **pipeline_kwargs)

在训练集上训练一个管道,并使用评分器和其他目标函数在测试集上对其进行评估。

参数:

名称 类型 描述 默认值
pipeline

要评估的个体。

required
X_train

训练集的特征矩阵。

required
y_train

训练集的目标向量。

required
X_test

测试集的特征矩阵。

required
y_test

测试集的目标向量。

required
scorers

用于交叉验证的评分器。

必填
other_objective_functions

用于评估管道的独立目标函数列表。签名格式为 obj(pipeline) -> float 或 obj(pipeline) -> np.ndarray 这些函数接收未拟合的估计器。

required
export_graphpipeline

强制将管道导出为图形管道。将所有嵌套的sklearn管道、FeatureUnions和GraphPipelines展平为单个GraphPipeline。

False
pipeline_kwargs

传递给 export_pipeline 或 export_flattened_graphpipeline 方法的关键字参数。

{}

返回:

类型 描述
ndarray

管道的连接分数。前 len(scorers) 个元素是交叉验证分数,其余元素是独立的目标函数。

Source code in tpot2/tpot_estimator/estimator_utils.py
def val_objective_function_generator(pipeline, X_train, y_train, X_test, y_test, scorers, other_objective_functions, export_graphpipeline=False, **pipeline_kwargs):
    """
    Trains a pipeline on a training set and evaluates it on a test set using the scorers and other objective functions.

    Parameters
    ----------

    pipeline: tpot2.SklearnIndividual
        The individual to evaluate.
    X_train: np.ndarray
        The feature matrix of the training set.
    y_train: np.ndarray
        The target vector of the training set.
    X_test: np.ndarray
        The feature matrix of the test set.
    y_test: np.ndarray
        The target vector of the test set.
    scorers: list
        The scorers to use for cross validation.
    other_objective_functions: list
        A list of standalone objective functions to evaluate the pipeline. With signature obj(pipeline) -> float. or obj(pipeline) -> np.ndarray
        These functions take in the unfitted estimator.
    export_graphpipeline: bool, default=False
        Force the pipeline to be exported as a graph pipeline. Flattens all nested sklearn pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
    pipeline_kwargs: dict
        Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

    Returns
    -------
    np.ndarray
        The concatenated scores for the pipeline. The first len(scorers) elements are the cross validation scores, and the remaining elements are the standalone objective functions.


    """

    #subsample the data
    if export_graphpipeline:
        pipeline = pipeline.export_flattened_graphpipeline(**pipeline_kwargs)
    else:
        pipeline = pipeline.export_pipeline(**pipeline_kwargs)

    fitted_pipeline = sklearn.base.clone(pipeline)
    fitted_pipeline.fit(X_train, y_train)

    if len(scorers) > 0:
        scores =[sklearn.metrics.get_scorer(scorer)(fitted_pipeline, X_test, y_test) for scorer in scorers]

    other_scores = []
    if other_objective_functions is not None and len(other_objective_functions) >0:
        other_scores = [obj(sklearn.base.clone(pipeline)) for obj in other_objective_functions]

    return np.concatenate([scores,other_scores])