Apache Zeppelin SDK - Session API

Overview

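The Session API is Zeppelin's high-level API. It exposes no Zeppelin concepts such as notes or paragraphs. Its central abstraction is ZSession, which represents a running interpreter process. Creating a ZSession is simple and its API is straightforward, as the concrete example below shows.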

How to use ZSession

To create a ZSession, you provide a ClientConfig and the name of the interpreter. You can also customize the ZSession by specifying its interpreter properties.

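After creating a ZSession, you must start it before running any code. You control the lifecycle of the ZSession: you must call the stop method explicitly, otherwise the interpreter process keeps running.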

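// Imports needed by this snippet (zeppelin-client and commons-lang3 on the classpath)
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.ExecuteResult;
import org.apache.zeppelin.client.ZSession;

ZSession session = null;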
try {
  ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
  Map<String, String> intpProperties = new HashMap<>();
  intpProperties.put("spark.master", "local[*]");

  session = ZSession.builder()
          .setClientConfig(clientConfig)
          .setInterpreter("spark")
          .setIntpProperties(intpProperties)
          .build();

  session.start();
  System.out.println("Spark Web UI: " + session.getWeburl());

  // scala (single result)
  ExecuteResult result = session.execute("println(sc.version)");
  System.out.println("Spark Version: " + result.getResults().get(0).getData());

  // scala (multiple result)
  result = session.execute("println(sc.version)\n" +
          "val df = spark.createDataFrame(Seq((1,\"a\"), (2,\"b\")))\n" +
          "z.show(df)");

  // The first result is text output
  System.out.println("Result 1: type: " + result.getResults().get(0).getType() +
          ", data: " + result.getResults().get(0).getData() );
  // The second result is table output
  System.out.println("Result 2: type: " + result.getResults().get(1).getType() +
          ", data: " + result.getResults().get(1).getData() );
  System.out.println("Spark Job Urls:\n" + StringUtils.join(result.getJobUrls(), "\n"));

  // error output
  result = session.execute("1/0");
  System.out.println("Result status: " + result.getStatus() +
          ", data: " + result.getResults().get(0).getData());

  // pyspark
  result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\n" +
          "df.registerTempTable('df')\n" +
          "df.show()");
  System.out.println("PySpark dataframe: " + result.getResults().get(0).getData());

  // matplotlib
  result = session.execute("ipyspark", "%matplotlib inline\n" +
          "import matplotlib.pyplot as plt\n" +
          "plt.plot([1,2,3,4])\n" +
          "plt.ylabel('some numbers')\n" +
          "plt.show()");
  System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
          ", data: " + result.getResults().get(0).getData());

  // sparkr
  result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
  System.out.println("Sparkr dataframe: " + result.getResults().get(0).getData());

  // spark sql
  result = session.execute("sql", "select * from df");
  System.out.println("Spark Sql dataframe: " + result.getResults().get(0).getData());

  // spark invalid sql
  result = session.execute("sql", "select * from unknown_table");
  System.out.println("Result status: " + result.getStatus() +
          ", data: " + result.getResults().get(0).getData());
} catch (Exception e) {
  e.printStackTrace();
} finally {
  if (session != null) {
    try {
      session.stop();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
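
Because the session has to be stopped explicitly, one possible way to make that harder to forget is to wrap it in an AutoCloseable adapter and use try-with-resources. The following is only a minimal sketch of that idea, not part of the Zeppelin client: the Session interface and the ManagedSession class are hypothetical stand-ins, so the snippet runs without a Zeppelin server. With the real client, the adapter would delegate to ZSession.start() and ZSession.stop().

```java
import java.util.ArrayList;
import java.util.List;

public class SessionLifecycleSketch {

  /** Hypothetical stand-in for the start/stop part of ZSession. */
  interface Session {
    void start() throws Exception;
    void stop() throws Exception;
  }

  /** Hypothetical adapter: starts the session on creation, stops it on close(). */
  static final class ManagedSession implements AutoCloseable {
    private final Session session;

    ManagedSession(Session session) throws Exception {
      this.session = session;
      session.start();
    }

    @Override
    public void close() throws Exception {
      session.stop();
    }
  }

  /** Runs a failing workload and returns the lifecycle events in order. */
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    // Fake session that only records which lifecycle methods were called.
    Session fake = new Session() {
      @Override public void start() { events.add("start"); }
      @Override public void stop() { events.add("stop"); }
    };
    try (ManagedSession managed = new ManagedSession(fake)) {
      events.add("run");
      throw new IllegalStateException("simulated failure");
    } catch (Exception e) {
      // The resource is closed before this catch block runs.
      events.add("caught");
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch stop is called even though the workload throws, which is the same guarantee the finally block in the example above provides by hand.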

The following is the list of ZSession APIs.

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception 

public ExecuteResult submit(String code) throws Exception 

public ExecuteResult submit(String subInterpreter,
                            Map<String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                            
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception
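
The submit, queryStatement and cancel methods can be combined into a bounded polling loop. The sketch below illustrates that pattern under some assumptions: that submit returns without waiting for the statement to complete (as its pairing with waitUntilFinished suggests), and that a result carries a statement id and a status. The Session interface, Result class, Status enum and FakeSession are all hypothetical, trimmed-down stand-ins so the code runs without a Zeppelin server; they are not the real client types.

```java
import java.util.HashMap;
import java.util.Map;

public class SubmitPollSketch {

  /** Hypothetical status values; the real client defines its own. */
  enum Status { RUNNING, FINISHED, CANCELLED }

  /** Hypothetical result holding only what this sketch needs. */
  static final class Result {
    final String statementId;
    final Status status;

    Result(String statementId, Status status) {
      this.statementId = statementId;
      this.status = status;
    }
  }

  /** Hypothetical subset of the ZSession methods listed above. */
  interface Session {
    Result submit(String code) throws Exception;
    Result queryStatement(String statementId) throws Exception;
    void cancel(String statementId) throws Exception;
  }

  /** Polls until the statement leaves RUNNING; cancels it after maxPolls checks. */
  static Result runWithLimit(Session session, String code, int maxPolls) throws Exception {
    Result result = session.submit(code);
    // A real caller would sleep between polls; omitted to keep the sketch short.
    for (int i = 0; i < maxPolls && result.status == Status.RUNNING; i++) {
      result = session.queryStatement(result.statementId);
    }
    if (result.status == Status.RUNNING) {
      session.cancel(result.statementId);
      result = session.queryStatement(result.statementId);
    }
    return result;
  }

  /** In-memory fake: a statement finishes after a fixed number of queries. */
  static final class FakeSession implements Session {
    private final int queriesUntilDone;
    private final Map<String, Integer> queries = new HashMap<>();
    private final Map<String, Status> states = new HashMap<>();
    private int nextId = 0;

    FakeSession(int queriesUntilDone) {
      this.queriesUntilDone = queriesUntilDone;
    }

    @Override public Result submit(String code) {
      String id = "stmt-" + nextId++;
      queries.put(id, 0);
      states.put(id, Status.RUNNING);
      return new Result(id, Status.RUNNING);
    }

    @Override public Result queryStatement(String id) {
      int seen = queries.merge(id, 1, Integer::sum);
      if (states.get(id) == Status.RUNNING && seen >= queriesUntilDone) {
        states.put(id, Status.FINISHED);
      }
      return new Result(id, states.get(id));
    }

    @Override public void cancel(String id) {
      states.put(id, Status.CANCELLED);
    }
  }

  public static void main(String[] args) throws Exception {
    // Finishes within the poll limit.
    System.out.println(runWithLimit(new FakeSession(2), "println(1)", 5).status);
    // Exceeds the poll limit and is cancelled.
    System.out.println(runWithLimit(new FakeSession(10), "println(1)", 3).status);
  }
}
```

When blocking is acceptable, calling waitUntilFinished with the statement id is likely the simpler choice; a polling loop like this is mainly useful when the caller wants to enforce its own limit.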

Examples

For more detailed usage of the Session API, see the examples in the zeppelin-client-examples module.