数据转换管道
学习如何将数据转换为Redis类型
Write-behind的数据转换功能允许用户超越源类型到Redis类型的默认转换来转换他们的数据。这种转换不需要编码。相反,它在一组人类可读的YAML文件中描述,每个源表一个文件。
摄入的格式和类型因来源而异。目前,唯一支持的来源是Debezium。从Debezium类型到带有Redis类型的原生JSON的第一次转换是自动完成的,无需用户指令。然后,这个JSON被传递到用户定义的转换管道。
每个作业描述了要对来自单个源的数据执行的转换逻辑。源通常是一个数据库表或集合,并指定为该表/集合的完整名称。作业可能包括过滤逻辑,以跳过符合条件的数据。作业中的其他逻辑步骤将数据转换为所需的输出,这些输出将以哈希或JSON的形式存储在Redis中。

默认任务
在需要对所有摄入的记录执行转换而不为特定表创建特定作业的情况下,使用默认作业。与此作业关联的转换将应用于所有没有明确定义自己作业的表。默认作业必须具有表名“*”,并且只允许存在一个此类作业的实例。
例如,默认作业可以简化任务,如为Redis键添加前缀或后缀,或为新的哈希和JSON添加字段,而无需自定义每个源表。
目前,默认作业仅支持摄取管道。
Example
此示例演示了使用add_field块向所有缺少明确定义作业的表添加一个值为foo
的app_code
字段的过程。此外,它还为每个生成的哈希键添加了一个aws
前缀和一个gcp
后缀。
default.yaml
source:
table: "*"
row_format: full
transform:
- uses: add_field
with:
fields:
- field: after.app_code
expression: "`foo`"
language: jmespath
output:
- uses: redis.write
with:
data_type: hash
key:
expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
language: jmespath
工作
每个作业都在一个单独的YAML文件中定义。所有这些文件都将使用deploy
命令上传到Write-behind。
有关更多信息,请参阅deploy configuration)。如果您正在使用
scaffold命令,
请将作业文件放置在jobs
文件夹中。
作业 YAML 结构
字段
-
source
:本节描述了作业操作的表:
server_name
: 逻辑服务器名称(可选)。对应于Debezium Server的application.properties
配置文件中指定的debezium.source.topic.prefix
属性db
: 数据库名称(可选)schema
: 数据库模式(可选)table
: 数据库表名row_format
: 要转换的数据格式:data_only
(默认)- 仅有效载荷,full - 完整的变更记录
注意:任何对属性
server_name
、db
、schema
和table
的引用默认情况下将被视为不区分大小写。这可以通过将case_insensitive
设置为false
来更改。
仅限Cassandra:在Cassandra中,
keyspace
大致相当于其他数据库中的schema
。Write-behind使用作业文件中声明的schema
属性来匹配传入更改记录的keyspace
属性。
仅限MongoDB:在MongoDB中,
replica set
是一个包含数据的分片集群,可以大致等同于关系数据库中的schema
。MongoDB中的collection
类似于其他数据库中的table
。Write-behind使用作业文件中声明的schema
和table
属性分别匹配传入变更记录的replica set
和collection
属性。
-
transform
:本节包括一系列定义数据如何转换的块。 有关更多信息,请参阅 支持的块 和 JMESPath 自定义函数。
-
output
:本节定义了处理数据的输出目标:
- Cassandra:
uses
:cassandra.write
: 写入Cassandra数据存储with
:connection
: 连接名称keyspace
: 键空间table
: 目标表keys
: 键列数组mapping
: 映射列数组opcode_field
: 有效载荷中保存此记录在数据库中的操作(c - 创建,d - 删除,u - 更新)的字段名称
- Redis:
uses
:redis.write
: 写入Redis数据结构。同一作业中允许多个此类块with
:connection
: 在config.yaml
中定义的连接名称(默认使用名为'target'的连接)data_type
: 写入数据到Redis时的目标数据结构(支持的值有hash、json、set和stream)key
: 这允许您通过应用自定义逻辑覆盖记录的键:expression
: 要执行的表达式language
: 表达式语言,JMESPath或SQL
expire
: 正整数值,表示键过期的秒数。如果未设置,键将永不过期
- SQL:
uses
:relational.write
: 写入SQL兼容的数据存储with
:connection
: 连接名称schema
: 模式table
: 目标表名称keys
: 键列数组mapping
: 映射列数组opcode_field
: 有效载荷中保存此记录在数据库中的操作(c - 创建,d - 删除,u - 更新)的字段名称
- Cassandra:
笔记
source
是必需的。- 应指定
transform
、key
或两者。
在转换中使用键
要访问Redis键(例如在写后作业中),您需要执行以下步骤:
- 设置
row_format: full
以允许访问完整数据条目中的键。 - 使用表达式
key.key
将 Redis 键作为字符串获取。
前后值
更新事件通常会报告before
和after
部分,提供更新前后数据状态的访问。
要显式访问“之前”的值,您需要:
- 设置
row_format: full
以允许访问完整数据条目中的键。 - 使用
before.
模式。
Example
此示例展示了如何使用rename_field
块将表emp
中的fname
字段重命名为first_name
。它还演示了如何设置此记录的键,而不是依赖默认逻辑。
redislabs.dbo.emp.yaml
source:
server_name: redislabs
schema: dbo
table: emp
transform:
- uses: rename_field
with:
from_field: fname
to_field: first_name
output:
- uses: redis.write
with:
connection: target
key:
expression: concat(['emp:fname:',fname,':lname:',lname])
language: jmespath
部署配置
为了将您的作业部署到远程Write-behind数据库,请运行:
redis-di deploy
在Kubernetes上部署配置
如果Write-behind CLI作为pod部署在Kubernetes集群中,请按照以下步骤部署您的作业:
-
从您的
jobs
文件夹中的YAML文件创建一个ConfigMap:kubectl create configmap redis-di-jobs --from-file=jobs/
-
部署你的任务:
kubectl exec -it pod/redis-di-cli -- redis-di deploy
注意:在创建/修改ConfigMap和其在
redis-di-cli
pod中的可用性之间会有延迟。在运行redis-di deploy
命令之前,请等待大约30秒。
你有两种更新ConfigMap的选项:
-
对于较小的更改,您可以直接使用此命令编辑ConfigMap:
kubectl edit configmap redis-di-jobs
-
对于较大的更改,例如添加另一个作业文件,请编辑本地
jobs
文件夹中的文件,然后运行以下命令:kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
注意:在使用任一选项更新ConfigMap后,您需要运行
kubectl exec -it pod/redis-di-cli -- redis-di deploy
。