Java 8之自定义Stream收集器

声明:本篇文章除部分引用外,均为原创内容,如有雷同纯属巧合,引用转载请附上原文链接与声明。
阅读条件:读本篇文章需掌握java集合基础知识、Stream基本语法与使用、函数式接口、lambda表达式、泛型知识、lombok插件使用、guava基础集合工具使用、java8双冒号的使用。
注:本文若包含部分下载内容,本着一站式阅读的想法,本站提供其对应软件的直接下载方式,但是由于带宽原因下载缓慢是必然的,建议读者去相关官网进行下载,若某些软件禁止三方传播,请在主页上通过联系作者的方式将相关项目进行取消。

参考引用

文章大纲

  • Stream基础收集功能演示
  • Stream收集原理分析
  • Collectors.toList()、Collectors.toMap()、Collectors.toSet()源码分析
  • Collectors.joining()源码分析
  • Collectors.reducing()源码分析
  • 自定义Stream收集器演示
一.Stream基本收集功能演示
    public void testStream() {
        Student s1 = new Student("s1", 1, 20);
        Student s2 = new Student("s2", 2, 24);
        Student s3 = new Student("s3", 3, 26);
        List<Student> students = Lists.newArrayList(s1, s2, s3);
        
        // 将students Stream化处理
        List<String> names = students.stream() 
                // 只取年龄大于23的元素,即保留:s2,s3
                .filter(stu -> stu.getAge() > 23)
                // 获取将Stream流中Student元素中的姓名组成新的Stream流
                .map(Student::getName) 
                // 将Stream流收集为List集合中
                .collect(Collectors.toList()); 
                
        // names 进行挨个打印 结果为:s2 , s3
        names.forEach(System.out::println);  
    }

    @Data
    @AllArgsConstructor
    public class Student {
        private String name;// 姓名
        private Integer stuNo; // 学号
        private Integer age; // 年龄
    }

在上述例子中,从结果的角度,该例是通过stream提供的api取得了students集合中的元素年龄大于23的姓名构成的新集合。从过程的角度主要分为以下几步:

  • 将然后调用students.stream()将其转换为Stream流准备进行后续处理;
  • 调用filter(Predicate predicate)将stream流中的元素进行过滤:仅保留age > 23的元素
  • 调用map(Function mapper)将stream流中的元素进行包装:取得student的name取代原Stream流中的元素
  • 调用collect(Collector collector)将Stream元素进行收集,按照传入的Collector规则进行收集:Collectors.toList()返回的Collector是将Stream流中的元素收集为List集合。
二.Stream收集原理分析

Collectors.toList()返回了一个已初始化完 成的Collector实例,查看Collectors.toList()方法,可以看到该方法实现如下:

public static <T> Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>(
            (Supplier<List<T>>) ArrayList::new,
            List::add,
            (left, right) -> { left.addAll(right); return left; },
            CH_ID);
    }

可以看到该方法返回了一个CollectorImpl对象,CollectorImpl类是Collectors的静态内部类,实现了Collector接口。接下来再查看CollectorImpl类及其对应的构造方法,如下:

    static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A,R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }
}

可以看到CollectorImpl构造需要三个泛型,我们将挨个讨论这三个泛型所指定的类型,且存在两个构造函数,但实际上都是调用的第一个构造函数进行的初始化,这里笔者先介绍第一个构造函数。可以看到初始化CollectorImpl需要5个参数,分别为:

  • Supplier supplier:提供元素容器 泛型 A 表示容器类型
  • BiConsumer<A, T> accumulator:将Stream流中的元素累加到容器中 泛型 T 表示Stream流中远射类型
  • BinaryOperator combiner:将多个容器合并为一个容器
  • Function<A,R> finisher:将最后累加完成后且合并为最后成的最后一个容器进行转换,转换为结果集,泛型 R 表示调用Collect方法后最后返回的类型,也就是最终的结果类型
  • Set characteristics:Collector接口中定义的枚举常量。可以先暂时不做考虑,后续将进行讨论。

