Skip to main content
Version: 4.0.x

窗口函数上下文

Java SDK 提供了一个窗口上下文对象,可以被窗口函数使用。这个上下文对象为 Pulsar 窗口函数提供了丰富的信息和功能,如下所示。

  • Spec

    • 与函数关联的所有输入主题和输出主题的名称。
    • 与函数关联的租户和命名空间。
    • Pulsar窗口函数的名称、ID和版本。
    • 运行窗口函数的Pulsar函数实例的ID。
    • 调用窗口函数的实例数量。
    • 输出模式的内置类型或自定义类名。
  • Logger

    • 窗口函数使用的Logger对象,可用于创建窗口函数日志消息。
  • User config

    • 访问任意用户配置值。
  • Routing

    • Pulsar窗口函数支持路由功能。Pulsar窗口函数根据publish接口将消息发送到任意主题。
  • Metrics

    • 用于记录指标的接口。
  • State storage

规格

Spec 包含一个函数的基本信息。

获取输入主题

getInputTopics 方法获取所有输入主题的名称列表

此示例演示了如何在Java窗口函数中获取所有输入主题的名称列表。

public class GetInputTopicsWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
Collection<String> inputTopics = context.getInputTopics();
System.out.println(inputTopics);

return null;
}

}

获取输出主题

getOutputTopic 方法获取消息发送到的主题名称

此示例演示了如何在Java窗口函数中获取输出主题的名称。

public class GetOutputTopicWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String outputTopic = context.getOutputTopic();
System.out.println(outputTopic);

return null;
}
}

获取租户

getTenant 方法获取与窗口函数关联的租户名称。

这个示例演示了如何在Java窗口函数中获取租户名称。

public class GetTenantWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String tenant = context.getTenant();
System.out.println(tenant);

return null;
}

}

获取命名空间

getNamespace 方法获取与窗口函数关联的命名空间。

这个示例演示了如何在Java窗口函数中获取命名空间。

public class GetNamespaceWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String ns = context.getNamespace();
System.out.println(ns);

return null;
}

}

获取函数名称

getFunctionName 方法获取窗口函数的名称。

此示例演示了如何在Java窗口函数中获取函数名称。

public class GetNameOfWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionName = context.getFunctionName();
System.out.println(functionName);

return null;
}

}

获取函数ID

getFunctionId 方法获取窗口函数的ID。

此示例演示了如何在Java窗口函数中获取函数ID。

public class GetFunctionIDWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionID = context.getFunctionId();
System.out.println(functionID);

return null;
}

}

获取函数版本

getFunctionVersion 方法获取窗口函数的版本。

此示例演示如何获取Java窗口函数的函数版本。

public class GetVersionOfWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String functionVersion = context.getFunctionVersion();
System.out.println(functionVersion);

return null;
}

}

获取实例ID

getInstanceId 方法获取窗口函数的实例ID。

这个示例演示了如何在Java窗口函数中获取实例ID。

public class GetInstanceIDWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
int instanceId = context.getInstanceId();
System.out.println(instanceId);

return null;
}

}

获取实例数量

getNumInstances 方法获取调用窗口函数的实例数量。

这个示例演示了如何在Java窗口函数中获取实例的数量。

public class GetNumInstancesWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
int numInstances = context.getNumInstances();
System.out.println(numInstances);

return null;
}

}

获取输出模式类型

getOutputSchemaType 方法获取输出模式的内置类型或自定义类名。

此示例演示如何获取Java窗口函数的输出模式类型。

public class GetOutputSchemaTypeWindowFunction implements WindowFunction<String, Void> {

@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
String schemaType = context.getOutputSchemaType();
System.out.println(schemaType);

return null;
}
}

日志记录器

使用Java SDK的Pulsar窗口函数可以访问一个SLF4j Logger对象,该对象可用于在选定的日志级别生成日志。

此示例根据传入的字符串是否包含单词danger,在Java函数中记录WARNING级别或INFO级别的日志。

import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;
import org.slf4j.Logger;

public class LoggingWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
Logger log = context.getLogger();
for (Record<String> record : inputs) {
log.info(record + "-window-log");
}
return null;
}

}

如果您需要您的函数生成日志,请在创建或运行函数时指定一个日志主题。

bin/pulsar-admin functions create \
--jar $PWD/my-functions.jar \
--classname my.package.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs \
# Other function configs

你可以通过persistent://public/default/logging-function-logs主题访问由LoggingFunction生成的所有日志。

指标

Pulsar窗口函数可以发布任意指标到指标接口,这些指标可以被查询。

note

如果Pulsar窗口函数使用Java的语言原生接口,则该函数无法将指标和统计数据发布到Pulsar。

您可以使用上下文对象按每个键记录指标。

这个例子为process-count键设置了一个指标,并为elevens-count键设置了另一个不同的指标,每次函数在Java函数中处理消息时都会这样做。

import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;


