经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
Redisson分布式信号量RSemaphore的使用超详细讲解
来源:jb51  时间:2023/2/13 8:43:54  对本文有异议

本篇文章基于redisson-3.17.6版本源码进行分析

一、RSemaphore的使用

  1. @Test
  2. public void testRSemaphore() {
  3. Config config = new Config();
  4. config.useSingleServer().setAddress("redis://127.0.0.1:6379");
  5. RedissonClient redissonClient = Redisson.create(config);
  6. RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
  7. // 设置5个许可,模拟五个停车位
  8. rSemaphore.trySetPermits(5);
  9. // 创建10个线程,模拟10辆车过来停车
  10. for (int i = 1; i <= 10; i++) {
  11. new Thread(() -> {
  12. try {
  13. rSemaphore.acquire();
  14. System.out.println(Thread.currentThread().getName() + "进入停车场...");
  15. TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
  16. System.out.println(Thread.currentThread().getName() + "离开停车场...");
  17. rSemaphore.release();
  18. } catch (InterruptedException e) {
  19. throw new RuntimeException(e);
  20. }
  21. }, "A" + i).start();
  22. }
  23. try {
  24. TimeUnit.MINUTES.sleep(1);
  25. } catch (InterruptedException e) {
  26. throw new RuntimeException(e);
  27. }
  28. }

二、RSemaphore设置许可数量

初始化RSemaphore,需要调用trySetPermits()设置许可数量:

  1. /**
  2. * 尝试设置许可数量,设置成功,返回true,否则返回false
  3. */
  4. boolean trySetPermits(int permits);

trySetPermits()内部调用了trySetPermitsAsync():

  1. // 异步设置许可
  2. @Override
  3. public RFuture<Boolean> trySetPermitsAsync(int permits) {
  4. RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  5. // 判断分布式信号量的key是否存在,如果不存在,才设置
  6. "local value = redis.call('get', KEYS[1]); " +
  7. "if (value == false) then "
  8. // set "semaphore" permits
  9. // 使用String数据结构设置信号量的许可数
  10. + "redis.call('set', KEYS[1], ARGV[1]); "
  11. // 发布一条消息到redisson_sc:{semaphore}通道
  12. + "redis.call('publish', KEYS[2], ARGV[1]); "
  13. // 设置成功,返回1
  14. + "return 1;"
  15. + "end;"
  16. // 否则返回0
  17. + "return 0;",
  18. Arrays.asList(getRawName(), getChannelName()), permits);
  19. if (log.isDebugEnabled()) {
  20. future.thenAccept(r -> {
  21. if (r) {
  22. log.debug("permits set, permits: {}, name: {}", permits, getName());
  23. } else {
  24. log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
  25. }
  26. });
  27. }
  28. return future;
  29. }

可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:

参数说明:

  • KEYS[1]: 我们指定的分布式信号量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
  • KEYS[2]: 释放锁的channel名称,redisson_sc:{分布式信号量key},在本例中,就是redisson_sc:{semaphore}
  • ARGV[1]: 设置的许可数量

总结设置许可执行流程为:

  • get semaphore,获取到semaphore信号量的当前的值
  • 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3。(注意到,如果之前设置过了信号量,将无法再次设置,直接返回0。想要更改信号量总数可以使用addPermits方法)
  • 然后redis发布一些消息,返回1

三、RSemaphore的加锁流程

许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。

  1. public void acquire() throws InterruptedException {
  2. acquire(1);
  3. }
  4. public void acquire(int permits) throws InterruptedException {
  5. // 尝试获取锁成功,直接返回
  6. if (tryAcquire(permits)) {
  7. return;
  8. }
  9. // 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息
  10. CompletableFuture<RedissonLockEntry> future = subscribe();
  11. semaphorePubSub.timeout(future);
  12. RedissonLockEntry entry = commandExecutor.getInterrupted(future);
  13. try {
  14. // 不断循环尝试获取许可
  15. while (true) {
  16. if (tryAcquire(permits)) {
  17. return;
  18. }
  19. entry.getLatch().acquire();
  20. }
  21. } finally {
  22. // 取消订阅
  23. unsubscribe(entry);
  24. }
  25. // get(acquireAsync(permits));
  26. }

可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。

我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:

  1. // 异步获取许可
  2. @Override
  3. public RFuture<Boolean> tryAcquireAsync(int permits) {
  4. if (permits < 0) {
  5. throw new IllegalArgumentException("Permits amount can't be negative");
  6. }
  7. if (permits == 0) {
  8. return new CompletableFutureWrapper<>(true);
  9. }
  10. return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  11. // 获取当前剩余的许可数量
  12. "local value = redis.call('get', KEYS[1]); " +
  13. // 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量
  14. "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
  15. // 通过decrby减少剩余可用许可
  16. "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
  17. // 返回1
  18. "return 1; " +
  19. "end; " +
  20. // 其它情况,返回0
  21. "return 0;",
  22. Collections.<Object>singletonList(getRawName()), permits);
  23. }

从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;

总结加锁执行流程为:

  • get semaphore,获取到一个当前的值,比如说是3,3 > 1
  • decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
  • decrby semaphore 1
  • decrby semaphore 1
  • 执行3次加锁后,semaphore值为0
  • 此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁

四、RSemaphore的解锁流程

通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:

  1. public RFuture<Void> releaseAsync(int permits) {
  2. if (permits < 0) {
  3. throw new IllegalArgumentException("Permits amount can't be negative");
  4. }
  5. if (permits == 0) {
  6. return new CompletableFutureWrapper<>((Void) null);
  7. }
  8. RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
  9. // 通过incrby增加许可数量
  10. "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
  11. // 发布一条消息到redisson_sc:{semaphore}中
  12. "redis.call('publish', KEYS[2], value); ",
  13. Arrays.asList(getRawName(), getChannelName()), permits);
  14. if (log.isDebugEnabled()) {
  15. future.thenAccept(o -> {
  16. log.debug("released, permits: {}, name: {}", permits, getName());
  17. });
  18. }
  19. return future;
  20. }

到此这篇关于Redisson分布式信号量RSemaphore的使用超详细讲解的文章就介绍到这了,更多相关Redisson RSemaphore内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号