Redisson 全面解析:从使用方法到工作原理的深度探索

Redisson是基于原生redis操作指令上进一步的封装,屏蔽了redis数据结构的实现细节,开发可以像操作普通java对象一样使用redis,而本文将针对Redisson中各种使用的数据结构和工具包使用及其实现进行详尽的分析,希望对你有帮助。

一、详解Redisson基本数据类型

1. Redisson前置配置说明

使用redisson的方式比较简单,我们首先需要引入redisson的依赖包:

复制
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.5</version> </dependency>1.2.3.4.5.

然后我们指明redis的ip、端口等配置即可:

复制
spring.redis.host=localhost spring.redis.port=63791.2.

有了上述配置后,我们就可以快速完成redisson客户端配置:

复制
@Configuration public class RedissonConfig { @Autowired private RedisProperties redisProperties; @Bean public RedissonClient redissonClient() { Config config = new Config(); String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + ""); config.useSingleServer().setAddress(redisUrl); return Redisson.create(config); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.

后续在进行使用的时候,我们直接注入对应的客户端依赖即可:

复制
@Autowired private RedissonClient redissonClient;1.2.
2. 以bucket维度操作字符串

和我们第一次使用redis一样,我们先用redisson完成一个字符串的键值对存储,对应的使用例子如下所示,我们只需拿到对应的test-key的bucket即可进行读写操作:

复制
//生成 test-key 的bucket RBucket<Object> bucket = redissonClient.getBucket("test-key"); //查看对应的bucket是否存在 if (ObjUtil.isEmpty(bucket.get())) { //基于set指令进行插入 bucket.set("test-value"); //尝试通过get获取值 Object value = bucket.get(); log.info("value:{}", value); }1.2.3.4.5.6.7.8.9.10.

对于RBucket对象的set和get操作本质上都是基于redis字符串操作指令set和get的一层封装,在我们调用getBucket获取对应key的bucket的时候,redisson会基于当前客户端的连接信息和bucket键进行一次封装得到一个test-key的bucket对象:

对应的我们给出getBucket的底层实现,可以看到逻辑操作就是封装维护如下这份信息:

编码器和解码器codec,默认情况下是Kryo5Codec执行命令的commandExecutor,该对象记录redis客户端的基本信息。name也就是我们要操作的key的信息,也就是字符串key。
复制
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.codec = codec; this.commandExecutor = commandExecutor; if (name == null) { throw new NullPointerException("name cant be null"); } setName(name); }1.2.3.4.5.6.7.8.9.

然后就是执行set指令了,我们都知道redisson是基于Netty封装的redis操作工具,所以在进行redis操作时涉及了大量优秀的异步读写涉及,我们以上文set操作为例,实际上其底层执行时做了如下几件事:

基于传入的key,也就是我们的test-key定位到slot地址。获取到上一步封装的编码器codec。本次执行是set请求,所以如果我们采用主从模式进行部署,这一步是会从主库获取连接信息,因为我们就配置了一台redis,所以默认直接从默认库获取连接。基于连接信息发送指令。完成操作后归还连接。

这些步骤完成后,操作结果会被封装为Future对象,如果需要直到执行结果,我们调用get即可知晓处理情况:

对应的我们也给出set的源码入口,如笔者所说其底层就是一个set操作的异步调用setAsync,通过该回调会得到一个RFuture对象,通过get即可获取结果:

