用流收集数据-Java 8

导读:本篇文章讲解 用流收集数据-Java 8,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

概述

流可以用类似于数据库的操作帮助你处理集合。你可以把Java 8的流看做花哨又懒惰的数据集迭代器。它们支持两种类型的操作:中间操作(如filter或map)和终端操作(如count、findFirst、forEach和reduce)。中间操作可以链接起来,将一个流转换为另一个流。这些操作不会消耗流,其目的是建立一个流水线。与此相反,终端操作会消耗流,产生一个最终结果,例如返回流中的最大元素。它们通常可以通过优化流水线来缩短计算时间。
我们之前用过collect终端操作了,当时主要是用来把Stream中所有的元素结合成一个List。在本章中,你会发现collect是一个归约操作,就像reduce一样可以接受各种做法作为参数,将流中的元素累积成一个汇总结果。具体的做法是通过定义新的Collector接口来定义的,因此区分Collection、Collector和collect是很重要的。
下面是一个收集器的例子。想象一下,你有一个由Transaction构成的List,并且想安装名义货币进行分组。在没有Lambda的Java里,哪怕像这种简单的用例实现起来都很啰嗦,就像下面这样。

        for (Transaction transaction : transactions) {
            final String currency = transaction.getCurrency();
            List<Transaction> list = transactionMap.get(currency);
            if (list == null) {
                list = new ArrayList<>();
                transactionMap.put(currency, list);
            }
            list.add(transaction);
        }

如果你是一位经验丰富的java程序员,写这种东西可能挺顺手的,不过你必须承认,做这么简单的一件事就得写很多代码。更糟糕的是,读起来比写起来还费劲!代码的目的并不容易看出来,尽管换做白话很直截了当:“把列表中的交易按货币分组”。Stream中的collect方法有一个更通用的Collector参数,你就可以用一句话实现完全相同的功能。

Map<String, List<Transaction>> collect = transactions.stream().collect(Collectors.groupingBy(Transaction::getCurrency));

这一比,差的还真多。

收集器简介

前一个例子清楚的展示了函数式编程相对于指令式编程的一个主要优势:你只需要支持希望的结果一一“做什么”,而不用操作执行的步骤一一“如何做”。在上一个例子里,传递给collect方法的参数是Collector接口的一个实现,也就是给Stream中元素做汇总的方法。之前说过的Collectors.toList()只是说“按顺序给每个元素生成一个列表”;在本例中,Collectors.groupingBy说的是“生成一个Map,它的键是货币桶,值是桶中的那些元素的列表”。
要是做多级分组,指令式和函数式之间的区别就会更加明显:由于需要好多层嵌套循环和条件,指令式代码很快就变的更难阅读、更难维护、更难修改。相比之下,函数式版本只要在加上一个收集器就可以轻松增加功能了。

收集器用于高级归约

刚刚的结论又引出了一个优秀的函数式API设计的另一个好处:更易复合和重用。收集器非常有用,因为用他可以简洁而灵活地定义collect用来生成结果集合的标准。更具体地说,对流调用collect方法将对流中的元素触发一个归约操作(由Collector来参数化)。
一般来说,Collector会对元素应用一个转换函数(很多时候是不体现任何效果的恒等转换,例如toList),并将结果累积在一个数据结构中,从而产生这一过程的最终输出。例如,在前面所示的交易分组的例子中,转换函数提取了每笔交易的货币,随后使用货币作为键,将交易本身累积在生成的Map中。
Collector接口中方法的实现决定了如何对流执行归约操作。Collectors实用类提供了很多静态工厂方法,可以方便的创建常见收集器的实例,只要拿来用就可以了。

预定义收集器

我们来探讨预定义收集器的功能,也就是那些可以从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。它们主要提供三大功能:

  • 将流元素归约和汇总为一个值
  • 元素分组
  • 元素分区
    我们先来看看可以进行归约和汇总的收集器。它们在很多场合下都很方便,比如前面例子中提到的一系列校验的总交易额。
    然后你将看到如何对流中的元素进行分组,同时把前一个例子推广到多层次分组,或把不同的收集器结合起来,对每个子组进行进一步归约操作。我们还将谈到分组的特殊的情况“分区”,即使用谓词(返回一个布尔值的单参数函数)作为分组函数。

