经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
Redisson分布式锁实现
来源:cnblogs  作者:不要乱摸  时间:2018/10/23 9:28:42  对本文有异议

1.  基本用法

  1. <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.8.2</version></dependency>
  1. Config config = new Config();
  2. config.useClusterServers()
  3.     .setScanInterval(2000) // cluster state scan interval in milliseconds.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
  4.     .addNodeAddress("redis://127.0.0.1:7002");
  5.  
  6. RedissonClient redisson = Redisson.create(config);
  7.  
  8. RLock lock = redisson.getLock("anyLock");
  9.  
  10. lock.lock();try {
  11.     ...
  12. } finally {
  13.     lock.unlock();
  14. }

针对上面这段代码,重点看一下Redisson是如何基于Redis实现分布式锁的

Redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法

更多请参见 https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

2.  加锁

可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

首先,看一下evalWriteAsync方法的定义

  1. <TR> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

最后两个参数分别是keys和params

实际调用是这样的:

单独将调用的那一段摘出来看

  1. commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,                  "if (redis.call('exists', KEYS[1]) == 0) then " +
  2.                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  3.                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  4.                       "return nil; " +
  5.                   "end; " +
  6.                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  7.                       "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  8.                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  9.                       "return nil; " +
  10.                   "end; " +
  11.                   "return redis.call('pttl', KEYS[1]);",
  12.                     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假设前面获取锁时传的name是&ldquo;abc&rdquo;,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,这段脚本的意思是

  1、判断有没有一个叫&ldquo;abc&rdquo;的key

  2、如果没有,则在其下设置一个字段为&ldquo;6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1&rdquo;,值为&ldquo;1&rdquo;的键值对 ,并设置它的过期时间

  3、如果存在,则进一步判断&ldquo;6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1&rdquo;是否存在,若存在,则其值加1,并重新设置过期时间

  4、返回&ldquo;abc&rdquo;的生存时间(毫秒)

这里用的数据结构是hash,hash的结构是: key  字段1  值1 字段2  值2  。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

3.  解锁

  1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +
  2.                 "redis.call('publish', KEYS[2], ARGV[1]); " +
  3.                 "return 1; " +
  4.             "end;" +
  5.             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  6.                 "return nil;" +
  7.             "end; " +
  8.             "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  9.             "if (counter > 0) then " +
  10.                 "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  11.                 "return 0; " +
  12.             "else " +
  13.                 "redis.call('del', KEYS[1]); " +
  14.                 "redis.call('publish', KEYS[2], ARGV[1]); " +
  15.                 "return 1; "+
  16.             "end; " +
  17.             "return nil;",
  18.             Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
  19.  
  20. }

我们还是假设name=abc,假设线程ID是Thread-1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面脚本的意思是:

  1、判断是否存在一个叫&ldquo;abc&rdquo;的key

  2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1

  3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

  4、若字段不存在,返回空,若字段存在,则字段值减1

  5、若减完以后,字段值仍大于0,则返回0

  6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

4.  等待

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

再回到前面获取锁的位置

  1. @Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();
  2.     Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;
  3.     }//    订阅RFuture<RedissonLockEntry> future = subscribe(threadId);
  4.     commandExecutor.syncSubscription(future);try {while (true) {
  5.             ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;
  6.             }// waiting for messageif (ttl >= 0) {
  7.                 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  8.             } else {
  9.                 getEntry(threadId).getLatch().acquire();
  10.             }
  11.         }
  12.     } finally {
  13.         unsubscribe(future, threadId);
  14.     }//        get(lockAsync(leaseTime, unit));}protected static final LockPubSub PUBSUB = new LockPubSub();protected RFuture<RedissonLockEntry> subscribe(long threadId) {return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
  15. }protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
  16.     PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
  17. }

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

5.  小结

 

 

 

6.  其它相关

基于Redis的分布式锁的简单实现

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号