复制
@Override public void set(V value) { //基于setAsync提交异步set操作,然后通过get获取执行结果 get(setAsync(value)); }1.2.3.4.5.

对应的我们步入setAsync可以看到它会拿着我们上一步初始化所得来的key名称、编码器、set操作指令对象以及编码后的value值通过commandExecutor进行异步写入到redis服务端:

复制
@Override public RFuture<Void> setAsync(V value) { //...... //基于各种信息通过commandExecutor进行异步提交 return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.SET, getRawName(), encode(value)); }1.2.3.4.5.6.

我们再次步入即可来到第一个核心步骤,通过key获取到slot,因为我们部署结构是单体,所以source拿到的是默认值0,然后调用async正式执行异步写操作:

复制
@Override public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) { //定位slot NodeSource source = getNodeSource(key); //执行异步写 return async(false, source, codec, command, params, false, false); }1.2.3.4.5.6.7.

步入async即可看到我们的最核心的步骤了,该方法内部会通过RedisExecutor执行execute方法,大体就是执行了上图所说的:

获取编码器基于读写请求获取连接,注意获取连接的操作是异步的得到连接后调用sendCommand发送set请求,其内部本质上就是基于netty所封装的socketChannel执行set操作。完成写操作后释放连接
复制
public void execute() { //...... //1. 获取编码器 codec = getCodec(codec); //2.基于读写请求获取连接,注意获取连接的操作是异步的 CompletableFuture<RedisConnection> connectionFuture = getConnection(); //...... //3. 得到连接后调用sendCommand发送set请求 connectionFuture.whenComplete((connection, e) -> { //...... sendCommand(attemptPromise, connection); //...... }); attemptPromise.whenComplete((r, e) -> { //完成操作后释放连接 releaseConnection(attemptPromise, connectionFuture); checkAttemptPromise(attemptPromise, connectionFuture); }); }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.
3. 以Java API风格操作redis列表

列表操作就是对于redis列表的封装,可以看到redisson给出的操作函数完全按照java开发的习惯命名:

复制
RList<Object> list = redissonClient.getList("list"); //循环添加元素 for (int i = 0; i < 10; i++) { list.add(i); } //移除索引0位置的元素 list.remove(0);1.2.3.4.5.6.7.

getList和上述bucket操作类似这里就不多追赘述,这里我们就看看add的实现细节,本质上它就是异步调用redis的RPUSH指令将元素追加到列表末尾,整体流程原理和上述set操作差不多,这里就不多做赘述了:

对应的我们也给出底层源码的核心部分的介绍:

复制
@Override public boolean add(V e) { return get(addAsync(e)); } @Override public RFuture<Boolean> addAsync(V e) { //异步执行rpush指令将元素追加到末尾 return addAsync(e, RPUSH_BOOLEAN); }1.2.3.4.5.6.7.8.9.10.
4. 以Java API格式操作字典

映射集也就是我们java中常说的map,redisson底层使用的就是redis的dict字典,对应示例如下所示,注意这个put方法,每次操作后它会有一个返回值,即如果这个key存在于redis中,那么本次put擦咯做结束后就会返回覆盖前的值,就像下面这段代码一样,第二次put操作后就会返回value1:

复制
RMap<String, String> hashMap = redissonClient.getMap("hashMap"); //使用put操作,如果这个key存在则返回这个key原有的value值 String res = hashMap.put("key1", "value1"); log.info("before res:{}", res); res = hashMap.put("key1", "value2"); log.info("after res:{}", res);1.2.3.4.5.6.

这里我们也给出put的核心实现,对应的核心代码就是RedissonMap中的putAsync方法,大体逻辑是进行key和value的检查之后,调用putOperationAsync生成一个异步put操作的任务并得到一个future,最后封装成mapWriterFuture返回:

复制
@Override public RFuture<V> putAsync(K key, V value) { //进行键值对检查 checkKey(key); checkValue(value); //基于putOperationAsync执行键值对插入操作 RFuture<V> future = putOperationAsync(key, value); if (hasNoWriter()) { return future; } //返回结果 return mapWriterFuture(future, new MapWriterTask.Add(key, value)); }1.2.3.4.5.6.7.8.9.10.11.12.13.

所以来到putOperationAsync即可看到这段核心代码的实现,本质上为了保证返回覆盖前的值,redis用到的lua脚本,该脚本的执行流程为:

调用hget判断key是否存在若存在用v记录这个值。调用hset进行键值对设置。返回v即覆盖前的值。

对应的我们也给出这段源代码示例:

复制
protected RFuture<V> putOperationAsync(K key, V value) { String name = getRawName(key); return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call(hget, KEYS[1], ARGV[1]); " + "redis.call(hset, KEYS[1], ARGV[1], ARGV[2]); " + "return v", Collections.singletonList(name), encodeMapKey(key), encodeMapValue(value)); }1.2.3.4.5.6.7.8.
5. 详解redisson自实现的阻塞队列

我们再来个阻塞队列的例子,整体使用也和java的阻塞队列差不多:

复制
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("blockingQueue"); //添加元素 blockingQueue.put("element"); //取出元素 String value = blockingQueue.take(); log.info("value:{}", value);1.2.3.4.5.6.7.

实际上队列的实现也是基于redis的列表,通过rpush实现入队,lpop实现出队:

对应我们也给出入队的代码核心实现印证这一点:

复制
@Override public RFuture<Void> putAsync(V e) { //使用rpush模拟入队 return addAsync(e, RedisCommands.RPUSH_VOID); }1.2.3.4.5.

用blpop实现出队操作:

复制
@Override public RFuture<V> takeAsync() { return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0); }1.2.3.4.
6. 详解redisson自实现延迟队列

在上文中我们给出阻塞队列的概念,实际上redisson在此基础上更进一步的封装做出了一个延迟队列的设计,如下面这段示例,该代码会在5s后提交给blockingQueue一个element元素,通过blockingQueue的take方法即可实现5s后准时出去元素:

复制
//创建延迟队列 RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("blockingQueue"); RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); //添加元素 delayedQueue.offer("element", 5, TimeUnit.SECONDS); //取出元素 long begin = System.currentTimeMillis(); String value = blockingQueue.take(); long end = System.currentTimeMillis(); log.info("value:{} cost:{}ms", value, end - begin);1.2.3.4.5.6.7.8.9.10.11.12.13.14.

对应的我们也给出这段代码示例的输出结果,可以看到阻塞队列必须等到5s左右才能得到元素:

复制
2025-01-14 10:52:27.134 INFO 17684 --- [ main] com.sharkChili.TestRunner : value:element cost:5034ms1.

其实现原理也很简单,上述代码我们指明了队列名称为blockingQueue,在使用offer进行延迟提交本质上就是通过lua脚本实现元素延迟提交,其工作内容为:

基于我们给定的名称blockingQueue生成一个有序集合redisson_delay_queue_timeout:{blockingQueue}告知element元素的超时时间。基于我们给定的名称blockingQueue生成列表redisson_delay_queue:{blockingQueue}一个编码后的元素值element。到有序集合redisson_delay_queue:{blockingQueue}中查看第一个元素是否是当前元素,如果是则通过publish发送一个给redisson_delay_queue_channel:{blockingQueue}这个topic告知元素提交的到期时间。

对应的我们给出offer底层的实现,可以看到该方法通过我们传入的时间得到一个超时后的时间,然后封装成lua脚本,也就是我们上面所说的含义提交到redis服务端:

复制
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { //...... //计算超时后的时间 long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; //生成随机数构成一个唯一的lua脚本 byte[] random = getServiceManager().generateIdArray(8); //基于随机数生成lua脚本 return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack(Bc0Lc0, string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);" //提交到超时队列redisson_delay_queue_timeout:{blockingQueue}记录元素value插入的时间为ARGV[1],即入参中的timeout + "redis.call(zadd, KEYS[2], ARGV[1], value);" //提交到元素队列redisson_delay_queue:{blockingQueue}当前元素值为element + "redis.call(rpush, KEYS[3], value);" //从redisson_delay_queue_timeout:{blockingQueue}获取第一个元素,如果是当前元素则通过redisson_delay_queue_channel:{blockingQueue}这个channel发布元素的到期时间为ARGV[1],即入参中的timeout + "local v = redis.call(zrange, KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call(publish, KEYS[4], ARGV[1]); " + "end;", //这个list代表keys列表,getRawName是blockingqueue、timeout就是redisson_delay_queue_timeout:{blockingQueue}、queueName就是redisson_delay_queue:{blockingQueue}、channel就是基于redisson_delay_queue_channel:{blockingQueue} Arrays.asList(getRawName(), timeoutSetName, queueName, channelName), //代表arg timeout即超时的时间,random是随机数、e就是我们本次插入的编码后的element timeout, random, encode(e)); }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.

基于上述的执行脚本,我们的延迟队列在初始化时会创建一个QueueTransferTask,从上一步发布到redisson_delay_queue_channel:{blockingQueue}的信息,这个QueueTransferTask会监听到元素的到期时间然后生成一个定时任务,到点后执行如下逻辑:

从redisson_delay_queue_timeout:{blockingQueue}这个超时队列中获取到期的元素。将元素值提交到blockingQueue中。将本次延迟提交的元素从redisson_delay_queue_timeout:{blockingQueue}、redisson_delay_queue:{blockingQueue}中移除。

由此一次完整的元素提交就成功了:

对应的我们给出延迟队列的初始化代码,它会进行各种队列初始化的任务提交工作,整体步骤为:

基于传入的blockingQueue生成channel、列表、超时队列。它会创建一个lua脚本,内容就是上面所说的延迟提交入队列然后移除延迟提交的任务信息。调用schedule启动task。
复制
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); //基于传入的blockingQueue生成channel、列表、超时队列- channelName = prefixName("redisson_delay_queue_channel", getRawName()); queueName = prefixName("redisson_delay_queue", getRawName()); timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName()); QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) { @Override protected RFuture<Long> pushTaskAsync() { //基于初始化的channel、元素列表、延迟队列信息生成lua提交 return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call(zrangebyscore, KEYS[2], 0, ARGV[1], limit, 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack(Bc0Lc0, v);" + "redis.call(rpush, KEYS[1], value);" + "redis.call(lrem, KEYS[3], 1, v);" + "end; " + "redis.call(zrem, KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call(zrange, KEYS[2], 0, 0, WITHSCORES); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } //初始化channel的topic为 channelName @Override protected RTopic getTopic() { return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); } }; //调用schedule提交这个task queueTransferService.schedule(queueName, task); //...... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.

对应我们步入这个schedule方法即可看到,封装的task启动后会执行会监听redisson_delay_queue_channel:{blockingqueue}得到元素的到期时间并基于这个时间到点执行提交队列的lua脚本:

复制
public void start() { //获取到上一步初始化的channel即redisson_delay_queue_channel:{blockingqueue} RTopic schedulerTopic = getTopic(); //...... //订阅这个channel收到消息后,基于对应的startTime即延迟提交元素的到期时间通过scheduleTask执行上述的lua脚本将元素提交至blockingqueue中 messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } }); }1.2.3.4.5.6.7.8.9.10.11.12.

如下以来我们只需通过阻塞队列的task方法就可以等到元素到期后取出,完成逻辑闭环。

二、更多关于Redisson

1. 详解Redisson 中的原子类

因为redis执行用户指令是单线程的,所以针对key执行INCR即可实现元素自增,所以redisson也利用到这一点封装了一个原子类,对应的使用示例如下:

复制
RAtomicLong atomicLong = redissonClient.getAtomicLong("atomicLong"); atomicLong.incrementAndGet(); log.info("atomicLong = {}", atomicLong.get());1.2.3.
2. 详解redisson中的发布订阅模型

对应发布订阅模型,redisson也做了很好的封装时,使用时的api也非常方便,如下所示,通过publish即可发布消息,通过addListener即可得到对应的channel和message:

复制
CountDownLatch countDownLatch = new CountDownLatch(2); //订阅topic消息 new Thread(() -> { RTopic topic = redissonClient.getTopic("topic"); topic.addListener(String.class, (c, m) -> { log.info("c:{},m:{}", c, m); }); countDownLatch.countDown(); }).start(); //发布消息到topic new Thread(() -> { RTopic topic = redissonClient.getTopic("topic"); topic.publish("hello redssion"); countDownLatch.countDown(); }).start(); countDownLatch.await(); log.info("finish");1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.

三、小结

本文演示了redisson几个常用的数据结构以及一些简单并发流程工具使用示例和底层源码分析,希望对你有帮助。

阅读剩余
THE END