归约和汇总

查找流中的最大值和最小值

假如你想要找出菜单中热量最高的菜。你可以使用两个收集器,Collectors.maxBy和Collectors.minBy,来计算流中的最大或最小值。这两个收集器接收一个Comparator参数来比较流中的元素。你可以创建一个Comparator来根据所含热量对菜肴进行比较,并把它传递给Collectors.maxBy。

        final Comparator<Dish> dishComparator = Comparator.comparingInt(Dish::getCalories);
        final Optional<Dish> mostCaloriesDish = menu.stream().collect(Collectors.maxBy(dishComparator));

你可能在想Optional是怎么回事。要回答这个问题,我们需要问“要是menu为空怎么办”。那就没有返回的菜了!Java 8引入了Optional,它是一个容器,可以包含也可以不包含值。这里它完美地代表了可能也可能不返回菜肴的情况。

汇总

Colloctors类专门为汇总提供了一个工厂方法:Collectors.summingInt。它可接受一个把对象映射为求和所需int的函数,并返回一个收集器;该收集器在传递给普通的collect方法后即执行我们需要的汇总操作。

final Integer totalCalories = menu.stream().collect(Collectors.summingInt(Dish::getCalories));

Collectors.summingLong和Collectors.summingDouble方法的作用完全一样,可以用于求和字段为long或double的情况。
到目前为止,你已经看到了如何使用收集器给流中的元素计数,找到这些元素数值属性的最大值和最小值,以及计数其总和和平均值。不过很多时候,你可能想要得到两个或更多这样的结果,而且你希望只需异常操作就可以完成。在这种情况下,你可以使用summarizingInt工厂方法返回的收集器。

final IntSummaryStatistics summaryStatistics = menu.stream().collect(Collectors.summarizingInt(Dish::getCalories));

同样,相应的summarizingLong和summarizingDouble工厂方法有相关的LongSummaryStatistics和DoubleSummaryStatistics类型,适用于收集的属性是原始类型long、double的情况。

连接字符串

joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。这意味着你把菜单中所有菜肴的名字连接起来。

final String menuNames = menu.stream().map(Dish::getName).collect(Collectors.joining());

请注意,joining在内部使用了StringBuilder来把生成的字符串逐个追加起来。
但这样生成的字符串可读性并不好。幸好,joining工厂方法有一个重载版本可以接收元素之间的分解符,这样你就可以得到一个逗号分隔符的菜肴名称列表。

final String menuNames = menu.stream().map(Dish::getName).collect(Collectors.joining(", "));

下面我们会展示为什么所有这种形式的归约过程,其实都是Collectors.reducing工厂方法提供的更广义归约收集器的特殊情况。

广义的归约汇总

事实上,我们已经讨论的所有收集器,都是一个可以用reducing工厂方法定义的归约过程的特殊情况而已。Collectors.reducing工厂方法是所有这些特殊情况的一般化。可以说,先前讨论的案例仅仅是为了方便程序员而已(请记住,方便程序员和可读性是头等大事)。

分组

一个常见的数据库操作是根据一个或多个属性对集合中的项目进行分组。
我们来看这个功能的第二个例子:假设你要把菜单中的菜按照类型进行分类。

Map<String, List<Dish>> dishesByType = menu.stream().collect(Collectors.groupingBy(Dish::getType));

这里,你给groupingBy方法传递了一个Function(以方法引用的形式),它提取了流中每一道Dish的Dish.Type。我们把这个Function叫做分类函数,因为它用来把流中的元素分成不同的组。分组操作的结果是一个Map,把分组函数返回的值作为映射的键,把流中所有具有这个分类值的元素的列表作为对应的映射值。
但是,分类函数不一定像方法引用那样可用,因为你想用以分类的条件可能比较简单的属性访问器要复杂。例如,你可能想把热量不到40卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划分为“普通”(normal),高于700卡路里的划分为“高热量”(fat)。由于Dish类的作者没有把这个操作写成一个方法,你无法使用方法引用,但你可以把这个逻辑写成Lambda表达式:

