分布式锁:红锁redLock和联锁multiLock源码分析

导读:本篇文章讲解 分布式锁:红锁redLock和联锁multiLock源码分析,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

原文链接:https://blog.csdn.net/u013066244/article/details/109766759
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock(leaseTime, TimeUnit.SECONDS);

在这里插入图片描述

/**
 * leaseTime租约时间,也就是键key的过期时间
 */
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

在这里插入图片描述

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
	// 以毫秒来算的话,1500==1.5s * 3 = 4.5s
	// 默认等待时间为4.5s
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
    	// 当没有设置租约时间时,waitTime 等于默认4.5s
        waitTime = baseWaitTime;
    } else {
    	// 设置了租约时间的情况
    	// 将前端传入的租约时间转为毫秒
        leaseTime = unit.toMillis(leaseTime);
        // 租约时间赋值给waitTime
        waitTime = leaseTime;
        if (waitTime <= 2000) {
        	// 小于 2s情况
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
        	// ThreadLocalRandom.current() 多线程下生成随机数
        	// 假设传入3s,那么就会在(3/2=1s, 3s)之间产生随机数赋值给waitTime
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            // 假设传入6s,那么就会在(4.5s, 6s)之间产生随机数赋值给waitTime
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }
    // 假设leaseTime传入的是3s, 根据随机策略(1s, 3s),假设生成随机数是2s
    while (true) {
    	// waitTime = 2s, leaseTime = 3s
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
        	// 可以看出lock方法只能成功,才会生成退出。否则就得报异常退出
            return;
        }
    }
}

在这里插入图片描述
在这里插入图片描述

