在Spark上进行ONNX推理
在这个例子中,您训练了一个LightGBM模型,并将模型转换为ONNX格式。转换完成后,您使用该模型在Spark上推断一些测试数据。
此示例使用以下Python包和版本:
onnxmltools==1.7.0lightgbm==3.2.1
加载示例数据
要加载示例数据,请将以下代码示例添加到笔记本中的单元格中,然后运行这些单元格:
%pip install lightgbm onnxmltools==1.7.0
df = (
spark.read.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
)
)
display(df)
输出应类似于下表,尽管值和行数可能有所不同:
| 利息覆盖率 | 净收入标志 | 权益负债比 |
|---|---|---|
| 0.5641 | 1.0 | 0.0165 |
| 0.5702 | 1.0 | 0.0208 |
| 0.5673 | 1.0 | 0.0165 |
使用LightGBM训练模型
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(df)["Bankrupt?", "features"]
model = (
LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?")
.setEarlyStoppingRound(300)
.setLambdaL1(0.5)
.setNumIterations(1000)
.setNumThreads(-1)
.setMaxDeltaStep(0.5)
.setNumLeaves(31)
.setMaxDepth(-1)
.setBaggingFraction(0.7)
.setFeatureFraction(0.7)
.setBaggingFreq(2)
.setObjective("binary")
.setIsUnbalance(True)
.setMinSumHessianInLeaf(20)
.setMinGainToSplit(0.01)
)
model = model.fit(train_data)
将模型转换为ONNX格式
以下代码将训练好的模型导出为LightGBM booster,然后将其转换为ONNX格式:
from synapse.ml.core.platform import running_on_binder
if running_on_binder():
from IPython import get_ipython
import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier
def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
from onnxmltools.convert import convert_lightgbm
from onnxconverter_common.data_types import FloatTensorType
initial_types = [("input", FloatTensorType([-1, input_size]))]
onnx_model = convert_lightgbm(
lgbm_model, initial_types=initial_types, target_opset=9
)
return onnx_model.SerializeToString()
booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))
转换后,将ONNX有效负载加载到ONNXModel中并检查模型输入和输出:
from synapse.ml.onnx import ONNXModel
onnx_ml = ONNXModel().setModelPayload(model_payload_ml)
print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))
将模型输入映射到输入数据框的列名(FeedDict),并将输出数据框的列名映射到模型输出(FetchDict)。
onnx_ml = (
onnx_ml.setDeviceType("CPU")
.setFeedDict({"input": "features"})
.setFetchDict({"probability": "probabilities", "prediction": "label"})
.setMiniBatchSize(5000)
)
使用模型进行推理
要使用模型进行推理,以下代码创建测试数据并通过ONNX模型转换数据。
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
VectorAssembler()
.setInputCols(cols)
.setOutputCol("features")
.transform(testDf)
.drop(*cols)
.cache()
)
display(onnx_ml.transform(testDf))
输出应类似于下表,尽管值和行数可能有所不同:
| 索引 | 特征 | 预测 | 概率 |
|---|---|---|---|
| 1 | "{"type":1,"values":[0.105... | 0 | "{"0":0.835... |
| 2 | "{"type":1,"values":[0.814... | 0 | "{"0":0.658... |