Skip to main content
Version: 4.0.x

Python 函数包

Python 函数支持以下三种打包格式:

一个Python文件

要将一个Python函数打包成一个Python文件,请完成以下步骤。

  1. 编写一个Python函数。

    from pulsar import Function #  import the Function module from Pulsar

    # The classic ExclamationFunction that appends an exclamation at the end
    # of the input
    class ExclamationFunction(Function):
    def __init__(self):
    pass

    def process(self, input, context):
    return input + '!'

    在这个例子中,当你编写一个Python函数时,你需要继承Function类并实现process()方法。

    process() 主要有两个参数:

    • input 表示您的输入。

    • context 表示由 Pulsar Function 暴露的接口。您可以根据提供的 context 对象在 Python 函数中获取属性。

  2. 安装一个Python客户端。Python函数的实现依赖于Python客户端。

    pip install pulsar-client==2.10.0

    并安装protobuf工具以生成proto文件:

    pip install 'protobuf==3.20.*'
  3. 将Python函数文件复制到Pulsar镜像中。

    docker exec -it [CONTAINER ID] /bin/bash
    docker cp <path of Python function file> CONTAINER ID:/pulsar
  4. 使用以下命令运行Python函数。

    ./bin/pulsar-admin functions localrun \
    --classname <Python Function file name>.<Python Function class name> \
    --py <absolute path of Python Function file> \
    --inputs persistent://public/default/my-topic-1 \
    --output persistent://public/default/test-1 \
    --tenant public \
    --namespace default \
    --name PythonFunction

    以下日志表明Python函数已成功启动。

     ...
    07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    ...

ZIP文件

要将Python函数打包成ZIP文件,请完成以下步骤。

  1. 准备ZIP文件。

     Assuming the zip file is named as `func.zip`, unzip the `func.zip` folder:
    "func/src"
    "func/requirements.txt"
    "func/deps"

    exclamation.zip文件为例。示例的内部结构如下。

     .
    ├── deps
    │ └── sh-1.12.14-py2.py3-none-any.whl
    └── src
    └── exclamation.py
  2. 将ZIP文件复制到Pulsar镜像中。

     docker exec -it [CONTAINER ID] /bin/bash
    docker cp <path of ZIP file> CONTAINER ID:/pulsar
  3. 使用以下命令运行Python函数。

    ./bin/pulsar-admin functions localrun \
    --classname exclamation \
    --py <absolute path of ZIP file> \
    --inputs persistent://public/default/in-topic \
    --output persistent://public/default/out-topic \
    --tenant public \
    --namespace default \
    --name PythonFunction

    以下日志表明Python函数已成功启动。

     ...
    07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
    ...

PIP

note

PIP 方法仅在 Kubernetes 运行时中受支持。

要使用PIP打包一个Python函数,请完成以下步骤。

  1. 配置functions_worker.yml文件。

     #### Kubernetes Runtime ####
    installUserCodeDependencies: true
  2. 编写你的Python函数。

    from pulsar import Function
    import js2xml

    # The classic ExclamationFunction that appends an exclamation at the end
    # of the input
    class ExclamationFunction(Function):
    def __init__(self):
    pass

    def process(self, input, context):
    # add your logic
    return input + '!'

    您可以引入额外的依赖项。当Python函数检测到当前使用的文件是whl并且指定了installUserCodeDependencies参数时,系统会使用pip install命令来安装Python函数中所需的依赖项。

  3. 生成whl文件。

    cd $PULSAR_HOME/pulsar-functions/scripts/python
    chmod +x generate.sh
    ./generate.sh <path of your Python Function> <path of the whl output dir> <the version of whl>
    # e.g: ./generate.sh /path/to/python /path/to/python/output 1.0.0

    输出被写入 /path/to/python/output:

     -rw-r--r--  1 root  staff   1.8K  8 27 14:29 pulsarfunction-1.0.0-py2-none-any.whl
    -rw-r--r-- 1 root staff 1.4K 8 27 14:29 pulsarfunction-1.0.0.tar.gz
    -rw-r--r-- 1 root staff 0B 8 27 14:29 pulsarfunction.whl