多线程案例落地 - 库存扣减请求合并、库存一致性保证

首页 编程分享 PHP丨JAVA丨OTHER 正文

11来了 推荐 转载 编程分享 2024-11-30 22:05:28

简介 文章内容已经收录在《高级技术专家成长笔记》,欢迎订阅专栏! 从原理、系统设计出发,直击面试难点,实现更高维度的降维打击! 多线程案例落地 - 库存扣减请求合并、库存一致性保证 海哥也给出了合并库存扣减


文章内容已经收录在《高级技术专家成长笔记》,欢迎订阅专栏! 从原理、系统设计出发,直击面试难点,实现更高维度的降维打击!

多线程案例落地 - 库存扣减请求合并、库存一致性保证

海哥也给出了合并库存扣减请求的 Demo,是一个多线程的 Demo,对提升并发掌握很有帮助,这里也整理一篇文章进行分析!

参考 B 站 up 主《极海Channel》,视频链接:www.bilibili.com/video/BV1Hv…

背景

对于库存扣减流程中的优化,最主要的优化就是库存扣减请求的合并,海哥也给出了 Demo,这里将整体设计、演进思路整理出来

库存扣减请求整体合并思路:针对热门商品进行合并,因此需要判断商品并发数,如果超过阈值,则判定为比较热门的商品,进行库存扣减请求的合并,此时会将库存扣减请求提交到队列,并且将用户线程阻塞,等待异步线程处理,处理完成之后,通知用户线程响应结果即可

这里会对海哥给出的代码进行分析,并且整理评论区中给出的问题

库存扣减请求的合并,主要流程为:

  • 用户提交库存扣减请求
  • 将用户请求的库存扣减请求入队,用户线程阻塞等待 200ms,等待异步线程处理请求
  • 异步线程从队列中取出用户扣减请求,进行合并,再执行库存的扣减
  • 异步线程处理完之后,唤醒用户线程,并响应结果

接下来将各个步骤拆解开,来分析库存扣减合并的 Demo

1、库存扣减操作

先看库存扣减操作的实现,主要是用户请求进来之后,需要将用户请求加入到队列中,等待异步线程处理,此时用户线程阻塞等待处理,阻塞时间为 200ms

这里就涉及到了阻塞,采用了 wait() 来实现阻塞

对应代码如下,对于阈值判断、队列的创建,不是核心逻辑,先略过,这里主要关注用户请求的入队操作,以及阻塞等待操作如何实现:

// 定义阻塞队列,存放用户请求
private BlockingQueue<RequestPromise> queue = new LinkedBlockingQueue<>(10);

public Result operate(UserRequest userRequest) throws InterruptedException {
    // TODO 阈值判断
    // TODO 队列的创建
    
    // 1、创建 RequestPromise 封装用户请求,通过该对象,实现用户线程的阻塞和唤醒
    RequestPromise requestPromise = new RequestPromise(userRequest);

    synchronized (requestPromise) {
        // 2、用户请求入队
        boolean enqueueSuccess = queue.offer(requestPromise, 100, TimeUnit.MILLISECONDS);
        if (! enqueueSuccess) {
            return new Result(false, "系统繁忙");
        }
        try {
            // 3、用户请求阻塞 200ms,等待请求被处理
            requestPromise.wait(200);
            // 4、根据 Result 判断是否为用户等待 200ms 超时
            if (requestPromise.getResult() == null) {
                return new Result(false, "等待超时");
            }
        } catch (InterruptedException e) {
            return new Result(false, "被中断");
        }
    }
    return requestPromise.getResult();
}

核心有两个操作:

  • queue.offer() :将用户请求入队,之后异步线程会从队列取出请求,进行 合并处理
  • requestPromise.wait(200) :将用户线程阻塞 200ms,等待处理,如果 200ms 之内,异步线程仍然没有处理完毕,会判断 requestPromise.getResult() == null ,就算 等待超时 ,此时,直接返回结果为扣减失败即可

这里需要将入队操作放在 synchronized 同步代码块内(后边会详细解释):

