|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
1. 引言
在分布式系统中,由于多个节点同时访问共享资源,可能会导致数据不一致的问题。为了确保数据的一致性和完整性,我们需要引入分布式锁机制。Redis作为一个高性能的内存数据库,提供了多种实现分布式锁的方式,因其高性能、简单易用的特点,被广泛应用于分布式系统中。
Redis锁不仅可以有效控制并发访问,还能提高系统的可用性和稳定性。本文将详细解析Redis锁从获取到释放的全流程,帮助读者掌握分布式并发控制的关键技术,避免死锁问题,提升系统稳定性。
2. Redis锁基础概念
2.1 什么是Redis锁
Redis锁是利用Redis的特性实现的一种分布式锁机制。它通过在Redis中设置一个特殊的键值对来表示锁的状态,当多个客户端尝试获取同一个锁时,只有一个客户端能够成功获取锁,其他客户端需要等待或放弃。
2.2 为什么需要Redis锁
在分布式系统中,多个服务实例可能同时访问共享资源,如果没有适当的并发控制机制,可能会导致以下问题:
1. 数据不一致:多个节点同时修改同一份数据,可能导致数据错乱。
2. 资源竞争:多个节点同时竞争有限的资源,可能导致系统性能下降。
3. 重复操作:在某些场景下,如定时任务,多个节点可能同时执行相同的任务,导致重复处理。
Redis锁可以解决这些问题,确保在同一时间只有一个节点能够执行关键操作。
3. Redis锁的实现方式
3.1 基于SETNX的实现
SETNX(SET if Not eXists)是Redis提供的一个原子操作命令,它只在键不存在时设置键的值。这是实现Redis锁最基本的方式。
- // 获取锁
- public boolean tryLock(String key, String value, long expireTime) {
- return redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
- }
- // 释放锁
- public void unlock(String key, String value) {
- String currentValue = redisTemplate.opsForValue().get(key);
- if (value.equals(currentValue)) {
- redisTemplate.delete(key);
- }
- }
复制代码
这种简单的实现存在几个问题:
1. 锁无法自动释放:如果获取锁的客户端崩溃,锁将无法释放,导致其他客户端无法获取锁。
2. 可能释放他人的锁:如果一个客户端获取锁后,由于操作时间过长导致锁过期,然后另一个客户端获取了锁,此时第一个客户端释放锁时可能会释放第二个客户端的锁。
3. 无法重入:同一个线程无法多次获取同一个锁。
为了解决上述问题,我们可以进行如下改进:
- // 获取锁
- public boolean tryLock(String key, String value, long expireTime) {
- // 使用SET命令同时设置NX和EX选项,确保原子性
- Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
- return result != null && result;
- }
- // 释放锁,使用Lua脚本确保原子性
- public void unlock(String key, String value) {
- String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), value);
- // 可以根据result判断是否成功释放锁
- }
复制代码
3.2 基于RedLock算法的实现
RedLock是Redis官方提出的一种分布式锁算法,它通过在多个Redis实例上获取锁来提高锁的可靠性和安全性。
RedLock算法的基本思想是:在N个独立的Redis实例上获取锁,只有在大多数实例(N/2 + 1)上成功获取锁,并且获取锁的总时间小于锁的有效期时,才认为锁获取成功。
- public class RedLock {
- private final List<RedisTemplate<String, String>> redisTemplates;
- private final long lockValidityTime; // 锁的有效时间,单位毫秒
-
- public RedLock(List<RedisTemplate<String, String>> redisTemplates, long lockValidityTime) {
- this.redisTemplates = redisTemplates;
- this.lockValidityTime = lockValidityTime;
- }
-
- public boolean lock(String resourceId, String value) {
- int successCount = 0;
- long startTime = System.currentTimeMillis();
-
- // 尝试在所有Redis实例上获取锁
- for (RedisTemplate<String, String> template : redisTemplates) {
- Boolean result = template.opsForValue().setIfAbsent(resourceId, value, lockValidityTime, TimeUnit.MILLISECONDS);
- if (Boolean.TRUE.equals(result)) {
- successCount++;
- }
- }
-
- // 计算获取锁所花费的时间
- long elapsedTime = System.currentTimeMillis() - startTime;
-
- // 检查是否在大多数实例上获取了锁,并且获取锁的时间小于锁的有效期
- if (successCount >= (redisTemplates.size() / 2 + 1) && elapsedTime < lockValidityTime) {
- return true;
- }
-
- // 如果获取锁失败,释放已经获取的锁
- unlock(resourceId, value);
- return false;
- }
-
- public void unlock(String resourceId, String value) {
- String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
-
- for (RedisTemplate<String, String> template : redisTemplates) {
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- template.execute(redisScript, Collections.singletonList(resourceId), value);
- }
- }
- }
复制代码
3.3 基于Redisson的实现
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中包括分布式锁。
- // 创建Redisson客户端
- Config config = new Config();
- config.useSingleServer().setAddress("redis://127.0.0.1:6379");
- RedissonClient redisson = Redisson.create(config);
- // 获取锁对象
- RLock lock = redisson.getLock("myLock");
- try {
- // 尝试获取锁,最多等待100秒,锁自动释放时间为10秒
- boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
- if (res) {
- // 成功获取锁,执行业务逻辑
- doBusiness();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- // 释放锁
- lock.unlock();
- }
复制代码
1. 可重入:同一个线程可以多次获取同一个锁。
2. 自动续期:Redisson会自动为锁续期,确保业务逻辑执行完毕前锁不会过期。
3. 等待锁:可以设置获取锁的等待时间,在等待时间内不断尝试获取锁。
4. 锁超时:可以设置锁的自动释放时间,防止死锁。
4. Redis锁获取全流程
4.1 获取锁的基本流程
获取Redis锁的基本流程如下:
1. 客户端向Redis发送SETNX命令,尝试设置一个特定的键值对。
2. 如果设置成功(键之前不存在),则表示获取锁成功。
3. 如果设置失败(键已存在),则表示锁已被其他客户端持有,获取锁失败。
- public boolean acquireLock(String lockKey, String requestId, long expireTime) {
- // 使用SET命令同时设置NX和EX选项,确保原子性
- Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, TimeUnit.MILLISECONDS);
- return result != null && result;
- }
复制代码
4.2 锁的超时设置
为了避免锁无法释放导致死锁,我们需要为锁设置一个合理的过期时间。过期时间的设置应该考虑以下因素:
1. 业务逻辑的执行时间:过期时间应该大于业务逻辑的正常执行时间。
2. 系统负载:在高负载情况下,业务逻辑的执行时间可能会延长,过期时间应该适当增加。
3. 锁的重要性:对于关键业务,可以适当延长过期时间,避免锁过早释放导致数据不一致。
- // 获取锁,并设置过期时间为30秒
- public boolean lockWithExpire(String lockKey, String requestId) {
- long expireTime = 30000; // 30秒
- return redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, TimeUnit.MILLISECONDS);
- }
复制代码
4.3 锁的重入机制
锁的重入是指同一个线程可以多次获取同一个锁。实现锁的重入需要记录锁的持有者和重入次数。
1. 使用Redis的Hash结构存储锁信息,包括锁的持有者和重入次数。
2. 当同一个线程再次获取锁时,增加重入次数。
3. 当线程释放锁时,减少重入次数,只有当重入次数为0时才真正释放锁。
- // 获取可重入锁
- public boolean acquireReentrantLock(String lockKey, String requestId, long expireTime) {
- // 使用Lua脚本确保原子性
- String luaScript =
- "if redis.call('exists', KEYS[1]) == 0 then " +
- " redis.call('hset', KEYS[1], ARGV[1], 1) " +
- " redis.call('expire', KEYS[1], ARGV[2]) " +
- " return 1 " +
- "elseif redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " +
- " redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
- " redis.call('expire', KEYS[1], ARGV[2]) " +
- " return 1 " +
- "else " +
- " return 0 " +
- "end";
-
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- Long result = redisTemplate.execute(redisScript,
- Collections.singletonList(lockKey),
- requestId,
- String.valueOf(expireTime / 1000));
-
- return result != null && result == 1;
- }
- // 释放可重入锁
- public boolean releaseReentrantLock(String lockKey, String requestId) {
- String luaScript =
- "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
- " return 0 " +
- "end " +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1) " +
- "if counter > 0 then " +
- " redis.call('expire', KEYS[1], ARGV[2]) " +
- " return 1 " +
- "else " +
- " redis.call('del', KEYS[1]) " +
- " return 1 " +
- "end";
-
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- Long result = redisTemplate.execute(redisScript,
- Collections.singletonList(lockKey),
- requestId,
- String.valueOf(30)); // 30秒过期时间
-
- return result != null && result == 1;
- }
复制代码
4.4 等待锁的策略
当获取锁失败时,客户端可以采取以下策略:
1. 立即失败:获取锁失败后立即返回,不进行等待。
2. 阻塞等待:在一段时间内不断尝试获取锁,直到成功或超时。
3. 异步回调:获取锁失败后注册一个回调,当锁可用时自动调用回调函数。
- // 带等待时间的锁获取
- public boolean lockWithWait(String lockKey, String requestId, long expireTime, long waitTime) throws InterruptedException {
- long startTime = System.currentTimeMillis();
- long remainingTime = waitTime;
-
- while (true) {
- // 尝试获取锁
- if (acquireLock(lockKey, requestId, expireTime)) {
- return true;
- }
-
- // 计算剩余等待时间
- remainingTime = waitTime - (System.currentTimeMillis() - startTime);
- if (remainingTime <= 0) {
- return false;
- }
-
- // 短暂休眠后重试
- Thread.sleep(Math.min(100, remainingTime));
- }
- }
复制代码
5. Redis锁释放全流程
5.1 释放锁的基本流程
释放Redis锁的基本流程如下:
1. 客户端检查自己是否是锁的持有者。
2. 如果是锁的持有者,则删除锁的键值对。
3. 如果不是锁的持有者,则不执行任何操作。
- // 释放锁
- public boolean releaseLock(String lockKey, String requestId) {
- // 使用Lua脚本确保原子性
- String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
- return result != null && result == 1;
- }
复制代码
5.2 确保锁的安全性(避免释放他人的锁)
为了避免释放他人的锁,我们需要在释放锁时验证锁的持有者。这可以通过以下方式实现:
1. 在获取锁时,设置一个唯一的值(如UUID)作为锁的值。
2. 在释放锁时,先检查锁的值是否与自己的唯一值匹配,只有匹配时才释放锁。
- // 生成唯一请求ID
- private String generateRequestId() {
- return UUID.randomUUID().toString();
- }
- // 安全释放锁
- public boolean safeReleaseLock(String lockKey, String requestId) {
- // 使用Lua脚本确保原子性
- String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
- return result != null && result == 1;
- }
复制代码
5.3 锁续期机制
在业务逻辑执行时间可能超过锁的过期时间的情况下,我们需要实现锁的续期机制,确保业务逻辑执行完毕前锁不会过期。
1. 在获取锁后启动一个后台线程。
2. 后台线程定期检查锁是否仍然存在,以及是否需要续期。
3. 如果需要续期,则延长锁的过期时间。
- public class LockRenewalTask {
- private final RedisTemplate<String, String> redisTemplate;
- private final String lockKey;
- private final String requestId;
- private final long expireTime;
- private final long renewalInterval;
- private ScheduledExecutorService scheduler;
- private ScheduledFuture<?> renewalTask;
-
- public LockRenewalTask(RedisTemplate<String, String> redisTemplate, String lockKey, String requestId, long expireTime, long renewalInterval) {
- this.redisTemplate = redisTemplate;
- this.lockKey = lockKey;
- this.requestId = requestId;
- this.expireTime = expireTime;
- this.renewalInterval = renewalInterval;
- this.scheduler = Executors.newSingleThreadScheduledExecutor();
- }
-
- public void start() {
- // 启动续期任务
- renewalTask = scheduler.scheduleAtFixedRate(() -> {
- // 检查并续期
- String luaScript =
- "if redis.call('get', KEYS[1]) == ARGV[1] then " +
- " redis.call('expire', KEYS[1], ARGV[2]) " +
- " return 1 " +
- "else " +
- " return 0 " +
- "end";
-
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
- Long result = redisTemplate.execute(redisScript,
- Collections.singletonList(lockKey),
- requestId,
- String.valueOf(expireTime / 1000));
-
- // 如果续期失败(锁不存在或不属于当前客户端),停止续期任务
- if (result == null || result != 1) {
- stop();
- }
- }, renewalInterval / 2, renewalInterval, TimeUnit.MILLISECONDS);
- }
-
- public void stop() {
- if (renewalTask != null) {
- renewalTask.cancel(true);
- scheduler.shutdown();
- }
- }
- }
- // 使用锁续期
- public boolean lockWithRenewal(String lockKey, long expireTime, long waitTime, long businessLogicTimeout) {
- String requestId = generateRequestId();
- LockRenewalTask renewalTask = null;
-
- try {
- // 尝试获取锁
- if (!lockWithWait(lockKey, requestId, expireTime, waitTime)) {
- return false;
- }
-
- // 启动锁续期任务
- renewalTask = new LockRenewalTask(redisTemplate, lockKey, requestId, expireTime, expireTime / 3);
- renewalTask.start();
-
- // 执行业务逻辑,设置超时
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<?> future = executor.submit(() -> {
- doBusiness();
- return null;
- });
-
- try {
- future.get(businessLogicTimeout, TimeUnit.MILLISECONDS);
- return true;
- } catch (TimeoutException e) {
- future.cancel(true);
- return false;
- } finally {
- executor.shutdown();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- } finally {
- // 停止锁续期任务
- if (renewalTask != null) {
- renewalTask.stop();
- }
-
- // 释放锁
- safeReleaseLock(lockKey, requestId);
- }
- }
复制代码
6. 避免死锁的策略
6.1 设置合理的过期时间
设置合理的过期时间是避免死锁的基本策略。过期时间应该根据业务逻辑的执行时间来设定,一般应该大于业务逻辑的正常执行时间,但也不宜过长,以免影响系统的可用性。
1. 业务逻辑的执行时间:通过测试和监控,了解业务逻辑的正常执行时间。
2. 系统负载:在高负载情况下,业务逻辑的执行时间可能会延长。
3. 网络延迟:考虑网络延迟对业务逻辑执行时间的影响。
4. 锁的重要性:对于关键业务,可以适当延长过期时间。
- // 动态计算过期时间
- public long calculateExpireTime(long normalExecutionTime, double safetyFactor) {
- // 安全系数,通常在1.5到3之间
- return (long) (normalExecutionTime * safetyFactor);
- }
- // 使用动态过期时间获取锁
- public boolean lockWithDynamicExpire(String lockKey, String requestId, long normalExecutionTime) {
- double safetyFactor = 2.0; // 安全系数为2
- long expireTime = calculateExpireTime(normalExecutionTime, safetyFactor);
- return redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, TimeUnit.MILLISECONDS);
- }
复制代码
6.2 实现锁续期
锁续期是避免死锁的重要策略,特别是在业务逻辑执行时间不确定或可能超过锁的初始过期时间的情况下。
1. 后台线程续期:在获取锁后启动一个后台线程,定期为锁续期。
2. 定时任务续期:使用定时任务定期检查并续期。
3. 业务逻辑中续期:在业务逻辑执行过程中,定期检查并续期。
- // 使用Redisson的自动续期功能
- public void businessWithAutoRenewal() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://127.0.0.1:6379");
- RedissonClient redisson = Redisson.create(config);
-
- RLock lock = redisson.getLock("myLock");
-
- try {
- // 获取锁,并设置锁自动释放时间为30秒
- lock.lock(30, TimeUnit.SECONDS);
-
- // 执行业务逻辑,Redisson会自动为锁续期
- doBusiness();
- } finally {
- // 释放锁
- lock.unlock();
- redisson.shutdown();
- }
- }
复制代码
6.3 死锁检测与恢复
尽管我们可以通过设置过期时间和锁续期来减少死锁的可能性,但在某些情况下,死锁仍然可能发生。因此,我们需要实现死锁检测与恢复机制。
1. 超时检测:为锁设置一个超时时间,如果在超时时间内锁未被释放,则认为可能发生死锁。
2. 等待图检测:维护一个锁等待图,定期检查是否存在环路,如果存在环路,则认为发生死锁。
3. 资源监控:监控资源的使用情况,如果某个资源长时间被占用,则认为可能发生死锁。
1. 强制释放锁:当检测到死锁时,强制释放某些锁,打破死锁状态。
2. 回滚操作:回滚导致死锁的操作,恢复系统到一致状态。
3. 重启服务:在极端情况下,重启相关服务,清除所有锁状态。
- // 死锁检测与恢复
- public class DeadlockDetector {
- private final RedisTemplate<String, String> redisTemplate;
- private final long deadlockThreshold; // 死锁阈值,单位毫秒
- private ScheduledExecutorService scheduler;
-
- public DeadlockDetector(RedisTemplate<String, String> redisTemplate, long deadlockThreshold) {
- this.redisTemplate = redisTemplate;
- this.deadlockThreshold = deadlockThreshold;
- this.scheduler = Executors.newSingleThreadScheduledExecutor();
- }
-
- public void start() {
- // 定期检测死锁
- scheduler.scheduleAtFixedRate(this::detectAndRecoverDeadlocks,
- deadlockThreshold,
- deadlockThreshold,
- TimeUnit.MILLISECONDS);
- }
-
- private void detectAndRecoverDeadlocks() {
- // 获取所有锁的键
- Set<String> lockKeys = redisTemplate.keys("*lock*");
-
- for (String lockKey : lockKeys) {
- // 获取锁的剩余过期时间
- Long ttl = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
-
- // 如果锁没有设置过期时间或过期时间过长,可能发生死锁
- if (ttl == null || ttl > deadlockThreshold * 2) {
- // 记录死锁日志
- log.warn("Potential deadlock detected for lock: {}", lockKey);
-
- // 获取锁的值(持有者标识)
- String holder = redisTemplate.opsForValue().get(lockKey);
-
- // 尝试恢复死锁
- recoverFromDeadlock(lockKey, holder);
- }
- }
- }
-
- private void recoverFromDeadlock(String lockKey, String holder) {
- // 根据持有者标识,尝试通知持有者释放锁
- notifyLockHolder(holder, lockKey);
-
- // 如果通知失败,强制释放锁
- forceReleaseLock(lockKey, holder);
- }
-
- private void notifyLockHolder(String holder, String lockKey) {
- // 实现通知持有者的逻辑,例如通过消息队列或RPC调用
- // 这里只是一个示例,实际实现可能更复杂
- log.info("Notifying lock holder {} to release lock {}", holder, lockKey);
- }
-
- private void forceReleaseLock(String lockKey, String holder) {
- // 强制释放锁
- redisTemplate.delete(lockKey);
- log.warn("Force released lock {} held by {}", lockKey, holder);
- }
-
- public void stop() {
- scheduler.shutdown();
- }
- }
复制代码
7. Redis锁的最佳实践
7.1 锁的粒度控制
锁的粒度是指锁定的资源范围。锁的粒度越细,并发性越高,但实现起来也越复杂;锁的粒度越粗,并发性越低,但实现起来简单。
1. 根据业务需求选择:根据业务逻辑的特点,选择合适的锁粒度。
2. 考虑并发性能:在高并发场景下,尽量选择细粒度的锁。
3. 权衡实现复杂度:在满足性能需求的前提下,尽量选择简单的实现方式。
- // 粗粒度锁:锁定整个资源
- public boolean coarseGrainedLock(String resourceId, String requestId, long expireTime) {
- String lockKey = "resource:lock:" + resourceId;
- return redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, TimeUnit.MILLISECONDS);
- }
- // 细粒度锁:锁定资源的特定部分
- public boolean fineGrainedLock(String resourceId, String partId, String requestId, long expireTime) {
- String lockKey = "resource:lock:" + resourceId + ":" + partId;
- return redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, TimeUnit.MILLISECONDS);
- }
- // 使用细粒度锁的业务逻辑
- public void updateResourcePart(String resourceId, String partId, Object newValue) {
- String requestId = generateRequestId();
-
- try {
- // 获取细粒度锁
- if (!fineGrainedLock(resourceId, partId, requestId, 30000)) {
- throw new RuntimeException("Failed to acquire lock");
- }
-
- // 执行业务逻辑
- doUpdateResourcePart(resourceId, partId, newValue);
- } finally {
- // 释放细粒度锁
- safeReleaseLock("resource:lock:" + resourceId + ":" + partId, requestId);
- }
- }
复制代码
7.2 性能优化
在高并发场景下,Redis锁的性能可能成为系统的瓶颈。以下是一些优化Redis锁性能的策略:
1. 减少锁的持有时间:尽量在获取锁后快速执行关键操作,然后立即释放锁。
2. 使用本地缓存:对于频繁访问但不经常修改的数据,可以使用本地缓存减少对Redis的访问。
3. 批量操作:对于需要获取多个锁的操作,可以使用批量操作减少网络开销。
4. 使用连接池:使用Redis连接池减少连接创建和销毁的开销。
- // 减少锁的持有时间
- public void updateWithMinimalLock(String resourceId, Object newValue) {
- String requestId = generateRequestId();
- String lockKey = "resource:lock:" + resourceId;
-
- try {
- // 获取锁
- if (!acquireLock(lockKey, requestId, 30000)) {
- throw new RuntimeException("Failed to acquire lock");
- }
-
- // 尽快执行关键操作
- Object oldValue = getResource(resourceId);
- Object mergedValue = mergeValues(oldValue, newValue);
-
- // 尽快释放锁
- safeReleaseLock(lockKey, requestId);
-
- // 在锁外执行耗时操作
- doTimeConsumingUpdate(mergedValue);
- } catch (Exception e) {
- // 确保锁被释放
- safeReleaseLock(lockKey, requestId);
- throw e;
- }
- }
- // 使用本地缓存减少Redis访问
- public class ResourceCache {
- private final RedisTemplate<String, Object> redisTemplate;
- private final Cache<String, Object> localCache;
-
- public ResourceCache(RedisTemplate<String, Object> redisTemplate) {
- this.redisTemplate = redisTemplate;
- this.localCache = Caffeine.newBuilder()
- .expireAfterWrite(10, TimeUnit.SECONDS)
- .maximumSize(1000)
- .build();
- }
-
- public Object getResource(String resourceId) {
- // 首先尝试从本地缓存获取
- Object value = localCache.getIfPresent(resourceId);
- if (value != null) {
- return value;
- }
-
- // 本地缓存中没有,从Redis获取
- value = redisTemplate.opsForValue().get(resourceId);
- if (value != null) {
- localCache.put(resourceId, value);
- }
-
- return value;
- }
-
- public void updateResource(String resourceId, Object newValue, String requestId) {
- String lockKey = "resource:lock:" + resourceId;
-
- try {
- // 获取锁
- if (!acquireLock(lockKey, requestId, 30000)) {
- throw new RuntimeException("Failed to acquire lock");
- }
-
- // 更新Redis中的值
- redisTemplate.opsForValue().set(resourceId, newValue);
-
- // 更新本地缓存
- localCache.put(resourceId, newValue);
- } finally {
- // 释放锁
- safeReleaseLock(lockKey, requestId);
- }
- }
- }
复制代码
7.3 异常处理
在使用Redis锁时,需要注意各种异常情况,并进行适当的处理,以确保系统的稳定性和可靠性。
1. Redis连接异常:当Redis连接失败时,应该有备用方案,如降级处理或重试。
2. 获取锁失败:当获取锁失败时,应该有重试机制或降级策略。
3. 锁释放失败:当锁释放失败时,应该有监控和报警机制,及时发现和处理。
4. 业务逻辑异常:当业务逻辑执行失败时,应该确保锁被正确释放,避免死锁。
- // 带异常处理的锁操作
- public <T> T executeWithLock(String lockKey, long expireTime, long waitTime, Supplier<T> businessLogic) {
- String requestId = generateRequestId();
-
- try {
- // 尝试获取锁,带有重试机制
- boolean locked = false;
- long startTime = System.currentTimeMillis();
- long remainingTime = waitTime;
-
- while (!locked && remainingTime > 0) {
- try {
- locked = acquireLock(lockKey, requestId, expireTime);
- } catch (RedisConnectionFailureException e) {
- // Redis连接异常,记录日志并尝试重新连接
- log.error("Redis connection failed while acquiring lock", e);
- try {
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting for Redis connection", ie);
- }
- }
-
- if (!locked) {
- try {
- // 短暂休眠后重试
- TimeUnit.MILLISECONDS.sleep(Math.min(100, remainingTime));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting for lock", e);
- }
-
- remainingTime = waitTime - (System.currentTimeMillis() - startTime);
- }
- }
-
- if (!locked) {
- // 获取锁失败,执行降级逻辑
- log.warn("Failed to acquire lock {} after {} ms", lockKey, waitTime);
- return executeFallback(businessLogic);
- }
-
- // 成功获取锁,执行业务逻辑
- try {
- return businessLogic.get();
- } catch (Exception e) {
- // 业务逻辑执行失败,记录日志
- log.error("Business logic execution failed", e);
- throw e;
- }
- } finally {
- // 确保锁被释放
- try {
- safeReleaseLock(lockKey, requestId);
- } catch (Exception e) {
- // 锁释放失败,记录日志
- log.error("Failed to release lock {}", lockKey, e);
- }
- }
- }
- // 降级逻辑
- private <T> T executeFallback(Supplier<T> businessLogic) {
- // 实现降级逻辑,例如返回默认值、从缓存读取数据等
- log.info("Executing fallback logic");
- try {
- return businessLogic.get();
- } catch (Exception e) {
- log.error("Fallback logic execution failed", e);
- throw new RuntimeException("Both primary and fallback logic failed", e);
- }
- }
复制代码
8. 实际应用场景与案例分析
8.1 库存扣减场景
在电商系统中,库存扣减是一个典型的需要使用分布式锁的场景。多个用户可能同时购买同一商品,如果不使用锁,可能会导致超卖。
1. 并发问题:多个请求同时读取库存,发现库存充足,然后同时扣减库存,导致超卖。
2. 数据一致性:库存数据需要保持一致性,不能出现负数。
3. 性能要求:在高并发场景下,需要保证系统的响应速度。
使用Redis锁来控制库存扣减的并发访问,确保同一时间只有一个请求能够扣减库存。
- // 库存服务
- public class InventoryService {
- private final RedisTemplate<String, String> redisTemplate;
-
- public InventoryService(RedisTemplate<String, String> redisTemplate) {
- this.redisTemplate = redisTemplate;
- }
-
- // 扣减库存
- public boolean deductInventory(String productId, int quantity) {
- String lockKey = "inventory:lock:" + productId;
- String requestId = UUID.randomUUID().toString();
-
- try {
- // 获取锁,最多等待5秒,锁自动释放时间为10秒
- boolean locked = lockWithWait(lockKey, requestId, 10000, 5000);
- if (!locked) {
- log.warn("Failed to acquire lock for product {}", productId);
- return false;
- }
-
- // 获取当前库存
- String inventoryKey = "inventory:" + productId;
- String inventoryStr = redisTemplate.opsForValue().get(inventoryKey);
- if (inventoryStr == null) {
- log.error("Inventory not found for product {}", productId);
- return false;
- }
-
- int inventory = Integer.parseInt(inventoryStr);
- if (inventory < quantity) {
- log.warn("Insufficient inventory for product {}. Required: {}, Available: {}",
- productId, quantity, inventory);
- return false;
- }
-
- // 扣减库存
- int newInventory = inventory - quantity;
- redisTemplate.opsForValue().set(inventoryKey, String.valueOf(newInventory));
-
- // 记录库存变更日志
- logInventoryChange(productId, inventory, newInventory);
-
- return true;
- } finally {
- // 释放锁
- safeReleaseLock(lockKey, requestId);
- }
- }
-
- private void logInventoryChange(String productId, int oldInventory, int newInventory) {
- // 实现库存变更日志记录逻辑
- log.info("Inventory changed for product {}. Old: {}, New: {}",
- productId, oldInventory, newInventory);
- }
- }
复制代码
8.2 定时任务场景
在分布式系统中,定时任务可能会在多个节点上同时执行,导致重复处理。使用Redis锁可以确保同一时间只有一个节点执行定时任务。
1. 重复执行:多个节点同时执行同一个定时任务,可能导致数据重复处理或资源浪费。
2. 任务协调:需要一种机制来协调多个节点上的任务执行。
3. 故障恢复:当执行任务的节点故障时,需要有机制让其他节点接管任务。
使用Redis锁来控制定时任务的执行,确保同一时间只有一个节点能够执行任务。
- // 定时任务执行器
- public class ScheduledTaskExecutor {
- private final RedisTemplate<String, String> redisTemplate;
- private final String taskId;
- private final Runnable task;
- private final long lockExpireTime;
- private final long taskInterval;
-
- public ScheduledTaskExecutor(RedisTemplate<String, String> redisTemplate,
- String taskId,
- Runnable task,
- long lockExpireTime,
- long taskInterval) {
- this.redisTemplate = redisTemplate;
- this.taskId = taskId;
- this.task = task;
- this.lockExpireTime = lockExpireTime;
- this.taskInterval = taskInterval;
- }
-
- public void start() {
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
- scheduler.scheduleAtFixedRate(this::executeWithLock,
- 0,
- taskInterval,
- TimeUnit.MILLISECONDS);
- }
-
- private void executeWithLock() {
- String lockKey = "task:lock:" + taskId;
- String requestId = UUID.randomUUID().toString();
-
- try {
- // 尝试获取锁
- boolean locked = acquireLock(lockKey, requestId, lockExpireTime);
- if (!locked) {
- log.debug("Failed to acquire lock for task {}", taskId);
- return;
- }
-
- log.info("Acquired lock for task {}, starting execution", taskId);
-
- // 执行任务
- long startTime = System.currentTimeMillis();
- try {
- task.run();
- long executionTime = System.currentTimeMillis() - startTime;
- log.info("Task {} completed in {} ms", taskId, executionTime);
- } catch (Exception e) {
- log.error("Task {} execution failed", taskId, e);
- }
- } finally {
- // 释放锁
- safeReleaseLock(lockKey, requestId);
- }
- }
- }
- // 使用定时任务执行器
- public class Application {
- public static void main(String[] args) {
- RedisTemplate<String, String> redisTemplate = createRedisTemplate();
-
- // 创建定时任务
- Runnable task = () -> {
- // 执行定时任务逻辑
- System.out.println("Executing scheduled task at " + new Date());
- doScheduledWork();
- };
-
- // 创建定时任务执行器
- ScheduledTaskExecutor executor = new ScheduledTaskExecutor(
- redisTemplate,
- "sampleTask",
- task,
- 30000, // 锁过期时间30秒
- 60000 // 任务执行间隔60秒
- );
-
- // 启动定时任务
- executor.start();
- }
-
- private static void doScheduledWork() {
- // 实现定时任务的具体逻辑
- }
-
- private static RedisTemplate<String, String> createRedisTemplate() {
- // 创建并配置RedisTemplate
- // ...
- return new RedisTemplate<>();
- }
- }
复制代码
8.3 分布式事务场景
在分布式系统中,跨多个服务或数据源的操作需要保证事务的一致性。Redis锁可以用于实现分布式事务,确保多个操作的原子性。
1. 跨服务操作:一个业务操作可能涉及多个服务,需要保证所有服务要么都成功,要么都失败。
2. 数据一致性:需要保证跨多个数据源的数据一致性。
3. 并发控制:需要控制对共享资源的并发访问,避免数据冲突。
使用Redis锁来实现分布式事务,确保多个操作的原子性。
- // 分布式事务管理器
- public class DistributedTransactionManager {
- private final RedisTemplate<String, String> redisTemplate;
- private final List<TransactionParticipant> participants;
-
- public DistributedTransactionManager(RedisTemplate<String, String> redisTemplate,
- List<TransactionParticipant> participants) {
- this.redisTemplate = redisTemplate;
- this.participants = participants;
- }
-
- // 执行分布式事务
- public boolean executeTransaction(String transactionId) {
- String lockKey = "transaction:lock:" + transactionId;
- String requestId = UUID.randomUUID().toString();
-
- try {
- // 获取全局事务锁
- boolean locked = lockWithWait(lockKey, requestId, 30000, 5000);
- if (!locked) {
- log.error("Failed to acquire transaction lock for {}", transactionId);
- return false;
- }
-
- log.info("Acquired transaction lock for {}", transactionId);
-
- // 准备阶段:预执行所有参与者的操作
- List<TransactionParticipant> preparedParticipants = new ArrayList<>();
- boolean prepareSuccess = true;
-
- for (TransactionParticipant participant : participants) {
- try {
- if (participant.prepare(transactionId)) {
- preparedParticipants.add(participant);
- } else {
- log.error("Participant {} prepare failed for transaction {}",
- participant.getId(), transactionId);
- prepareSuccess = false;
- break;
- }
- } catch (Exception e) {
- log.error("Participant {} prepare exception for transaction {}",
- participant.getId(), transactionId, e);
- prepareSuccess = false;
- break;
- }
- }
-
- // 如果准备阶段失败,回滚所有已准备的参与者
- if (!prepareSuccess) {
- log.warn("Transaction {} prepare failed, rolling back", transactionId);
- for (TransactionParticipant participant : preparedParticipants) {
- try {
- participant.rollback(transactionId);
- } catch (Exception e) {
- log.error("Participant {} rollback exception for transaction {}",
- participant.getId(), transactionId, e);
- }
- }
- return false;
- }
-
- // 提交阶段:提交所有参与者的操作
- boolean commitSuccess = true;
- for (TransactionParticipant participant : preparedParticipants) {
- try {
- if (!participant.commit(transactionId)) {
- log.error("Participant {} commit failed for transaction {}",
- participant.getId(), transactionId);
- commitSuccess = false;
- }
- } catch (Exception e) {
- log.error("Participant {} commit exception for transaction {}",
- participant.getId(), transactionId, e);
- commitSuccess = false;
- }
- }
-
- // 如果提交阶段失败,记录日志,可能需要人工干预
- if (!commitSuccess) {
- log.error("Transaction {} commit failed, manual intervention may be required",
- transactionId);
- return false;
- }
-
- log.info("Transaction {} completed successfully", transactionId);
- return true;
- } finally {
- // 释放全局事务锁
- safeReleaseLock(lockKey, requestId);
- }
- }
- }
- // 事务参与者接口
- public interface TransactionParticipant {
- String getId();
-
- // 预执行阶段
- boolean prepare(String transactionId);
-
- // 提交阶段
- boolean commit(String transactionId);
-
- // 回滚阶段
- boolean rollback(String transactionId);
- }
- // 示例:订单服务作为事务参与者
- public class OrderService implements TransactionParticipant {
- private final OrderRepository orderRepository;
-
- public OrderService(OrderRepository orderRepository) {
- this.orderRepository = orderRepository;
- }
-
- @Override
- public String getId() {
- return "orderService";
- }
-
- @Override
- public boolean prepare(String transactionId) {
- // 预创建订单,状态为PREPARED
- Order order = createOrder(transactionId);
- order.setStatus(OrderStatus.PREPARED);
- return orderRepository.save(order);
- }
-
- @Override
- public boolean commit(String transactionId) {
- // 更新订单状态为COMMITTED
- Order order = orderRepository.findByTransactionId(transactionId);
- if (order != null) {
- order.setStatus(OrderStatus.COMMITTED);
- return orderRepository.save(order);
- }
- return false;
- }
-
- @Override
- public boolean rollback(String transactionId) {
- // 更新订单状态为ROLLED_BACK
- Order order = orderRepository.findByTransactionId(transactionId);
- if (order != null) {
- order.setStatus(OrderStatus.ROLLED_BACK);
- return orderRepository.save(order);
- }
- return false;
- }
-
- private Order createOrder(String transactionId) {
- // 创建订单的逻辑
- return new Order();
- }
- }
复制代码
9. 总结与展望
9.1 总结
本文详细解析了Redis锁从获取到释放的全流程,介绍了分布式并发控制的关键技术,包括:
1. Redis锁的实现方式:包括基于SETNX的实现、基于RedLock算法的实现和基于Redisson的实现。
2. Redis锁获取全流程:包括获取锁的基本流程、锁的超时设置、锁的重入机制和等待锁的策略。
3. Redis锁释放全流程:包括释放锁的基本流程、确保锁的安全性和锁续期机制。
4. 避免死锁的策略:包括设置合理的过期时间、实现锁续期和死锁检测与恢复。
5. Redis锁的最佳实践:包括锁的粒度控制、性能优化和异常处理。
6. 实际应用场景与案例分析:包括库存扣减场景、定时任务场景和分布式事务场景。
通过掌握这些技术和策略,我们可以有效地使用Redis锁来控制分布式系统中的并发访问,避免死锁问题,提高系统的稳定性和可靠性。
9.2 展望
随着分布式系统的不断发展,Redis锁技术也在不断演进。未来,我们可以期待以下发展方向:
1. 更高效的锁算法:开发更高效的锁算法,减少锁的获取和释放开销。
2. 自适应锁策略:根据系统负载和业务特点,自动调整锁的策略和参数。
3. 更完善的死锁检测与恢复机制:开发更智能的死锁检测与恢复机制,提高系统的自愈能力。
4. 更好的可观测性:提供更全面的锁监控和诊断工具,帮助开发者快速定位和解决问题。
5. 与其他分布式技术的集成:与分布式事务、分布式缓存等技术更好地集成,提供更完整的分布式解决方案。
总之,Redis锁作为分布式系统中的重要组件,将在未来的分布式系统中继续发挥重要作用,为系统的稳定性和可靠性提供有力保障。 |
|