窗口函数上下文
Java SDK 提供了一个窗口上下文对象,可以被窗口函数使用。这个上下文对象为 Pulsar 窗口函数提供了丰富的信息和功能,如下所示。
-
- 与函数关联的所有输入主题和输出主题的名称。
- 与函数关联的租户和命名空间。
- Pulsar窗口函数的名称、ID和版本。
- 运行窗口函数的Pulsar函数实例的ID。
- 调用窗口函数的实例数量。
- 输出模式的内置类型或自定义类名。
-
- 窗口函数使用的Logger对象,可用于创建窗口函数日志消息。
-
- 访问任意用户配置值。
-
- Pulsar窗口函数支持路由功能。Pulsar窗口函数根据
publish
接口将消息发送到任意主题。
- Pulsar窗口函数支持路由功能。Pulsar窗口函数根据
-
- 用于记录指标的接口。
-
- 用于在state storage中存储和检索状态的接口。
规格
Spec 包含一个函数的基本信息。
获取输入主题
getInputTopics
方法获取所有输入主题的名称列表。
此示例演示了如何在Java窗口函数中获取所有输入主题的名称列表。
public class GetInputTopicsWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
Collection<String> inputTopics = context.getInputTopics();
System.out.println(inputTopics);
return null;
}
}
获取输出主题
getOutputTopic
方法获取消息发送到的主题名称。
此示例演示了如何在Java窗口函数中获取输出主题的名称。
public class GetOutputTopicWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String outputTopic = context.getOutputTopic();
System.out.println(outputTopic);
return null;
}
}
获取租户
getTenant
方法获取与窗口函数关联的租户名称。
这个示例演示了如何在Java窗口函数中获取租户名称。
public class GetTenantWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String tenant = context.getTenant();
System.out.println(tenant);
return null;
}
}
获取命名空间
getNamespace
方法获取与窗口函数关联的命名空间。
这个示例演示了如何在Java窗口函数中获取命名空间。
public class GetNamespaceWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String ns = context.getNamespace();
System.out.println(ns);
return null;
}
}
获取函数名称
getFunctionName
方法获取窗口函数的名称。
此示例演示了如何在Java窗口函数中获取函数名称。
public class GetNameOfWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionName = context.getFunctionName();
System.out.println(functionName);
return null;
}
}
获取函数ID
getFunctionId
方法获取窗口函数的ID。
此示例演示了如何在Java窗口函数中获取函数ID。
public class GetFunctionIDWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionID = context.getFunctionId();
System.out.println(functionID);
return null;
}
}
获取函数版本
getFunctionVersion
方法获取窗口函数的版本。
此示例演示如何获取Java窗口函数的函数版本。
public class GetVersionOfWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionVersion = context.getFunctionVersion();
System.out.println(functionVersion);
return null;
}
}
获取实例ID
getInstanceId
方法获取窗口函数的实例ID。
这个示例演示了如何在Java窗口函数中获取实例ID。
public class GetInstanceIDWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
int instanceId = context.getInstanceId();
System.out.println(instanceId);
return null;
}
}
获取实例数量
getNumInstances
方法获取调用窗口函数的实例数量。
这个示例演示了如何在Java窗口函数中获取实例的数量。
public class GetNumInstancesWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
int numInstances = context.getNumInstances();
System.out.println(numInstances);
return null;
}
}
获取输出模式类型
getOutputSchemaType
方法获取输出模式的内置类型或自定义类名。
此示例演示如何获取Java窗口函数的输出模式类型。
public class GetOutputSchemaTypeWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String schemaType = context.getOutputSchemaType();
System.out.println(schemaType);
return null;
}
}
日志记录器
使用Java SDK的Pulsar窗口函数可以访问一个SLF4j Logger
对象,该对象可用于在选定的日志级别生成日志。
此示例根据传入的字符串是否包含单词danger
,在Java函数中记录WARNING
级别或INFO
级别的日志。
import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.slf4j.Logger;
public class LoggingWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
Logger log = context.getLogger();
for (Record<String> record : inputs) {
log.info(record + "-window-log");
}
return null;
}
}
如果您需要您的函数生成日志,请在创建或运行函数时指定一个日志主题。
bin/pulsar-admin functions create \
--jar $PWD/my-functions.jar \
--classname my.package.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs \
# Other function configs
你可以通过persistent://public/default/logging-function-logs
主题访问由LoggingFunction
生成的所有日志。
指标
Pulsar窗口函数可以发布任意指标到指标接口,这些指标可以被查询。
如果Pulsar窗口函数使用Java的语言原生接口,则该函数无法将指标和统计数据发布到Pulsar。
您可以使用上下文对象按每个键记录指标。
这个例子为process-count
键设置了一个指标,并为elevens-count
键设置了另一个不同的指标,每次函数在Java函数中处理消息时都会这样做。
import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
/**
* Example function that wants to keep track of
* the event time of each message sent.
*/
public class UserMetricWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
for (Record<String> record : inputs) {
if (record.getEventTime().isPresent()) {
context.recordMetric("MessageEventTime", record.getEventTime().get().doubleValue());
}
}
return null;
}
}
用户配置
当你运行或更新使用SDK创建的Pulsar Functions时,你可以通过--user-config
标志向它们传递任意的键/值对。键/值对必须以JSON格式指定。
此示例将用户配置的键/值传递给函数。
bin/pulsar-admin functions create \
--name word-filter \
--user-config '{"forbidden-word":"rosebud"}' \
# Other function configs
API
您可以使用以下API来获取窗口函数的用户定义信息。
获取用户配置映射
getUserConfigMap
API 获取窗口函数的所有用户定义的键/值配置的映射。
/**
* Get a map of all user-defined key/value configs for the function.
*
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();
获取用户配置值
getUserConfigValue
API 获取用户定义的键/值。
/**
* Get any user-defined key/value.
*
* @param key The key
* @return The Optional value specified by the user for that key.
*/
Optional<Object> getUserConfigValue(String key);
获取用户配置值或默认值
getUserConfigValueOrDefault
API 获取用户定义的键/值,如果没有则获取默认值。
/**
* Get any user-defined key/value or a default value if none is present.
*
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);
此示例演示了如何访问提供给Pulsar窗口函数的键/值对。
Java SDK 上下文对象使您能够访问通过命令行(作为 JSON)提供给 Pulsar 窗口函数的键/值对。
对于传递给Java窗口函数的所有键/值对,key
和value
都是String
类型。要将值设置为不同的类型,需要从String
类型反序列化。
此示例在Java窗口函数中传递了一个键/值对。
bin/pulsar-admin functions create \
--user-config '{"word-of-the-day":"verdure"}' \
# Other function configs
此示例访问Java窗口函数中的值。
UserConfigFunction
函数每次被调用时(即每次消息到达时)都会记录字符串 "The word of the day is verdure"
。只有在通过多种方式(例如命令行工具或REST API)使用新配置值更新函数时,word-of-the-day
的用户配置才会被更改。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
import java.util.Optional;
public class UserConfigWindowFunction implements WindowFunction<String, String> {
@Override
public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
if (whatToWrite.get() != null) {
return (String)whatToWrite.get();
} else {
return "Not a nice way";
}
}
}
如果没有提供值,您可以访问整个用户配置映射或设置默认值。
// Get the whole config map
Map<String, String> allConfigs = context.getUserConfigMap();
// Get value or resort to default
String wotd = context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");
路由
你可以使用context.publish()
接口发布任意数量的结果。
这个例子展示了PublishFunction
类在Java函数中使用内置函数将消息发布到publishTopic
。
public class PublishWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> input, WindowContext context) throws Exception {
String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
String output = String.format("%s!", input);
context.publish(publishTopic, output);
return null;
}
}
状态存储
Pulsar窗口函数使用Apache BookKeeper作为状态存储接口。Apache Pulsar安装(包括独立安装)包括BookKeeper bookies的部署。
Apache Pulsar 与 Apache BookKeeper 的 table service
集成,用于存储函数的 state
。例如,WordCount
函数可以通过 Pulsar Functions 状态 API 将其 counters
状态存储到 BookKeeper 表服务中。
状态是键值对,其中键是字符串,值是任意的二进制数据——计数器存储为64位大端序二进制值。键的作用域限定在单个Pulsar函数内,并在该函数的实例之间共享。
目前,Pulsar 窗口函数暴露了 Java API 来访问、更新和管理状态。这些 API 在使用 Java SDK 函数时在上下文对象中可用。
Java API | 描述 |
---|---|
incrCounter | 增加由键引用的内置分布式计数器。 |
getCounter | 获取键的计数器值。 |
putState | 更新键的状态值。 |
您可以使用以下API来访问、更新和管理Java窗口函数中的状态。
增加计数器
incrCounter
API 增加一个由键引用的内置分布式计数器。
应用程序使用incrCounter
API来根据给定的amount
更改给定key
的计数器。如果key
不存在,则会创建一个新的键。
/**
* Increment the built-in distributed counter referred by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
获取计数器
getCounter
API 获取键的计数器值。
应用程序使用getCounter
API来检索由incrCounter
API更改的给定key
的计数器。
/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);
除了getCounter
API,Pulsar 还为函数提供了一个通用的键/值 API(putState
),用于存储通用的键/值状态。
putState
putState
API 更新键的状态值。
/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
此示例演示了应用程序如何在Pulsar窗口函数中存储状态。
WordCountWindowFunction
的逻辑简单明了。
-
该函数首先使用正则表达式
\\.
将接收到的字符串分割成多个单词。 -
对于每个
word
,函数通过incrCounter(key, amount)
将相应的counter
增加1。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
for (Record<String> input : inputs) {
Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
}
return null;
}
}