Skip to main content
Version: 4.0.x

配置Kubernetes运行时

Kubernetes 运行时在函数工作器生成并应用 Kubernetes 清单时工作。由函数工作器生成的清单包括:

  • 一个 StatefulSet 默认情况下,StatefulSet 清单包含一个具有多个副本的 pod。数量由函数的 parallelism 决定。pod 在启动时下载函数负载(通过函数工作器 REST API)。如果配置了函数运行时,pod 的容器镜像是可配置的。
  • 一个 Service(用于与 pod 通信)
  • 一个用于验证凭证的Secret(当适用时)。 Kubernetes 运行时支持密钥。您可以创建一个 Kubernetes 密钥并将其作为环境变量暴露在 pod 中。
tip

有关将Pulsar对象名称转换为Kubernetes资源标签的规则,请参阅说明

配置基本设置

要快速配置Kubernetes运行时,您可以使用conf/functions_worker.yml文件中的KubernetesRuntimeFactoryConfig的默认设置。

如果您已经使用Helm chart在Kubernetes上设置了Pulsar集群,这意味着函数工作器也已经在Kubernetes上设置好了,您可以使用与运行函数工作器的pod相关联的serviceAccount。否则,您可以通过将functionRuntimeFactoryConfigs设置为k8Uri来配置函数工作器与Kubernetes集群通信。

集成Kubernetes密钥

在Kubernetes中,Secret是一个包含一些机密数据的对象,例如密码、令牌或密钥。当你在部署函数的Kubernetes命名空间中创建一个secret时,函数可以安全地引用和分发它。要启用此功能,请在conf/functions-worker.yml文件中将secretsProviderConfiguratorClassName设置为org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator

例如,您部署一个函数pulsar-func Kubernetes命名空间,并且您有一个名为database-creds的密钥,其中包含一个字段名password,您希望将其作为名为DATABASE_PASSWORD的环境变量挂载到pod中。以下配置使函数能够引用该密钥并将其值作为环境变量挂载到pod中。

tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
inputs: [ "persistent://mytenant/mynamespace/myfuncinput" ]
className: "com.company.pulsar.myfunction"

secrets:
# the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
DATABASE_PASSWORD:
path: "database-creds"
key: "password"

启用令牌认证

当您使用令牌认证、TLS加密或自定义认证来保护与您的Pulsar集群的通信时,Pulsar会将您的证书颁发机构(CA)传递给客户端,以便客户端可以使用您签名的证书对集群进行认证。

要为您的Pulsar集群启用令牌认证,您需要通过实现org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider接口,为运行您的函数的pod指定一种机制来认证broker。

  • 对于令牌认证,Pulsar 包含了上述接口的实现来分发 CA。函数工作者捕获部署(或更新)函数的令牌,将其保存为秘密,并将其挂载到 pod 中。

    conf/function-worker.yml 文件中的配置如下。functionAuthProviderClassName 用于指定此实现的路径。

    functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
  • 对于TLS或自定义认证,您可以实现org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider接口或使用其他机制。

note

如果您用于部署函数的令牌有到期日期,您可能需要在它到期后再次部署函数。

为函数Pod认证启用Kubernetes服务账户令牌投影

KubernetesServiceAccountTokenAuthProvider 使用 服务账户令牌卷投影 将令牌挂载到函数的 pod 中。函数工作器和代理可以使用 OpenID Connect 验证此令牌。这种集成的主要好处是令牌的生存时间短,由 Kubernetes 管理,并且不继承用于创建函数的权限。

note

此功能要求代理和函数工作者配置为使用AuthenticationProviderOpenID。启用此提供程序的文档可以在这里找到。

以下是函数工作者利用此功能的示例配置:

functionAuthProviderClassName: "org.apache.pulsar.functions.auth.KubernetesServiceAccountTokenAuthProvider"
kubernetesContainerFactory:
kubernetesFunctionAuthProviderConfig:
# Required
serviceAccountTokenExpirationSeconds: "600"
serviceAccountTokenAudience: "the-required-audience"
# Optional
brokerClientTrustCertsSecretName: "my-secret-pulsar-broker-client-trust-certs"

该函数pod使用目标命名空间的默认Kubernetes服务账户进行部署。由于服务账户名称映射到JWT的sub声明,该JWT被投射到pod的文件系统中,因此所有具有相同服务账户的pod在Pulsar中将具有相同的权限。目前正在进行改进此集成的工作。

