使用API
下表概述了可用于在Java、Python和Go中开发Pulsar Functions的API。
| 接口 | 描述 | 使用场景 |
|---|---|---|
| Language-native interface for Java/Python | 不需要Pulsar特定的库或特殊依赖(仅需要核心库)。 | 不需要访问context的函数。 |
| Pulsar Functions SDK for Java/Python/Go | Pulsar特定的库,提供了一系列在语言原生接口中不可用的功能,例如状态管理或用户配置。 | 需要访问context的函数。 |
| Extended Pulsar Functions SDK for Java | Pulsar特定库的扩展,提供Java中的初始化和关闭接口。 | 需要初始化和释放外部资源的函数。 |
使用Java/Python的原生语言接口
语言原生接口提供了一种简单而干净的方法来编写Java/Python函数,通过在所有传入的字符串中添加一个感叹号并将输出字符串发布到一个主题。它没有外部依赖。
以下示例是语言原生函数。
- Java
- Python
要将一段Java代码用作“语言原生”函数,您需要实现java.util.Function接口。您可以在apply方法中包含任何类型的复杂逻辑,以提供更多的处理能力。
import java.util.function.Function;
public class JavaNativeExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}
更多详情,请参见代码示例。
要将一段Python代码用作“语言原生”函数,您必须有一个名为process的方法,如下所示。它会在接收到的任何字符串值后附加一个感叹号。
def process(input):
return "{}!".format(input)
更多详情,请参见代码示例。
用Python 3编写Pulsar函数。为了确保您的函数能够运行,您需要为函数工作者安装Python 3,并将Python 3设置为默认解释器。
使用SDK for Java/Python/Go
Pulsar Functions SDK 的实现指定了一个功能接口,该接口包括 context 对象作为参数。
以下示例使用不同语言的Pulsar Functions SDK。
- Java
- Python
- Go
在使用Java SDK开发函数时,您需要实现org.apache.pulsar.functions.api.Function接口。它只指定了一个您需要实现的方法,称为process。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return String.format("%s!", input);
}
}
更多详情,请参见代码示例。
函数的返回类型可以包装在Record泛型中,这样您可以更好地控制输出消息,例如主题、模式、属性等。
使用Context::newOutputRecordBuilder方法来构建这个Record输出。
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
public class RecordFunction implements Function<String, Record<String>> {
@Override
public Record<String> process(String input, Context context) throws Exception {
String output = String.format("%s!", input);
Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
return context.newOutputRecordBuilder(Schema.STRING)
.value(output)
.properties(properties)
.build();
}
}
更多详情,请参见代码示例。
要使用Python SDK开发一个函数,您需要将pulsar客户端依赖项添加到您的Python安装中。
from pulsar import Function
class ExclamationFunction(Function):
def __init__(self):
pass
def process(self, input, context):
return input + '!'
更多详情,请参见代码示例。
要使用Go SDK开发一个函数,您需要将pulsar客户端依赖项添加到您的Go安装中,并在main()方法内的pf.Start()方法中提供函数的名称。这将向Pulsar Functions框架注册该函数,并确保在收到新消息时可以调用指定的函数。
package main
import (
"context"
"fmt"
"github.com/apache/pulsar/pulsar-function-go/pf"
)
func HandleRequest(ctx context.Context, in []byte) error{
fmt.Println(string(in) + "!")
return nil
}
func main() {
pf.Start(HandleRequest)
}
更多详情,请参见代码示例。
使用扩展的Java SDK
这个扩展的Pulsar Functions SDK提供了两个额外的接口来初始化和释放外部资源。
- 通过使用
initialize接口,您可以在函数实例启动时初始化只需要一次性初始化的外部资源。 - 通过使用
close接口,您可以在函数实例关闭时关闭引用的外部资源。
扩展的Pulsar Functions SDK for Java仅在Pulsar 2.10.0或更高版本中可用。在使用之前,您需要在Pulsar 2.10.0或更高版本中设置函数工作者。
以下示例使用Pulsar Functions SDK for Java的扩展接口在函数实例启动时初始化RedisClient,并在函数实例关闭时释放它。
- Java
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import io.lettuce.core.RedisClient;
public class InitializableFunction implements Function<String, String> {
private RedisClient redisClient;
private void initRedisClient(Map<String, Object> connectInfo) {
redisClient = RedisClient.create(connectInfo.get("redisURI"));
}
@Override
public void initialize(Context context) {
Map<String, Object> connectInfo = context.getUserConfigMap();
redisClient = initRedisClient(connectInfo);
}
@Override
public String process(String input, Context context) {
String value = client.get(key);
return String.format("%s-%s", input, value);
}
@Override
public void close() {
redisClient.close();
}
}