Stream流中的元素可以理解为基本有序,其顺序与调用Stream方法对象中的顺序相同,再调用Collect方法时实则是将Stream流中的元素进行处理并收集。Collector实现由Stream流转换为 R 类型结果集。其具体流程如下:

  • 首先调用supplier.get(),创建新的元素容器A a,可能会多次调用创建多个元素容器,也可能只会创建一个,这取决于Stream流是否支持并行处理及其他情况
  • 对将Stream流中所有的元素调用accumulator.accept(A a,T t)方法,a为通过supplier创建的容器,t为Stream流中的元素
  • 在Stream流中每一个元素均调用了accumulator.accept(A a,T t)方法后即累加到元素容器中后,调用combiner.apply(A a1,A a2)方法合并两个容器并返回一个新的A类型容器,该容器将作为下一步combiner.apply(A a1,A a2)方法新的入参,直到合并到最后容器数量只有最后一个的情况下该步骤才结束
  • 调用finisher.apply(A a) 方法,对最终的A类型容器进行最后的处理,返回R类型的最终结果。

上述流程是表述了由Stream流转换为R类型最终结果的过程,直接理解可能较为抽象。下面将对Collectos.toList()、Collectos.toMap()、Collectos.toSet()源码进行分析,并结合上述流程进行讲解以加强理解。

三.Collectors.toList()、Collectors.toMap()、Collectors.toSet()源码分析

3.1 Collectors.toList()返回的Collector实现如下:

    public static <T> Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>(
                //  supplier
                (Supplier<List<T>>) ArrayList::new,
                // accumulator
                List::add,
                //combiner
                (left, right) -> { left.addAll(right); return left; },
                // finisher
                i -> (List<T>) i,
                // characteristics
                CH_ID); 
    }

结合大纲二讨论的流程,我们详细分析Collectors.toList()所返回的Collector的执行流程,从方法签名上看,该方法定义的Stream元素类型(T)为T ,而容器类型(A)定义为未知类型(从supplier的实现上看是List类型),而最终的结果类型(R)为List类型,我们分别分析其实现的supplier、accumulator、combiner、finisher

  • supplier
    实现为:(Supplier<List>) ArrayList::new;表明创建容器时,是通过该supplier的是返回了一个ArrayList作为容器
  • accumulator
    实现为:List::add(不明白双冒号的使用请参考本文的引用参考);表明在累加Stream元素进入容器时,是调用的Supplier提供的ArrayList容器调用add方法将Stream元素添加到ArrayList容器中。
  • combiner
    实现为:(left,right) -> {left.addAll(right); return left;};表明当两个容器进行合并时,将右边容器的全部元素加入左边容器,并且返回左边的容器参与下一次容器合并。
  • finisher
    实现为:i -> (List) i;表明对最后的A类型容器直接强制转换为List类型的,而A类型本质为ArrayList类型,所以直接转换是没有问题的。

根据大纲二所讨论流程详细讨论:

  • 首先调用supplier.get()方法得到一个或多个ArrayList容器(假设容器名为a,b,c...)
  • 对Stream流中的所有元素调用accumulator.apply(a,t),即a.add(t),将Stream流中的元素添加进容器a中
  • 在将Stream流中的元素均累加到容器中后(此时可能有多个容器,也可能只有一个,只初始化一个容器的情况下不会进入该流程,即不需要合并容器,直接进行下一步,这里假定存在多个容器,需要进行容器合并合并合并为一个),调用combiner.apply(a,b),在其具体实现为:a.addAll(b);return a;容器a被返回参与下一步合并操作,知道合并为只有最后一个容器(假定容器名为d)
  • 将最后的容器的类型转换为所需要的结果类型(List),因为容器d的类型为ArrayList类型,所以根据finisher的实现:i -> (List)i; 即:d -> (List)d ;即强制转换然后返回为最终结果,按照上述流程,该ArrayList中包含了Stream流中的所有元素,即是Stream流中的元素组成的集合。

思考:Collectors.toList()为什么需要如此实现Collector?
从Collector.toList()实现的Collect可以看出来,其功能是将Stream流中的元素累加到一个List集合中。他的实现原理如下:

  • 提供ArrayList容器(一个,或多个)
  • 将Stream中的元素添加到ArrayList容器中
  • 将生成的ArrayList容器进行合并,最终合并为一个ArrayList容器
  • 从实现上,该ArrayList容器就是所需要的结果,而finisher是将最后这个容器进行最后的变换,所以只需要强制转换以下即可

3.2 Collectors.toMap()

Collectors.toMap()返回了一个已初始化完成的Collector实例,查看Collectors.toMap()方法源码,可以看到该方法实现如下:

    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
        BiConsumer<M, T> accumulator
                = (map, element) -> 
                map.merge(keyMapper.apply(element),
                          valueMapper.apply(element), mergeFunction);
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