// waitTime = 2s, leaseTime = 3s, 时间单位毫秒
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    // 设置了租约时间的情况
    // 这段逻辑 就是对有租约时间的情况下,设置newLeaseTime,实际获取分布式锁时用到
    if (leaseTime != -1) {
        if (waitTime == -1) {
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
        	// 设置了等待时间的场景
        	// 根据我假设的情况,这里2 * 2 = 4s;
        	// 这里乘以2,个人认为,从申请获取锁到真正获取到锁是有时间消耗的,
        	// 为了防止获取到的锁不至于立马过期,所以乘以2,其实我觉得leaseTime*2也可以;
        	// 因为从代码最后释放锁的逻辑来看,这里的租约时间多长,并不会影响最后锁的统一释放
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
    }
    // 当前时间 毫秒
    long time = System.currentTimeMillis();
    // 总的等待时间
    long remainTime = -1;
    if (waitTime != -1) {
    	// 因为waitTime=2s,所以remainTime=2s
        remainTime = unit.toMillis(waitTime);
    }
    // 这里红锁代码重写了calcLockWaitTime()方法
    // 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
    // 计算每个锁的等待时间  在联锁的场景下,就等于remainTime
    long lockWaitTime = calcLockWaitTime(remainTime);
    // 允许获取锁失败的次数 在联锁的场景下,固定为0
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    // 循环每个redis客户端,去获取锁
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
            	// 红锁的情况中,根据计算规则肯定是取lockWaitTime
            	// lockWaitTime = 1s, remainTime=2s 取最小值 1s
                long awaitTime = Math.min(lockWaitTime, remainTime);
                // awaitTime=1s, newLeaseTime=4s 开始尝试获取锁
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }
        
        if (lockAcquired) {
        	// 如果获取到了,就记录起来
            acquiredLocks.add(lock);
        } else {
        	// 失败的话,比较下是否到了失败次数
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
				// 3 - 2 也就是说 只有在成功两个,失败一个情况下,才会执行这里
                // 换句话说,红锁的场景下走这里,联锁场景下一定不执行这里
                break;
            }
            
            // 重试机制
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                // 重置失败次数、锁列表、遍历游标,这说明要进行重试了
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
            	// 红锁失败场景执行,因为联锁的场景中failedLocksLimit=0
                failedLocksLimit--;
            }
        }
        // 超时控制代码块
        if (remainTime != -1) {
        	// System.currentTimeMillis() - time 单个Redis实例获取锁花费的时间
         	// remainTime = remainTime - System.currentTimeMillis() - time;
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
            	// 从这段逻辑可以看出 remainTime 就是总的等待时间,如果超过了,还没有走出循环,说明获取锁失败
            	// 对已经获取到的锁进行释放
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
	
	// leaseTime = 3s
	// 下面逻辑是,key到了过期时间后,Redis利用异步线程进行删除,释放锁;
    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        // 设置好过期时间
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        // 同步可中断的方式来释放到期锁
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    
    return true;
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

// 红锁重写
// 根据我的假设,此时remainTime=2s,locks.size()=3
@Override
protected long calcLockWaitTime(long remainTime) {
	// Math.max(2 / 3, 1) = (0, 1) = 1
	// 说明:按照下面算法,应该是每个实例等待时间和1进行比较,取最大值
    return Math.max(remainTime / locks.size(), 1);
}

在这里插入图片描述

// 锁可以失败的次数,锁的数量-锁成功客户端最小的数量
protected int failedLocksLimit() {
    return locks.size() - minLocksAmount(locks);
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

// 假设waitTime=1s, leaseTime=3s,单位毫秒
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
	// 设置了租约时间的情况
    if (leaseTime != -1) {
    	// tryLockInnerAsync 真正加锁的方法,其会调用脚本执行redis命令插入key
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 没有设置租约时间的情况
    // 程序(看门狗)会设置一个默认值为30s的租约时间
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired  获取锁成功
        if (ttlRemaining == null) {
        	// 对该设置时间轮询器,也可以理解为监听器
        	// 该监听器会以租约时间的三分之一的频率,不断延迟租约时间
        	// 目的是为了防止,业务程序还没有跑完,锁就被释放掉了。
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

在这里插入图片描述

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
    	// 说明该线程已经获取到锁了,
    	// 内部变量值会加1,即:可重入锁
        oldEntry.addThreadId(threadId);
    } else {
    	// 该线程第一次获取到该锁
        entry.addThreadId(threadId);
        // 启动时间轮询器,开始每隔leaseTime/3的时间,不断去延长租约时间
        renewExpiration();
    }
}

在这里插入图片描述

private void renewExpiration() {
   ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
   if (ee == null) {
       return;
   }
   // newTimeout 底层使用的是netty
   // 
   Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
       @Override
       public void run(Timeout timeout) throws Exception {
           ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
           if (ent == null) {
               return;
           }
           Long threadId = ent.getFirstThreadId();
           if (threadId == null) {
               return;
           }
           
           RFuture<Boolean> future = renewExpirationAsync(threadId);
           future.onComplete((res, e) -> {
               if (e != null) {
               		// 有异常,结束
                   log.error("Can't update lock " + getName() + " expiration", e);
                   return;
               }
               
               if (res) {
                   // reschedule itself 
                   // 递归调用
                   renewExpiration();
               }
           });
       }
       // 定时器的时间为租约时间的三分之一
   }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
   
   ee.setTimeout(task);
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    						// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),
    						// 并通过pexpire设置失效时间(也是锁的租约时间)
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +
                              "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                              "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                              "redis.call('set', KEYS[2] .. ':1', 1); " +
                              "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                            "end; " +
                            // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
                            "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                              "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                              "local key = KEYS[2] .. ':' .. ind;" +
                              "redis.call('set', key, 1); " +
                              "redis.call('pexpire', key, ARGV[1]); " +
                              "local remainTime = redis.call('pttl', KEYS[1]); " +
                              "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                              "return nil; " +
                            "end;" +
                            // 获取分布式锁的KEY的失效时间毫秒数
                            "return redis.call('pttl', KEYS[1]);",
                            // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                    Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                    internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}

在这里插入图片描述

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 释放锁时需要在redis实例上执行的lua命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

在这里插入图片描述

long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock();

在这里插入图片描述

// 租约时间为 -1  相当于没有设置
@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

在这里插入图片描述

waitTime=4.5s leaseTime = -1
...
long remainTime = -1;
if (waitTime != -1) {
	// remainTime = waitTime = 4.5s
    remainTime = unit.toMillis(waitTime);
}
// 这里红锁代码重写了calcLockWaitTime()方法
// 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
// 计算每个锁的等待时间 (4.5/3 = 1, 1)取最大,=1
long lockWaitTime = calcLockWaitTime(remainTime);
// 允许获取锁的失败次数=1
int failedLocksLimit = failedLocksLimit();
...

在这里插入图片描述

...
// min(1, 4.5) = 1
// 在redLock场景中lockWaitTime永远都会比remainTime值小
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=1, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
...
// 无参的情况下leaseTime=-1,也就说无参的情况下,必须手动释放锁;redis不会自动释放
if (leaseTime != -1) {
    List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
    for (RLock rLock : acquiredLocks) {
        RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
        futures.add(future);
    }
    
    for (RFuture<Boolean> rFuture : futures) {
        rFuture.syncUninterruptibly();
    }
}
...

在这里插入图片描述

calcLockWaitTime()
failedLocksLimit()

在这里插入图片描述

long leaseTime = 3L;
RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock(leaseTime, TimeUnit.SECONDS);

在这里插入图片描述

public void lock(long leaseTime, TimeUnit unit) {
    try {
        lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
// leaseTime=3s 
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 默认等待时间
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
    	// waitTime = 3 * 1.5 = 4.5s
        waitTime = baseWaitTime;
    } else {
        leaseTime = unit.toMillis(leaseTime);
        // 3s
        waitTime = leaseTime;
        if (waitTime <= 2000) {
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
        	// 3 <= 4.5 
        	// 3/2=1, 3  假设随机数为2
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }
    
    while (true) {
    	// waitTime 2s,leaseTime=3s
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
            return;
        }
    }
}

在这里插入图片描述

long remainTime = -1;
if (waitTime != -1) {
	// 2s
    remainTime = unit.toMillis(waitTime);
}
// 联锁的场景下,就是remainTime=2s
long lockWaitTime = calcLockWaitTime(remainTime);
// 联锁的场景下,固定为0
int failedLocksLimit = failedLocksLimit();

在这里插入图片描述

...
// 联锁的场景下,lockWaitTime和remainTime是相等的;
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 2s,newLeaseTime=4s
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...

在这里插入图片描述

if (failedLocksLimit == 0) {
	// 失败次数用完了
    unlockInner(acquiredLocks);
    if (waitTime == -1) {
        return false;
    }
    // 重置失败次数、锁列表、遍历游标,这说明要进行重试了
    failedLocksLimit = failedLocksLimit();
    acquiredLocks.clear();
    // reset iterator
    while (iterator.hasPrevious()) {
        iterator.previous();
    }
}
// 时间控制   可以看出上面的逻辑存在重试机制,所以才有下面的超时判断逻辑
if (remainTime != -1) {
    remainTime -= System.currentTimeMillis() - time;
    time = System.currentTimeMillis();
    if (remainTime <= 0) {
        unlockInner(acquiredLocks);
        return false;
    }
}

在这里插入图片描述

RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock();

在这里插入图片描述

waitTime=4.5s  leaseTime=-1

在这里插入图片描述

long remainTime = -1;
if (waitTime != -1) {
	// remainTime = waitTime=4.5s
    remainTime = unit.toMillis(waitTime);
}
// lockWaitTime = remainTime = 4.5s
long lockWaitTime = calcLockWaitTime(remainTime);
// 固定为0
int failedLocksLimit = failedLocksLimit();

在这里插入图片描述

...
// lockWaitTime = remainTime = 4.5s
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=4.5s  newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...

在这里插入图片描述

RedissonRedLock redLock = new RedissonRedLock(lock);
redLock.tryLock();
public boolean tryLock() {
   try {
       return tryLock(-1, -1, null);
   } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       return false;
   }
}

在这里插入图片描述

waitTime=-1s  leaseTime=-1

在这里插入图片描述

...
// 红锁 Max(-1/3, 1) 为1
long lockWaitTime = calcLockWaitTime(remainTime);
// 红锁 3 - 3/2+1 = 1
int failedLocksLimit = failedLocksLimit();
...

在这里插入图片描述

...
if (waitTime == -1 && leaseTime == -1) {
    // 逻辑会走这里 尝试获取锁
    lockAcquired = lock.tryLock();
}
...

在这里插入图片描述

tryLock(waitTime, -1, unit)

在这里插入图片描述

...
long remainTime = -1;
if (waitTime != -1) {
	// remainTime = 7s
    remainTime = unit.toMillis(waitTime);
}
//max(7/3=2, 1)=2
long lockWaitTime = calcLockWaitTime(remainTime);
// 1
int failedLocksLimit = failedLocksLimit();
...

在这里插入图片描述

...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = 2s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...

在这里插入图片描述

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);
}

在这里插入图片描述

// lockWaitTime = remainTime = waitTime =7s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();

在这里插入图片描述

...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 7s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...

在这里插入图片描述

tryLock(-1, -1, null);

在这里插入图片描述

// lockWaitTime = remainTime = waitTime =-1s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();

在这里插入图片描述

...
if (waitTime == -1 && leaseTime == -1) {
    lockAcquired = lock.tryLock();
}
...

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/73816.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!