Skip to main content
Version: 4.0.x

使用API

下表概述了可用于在Java、Python和Go中开发Pulsar Functions的API。

接口描述使用场景
Language-native interface for Java/Python不需要Pulsar特定的库或特殊依赖(仅需要核心库)。不需要访问context的函数。
Pulsar Functions SDK for Java/Python/GoPulsar特定的库,提供了一系列在语言原生接口中不可用的功能,例如状态管理或用户配置。需要访问context的函数。
Extended Pulsar Functions SDK for JavaPulsar特定库的扩展,提供Java中的初始化和关闭接口。需要初始化和释放外部资源的函数。

使用Java/Python的原生语言接口

语言原生接口提供了一种简单而干净的方法来编写Java/Python函数,通过在所有传入的字符串中添加一个感叹号并将输出字符串发布到一个主题。它没有外部依赖。

以下示例是语言原生函数。

要将一段Java代码用作“语言原生”函数,您需要实现java.util.Function接口。您可以在apply方法中包含任何类型的复杂逻辑,以提供更多的处理能力。

import java.util.function.Function;

public class JavaNativeExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}

更多详情,请参见代码示例

使用SDK for Java/Python/Go

Pulsar Functions SDK 的实现指定了一个功能接口,该接口包括 context 对象作为参数。

以下示例使用不同语言的Pulsar Functions SDK。

在使用Java SDK开发函数时,您需要实现org.apache.pulsar.functions.api.Function接口。它只指定了一个您需要实现的方法,称为process

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return String.format("%s!", input);
}
}

更多详情,请参见代码示例

函数的返回类型可以包装在Record泛型中,这样您可以更好地控制输出消息,例如主题、模式、属性等。 使用Context::newOutputRecordBuilder方法来构建这个Record输出。

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

public class RecordFunction implements Function<String, Record<String>> {

@Override
public Record<String> process(String input, Context context) throws Exception {
String output = String.format("%s!", input);
Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));

return context.newOutputRecordBuilder(Schema.STRING)
.value(output)
.properties(properties)
.build();
}
}

更多详情,请参见代码示例

使用扩展的Java SDK

这个扩展的Pulsar Functions SDK提供了两个额外的接口来初始化和释放外部资源。

  • 通过使用initialize接口,您可以在函数实例启动时初始化只需要一次性初始化的外部资源。
  • 通过使用close接口,您可以在函数实例关闭时关闭引用的外部资源。
note

扩展的Pulsar Functions SDK for Java仅在Pulsar 2.10.0或更高版本中可用。在使用之前,您需要在Pulsar 2.10.0或更高版本中设置函数工作者

以下示例使用Pulsar Functions SDK for Java的扩展接口在函数实例启动时初始化RedisClient,并在函数实例关闭时释放它。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import io.lettuce.core.RedisClient;

public class InitializableFunction implements Function<String, String> {
private RedisClient redisClient;

private void initRedisClient(Map<String, Object> connectInfo) {
redisClient = RedisClient.create(connectInfo.get("redisURI"));
}

@Override
public void initialize(Context context) {
Map<String, Object> connectInfo = context.getUserConfigMap();
redisClient = initRedisClient(connectInfo);
}

@Override
public String process(String input, Context context) {
String value = client.get(key);
return String.format("%s-%s", input, value);
}

@Override
public void close() {
redisClient.close();
}
}