这里假如将 queue.offer() 放在 synchronized 外部,那么假如用户请求刚加入队列之后,该请求就立即被异步线程处理,异步线程处理之后会通过 notify() 去唤醒这个用户线程,此时用户线程还没有执行 wait() 操作,也就是 notify() 先于 wait() 执行,导致用户线程只能阻塞 200ms 之后才能获取结果

因此通过 synchronized 主要是为了保证 wait()notify() 操作的串行执行。当请求加入队列之后,执行 wait() ,用户线程释放锁,之后异步线程才可以通过 synchronized 拿到做,再执行 notify() 操作

2、异步线程、合并请求处理

在请求进入队列之后,需要异步线程去队列中获取请求,并进行合并,合并之后进行库存的扣减:

  • 需要将该商品的多个扣减请求合并,比如 3 个请求,每个请求扣减 1 个库存 --------》合并为 1 个请求,扣减 3个库存,以此来减少数据库 IO 次数
// 模拟数据库中的库存值
private Integer stock = 6;

// 创建异步线程,对请求进行合并、处理
public void mergeJob() {
    new Thread(() -> {
        List<RequestPromise> list = new ArrayList<>();
        while (true) {
            // 1、不断从 queue 中获取请求
            if (queue.isEmpty()) {
                try {
                    // 2、每次 while 循环都睡眠 10ms,避免 CPU 一直空转
                    Thread.sleep(10);
                    continue;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 3、判断队头元素是否为空,不为空的话,就将队列中的请求全部取出来,放到 List 集合中进行合并
            while (queue.peek() != null) {
                list.add(queue.poll());
            }

            System.out.println(Thread.currentThread().getName() + ":合并扣减库存:" + list);
			// 4、将所有请求需要扣减的库存数相加,一次性扣减
            int sum = list.stream().mapToInt(e -> e.getUserRequest().getCount()).sum();
            // 5、如果库存数足够,则直接扣减
            if (sum <= stock) {
                stock -= sum;
                list.forEach(requestPromise -> {
                    requestPromise.setResult(new Result(true, "ok"));
                    synchronized (requestPromise) {
                        // 6、扣减成功之后,通过 notify() 唤醒阻塞的用户线程
                        requestPromise.notify();
                    }
                });
                list.clear();
                continue;
            }
            // 7、如果剩余库存数不足,需要循环 list,先扣减较大的库存数
            for (RequestPromise requestPromise : list) {
                int count = requestPromise.getUserRequest().getCount();
                if (count <= stock) {
                    stock -= count;
                    requestPromise.setResult(new Result(true, "ok"));
                } else {
                    // 8、如果库存数量不足,则响应为库存不足,并唤醒用户线程
                    requestPromise.setResult(new Result(false, "库存不足"));
                }
                // 9、唤醒用户线程
                synchronized (requestPromise) {
                    requestPromise.notify();
                }
            }
            list.clear();
        }
    }, "mergeThread").start();
}

代码分析

mergeJob() 中,启动了异步线程去处理请求,里边有几个实现细节需要注意

  • 避免 CPU 空转

这里异步线程需要不断去队列 queue 中取出请求进行处理,因此是在 while 循环中处理的

这里在 while 循环中,加入了 Thread.sleep(10) ,每次循环之后,让线程睡眠 10ms,就是避免 CPU 一直空转

  • 唤醒用户阻塞线程

在用户库存扣减请求进入时,该用户线程创建了一个 RequestPromise,之后 用户线程异步处理线程 通过这个 RequestPromise 进行线程间的通信

当用户线程将请求加入队列之后,调用 requestPromise.wait() 阻塞线程,等待处理结果

当异步线程处理完成,调用 requestPromise.notify() 通知对应的用户线程,表示已经处理完毕

存在问题

这段代码还存在一个问题,在第 3 步的时候,通过 while() 循环去弹出队列元素,加入到 list 集合

但是这里存在死循环的可能,如果队列一直在入队,并且入队的速度大于这里出队的速度,就会一直从队列中取出元素,list 中的元素不断增多,最终导致 OOM

// 3、判断队头元素是否为空,不为空的话,就将队列中的请求全部取出来,放到 List 集合中进行合并
while (queue.peek() != null) {
    list.add(queue.poll());
}

因此这里需要设定一个批次,每次从 queue 中获取指定数量的请求,避免一直获取,优化后代码如下:

// 3、批次从队列中取出请求进行处理
int batchSize = queue.size();
for (int i = 0;i < batchSize; i ++) {
    list.add(queue.poll());
}

这里先获取队列中的长度,再从队列中获取指定长度的请求,避免死循环

3、模拟并发用户请求

接下来,创建 10 个库存扣减请求任务,模拟并发请求

  • 通过 CountDownLatch,先将任务提交到线程池,等所有任务都提交到线程池之后,再一起执行
public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    KillDemo killDemo = new KillDemo();
    // 1、开启异步处理线程
    killDemo.mergeJob();
    // 2、等待异步线程启动
    Thread.sleep(2000);
    List<Future<Result>> futureList = new ArrayList<>();
    // 3、模拟用户并发请求,将 10 个请求都生成完之后,再执行 killDemo.operate() 操作
    CountDownLatch countDownLatch = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        final Long orderId = i + 100L;
        final Long userId = Long.valueOf(i);
        Future<Result> future = executorService.submit(() -> {
            countDownLatch.countDown();
            countDownLatch.await(1, TimeUnit.SECONDS);
            return killDemo.operate(new UserRequest(orderId, userId, 1));
        });
        futureList.add(future);
    }
    futureList.forEach(future -> {
        try {
            // 4、获取用户请求结果
            Result result = future.get(300, TimeUnit.MILLISECONDS);
            System.out.println(Thread.currentThread().getName() + ":客户端请求响应:" + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

这里将库存数设置为 stock = 6,但是并发有 10 个请求,因此,有 4 个请求会扣减失败,输出如下:

mergeThread:合并扣减库存:[RequestPromise{userRequest=UserRequest{orderId=105, userId=5, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=109, userId=9, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=106, userId=6, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=102, userId=2, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=101, userId=1, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=100, userId=0, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=107, userId=7, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=104, userId=4, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=103, userId=3, count=1}, result=null}, RequestPromise{userRequest=UserRequest{orderId=108, userId=8, count=1}, result=null}]
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=true, msg='ok'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=false, msg='库存不足'}
main:客户端请求响应:Result{success=true, msg='ok'}

请求合并总结

库存扣减请求合并,需要将请求放入队列,并阻塞用户线程,之后异步线程去队列拉取请求,进行合并、以及后续处理,这里总结一下其中涉及的几个技术点

一、用户线程阻塞和唤醒:

通过 wait()notify() 对用户线程进行阻塞和唤醒操作,但是这两个方法是在 Object 内部,因此需要创建一个对象,也就是 RequestPromise 对象

operate() 方法处理用户库存扣减请求时,创建了该对象,该对象封装了用户请求,之后将该对象入队,核心代码如下:

// 1、创建 RequestPromise
RequestPromise requestPromise = new RequestPromise(userRequest);

// 2、RequestPromise 入队
boolean enqueueSuccess = queue.offer(requestPromise, 100, TimeUnit.MILLISECONDS);

// 3、阻塞用户线程,等待处理
requestPromise.wait(200);

阻塞和唤醒流程如下图:

二:RequestPromise 入队需要 synchronized 加锁

这里先列举一下有问题的代码(将 queue.offer() 放在 synchronized 代码块外部),如下,在 operate() 方法中,希望将 RequestPromise 入队之后,调用 requestPromise.wait() 方法阻塞用户线程,但是 wait() 方法需要在 synchronized 代码块内部调用,因此使用 synchronized 包裹起来 wait() 方法

但是这样写会存在 并发问题: 如果 RequestPromise 在入队之后,立即被异步线程取出来,进行库存扣减,并执行 notify() 唤醒用户线程,但此时用户线程还没有调用 wait() 方法进行阻塞,因此每个用户线程都会阻塞等待 200ms 才会返回响应结果

public Result operate(UserRequest userRequest) throws InterruptedException {
    // TODO 阈值判断
    // TODO 队列的创建
    RequestPromise requestPromise = new RequestPromise(userRequest);
    /* ---并发异常代码块--- */
    boolean enqueueSuccess = queue.offer(requestPromise, 100, TimeUnit.MILLISECONDS);
    if (! enqueueSuccess) {
        return new Result(false, "系统繁忙");
    }
    /* ---并发异常代码块--- */
    synchronized (requestPromise) { 
        try {
            requestPromise.wait(200);
            if (requestPromise.getResult() == null) {
                return new Result(false, "等待超时");
            }
        } catch (InterruptedException e) {
            return new Result(false, "被中断");
        }
    }
    return requestPromise.getResult();
}

因此需要将 queue.offer() 移入同步代码块,这样 wait()notify() 都会竞争同一个 RequestPromise 对象上的锁,当用户线程将 RequestPromise 入队之后,调用 wait() 方法才会去释放对应的锁,之后异步线程处理队列中的请求,再通过 synchronized 拿到 RequestPromise 上的锁,并且调用 notify() 去唤醒对应的用户线程

数据一致性问题

如果有细心的同学可以发现,上边这块代码还少了一块内容

数据一致性问题: 当用户线程调用 wait() 方法阻塞 200ms 之后,如果在 200ms 到达之后,异步线程还没来得及处理用户的库存扣减请求,此时用户线程就会响应结果为等待超时(比如应用 full gc、异步线程报错,都会导致超时),也就是库存扣减失败。如果在用户线程响应失败之后,异步线程完成了库存的扣减,那么库存数据会出现不一致的问题!

因此在上一篇文章(mp.weixin.qq.com/s/C59KKURKZ… 库存流水 去保证库存的数据一致性

在这个 Demo 中的话,只需要在异步线程扣减库存成功的时候,记录库存流水,当上游发现库存扣减失败,就发送消息,表示库存扣减失败,此时需要去进行 库存数量的校准

发送的消息带有订单号,根据订单号查询是否存在库存流水,如果存在,则表示异步线程已经扣减了库存,需要进行库存的回滚

完整的 Demo 可见海哥 Github 仓库代码:github.com/JiHaiChanne…

隐藏小问题

在 Github 仓库最新的代码中,还存在一个这样的问题,在请求进行合并时,指定批次 batchSize = 3 进行合并,使用 queue.take() 来取出队列的请求,但是如果队列没有请求的话, take() 方法会阻塞等待

比如说,此时队列还有 1 个请求,取出 1 个之后,队列中没有请求了,导致异步处理线程阻塞在 take() ,因此刚取出来的请求也就无法处理,之后用户线程等待 200ms 超时,发现库存扣减失败。

虽然可以设置阈值,对于并发数比较少的商品,不走请求合并,但是既然走了请求合并,肯定是希望他可以成功的,存在问题代码如下:

public void mergeJob() {
    new Thread(() -> {
        while (true) {
			
            int batchSize = 3;
            // 获取指定数量的请求
            for (int i = 0; i < batchSize; i++) {
                try {
                    list.add(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

因此,对于 batchSize 的值可以设置为 Math.min(3, queue.size()) ,这样当队列中请求数量不足 3 个的话,根据队列中的请求数量来获取即可:

public void mergeJob() {
    new Thread(() -> {
        while (true) {
			
			int batchSize = Math.min(queue.size(), 3);
            // 获取指定数量的请求
            for (int i = 0; i < batchSize; i++) {
                try {
                    list.add(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

参考资料:

转载链接:https://juejin.cn/post/7442175081424732210


Tags:


本篇评论 —— 揽流光,涤眉霜,清露烈酒一口话苍茫。


    声明:参照站内规则,不文明言论将会删除,谢谢合作。


      最新评论




ABOUT ME

Blogger:袅袅牧童 | Arkin

Ido:PHP攻城狮

WeChat:nnmutong

Email:nnmutong@icloud.com

标签云