场景
客户在执行了还款操作后,系统会记录交易流水、更新客户还款计划、更新已还金额和未还金额,这些数据都是交易数据,会在本次交易中立刻计算。但是还有一些非交易数据(如:逾期天数,最新逾期时间,逾期等级),不是在交易动作里实时计算的,而是按照计划的频次批量计算和更新(当时更新时点和频次是每晚一次)。直接造成的影响是客户投诉,因为催收人员需要等到第二天才能知道客户是否还款,经常打催收电话给客户时客户已经还款了。
目标
在不降低原交易体验的情况下降低数据延时。
最优:在不影响原交易体验情况下做到实时更新
次优:在不影响原交易体验情况下做到一小时内更新
实现方式
在一番交流和纠结后,决定使用发布订阅模式:交易完成后发布一个交易完成通知,需要处理的任务订阅通知,并在收到通知后执行。
优点:
- 代码入侵程度小,或无需入侵
- 对原有交易体验,逻辑处理时长,事物时长影响几乎为零,因为仅将消息发布出去而已。
- 时效性高,虽然是准实时处理,但几乎可以认为是实时的处理。
缺点:
- 业务相关性相较之前更紧密
- 需要维护一个消息队列(此项目截止目前未使用消息队列)
具体执行
一开始执行就开始争论一个问题了:
- 在交易执行完成后由开发者主动调用发布通知
- 通过动态代理拦截交易操作,在交易成功后统一发布通知
最终选择了2
的执行方式。理由如下:
- 截止目前,不同的交易已经有超过10种处理方式了,每一种处理方式都是一个交易接口子类
- 开发者手动调用,在交易处理细节处,可能会有多个执行完毕出口,容易遗漏某些出口处的调用
- 大批量代码文件的相同修改,提交后的代码审查不太好解释
说了这么多,下面开始进入激动人心的环节。
code-v1.0
包含一个具体任务处理实现一共三个类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import xxxx.BusinessTransactionContext;
public interface Processor {
void handle(BusinessTransactionContext context);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import xxxx.BusinessTransactionContext; import org.springframework.stereotype.Component;
@Component public class UpdateLoanBalanceRelativeProcessor implements Processor { @Override public void handle(BusinessTransactionContext context) { } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| import xxxx.BusinessTransactionContext; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Aspect @Component public class TransactionProcessManager implements InitializingBean, DisposableBean { private static final ThreadGroup THREAD_GROUP = new ThreadGroup("PostProcess"); private static final AtomicLong THREAD_NUMBER = new AtomicLong(1); private ExecutorService executorService; @Resource private List<Processor> processors;
@Override public void afterPropertiesSet() { int corePoolSize = 5; int maximumPoolSize = 50; long keepAliveTime = 10L; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5000); ThreadFactory threadFactory = runnable -> new Thread(THREAD_GROUP, runnable, THREAD_GROUP.getName() + "-" + THREAD_NUMBER.getAndIncrement()); executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory); }
@Override public void destroy() { executorService.shutdown(); }
@Bean @ConditionalOnMissingBean(Processor.class) public Processor emptyProcessor() { return context -> {}; }
@Pointcut("execution(* xxxx.*.commit.*.action(..))") private void pointcut() {}
@AfterReturning("pointcut()") public void afterReturning(JoinPoint pjp) { String declaringTypeName = pjp.getSignature().getDeclaringTypeName(); Object[] args = pjp.getArgs(); BusinessTransactionContext context = (BusinessTransactionContext) args[0]; for (Processor processor : processors) { log.info("交易执行完毕,触发后置任务:{} 交易类:{}", processor.getClass().getName(), declaringTypeName); executorService.submit(() -> { try { processor.handle(context); } catch (Exception e) { log.warn("后置任务执行失败:{} 交易类:{}", processor.getClass().getName(), declaringTypeName, e); } }); } }
}
|
存在的问题
问题现象
后置处理中查询的数据是修改前的数据。
直接原因
后置处理是在一个新的独立线程里执行。
详细说明
- 在Spring的事务管理方式下,事务是通过
ThreadLocal
来传递的,在后置处理新开启的线程里,肯定是和主线程上下文的事务没有任何的关系了,会开启新的一个事务。如果后置处理能保证在主操作事务提交后才执行,那么也不会有什么问题。
- 因为后置处理是处于一个新的线程中执行,所以理论上来说,就有可能后置处理执行完了,主线程的事务还未提交,或者后置处理在查询数据的时候,主线程事务还未提交。
场景还原:
拦截的方法就是数据入库方法,数据入库方法如下
1 2 3 4 5 6 7 8
| @Transactional(rollbackFor = Exception.class) @Override public boolean action(BusinessTransactionContext businessTransactionContext) { }
|
客户进行单笔还款时,大多时候不会出现此问题。因为在此操作下,事务开闭在action
方法上,数据入库就表示操作结束了,接下来立马就是Spring来操作后续的事务处理了,一般情况下事务会在后置处理执行查询前关闭。
如果是第三方回盘,或者是财务批量导入的还款,就一定会出现此问题。因为这两种还款在目前的处理方式中,都是批量还款操作,这一批还款包装成了一个事务,如下:
1 2 3 4 5 6 7
| @Transactional(rollbackFor = Exception.class) @Override public void batch() { for (Xxx xxx : xxxList) { action(businessTransactionContext) } }
|
这样就存在问题了,因为拦截的action
方法,所以for
循环里每次调用action
都会触发一次后置处理,但是触发后主线程中事务未提交,所以每次后置处理里查询的数据都是修改之前的数据。
优化点
从以上可以看出,每此执行action
方法后,都会执行一遍后置处理。经常会有同一笔贷款,一次还多期,或一期还多次,对于这样的情况,显然后置处理只需执行一次即可,而目前是还几次执行几次。另一方面,在一批还款处理的情况下,也会导致一边还款,一边处理后置任务,会造成主任务的性能降低。
我们接下来就处理以上存在的问题和需要优化的地方
code-v2.0
包含一个具体任务处理实现一共五个类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import xxxx.BusinessTransactionContext;
public interface Processor {
void handle(BusinessTransactionContext context);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import xxxx.BusinessTransactionContext; import org.springframework.stereotype.Component;
@Component public class UpdateLoanBalanceRelativeProcessor implements Processor { @Override public void handle(BusinessTransactionContext context) { } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import xxxx.BusinessTransactionContext; import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors;
@Getter @Setter @Accessors(chain = true) public class TransactionProcessData { private String declaringTypeName; private String loanNo; private BusinessTransactionContext businessTransactionContext;
@Override public int hashCode() { return loanNo.hashCode(); }
@Override public boolean equals(Object obj) { if (!(obj instanceof TransactionProcessData)) { return false; } TransactionProcessData entry = (TransactionProcessData) obj; return loanNo.equals(entry.loanNo); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| import xxxx.BusinessTransactionContext; import xxxx.T; import xxxx.LoanBalance; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Resource;
@Slf4j @Aspect @Component public class TransactionProcessAspect { @Resource private TransactionProcessManager transactionProcessManager;
@Pointcut("execution(* xxxx.*.commit.*.action(..))") private void pointcut() {}
@AfterReturning("pointcut()") public void afterReturning(JoinPoint pjp) { if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionProcessData entry = new TransactionProcessData(); String declaringTypeName = pjp.getSignature().getDeclaringTypeName(); entry.setDeclaringTypeName(declaringTypeName); Object[] args = pjp.getArgs(); BusinessTransactionContext context = (BusinessTransactionContext) args[0]; entry.setBusinessTransactionContext(context); try { LoanBalance loanBalance = context.get(T.LOANBALANCE, LoanBalance.class); entry.setLoanNo(loanBalance.getLoanNo()); TransactionSynchronizationManager.registerSynchronization(transactionProcessManager); transactionProcessManager.add(entry); } catch (Exception e) { log.warn("交易后置处理贷款信息获取失败,不执行后置处理.交易类:{}", declaringTypeName, e); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
| import xxxx.BusinessTransactionContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import javax.annotation.Resource; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Component public class TransactionProcessManager extends TransactionSynchronizationAdapter implements InitializingBean, DisposableBean { private static final ThreadGroup THREAD_GROUP = new ThreadGroup("PostProcess"); private static final AtomicLong THREAD_NUMBER = new AtomicLong(1);
private static final ThreadLocal<Set<TransactionProcessData>> PROCESS_ENTRY_THREAD_LOCAL = ThreadLocal.withInitial(HashSet::new); private ExecutorService executorService; @Resource private List<Processor> processors;
public void add(TransactionProcessData entry) { PROCESS_ENTRY_THREAD_LOCAL.get().add(entry); }
@Override public void afterPropertiesSet() { int corePoolSize = 5; int maximumPoolSize = 50; long keepAliveTime = 10L; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5000); ThreadFactory threadFactory = runnable -> new Thread(THREAD_GROUP, runnable, THREAD_GROUP.getName() + "-" + THREAD_NUMBER.getAndIncrement()); executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory); }
@Override public void destroy() { executorService.shutdown(); }
@Override public void afterCommit() { List<TransactionProcessData> processEntries = new ArrayList<>(PROCESS_ENTRY_THREAD_LOCAL.get()); log.info("交易执行完毕,交易事物已提交,待执行交易后置任务:{}", processEntries.size()); executorService.submit(() -> { for (TransactionProcessData data : processEntries) { String declaringTypeName = data.getDeclaringTypeName(); String loanNo = data.getLoanNo(); BusinessTransactionContext context = data.getBusinessTransactionContext(); for (Processor processor : processors) { String processorName = processor.getClass().getName(); log.info("执行交易后置任务:{} 交易类:{} 交易数据:{}", processorName, declaringTypeName, loanNo); try { processor.handle(context); log.info("交易后置任务执行成功:{} 交易类:{} 交易数据:{}", processorName, declaringTypeName, loanNo); } catch (Exception e) { log.warn("交易后置任务执行失败:{} 交易类:{} 交易数据:{}", processorName, declaringTypeName, loanNo, e); } } } }); }
@Override public void afterCompletion(int status) { if (log.isInfoEnabled()) { String statusText = "正常提交"; if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { statusText = "事物回滚"; } else if (status == TransactionSynchronization.STATUS_UNKNOWN) { statusText = "状态未知"; } log.info("回收后置处理任务资源.事物状态:{}", statusText); } PROCESS_ENTRY_THREAD_LOCAL.get().clear(); }
@Bean @ConditionalOnMissingBean(Processor.class) public Processor emptyProcessor() { return context -> {}; }
}
|
不同点:
- 在代理方法中通过
TransactionSynchronizationManager.registerSynchronization
注册了一个TransactionSynchronizationAdapter
,所有的后置处理在TransactionSynchronizationAdapter
的afterCommit
方法里调用
这个操作保证了后置处理一定会在事务提交后执行,解决后置处理中查询的数据是修改前的数据这个问题(即code-v1.0中出现的问题)。
- 优化执行数据,同一贷款编号的数据无论交易了多少次,在一次事务提交后,只执行一次后置处理
- 优化后置调用方式,所有的后置任务均在同一个处理线程里处理
避免批量还款造成创建大量线程,降低CPU调度成本,时效性上可以接受。