Zeppelin 教程
本教程将引导您了解一些基本的Zeppelin概念。我们假设您已经安装了Zeppelin。如果没有,请先查看这里。
Zeppelin当前的主要后端处理引擎是Apache Spark。如果您对这个系统不熟悉,您可能想先了解一下它如何处理数据,以便充分利用Zeppelin。
本地文件教程
数据精炼
在开始Zeppelin教程之前,你需要下载bank.zip。
首先,要将csv格式的数据转换为Bank对象的RDD,请运行以下脚本。这还将使用filter函数删除标题。
val bankText = sc.textFile("yourPath/bank/bank-full.csv")
case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)
// split each line, filter out header (starts with "age"), and map it into Bank case class
val bank = bankText.map(s=>s.split(";")).filter(s=>s(0)!="\"age\"").map(
s=>Bank(s(0).toInt,
s(1).replaceAll("\"", ""),
s(2).replaceAll("\"", ""),
s(3).replaceAll("\"", ""),
s(5).replaceAll("\"", "").toInt
)
)
// convert to DataFrame and create temporal table
bank.toDF().registerTempTable("bank")
数据检索
假设我们想查看bank中的年龄分布。为此,请运行:
%sql select age, count(1) from bank where age < 30 group by age order by age
您可以通过将30替换为${maxAge=30}来创建用于设置年龄条件的输入框。
%sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age
现在我们想查看特定婚姻状况的年龄分布,并添加组合框以选择婚姻状况。运行:
%sql select age, count(1) from bank where marital="${marital=single,single|divorced|married}" group by age order by age
流数据教程
数据精炼
由于本教程基于Twitter的示例推文流,您必须配置与Twitter帐户的认证。为此,请查看Twitter凭证设置。获取API密钥后,您应在以下脚本中使用您的API密钥填写与凭证相关的值(apiKey, apiSecret, accessToken, accessTokenSecret)。
这将创建一个Tweet对象的RDD,并将这些流数据注册为一个表:
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
val configs = new HashMap[String, String] ++= Seq(
"apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
println("Configuring Twitter OAuth")
configs.foreach{ case(key, value) =>
if (value.trim.isEmpty) {
throw new Exception("Error setting authentication - value for " + key + " not set")
}
val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
System.setProperty(fullKey, value.trim)
println("\tProperty " + fullKey + " set as [" + value.trim + "]")
}
println()
}
// Configure Twitter credentials
val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
import org.apache.spark.streaming.twitter._
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))
case class Tweet(createdAt:Long, text:String)
twt.map(status=>
Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD(rdd=>
// Below line works only in spark 1.3.0.
// For spark 1.1.x and spark 1.2.x,
// use rdd.registerTempTable("tweets") instead.
rdd.toDF().registerAsTable("tweets")
)
twt.print
ssc.start()
数据检索
对于以下每个脚本,每次点击运行按钮时,您都会看到不同的结果,因为它是基于实时数据的。
让我们从提取最多10条包含girl这个词的推文开始。
%sql select * from tweets where text like '%girl%' limit 10
这次假设我们想查看在过去60秒内每秒创建了多少条推文。为此,请运行:
%sql select createdAt, count(1) from tweets group by createdAt order by createdAt
你可以创建用户自定义函数并在Spark SQL中使用它。让我们通过创建一个名为sentiment的函数来尝试一下。这个函数将返回对参数的三种态度之一(积极、消极、中立)。
def sentiment(s:String) : String = {
val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
val negative = Array("hate", "bad", "stupid", "is")
var st = 0;
val words = s.split(" ")
positive.foreach(p =>
words.foreach(w =>
if(p==w) st = st+1
)
)
negative.foreach(p=>
words.foreach(w=>
if(p==w) st = st-1
)
)
if(st>0)
"positivie"
else if(st<0)
"negative"
else
"neutral"
}
// Below line works only in spark 1.3.0.
// For spark 1.1.x and spark 1.2.x,
// use sqlc.registerFunction("sentiment", sentiment _) instead.
sqlc.udf.register("sentiment", sentiment _)
要检查人们对女孩的看法,使用我们上面制作的sentiment函数,运行以下代码:
%sql select sentiment(text), count(1) from tweets where text like '%girl%' group by sentiment(text)