/**
* Example function that wants to keep track of
* the event time of each message sent.
*/
public class UserMetricWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {

for (Record<String> record : inputs) {
if (record.getEventTime().isPresent()) {
context.recordMetric("MessageEventTime", record.getEventTime().get().doubleValue());
}
}

return null;
}
}

用户配置

当你运行或更新使用SDK创建的Pulsar Functions时,你可以通过--user-config标志向它们传递任意的键/值对。键/值对必须以JSON格式指定。

此示例将用户配置的键/值传递给函数。

bin/pulsar-admin functions create \
--name word-filter \
--user-config '{"forbidden-word":"rosebud"}' \
# Other function configs

API

您可以使用以下API来获取窗口函数的用户定义信息。

获取用户配置映射

getUserConfigMap API 获取窗口函数的所有用户定义的键/值配置的映射。

/**
* Get a map of all user-defined key/value configs for the function.
*
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();

获取用户配置值

getUserConfigValue API 获取用户定义的键/值。

/**
* Get any user-defined key/value.
*
* @param key The key
* @return The Optional value specified by the user for that key.
*/
Optional<Object> getUserConfigValue(String key);

获取用户配置值或默认值

getUserConfigValueOrDefault API 获取用户定义的键/值,如果没有则获取默认值。

/**
* Get any user-defined key/value or a default value if none is present.
*
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);

此示例演示了如何访问提供给Pulsar窗口函数的键/值对。

Java SDK 上下文对象使您能够访问通过命令行(作为 JSON)提供给 Pulsar 窗口函数的键/值对。

tip

对于传递给Java窗口函数的所有键/值对,keyvalue都是String类型。要将值设置为不同的类型,需要从String类型反序列化。

此示例在Java窗口函数中传递了一个键/值对。

bin/pulsar-admin functions create \
--user-config '{"word-of-the-day":"verdure"}' \
# Other function configs

此示例访问Java窗口函数中的值。

UserConfigFunction 函数每次被调用时(即每次消息到达时)都会记录字符串 "The word of the day is verdure"。只有在通过多种方式(例如命令行工具或REST API)使用新配置值更新函数时,word-of-the-day 的用户配置才会被更改。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

import java.util.Optional;

public class UserConfigWindowFunction implements WindowFunction<String, String> {
@Override
public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
if (whatToWrite.get() != null) {
return (String)whatToWrite.get();
} else {
return "Not a nice way";
}
}
}

如果没有提供值,您可以访问整个用户配置映射或设置默认值。

// Get the whole config map
Map<String, String> allConfigs = context.getUserConfigMap();

// Get value or resort to default
String wotd = context.getUserConfigValueOrDefault("word-of-the-day", "perspicacious");

路由

你可以使用context.publish()接口发布任意数量的结果。

这个例子展示了PublishFunction类在Java函数中使用内置函数将消息发布到publishTopic

public class PublishWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> input, WindowContext context) throws Exception {
String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
String output = String.format("%s!", input);
context.publish(publishTopic, output);

return null;
}

}

状态存储

Pulsar窗口函数使用Apache BookKeeper作为状态存储接口。Apache Pulsar安装(包括独立安装)包括BookKeeper bookies的部署。

Apache Pulsar 与 Apache BookKeeper 的 table service 集成,用于存储函数的 state。例如,WordCount 函数可以通过 Pulsar Functions 状态 API 将其 counters 状态存储到 BookKeeper 表服务中。

状态是键值对,其中键是字符串,值是任意的二进制数据——计数器存储为64位大端序二进制值。键的作用域限定在单个Pulsar函数内,并在该函数的实例之间共享。

目前,Pulsar 窗口函数暴露了 Java API 来访问、更新和管理状态。这些 API 在使用 Java SDK 函数时在上下文对象中可用。

Java API描述
incrCounter增加由键引用的内置分布式计数器。
getCounter获取键的计数器值。
putState更新键的状态值。

您可以使用以下API来访问、更新和管理Java窗口函数中的状态。

增加计数器

incrCounter API 增加一个由键引用的内置分布式计数器。

应用程序使用incrCounter API来根据给定的amount更改给定key的计数器。如果key不存在,则会创建一个新的键。

    /**
* Increment the built-in distributed counter referred by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);

获取计数器

getCounter API 获取键的计数器值。

应用程序使用getCounter API来检索由incrCounter API更改的给定key的计数器。

    /**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

除了getCounter API,Pulsar 还为函数提供了一个通用的键/值 API(putState),用于存储通用的键/值状态。

putState

putState API 更新键的状态值。

    /**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);

此示例演示了应用程序如何在Pulsar窗口函数中存储状态。

WordCountWindowFunction 的逻辑简单明了。

  1. 该函数首先使用正则表达式 \\. 将接收到的字符串分割成多个单词。

  2. 对于每个word,函数通过incrCounter(key, amount)将相应的counter增加1。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class WordCountWindowFunction implements WindowFunction<String, Void> {
@Override
public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
for (Record<String> input : inputs) {
Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
}
return null;

}
}