回归 - 航班延误与数据清理
这个示例笔记本类似于
回归 - 航班延误。
在这个示例中,我们将以两种方式展示DataConversion()的使用。首先,在数据集被读入Spark DataFrame后,转换几列的数据类型,而不是在读取文件时指定数据类型。其次,将列转换为分类列,而不是遍历列并应用StringIndexer。
此示例演示如何使用以下API:
接下来,导入CSV数据集:如果需要,检索文件,将其保存在本地,通过read_csv()将数据读入pandas数据框,然后将其转换为Spark数据框。
打印数据框的模式,并注意那些是long类型的列。
flightDelay = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/On_Time_Performance_2012_9.parquet"
)
# print some basic info
print("records read: " + str(flightDelay.count()))
print("Schema: ")
flightDelay.printSchema()
flightDelay.limit(10).toPandas()
使用DataConversion转换API将列转换为双精度类型。
DataConversion API 接受以下类型的 convertTo 参数:
booleanbyteshortintegerlongfloatdoublestringtoCategoricalclearCategoricaldate-- 将字符串或长整型转换为格式为"yyyy-MM-dd HH:mm:ss"的日期,除非dateTimeFormat参数指定了其他格式。
再次打印模式,并注意列现在为double而不是长整型。
from synapse.ml.featurize import DataConversion
flightDelay = DataConversion(
cols=[
"Quarter",
"Month",
"DayofMonth",
"DayOfWeek",
"OriginAirportID",
"DestAirportID",
"CRSDepTime",
"CRSArrTime",
],
convertTo="double",
).transform(flightDelay)
flightDelay.printSchema()
flightDelay.limit(10).toPandas()
将数据集分割为训练集和测试集。
train, test = flightDelay.randomSplit([0.75, 0.25])
创建一个回归模型并在数据集上进行训练。
首先,使用DataConversion将列Carrier、DepTimeBlk和ArrTimeBlk转换为分类数据。回想一下,在Notebook 102中,这是通过遍历列并使用StringIndexer API将字符串转换为索引值来完成的。DataConversion API通过允许您在单个命令中指定所有将具有相同最终类型的列来简化任务。
使用有限内存BFGS求解器(l-bfgs)、ElasticNet混合参数为0.3和Regularization为0.1创建一个线性回归模型。
使用TrainRegressor API在训练数据集上训练模型。
from synapse.ml.train import TrainRegressor, TrainedRegressorModel
from pyspark.ml.regression import LinearRegression
trainCat = DataConversion(
cols=["Carrier", "DepTimeBlk", "ArrTimeBlk"], convertTo="toCategorical"
).transform(train)
testCat = DataConversion(
cols=["Carrier", "DepTimeBlk", "ArrTimeBlk"], convertTo="toCategorical"
).transform(test)
lr = LinearRegression().setRegParam(0.1).setElasticNetParam(0.3)
model = TrainRegressor(model=lr, labelCol="ArrDelay").fit(trainCat)
在测试数据上对回归器进行评分。
scoredData = model.transform(testCat)
scoredData.limit(10).toPandas()
计算模型指标以对抗整个评分数据集
from synapse.ml.train import ComputeModelStatistics
metrics = ComputeModelStatistics().transform(scoredData)
metrics.toPandas()
最后,计算并显示测试数据集中个别预测的统计信息,展示ComputePerInstanceStatistics的使用方法。
from synapse.ml.train import ComputePerInstanceStatistics
evalPerInstance = ComputePerInstanceStatistics().transform(scoredData)
evalPerInstance.select("ArrDelay", "prediction", "L1_loss", "L2_loss").limit(
10
).toPandas()