首先从入参上看需要4个参数(本质上是5个,finisher采用的默认的强制转换finisher),比toList(),toSet()多一个入参,原因就是因为需要对Stream流中的元素进行key,value操作。其次,可以看到,入参中的keyMapper和valueMapper实际上组成了accumulator,所以本质上和仍然是4个入参,accumulator需要完成的功能需要由keyMapper和valueMapper配合共同完成。其实现也如代码所示,针对Stream中的元素获取key,value,并且当key出现重复时执行mergeFunction策略。
在jdk 8中,toMap()其实有一个小bug,初始化toMap()时,有三个初始化方法如图所示。在上面的源码中展示的最终的调用函数。

在第一个初始化方法中,只需要传入keyMapper和valueMapper,实际上使用的是Collectors中默认的mergeFunction(throwingMerger()),该处理策略是只要有相同的key,则抛出异常;但是抛出的异常信息实际上提示的是重复的key下对应的值。

该bug在open JDK的bug编号为:JDK-8040892
Incorrect message in Exception thrown by Collectors.toMap(Function,Function)
在jdk 9中才进行的修复:JDK-8173464
Wrong exception message when collecting a stream to a map
Pallavi Sonal added a comment - 2017-01-27 00:42
This has been fixed in JDK 9 with JDK-8040892.
In JDK8 versions, it throws the wrong message i.e. instead of Duplicate key , it shows Duplicate key .

3.3 Collectors.toSet() 源码分析
要实现toSet(),原理同toList(),只需要改变初始化容器Supplier 为Set,Accumulator改为Set::add,Combiner改为::(left,right) -> left.addAll(right);return left;即可。这里就不再冗诉。

四.Collectors.joining()源码分析

Collectors.joining()有三个初始化方法,如图所示。

点开源码会发现,第三种方法初始化实际上是第一种。第二种与其他两种初始化方法不一致。内部实现只是简单的采用了StringBuilder作为supplier容。而第一种采用的StringJoiner来实现,相比较无参初始化方法,第一种稍微复杂一点,但实际上原理是一样的,这里笔者分析并介绍第一种初始化方法。其源码如下:

    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix) {
        return new CollectorImpl<>(
                () -> new StringJoiner(delimiter, prefix, suffix),
                StringJoiner::add, StringJoiner::merge,
                StringJoiner::toString, CH_NOID);
    }

可以看到,Supplier容器为StringJoiner(delimiter,prefix,suffix),accumulator为StringJoiner::add,combiner为StringJoiner::merge,finisher为StringJoiner::toString。
依次查看上述方法;
supplier:

    public StringJoiner(CharSequence delimiter,
                        CharSequence prefix,
                        CharSequence suffix) {
        Objects.requireNonNull(prefix, "The prefix must not be null");
        Objects.requireNonNull(delimiter, "The delimiter must not be null");
        Objects.requireNonNull(suffix, "The suffix must not be null");
        // make defensive copies of arguments
        this.prefix = prefix.toString();
        this.delimiter = delimiter.toString();
        this.suffix = suffix.toString();
        this.emptyValue = this.prefix + this.suffix;
    }

accumulator :

    public StringJoiner add(CharSequence newElement) {
        prepareBuilder().append(newElement);
        return this;
    }

prepareBuilder:

    private StringBuilder prepareBuilder() {
        if (value != null) {
            value.append(delimiter);
        } else {
            value = new StringBuilder().append(prefix);
        }
        return value;
    }

combiner :

    public StringJoiner merge(StringJoiner other) {
        Objects.requireNonNull(other);
        if (other.value != null) {
            final int length = other.value.length();
            // lock the length so that we can seize the data to be appended
            // before initiate copying to avoid interference, especially when
            // merge 'this'
            StringBuilder builder = prepareBuilder();
            builder.append(other.value, other.prefix.length(), length);
        }
        return this;
    }

finisher :

    public String toString() {
        if (value == null) {
            return emptyValue;
        } else {
            if (suffix.equals("")) {
                return value.toString();
            } else {
                int initialLength = value.length();
                String result = value.append(suffix).toString();
                // reset value to pre-append initialLength
                value.setLength(initialLength);
                return result;
            }
        }
    }

可以看到,在Collector.join(delimiter,prefix,suffix)方法实际上是利用StrongJoinner来完成的将多个Charsequence合并为一个。需要注意的是,finisher在执行的时候,实际上是调用的StringJoiner的toString方法,当suffix存在时,需要保证(通过initialLength来实现)真实的value不要append suffix,这样就可以多次调用,结果都是正确的。同理Collector.join()方法,是StringBuilder来实现,更为简单。

五、Collectors.reducing()源码分析

待更新

六、自定义Stream收集器演示

待更新