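Apache Zeppelin SDK - Session API
Overview
The Session API is Zeppelin's high-level API. It exposes no Zeppelin concepts such as notes or paragraphs. Its central class is ZSession, which represents a running interpreter process. Creating a ZSession is straightforward and its API is simple, as the concrete example below shows.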
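How to use ZSession
To create a ZSession, provide a ClientConfig and the interpreter name. You can also customize the session by specifying its interpreter properties.
After creating a ZSession, you must start it before running any code. You control the ZSession lifecycle: call the stop method explicitly, otherwise the interpreter process keeps running.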
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.ExecuteResult;
import org.apache.zeppelin.client.ZSession;

ZSession session = null;
try {
  ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
  Map<String, String> intpProperties = new HashMap<>();
  intpProperties.put("spark.master", "local[*]");

  session = ZSession.builder()
      .setClientConfig(clientConfig)
      .setInterpreter("spark")
      .setIntpProperties(intpProperties)
      .build();

  // The session must be started before any code is executed
  session.start();
  System.out.println("Spark Web UI: " + session.getWeburl());

  // scala (single result)
  ExecuteResult result = session.execute("println(sc.version)");
  System.out.println("Spark Version: " + result.getResults().get(0).getData());

  // scala (multiple results)
  result = session.execute("println(sc.version)\n" +
      "val df = spark.createDataFrame(Seq((1,\"a\"), (2,\"b\")))\n" +
      "z.show(df)");
  // The first result is text output
  System.out.println("Result 1: type: " + result.getResults().get(0).getType() +
      ", data: " + result.getResults().get(0).getData());
  // The second result is table output
  System.out.println("Result 2: type: " + result.getResults().get(1).getType() +
      ", data: " + result.getResults().get(1).getData());
  System.out.println("Spark Job Urls:\n" + StringUtils.join(result.getJobUrls(), "\n"));

  // error output
  result = session.execute("1/0");
  System.out.println("Result status: " + result.getStatus() +
      ", data: " + result.getResults().get(0).getData());

  // pyspark (registers the temp view "df" that the sql example below queries)
  result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\n" +
      "df.createOrReplaceTempView('df')\n" +
      "df.show()");
  System.out.println("PySpark dataframe: " + result.getResults().get(0).getData());

  // matplotlib
  result = session.execute("ipyspark", "%matplotlib inline\n" +
      "import matplotlib.pyplot as plt\n" +
      "plt.plot([1,2,3,4])\n" +
      "plt.ylabel('some numbers')\n" +
      "plt.show()");
  System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
      ", data: " + result.getResults().get(0).getData());

  // sparkr
  result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
  System.out.println("Sparkr dataframe: " + result.getResults().get(0).getData());

  // spark sql
  result = session.execute("sql", "select * from df");
  System.out.println("Spark Sql dataframe: " + result.getResults().get(0).getData());

  // spark invalid sql
  result = session.execute("sql", "select * from unknown_table");
  System.out.println("Result status: " + result.getStatus() +
      ", data: " + result.getResults().get(0).getData());
} catch (Exception e) {
  e.printStackTrace();
} finally {
  // Always stop the session, otherwise the interpreter process keeps running
  if (session != null) {
    try {
      session.stop();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
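The try/finally pattern above can also be expressed with try-with-resources, so that stop is called even when a statement throws. The following is only a sketch under stated assumptions: the Session interface and the ManagedSession wrapper are hypothetical stand-ins written so the example compiles without the Zeppelin client on the classpath. They are not part of the Zeppelin SDK, and the sketch only demonstrates the order of the calls.

```java
import java.util.ArrayList;
import java.util.List;

class SessionLifecycleSketch {

    /** Hypothetical stand-in for the start/execute/stop subset of a session. */
    interface Session {
        void start() throws Exception;
        String execute(String code) throws Exception;
        void stop() throws Exception;
    }

    /** Hypothetical wrapper: starts the session when built, stops it on close(). */
    static final class ManagedSession implements AutoCloseable {
        private final Session session;

        ManagedSession(Session session) throws Exception {
            this.session = session;
            session.start();
        }

        String execute(String code) throws Exception {
            return session.execute(code);
        }

        @Override
        public void close() throws Exception {
            session.stop();
        }
    }

    /** Runs one statement against a recording fake and returns the call order. */
    static List<String> run(boolean failOnExecute) {
        List<String> calls = new ArrayList<>();
        Session fake = new Session() {
            @Override public void start() { calls.add("start"); }
            @Override public String execute(String code) {
                calls.add("execute");
                if (failOnExecute) {
                    throw new IllegalStateException("simulated failure");
                }
                return "ok";
            }
            @Override public void stop() { calls.add("stop"); }
        };
        // close() runs before the catch block, so stop is recorded first
        try (ManagedSession managed = new ManagedSession(fake)) {
            managed.execute("println(sc.version)");
        } catch (Exception e) {
            calls.add("caught");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

One difference from the try/finally version is worth noting: if start itself throws inside the wrapper's constructor, close is never invoked, whereas the example above still calls stop because the session variable is already non-null at that point.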
The following is the list of ZSession APIs.
public void start() throws Exception
public void start(MessageHandler messageHandler) throws Exception
public void stop() throws Exception
public ExecuteResult execute(String code) throws Exception
public ExecuteResult execute(String subInterpreter,
                             Map<String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception
public ExecuteResult submit(String code) throws Exception
public ExecuteResult submit(String subInterpreter,
                            Map<String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
public void cancel(String statementId) throws Exception
public ExecuteResult queryStatement(String statementId) throws Exception
public ExecuteResult waitUntilFinished(String statementId) throws Exception
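The list gives signatures only, so the following sketch illustrates one plausible way the submit, query, and cancel methods fit together: submit a statement, poll its status by statement ID, and cancel it if it does not complete within a bounded number of polls. This rests on assumptions. It assumes submit returns without waiting for the statement to finish, which the list itself does not state. AsyncSession, FakeSession, and the status strings are hypothetical stand-ins so the example runs on its own; they are not the ZSession signatures shown above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SubmitPollSketch {

    /** Hypothetical stand-in; these are not the real ZSession signatures. */
    interface AsyncSession {
        String submit(String code);               // returns a statement ID
        String queryStatus(String statementId);   // illustrative status string
        void cancel(String statementId);
    }

    /** Fake that reports RUNNING for a fixed number of polls, then FINISHED. */
    static final class FakeSession implements AsyncSession {
        private final int runningPolls;
        private final Map<String, Integer> pollCounts = new HashMap<>();
        private final Set<String> cancelled = new HashSet<>();
        private int nextId = 0;

        FakeSession(int runningPolls) {
            this.runningPolls = runningPolls;
        }

        @Override public String submit(String code) {
            String id = "statement-" + nextId++;
            pollCounts.put(id, 0);
            return id;
        }

        @Override public String queryStatus(String statementId) {
            if (cancelled.contains(statementId)) {
                return "ABORT";
            }
            int count = pollCounts.merge(statementId, 1, Integer::sum);
            return count > runningPolls ? "FINISHED" : "RUNNING";
        }

        @Override public void cancel(String statementId) {
            cancelled.add(statementId);
        }
    }

    /** Polls up to maxPolls times; cancels the statement if it is still running. */
    static String awaitOrCancel(AsyncSession session, String code, int maxPolls) {
        String id = session.submit(code);
        for (int i = 0; i < maxPolls; i++) {
            String status = session.queryStatus(id);
            if (!"RUNNING".equals(status)) {
                return status;
            }
            // A real client would sleep here between polls.
        }
        session.cancel(id);
        return session.queryStatus(id);
    }

    public static void main(String[] args) {
        System.out.println(awaitOrCancel(new FakeSession(2), "println(1)", 5));
        System.out.println(awaitOrCancel(new FakeSession(10), "println(1)", 3));
    }
}
```

With a real ZSession, waitUntilFinished(statementId) would presumably replace the hand-written loop when no timeout is needed; the explicit loop is shown only because it makes the cancel path visible.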
Examples
For more detailed usage of the Session API, see the examples in the zeppelin-client-examples module.