代码API (.code)#

预计阅读时间:10分钟

本指南的目标是帮助您熟悉PySyft的代码API。您将学习:

  • 编写一个Syft函数

  • 测试一个Syft函数

什么是Syft函数?#

Syft函数是指任何用@sy.syft_function装饰器修饰的Python函数。这意味着您可以无缝编写任意Python代码,并直接在PySyft中使用它。

该装饰器创建一个函数对象,该对象会被PySyft识别,并允许您创建用于远程执行的代码请求。

以下是一个快速示例,展示如何创建和使用Syft函数。

Hide code cell source
import syft as sy
import pandas as pd

node = sy.orchestra.launch(name="demo_datasite", port="auto", dev_mode=False, reset=True)

admin_client = sy.login(
    url='localhost',
    port=node.port,
    email="[email protected]",
    password="changethis",
)

df = pd.DataFrame({'A': [1, 2, 3], 'B': [10, 20, 30]})
mock_df = pd.DataFrame({'A': [1, 2, 1], 'B': [20, 10, 20]})

main_contributor = sy.Contributor(
    name='John Doe',
    role='Uploader',
    email='[email protected]'
)

asset = sy.Asset(
    name='demo_asset',
    data=df,
    mock=mock_df,
    contributors=[main_contributor]
)

dataset = sy.Dataset(
    name='Demo Dataset',
    description='Demo Dataset',
    asset_list=[asset],
    contributors=[main_contributor]
)

admin_client.upload_dataset(dataset)
admin_client.settings.allow_guest_signup(enable=True)

首先,数据科学家会连接到域并探索可用的数据集。(有关数据站点上用户账户的更多详情,请参阅用户API。)

import syft as sy

# connect to the Datasite
datasite = sy.login_as_guest(url='localhost', port=node.port).register(
    email='[email protected]',
    name='Data Scientist',
    password='123',
    password_verify='123'
)
ds_client = sy.login(
    url='localhost', port=node.port,
    email='[email protected]',
    password='123'
)

ds_client.datasets
ds_client.datasets[0].assets[0]

检查完模拟数据集后,数据科学家可以编写一个Python函数原型进行分析(使用模拟数据集)。

def example_function(private_dataset):
    return private_dataset.sum()

example_function(ds_client.datasets[0].assets[0].mock)

然后,数据科学家可以将Python函数转换为Syft函数(使用@sy.syft_function装饰器),并向数据所有者的Datasite提交代码请求。

import syft as sy
from syft.service.policy.policy import ExactMatch, SingleExecutionExactOutput

@sy.syft_function(
    input_policy=ExactMatch(data=ds_client.datasets[0].assets[0]),
    output_policy=SingleExecutionExactOutput()
)
def example_function(private_dataset):
    return private_dataset.sum()

ds_client.code.request_code_execution(example_function)

在上面的示例中,example_function变成了一个Syft函数,一旦获得批准就可以在私有数据集上远程执行。

输入/输出策略#

输入和输出策略是定义哪些数据可以进入代码请求以及哪些数据可以离开代码请求的规则。它们主要用于确保代码提交与预期使用的数据集正确配对。

输入策略 处理类似以下问题:

您的代码可以在哪些数据集或资产上运行?

输入策略确保代码在指定的资产上运行(作为参数传递给函数)。这意味着已批准的代码请求无法在任何其他资产上运行。

输出策略处理以下问题:

你的代码可以运行多少次?

输出策略用于在执行之间维护状态。它们对于施加限制非常有用,例如仅允许代码执行一定次数。这让数据所有者能够控制代码请求可以执行的次数以及输出结构的形式。

你可以在Syft策略指南中了解更多关于IO策略的内容。

编写Syft函数#

由于Syft函数是一种设计用于远程工作流的对象,在编写时需要考虑一些方面。

函数主体#

函数体内不应包含任何对函数作用域外对象的引用。这包括任何:

  • 对象

  • 函数

  • 模块

编写Syft函数

一个通用的经验法则是,Syft函数应该是自包含的。

以下是一些示例:

🚫 不要使用函数作用域之外的变量。

CONST = 10

@sy.syft_function()
def example():
    return CONST * 2

务必在函数内部定义所有使用的变量。

@sy.syft_function()
def example():
    CONST = 10
    return CONST * 2

🚫 不要使用函数作用域之外的函数。

def helper(x):
    return x ** 2

@sy.syft_function()
def example():
    return helper(10)

务必在Syft函数内部定义辅助函数。

@sy.syft_function()
def example():
    def helper(x):
        return x ** 2
    return helper(10)

🚫 不要在函数作用域外导入模块。

import numpy as np

@sy.syft_function()
def example():
    return np.sum([1, 2, 3])

务必在Syft函数内部导入所使用的模块。

@sy.syft_function()
def example():
    import numpy as np
    return np.sum([1, 2, 3])

允许的返回类型#

