本篇文章基于redisson-3.17.6版本源码进行分析
一、RCountDownLatch的使用
RCountDownLatch的功能跟CountDownLatch,用于实现某个线程需要等待其他线程都完成之后,我再去执行,这种场景就可以使用CountDownLatch。
- @Test
- public void testRCountDownLatch() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://127.0.0.1:6379");
- RedissonClient redissonClient = Redisson.create(config);
- RCountDownLatch rCountDownLatch = redissonClient.getCountDownLatch("anyCountDownLatch");
- rCountDownLatch.trySetCount(5);
- for (int i = 1; i <= 5; i++) {
- new Thread(() -> {
- System.out.println(Thread.currentThread().getName() + "离开教师...");
- rCountDownLatch.countDown();
- }, "A" + i).start();
- }
- try {
- rCountDownLatch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- System.out.println("班长锁门...");
- }
A1离开教师...
A2离开教师...
A4离开教师...
A3离开教师...
A5离开教师...
班长锁门...
二、trySetCount()设置计数器
- /**
- * 仅当先前的计数已达到零或根本未设置时才设置新的计数值。
- */
- boolean trySetCount(long count);
- public RFuture<Boolean> trySetCountAsync(long count) {
- return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- // 往redis中写入一个String类型的数据 anyCountDownLatch:5
- "if redis.call('exists', KEYS[1]) == 0 then "
- + "redis.call('set', KEYS[1], ARGV[2]); "
- + "redis.call('publish', KEYS[2], ARGV[1]); "
- + "return 1 "
- + "else "
- + "return 0 "
- + "end",
- Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
- }
同样,在redis中写入了一个{key}:{计数器总数}的String类型的数据。
三、countDown()源码
减少锁存器的计数器。当计数达到零时通知所有等待线程。
- public RFuture<Void> countDownAsync() {
- return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- // 减少redis中计数器的值
- "local v = redis.call('decr', KEYS[1]);" +
- // 计数器减为0后,删除对应的key
- "if v <= 0 then redis.call('del', KEYS[1]) end;" +
- "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
- Arrays.<Object>asList(getRawName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
- }
四、await()源码
等到计数器达到零。
- public void await() throws InterruptedException {
- // 如果计数器为0,直接返回
- if (getCount() == 0) {
- return;
- }
- // 订阅redisson_countdownlatch__channel__{anyCountDownLatch}的消息
- CompletableFuture<RedissonCountDownLatchEntry> future = subscribe();
- RedissonCountDownLatchEntry entry = commandExecutor.getInterrupted(future);
- try {
- // 不断循环判断计数器的值是否大于0,大于0说明还有线程没执行完成,在这里阻塞:LockSupport.park(this)
- while (getCount() > 0) {
- // waiting for open state
- entry.getLatch().await();
- }
- } finally {
- unsubscribe(entry);
- }
- }
到此这篇关于Redisson分布式闭锁RCountDownLatch的使用详细讲解的文章就介绍到这了,更多相关Redisson RCountDownLatch内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!