Redis 锁

简单锁
通过使用 redis 的 setnx 方法设置值,如果设置成功则说明获得锁
String lockKey = "myKey";
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, true);
try {
if (lock) {
// 获得锁,执行业务代码
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
这种方式可以实现最简单的分布式锁,但存在较多问题,例如,在执行业务代码的时候线程被 kill 掉的话,将无法释放出该锁,导致后续线程将会一直无法获得锁。
配置自动过期时间
使用 setIfAbsent 方法的时候设置过期时间
String lockKey = "myKey";
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, true, 30, TimeUnit.SECONDS);
try {
if (lock) {
// 获得锁,执行业务代码
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
注意不要采用下面这种方式设置时间
// 无法保证原子性,代码可能还未设置过期时间就被 kill 掉
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, true);
redisTemplate.expire(lockKey, 30, TimeUnit.SECONDS);
采用 setnx 和 过期时间 就可以避免锁无法被释放的问题。但仍存在问题,过期时间的设置也会存在问题,例如
- 过了 30s 的时候线程 A 业务代码还没执行完成,此时锁将被自动释放
- 此时线程 B 就可以获取锁执行业务代码
- 在线程 B 执行业务代码的时候,线程 A 执行完成并将线程 B 的锁释放
- 此时线程 C 也可以获取锁执行业务代码
- 持续循环…
这样就会造成锁失效的问题。
配置锁 id
设置 clientId 到 redis 的值中,配置线程只能释放自己的锁,但是这样仍然无法解决锁失效的问题,因为问题的关键在于锁的过期时间设置。
String lockKey = "myKey";
String clientId = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
try {
if (lock) {
// 获得锁,执行业务代码
}
} finally {
// 释放锁
String id = Optional.ofNullable(redisTemplate.opsForValue().get(lockKey))
.map(String::valueOf)
.orElse("");
if (StringUtils.equals(clientId, id)) {
redisTemplate.delete(lockKey);
}
}
配置“看门狗”线程
每隔 10s 自动更新 lock 为 30s,直到业务代码执行完成之后再停止这个自动续期线程
String lockKey = "myKey";
String clientId = UUID.randomUUID().toString();
int timeout = 30;
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, timeout, TimeUnit.SECONDS);
Thread thread = null;
try {
if (lock) {
// 配置自动续期线程
thread = new Thread(() -> {
while (Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(timeout / 3);
redisTemplate.expire(lockKey, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} );
thread.start();
// 执行业务代码
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 停止自动续期线程
thread.interrupt();
// 释放锁
String id = Optional.ofNullable(redisTemplate.opsForValue().get(lockKey))
.map(String::valueOf)
.orElse("");
if (StringUtils.equals(clientId, id)) {
redisTemplate.delete(lockKey);
}
}
这样可以实现基本的分布式锁功能,但代码太过冗余,引入了太多业务代码无关的内容,所以可以直接使用 redisson 来解决问题
Redisson 锁
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
编写配置类
@Configuration
public class RedissonConfigure {
@Bean
RedissonClient redissonClient() {
// 1、创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 集群模式
// useClusterServers ( )
//.addNodeAddress ("redis://127.0.0.1:7004", "redis://127.0.0.1:7001");
// 密码设置
//.setPassword ("abc123456");
// 2、根据 Config 实例创建出 RedissonClient 对象
return Redisson.create(config);
}
}
使用
@Resource
private RedissonClient redissonClient;
@GetMapping("/test")
private void redisson() {
String key = "myKey";
// 获取锁
RLock lock = redissonClient.getLock(key);
// 加锁
lock.lock();
try {
// 执行业务代码
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
redisson 底层是使用了 Lua 脚本执行的命令
Redisson 源码分析
tryLock()
尝试获取锁的方法,若不配置参数,则立刻返回结果,不阻塞
如果锁可用,则获取锁,并立即返回值true。如果锁不可用,则此方法将立即返回值false
1、 在 Redisson 源码中,使用 tryLockAsync() 方法获取 Future 表达式,通过 get 方法获取对应的值

2、 接着往下查看 tryLockAsync() 方法,获取到了当前线程的 id
3、默认的不设置等待时间 waitTime** **和 leaseTime(租赁时间,即锁的有效时间)
4、接着到了 tryAcquireOnceAsync() 方法中,当不指定 leaseTime 时间时,则使用配置的 internalLockLeaseTime 时间,应该就是 30 秒啦


5、再接着 tryLockInnerAsync() 方法就是实际的获取和设置锁的方法了,这里使用到了 lua (可以确保原子性),而 redisson 是使用 redis 的 hash 数据结构进行存储的
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(
// 获取锁的名字,也就是一开始 redisson.getLock(lockKey) 使用的 lockKey
getRawName(),
// 解码器实例
LongCodec.INSTANCE,
// 不清楚,好像是用来指定返回类型的
command,
// Lua 脚本
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
// key 集合,在脚本中通过 KEYS[1] 获取,索引从 1 开始
Collections.singletonList(getRawName()),
// 锁的有效时间,对应参数1,对应 ARGV[1]
unit.toMillis(leaseTime),
// 线程 id,作为 hash 的 key,对应 ARGV[2]
getLockName(threadId));
}

6、回到刚才调用 Lua 脚本的方法这里,tryAcquireOnceAsync() 方法传递了 RedisCommands.EVAL_NULL_BOOLEAN 作为 command
EVAL_NULL_BOOLEAN 不仅指定了返回类型,还指定了结果转换器 BooleanNullReplayConvertor
在 BooleanNullReplayConvertor 中,则判断 Lua 脚本返回的内容是否为 null,如果为 null 则表示获取锁成功
Lua 脚本
redisson 中使用到的 lua 脚本如下:
- 当 redis 中不存在对应锁的 key 时,则创建指定 key 的 hash,当前线程的 id 作为 hash 的 key,值为 1,默认过期时间为 30 秒
- 当 redis 存在当前的 key,并且 hash 的 key 为当前线程创建的,则会刷新锁的时间(这里应该时给看门狗用的),锁的持有者加一
- 当 redis 存在当前 key,并且 hash 的 key 并非当前线程,则返回锁的剩余时间
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
lock()
1、而在 lock() 方法的流程如下,当不指定参数的时候,同样也是使用默认的有效时间
2、接着使用 tryAcquire() 方法去获取锁

3、这里用到了 tryAcquireAsync() 方法获取锁,跟 tryLock() 方法调用的 tryAcquireOnceAsync() 方法很相似,似乎只是返回类型不同而已
4、在接着就是执行同样的 Lua 脚本啦
5、不过在 lock() 方法中,使用的时 RedisCommands.EVAL_LONG 这个 command,但会发现并没有指定 convertor,所以会使用默认的 EmptyConvertor,直接返回 Lua 脚本的内容(nil 或 剩余时间)


6、现在回到 lock() 方法中,如果获取不到锁(返回值不为 null),则会等待锁释放,这里用到了 redis 的订阅功能
看门狗策略
1、回到获取锁的方法中 tryAcquireOnceAsync(),当获取到锁之后,且未修改锁有效期,则会调用 scheduleExpirationRenewal() 方法开启看门狗线程
2、在 scheduleExpirationRenewal() 方法中,会判断是否存在看门狗线程了,不存在的话则会调用 renewExpiration() 方法
3、在 renewExpiration() 方法中,则创建了一个新的线程,每隔 key 有效时间 / 3,去调用 renewExpirationAsync() 方法更新锁的时间
4、接着就执行对应的脚本啦
unLock()
1、通过调用 unlockAsync() 方法释放锁
2、unlockAsync() 方法定义了释放锁的逻辑
3、unlockInnerAsync() 方法才调用了 Lua 脚本进行 Redis 操作
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(
// 获取锁名称
getRawName(),
// 获取解码实例
LongCodec.INSTANCE,
// 设置结果转换器
RedisCommands.EVAL_BOOLEAN,
// 判断当前线程的锁是否存在,锁不存在返回 nil
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 锁存在,则将锁持有者减一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
// 还剩有锁则刷新锁时间
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 完全释放锁,并往通道发送事件
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
// 对应 KEY[1]、KEY[2]
Arrays.asList(getRawName(), getChannelName()),
// 对应 ARG[1]
LockPubSub.UNLOCK_MESSAGE,
// 对应 ARG[2]
internalLockLeaseTime,
// 对应 ARG[3]
getLockName(threadId));
}
锁的获取过程
redisson 获取锁的过程会存在阻塞,并利用到了 Redis 的订阅发布功能,监听指定通道,当有锁释放的时候,lock() 方法会接收到信号 Semaphore,并再次尝试获取锁
1、查看 redisson 的 lock() 方法,在执行 Lua 脚本之后,如果得到的结果不为 null(没得到锁),则会执行下列逻辑
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 得到锁,返回
if (ttl == null) {
return;
}
// 订阅锁的通道消息,future 中包含 Semaphore 信号量
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 再次尝试获取锁,成功则返回
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
break;
}
// 根据锁的剩余时间,进行阻塞获取信号量
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// 锁不会自动过期,则一直阻塞等待
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 获取锁成功则不再订阅
unsubscribe(future, threadId);
}
}
2、查看订阅方法 subscribe(),发现这里的 threadId 并没有作用

3、创建监听器,并进行监听,监听器的 value 中就包含等会返回的 newPromise

4、当监听器被触发的时候,就会调用到 LockPubSub 的 onMessage() 方法,对信号量 Semaphore 继续释放

5、释放了信号量之后,就可以在 lock() 方法中获取到资源往下走
6、而监听器最终会被添加到 RedisPubSubConnection 类的 listeners 队列中,当监听到事件的时候就会依次调用监听器
for update 行锁
For update是MySQL中用于实现行锁的一种语法,其主要作用是在SELECT查询语句中加上FOR UPDATE子句,以保证查询结果集中的每一行都被锁定,避免其他事务对这些行进行修改。
用法
创建一个数据表
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

在 java 代码中的事务使用 for update 获取锁
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
try {
conn = dataSource.getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from SCHEDULE_JOB_LOCK where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// 执行业务代码
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 提交事务
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
如何判断 for update 是行锁还是表锁
- 是否使用主键或唯一索引:如果查询条件中使用了主键或唯一索引,并且该索引的值在表中确实存在,那么MySQL通常会使用行锁
- 是否使用非唯一索引:如果查询条件中使用了非唯一索引,MySQL可能会使用行锁,但这也取决于具体的查询优化和表的结构。在某些情况下,如果非唯一索引不能有效地缩小结果集,MySQL可能会选择使用表锁。
- 是否使用普通字段:MySQL通常会使用表锁,因为无法有效地确定需要锁定哪些行。
- 本文链接:https://lxjblog.gitee.io/2024/07/20/%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81/
- 版权声明:本博客所有文章除特别声明外,均默认采用 许可协议。