在上一篇文章中,我们通过双异步的方式导入了10万行的Excel,有个小伙伴在评论区问我,如果保证事务呢,如果分批的话。
通过串行读取Excel,单个Excel耗时191s。
通过Future获取异步返回值,再和Excel文件数据行进行比较,实现对数据准确性的判断!
Java8中引入了CompletableFuture,它实现了对Future的全面升级,可以通过回调的方式,获取异步线程返回值。
CompletableFuture的异步执行通过ForkJoinPool实现, 它使用守护线程去执行任务。
ForkJoinPool在于可以充分利用多核CPU的优势,把一个任务拆分成多个小任务,把多个小任务放到多个CPU上并行执行,当多个小任务执行完毕后,再将其执行结果合并起来。
想要保证事务,肯定是使用@Transactional来实现。
现在的场景是导入若干个大的Excel文件数据,因为每个Excel导入的表不同,所以只要保证单Excel的事务即可。
上文中,是使用异步批量读取并插入的方式实现的Excel文件入库。
也就是说,1个主线程事务 + 若干个子线程事务,我们想要保证单Excel的插入事务,所有异步子线程有任何一个报错,都要进行事务回滚,如果全部都没报错,则进行事务提交。
这个时候,有的小伙伴可能会想到,主线程加个@Transactional注解,所有子线程分别加@Transactional注解,就可以了吧?
但是,这样是不行的,子线程的异常只会回滚其自身的事务。
如果Excel中有10万条数据,一次插入4200条数据,最后一次插入3400条。如果其它线程都插入成功了,最后一个报错了,此时,数据库中还是会有96600条数据插入成功,与单Excel的事务需求不符。
通过代码模拟这种情况:
if(end == sheet.getLastRowNum()){ logger.info("插入最后一批数据,模拟异常"); int a = 1/0;}
声明式事务管理建立在AOP之上的。其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。
简而言之,@Transactional注解在代码执行出错的时候能够进行事务的回滚。
使用@Transactional后,当程序发生RuntimeException运行时异常在没有使用try,catch进行捕获的时候,程序都会中止,当程序发生中止,则会触发数据库的回滚。
当使用了trycatch进行捕获到这个异常,假如在catch中加入了throw e抛出异常,则程序中止,数据库回滚。
加入在try catch中没有throw e 抛出异常,只是简单的打印异常,则异常被捕获未抛出异常去终止程序,在trycatch中的操作数据库语句插入失败,在trycatch上面和下面的数据库相关插入语句成功,也就是程序成功跑完,数据库不会发生回滚。
在@Transactional注解中如果不配置rollbackFor属性,那么事物只会在遇到RuntimeException的时候才会回滚,加上rollbackFor=Exception.class,可以让事物在遇到非运行时异常时也回滚。
事务拦截器在目标方法执行前后进行拦截,内部会调用方法来获取Transactional 注解的事务配置信息,调用前会检查目标方法的修饰符是否为 public,不是 public则不会获取@Transactional 的属性配置信息。
rollbackFor 可以指定能够触发事务回滚的异常类型。
Spring默认抛出了未检查unchecked异常(继承自 RuntimeException 的异常)或者 Error才回滚事务;其他异常不会触发回滚事务。
如果在事务中抛出其他类型的异常,但却期望 Spring 能够回滚事务,就需要指定rollbackFor属性。
开发中避免不了会对同一个类里面的方法调用,比如有一个类Test,它的一个方法A,A再调用本类的方法B(不论方法B是用public还是private修饰),但方法A没有声明注解事务,而B方法有。则外部调用方法A之后,方法B的事务是不会起作用的。这也是经常犯错误的一个地方。
那为啥会出现这种情况?其实这还是由于使用Spring AOP代理造成的,因为只有当事务方法被当前类以外的代码调用时,才会由Spring生成的代理对象来管理。
在同一个类中调用异步方法,等于调用this本类的方法,没有走Spring生成的代理类,也就不会让他异步执行,@Transactional的原理也类似。
如果你手动的catch捕获这个异常并进行处理,事务管理器会认为当前事务应该正常commit,就会导致注解失效,如果非要捕获且不失效,就必须在代码块内throw new Exception抛出异常。
@Transactional(rollbackFor = Exception.class)public void readXls(String filePath, String filename) throws Exception{ try { // 省略一些复杂操作... List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis(); futureList.add(sumFuture); } // 主线程获取Future返回值 boolean futureFlag = getFutureResult(futureList, excelRow); if (futureFlag) { logger.info("readXlsCacheAsync---插入数据成功,提交事务"); } else { TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); logger.info("readXlsCacheAsync---插入数据失败,回滚事务"); } } catch (Exception e) { TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); logger.error("readXlsCacheAsync---插入数据异常,回滚事务:", e); }}@Async("async-executor")//是否开启异步@Overridepublic Integer readXlsCacheAsyncMybatis() { try { // 省略一些复杂操作... }catch (Exception e){ throw new RuntimeException("插入数据库异常", e); }}
如果入库异常,事务回滚成功。
回滚失败!
public void readXls(String filePath, String filename) throws Exception{ // 手动开启事务,不自动提交 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { // 省略一些复杂操作... List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis(); futureList.add(sumFuture); } // 主线程获取Future返回值 boolean futureFlag = getFutureResult(futureList, excelRow); if (futureFlag) { dataSourceTransactionManager.commit(transactionStatus); // 提交 logger.info("readXlsCacheAsync---插入数据成功,提交事务"); } else { dataSourceTransactionManager.rollback(transactionStatus);// 回滚 logger.info("readXlsCacheAsync---插入数据失败,回滚事务"); } } catch (Exception e) { dataSourceTransactionManager.rollback(transactionStatus);// 回滚 logger.error("readXlsCacheAsync---插入数据异常,回滚事务:", e); }}@Async("async-executor")//是否开启异步@Overridepublic Integer readXlsCacheAsyncMybatis() { try { // 省略一些复杂操作... }catch (Exception e){ throw new RuntimeException("插入数据库异常", e); }}
如果入库异常,事务回滚成功。
回顾一下需求:异步某线程失败时,主线程回滚所有异步线程的事务!
是代码有问题,还是就是实现不了呢?
@Async和@Transactional注解都是通过Spring aop实现的,核心都是靠着关键的MethodInterceptor实现,@Async会给对应bean代理对象中放入一个AnnotationAsyncExecutionInterceptor拦截器,而@Transactional会给对应bean的代理对象中放入一个TransactionInterceptor拦截器。
Spring事务管理的传播机制是使用 ThreadLocal 实现的。因为 ThreadLocal 是线程私有的,所以 Spring 的事务传播机制是不能够跨线程的。
/** * 数据源事务管理器 */private DataSourceTransactionManager dataSourceTransactionManager;@Autowiredpublic void setUserService(DataSourceTransactionManager dataSourceTransactionManager) { this.dataSourceTransactionManager = dataSourceTransactionManager;}@Overridepublic void readXls(String filePath, String filename) { List<TransactionStatus> transactionStatusList = Collections.synchronizedList(new ArrayList<>()); List<TransactionResource> transactionResourceList = Collections.synchronizedList(new ArrayList<>()); try { List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readAsyncFutureTransactionDBService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder,transactionStatusList,transactionResourceList); futureList.add(sumFuture); } // 主线程获取Future返回值 boolean futureFlag = getFutureResult(futureList, excelRow); if (futureFlag) { for (int i = 0; i < transactionStatusList.size(); i++) { TransactionStatus transactionStatus = transactionStatusList.get(i); dataSourceTransactionManager.commit(transactionStatus); // 提交 } logger.info("readXlsCacheAsync---插入数据成功,提交事务"); } else { for (int i = 0; i < transactionStatusList.size(); i++) { TransactionStatus transactionStatus = transactionStatusList.get(i); dataSourceTransactionManager.rollback(transactionStatus);// 回滚 } logger.info("readXlsCacheAsync---插入数据失败,事务回滚"); throw new RuntimeException("readXlsCacheAsync---插入数据异常,异常事务回滚"); } } catch (Exception e) { logger.error("readXlsCacheAsync---插入数据异常,事务回滚:", e); for (int i = 0; i < transactionStatusList.size(); i++) { TransactionStatus transactionStatus = transactionStatusList.get(i); dataSourceTransactionManager.rollback(transactionStatus);// 回滚 } //connection.rollback(); throw new RuntimeException("readXlsCacheAsync---插入数据异常,异常事务回滚"); }}
@Async("async-executor")@Overridepublic Future<Integer> readXlsCacheAsyncMybatis(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder, List<TransactionStatus> transactionStatusList, List<ReadAsyncFutureTransactionServiceImpl.TransactionResource> transactionResourceList) throws Exception { DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition(); TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition); // 开启新事务 transactionStatusList.add(transactionStatus); // copy事务资源 transactionResourceList.add(ReadAsyncFutureTransactionServiceImpl.TransactionResource.copyTransactionResource()); try { // 入库操作 }catch (Exception e){ throw new RuntimeException("readXlsCacheAsyncMybatis分批异步读取Excel,通过Mybatis插入数据库异常"); }}
/** * 保存当前事务资源,用于线程间的事务资源COPY操作 * <p> * `@Builder`注解是Lombok库提供的一个注解,它可以用于自动生成Builder模式的代码,使用@Builder注解可以简化创建对象实例的过程,并且可以使代码更加清晰和易于维护 */static class TransactionResource { // TransactionSynchronizationManager类内部默认提供了下面六个ThreadLocal属性,分别保存当前线程对应的不同事务资源 // 保存当前事务关联的资源,默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系 // 当然这里Connection被包装为了ConnectionHolder // 事务结束后默认会移除集合中的DataSource作为key关联的资源记录 private Map<Object, Object> resources; //下面五个属性会在事务结束后被自动清理,无需我们手动清理 // 事务监听者,在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用),默认为空集合 private Set<TransactionSynchronization> synchronizations; // 存放当前事务名字 private String currentTransactionName; // 存放当前事务是否是只读事务 private Boolean currentTransactionReadOnly; // 存放当前事务的隔离级别 private Integer currentTransactionIsolationLevel; // 存放当前事务是否处于激活状态 private Boolean actualTransactionActive; /** * 对事务资源进行复制 * * @return TransactionResource */ public static TransactionResource copyTransactionResource() { return TransactionResource.builder() //返回的是不可变集合 .resources(TransactionSynchronizationManager.getResourceMap()) //如果需要注册事务监听者,这里记得修改,我们这里不需要,就采用默认负责,spring事务内部默认也是这个值 .synchronizations(new LinkedHashSet<>()).currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName()).currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly()).currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()).actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive()).build(); } /** * 使用 */ public void autoWiredTransactionResource() { resources.forEach(TransactionSynchronizationManager::bindResource); //如果需要注册事务监听者,这里记得修改,我们这里不需要,就采用默认负责,spring事务内部默认也是这个值 TransactionSynchronizationManager.initSynchronization(); TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive); TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly); } /** * 移除 */ public void removeTransactionResource() { // 事务结束后默认会移除集合中的DataSource作为key关联的资源记录 // DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错 resources.keySet().forEach(key -> { if (!(key instanceof DataSource)) { TransactionSynchronizationManager.unbindResource(key); } }); }}
如何不加会怎么样?
在提交和回滚的时候,会出现异常:
经过不懈的努力,终于解决了“异步某线程失败时,主线程回滚所有异步线程的事务!”这个看起来很简单的问题。
也是对双异步入库系列的一个完结。
通过添加事务,可以有效的控制Excel异步插入数据的准确性。
本文链接:http://www.28at.com/showinfo-26-70399-0.html双异步系列完结撒花,如何解決异步事务问题?
声明:本网页内容旨在传播知识,不代表本站观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。