ReentrantLock(可重入锁),指的是一个线程再次对已持有的锁保护的临界资源时,重入请求将会成功。
简单的与我们常用的 Synchronized 进行比较:
ReentrantLock | Synchronized | |
锁实现机制 | 依赖 AQS | 监视器模式 |
灵活性 | 支持响应超时、中断、尝试获取锁 | 不灵活 |
释放形式 | 必须显示调用 unlock () 释放锁 | 自动释放监视器 |
锁类型 | 公平锁 & 非公平锁 | 非公平锁 |
条件队列 | 可关联多个条件队列 | 关联一个条件队列 |
可重入性 | 可重入 | 可重入 |
AQS 机制:如果被请求的共享资源空闲,那么就当前请求资源的线程设置为有效的工作线程,将共享资源通过 CAScompareAndSetState设置为锁定状态;如果共享资源被占用,就采用一定的阻塞等待唤醒机制(CLH 变体的 FIFO 双端队列)来保证锁分配。
可重入性:无论是公平锁还是非公平锁的情况,加锁过程会利用一个 state 值
private volatile int state
public class LockExample { static int count = 0; static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { Runnable runnable = new Runnable() { @Override public void run() { try { // 加锁 lock.lock(); for (int i = 0; i < 10000; i++) { count++; } } catch (Exception e) { e.printStackTrace(); } finally { // 解锁,放在finally子句中,保证锁的释放 lock.unlock(); } } }; Thread thread1 = new Thread(runnable); Thread thread2 = new Thread(runnable); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("count: " + count); }}/** * 输出 * count: 20000 */
乐观锁即是无锁思想,一般都是基于 CAS 思想实现的,而在 MySQL 中通过 version 版本号 + CAS 无锁形式实现乐观锁;例如 T1,T2 两个事务一起并发执行时,当 T2 事务执行成功提交后,会对 version+1,所以 T1 事务执行的 version 条件就无法成立了。
对 sql 语句进行加锁以及状态机的操作,也可以避免不同线程同时对 count 值访问导致的数据不一致问题。
// 乐观锁 + 状态机update table_nameset version = version + 1, count = count + 1where id = id AND version = version AND count = [修改前的count值];// 行锁 + 状态机 update table_nameset count = count + 1where id = id AND count = [修改前的count值]for update;
如果我们直接采用 ReentrantLock 全局加锁,那么这种情况是一条线程获取到锁,整个程序全部的线程来到这里都会阻塞;但是我们在项目里面想要针对每个用户在操作的时候实现互斥逻辑,所以我们需要更加细粒度的锁。
public class LockExample { private static Map<String, Lock> lockMap = new ConcurrentHashMap<>(); public static void lock(String userId) { // Map中添加细粒度的锁资源 lockMap.putIfAbsent(userId, new ReentrantLock()); // 从容器中拿锁并实现加锁 lockMap.get(userId).lock(); } public static void unlock(String userId) { // 先从容器中拿锁,确保锁的存在 Lock locak = lockMap.get(userId); // 释放锁 lock.unlock(); }}
弊端:如果每一个用户请求共享资源,就会加锁一次,后续该用户就没有在登录过平台,但是锁对象会一直存在于内存中,这等价于发生了内存泄漏,所以锁的超时和淘汰机制机制需要实现。
上面的加锁机制使用到了锁容器ConcurrentHashMap,该容易为了线程安全的情况,多以底层还是会用到Synchronized机制,所以有些情况,使用 lockMap 需要加上两层锁。
那么我们是不是可以直接使用Synchronized来实现细粒度的锁机制
public class LockExample { public static void syncFunc1(Long accountId) { String lock = new String(accountId + "").intern(); synchronized (lock) { System.out.println(Thread.currentThread().getName() + "拿到锁了"); // 模拟业务耗时 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "释放锁了"); } } public static void syncFunc2(Long accountId) { String lock = new String(accountId + "").intern(); synchronized (lock) { System.out.println(Thread.currentThread().getName() + "拿到锁了"); // 模拟业务耗时 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + "释放锁了"); } } // 使用 Synchronized 来实现更加细粒度的锁 public static void main(String[] args) { new Thread(()-> syncFunc1(123456L), "Thread-1").start(); new Thread(()-> syncFunc2(123456L), "Thread-2").start(); }}/** * 打印 * Thread-1拿到锁了 * Thread-1释放锁了 * Thread-2拿到锁了 * Thread-2释放锁了 */
核心问题:我们需要找到一个多个进程之间所有线程可见的区域来定义这个互斥量。
一个优秀的分布式锁的实现方案应该满足如下几个特性:
代码实例:
// 扣库存接口@RequestMapping("/minusInventory")public String minusInventory(Inventory inventory) { // 获取锁 String lockKey = "lock-" + inventory.getInventoryId(); int timeOut = 100; Boolean flag = stringRedisTemplate.opsForValue() .setIfAbsent(lockKey, "竹子-熊猫",timeOut,TimeUnit.SECONDS); // 加上过期时间,可以保证死锁也会在一定时间内释放锁 stringRedisTemplate.expire(lockKey,timeOut,TimeUnit.SECONDS); if(!flag){ // 非阻塞式实现 return "服务器繁忙...请稍后重试!!!"; } // ----只有获取锁成功才能执行下述的减库存业务---- try{ // 查询库存信息 Inventory inventoryResult = inventoryService.selectByPrimaryKey(inventory.getInventoryId()); if (inventoryResult.getShopCount() <= 0) { return "库存不足,请联系卖家...."; } // 扣减库存 inventoryResult.setShopCount(inventoryResult.getShopCount() - 1); int n = inventoryService.updateByPrimaryKeySelective(inventoryResult); } catch (Exception e) { // 确保业务出现异常也可以释放锁,避免死锁 // 释放锁 stringRedisTemplate.delete(lockKey); } if (n > 0) return "端口-" + port + ",库存扣减成功!!!"; return "端口-" + port + ",库存扣减失败!!!";}作者:竹子爱熊猫链接:https://juejin.cn/post/7038473714970656775
过期时间的合理性分析:
因为对于不同的业务,我们设置的过期时间的长短都会不一样,太长了不合适,太短了也不合适;
所以我们想到的解决方案是设置一条子线程,给当前锁资源续命。具体实现是,子线程间隔 2-3s 去查询一次 key 是否过期,如果还没有过期则代表业务线程还在执行业务,那么则为该 key 的过期时间加上 5s。
但是为了避免主线程意外死亡后,子线程会一直为其续命,造成 “长生锁” 的现象,所以将子线程变为主(业务)线程的守护线程,这样子线程就会跟着主线程一起死亡。
// 续命子线程public class GuardThread extends Thread { private static boolean flag = true; public GuardThread(String lockKey, int timeOut, StringRedisTemplate stringRedisTemplate){ …… } @Override public void run() { // 开启循环续命 while (flag){ try { // 先休眠一半的时间 Thread.sleep(timeOut / 2 * 1000); }catch (Exception e){ e.printStackTrace(); } // 时间过了一半之后再去续命 // 先查看key是否过期 Long expire = stringRedisTemplate.getExpire( lockKey, TimeUnit.SECONDS); // 如果过期了,代表主线程释放了锁 if (expire <= 0){ // 停止循环 flag = false; } // 如果还未过期 // 再为则续命一半的时间 stringRedisTemplate.expire(lockKey,expire + timeOut/2,TimeUnit.SECONDS); } }}// 创建子线程为锁续命GuardThread guardThread = new GuardThread(lockKey,timeOut,stringRedisTemplate);// 设置为当前 业务线程 的守护线程guardThread.setDaemon(true);guardThread.start();作者:竹子爱熊猫 链接:https://juejin.cn/post/7038473714970656775
为了在开发过程保证 Redis 的高可用,会采用主从复制架构做读写分离,从而提升 Redis 的吞吐量以及可用性。但是如果一条线程在 redis 主节点上获取锁成功之后,主节点还没有来得及复制给从节点就宕机了,此时另一条线程访问 redis 就会在从节点上面访问,同时也获取锁成功,这时候临界资源的访问就会出现安全性问题了。
解决办法:
Zookeeper 数据区别于 redis 的数据,数据是实时同步的,主节点写入后需要一半以上的节点都写入才会返回成功。所以如果像电商、教育等类型的项目追求高性能,可以放弃一定的稳定性,推荐使用 redis 实现;例如像金融、银行、政府等类型的项目,追求高稳定性,可以牺牲一部分性能,推荐使用 Zookeeper 实现。
上面加锁确实解决了并发情况下线程安全的问题,但是我们面对 100w 个用户同时去抢购 1000 个商品的场景该如何解决呢?
1. https://juejin.cn/post/7236213437800890423
2. https://juejin.cn/post/7038473714970656775
3. https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
作者:京东科技 焦泽斌
来源:京东云开发者社区 转载请注明来源
本文链接:http://www.28at.com/showinfo-26-42229-0.html并发情况如何实现加锁来保证数据一致性?
声明:本网页内容旨在传播知识,不代表本站观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。