Redis 锁

image.png

简单锁

通过使用 redis 的 setnx 方法设置值,如果设置成功则说明获得锁

String lockKey = "myKey";
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, true);
try { 
    if (lock) { 
        // 获得锁,执行业务代码
    } 
}  finally { 
    // 释放锁
    redisTemplate.delete(lockKey);
} 

这种方式可以实现最简单的分布式锁,但存在较多问题,例如,在执行业务代码的时候线程被 kill 掉的话,将无法释放出该锁,导致后续线程将会一直无法获得锁。

配置自动过期时间

使用 setIfAbsent 方法的时候设置过期时间

String lockKey = "myKey";
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, true, 30, TimeUnit.SECONDS);
try { 
    if (lock) { 
        // 获得锁,执行业务代码
    } 
}  finally { 
    // 释放锁
    redisTemplate.delete(lockKey);
} 

注意不要采用下面这种方式设置时间

// 无法保证原子性,代码可能还未设置过期时间就被 kill 掉
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, true);
redisTemplate.expire(lockKey, 30, TimeUnit.SECONDS);

采用 setnx 和 过期时间 就可以避免锁无法被释放的问题。但仍存在问题,过期时间的设置也会存在问题,例如

  • 过了 30s 的时候线程 A 业务代码还没执行完成,此时锁将被自动释放
  • 此时线程 B 就可以获取锁执行业务代码
  • 在线程 B 执行业务代码的时候,线程 A 执行完成并将线程 B 的锁释放
  • 此时线程 C 也可以获取锁执行业务代码
  • 持续循环…

这样就会造成锁失效的问题。

配置锁 id

设置 clientId 到 redis 的值中,配置线程只能释放自己的锁,但是这样仍然无法解决锁失效的问题,因为问题的关键在于锁的过期时间设置。

String lockKey = "myKey";
String clientId = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
try { 
    if (lock) { 
        // 获得锁,执行业务代码
    } 
}  finally { 
    // 释放锁
    String id = Optional.ofNullable(redisTemplate.opsForValue().get(lockKey))
            .map(String::valueOf)
            .orElse("");
    if (StringUtils.equals(clientId, id)) { 
        redisTemplate.delete(lockKey);
    } 
} 

配置“看门狗”线程

每隔 10s 自动更新 lock 为 30s,直到业务代码执行完成之后再停止这个自动续期线程

String lockKey = "myKey";
String clientId = UUID.randomUUID().toString();
int timeout = 30;
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, timeout, TimeUnit.SECONDS);
Thread thread = null;
try { 
    if (lock) { 
        // 配置自动续期线程
        thread = new Thread(() -> { 
            while (Thread.currentThread().isInterrupted()) { 
                try { 
                   TimeUnit.SECONDS.sleep(timeout / 3);
                    redisTemplate.expire(lockKey, timeout, TimeUnit.SECONDS);
                }  catch (InterruptedException e) { 
                    e.printStackTrace();
                } 
            } 
        } );
        thread.start();
        // 执行业务代码
    } 
}  catch (InterruptedException e) { 
    e.printStackTrace();
}  finally { 
    // 停止自动续期线程
    thread.interrupt();
    // 释放锁
    String id = Optional.ofNullable(redisTemplate.opsForValue().get(lockKey))
            .map(String::valueOf)
            .orElse("");
    if (StringUtils.equals(clientId, id)) { 
        redisTemplate.delete(lockKey);
    } 
} 

这样可以实现基本的分布式锁功能,但代码太过冗余,引入了太多业务代码无关的内容,所以可以直接使用 redisson 来解决问题

Redisson 锁

引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

编写配置类

@Configuration
public class RedissonConfigure { 

    @Bean
    RedissonClient redissonClient() { 
        // 1、创建配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        // 集群模式
        // useClusterServers ( )
        //.addNodeAddress ("redis://127.0.0.1:7004", "redis://127.0.0.1:7001");
        // 密码设置
        //.setPassword ("abc123456");
        // 2、根据 Config 实例创建出 RedissonClient 对象
        return Redisson.create(config);
    } 
} 

使用

@Resource
private RedissonClient redissonClient;

@GetMapping("/test")
private void redisson() { 
    String key = "myKey";
    // 获取锁
    RLock lock = redissonClient.getLock(key);
    // 加锁
    lock.lock();
    try { 
        // 执行业务代码
        TimeUnit.SECONDS.sleep(5);
    }  catch (InterruptedException e) { 
        e.printStackTrace();
    }  finally { 
        // 释放锁
        lock.unlock();
    } 
} 

redisson 底层是使用了 Lua 脚本执行的命令

Redisson 源码分析

tryLock()

