Redis 模块和阻塞命令
如何在Redis模块中实现阻塞命令
Redis 在内置命令集中有一些阻塞命令。
其中最常用的是 BLPOP
(或对称的 BRPOP
),它会阻塞等待元素到达列表。
关于阻塞命令的一个有趣事实是,它们不会阻塞整个服务器,而只是阻塞调用它们的客户端。通常阻塞的原因是我们期望发生一些外部事件:这可能是Redis数据结构中的一些变化,比如在BLPOP
的情况下,线程中发生的长时间计算,从网络接收一些数据,等等。
Redis 模块也能够实现阻塞命令,本文档展示了 API 的工作原理,并描述了一些可用于建模阻塞命令的模式。
阻塞和恢复的工作原理。
注意:您可能想查看Redis源代码树中src/modules
目录下的helloblock.c
示例,这是一个简单易懂的示例,展示了如何应用阻塞API。
在Redis模块中,命令是通过回调函数实现的,当用户调用特定命令时,Redis核心会调用这些回调函数。通常,回调函数通过向客户端发送一些回复来终止其执行。使用以下函数,实现模块命令的函数可以请求将客户端置于阻塞状态:
RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
该函数返回一个RedisModuleBlockedClient
对象,稍后用于解除客户端的阻塞。参数的含义如下:
ctx
是命令执行上下文,通常与 API 的其余部分一样。reply_callback
是回调函数,具有与普通命令函数相同的原型,当客户端解除阻塞时调用,以便向客户端返回回复。timeout_callback
是回调函数,具有与普通命令函数相同的原型,当客户端达到ms
超时时调用。free_privdata
是用于释放私有数据的回调函数。私有数据是一个指向某些数据的指针,这些数据在用于解除客户端阻塞的API和将回复发送给客户端的回调函数之间传递。我们将在本文档的后面部分看到这种机制的工作原理。ms
是以毫秒为单位的超时时间。当达到超时时间时,将调用超时回调函数,并且客户端将自动中止。
一旦客户端被阻止,可以使用以下API解除阻止:
int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);
该函数以之前调用RedisModule_BlockClient()
返回的被阻塞客户端对象作为参数,并解除客户端的阻塞。在客户端被解除阻塞之前,会调用客户端被阻塞时指定的reply_callback
函数:该函数将能够访问此处使用的privdata
指针。
重要提示:上述函数是线程安全的,可以从正在执行某些工作的线程中调用,以实现阻塞客户端的命令。
当客户端解除阻塞时,privdata
数据将使用 free_privdata
回调自动释放。这很有用,因为回复回调可能永远不会被调用,以防客户端超时或与服务器断开连接,因此重要的是,如果需要释放传递的数据,应由外部函数负责。
为了更好地理解API的工作原理,我们可以想象编写一个命令,该命令会阻塞客户端一秒钟,然后发送回复“Hello!”。
注意:为了简化示例,此命令中未实现参数数量检查和其他非重要事项。
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
pthread_t tid;
pthread_create(&tid,NULL,threadmain,bc);
return REDISMODULE_OK;
}
void *threadmain(void *arg) {
RedisModuleBlockedClient *bc = arg;
sleep(1); /* Wait one second and unblock. */
RedisModule_UnblockClient(bc,NULL);
}
上述命令会尽快阻塞客户端,生成一个线程,该线程将等待一秒钟并解除对客户端的阻塞。让我们检查一下回复和超时回调,在我们的案例中它们非常相似,因为它们只是用不同的回复类型回复客户端。
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
}
int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
return RedisModule_ReplyWithNull(ctx);
}
回复回调只是向客户端发送“Hello!”字符串。 这里重要的是,当客户端从线程中解除阻塞时,会调用回复回调。
超时命令返回NULL
,这在实际的Redis阻塞命令超时时经常发生。
解除屏蔽时传递回复数据
上面的例子很容易理解,但缺少实际阻塞命令实现的一个重要现实方面:通常回复函数需要知道如何回复客户端,而这些信息通常在客户端解除阻塞时提供。
我们可以修改上面的例子,使线程在等待一秒后生成一个随机数。你可以将其视为某种实际上的扩展操作。然后,这个随机数可以传递给回复函数,以便我们将其返回给命令调用者。为了使这个功能正常工作,我们按照以下方式修改函数:
void *threadmain(void *arg) {
RedisModuleBlockedClient *bc = arg;
sleep(1); /* Wait one second and unblock. */
long *mynumber = RedisModule_Alloc(sizeof(long));
*mynumber = rand();
RedisModule_UnblockClient(bc,mynumber);
}
如你所见,现在解锁调用正在传递一些私有数据,即mynumber
指针,给回复回调。为了获取这些私有数据,回复回调将使用以下函数:
void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);
所以我们的回复回调是这样修改的:
int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
/* IMPORTANT: don't free mynumber here, but in the
* free privdata callback. */
return RedisModule_ReplyWithLongLong(ctx,mynumber);
}
请注意,在使用RedisModule_BlockClient()
阻塞客户端时,我们还需要传递一个free_privdata
函数,因为分配的长值必须被释放。我们的回调函数将如下所示:
void free_privdata(void *privdata) {
RedisModule_Free(privdata);
}
注意:需要强调的是,私有数据最好在free_privdata
回调中释放,因为如果客户端断开连接或超时,回复函数可能不会被调用。
还要注意,私有数据也可以通过超时回调访问,始终使用GetBlockedClientPrivateData()
API。
中止对客户端的阻止
有时会出现的一个问题是,我们需要分配资源以实现非阻塞命令。因此,我们阻塞客户端,然后,例如,尝试创建一个线程,但线程创建函数返回一个错误。在这种情况下,为了恢复,我们该怎么办?我们不想让客户端保持阻塞状态,也不想调用UnblockClient()
,因为这将触发回复回调被调用。
在这种情况下,最好的做法是使用以下函数:
int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);
实际上,这是如何使用它:
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
pthread_t tid;
if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
RedisModule_AbortBlock(bc);
RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
}
return REDISMODULE_OK;
}
客户端将被解除阻塞,但不会调用回复回调。
使用单一函数实现命令、回复和超时回调
以下函数可用于实现回复和回调,使用与实现主要命令功能相同的函数:
int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);
因此,我可以重写示例命令,而不使用单独的回复和超时回调:
int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc)
{
if (RedisModule_IsBlockedReplyRequest(ctx)) {
long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx,mynumber);
} else if (RedisModule_IsBlockedTimeoutRequest) {
return RedisModule_ReplyWithNull(ctx);
}
RedisModuleBlockedClient *bc =
RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
pthread_t tid;
if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
RedisModule_AbortBlock(bc);
RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
}
return REDISMODULE_OK;
}
功能上是相同的,但有些人会更喜欢将大部分命令逻辑集中在一个函数中的不那么冗长的实现。
在线程内处理数据的副本
为了处理实现命令慢速部分的线程,一个有趣的模式是使用数据的副本,这样当在某个键上执行操作时,用户仍然可以看到旧版本。然而,当线程完成其工作时,表示会被交换,新的处理版本将被使用。
这种方法的一个例子是 Neural Redis模块 其中神经网络在不同的线程中进行训练,而用户仍然可以执行和检查它们的旧版本。
未来工作
目前正在开发一个API,以便允许以安全的方式从线程调用Redis模块API,从而使线程命令能够访问数据空间并执行增量操作。
此功能没有预计到达时间,但可能会在Redis 4.0发布过程中的某个时候出现。