专注Java教育14年 全国咨询/投诉热线:400-8080-105
动力节点LOGO图
始于2009,口口相传的Java黄埔军校
首页 hot资讯 Redis集群分布式锁的实现

Redis集群分布式锁的实现

更新时间:2022-02-15 10:18:51 来源:动力节点 浏览2151次

1.Redis集群分布式锁

Redis 单节点实现了分布式锁。如果你通过 Sentinel 有高可用,如果主会话发生变化,如果某些原因发生变化,就会发生锁丢失。

(1)客户端 1 获得 Redis 的 Master 节点上的锁。

(2)Master宕机,存储锁的钥匙还没有到Slave。

(3)Master故障,故障转移,SLAVE节点升级为Master节点。

(4)Client 2 从新的 master 获取相同资源对应的锁。

这样,客户端1和客户端2同时持有相同资源的锁。锁的安全性被破坏了。对于这个问题。Redis 作者 Antirez 提出了 Redlock 算法来解决这个问题。

2.锁

当不同进程需要互斥访问共享资源时,分布式锁是一种非常有用的技术手段。实现高效分布式锁的三个属性需要考虑:

(1)安全属性:互斥,无论何时,只有一个客户端持有锁。

(2)效率属性 A:不会死。

(3)效率属性B:容错性,只要大部分Redis节点正常工作,客户端都可以获取和释放锁。

3.RedLock算法

在算法的分布式版本中,我们假设我们有 N 个完全独立的 Redis Master 节点,我们没有任何副本或其他隐式分布式协调算法。我们已经描述了如何在单节点环境中安全地获取和释放锁。因此,我们自然应该使用这种方法来获取和释放每个单个节点中的锁。在我们的示例中,我们将 N 设置为 5,这个数字是一个比较合理的值,所以我们需要在不同的计算机或虚拟机上运行 5 个 Master 节点,以确保它们不会同时出现。机器。客户端需要执行以下操作才能获得锁:

(1)获取当前时间(单位为毫秒)。

(2)turnt用于n个节点上相同的key和随机值。在这一步中,当客户端请求每个master上的锁时,会有一个比总锁释放时间小的超时。时间。比如锁自动释放时间为10秒,每个节点锁请求的超时时间可能在5-50毫秒的范围内,这样可以防止客户端在一个过期的Master节点上阻塞太久,如果一个Master节点是不可用,我们应该尽快尝试下一个Master节点。

(3)客户端计算第二步花费锁的时间,只有当客户端成功获取锁(这里是3),并且总时间消耗不超过锁释放时间,这个锁才算成功.

(4)如果锁获取成功,锁自动释放时间是在锁消耗之前花费初始锁释放时间的时间。

(5)如果锁获取失败,无论是因为锁成功不超过一半(N/2+1)还是因为总耗时时间超过锁释放时间,客户端都会释放每个Master节点上的锁,即使是那些相信没有成功锁定的人。

4.Redisson实现Redlock

Redisson 包已经封装了 RedLock 算法,该算法需要使用 Redisson 包查看分布式锁的正确姿势。

<!-- JDK 1.8 + 兼容 -->
<依赖>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <版本> 3.9。0 </版本>
</依赖>   
<!-- JDK 1.6 + 兼容 -->
<依赖>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <版本> 2.14。0 </版本>
</依赖>

Redisson管理类:

导入 org.redisson.Redisson;
导入 org.redisson.api.RAtomicLong;
导入 org.redisson.api.RedissonClient;
导入 org.redisson.config.Config;
公共 类RedissonManager {
    私有 静态配置配置 =新配置();
    私有 静态RedissonClient redisson = null ;
    private  static final String RAtomicName = " genId_ " ;
    公共 静态 无效初始化(){
        尝试{
            config.useClusterServers()
                    .setScanInterval( 200000 ) //设置集群状态扫描间隔
                    .setMasterConnectionPoolSize( 10000 ) //设置Master节点的连接池最大连接数为10000 
                    .setSlaveConnectionPoolSize( 10000 ) //设置Master节点的最大连接数SLAVE 节点的连接池为 500 
                    .setIdleConnectionTimeout( 10000 ) //如果当前连接池中的连接数超过了最小空闲连接,同时还有一个连接空闲时间超过了该值,那么这些连接将被自动关闭并从连接池中移除。时间单位是毫秒。
                    .setConnectTimeout( 30000 ) //等待与任意节点建立连接。时间单位是毫秒。
                    .setTimeout( 3000 ) //等待节点回复命令。该时间从命令发送成功时开始。
                    .setRetryInterval( 3000 ) //当与节点断开连接时,等待时间间隔重新建立连接。时间单位是毫秒。
                    .addNodeAddress( " redis://127.0.0.1:7000 " , " redis://127.0.0.1:7001 " , " redis://127.0.0.1:7002 " , " redis://127.0.0.1:7003 " , " redis://127.0.0.1:7004 " , " redis://127.0.0.1:7005 " );
            redisson = Redisson.create(config); 
            RAtomicLong atomicLong = redisson.getAtomicLong (RAtomicName);
            原子长。设置(0);//自增设置为从 0 开始
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    公共 静态RedissonClient getRedisson(){
         if (redisson == null ){
            RedissonManager.init(); //初始化
        }
         return redisson;
    }

我们已经配置了很多参数,其实有十个参数,我们只是设置了几个比较重要的。

getredisson 方法是用户初始化 Redisson。

NEXTID 方法 返回 RatomicName 变量的总次数,也就是我成功使用分布式锁的次数。

分布式锁定操作类:

导入 com.config.RedissonManager;
导入 org.redisson.api.RLock;
导入 org.redisson.api.RedissonClient;
导入 org.slf4j.Logger;
导入 org.slf4j.LoggerFactory;
导入 org.springframework.stereotype.Component;
导入 java.util.concurrent.TimeUnit;  
@零件
public  class RedissonLock {
     private  static final Logger LOGGER = LoggerFactory.getLogger(RedissonLock.class ) ; 
    私有 静态RedissonClient redissonClient = RedissonManager.getRedisson();  
    公共 无效 锁(字符串锁名){
        字符串键=锁名;
        RLock myLock = redissonClient.getLock(key);
        // Lock 提供 Timeout 参数,Timeout 结束强制解锁防止死锁
        myLock. 锁定(2 ,TimeUnit.SECONDS);
        //   1. 最常用的使用方法
        // lock.lock();
        //   2. 支持过期解锁功能,10秒后自动解锁,无需调用UNLOCK方法手动解锁
        // lock.lock(10, TimeUnit.SECONDS);
        //   3. 尝试加锁,等待3秒,加锁后10秒自动解锁
//         try {
 //            boolean res = mylock.tryLock(3, 10, TimeUnit.SECONDS);
//         } catch (InterruptedException e) {
 //             e.printStackTrace();
//         } 
        System.err.println( " ======lock====== " + Thread.currentThread().getName());
    } 
    公共 无效解锁(字符串锁名){
        字符串键=锁名;
        RLock myLock = redissonClient.getLock(key);
        myLock.unlock();
        System.err.println( " ======解锁====== " + Thread.currentThread().getName());
    }
}

LOCK方法是锁定操作,UNLOCK方法是解锁。如果您想了解更多相关知识,可以关注一下动力节点的Java在线学习,里面的课程从入门到精通,通俗易懂,适合小白学习,希望对大家能够有所帮助。

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>