尝试获取锁的方法,若不配置参数,则立刻返回结果,不阻塞
如果锁可用,则获取锁,并立即返回值true。如果锁不可用,则此方法将立即返回值false
image.png
1、 在 Redisson 源码中,使用 tryLockAsync() 方法获取 Future 表达式,通过 get 方法获取对应的值
image.png
image.png
2、 接着往下查看 tryLockAsync() 方法,获取到了当前线程的 id
image.png
3、默认的不设置等待时间 waitTime** **和 leaseTime(租赁时间,即锁的有效时间)
image.png
4、接着到了 tryAcquireOnceAsync() 方法中,当不指定 leaseTime 时间时,则使用配置的 internalLockLeaseTime 时间,应该就是 30 秒啦
image.png
image.png
image.png
5、再接着 tryLockInnerAsync() 方法就是实际的获取和设置锁的方法了,这里使用到了 lua (可以确保原子性),而 redisson 是使用 redis 的 hash 数据结构进行存储的
image.png

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { 
        return evalWriteAsync(
            // 获取锁的名字,也就是一开始 redisson.getLock(lockKey) 使用的 lockKey
            getRawName(), 
            // 解码器实例
            LongCodec.INSTANCE, 
            // 不清楚,好像是用来指定返回类型的
            command,
            // Lua 脚本
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            // key 集合,在脚本中通过  KEYS[1] 获取,索引从 1 开始
            Collections.singletonList(getRawName()), 
            // 锁的有效时间,对应参数1,对应 ARGV[1]
            unit.toMillis(leaseTime), 
            // 线程 id,作为 hash 的 key,对应 ARGV[2]
            getLockName(threadId));
    } 

image.png
6、回到刚才调用 Lua 脚本的方法这里,tryAcquireOnceAsync() 方法传递了 RedisCommands.EVAL_NULL_BOOLEAN 作为 command
image.png
EVAL_NULL_BOOLEAN 不仅指定了返回类型,还指定了结果转换器 BooleanNullReplayConvertor
image.png
在 BooleanNullReplayConvertor 中,则判断 Lua 脚本返回的内容是否为 null,如果为 null 则表示获取锁成功
image.png

Lua 脚本

redisson 中使用到的 lua 脚本如下:

  1. 当 redis 中不存在对应锁的 key 时,则创建指定 key 的 hash,当前线程的 id 作为 hash 的 key,值为 1,默认过期时间为 30 秒
  2. 当 redis 存在当前的 key,并且 hash 的 key 为当前线程创建的,则会刷新锁的时间(这里应该时给看门狗用的),锁的持有者加一
  3. 当 redis 存在当前 key,并且 hash 的 key 并非当前线程,则返回锁的剩余时间
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
    return redis.call('pttl', KEYS[1]);

lock()

1、而在 lock() 方法的流程如下,当不指定参数的时候,同样也是使用默认的有效时间
image.png
2、接着使用 tryAcquire() 方法去获取锁
image.png
image.png
3、这里用到了 tryAcquireAsync() 方法获取锁,跟 tryLock() 方法调用的 tryAcquireOnceAsync() 方法很相似,似乎只是返回类型不同而已
image.png
4、在接着就是执行同样的 Lua 脚本啦
image.png
5、不过在 lock() 方法中,使用的时 RedisCommands.EVAL_LONG 这个 command,但会发现并没有指定 convertor,所以会使用默认的 EmptyConvertor,直接返回 Lua 脚本的内容(nil 或 剩余时间)
image.png
image.png
image.png
6、现在回到 lock() 方法中,如果获取不到锁(返回值不为 null),则会等待锁释放,这里用到了 redis 的订阅功能
image.png

看门狗策略

1、回到获取锁的方法中 tryAcquireOnceAsync(),当获取到锁之后,且未修改锁有效期,则会调用 scheduleExpirationRenewal() 方法开启看门狗线程
image.png
2、在 scheduleExpirationRenewal() 方法中,会判断是否存在看门狗线程了,不存在的话则会调用 renewExpiration() 方法
image.png
3、在 renewExpiration() 方法中,则创建了一个新的线程,每隔 key 有效时间 / 3,去调用 renewExpirationAsync() 方法更新锁的时间
image.png
4、接着就执行对应的脚本啦
image.png

unLock()

1、通过调用 unlockAsync() 方法释放锁
image.png
2、unlockAsync() 方法定义了释放锁的逻辑
image.png
3、unlockInnerAsync() 方法才调用了 Lua 脚本进行 Redis 操作
image.png

