Skip to main content
Version: 4.0.x

教程

编写一个用于单词计数的函数

note

以下示例是一个有状态的函数。默认情况下,函数的状态是禁用的。有关更多说明,请参见启用有状态函数

要编写一个单词计数函数,请完成以下步骤。

  1. 使用SDK for Java编写Java函数。

     package org.example.functions;

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    import java.util.Arrays;

    public class WordCountFunction implements Function<String, Void> {
    // This function is invoked every time a message is published to the input topic
    @Override
    public Void process(String input, Context context) throws Exception {
    Arrays.asList(input.split(" ")).forEach(word -> {
    String counterKey = word.toLowerCase();
    context.incrCounter(counterKey, 1);
    });
    return null;
    }
    }
  2. 打包并构建JAR文件,然后使用pulsar-admin命令将其部署到您的Pulsar集群中。

    bin/pulsar-admin functions create \
    --jar $PWD/target/my-jar-with-dependencies.jar \
    --classname org.example.functions.WordCountFunction \
    --tenant public \
    --namespace default \
    --name word-count \
    --inputs persistent://public/default/sentences \
    --output persistent://public/default/count

编写一个用于基于内容路由的函数

要编写一个基于内容的路由函数,请完成以下步骤。

  1. 使用SDK for Python在Python中编写函数。

     from pulsar import Function

    class RoutingFunction(Function):
    def __init__(self):
    self.fruits_topic = "persistent://public/default/fruits"
    self.vegetables_topic = "persistent://public/default/vegetables"

    def is_fruit(item):
    return item in [b"apple", b"orange", b"pear", b"other fruits..."]

    def is_vegetable(item):
    return item in [b"carrot", b"lettuce", b"radish", b"other vegetables..."]

    def process(self, item, context):
    if self.is_fruit(item):
    context.publish(self.fruits_topic, item)
    elif self.is_vegetable(item):
    context.publish(self.vegetables_topic, item)
    else:
    warning = "The item {0} is neither a fruit nor a vegetable".format(item)
    context.get_logger().warn(warning)
  2. 假设这段代码存储在~/router.py中,那么你可以使用pulsar-admin命令将其部署到你的Pulsar集群中。

    bin/pulsar-admin functions create \
    --py ~/router.py \
    --classname router.RoutingFunction \
    --tenant public \
    --namespace default \
    --name route-fruit-veg \
    --inputs persistent://public/default/basket-items

编写一个用于单词计数的窗口函数

note

目前,窗口函数仅在Java中可用。

这个示例演示了如何使用语言原生接口在Java中编写窗口函数。

每个输入消息都是一个句子,该句子被分割成单词,并且每个单词都被计数。内置的计数器状态用于以持久和一致的方式跟踪单词计数。

public class WordCountFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
Arrays.asList(input.split("\\s+")).forEach(word -> context.incrCounter(word, 1));
return null;
}
}