
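Bodo Platform SDK Quickstart

This quickstart guide walks you through running jobs on the Bodo Platform using the Bodo Platform SDK installed on your local machine.

Getting Started

Installation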

pip install bodosdk

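Create a workspace client

To authenticate with the Bodo Platform API, you need to create an API token:

  1. Log in to your workspace at https://platform.bodo.ai/.
  2. In the admin console, navigate to the API Tokens page.
  3. Generate a token and copy the client ID and secret key.

Use these credentials to define a BodoWorkspaceClient for interacting with the platform: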

from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
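
Hardcoding the client ID and secret key in a script makes them easy to leak. The sketch below reads them from environment variables instead. The variable names `BODO_CLIENT_ID` and `BODO_SECRET_KEY` and the helper `load_credentials` are assumptions chosen for illustration, not something this guide specifies.

```python
import os


def load_credentials(env=None):
    """Build the keyword arguments for BodoWorkspaceClient from environment variables.

    The variable names used here are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    required = ("BODO_CLIENT_ID", "BODO_SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "client_id": env["BODO_CLIENT_ID"],
        "secret_key": env["BODO_SECRET_KEY"],
    }


# Usage (requires bodosdk and real credentials):
#   from bodosdk import BodoWorkspaceClient
#   my_workspace = BodoWorkspaceClient(**load_credentials())
```

Failing early with the list of missing variables tends to be easier to debug than an authentication error raised later by the platform.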

Create a cluster

Create a single-node cluster in your workspace, using the latest available Bodo version:

from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

my_cluster = my_workspace.ClusterClient.create(
    name='My first cluster',
    instance_type='c6i.large',
    workers_quantity=1
)
my_cluster.wait_for_status(['RUNNING'])
print(my_cluster.id)

Run a Python job

Step 1: Write the job script

Go to https://platform.bodo.ai and open a Jupyter notebook in your workspace. Create the following test.py file in your home directory:

import bodo
import time
import numpy as np

@bodo.jit
def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**6)
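
The script estimates pi by Monte Carlo sampling: points are drawn uniformly from the square [-1, 1) x [-1, 1), and the fraction that lands inside the unit circle approaches pi/4. The sketch below reproduces that calculation with only the Python standard library, so the logic can be checked locally without Bodo. The function name `estimate_pi` and the fixed seed are illustrative choices, not part of the original script.

```python
import random


def estimate_pi(n, seed=0):
    """Estimate pi from n uniform random points in the square [-1, 1) x [-1, 1)."""
    rng = random.Random(seed)  # seeded so repeated runs give the same estimate
    inside = 0
    for _ in range(n):
        x = 2 * rng.random() - 1
        y = 2 * rng.random() - 1
        # Count the points that fall strictly inside the unit circle.
        if x * x + y * y < 1:
            inside += 1
    # The inside fraction approximates pi / 4 (circle area over square area).
    return 4 * inside / n


if __name__ == "__main__":
    print("estimate:", estimate_pi(200_000))
```

The estimate's error shrinks roughly with the square root of the sample count, which is why the job script uses a large n.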

Step 2: Run the job

Use the SDK to run the job on your cluster, wait for its status to become SUCCEEDED, and then check its logs:

from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

my_cluster = my_workspace.ClusterClient.get("cluster_id")
my_job = my_cluster.run_job(
    code_type='PYTHON',
    source={'type': 'WORKSPACE', 'path': '/'},
    exec_file='test.py'
)

# Print stdout from job
print(my_job.wait_for_status(['SUCCEEDED']).get_stdout())
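
Waiting only for `SUCCEEDED` raises the question of what happens when a job fails. This guide does not say how `wait_for_status` behaves in that case, so the sketch below shows the general polling pattern with an explicit failure check and a timeout. It is not the SDK's implementation: `FakeJob`, `poll_until`, the `refresh` method, and the `FAILED` and `CANCELLED` status names are all hypothetical stand-ins.

```python
import time


class FakeJob:
    """Hypothetical stand-in for a job object; steps through the given statuses."""

    def __init__(self, statuses):
        self._statuses = list(statuses)
        self.status = self._statuses.pop(0)

    def refresh(self):
        # Advance to the next status, staying on the last one once exhausted.
        if self._statuses:
            self.status = self._statuses.pop(0)


def poll_until(job, wanted, failed=("FAILED", "CANCELLED"), interval=0.01, timeout=5.0):
    """Poll job.status until it is in `wanted`; raise on a failed status or timeout."""
    deadline = time.monotonic() + timeout
    while True:
        if job.status in wanted:
            return job
        if job.status in failed:
            raise RuntimeError(f"job ended with status {job.status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {job.status} after {timeout}s")
        time.sleep(interval)
        job.refresh()


job = poll_until(FakeJob(["PENDING", "RUNNING", "SUCCEEDED"]), wanted=["SUCCEEDED"])
print(job.status)  # prints SUCCEEDED
```

Raising on a terminal failure status, rather than waiting indefinitely, keeps a calling script from hanging when the job can no longer succeed.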

Run a SQL job

To run a SQL job, create a test.sql file and a catalog on https://platform.bodo.ai. Then run the job as follows:

from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

my_cluster = my_workspace.ClusterClient.get("cluster_id")
my_job = my_cluster.run_job(
    code_type='SQL',
    source={'type': 'WORKSPACE', 'path': '/'},
    exec_file='test.sql',
    catalog="MyCatalog"
)

# Print stdout from job
print(my_job.wait_for_status(['SUCCEEDED']).get_stdout())

Execute a SQL query

Execute a SQL query by passing only the query text, as shown below:

from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

my_cluster = my_workspace.ClusterClient.get("cluster_id")

# Execute query
my_sql_job = my_cluster.run_sql_query(sql_query="SELECT 1", catalog="MyCatalog")

# Print stdout from job
print(my_sql_job.wait_for_status(['SUCCEEDED']).get_stdout())

Connector

Use the cluster connector to execute SQL queries through a cursor:

from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)

my_cluster = my_workspace.ClusterClient.get("cluster_id")

# Connect and execute query
connection = my_cluster.connect('MyCatalog')
result = connection.cursor().execute("SELECT 1").fetchone()
print(result)
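
The connect, cursor, execute, fetchone sequence above resembles Python's DB-API (PEP 249) pattern. Whether the Bodo connector implements that specification in full is an assumption here, not something this guide states. To show the call shape without a cluster, the sketch below runs the same query against an in-memory SQLite database from the standard library, which stands in for the cluster connection.

```python
import sqlite3

# In-memory SQLite database as a local stand-in for my_cluster.connect('MyCatalog').
connection = sqlite3.connect(":memory:")
try:
    cursor = connection.cursor()
    cursor.execute("SELECT 1")
    result = cursor.fetchone()  # a single row, returned as a tuple
    print(result)
finally:
    # Release the connection even if the query raises.
    connection.close()
```

With SQLite, `fetchone()` returns the row as a tuple; the exact row type returned by the Bodo connector may differ.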