Map<CaloricLevel, List<Dish>> caloricLevelListMap = menu.stream().collect(Collectors.groupingBy(dish -> {
            if (dish.getCalories() <= 400) {
                return CaloricLevel.DIET;
            } else if (dish.getCalories() <= 700) {
                return CaloricLevel.NORMAL;
            } else {
                return CaloricLevel.FAT;
            }
        }));

多级分组

要实现多级分组,我们可以使用一个由双参数版本的Collectors.groupingBy工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数。那么要进行二级分组的话,我们可以把一个内存的groupingBy传递给外层的groupingBy,并定义一个为流中项目分类的二级标准。

Map<String, Map<CaloricLevel, List<Dish>>> dishesMap = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.groupingBy(dish -> {
            if (dish.getCalories() <= 400) {
                return CaloricLevel.DIET;
            } else if (dish.getCalories() <= 700) {
                return CaloricLevel.NORMAL;
            } else {
                return CaloricLevel.FAT;
            }
        })));

按子组收集数据

传递给第一个groupingBy的第二个收集器可以是任何类型,而不一定是一个groupingBy。例如,要数一数菜单中每类菜有多少个,可以传递counting收集器作为groupingBy收集器的第二个参数:

Map<String, Long> dishesMap = menu.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.counting()));

分区

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称为分区函数。分区函数返回一个布尔值,这意味着得到分组Map的键类型是Boolean,于是它最多可以分为两组一一true是一组,false是一组。

Map<Boolean, List<Dish>> dishesMap = menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian));

收集器接口

Collector接口包含了一系列方法,为实现具体的归约操作(及收集器)提供了范本。这意味着你可以为Collector接口提供自己的实现,从而自由地创建自定义归约操作。
首先我们在下面的列表中看看Collector接口的定义,它列出了接口的签名以及声明的五个方法。

public interface Collector<T,  A,  R> {
	supplier<A> supplier();
	BiConsumer<A, T> accumulator();
	Function<A, R> finisher();
	BinaryOperator<A> combiner();
	Set<Characteristics> characteristics();
}
  • T是流中要收集的元素泛型。
  • A是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
  • R是收集操作得到的对象(通常但并不一定是集合)的类型。
    例如,你可以实现一个ToListCollector类,将Stream中的所有元素收集到一个List里,签名如下:
public calss ToListCollector<T> implements Collector<T, List<T>, List<T>>

我们很快就会澄清,这里用于累积的对象也将是收集过程中的最终结果。

理解Collector接口声明的方法

现在我们可以一个个来分析Collector接口声明的五个方法了。通过分析,你会注意到,前4个方法都会返回一个被collect方法调用的函数,而第五个方法characteristics则提供了一系列特征,也就是一个提示列表,告诉collect方法在执行归约操作的时候可以应用哪些优化(比如并行化)。

建立新的结果容器:supplier方法

supplier方法必须返回一个结果为空的Supplier,也就是一个无参函数,在调用时它会创建一个空的累加器实例,供数据收集过程使用。很明显,对于将累加器本身作为结果返回的收集器,比如我们的ToListCollector,在对空流执行操作的时候,这个空的累加器也代表了收集过程的结果。在我们的ToListCollector中,supplier返回一个空的list,如下所示:

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

将元素添加到结果容器:accumulator方法

accumulator方会返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行是会有两个参数:保存归约结果的累加器(已收集了流中的前n-1个项目),还有第n个元素本身。该函数返回将返回void,因为累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。对于ToListCollector,这个函数仅仅会把当前项目添加至已经遍历过的项目的列表:

    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

对结果容器应用最终转变:finisher方法

在遍历完流后,finisher方法必须返回在累积过程的最后要调用的一个函数,以便累加器对象转换为整个集合操作的最终结果。通常,就像ToListCollector的情况一样,累机器对象恰好符合预期的最终结果,因此无需进行任何转换。所以finisher方法只需返回identity函数:

    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

这三个方法已经足以对流进行顺序归约,至少从逻辑上看可以进行。实践中的实现细节可能还要复杂点,一方面是因为流的延迟性质,可能在collect操作之前还需要完成其他中间操作的流水线,另一方面则是理论上可能要进行并行归约。
在这里插入图片描述

