场景

客户在执行了还款操作后,系统会记录交易流水、更新客户还款计划、更新已还金额和未还金额,这些数据都是交易数据,会在本次交易中立刻计算。但是还有一些非交易数据(如:逾期天数,最新逾期时间,逾期等级),不是在交易动作里实时计算的,而是按照计划的频次批量计算和更新(当时更新时点和频次是每晚一次)。直接造成的影响是客户投诉,因为催收人员需要等到第二天才能知道客户是否还款,经常打催收电话给客户时客户已经还款了。

目标

在不降低原交易体验的情况下降低数据延时。
最优:在不影响原交易体验情况下做到实时更新
次优:在不影响原交易体验情况下做到一小时内更新

实现方式

在一番交流和纠结后,决定使用发布订阅模式:交易完成后发布一个交易完成通知,需要处理的任务订阅通知,并在收到通知后执行。

优点:

  1. 代码入侵程度小,或无需入侵
  2. 对原有交易体验,逻辑处理时长,事物时长影响几乎为零,因为仅将消息发布出去而已。
  3. 时效性高,虽然是准实时处理,但几乎可以认为是实时的处理。

缺点:

  1. 业务相关性相较之前更紧密
  2. 需要维护一个消息队列(此项目截止目前未使用消息队列)

具体执行

一开始执行就开始争论一个问题了:

  1. 在交易执行完成后由开发者主动调用发布通知
  2. 通过动态代理拦截交易操作,在交易成功后统一发布通知

最终选择了2的执行方式。理由如下:

  1. 截止目前,不同的交易已经有超过10种处理方式了,每一种处理方式都是一个交易接口子类
  2. 开发者手动调用,在交易处理细节处,可能会有多个执行完毕出口,容易遗漏某些出口处的调用
  3. 大批量代码文件的相同修改,提交后的代码审查不太好解释

说了这么多,下面开始进入激动人心的环节。

code-v1.0

包含一个具体任务处理实现一共三个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import xxxx.BusinessTransactionContext;

/**
* 处理器
*
* @author tangjialin on 2019-07-25.
*/
public interface Processor {
/**
* 处理方法
*
* @param context 交易上下文
*/
void handle(BusinessTransactionContext context);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import xxxx.BusinessTransactionContext;
import org.springframework.stereotype.Component;

/**
* 更新贷款余额附属表
*
* @author tangjialin on 2019-07-25.
*/
@Component
public class UpdateLoanBalanceRelativeProcessor implements Processor {
@Override
public void handle(BusinessTransactionContext context) {
// 更新贷款余额附属表
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import xxxx.BusinessTransactionContext;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 交易拦截处理器管理
* <p>执行顺序:around -> before -> 被拦截的方法 -> around -> after -> afterReturning(方法执行无异常) 或 afterThrowing(方法执行异常)</p>
*
* @author tangjialin on 2019-07-25.
*/
@Slf4j
@Aspect
@Component
public class TransactionProcessManager implements InitializingBean, DisposableBean {
/** 线程分组 */
private static final ThreadGroup THREAD_GROUP = new ThreadGroup("PostProcess");
/** 线程编号 */
private static final AtomicLong THREAD_NUMBER = new AtomicLong(1);
/** 线程池 */
private ExecutorService executorService;
/** 处理器,注入所有Processor实现类 */
@Resource
private List<Processor> processors;

/**
* 初始化线程池
*/
@Override
public void afterPropertiesSet() {
// 最小空闲池数量
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 10L;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5000);
ThreadFactory threadFactory = runnable -> new Thread(THREAD_GROUP, runnable, THREAD_GROUP.getName() + "-" + THREAD_NUMBER.getAndIncrement());
executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory);
}

/**
* 关闭线程池
*/
@Override
public void destroy() {
executorService.shutdown();
}

/**
* 若应用中没有任何Processor的示例,则创建EmptyProcessor
*/
@Bean
@ConditionalOnMissingBean(Processor.class)
public Processor emptyProcessor() {
return context -> {};
}

/**
* 此方法用于抽离定义的切点
*/
@Pointcut("execution(* xxxx.*.commit.*.action(..))")
private void pointcut() {}

/**
* 拦截所有IBusinessAccountService子类的action方法
* 在方法成功执行后执行,若原方法抛出异常,则不执行
*
* @param pjp 切点包装参数
*/
@AfterReturning("pointcut()")
public void afterReturning(JoinPoint pjp) {
String declaringTypeName = pjp.getSignature().getDeclaringTypeName();
Object[] args = pjp.getArgs();
BusinessTransactionContext context = (BusinessTransactionContext) args[0];
for (Processor processor : processors) {
log.info("交易执行完毕,触发后置任务:{} 交易类:{}", processor.getClass().getName(), declaringTypeName);
executorService.submit(() -> {
// 增加异常捕获
// 不捕获异常也不会影响外部程序运行,因为在executorService.submit内部有异常捕获,
// 但是捕获了不会打印,需要别的方式得到异常并记录
// 所以在此使用主动try-catch的方式记录异常信息
try {
processor.handle(context);
} catch (Exception e) {
log.warn("后置任务执行失败:{} 交易类:{}", processor.getClass().getName(), declaringTypeName, e);
}
});
}
}

}

存在的问题

问题现象

后置处理中查询的数据是修改前的数据。

直接原因

后置处理是在一个新的独立线程里执行。

详细说明

