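Bodo Platform SDK Quickstart
This quickstart guide walks you through running a job on the Bodo Platform using the Bodo Platform SDK installed on your local machine.
Getting Started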
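Installation
Install the SDK with pip (the package is published on PyPI as bodosdk):
pip install bodosdk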
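Creating a Workspace Client
To authenticate with the Bodo Platform API, you need to create an API token:
- Log in to your workspace at https://platform.bodo.ai/.
- Navigate to the API Tokens page in the admin console.
- Generate a token and copy the client ID and secret key.
Use these credentials to define a BodoWorkspaceClient for interacting with the platform: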
from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
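Hardcoding the client ID and secret key in a script makes them easy to leak. As a minimal sketch, the helper below reads them from environment variables and returns keyword arguments for the BodoWorkspaceClient constructor shown above. The variable names BODO_CLIENT_ID and BODO_SECRET_KEY and the helper load_bodo_credentials are assumptions chosen for this illustration, not something this guide defines.

```python
import os


def load_bodo_credentials(env=None):
    """Return client_id/secret_key kwargs read from environment variables.

    BODO_CLIENT_ID and BODO_SECRET_KEY are names assumed for this sketch.
    """
    env = os.environ if env is None else env
    required = ("BODO_CLIENT_ID", "BODO_SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "client_id": env["BODO_CLIENT_ID"],
        "secret_key": env["BODO_SECRET_KEY"],
    }


# Usage (requires bodosdk and real credentials):
# from bodosdk import BodoWorkspaceClient
# my_workspace = BodoWorkspaceClient(**load_bodo_credentials())
```

Failing early with a clear message when a variable is missing is a design choice of this sketch; it avoids sending empty credentials to the platform.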
Creating a Cluster
Create a single-node cluster in your workspace using the latest available Bodo version:
from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
my_cluster = my_workspace.ClusterClient.create(
    name='My first cluster',
    instance_type='c6i.large',
    workers_quantity=1
)
# Wait until the cluster is running, then print its ID for later lookups
my_cluster.wait_for_status(['RUNNING'])
print(my_cluster.id)
Running a Python Job
Step 1: Write the job script
Go to https://platform.bodo.ai and open a Jupyter notebook in your workspace. Create the following test.py file in your home directory:
import bodo
import time
import numpy as np


@bodo.jit
def calc_pi(n):
    # Monte Carlo estimate: sample n points in [-1, 1) x [-1, 1)
    # and count the fraction that falls inside the unit circle.
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time() - t1, "\nresult:", pi)


calc_pi(2 * 10**6)
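Before submitting test.py, it can help to sanity-check the Monte Carlo logic locally without Bodo. The sketch below is a plain standard-library version of the same estimate. The names estimate_pi and seed are introduced here for illustration, and it uses random.Random rather than NumPy so that it runs on any Python installation.

```python
import random


def estimate_pi(n, seed=0):
    """Estimate pi by sampling n points uniformly in [-1, 1) x [-1, 1)."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n):
        x = 2 * rng.random() - 1
        y = 2 * rng.random() - 1
        # Same test as in calc_pi: is the point inside the unit circle?
        if x * x + y * y < 1:
            inside += 1
    # The circle covers pi/4 of the square, so scale the hit ratio by 4.
    return 4 * inside / n


if __name__ == "__main__":
    print("result:", estimate_pi(200_000))
```

The estimate converges slowly (the error shrinks roughly with the square root of n), so a result near 3.14 rather than an exact match is what to expect.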
Step 2: Run the job
Use the SDK to run the job on your cluster, wait for its status to become SUCCEEDED, and then check its logs:
from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
my_cluster = my_workspace.ClusterClient.get("cluster_id")
my_job = my_cluster.run_job(
    code_type='PYTHON',
    source={'type': 'WORKSPACE', 'path': '/'},
    exec_file='test.py'
)
# Print stdout from job
print(my_job.wait_for_status(['SUCCEEDED']).get_stdout())
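The submit, wait, and read-output sequence above can be wrapped in a small helper so that it is reusable across scripts. This is a sketch under stated assumptions: run_python_job is a hypothetical name, and it relies only on the run_job, wait_for_status, and get_stdout calls used in this guide, with the same argument values.

```python
def run_python_job(cluster, exec_file, path="/"):
    """Run exec_file from the workspace on cluster and return its stdout.

    `cluster` is expected to be the object returned by
    my_workspace.ClusterClient.get(...), as in the snippet above.
    """
    job = cluster.run_job(
        code_type="PYTHON",
        source={"type": "WORKSPACE", "path": path},
        exec_file=exec_file,
    )
    # Wait until the job reports SUCCEEDED, then fetch its standard output.
    return job.wait_for_status(["SUCCEEDED"]).get_stdout()


# Usage, given a cluster obtained as shown above:
# print(run_python_job(my_cluster, "test.py"))
```

Taking the cluster as a parameter keeps the helper free of credentials and makes it easy to exercise with a stand-in object in tests.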
Running a SQL Job
To run a SQL job, create a test.sql file and a catalog at https://platform.bodo.ai. Then run the job as follows:
from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
my_cluster = my_workspace.ClusterClient.get("cluster_id")
my_job = my_cluster.run_job(
    code_type='SQL',
    source={'type': 'WORKSPACE', 'path': '/'},
    exec_file='test.sql',
    catalog="MyCatalog"
)
# Print stdout from job
print(my_job.wait_for_status(['SUCCEEDED']).get_stdout())
Running a SQL Query
Execute a SQL query by passing only the query text, as follows:
from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
my_cluster = my_workspace.ClusterClient.get("cluster_id")
# Execute query
my_sql_job = my_cluster.run_sql_query(sql_query="SELECT 1", catalog="MyCatalog")
# Print stdout from job
print(my_sql_job.wait_for_status(['SUCCEEDED']).get_stdout())
Connector
Execute SQL queries through a cursor using the cluster connector:
from bodosdk import BodoWorkspaceClient

my_workspace = BodoWorkspaceClient(
    client_id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    secret_key="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
)
my_cluster = my_workspace.ClusterClient.get("cluster_id")
# Connect and execute query
connection = my_cluster.connect('MyCatalog')
result = connection.cursor().execute("SELECT 1").fetchone()
print(result)
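When several queries need to run against the same catalog, the connection can be opened once and reused. The helper below is a hedged sketch: fetch_first_rows is a hypothetical name, and it assumes that cursor() may be called more than once on the same connection, which this guide does not state. It uses only the connect, cursor, execute, and fetchone calls shown above.

```python
def fetch_first_rows(cluster, catalog, queries):
    """Return the first row of each query, run through one connection."""
    connection = cluster.connect(catalog)
    rows = []
    for query in queries:
        # Assumption: a fresh cursor per query is allowed on one connection.
        rows.append(connection.cursor().execute(query).fetchone())
    return rows


# Usage, given a cluster obtained as shown above:
# print(fetch_first_rows(my_cluster, "MyCatalog", ["SELECT 1", "SELECT 2"]))
```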