任务API (.jobs)#
预计阅读时间:13分钟
你将学习的内容#
本指南旨在帮助您理解如何使用jobs API以非阻塞方式远程执行代码,甚至实现并行执行。
简介#
PySyft 允许数据科学家在数据所有者的服务器上执行远程代码。每次执行代码时,都会创建一个作业来处理执行过程。该作业使用户能够监控其进度并在完成后获取结果。
什么是“job”?#
“任务”代表提交的异步执行代码请求。换句话说,它以非阻塞方式处理,允许用户在服务器处理请求时继续在笔记本中工作,而无需等待其完成。
这对于并行运行请求以处理大量数据特别有用。 然而,由于任务对于远程代码执行至关重要,PySyft也将其用于其他目的。
理解任务#
当任务提交到服务器时,会经历以下阶段:
作业提交: 客户端向服务器提交代码请求。这不会立即启动作业,而是指定要运行的代码。每当用户尝试运行代码时(无论是在模拟数据还是私有数据上),他们可以选择以非阻塞方式运行,这将提交一个新作业执行。
任务队列: 提交后,任务会被放入队列中。这确保了任务能够有序管理,并让服务器能高效处理多个任务。为了保证任务及时执行,数据所有者可以选择扩展可用工作节点的数量。
任务执行: 服务器按顺序从队列中获取任务。然后异步执行该任务,使服务器能够高效管理其资源,同时处理其他传入任务。
任务监控: 在执行过程中,服务器会持续跟踪任务的进度。这可能包括更新任务状态以及提供中间结果(如果适用)。
任务完成: 任务完成后,服务器会更新其状态以标记完成。任务结果随后将对用户的客户端可用。
使用作业实现并行化#
您可以通过调度作业多次并行运行相同代码来创建更复杂的分析。这通过嵌套作业和代码请求实现,可以模拟映射-归约管道或您偏好的其他方法。具体操作示例可参考此处,更多指南即将推出。
任务生命周期的示例#
现在,让我们通过实验了解任务经历的每个阶段,以掌握如何正确使用它们。
在本地开发服务器上试验任务
在本地试验任务以学习如何使用它们是非常好的。但请注意,如果您至少没有传递一个任务消费者(n_consumers=1)并创建一个任务生产者(create_producer=True),默认的本地开发服务器将无法执行任务。您可以在本地部署指南中了解更多相关信息。
让我们首先启动一个演示设置,以便创建一个虚拟示例。
Show code cell source
import syft as sy
import pandas as pd
node = sy.orchestra.launch(name="demo_datasite", port="auto", dev_mode=False, reset=True, n_consumers=1, create_producer=True)
admin_client = sy.login(
url='localhost',
port=node.port,
email="[email protected]",
password="changethis",
)
df = pd.DataFrame({'A': [1, 2, 3], 'B': [10, 20, 30]})
mock_df = pd.DataFrame({'A': [1, 2, 1], 'B': [20, 10, 20]})
main_contributor = sy.Contributor(
name='John Doe',
role='Uploader',
email='[email protected]'
)
asset = sy.Asset(
name='demo_asset',
data=df,
mock=mock_df,
contributors=[main_contributor]
)
dataset = sy.Dataset(
name='Demo Dataset',
description='Demo Dataset',
asset_list=[asset],
contributors=[main_contributor]
)
admin_client.upload_dataset(dataset)
admin_client.settings.allow_guest_signup(enable=True)
admin_client.register(
email='[email protected]',
name='Data Scientist',
password='123',
password_verify='123'
)
admin_client.users[-1].allow_mock_execution()
Show code cell output
Starting demo_datasite server on 0.0.0.0:53566
INFO: Started server process [19618]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:53566 (Press CTRL+C to quit)
Waiting for server to start Done.
You have launched a development server at http://0.0.0.0:53566.It is intended only for local use.
Logged into <demo_datasite: High side Datasite> as <[email protected]>
You are using a default password. Please change the password using `[your_client].account.set_password([new_password])`.
Uploading: demo_asset: 100%|██████████| 1/1 [00:00<00:00, 4.21it/s]
User details successfully updated.
首先,数据科学家将连接到域并获取可用数据的引用。
import syft as sy
ds_client = sy.login(url='localhost', port=node.port, email='[email protected]', password='123')
data_asset = ds_client.datasets[0].assets[0]
mock_asset = ds_client.datasets[0].assets[0].mock
Logged into <demo_datasite: High side Datasite> as <[email protected]>
提交并执行任务#
当用户指定了想要运行的代码后,就可以准备启动作业执行了。指定代码的主要方式是通过syft函数来实现。
指定作业代码的其他方式
在某些情况下,您想要运行的代码可能已被现有用户(例如希望运行数据科学家代码的管理员)或管理员自己(例如数据科学家可以直接运行可用的预定义自定义端点)指定。
我们在下面定义一个虚拟的syft函数来指定代码。
@sy.syft_function_single_use(data=data_asset)
def example_function(data):
print('Started execution..')
result = data.sum()
print('Finalized execution..')
return result
ds_client.code.request_code_execution(example_function)
Syft function 'example_function' successfully created. To add a code request, please create a project using `project = syft.Project(...)`, then use command `project.create_code_request`.
请求
ID: 4d4d9ca9e41a4e2b8a6a93f67627d6d8
请求时间:2024-10-01 18:56:55
状态:RequestStatus.PENDING
请求时间: Demo_datasite 类型为 Datasite
请求者: 数据科学家 ([email protected])
变更内容: 请求将 example_function (池ID:default-pool) 的权限状态更改为 RequestStatus.APPROVED。无嵌套请求。
请注意,到目前为止还没有创建任何任务。让我们启动两个任务在服务器上远程运行:
第一个运行在模拟数据上
第二个运行在私有数据上
权限:谁可以在模拟数据上运行任务?
只有在获得适当权限的情况下,才能在模拟数据上执行任务。具体而言,管理员必须明确允许数据科学家在其服务器资源上对模拟数据集执行实验。您可以在上面的设置中查看如何操作,或向服务器管理员申请这些权限。
我们使用code API来识别要运行的代码。
ds_client.code
用户代码列表
总计: 0
ds_client.code.example_function(data=mock_asset, blocking=False)
同样地,我们可以发起一个在私有数据上运行的任务。但除非请求获得批准,否则无法执行。因此,我们先让管理员完成审批,然后通过client.code API来启动执行。
admin_client.requests[-1].approve()
Approving request on change example_function for datasite demo_datasite
Request 4d4d9ca9e41a4e2b8a6a93f67627d6d8 changes applied
ds_client.code.example_function(data=data_asset, blocking=False)
现在我们可以通过访问以下链接查看所有已提交的任务及其状态:
ds_client.jobs
任务列表
总计: 0
权限:谁可以查看任务?
直观地说,数据科学家只能监控他们自己启动的任务。然而,管理员可以查看服务器上所有已启动或正在运行的任务。
监控任务进度#
状态#
我们可以在上表中通过client.jobs查看任务的当前状态。了解预期的状态类型很有帮助:
JobStatus.CREATED: 任务已提交至服务器队列;您无法加速处理过程,只能耐心等待。如果任务在队列中停留时间过长,请联系管理员。JobStatus.PROCESSING: 任务正在执行中;在此期间,您可以查看任务的日志,但请注意运行在私有数据上的任务日志不会立即可用JobStatus.ERRORED: 任务执行失败,预计日志中将显示堆栈跟踪信息JobStatus.COMPLETED: 任务已完成执行,您可以获取结果JobStatus.TERMINATING: 任务正在被终止且状态正在被清除;这种情况仅当用户终止任务或运行该任务的工作节点被强制关闭时发生JobStatus.INTERRUPTED: 任务被中断;这种情况也可能是由于用户操作导致的
如果您的任务卡住或中断,下方最后一节提供了进一步的指导。
日志#
为作业设置日志记录只需在代码中添加打印语句即可,因为stdout和stderr都会被重定向到日志存储区。
让我们从作业API获取一个任务:
job = ds_client.jobs[0]
job
我们可以查看其状态、启动时间、最近更新时间、被调度到哪个工作池,以及结果和日志部分。
如果结果缺失,通常会看到syft.service.action.action_data_empty.ObjectNotReady。否则,您的结果已经填充完毕。要查看日志,您可以运行:
job.wait()
任务结果#
进行中任务的结果#
如果您急于查看结果,可以通过使用.wait()方法来阻塞客户端,直到结果可用。
这可以通过以下方式实现:
ds_client.jobs[0].wait().get()
01/10/24 21:57:01 FUNCTION LOG (94bce081aad0429d8dd1c0e2dd8f441e): Started execution..
01/10/24 21:57:01 FUNCTION LOG (94bce081aad0429d8dd1c0e2dd8f441e): Finalized execution..
A 4
B 50
Name: 0, dtype: int64
已完成任务的结果#
如果任务已完成,可以通过以下方式轻松获取结果:
# Mock execution
ds_client.jobs[0].result.get()
A 4
B 50
Name: 0, dtype: int64
# Private, yet approved job
ds_client.jobs[1].result.get()
class ObjectNotReady:
id: str = 9582fde2c88a433f9f92f30bea0fd2c6
如果任务已完成,将立即返回结果!
调试失败的任务#
有时事情并不会如你所预期的那样运行。让我们来看三种情况:
我的任务没有被从队列中选取 (
CREATED)我的任务运行时间过长 (
PROCESSING)我的任务失败了 (
ERRORED)
情况1:我的任务未被拾取(已创建)#
如果您发现任务长时间处于CREATED状态,可能是队列积压严重,服务器无法并行处理过多任务。作为数据科学家,您难以自行调试该问题,请联系管理员处理。
管理员需要做什么?
如果队列处理速度非常缓慢,您可以通过client.worker_pools[0].scale(n=X)来调整能够消费队列任务的消费者/工作者数量。但如果这似乎不是原因所在,下一步就是检查容器和工作者的日志以调查问题根源。若您无法自行解决该情况,请通过Slack上的#support频道联系我们。
场景2:我的任务运行时间过长(处理中)#
您的代码可能运行时间过长。常见原因包括:
私有数据比模拟版本大得多
除了数据规模外,私有数据还存在显著差异,导致您的代码效率低下
服务器上没有你需要的硬件。例如,由于硬件限制,在某些服务器上训练机器学习模型或进行推理可能不切实际。
让我们看看如何使用这个功能。
@sy.syft_function_single_use(data=data_asset)
def my_func_long_execution(data):
print('Started execution..')
result = 0
for i in range(10000000000): # this will take a while
result += i**2
print('Finalized execution..')
return result
ds_client.code.request_code_execution(my_func_long_execution)
Syft function 'my_func_long_execution' successfully created. To add a code request, please create a project using `project = syft.Project(...)`, then use command `project.create_code_request`.
请求
编号: eb7d27421c4a4723b6df24ab6ccfeec7
请求时间:2024-10-01 18:57:02
状态:RequestStatus.PENDING
请求时间: Demo_datasite 类型为 Datasite
请求者: 数据科学家 ([email protected])
变更内容: 请求将 my_func_long_execution (池ID:default-pool) 的权限状态更改为 RequestStatus.APPROVED。无嵌套请求。
为了简化操作,让我们在模拟数据上执行它们:
ds_client.code.my_func_long_execution(data=mock_asset, blocking=False)
job = ds_client.jobs[-1]
job
我们可以稍等片刻,发现任务尚未完成。
import time as tt
tt.sleep(5) # wait for the job to be picked up from the queue
job.status
<JobStatus.CREATED: 'created'>
我们可以尝试估算该代码运行的合理时间。通常模拟数据和私有数据在规模上不会相似,低效的实现可能在获得批准后耗时更长。不过,如果超出您的等待阈值,您可以通过job.kill()终止它。
场景3:我的任务失败(错误状态)#
如果任务失败,你可能不知道从何入手。这里我们提供两个主要原因作为排查起点:
情况1 - 代码无法运行: 当你基于模拟数据编写原型代码时,在提交前至少要在模拟数据上测试一次以尽早发现问题,这一点至关重要。然而在某些情况下,由于数据结构或数据差异,能在模拟数据上运行的代码却无法在私有数据上完成执行。我们建议从检查日志中的错误回溯信息开始排查。如果仍无法解决,最好联系管理员以确定差异可能源自何处。
案例2 - 代码能运行,但PySyft执行失败:在极少数情况下,PySyft内部负责在相应容器中执行任务的组件可能会失败。第一步是重启任务。如果问题仍然存在,您可能发现了一个bug,我们建议您可以在
#support频道寻求帮助或在GitHub上提交问题。
@sy.syft_function_single_use(data=data_asset)
def my_func_bad_execution(data):
import time as tt
print('Started execution..')
result = data.mysum() # typo, won't run
print('Finalized execution..')
return result
ds_client.code.request_code_execution(my_func_bad_execution)
Syft function 'my_func_bad_execution' successfully created. To add a code request, please create a project using `project = syft.Project(...)`, then use command `project.create_code_request`.
请求
ID: 0de4c26e2281495288fe93d0f3ac02a7
请求时间:2024-10-01 18:57:09
状态:RequestStatus.PENDING
请求时间: Demo_datasite 类型为 Datasite
请求者: 数据科学家 ([email protected])
变更内容: 请求将 my_func_bad_execution(池ID:default-pool)的权限状态更改为 RequestStatus.APPROVED。无嵌套请求。
ds_client.code.my_func_bad_execution(data=mock_asset, blocking=False)
job = ds_client.jobs[-1]
job.wait()
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Started execution..
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Finalized execution..
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Started execution..
01/10/24 21:57:12 FUNCTION LOG (6cc340bc281c4d0db023800b311c9611): Finalized execution..
01/10/24 21:57:22 FUNCTION LOG (07558529879445799f0d6d92f5ed8db4): Started execution..
如您所见,在此配置中只有管理员可以访问日志。如需调试该问题,您可以联系管理员,管理员可以检查回溯信息(如果可用):
admin_client.jobs[-1].logs()