数据转换管道

学习如何将数据转换为Redis类型

Write-behind的数据转换功能允许用户超越源类型到Redis类型的默认转换来转换他们的数据。这种转换不需要编码。相反,它在一组人类可读的YAML文件中描述,每个源表一个文件。

摄入的格式和类型因来源而异。目前,唯一支持的来源是Debezium。从Debezium类型到带有Redis类型的原生JSON的第一次转换是自动完成的,无需用户指令。然后,这个JSON被传递到用户定义的转换管道。

每个作业描述了要对来自单个源的数据执行的转换逻辑。源通常是一个数据库表或集合,并指定为该表/集合的完整名称。作业可能包括过滤逻辑,以跳过符合条件的数据。作业中的其他逻辑步骤将数据转换为所需的输出,这些输出将以哈希或JSON的形式存储在Redis中。

默认任务

在需要对所有摄入的记录执行转换而不为特定表创建特定作业的情况下,使用默认作业。与此作业关联的转换将应用于所有没有明确定义自己作业的表。默认作业必须具有表名“*”,并且只允许存在一个此类作业的实例。

例如,默认作业可以简化任务,如为Redis键添加前缀或后缀,或为新的哈希和JSON添加字段,而无需自定义每个源表。

目前,默认作业仅支持摄取管道。

Example

此示例演示了使用add_field块向所有缺少明确定义作业的表添加一个值为fooapp_code字段的过程。此外,它还为每个生成的哈希键添加了一个aws前缀和一个gcp后缀。

default.yaml

source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath

工作

每个作业都在一个单独的YAML文件中定义。所有这些文件都将使用deploy命令上传到Write-behind。 有关更多信息,请参阅deploy configuration)。如果您正在使用 scaffold命令, 请将作业文件放置在jobs文件夹中。

作业 YAML 结构

字段

  • source:

    本节描述了作业操作的表:

    • server_name: 逻辑服务器名称(可选)。对应于Debezium Server的application.properties配置文件中指定的debezium.source.topic.prefix属性
    • db: 数据库名称(可选)
    • schema: 数据库模式(可选)
    • table: 数据库表名
    • row_format: 要转换的数据格式:data_only(默认)- 仅有效载荷,full - 完整的变更记录

注意:任何对属性server_namedbschematable的引用默认情况下将被视为不区分大小写。这可以通过将case_insensitive设置为false来更改。

仅限Cassandra:在Cassandra中,keyspace大致相当于其他数据库中的schema。Write-behind使用作业文件中声明的schema属性来匹配传入更改记录的keyspace属性。

仅限MongoDB:在MongoDB中,replica set是一个包含数据的分片集群,可以大致等同于关系数据库中的schema。MongoDB中的collection类似于其他数据库中的table。Write-behind使用作业文件中声明的schematable属性分别匹配传入变更记录的replica setcollection属性。

  • transform:

    本节包括一系列定义数据如何转换的块。 有关更多信息,请参阅 支持的块JMESPath 自定义函数

  • output:

    本节定义了处理数据的输出目标:

    • Cassandra:
      • uses: cassandra.write: 写入Cassandra数据存储
      • with:
        • connection: 连接名称
        • keyspace: 键空间
        • table: 目标表
        • keys: 键列数组
        • mapping: 映射列数组
        • opcode_field: 有效载荷中保存此记录在数据库中的操作(c - 创建,d - 删除,u - 更新)的字段名称
    • Redis:
      • uses: redis.write: 写入Redis数据结构。同一作业中允许多个此类块
      • with:
        • connection: 在config.yaml中定义的连接名称(默认使用名为'target'的连接)
        • data_type: 写入数据到Redis时的目标数据结构(支持的值有hash、json、set和stream)
        • key: 这允许您通过应用自定义逻辑覆盖记录的键:
          • expression: 要执行的表达式
          • language: 表达式语言,JMESPath或SQL
        • expire: 正整数值,表示键过期的秒数。如果未设置,键将永不过期
    • SQL:
      • uses: relational.write: 写入SQL兼容的数据存储
      • with:
        • connection: 连接名称
        • schema: 模式
        • table: 目标表名称
        • keys: 键列数组
        • mapping: 映射列数组
        • opcode_field: 有效载荷中保存此记录在数据库中的操作(c - 创建,d - 删除,u - 更新)的字段名称

笔记

  • source 是必需的。
  • 应指定transformkey或两者。

在转换中使用键

要访问Redis键(例如在写后作业中),您需要执行以下步骤:

  • 设置 row_format: full 以允许访问完整数据条目中的键。
  • 使用表达式 key.key 将 Redis 键作为字符串获取。

前后值

更新事件通常会报告beforeafter部分,提供更新前后数据状态的访问。 要显式访问“之前”的值,您需要:

  • 设置 row_format: full 以允许访问完整数据条目中的键。
  • 使用before.模式。

Example

此示例展示了如何使用rename_field块将表emp中的fname字段重命名为first_name。它还演示了如何设置此记录的键,而不是依赖默认逻辑。

redislabs.dbo.emp.yaml

source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath

部署配置

为了将您的作业部署到远程Write-behind数据库,请运行:

redis-di deploy

在Kubernetes上部署配置

如果Write-behind CLI作为pod部署在Kubernetes集群中,请按照以下步骤部署您的作业:

  • 从您的jobs文件夹中的YAML文件创建一个ConfigMap:

    kubectl create configmap redis-di-jobs --from-file=jobs/
    
  • 部署你的任务:

    kubectl exec -it pod/redis-di-cli -- redis-di deploy
    

注意:在创建/修改ConfigMap和其在redis-di-cli pod中的可用性之间会有延迟。在运行redis-di deploy命令之前,请等待大约30秒。

你有两种更新ConfigMap的选项:

  • 对于较小的更改,您可以直接使用此命令编辑ConfigMap:

    kubectl edit configmap redis-di-jobs
    
  • 对于较大的更改,例如添加另一个作业文件,请编辑本地jobs文件夹中的文件,然后运行以下命令:

    kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
    

注意:在使用任一选项更新ConfigMap后,您需要运行kubectl exec -it pod/redis-di-cli -- redis-di deploy

RATE THIS PAGE
Back to top ↑