使用Spark Serving进行模型部署
在这个例子中,我们尝试从成人普查数据集中预测收入。然后我们将使用Spark serving将其部署为实时网络服务。 首先,我们导入所需的包:
现在让我们读取数据并将其分割为训练集和测试集:
data = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet"
)
data = data.select(["education", "marital-status", "hours-per-week", "income"])
train, test = data.randomSplit([0.75, 0.25], seed=123)
train.limit(10).toPandas()
TrainClassifier 可用于初始化和拟合模型,它封装了 SparkML 分类器。
你可以使用 help(synapse.ml.TrainClassifier) 来查看不同的参数。
请注意,它会隐式地将数据转换为算法期望的格式。更具体地说,它会:
对字符串进行分词、哈希处理,对分类变量进行独热编码,将特征组装成向量
等等。参数 numFeatures 控制哈希特征的数量。
from synapse.ml.train import TrainClassifier
from pyspark.ml.classification import LogisticRegression
model = TrainClassifier(
model=LogisticRegression(), labelCol="income", numFeatures=256
).fit(train)
模型训练完成后,我们针对测试数据集对其进行评分并查看指标。
from synapse.ml.train import ComputeModelStatistics, TrainedClassifierModel
prediction = model.transform(test)
prediction.printSchema()
metrics = ComputeModelStatistics().transform(prediction)
metrics.limit(10).toPandas()
首先,我们将定义网络服务的输入/输出。 更多信息,您可以访问Spark Serving的文档
from pyspark.sql.types import *
from synapse.ml.io import *
import uuid
serving_inputs = (
spark.readStream.server()
.address("localhost", 8898, "my_api")
.option("name", "my_api")
.load()
.parseRequest("my_api", test.schema)
)
serving_outputs = model.transform(serving_inputs).makeReply("prediction")
server = (
serving_outputs.writeStream.server()
.replyTo("my_api")
.queryName("my_query")
.option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1()))
.start()
)
测试网络服务
import requests
data = '{"education":" 10th","marital-status":"Divorced","hours-per-week":40.0}'
r = requests.post(data=data, url="http://localhost:8898/my_api")
print("Response {}".format(r.text))
import requests
data = '{"education":" Masters","marital-status":"Married-civ-spouse","hours-per-week":40.0}'
r = requests.post(data=data, url="http://localhost:8898/my_api")
print("Response {}".format(r.text))
import time
time.sleep(20) # wait for server to finish setting up (just to be safe)
server.stop()