Apache Zeppelin 的 Apache Mahout 解释器
安装
Apache Mahout 是一个包集合,它支持在底层引擎如 Apache Flink 或 Apache Spark 上进行机器学习和矩阵代数运算。存在一个方便的脚本来创建和配置两个支持 Mahout 的解释器。默认情况下,%sparkMahout 和 %flinkMahout 解释器并不存在,但可以使用此脚本轻松创建。
轻松安装
要快速轻松地开始使用 Apache Mahout,请从 Zeppelin 安装的顶级目录运行以下命令:
python scripts/mahout/add_mahout.py
这将创建%sparkMahout和%flinkMahout解释器,并重新启动Zeppelin。
高级安装
add_mahout.py 脚本包含多个供高级用户使用的命令行参数。
| 参数 | 描述 | 示例 |
|---|---|---|
--zeppelin_home |
这是Zeppelin安装的路径。如果脚本从顶级安装目录或zeppelin/scripts/mahout目录运行,则不需要此标志。 |
/path/to/zeppelin |
--mahout_home |
如果用户已经安装了Mahout,此标志可以设置MAHOUT_HOME的路径。如果设置了此标志,将跳过下载Mahout。 |
/path/to/mahout_home |
--restart_later |
为了使更新生效,需要重新启动。默认情况下,脚本将为您重新启动Zeppelin。如果设置了此标志,将跳过重新启动。 | NA |
--force_download |
此标志将强制脚本重新下载二进制文件,即使它已经存在。这对于之前下载失败的情况很有用。 | NA |
--overwrite_existing |
此标志将强制脚本覆盖现有的%sparkMahout和%flinkMahout解释器。当您想要重新开始时很有用。 |
NA |
注意 1: 目前 Apache Mahout 仅支持 Spark 1.5 和 Spark 1.6 以及 Scala 2.10。如果用户使用的是其他版本的 Spark(例如 2.0),%sparkMahout 可能无法正常工作。%flinkMahout 解释器仍然可以工作,建议用户使用该引擎进行开发,因为代码可以通过复制和粘贴进行移植,如教程笔记本所示。
注意 2: 如果在集群模式下使用 Apache Flink,还需要将以下库复制到 ${FLINK_HOME}/lib
- mahout-math-0.12.2.jar
- mahout-math-scala2.10-0.12.2.jar
- mahout-flink2.10-0.12.2.jar
- mahout-hdfs-0.12.2.jar
- com.google.guava:guava:14.0.1
概述
Apache Mahout™ 项目的目标是构建一个环境,用于快速创建可扩展的高性能机器学习应用程序。
Apache Mahout 软件提供了三大功能:
- 一个简单且可扩展的编程环境和框架,用于构建可扩展的算法
- 适用于 Scala + Apache Spark、H2O、Apache Flink 的多种预制算法
- Samsara,一个具有类似R语法的向量数学实验环境,能够大规模工作
换句话说:
Apache Mahout 提供了一个统一的API,用于在各种引擎上快速创建机器学习算法。
如何使用
当开始使用Apache Mahout会话时,根据您使用的引擎(Spark或Flink),必须进行一些导入并声明一个分布式上下文。复制并粘贴以下代码并运行一次以开始。
Flink
%flinkMahout
import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
implicit val ctx = new FlinkDistributedContext(benv)
Spark
%sparkMahout
import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._
implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)
相同的代码,不同的引擎
在导入并设置分布式上下文后,Mahout R-Like DSL 在各个引擎中是一致的。以下代码将在 %flinkMahout 和 %sparkMahout 中运行。
val drmData = drmParallelize(dense(
(2, 2, 10.5, 10, 29.509541), // Apple Cinnamon Cheerios
(1, 2, 12, 12, 18.042851), // Cap'n'Crunch
(1, 1, 12, 13, 22.736446), // Cocoa Puffs
(2, 1, 11, 13, 32.207582), // Froot Loops
(1, 2, 12, 11, 21.871292), // Honey Graham Ohs
(2, 1, 16, 8, 36.187559), // Wheaties Honey Gold
(6, 2, 17, 1, 50.764999), // Cheerios
(3, 2, 13, 7, 40.400208), // Clusters
(3, 3, 13, 4, 45.811716)), numPartitions = 2)
drmData.collect(::, 0 until 4)
val drmX = drmData(::, 0 until 4)
val y = drmData.collect(::, 4)
val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% y
val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)
val beta = solve(XtX, Xty)
利用资源池和R进行可视化
资源池是Zeppelin的一个强大功能,它允许我们在解释器之间共享信息。一个有趣的技巧是获取我们在Mahout中的工作输出,并在其他语言中进行分析。
在Flink中设置资源池
在基于Spark的解释器中,资源池通过ZeppelinContext API访问。要从资源池中放入和获取内容,可以简单地完成
val myVal = 1
z.put("foo", myVal)
val myFetchedVal = z.get("foo")
为了将此功能添加到基于Flink的解释器中,我们声明以下内容
%flinkMahout
import org.apache.zeppelin.interpreter.InterpreterContext
val z = InterpreterContext.get().getResourcePool()
现在我们可以从%flinkMahout解释器以一致的方式访问资源池。
将变量从Mahout传递到R并进行绘图
在这个简单的例子中,我们使用Mahout(在Flink或Spark上,代码是相同的)来创建一个随机矩阵,然后对每个元素取Sin值。接着我们随机采样矩阵并创建一个以制表符分隔的字符串。最后,我们将该字符串传递给R,在R中它被读取为.tsv文件,并创建一个DataFrame,使用原生的R绘图库进行绘图。
val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
val drmRand = drmParallelize(mxRnd)
val drmSin = drmRand.mapBlock() {case (keys, block) =>
val blockB = block.like()
for (i <- 0 until block.nrow) {
blockB(i, 0) = block(i, 0)
blockB(i, 1) = Math.sin((block(i, 0) * 8))
}
keys -> blockB
}
z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))
然后在R段落中...
%spark.r {"imageWidth": "400px"}
library("ggplot2")
sinStr = z.get("flinkSinDrm")
data <- read.table(text= sinStr, sep="\t", header=FALSE)
plot(data, col="red")