合并两个结果容器:combiner方法

四个方法中的最后一个一一一combiner方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。对应toList而言,这个方法的实现非常简单,只要把从流的第二个部分收集到的元素列表加到遍历第一部分时得到的列表后面就行了。

    public BinaryOperator<List<T>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }

有了第四个方法,就可以对流进行并行归约了。它会用到java7中引入的分支/合并关键和Spliterator抽象。
过程如下:

  • 原始流会以递归方法拆分为子流,直到定义流是否需要进一步拆分的一个条件为非(如果分布式工作单位太小,并行计算往往比顺序计算要慢,而且要是生成的并行任务比处理器内核数多很多的话就毫无意义了)。
  • 现在,所有的子流都可以并行处理,即对每个子流应用顺序归约算法。
  • 最后,使用收集器combiner方法返回的函数,将所有的部分结果两两合并。这时会把原始流每次拆分时得到的子流对应的结果合并起来。

characteristics方法

最后一个方法一一一characteristics会返回一个不可变的Characteristics集合,它定义了收集器的行为一一一尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。Characteristics是一个包含三个元素的枚举。

  • UNORDERED一一归约结果不受流中元素的遍历顺序和累积顺序的影响。
  • CONCURRENT一一accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。当标记为UNORDERED是才起作用,因此它仅在用于无序数据源时才可以并行归约。
  • IDENTITY_FINISH一一这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的。
    我们迄今开发的ToListCollector是IDENTITY_FINISH的,因为用来累积流中元素的List已经是我们要的最终结果,用不着进一步转换了,但它并不是UNORDERED,因为用在有序流上的时候,我们还是希望顺序能够保留在得到的List中。最后,它是CONCURRENT的,但我们刚才说过了,仅仅在背后的数据源是无序时才会并行处理。

全部融合到一起

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
    }
}

请注意,这个实现与Collectors.toList方法并不完全相同,但区别仅仅是一些小的优化,这些优化的一个主要方面是Java API所提供的收集器在需要返回空列表是使用了Collections.emptyList()这个单例。这意味着它可安全的替代原生Java,来收集菜单流中的所有Dish的列表。

List<Dish> dishes = menuStream.collect(new ToListCollector<Dish>());

这个实现和标准的

List<Dish> dishes = menuStream.collect(Collectors.toList());

构造之间的其他差异在于toList是一个工厂,而ToListCollector必须用new来实例化。

进行自定义收集而不去实现Collector

对于IDENTITY_FINISH的收集操作,还有一种方法可以得到同样的结果而无需从头实现新的Collector接口。Stream有一个重载的collect方法可以接收另外三个函数一一supplier、accumulator和combiner,其语义和Collector接口的响应方法返回的函数完全相同。所以比如说,我们可以像下面这样把菜肴流中的元素收集到一个List中:

List<Dish> dishes = menuStream.stream().collect(
                ArrayList::new,
                List::add,
                List::addAll
        );

我们认为,这第二种形式虽然比自己实现Collector更为紧凑和简洁,却不那么易读。此外,以恰当的类来实现自己的自定义收集器有助于重用并可避免代码重复。另外值得注意的是,这第二个collect方法不能传递任何characteristics,所以它永远都是一个IDENTITY_FINISH和CONCURRENT但并非UNORDERED的收集器。

collector总结

collector中的supplier、accumulator、finisher分别对应终端Sink(ReducingSink)中的begin、accept、end操作。
实际上终端操作collect就是根据collector来创建终端Sink,最后执行元素循环操作。
具体信息可以参考《Java8流源码解析-串行流

    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

小结

  • collect是一个终端操作,它接受的参数是将流中元素累积到汇总结果的各种方式(称为收集器)。
  • 预定义收集器包括将流元素归约和汇总到一个值,例如计算最小值、最大值或平均值。
  • 预定义收集器可以用groupingBy对流中元素进行分组,或用partitioningBy进行分区。
  • 收集器可以高效的复合起来,进行多级分组、分区和归约。
  • 你可以实现Collector接口中定义的方法来开发你自己的收集器。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/100153.html

(0)
小半的头像小半

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!