文章内容已经收录在《高级技术专家成长笔记》,欢迎订阅专栏! 从原理、系统设计出发,直击面试难点,实现更高维度的降维打击!
多线程案例落地 - 库存扣减请求合并、库存一致性保证
海哥也给出了合并库存扣减请求的 Demo,是一个多线程的 Demo,对提升并发掌握很有帮助,这里也整理一篇文章进行分析!
参考 B 站 up 主《极海Channel》,视频链接:www.bilibili.com/video/BV1Hv…
背景
对于库存扣减流程中的优化,最主要的优化就是库存扣减请求的合并,海哥也给出了 Demo,这里将整体设计、演进思路整理出来
库存扣减请求整体合并思路:针对热门商品进行合并,因此需要判断商品并发数,如果超过阈值,则判定为比较热门的商品,进行库存扣减请求的合并,此时会将库存扣减请求提交到队列,并且将用户线程阻塞,等待异步线程处理,处理完成之后,通知用户线程响应结果即可
这里会对海哥给出的代码进行分析,并且整理评论区中给出的问题
库存扣减请求的合并,主要流程为:
- 用户提交库存扣减请求
- 将用户请求的库存扣减请求入队,用户线程阻塞等待 200ms,等待异步线程处理请求
- 异步线程从队列中取出用户扣减请求,进行合并,再执行库存的扣减
- 异步线程处理完之后,唤醒用户线程,并响应结果
接下来将各个步骤拆解开,来分析库存扣减合并的 Demo
1、库存扣减操作
先看库存扣减操作的实现,主要是用户请求进来之后,需要将用户请求加入到队列中,等待异步线程处理,此时用户线程阻塞等待处理,阻塞时间为 200ms
这里就涉及到了阻塞,采用了 wait()
来实现阻塞
对应代码如下,对于阈值判断、队列的创建,不是核心逻辑,先略过,这里主要关注用户请求的入队操作,以及阻塞等待操作如何实现:
// 定义阻塞队列,存放用户请求
private BlockingQueue<RequestPromise> queue = new LinkedBlockingQueue<>(10);
public Result operate(UserRequest userRequest) throws InterruptedException {
// TODO 阈值判断
// TODO 队列的创建
// 1、创建 RequestPromise 封装用户请求,通过该对象,实现用户线程的阻塞和唤醒
RequestPromise requestPromise = new RequestPromise(userRequest);
synchronized (requestPromise) {
// 2、用户请求入队
boolean enqueueSuccess = queue.offer(requestPromise, 100, TimeUnit.MILLISECONDS);
if (! enqueueSuccess) {
return new Result(false, "系统繁忙");
}
try {
// 3、用户请求阻塞 200ms,等待请求被处理
requestPromise.wait(200);
// 4、根据 Result 判断是否为用户等待 200ms 超时
if (requestPromise.getResult() == null) {
return new Result(false, "等待超时");
}
} catch (InterruptedException e) {
return new Result(false, "被中断");
}
}
return requestPromise.getResult();
}
核心有两个操作:
-
queue.offer()
:将用户请求入队,之后异步线程会从队列取出请求,进行 合并处理 -
requestPromise.wait(200)
:将用户线程阻塞 200ms,等待处理,如果 200ms 之内,异步线程仍然没有处理完毕,会判断requestPromise.getResult() == null
,就算 等待超时 ,此时,直接返回结果为扣减失败即可
这里需要将入队操作放在 synchronized 同步代码块内(后边会详细解释):
这里假如将 queue.offer()
放在 synchronized 外部,那么假如用户请求刚加入队列之后,该请求就立即被异步线程处理,异步线程处理之后会通过 notify()
去唤醒这个用户线程,此时用户线程还没有执行 wait()
操作,也就是 notify()
先于 wait()
执行,导致用户线程只能阻塞 200ms 之后才能获取结果
因此通过 synchronized 主要是为了保证 wait()
和 notify()
操作的串行执行。当请求加入队列之后,执行 wait()
,用户线程释放锁,之后异步线程才可以通过 synchronized 拿到做,再执行 notify()
操作
2、异步线程、合并请求处理
在请求进入队列之后,需要异步线程去队列中获取请求,并进行合并,合并之后进行库存的扣减:
- 需要将该商品的多个扣减请求合并,比如 3 个请求,每个请求扣减 1 个库存 --------》合并为 1 个请求,扣减 3个库存,以此来减少数据库 IO 次数
// 模拟数据库中的库存值
private Integer stock = 6;
// 创建异步线程,对请求进行合并、处理
public void mergeJob() {
new Thread(() -> {
List<RequestPromise> list = new ArrayList<>();
while (true) {
// 1、不断从 queue 中获取请求
if (queue.isEmpty()) {
try {
// 2、每次 while 循环都睡眠 10ms,避免 CPU 一直空转
Thread.sleep(10);
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 3、判断队头元素是否为空,不为空的话,就将队列中的请求全部取出来,放到 List 集合中进行合并
while (queue.peek() != null) {
list.add(queue.poll());
}
System.out.println(Thread.currentThread().getName() + ":合并扣减库存:" + list);
// 4、将所有请求需要扣减的库存数相加,一次性扣减
int sum = list.stream().mapToInt(e -> e.getUserRequest().getCount()).sum();
// 5、如果库存数足够,则直接扣减
if (sum <= stock) {
stock -= sum;
list.forEach(requestPromise -> {
requestPromise.setResult(new Result(true, "ok"));
synchronized (requestPromise) {
// 6、扣减成功之后,通过 notify() 唤醒阻塞的用户线程
requestPromise.notify();
}
});
list.clear();
continue;
}
// 7、如果剩余库存数不足,需要循环 list,先扣减较大的库存数
for (RequestPromise requestPromise : list) {
int count = requestPromise.getUserRequest().getCount();
if (count <= stock) {
stock -= count;
requestPromise.setResult(new Result(true, "ok"));
} else {
// 8、如果库存数量不足,则响应为库存不足,并唤醒用户线程
requestPromise.setResult(new Result(false, "库存不足"));
}
// 9、唤醒用户线程
synchronized (requestPromise) {
requestPromise.notify();
}
}
list.clear();
}
}, "mergeThread").start();
}
代码分析
在 mergeJob()
中,启动了异步线程去处理请求,里边有几个实现细节需要注意
- 避免 CPU 空转
这里异步线程需要不断去队列 queue 中取出请求进行处理,因此是在 while 循环中处理的
这里在 while 循环中,加入了 Thread.sleep(10)
,每次循环之后,让线程睡眠 10ms,就是避免 CPU 一直空转
- 唤醒用户阻塞线程
在用户库存扣减请求进入时,该用户线程创建了一个 RequestPromise,之后 用户线程 和 异步处理线程 通过这个 RequestPromise 进行线程间的通信
当用户线程将请求加入队列之后,调用 requestPromise.wait()
阻塞线程,等待处理结果
当异步线程处理完成,调用 requestPromise.notify()
通知对应的用户线程,表示已经处理完毕
存在问题
这段代码还存在一个问题,在第 3 步的时候,通过 while() 循环去弹出队列元素,加入到 list 集合
但是这里存在死循环的可能,如果队列一直在入队,并且入队的速度大于这里出队的速度,就会一直从队列中取出元素,list 中的元素不断增多,最终导致 OOM
// 3、判断队头元素是否为空,不为空的话,就将队列中的请求全部取出来,放到 List 集合中进行合并
while (queue.peek() != null) {
list.add(queue.poll());
}
因此这里需要设定一个批次,每次从 queue 中获取指定数量的请求,避免一直获取,优化后代码如下:
// 3、批次从队列中取出请求进行处理
int batchSize = queue.size();
for (int i = 0;i < batchSize; i ++) {
list.add(queue.poll());
}
这里先获取队列中的长度,再从队列中获取指定长度的请求,避免死循环
3、模拟并发用户请求
接下来,创建 10 个库存扣减请求任务,模拟并发请求
- 通过 CountDownLatch,先将任务提交到线程池,等所有任务都提交到线程池之后,再一起执行
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
KillDemo killDemo = new KillDemo();
// 1、开启异步处理线程
killDemo.mergeJob();
// 2、等待异步线程启动
Thread.sleep(2000);
List<Future<Result>> futureList = new ArrayList<>();
// 3、模拟用户并发请求,将 10 个请求都生成完之后,再执行 killDemo.operate() 操作
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
final Long orderId = i + 100L;
final Long userId = Long.valueOf(i);
Future<Result> future = executorService.submit(() -> {
countDownLatch.countDown();
countDownLatch.await(1, TimeUnit.SECONDS);
return killDemo.operate(new UserRequest(orderId, userId, 1));
});
futureList.add(future);
}
futureList.forEach(future -> {
try {
// 4、获取用户请求结果
Result result = future.get(300, TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName() + ":客户端请求响应:" + result);
} catch (Exception e) {
e.printStackTrace();
}
});
}
这里将库存数设置为 stock = 6,但是并发有 10 个请求,因此,有 4 个请求会扣减失败,输出如下:
mergeThread:合并扣减库存:[RequestPromise{userRequest=UserRequest{orderId=105, userId=5, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=109, userId=9, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=106, userId=6, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=102, userId=2, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=101, userId=1, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=100, userId=0, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=107, userId=7, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=104, userId=4, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=103, userId=3, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=108, userId=8, count=1}, result=null}]
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=true, msg='ok'}
请求合并总结
库存扣减请求合并,需要将请求放入队列,并阻塞用户线程,之后异步线程去队列拉取请求,进行合并、以及后续处理,这里总结一下其中涉及的几个技术点
一、用户线程阻塞和唤醒:
通过 wait()
和 notify()
对用户线程进行阻塞和唤醒操作,但是这两个方法是在 Object 内部,因此需要创建一个对象,也就是 RequestPromise
对象
在 operate()
方法处理用户库存扣减请求时,创建了该对象,该对象封装了用户请求,之后将该对象入队,核心代码如下:
// 1、创建 RequestPromise
RequestPromise requestPromise = new RequestPromise(userRequest);
// 2、RequestPromise 入队
boolean enqueueSuccess = queue.offer(requestPromise, 100, TimeUnit.MILLISECONDS);
// 3、阻塞用户线程,等待处理
requestPromise.wait(200);
阻塞和唤醒流程如下图:
二:RequestPromise 入队需要 synchronized 加锁
这里先列举一下有问题的代码(将 queue.offer() 放在 synchronized 代码块外部),如下,在 operate()
方法中,希望将 RequestPromise 入队之后,调用 requestPromise.wait()
方法阻塞用户线程,但是 wait()
方法需要在 synchronized 代码块内部调用,因此使用 synchronized 包裹起来 wait() 方法
但是这样写会存在 并发问题: 如果 RequestPromise 在入队之后,立即被异步线程取出来,进行库存扣减,并执行 notify()
唤醒用户线程,但此时用户线程还没有调用 wait()
方法进行阻塞,因此每个用户线程都会阻塞等待 200ms 才会返回响应结果
public Result operate(UserRequest userRequest) throws InterruptedException {
// TODO 阈值判断
// TODO 队列的创建
RequestPromise requestPromise = new RequestPromise(userRequest);
/* ---并发异常代码块--- */
boolean enqueueSuccess = queue.offer(requestPromise, 100, TimeUnit.MILLISECONDS);
if (! enqueueSuccess) {
return new Result(false, "系统繁忙");
}
/* ---并发异常代码块--- */
synchronized (requestPromise) {
try {
requestPromise.wait(200);
if (requestPromise.getResult() == null) {
return new Result(false, "等待超时");
}
} catch (InterruptedException e) {
return new Result(false, "被中断");
}
}
return requestPromise.getResult();
}
因此需要将 queue.offer()
移入同步代码块,这样 wait()
和 notify()
都会竞争同一个 RequestPromise 对象上的锁,当用户线程将 RequestPromise 入队之后,调用 wait() 方法才会去释放对应的锁,之后异步线程处理队列中的请求,再通过 synchronized 拿到 RequestPromise 上的锁,并且调用 notify()
去唤醒对应的用户线程
数据一致性问题
如果有细心的同学可以发现,上边这块代码还少了一块内容
数据一致性问题: 当用户线程调用 wait()
方法阻塞 200ms 之后,如果在 200ms 到达之后,异步线程还没来得及处理用户的库存扣减请求,此时用户线程就会响应结果为等待超时(比如应用 full gc、异步线程报错,都会导致超时),也就是库存扣减失败。如果在用户线程响应失败之后,异步线程完成了库存的扣减,那么库存数据会出现不一致的问题!
因此在上一篇文章(mp.weixin.qq.com/s/C59KKURKZ… 库存流水 去保证库存的数据一致性
在这个 Demo 中的话,只需要在异步线程扣减库存成功的时候,记录库存流水,当上游发现库存扣减失败,就发送消息,表示库存扣减失败,此时需要去进行 库存数量的校准
发送的消息带有订单号,根据订单号查询是否存在库存流水,如果存在,则表示异步线程已经扣减了库存,需要进行库存的回滚
完整的 Demo 可见海哥 Github 仓库代码:github.com/JiHaiChanne…
隐藏小问题
在 Github 仓库最新的代码中,还存在一个这样的问题,在请求进行合并时,指定批次 batchSize = 3
进行合并,使用 queue.take()
来取出队列的请求,但是如果队列没有请求的话, take()
方法会阻塞等待
比如说,此时队列还有 1 个请求,取出 1 个之后,队列中没有请求了,导致异步处理线程阻塞在 take()
,因此刚取出来的请求也就无法处理,之后用户线程等待 200ms 超时,发现库存扣减失败。
虽然可以设置阈值,对于并发数比较少的商品,不走请求合并,但是既然走了请求合并,肯定是希望他可以成功的,存在问题代码如下:
public void mergeJob() {
new Thread(() -> {
while (true) {
int batchSize = 3;
// 获取指定数量的请求
for (int i = 0; i < batchSize; i++) {
try {
list.add(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
因此,对于 batchSize 的值可以设置为 Math.min(3, queue.size())
,这样当队列中请求数量不足 3 个的话,根据队列中的请求数量来获取即可:
public void mergeJob() {
new Thread(() -> {
while (true) {
int batchSize = Math.min(queue.size(), 3);
// 获取指定数量的请求
for (int i = 0; i < batchSize; i++) {
try {
list.add(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
参考资料:
- 极海 Channel:www.bilibili.com/video/BV1Hv…
- 代码仓库:github.com/JiHaiChanne…