Parallelization
This tutorial covers advanced setups for parallelizing TPOT2 with Dask. If you just want to parallelize TPOT2 on a single machine with multiple processes, set the n_jobs parameter to the number of threads you want to use and skip this tutorial.
TPOT2 uses Dask for parallelization and defaults to a dask.distributed.LocalCluster for local parallelization. For advanced usage, users can pass in a custom Dask client or cluster. For example, multi-node parallelization is possible with the dask-jobqueue package.
TPOT2 can be easily parallelized on a local computer by setting the n_jobs and memory_limit parameters.
n_jobs determines how many dask workers to launch. In TPOT2 this corresponds to the number of pipelines evaluated in parallel.
memory_limit is the amount of memory each worker process may use.
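For instance, a minimal sketch of that simple case (assuming otherwise-default settings, and using the tpot2.TPOTClassifier convenience class that also appears in comments later in this tutorial):

import tpot2

# Minimal sketch: evaluate up to 16 pipelines in parallel,
# capping each dask worker at 4GB of RAM.
est = tpot2.TPOTClassifier(n_jobs=16, memory_limit="4GB")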
In [2]:
import tpot2
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
verbose = 2,
n_jobs=16,
memory_limit="4GB"
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
Generation: 100%|██████████| 5/5 [00:11<00:00, 2.24s/it]
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/decomposition/_fastica.py:595: UserWarning: n_components is too large: it will be set to 8
  warnings.warn(
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/decomposition/_fastica.py:128: ConvergenceWarning: FastICA did not converge. Consider increasing tolerance or the maximum number of iterations.
  warnings.warn(
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:350: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
1.0
Initializing a basic dask local cluster
In [3]:
from dask.distributed import Client, LocalCluster
n_jobs = 4
memory_limit = "4GB"
cluster = LocalCluster(n_workers=n_jobs, #if no client is passed in and no global client exists, create our own
threads_per_worker=1,
memory_limit=memory_limit)
client = Client(cluster)
Get the link to view the dask dashboard.
In [4]:
client.dashboard_link
Out[4]:
'http://127.0.0.1:8787/status'
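While fit() is running, you can open this link in a browser to monitor task progress and per-worker memory usage. As an optional sketch using the standard dask.distributed API (not TPOT-specific), you can also confirm the workers programmatically:

# Optional: count the workers currently registered with the scheduler.
n_workers = len(client.scheduler_info()["workers"])
print(f"{n_workers} workers connected")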
In [5]:
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
client = client,
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
# passing in this client is roughly equivalent to setting
# n_jobs=4 and memory_limit="4GB" directly on the estimator
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
Generation: 100%|██████████| 5/5 [00:13<00:00, 2.62s/it]
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:350: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
1.0
Option 2
You can initialize the cluster and client with a context manager, which will close them automatically.
In [7]:
from dask.distributed import Client, LocalCluster
import tpot2
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
n_jobs = 4
memory_limit = "4GB"
with LocalCluster(
n_workers=n_jobs,
threads_per_worker=1,
memory_limit=memory_limit,
) as cluster, Client(cluster) as client:
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
client = client,
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 5,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
Generation: 100%|██████████| 5/5 [00:16<00:00, 3.33s/it]
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:350: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
1.0
Dask multi-node parallelization on HPC
Dask can parallelize across multiple nodes via job queueing systems. This is done using the Dask-Jobqueue package. More information can be found in the official documentation here.
To parallelize TPOT2 with Dask-Jobqueue, simply pass a client based on a Jobqueue cluster, configured as desired, into the client parameter. Each job will evaluate a single pipeline.
Note that TPOT will ignore n_jobs and memory_limit, as these should be set within the Dask cluster.
The following example is specific to Sun Grid Engine. Other supported clusters can be found in the Dask-Jobqueue documentation here.
In [ ]:
from dask.distributed import Client
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import tpot2
from dask_jobqueue import SGECluster # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.
import os
if os.system("which qsub") != 0:
print("Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.")
else:
print("Sun Grid Engine is installed.")
cluster = SGECluster(
queue='all.q',
cores=2,
memory="50 GB"
)
cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs
client = Client(cluster)
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
client = client,
scorers = ["roc_auc"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
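As a hedged sketch of the same pattern for a SLURM scheduler (the partition name and resource values below are placeholders; adjust them to your site's configuration):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Hypothetical SLURM settings -- queue (partition), cores, and memory
# are placeholders; set them to match your cluster.
cluster = SLURMCluster(
    queue='general',
    cores=2,
    memory="50 GB"
)
cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs
client = Client(cluster)

# ... pass `client` into tpot2.TPOTEstimator(client=client, ...) exactly as above ...

client.close()
cluster.close()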