管道和事务
学习如何使用Redis管道和事务
Redis 允许你将一系列命令批量发送到服务器。 你可以使用两种类型的批量操作:
- 管道通过在一次通信中向服务器发送多个命令来避免网络和处理开销。然后服务器会返回一个包含所有响应的通信。更多信息请参见Pipelining页面。
- 事务保证所有包含的命令将执行完毕,而不会被其他客户端的命令中断。有关更多信息,请参阅事务页面。
执行一个管道
要在管道中执行命令,首先需要创建一个管道对象,然后使用类似于标准命令方法(例如,set()
和 get()
)的方法向其中添加命令。这些命令会在管道中缓冲,只有在调用管道对象的 sync()
方法时才会执行。
管道命令的主要区别在于它们返回
Response
对象,其中 Type
是
标准命令方法的返回类型。一个 Response
对象只有在管道执行完毕后
才包含有效的结果。您可以使用
Response
对象的 get()
方法访问结果。
import java.util.List;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.AbstractTransaction;
import redis.clients.jedis.Response;
public class PipeTransExample {
public void run() {
UnifiedJedis jedis = new UnifiedJedis("redis://localhost:6379");
AbstractPipeline pipe = jedis.pipelined();
for (int i = 0; i < 5; i++) {
pipe.set(String.format("seat:%d", i), String.format("#%d", i));
}
pipe.sync();
pipe = jedis.pipelined();
Response<String> resp0 = pipe.get("seat:0");
Response<String> resp3 = pipe.get("seat:3");
Response<String> resp4 = pipe.get("seat:4");
pipe.sync();
// Responses are available after the pipeline has executed.
System.out.println(resp0.get()); // >>> #0
System.out.println(resp3.get()); // >>> #3
System.out.println(resp4.get()); // >>> #4
AbstractTransaction trans = jedis.multi();
trans.incrBy("counter:1", 1);
trans.incrBy("counter:2", 2);
trans.incrBy("counter:3", 3);
trans.exec();
System.out.println(jedis.get("counter:1")); // >>> 1
System.out.println(jedis.get("counter:2")); // >>> 2
System.out.println(jedis.get("counter:3")); // >>> 3
// Set initial value of `shellpath`.
jedis.set("shellpath", "/usr/syscmds/");
// Start the transaction and watch the key we are about to update.
trans = jedis.transaction(false); // create a Transaction object without sending MULTI command
trans.watch("shellpath"); // send WATCH command(s)
trans.multi(); // send MULTI command
String currentPath = jedis.get("shellpath");
String newPath = currentPath + ":/usr/mycmds/";
// Commands added to the `trans` object
// will be buffered until `trans.exec()` is called.
Response<String> setResult = trans.set("shellpath", newPath);
List<Object> transResults = trans.exec();
// The `exec()` call returns null if the transaction failed.
if (transResults != null) {
// Responses are available if the transaction succeeded.
System.out.println(setResult.get()); // >>> OK
// You can also get the results from the list returned by
// `trans.exec()`.
for (Object item: transResults) {
System.out.println(item);
}
// >>> OK
System.out.println(jedis.get("shellpath"));
// >>> /usr/syscmds/:/usr/mycmds/
}
jedis.close();
}
}
执行一个事务
事务的工作方式与管道类似。使用multi()
创建一个事务对象,在该对象上调用命令方法,然后调用事务对象的exec()
方法来执行它。你可以像使用管道一样,使用Response
对象访问事务中的命令结果。然而,exec()
方法还返回一个List
值,该值包含按命令执行顺序排列的所有结果值(参见下面的监视键的更改以获取使用结果列表的示例)。
import java.util.List;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.AbstractTransaction;
import redis.clients.jedis.Response;
public class PipeTransExample {
public void run() {
UnifiedJedis jedis = new UnifiedJedis("redis://localhost:6379");
AbstractPipeline pipe = jedis.pipelined();
for (int i = 0; i < 5; i++) {
pipe.set(String.format("seat:%d", i), String.format("#%d", i));
}
pipe.sync();
pipe = jedis.pipelined();
Response<String> resp0 = pipe.get("seat:0");
Response<String> resp3 = pipe.get("seat:3");
Response<String> resp4 = pipe.get("seat:4");
pipe.sync();
// Responses are available after the pipeline has executed.
System.out.println(resp0.get()); // >>> #0
System.out.println(resp3.get()); // >>> #3
System.out.println(resp4.get()); // >>> #4
AbstractTransaction trans = jedis.multi();
trans.incrBy("counter:1", 1);
trans.incrBy("counter:2", 2);
trans.incrBy("counter:3", 3);
trans.exec();
System.out.println(jedis.get("counter:1")); // >>> 1
System.out.println(jedis.get("counter:2")); // >>> 2
System.out.println(jedis.get("counter:3")); // >>> 3
// Set initial value of `shellpath`.
jedis.set("shellpath", "/usr/syscmds/");
// Start the transaction and watch the key we are about to update.
trans = jedis.transaction(false); // create a Transaction object without sending MULTI command
trans.watch("shellpath"); // send WATCH command(s)
trans.multi(); // send MULTI command
String currentPath = jedis.get("shellpath");
String newPath = currentPath + ":/usr/mycmds/";
// Commands added to the `trans` object
// will be buffered until `trans.exec()` is called.
Response<String> setResult = trans.set("shellpath", newPath);
List<Object> transResults = trans.exec();
// The `exec()` call returns null if the transaction failed.
if (transResults != null) {
// Responses are available if the transaction succeeded.
System.out.println(setResult.get()); // >>> OK
// You can also get the results from the list returned by
// `trans.exec()`.
for (Object item: transResults) {
System.out.println(item);
}
// >>> OK
System.out.println(jedis.get("shellpath"));
// >>> /usr/syscmds/:/usr/mycmds/
}
jedis.close();
}
}
监视键值的变化
Redis 支持乐观锁以避免对不同键的不一致更新。基本思想是在处理更新时监视事务中使用的任何键的更改。如果监视的键发生变化,则必须使用键中的最新数据重新启动更新。有关乐观锁的更多信息,请参见事务。
下面的代码读取一个表示命令shell的PATH
变量的字符串,然后在尝试将其写回之前将一个新的命令路径附加到该字符串。如果在写入之前监视的键被另一个客户端修改,则事务将中止。请注意,您应该在通常的客户端对象(在我们的示例中称为jedis
)上同步调用监视键的只读命令,但您仍然在事务对象上调用事务的命令。
对于生产使用,您通常会在循环中调用如下代码,以重试直到成功,否则报告或记录失败。
import java.util.List;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.AbstractPipeline;
import redis.clients.jedis.AbstractTransaction;
import redis.clients.jedis.Response;
public class PipeTransExample {
public void run() {
UnifiedJedis jedis = new UnifiedJedis("redis://localhost:6379");
AbstractPipeline pipe = jedis.pipelined();
for (int i = 0; i < 5; i++) {
pipe.set(String.format("seat:%d", i), String.format("#%d", i));
}
pipe.sync();
pipe = jedis.pipelined();
Response<String> resp0 = pipe.get("seat:0");
Response<String> resp3 = pipe.get("seat:3");
Response<String> resp4 = pipe.get("seat:4");
pipe.sync();
// Responses are available after the pipeline has executed.
System.out.println(resp0.get()); // >>> #0
System.out.println(resp3.get()); // >>> #3
System.out.println(resp4.get()); // >>> #4
AbstractTransaction trans = jedis.multi();
trans.incrBy("counter:1", 1);
trans.incrBy("counter:2", 2);
trans.incrBy("counter:3", 3);
trans.exec();
System.out.println(jedis.get("counter:1")); // >>> 1
System.out.println(jedis.get("counter:2")); // >>> 2
System.out.println(jedis.get("counter:3")); // >>> 3
// Set initial value of `shellpath`.
jedis.set("shellpath", "/usr/syscmds/");
// Start the transaction and watch the key we are about to update.
trans = jedis.transaction(false); // create a Transaction object without sending MULTI command
trans.watch("shellpath"); // send WATCH command(s)
trans.multi(); // send MULTI command
String currentPath = jedis.get("shellpath");
String newPath = currentPath + ":/usr/mycmds/";
// Commands added to the `trans` object
// will be buffered until `trans.exec()` is called.
Response<String> setResult = trans.set("shellpath", newPath);
List<Object> transResults = trans.exec();
// The `exec()` call returns null if the transaction failed.
if (transResults != null) {
// Responses are available if the transaction succeeded.
System.out.println(setResult.get()); // >>> OK
// You can also get the results from the list returned by
// `trans.exec()`.
for (Object item: transResults) {
System.out.println(item);
}
// >>> OK
System.out.println(jedis.get("shellpath"));
// >>> /usr/syscmds/:/usr/mycmds/
}
jedis.close();
}
}