Apache Zeppelin 的 Apache Mahout 解释器

安装

Apache Mahout 是一个包集合,它支持在底层引擎如 Apache Flink 或 Apache Spark 上进行机器学习和矩阵代数运算。存在一个方便的脚本来创建和配置两个支持 Mahout 的解释器。默认情况下,%sparkMahout%flinkMahout 解释器并不存在,但可以使用此脚本轻松创建。

轻松安装

要快速轻松地开始使用 Apache Mahout,请从 Zeppelin 安装的顶级目录运行以下命令:

python scripts/mahout/add_mahout.py

这将创建%sparkMahout%flinkMahout解释器,并重新启动Zeppelin。

高级安装

add_mahout.py 脚本包含多个供高级用户使用的命令行参数。

参数 描述 示例
--zeppelin_home 这是Zeppelin安装的路径。如果脚本从顶级安装目录或zeppelin/scripts/mahout目录运行,则不需要此标志。 /path/to/zeppelin
--mahout_home 如果用户已经安装了Mahout,此标志可以设置MAHOUT_HOME的路径。如果设置了此标志,将跳过下载Mahout。 /path/to/mahout_home
--restart_later 为了使更新生效,需要重新启动。默认情况下,脚本将为您重新启动Zeppelin。如果设置了此标志,将跳过重新启动。 NA
--force_download 此标志将强制脚本重新下载二进制文件,即使它已经存在。这对于之前下载失败的情况很有用。 NA
--overwrite_existing 此标志将强制脚本覆盖现有的%sparkMahout%flinkMahout解释器。当您想要重新开始时很有用。 NA

注意 1: 目前 Apache Mahout 仅支持 Spark 1.5 和 Spark 1.6 以及 Scala 2.10。如果用户使用的是其他版本的 Spark(例如 2.0),%sparkMahout 可能无法正常工作。%flinkMahout 解释器仍然可以工作,建议用户使用该引擎进行开发,因为代码可以通过复制和粘贴进行移植,如教程笔记本所示。

注意 2: 如果在集群模式下使用 Apache Flink,还需要将以下库复制到 ${FLINK_HOME}/lib - mahout-math-0.12.2.jar - mahout-math-scala2.10-0.12.2.jar - mahout-flink2.10-0.12.2.jar - mahout-hdfs-0.12.2.jar - com.google.guava:guava:14.0.1

概述

Apache Mahout™ 项目的目标是构建一个环境,用于快速创建可扩展的高性能机器学习应用程序。

Apache Mahout 软件提供了三大功能:

  • 一个简单且可扩展的编程环境和框架,用于构建可扩展的算法
  • 适用于 Scala + Apache Spark、H2O、Apache Flink 的多种预制算法
  • Samsara,一个具有类似R语法的向量数学实验环境,能够大规模工作

换句话说:

Apache Mahout 提供了一个统一的API,用于在各种引擎上快速创建机器学习算法。

如何使用

当开始使用Apache Mahout会话时,根据您使用的引擎(Spark或Flink),必须进行一些导入并声明一个分布式上下文。复制并粘贴以下代码并运行一次以开始。

Flink

%flinkMahout

import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._

implicit val ctx = new FlinkDistributedContext(benv)

Spark

%sparkMahout

import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

相同的代码,不同的引擎

在导入并设置分布式上下文后,Mahout R-Like DSL 在各个引擎中是一致的。以下代码将在 %flinkMahout%sparkMahout 中运行。

val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
  (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
  (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
  (2, 1, 11,   13, 32.207582),  // Froot Loops
  (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
  (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
  (6, 2, 17,   1,  50.764999),  // Cheerios
  (3, 2, 13,   7,  40.400208),  // Clusters
  (3, 3, 13,   4,  45.811716)), numPartitions = 2)

drmData.collect(::, 0 until 4)

val drmX = drmData(::, 0 until 4)
val y = drmData.collect(::, 4)
val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% y


val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)
val beta = solve(XtX, Xty)

利用资源池和R进行可视化

资源池是Zeppelin的一个强大功能,它允许我们在解释器之间共享信息。一个有趣的技巧是获取我们在Mahout中的工作输出,并在其他语言中进行分析。

在Flink中设置资源池

在基于Spark的解释器中,资源池通过ZeppelinContext API访问。要从资源池中放入和获取内容,可以简单地完成

val myVal = 1
z.put("foo", myVal)
val myFetchedVal = z.get("foo")

为了将此功能添加到基于Flink的解释器中,我们声明以下内容

%flinkMahout

import org.apache.zeppelin.interpreter.InterpreterContext

val z = InterpreterContext.get().getResourcePool()

现在我们可以从%flinkMahout解释器以一致的方式访问资源池。

将变量从Mahout传递到R并进行绘图

在这个简单的例子中,我们使用Mahout(在Flink或Spark上,代码是相同的)来创建一个随机矩阵,然后对每个元素取Sin值。接着我们随机采样矩阵并创建一个以制表符分隔的字符串。最后,我们将该字符串传递给R,在R中它被读取为.tsv文件,并创建一个DataFrame,使用原生的R绘图库进行绘图。

val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
val drmRand = drmParallelize(mxRnd)


val drmSin = drmRand.mapBlock() {case (keys, block) =>  
  val blockB = block.like()
  for (i <- 0 until block.nrow) {
    blockB(i, 0) = block(i, 0)
    blockB(i, 1) = Math.sin((block(i, 0) * 8))
  }
  keys -> blockB
}

z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))

然后在R段落中...

%spark.r {"imageWidth": "400px"}

library("ggplot2")

sinStr = z.get("flinkSinDrm")

data <- read.table(text= sinStr, sep="\t", header=FALSE)

plot(data,  col="red")