分批查询超大数据量,避免JVM出现OOM,这样做就对了!

需求背景

开发中,经常有这样一种场景:需要查询很多的数据量出来,然后去做各种业务操作。对于刚入行的小伙伴来说,通常就是查询过多的数据加载到内存,没有对数据量做限制。

表面上看,在该表数据量不大,在机器配置足够高的情况下短时间内不会有什么大问题,但随着业务的增长,数据的增加,莫名其妙的在线上出现了OOM,很多人为之疑惑。

博主在企业工作的多年经历中,配合开发运维排查了大量线上OOM问题,由于开发人员水平的参次不齐,很多这种扫全表或一次查询过多数据到内存的操作,导致了线上机器 的OOM。

问题分析

所谓jvm的OOM,就是Out Of Memory,也就是在我们的内存中加载了过多对象,但又没办法回收,对于查询一张超大的表,不做任何限制,很容易就导致OOM。

不规范操作

select * from table;

然后查询到内存中,处理各种业务。

所以比较推荐类似的操作应该是:

select * from table limit 0,200;

通常情况能做到这一步的程序员才算是入了行的,但这种情况在数据量不大的情况下可能也不会有什么太大的问题,但处理过一定大数据量的同学会可能会发现一个问题,使用limit分页,在数据量大的情况下这种limit的性能是极低的。

select * from table limit 10000000,200;

如何你表的id主键使用这种连续自增id,在这种情况下可以通过这种方式处理:

select * from table where id > 0 and id < 201

代码实现

基于上面的分析思路,代码实现的差异不大,以下代码基于目前大家使用比较多的Mybatis-Plus提供代码示例:

/**
 * 定义所需要的参数
 */

@Data
@Builder
@RequiredArgsConstructor
public class BulkExecutorParam<T{
       /**
     * batch size
     */

    @Builder.Default
    private Integer batchSize = 500;

    /**
     * start page
     */

    @Builder.Default
    private Integer start = 1;

    /**
     * query wrapper
     */

    private Wrapper<T> queryWrapper;

    /**
     * query service
     */

    private IService<T> service;

    /**
     * is need multiple threads
     */

    @Builder.Default
    private Boolean isMultiThreaded = Boolean.TRUE;

    /**
     * parallelism for executors
     */

    @Builder.Default
    private Integer parallelism = 0;

    /**
     * count: if present,use it else use service.count()
     */

    @Builder.Default
    private Integer count = 0;

    /**
     * query function
     */

    private Function<Integer, List<T>> queryFunc;


    /**
     * execute consumer
     */

    private Consumer<List<T>> execConsumer;

    /**
     * thread pool
     */

    private ExecutorService executors;

    public ExecutorService getExecutors() {
        return this.parallelism > 0 ? Executors.newWorkStealingPool(this.parallelism) : Executors.newWorkStealingPool();
    }

}

核心执行类抽象:

/**
 * @author caoyong
 * @version 1.0.0
 **/

public class BulkExecutorUtil {

    /**
     * execute batch
     *
     * @param param    parameters that need
     * @param consumer consumer
     * @param <T>      execute type
     */

    public static <T> void execute(BulkExecutorParam<T> param, Consumer<List<T>> consumer) {
        param.setExecConsumer(consumer);
        submit(param, null);
    }

    /**
     * batch submit with result
     *
     * @param param    parameters that need
     * @param execFunc execute function
     * @param <T>      type that param
     * @param <R>      type that result
     * @return result of future
     */

    public static <T, R> List<Future<R>> submit(BulkExecutorParam<T> param, Function<List<T>, R> execFunc) {
        List<Future<R>> futures = new ArrayList<>();
        //query service
        IService<T> service = param.getService();
        //query wrapper
        Wrapper<T> queryWrapper = param.getQueryWrapper();
        int count = param.getCount() > 0 ? param.getCount() : service.count(queryWrapper);
        if (count == 0) {
            return futures;
        }
        Integer batchSize = param.getBatchSize();
        int pageCount = (count + batchSize - 1) / batchSize;
        //iterate all page
        IntStream.rangeClosed(param.getStart(), pageCount)
                .forEach(currentPage -> {

                    //support multi thread and can alternate execute thread pool
                    if (param.getIsMultiThreaded()) {
                        if (execFunc == null) {
                            //execute only consumer type task
                            param.getExecutors().execute(() -> {
                                List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
                                if (CollectionUtil.isEmpty(records)) {
                                    return;
                                }
                                param.getExecConsumer().accept(records);
                            });
                        } else {
                            Future<R> submit = param.getExecutors().submit(() -> {
                                //execute submit task with futures return
                                List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
                                return execFunc.apply(records);
                            });
                            futures.add(submit);
                        }
                    } else {
                        //with main thread execute
                        List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
                        if (CollectionUtil.isEmpty(records)) {
                            return;
                        }
                        param.getExecConsumer().accept(records);
                    }
                });
        return futures;
    }

    /**
     * get records
     *
     * @param queryFunc    query function
     * @param service      query service
     * @param queryWrapper query wrapper
     * @param batchSize    batch execute size
     * @param currentPage  current page
     * @param <T>          param type
     * @return return type
     */

    private static <T> List<T> getRecords(Function<Integer, List<T>> queryFunc,
                                          IService<T> service,
                                          Wrapper<T> queryWrapper,
                                          Integer batchSize,
                                          int currentPage)
 
{
        //execute records:
        //if customizing supply current page for it else using mybatis-plus service and query wrapper
        return service == null ? queryFunc.apply(currentPage) :
                service.page(new Page<>(currentPage, batchSize), queryWrapper).getRecords();
    }
}

execute使用示例:

 private void batchProcessor(List<T> records) {
  //实现你的处理逻辑
 }
//bulk executor
LambdaQueryWrapper<T> wrapper = Wrappers.lambdaQuery(T.class);
        BulkExecutorParam<T> param = BulkExecutorParam.<T>builder()
                .service(this).queryWrapper(wrapper).build();
BulkExecutorUtil.execute(param, this::batchProcessor);

submit使用示例:

 private File batchProcessor(List<T> records) {
  //实现你的处理逻辑
  File file = new File("/your-path");
  return file;
 }
 //IO密集型的任务,核心线程数使用 CPU核数*2 + 1
int parallelism = Runtime.getRuntime().availableProcessors() * 2 + 1;
BulkExecutorParam<T> exeParam = BulkExecutorParam.<T>builder()
        .queryWrapper(param.getQueryWrapper())
        .batchSize(param.getBatchSize())
        .service(this)
        .parallelism(parallelism)
        .build();

List<Future<File>> futures = BulkExecutorUtil.submit(exeParam, this::batchProcessorFile);
if (CollectionUtil.isEmpty(futures)) {
    return downloadFilePath;
}
//iterator all future file
for (Future<File> future : futures) {
    try {
     //处理你的相关逻辑
        File file = future.get();
    } catch (Exception e) {
        log.error("future get error:{}", e.getMessage(), e);
    }
}

来源:blog.csdn.net/m0_37797991/

article/details/123015082


后端专属技术群

构建高质量的技术交流社群,欢迎从事编程开发、技术招聘HR进群,也欢迎大家分享自己公司的内推信息,相互帮助,一起进步!

文明发言,以交流技术职位内推行业探讨为主

广告人士勿入,切勿轻信私聊,防止被骗

分批查询超大数据量,避免JVM出现OOM,这样做就对了!
加我好友,拉你进群 

原文始发于微信公众号(Java面试题精选):分批查询超大数据量,避免JVM出现OOM,这样做就对了!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/146718.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!