以下是在EKS中运行此功能生成的示例JWT(部分信息已编辑):

{
"aud": [
"your-audience"
],
"exp": 1710969822,
"iat": 1679433822,
"iss": "https://oidc.eks.us-east-2.amazonaws.com/id/some-id",
"kubernetes.io": {
"namespace": "pulsar-function",
"pod": {
"name": "function-pod-0",
"uid": "fbac8f9e-a47d-4ad7-a8f0-cc9a65d1331c"
},
"serviceaccount": {
"name": "default",
"uid": "5964f9d3-3dce-467c-8dbe-d0f463063d7a"
},
"warnafter": 1679437429
},
"nbf": 1679433822,
"sub": "system:serviceaccount:pulsar-function:default"
}

要授予此功能pod的权限,您需要授予角色声明的权限,默认情况下是sub声明,system:serviceaccount:pulsar-function:default

自定义Kubernetes运行时

自定义Kubernetes运行时允许您自定义由运行时创建的Kubernetes资源,包括如何生成清单、如何将认证数据传递给pod,以及如何集成密钥。

要自定义Kubernetes运行时,您可以在conf/functions-worker.yml文件中设置runtimeCustomizerClassName并使用完全限定的类名。

函数API提供了一个名为customRuntimeOptions的标志,该标志传递给org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer接口。要初始化KubernetesManifestCustomizer,您可以在conf/functions-worker.yml文件中设置runtimeCustomizerConfig

note

runtimeCustomizerConfig 在所有函数中都是相同的。如果你同时提供了 runtimeCustomizerConfigcustomRuntimeOptions,你需要在实现 KubernetesManifestCustomizer 接口时决定如何管理这两个配置。

Pulsar 包含一个内置实现,使用 runtimeCustomizerConfig 初始化。它允许您传递一个 JSON 文档作为 customRuntimeOptions,并带有某些属性以增强功能。要使用此内置实现,请将 runtimeCustomizerClassName 设置为 org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer

如果同时提供了runtimeCustomizerConfigcustomRuntimeOptions并且存在冲突,BasicKubernetesManifestCustomizer会使用customRuntimeOptions来覆盖runtimeCustomizerConfig

以下是配置 customRuntimeOptions 的示例。

{
"jobName": "jobname", // the k8s pod name to run this function instance
"jobNamespace": "namespace", // the k8s namespace to run this function in
"extractLabels": { // extra labels to attach to the statefulSet, service, and pods
"extraLabel": "value"
},
"extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
"extraAnnotation": "value"
},
"nodeSelectorLabels": { // node selector labels to add on to the pod spec
"customLabel": "value"
},
"tolerations": [ // tolerations to add to the pod spec
{
"key": "custom-key",
"value": "value",
"effect": "NoSchedule"
}
],
"resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}
}
}

如何在 Kubernetes 中运行 Pulsar 时定义 Pulsar 资源名称

如果您在Kubernetes上运行Pulsar Functions或连接器,无论使用哪种管理界面,都需要遵循Kubernetes命名约定来定义Pulsar资源的名称。

Kubernetes 需要一个可以用作 DNS 子域名的名称,如 RFC 1123 中所定义。Pulsar 支持比 Kubernetes 命名约定更多的合法字符。如果您创建了一个包含 Kubernetes 不支持的特殊字符的 Pulsar 资源名称(例如,在 Pulsar 命名空间名称中包含冒号),Kubernetes 运行时会将这些 Pulsar 对象名称转换为符合 RFC 1123 规范的 Kubernetes 资源标签。因此,您可以使用 Kubernetes 运行时运行函数或连接器。将 Pulsar 对象名称转换为 Kubernetes 资源标签的规则如下:

  • 截断至63个字符

  • 将以下字符替换为破折号(-):

    • 非字母数字字符

    • 下划线(_)

    • 点(.)

  • 将开头和结尾的非字母数字字符替换为0

tip
  • 如果在将Pulsar对象名称转换为Kubernetes资源标签时遇到错误(例如,如果您的Pulsar对象名称过长,可能会出现命名冲突)或想要自定义转换规则,请参阅自定义Kubernetes运行时
  • 有关如何配置Kubernetes运行时的信息,请参阅instructions