Python 函数包
Python 函数支持以下三种打包格式:
一个Python文件
要将一个Python函数打包成一个Python文件,请完成以下步骤。
-
编写一个Python函数。
from pulsar import Function # import the Function module from Pulsar
# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
return input + '!'在这个例子中,当你编写一个Python函数时,你需要继承Function类并实现
process()
方法。process()
主要有两个参数:-
input
表示您的输入。 -
context
表示由 Pulsar Function 暴露的接口。您可以根据提供的 context 对象在 Python 函数中获取属性。
-
-
安装一个Python客户端。Python函数的实现依赖于Python客户端。
pip install pulsar-client==2.10.0
并安装protobuf工具以生成proto文件:
pip install 'protobuf==3.20.*'
-
将Python函数文件复制到Pulsar镜像中。
docker exec -it [CONTAINER ID] /bin/bash
docker cp <path of Python function file> CONTAINER ID:/pulsar -
使用以下命令运行Python函数。
./bin/pulsar-admin functions localrun \
--classname <Python Function file name>.<Python Function class name> \
--py <absolute path of Python Function file> \
--inputs persistent://public/default/my-topic-1 \
--output persistent://public/default/test-1 \
--tenant public \
--namespace default \
--name PythonFunction以下日志表明Python函数已成功启动。
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
ZIP文件
要将Python函数打包成ZIP文件,请完成以下步骤。
-
准备ZIP文件。
Assuming the zip file is named as `func.zip`, unzip the `func.zip` folder:
"func/src"
"func/requirements.txt"
"func/deps"以exclamation.zip文件为例。示例的内部结构如下。
.
├── deps
│ └── sh-1.12.14-py2.py3-none-any.whl
└── src
└── exclamation.py -
将ZIP文件复制到Pulsar镜像中。
docker exec -it [CONTAINER ID] /bin/bash
docker cp <path of ZIP file> CONTAINER ID:/pulsar -
使用以下命令运行Python函数。
./bin/pulsar-admin functions localrun \
--classname exclamation \
--py <absolute path of ZIP file> \
--inputs persistent://public/default/in-topic \
--output persistent://public/default/out-topic \
--tenant public \
--namespace default \
--name PythonFunction以下日志表明Python函数已成功启动。
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
PIP
note
PIP 方法仅在 Kubernetes 运行时中受支持。
要使用PIP打包一个Python函数,请完成以下步骤。
-
配置
functions_worker.yml
文件。#### Kubernetes Runtime ####
installUserCodeDependencies: true -
编写你的Python函数。
from pulsar import Function
import js2xml
# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
# add your logic
return input + '!'您可以引入额外的依赖项。当Python函数检测到当前使用的文件是
whl
并且指定了installUserCodeDependencies
参数时,系统会使用pip install
命令来安装Python函数中所需的依赖项。 -
生成
whl
文件。cd $PULSAR_HOME/pulsar-functions/scripts/python
chmod +x generate.sh
./generate.sh <path of your Python Function> <path of the whl output dir> <the version of whl>
# e.g: ./generate.sh /path/to/python /path/to/python/output 1.0.0输出被写入
/path/to/python/output
:-rw-r--r-- 1 root staff 1.8K 8 27 14:29 pulsarfunction-1.0.0-py2-none-any.whl
-rw-r--r-- 1 root staff 1.4K 8 27 14:29 pulsarfunction-1.0.0.tar.gz
-rw-r--r-- 1 root staff 0B 8 27 14:29 pulsarfunction.whl