protected RFuture<Boolean> unlockInnerAsync(long threadId) { 
    return evalWriteAsync(
        // 获取锁名称
        getRawName(),
        // 获取解码实例
        LongCodec.INSTANCE, 
        // 设置结果转换器
        RedisCommands.EVAL_BOOLEAN,
        // 判断当前线程的锁是否存在,锁不存在返回 nil
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +
        // 锁存在,则将锁持有者减一
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        "if (counter > 0) then " +
            // 还剩有锁则刷新锁时间
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "else " +
            // 完全释放锁,并往通道发送事件
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end; " +
        "return nil;",
        // 对应 KEY[1]、KEY[2]
        Arrays.asList(getRawName(), getChannelName()), 
        // 对应 ARG[1]
        LockPubSub.UNLOCK_MESSAGE, 
        // 对应 ARG[2]
        internalLockLeaseTime, 
        // 对应 ARG[3]
        getLockName(threadId));
} 

锁的获取过程

redisson 获取锁的过程会存在阻塞,并利用到了 Redis 的订阅发布功能,监听指定通道,当有锁释放的时候,lock() 方法会接收到信号 Semaphore,并再次尝试获取锁
1、查看 redisson 的 lock() 方法,在执行 Lua 脚本之后,如果得到的结果不为 null(没得到锁),则会执行下列逻辑
image.png

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { 
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // 得到锁,返回
    if (ttl == null) { 
        return;
    } 

    // 订阅锁的通道消息,future 中包含 Semaphore 信号量
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) { 
        commandExecutor.syncSubscriptionInterrupted(future);
    }  else { 
        commandExecutor.syncSubscription(future);
    } 

    try { 
        while (true) { 
            // 再次尝试获取锁,成功则返回
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            if (ttl == null) { 
                break;
            } 

            // 根据锁的剩余时间,进行阻塞获取信号量
            if (ttl >= 0) { 
                try { 
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }  catch (InterruptedException e) { 
                    if (interruptibly) { 
                        throw e;
                    } 
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } 
            }  else { 
                // 锁不会自动过期,则一直阻塞等待
                if (interruptibly) { 
                    future.getNow().getLatch().acquire();
                }  else { 
                    future.getNow().getLatch().acquireUninterruptibly();
                } 
            } 
        } 
    }  finally { 
        // 获取锁成功则不再订阅
        unsubscribe(future, threadId);
    } 
} 

2、查看订阅方法 subscribe(),发现这里的 threadId 并没有作用
image.png
image.png
3、创建监听器,并进行监听,监听器的 value 中就包含等会返回的 newPromise
image.png
image.png
4、当监听器被触发的时候,就会调用到 LockPubSub 的 onMessage() 方法,对信号量 Semaphore 继续释放
image.png
image.png
5、释放了信号量之后,就可以在 lock() 方法中获取到资源往下走
image.png
6、而监听器最终会被添加到 RedisPubSubConnection 类的 listeners 队列中,当监听到事件的时候就会依次调用监听器
image.png

for update 行锁

For update是MySQL中用于实现行锁的一种语法,其主要作用是在SELECT查询语句中加上FOR UPDATE子句,以保证查询结果集中的每一行都被锁定,避免其他事务对这些行进行修改。

用法

创建一个数据表

CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

image.png
在 java 代码中的事务使用 for update 获取锁

Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
try { 
    conn = dataSource.getConnection();
    connAutoCommit = conn.getAutoCommit();
    conn.setAutoCommit(false);

    preparedStatement = conn.prepareStatement(  "select * from SCHEDULE_JOB_LOCK where lock_name = 'schedule_lock' for update" );
    preparedStatement.execute();

    // 执行业务代码

}  catch (Exception e) { 
    logger.error(e.getMessage(), e);
}  finally { 
    // 提交事务
    if (conn != null) { 
        try { 
            conn.commit();
        }  catch (SQLException e) { 
            if (!scheduleThreadToStop) { 
                logger.error(e.getMessage(), e);
            } 
        } 
        try { 
            conn.setAutoCommit(connAutoCommit);
        }  catch (SQLException e) { 
            if (!scheduleThreadToStop) { 
                logger.error(e.getMessage(), e);
            } 
        } 
        try { 
            conn.close();
        }  catch (SQLException e) { 
            if (!scheduleThreadToStop) { 
                logger.error(e.getMessage(), e);
            } 
        } 
    } 

    // close PreparedStatement
    if (null != preparedStatement) { 
        try { 
            preparedStatement.close();
        }  catch (SQLException e) { 
            if (!scheduleThreadToStop) { 
                logger.error(e.getMessage(), e);
            } 
        } 
    } 
} 

如何判断 for update 是行锁还是表锁

  • 是否使用主键或唯一索引:如果查询条件中使用了主键或唯一索引,并且该索引的值在表中确实存在,那么MySQL通常会使用行锁
  • 是否使用非唯一索引:如果查询条件中使用了非唯一索引,MySQL可能会使用行锁,但这也取决于具体的查询优化和表的结构。在某些情况下,如果非唯一索引不能有效地缩小结果集,MySQL可能会选择使用表锁。
  • 是否使用普通字段:MySQL通常会使用表锁,因为无法有效地确定需要锁定哪些行。