  1. 在Spring的事务管理方式下,事务是通过ThreadLocal来传递的,在后置处理新开启的线程里,肯定是和主线程上下文的事务没有任何的关系了,会开启新的一个事务。如果后置处理能保证在主操作事务提交后才执行,那么也不会有什么问题。
  2. 因为后置处理是处于一个新的线程中执行,所以理论上来说,就有可能后置处理执行完了,主线程的事务还未提交,或者后置处理在查询数据的时候,主线程事务还未提交。

场景还原:

拦截的方法就是数据入库方法,数据入库方法如下

1
2
3
4
5
6
7
8
@Transactional(rollbackFor = Exception.class)
@Override
public boolean action(BusinessTransactionContext businessTransactionContext) {
// 入库交易信息
// 入库其它信息A
// 入库还款计划
// 入库其它信息B
}

客户进行单笔还款时,大多时候不会出现此问题。因为在此操作下,事务开闭在action方法上,数据入库就表示操作结束了,接下来立马就是Spring来操作后续的事务处理了,一般情况下事务会在后置处理执行查询前关闭。

如果是第三方回盘,或者是财务批量导入的还款,就一定会出现此问题。因为这两种还款在目前的处理方式中,都是批量还款操作,这一批还款包装成了一个事务,如下:

1
2
3
4
5
6
7
@Transactional(rollbackFor = Exception.class)
@Override
public void batch() {
for (Xxx xxx : xxxList) {
action(businessTransactionContext)
}
}

这样就存在问题了,因为拦截的action方法,所以for循环里每次调用action都会触发一次后置处理,但是触发后主线程中事务未提交,所以每次后置处理里查询的数据都是修改之前的数据。

优化点

从以上可以看出,每此执行action方法后,都会执行一遍后置处理。经常会有同一笔贷款,一次还多期,或一期还多次,对于这样的情况,显然后置处理只需执行一次即可,而目前是还几次执行几次。另一方面,在一批还款处理的情况下,也会导致一边还款,一边处理后置任务,会造成主任务的性能降低。

我们接下来就处理以上存在的问题和需要优化的地方

code-v2.0

包含一个具体任务处理实现一共五个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import xxxx.BusinessTransactionContext;

/**
* 处理器
*
* @author tangjialin on 2019-07-25.
*/
public interface Processor {
/**
* 处理方法
*
* @param context 交易上下文
*/
void handle(BusinessTransactionContext context);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import xxxx.BusinessTransactionContext;
import org.springframework.stereotype.Component;

/**
* 更新贷款余额附属表
*
* @author tangjialin on 2019-07-25.
*/
@Component
public class UpdateLoanBalanceRelativeProcessor implements Processor {
@Override
public void handle(BusinessTransactionContext context) {
// 更新贷款余额附属表
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import xxxx.BusinessTransactionContext;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
* 交易信息
*
* @author tangjialin on 2019-08-12.
*/
@Getter
@Setter
@Accessors(chain = true)
public class TransactionProcessData {
/** 执行的交易类 */
private String declaringTypeName;
/** 贷款编号 */
private String loanNo;
/** 业务交易上下文 */
private BusinessTransactionContext businessTransactionContext;

@Override
public int hashCode() {
return loanNo.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof TransactionProcessData)) { return false; }
TransactionProcessData entry = (TransactionProcessData) obj;
return loanNo.equals(entry.loanNo);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import xxxx.BusinessTransactionContext;
import xxxx.T;
import xxxx.LoanBalance;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.Resource;

/**
* 交易拦截处理切面
* <p>执行顺序:around -> before -> 被拦截的方法 -> around -> after -> afterReturning(方法执行无异常) 或 afterThrowing(方法执行异常)</p>
*
* @author tangjialin on 2019-08-12.
*/
@Slf4j
@Aspect
@Component
public class TransactionProcessAspect {
@Resource
private TransactionProcessManager transactionProcessManager;

/**
* 此方法用于抽离定义的切点
*/
@Pointcut("execution(* xxxx.*.commit.*.action(..))")
private void pointcut() {}

/**
* 拦截所有IBusinessAccountService子类的action方法
* 在方法成功执行后执行,若原方法抛出异常,则不执行
*
* @param pjp 切点包装参数
*/
@AfterReturning("pointcut()")
public void afterReturning(JoinPoint pjp) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionProcessData entry = new TransactionProcessData();
String declaringTypeName = pjp.getSignature().getDeclaringTypeName();
entry.setDeclaringTypeName(declaringTypeName);
Object[] args = pjp.getArgs();
BusinessTransactionContext context = (BusinessTransactionContext) args[0];
entry.setBusinessTransactionContext(context);
try {
LoanBalance loanBalance = context.get(T.LOANBALANCE, LoanBalance.class);
entry.setLoanNo(loanBalance.getLoanNo());
// 注册事物同步执行器(用于在事物提交后执行后置处理)
TransactionSynchronizationManager.registerSynchronization(transactionProcessManager);
transactionProcessManager.add(entry);
} catch (Exception e) {
log.warn("交易后置处理贷款信息获取失败,不执行后置处理.交易类:{}", declaringTypeName, e);
}
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import xxxx.BusinessTransactionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* 事物步骤回调处理器
* <p>
* 此处理器用于检测事物的提交,保证交易置处理器里的任务都在事物提交后执行,避免因事物未关闭加载的不是新数据
*
* @author tangjialin on 2019-07-25.
*/
@Slf4j
@Component
public class TransactionProcessManager extends TransactionSynchronizationAdapter implements InitializingBean, DisposableBean {
/** 线程分组 */
private static final ThreadGroup THREAD_GROUP = new ThreadGroup("PostProcess");
/** 线程编号 */
private static final AtomicLong THREAD_NUMBER = new AtomicLong(1);
/**
* 实际处理数据集合
* 使用{@link ThreadLocal}管理集合
* 此集合本就不需要不同线程间共享,所以使用{@link ThreadLocal}可以解决并发文件
*/
private static final ThreadLocal<Set<TransactionProcessData>> PROCESS_ENTRY_THREAD_LOCAL = ThreadLocal.withInitial(HashSet::new);
/** 线程池执行器 */
private ExecutorService executorService;
/** 所有的交易处理器,注入所有Processor实现类 */
@Resource
private List<Processor> processors;

/**
* 添加待处理数据(同一个出账编号的多次提交,只会处理一次)
* <p>
* 此操作是针对同一个线程里的数据操作,因此无需额外使用同步手段
*
* @param entry 待处理数据
*/
public void add(TransactionProcessData entry) {
PROCESS_ENTRY_THREAD_LOCAL.get().add(entry);
}

/**
* 初始化线程池
*/
@Override
public void afterPropertiesSet() {
// 最小空闲池数量
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 10L;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5000);
ThreadFactory threadFactory = runnable -> new Thread(THREAD_GROUP, runnable, THREAD_GROUP.getName() + "-" + THREAD_NUMBER.getAndIncrement());
executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory);
}

/**
* 关闭线程池
*/
@Override
public void destroy() {
executorService.shutdown();
}

/**
* 事物提交后置处理
*/
@Override
public void afterCommit() {
// 获取需要处理的数据
// 1.不能在executorService.submit(新线程)里执行的,因为要依赖当前的线程环境
// 2.取出来后拷贝一份到新的集合(深拷贝浅拷贝都可以),否则在线程里遍历数据前就被[afterCompletion]清理了就无法处理了
List<TransactionProcessData> processEntries = new ArrayList<>(PROCESS_ENTRY_THREAD_LOCAL.get());
log.info("交易执行完毕,交易事物已提交,待执行交易后置任务:{}", processEntries.size());
// 用一个新线程处理当前线程下的所有数据
executorService.submit(() -> {
for (TransactionProcessData data : processEntries) {
String declaringTypeName = data.getDeclaringTypeName();
String loanNo = data.getLoanNo();
BusinessTransactionContext context = data.getBusinessTransactionContext();
for (Processor processor : processors) {
String processorName = processor.getClass().getName();
log.info("执行交易后置任务:{} 交易类:{} 交易数据:{}", processorName, declaringTypeName, loanNo);
// 增加异常捕获
// 不捕获异常也不会影响外部程序运行,因为在executorService.submit内部有异常捕获,
// 但是捕获了不会打印,需要别的方式得到异常并记录
// 所以在此使用主动try-catch的方式记录异常信息
try {
processor.handle(context);
log.info("交易后置任务执行成功:{} 交易类:{} 交易数据:{}", processorName, declaringTypeName, loanNo);
} catch (Exception e) {
log.warn("交易后置任务执行失败:{} 交易类:{} 交易数据:{}", processorName, declaringTypeName, loanNo, e);
}
}
}
});
}

/**
* 统一回收后置处理任务资源
* <p>
* 在事务提交/回滚之后调用
*
* @param status 事物的执行状态
* @see #STATUS_COMMITTED
* @see #STATUS_ROLLED_BACK
* @see #STATUS_UNKNOWN
*/
@Override
public void afterCompletion(int status) {
if (log.isInfoEnabled()) {
String statusText = "正常提交";
if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
statusText = "事物回滚";
} else if (status == TransactionSynchronization.STATUS_UNKNOWN) {
statusText = "状态未知";
}
log.info("回收后置处理任务资源.事物状态:{}", statusText);
}
PROCESS_ENTRY_THREAD_LOCAL.get().clear();
}

/**
* 若应用中没有任何Processor的示例,则创建EmptyProcessor
*/
@Bean
@ConditionalOnMissingBean(Processor.class)
public Processor emptyProcessor() {
return context -> {};
}

}

不同点:

  1. 在代理方法中通过TransactionSynchronizationManager.registerSynchronization注册了一个TransactionSynchronizationAdapter,所有的后置处理在TransactionSynchronizationAdapterafterCommit方法里调用
    这个操作保证了后置处理一定会在事务提交后执行,解决后置处理中查询的数据是修改前的数据这个问题(即code-v1.0中出现的问题)。
  2. 优化执行数据,同一贷款编号的数据无论交易了多少次,在一次事务提交后,只执行一次后置处理
  3. 优化后置调用方式,所有的后置任务均在同一个处理线程里处理
    避免批量还款造成创建大量线程,降低CPU调度成本,时效性上可以接受。