PySyft 拥有自定义的对象序列化实现,因此只有这些类型可以作为 Syft 函数的返回类型。

以下是PySyft可以序列化的完整对象列表:

  • Python 基础类型(包括集合)

  • pandas.DataFrame

  • pandas.Series

  • pandas.Timestamp

  • numpy.ndarray

  • numpy 数值类型

  • datetime.date

  • datetime.time

  • datetime.datetime

  • result.Ok

  • result.Err

  • result.Result

  • pymongo.collection.Collection

  • io.BytesIO

  • inspect.Signature

  • inspect.Parameter

序列化过程是递归的,因此上述任何数据类型的组合都能正常工作(例如包含numpy数组的字典)。

在Syft函数中使用其他数据类型作为返回值可能会导致错误。

不过,如果您需要使用PySyft无法序列化的数据类型,可以将其转换为受支持的数据类型作为临时解决方案。例如,您可以在从函数返回值之前,将包含图表的图像转换为二进制缓冲区:

@sy.syft_function()
def example():
    from io import BytesIO
    import matplotlib.pyplot as plt
    import numpy as np

    x = np.arange(10)
    y = np.sin(x)

    plt.plot(x, y)
    
    figfile = BytesIO()
    plt.savefig(figfile, format='png')
    return figfile
from io import BytesIO

b = BytesIO()

type(b)

测试一个函数#

为了提高代码请求获得批准的可能性,在创建代码请求之前测试您的函数非常重要。您可以在本地和远程进行测试。

本地测试#

要在本地测试一个函数,只需使用模拟数据运行您的实验,无需创建Syft函数

def example(data):
    return data.sum()

mock_data = ds_client.datasets[0].assets[0].mock

example(mock_data)

如果一切看起来都没问题,就将其转换为Syft函数并在服务器端进行测试。

在模拟服务器中进行测试#

您可以在一个"临时"服务器上测试Syft函数,该服务器模拟数据所有者的服务器(仅使用模拟数据),只需创建一个Syft函数并调用它即可。

资产限制

在服务端测试函数时,您需要传入完整的资产。PySyft会自动选择模拟数据来调用底层函数。

@sy.syft_function(
    input_policy=ExactMatch(data=ds_client.datasets[0].assets[0]),
    output_policy=SingleExecutionExactOutput()
)
def example(data):
    return data.sum()

data = ds_client.datasets[0].assets[0]

example(data=data)

在远程服务器上进行测试#

在某些场景下,直接在数据所有者服务器上测试代码更为合理(在请求获批前仅使用模拟数据)。例如,当实验性质涉及大量计算时,这种方法可能很有用。在这种情况下,数据所有者可以授权数据科学家访问计算集群,以便在模拟数据上测试其代码。

警告

要实现这一点,数据所有者必须为使用此功能的外部研究人员启用模拟执行。

# the Datasite admin must enable mock execution for a specific user

admin_client.users[1].allow_mock_execution()
# the data scientist can now test their code on the remote server

@sy.syft_function(
    input_policy=ExactMatch(data=ds_client.datasets[0].assets[0]),
    output_policy=SingleExecutionExactOutput()
)
def example(data):
    return data.sum()

ds_client.code.submit(example)

提交Syft函数后,您可以使用client.code.FUNCTION(args)进行测试

ds_client.code
ds_client.code.example(data=mock_data)

阻塞式与非阻塞式执行#

当提交代码请求在数据所有者的服务器上执行时,默认情况下会以阻塞方式运行。这意味着在计算完成之前,您的客户端将无法使用。

为了缓解这个问题,您可以发送非阻塞请求,这些请求会被排队,仅在服务器有足够可用资源时执行。有关如何处理非阻塞请求的更多详情,请参阅Jobs API

嵌套代码请求#

当处理需要更多计算能力的大型数据时,单个代码执行环境可能不够用。

作为数据所有者,您可以在集群中部署PySyft(详情请参阅部署指南和WorkerPool API),并允许数据科学家使用类似MapReduce模型进行计算。

例如,以下是如何在PySyft中提交聚合计算的方式:

asset = ds_client.datasets[0].assets[0]
mock_data = ds_client

# setup processing functions
@sy.syft_function()
def process_batch(batch):
    return batch.to_numpy().sum()


@sy.syft_function()
def aggregate_job(job_results):
    return sum(job_results)

    
# Syft function with nested requests
@sy.syft_function_single_use(data=asset)
def process_all(datasite, data):
    import numpy as np
    
    job_results = []
    for batch in np.array_split(data, 2):
        batch_job = datasite.launch_job(process_batch, batch=batch)
        job_results += [batch_job.result]

    job = datasite.launch_job(aggregate_job, job_results=job_results)
    return job.result


# submit the processing functions so they are available on the Data Owner's server
ds_client.code.submit(process_batch)
ds_client.code.submit(aggregate_job)

# create a code request
ds_client.code.request_code_execution(process_all)