This part summarizes the Stream API in Java 8. Although there are various tutorials on the Web, this text attempts to highlight the important points. The previous part, Functional Programming, addressed functional programming and lambda expressions in Java 8.

Stream

Stream itself is a Java interface with some static methods for creating a stream. Some other classes, such as collections, have a dedicated method that returns a Stream. The following sample demonstrates part of the Stream API, and the subsequent sections describe those APIs.

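The APIs can be divided into three groups: creation, transformation & resizing, and reduction & collection. Each group is described in its own section after the following sample.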

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;

public class TestStream {

  /** Builds Fibonacci numbers with Stream.iterate over {F(k), F(k+1)} pairs. */
  @Test
  public void testFibonacci() {
    Stream<int[]> iterate;

    // Each element is the pair {F(k), F(k+1)}; the next pair is {F(k+1), F(k) + F(k+1)}.
    iterate = Stream.iterate(new int[]{1, 1}, n -> new int[]{n[1], n[0] + n[1]});
    int nth = iterate
      .peek(n -> System.out.printf("Debug: %s \n", Arrays.toString(n))) // debugging aid only
      .limit(5)
      .reduce((a, b) -> b) // keeps the last element, i.e. the 5th pair
      .orElse(new int[]{0, 0})[1];
    assertEquals(8, nth);

    // Streams are single-use, so a new one is created for the second pipeline.
    iterate = Stream.iterate(new int[]{1, 1}, n -> new int[]{n[1], n[0] + n[1]});
    List<Integer> list = iterate
      .limit(5)
      .map(n -> n[1])
      // Equivalent: .collect(ArrayList::new, ArrayList::add, ArrayList::addAll)
      .collect(Collectors.toList());
    // JUnit's assertEquals takes (expected, actual).
    assertEquals(Arrays.asList(1, 2, 3, 5, 8), list);
  }

  /**
   * Exercises Files.lines() together with filter, flatMap, distinct, sorted and several
   * reductions over a small temporary file. The temporary file is deleted when the test ends.
   */
  @Test
  public void test_Files_FlatMap_Distinct_Sorted_Reduction() throws IOException {
    final String content = "test01 passed\ntest02 passed\ntest11 failed";
    final String grepped = "test01 passed\ntest11 failed";

    final List<String> words =
      Arrays.asList("test01", "passed", "test02", "passed", "test11", "failed");

    final List<String> distinctWords =
      Arrays.asList("test01", "passed", "test02", "test11", "failed");

    final List<String> sortedDistinctWords =
      Arrays.asList("test11", "test02", "test01", "passed", "failed");

    final Path file = Files.createTempFile("__", "__");
    try {
      // Files.lines(Path) decodes UTF-8, so encode the content as UTF-8 explicitly
      // instead of relying on the platform default charset.
      Files.write(file, content.getBytes(StandardCharsets.UTF_8));

      // Grepping lines containing '1'
      try (Stream<String> lines = Files.lines(file)) {
        String result = lines
          .filter(line -> line.contains("1"))
          .collect(Collectors.joining("\n"));
        assertEquals(grepped, result);
      }

      // List of words
      try (Stream<String> lines = Files.lines(file)) {
        List<String> result = lines
          .flatMap(line -> Stream.of(line.split("\\s")))
          .collect(Collectors.toList());
        assertEquals(words, result);
      }

      // List of distinct words (stable: first occurrences, in encounter order)
      try (Stream<String> lines = Files.lines(file)) {
        List<String> result = lines
          .flatMap(line -> Stream.of(line.split("\\s")))
          .distinct()
          .collect(Collectors.toList());
        assertEquals(distinctWords, result);
      }

      // List of distinct & descending-sorted words
      try (Stream<String> lines = Files.lines(file)) {
        List<String> result = lines
          .flatMap(line -> Stream.of(line.split("\\s")))
          .distinct()
          .sorted(Comparator.reverseOrder())
          .collect(Collectors.toList());
        assertEquals(sortedDistinctWords, result);
      }

      // First of the distinct & descending-sorted words, i.e. the largest word
      try (Stream<String> lines = Files.lines(file)) {
        String result = lines
          .flatMap(line -> Stream.of(line.split("\\s")))
          .distinct()
          .sorted(Comparator.reverseOrder())
          .findFirst() // min(Comparator.reverseOrder()) instead of sorted() & findFirst()
          .orElseThrow(() -> new AssertionError("no words found"));
        assertEquals("test11", result);
      }

      // Count number of words
      try (Stream<String> lines = Files.lines(file)) {
        long result = lines
          .flatMap(line -> Stream.of(line.split("\\s")))
          .count();
        assertEquals(words.size(), result);
      }

      // The file content is read once and reused by both character-count variants.
      final String fileAsStr = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);

      // Count number of characters of words (1/2): primitive LongStream sum
      long result = Pattern.compile("\\s")
        .splitAsStream(fileAsStr)
        .mapToLong(String::length)
        .sum();
      assertEquals(36, result);

      // Count number of characters of words (2/2): three-argument reduce
      // (identity, accumulator, combiner)
      result = Pattern.compile("\\s")
        .splitAsStream(fileAsStr)
        .reduce(0L,
          (total, word) -> total + word.length(),
          (total1, total2) -> total1 + total2);
      assertEquals(36, result);
    } finally {
      // Don't leave the temporary file behind, whether or not an assertion failed.
      Files.deleteIfExists(file);
    }
  }

  /** Computes 5! with LongStream.reduce, with and without an identity value. */
  @Test
  public void testFactorial() {
    long result = LongStream
      .rangeClosed(1, 5) // [1, 5]; range(1, 5) would be [1, 5) and exclude 5
      .reduce((left, right) -> left * right)
      // The empty product is 1, so fall back to 1 to agree with the identity-based variant below.
      .orElse(1);

    assertEquals(120, result);

    result = LongStream
      .rangeClosed(1, 5) // [1, 5]
      .reduce(1, (left, right) -> left * right);

    assertEquals(120, result);
  }

  /** Builds bounded random integer streams from the unbounded DoubleStream.generate(). */
  @Test
  public void testNumberStream() {
    List<Integer> integers = DoubleStream
      .generate(Math::random)        // unlimited stream of values in [0.0, 1.0)
      .mapToInt(d -> (int) (d * 10)) // convert to IntStream of values in [0, 9]
      .limit(10)                     // limit to first 10 elements
      .boxed()                       // convert to Stream<Integer>
      .collect(Collectors.toList()); // collect to a list

    assertEquals(10, integers.size());

    // Random code points in [0, 999]; only ASCII letters and digits are kept.
    StringBuilder password = DoubleStream
      .generate(Math::random)
      .mapToInt(d -> (int) (d * 1000))
      .filter(value -> (value >= 'A' && value <= 'Z') ||
        (value >= 'a' && value <= 'z') ||
        (value >= '0' && value <= '9'))
      .limit(10)
      .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append);

    assertEquals(10, password.length());
  }

  /** Demonstrates toMap (with a merge function), groupingBy, average and partitioningBy. */
  @Test
  public void testCollectors() {
    List<Employee> list = Arrays.asList(
      new Employee("John", 5000),
      new Employee("Jack", 6000),
      new Employee("Jack", 7000),
      new Employee("Bill", 3000));

    // On a duplicate key, keep the later value.
    Map<String, Employee> name2employee = list.stream()
      .collect(Collectors.toMap(Employee::getName, Function.identity(), (curV, newV) -> newV));

    assertEquals(3, name2employee.size());
    assertEquals(7000, name2employee.get("Jack").getSalary().intValue());

    // LinkedHashMap keeps the keys in encounter order, so "John" comes first.
    final Map<String, List<Employee>> name2employees = list.stream()
      .collect(Collectors.groupingBy(Employee::getName, LinkedHashMap::new, Collectors.toList()));

    assertEquals("John", name2employees.keySet().iterator().next());
    assertEquals(3, name2employees.size());
    assertEquals(1, name2employees.get("Bill").size());
    assertEquals(2, name2employees.get("Jack").size());

    final int averageSalary = (int) list.stream()
      .mapToInt(Employee::getSalary)
      .average()
      .orElse(0);
    assertEquals(5250, averageSalary);

    final Map<Boolean, List<Employee>> highSalaryEmployees = list.stream()
      .collect(Collectors.partitioningBy(emp -> emp.getSalary() > averageSalary));

    assertEquals(2, highSalaryEmployees.get(true).size());
    assertEquals(2, highSalaryEmployees.get(false).size());
  }

  /** Minimal immutable employee (name and salary) used by testCollectors(). */
  static final class Employee {
    private final String name;
    private final Integer salary;

    Employee(String name, Integer salary) {
      this.name = name;
      this.salary = salary;
    }

    String getName() {
      return name;
    }

    Integer getSalary() {
      return salary;
    }

    @Override
    public String toString() {
      return getName() + ", " + getSalary();
    }
  }
}

Creation

The following methods return a Stream object.

API Description
Interface Static Methods (also valid for IntStream, LongStream and DoubleStream)
Stream.of(T... varg) Stream<String> stream = Stream.of("1", "5", "7")
Stream.generate(Supplier)

Stream<Double> randoms = Stream.generate(Math::random)

  • Returns an infinite sequential unordered stream
  • Each element is generated by the provided Supplier
  • Suitable for generating constant streams, streams of random elements, etc
Stream.iterate(
  T seed,
  UnaryOperator<T> f)
  • Returns an infinite sequential ordered Stream
  • Produced by iterative application of a function f to an initial element seed
  • Consisting of seed, f(seed), f(f(seed)), etc.
Other Methods Returning a Stream
COLLECTION_VAR.stream() or COLLECTION_VAR.parallelStream()
Stream<String> words = Pattern.compile("\\s").splitAsStream(CONTENT_VAR)
Stream<String> lines = Files.lines(PATH_VAR)
BufferedReader reader = new BufferedReader(...)
/*
Note: new BufferedReader(new InputStreamReader(INPUT_STREAM_OBJ))
      so any InputStream can be converted to Stream<String>
*/
Stream<String> lines = reader.lines()
IntStream chars = "جاوا".codePoints()
IntStream stream = new Random().ints()
LongStream stream = new Random().longs()
DoubleStream stream = new Random().doubles()
Stream<T> s = Arrays.stream(T[] array)
Stream<T> s = Arrays.stream(T[] array, int startInclusive, int endExclusive)

Transformation & Resizing

The following methods are intermediate operations: each returns a Stream or a Stream-based object (such as IntStream).

API Description
filter(Predicate<? super T> p)
// T t -> boolean
public interface Predicate<T> {
  boolean test(T t);
}
so filter(n -> n > 12)
map(Function<? super T, ? extends R> mapper)
// T t -> R
public interface Function<T, R> {
  R apply(T t);
}
so map(s -> s.length())
mapToInt(ToIntFunction<? super T> mapper): IntStream
// T t -> int
public interface ToIntFunction<T> {
  int applyAsInt(T value);
}
so mapToInt(s -> s.length())
mapToLong() and mapToDouble() are similar to the above.
flatMap(Function<? super T,
  ? extends Stream<? extends R>> mapper)
Stream<String> lines = Files.lines(path)
Stream<String> words = lines.flatMap(
  line -> Stream.of(line.split(" +")))
limit(long n) Returns a stream consisting of the first n elements in the encounter order, so it can be quite expensive on ordered parallel pipelines.
Note: If the encounter order must be preserved and limit() causes poor performance or memory utilization in parallel pipelines, switching to sequential execution with sequential() may improve performance.
skip(long n) Returns a stream consisting of the remaining elements after discarding the first n elements in the encounter order, so it can be quite expensive on ordered parallel pipelines.
If this stream contains fewer than n elements then an empty stream will be returned.
Note: the note for limit() applies here as well.
distinct() Returns a stream consisting of the distinct elements (according to equals()).
For ordered streams, the selection of distinct elements is stable, however for unordered streams no stability guarantees are made.
Note: the note for limit() applies here as well.
sorted()
sorted(Comparator<? super T> comparator)
Returns a stream of sorted elements according to natural order or given comparator.
For ordered streams, the sort is stable, however for unordered streams no stability guarantees are made.

Reduction & Collection

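  • These methods are terminal operations and produce the final result from a Stream.
  • Reduction operations mostly return an Optional object
  • Collection operations mostly return a Collection
  • After calling one of these methods, the Stream is consumed and cannot be reused. It is not closed automatically, though: a Stream backed by an I/O resource, such as Files.lines(), must still be closed (e.g. with try-with-resources, as in the sample above)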
API Description
Reduction
findFirst(): Optional Returns an Optional describing the first element, or an empty Optional if the stream is empty
findAny(): Optional Returns an Optional describing some element of the stream (or an empty Optional if the stream is empty).
The behavior is nondeterministic, which makes it efficient on parallel streams: the first match found in any of the examined segments completes the computation.
anyMatch(Predicate): boolean Returns whether any element of this stream matches the provided predicate
allMatch() and noneMatch() have the same signature and return true if all elements, or no elements, respectively, match the predicate
reduce(BinaryOperator<T> accumulator): Optional
// T t1, T t2 -> T
public interface BinaryOperator<T> {
  T apply(T t1, T t2);
}
Performs a reduction on the elements of this stream, using an associative accumulation function (e.g. sum and product, string concatenation, maximum and minimum, set union and intersection), and returns an Optional describing the reduced value.
Note: in fact BinaryOperator<T> extends BiFunction<T, T, T>
count(): long Counts the elements in this stream.
This is a special case of reduction equivalent to:
return mapToLong(e -> 1L).sum();
max() min() sum() average() summaryStatistics() The related mathematical function is applied to the numerical elements of the Stream, so the stream must be an IntStream, LongStream, or DoubleStream. (A generic Stream<T> offers max(Comparator) and min(Comparator) instead.)
Collection
<R> R collect(
  Supplier<R> supplier,
  BiConsumer<R, ? super T> accumulator,
  BiConsumer<R, R> combiner)
stream.collect(
  ArrayList::new,
  ArrayList::add,
  ArrayList::addAll)
collect(Collector collector) collect(Collectors.toList())
collect(Collectors.toSet())
collect(Collectors.toCollection(LinkedHashSet::new))

collect(Collectors.joining())
collect(Collectors.joining(","))
If your stream contains objects other than strings, you need to first convert them to strings, like this: stream.map(Object::toString).collect(Collectors.joining(","))

// map name to salary
collect(Collectors.toMap(
  Employee::getName, 
  Employee::getSalary))
// map name to employee
collect(Collectors.toMap(
  Employee::getName,
  Function.identity()))
Note: a duplicate key makes the previous two toMap() calls throw an IllegalStateException; the following variant handles it with a merge function:
// map name to employee, on duplicate key use first one
collect(Collectors.toMap(
  Employee::getName,
  Function.identity(), 
  (curVal, newVal) -> curVal))

collect(Collectors.groupingBy(
  Employee::getName)): Map<String, List<Employee>>
collect(Collectors.groupingBy(
  Employee::getName, 
  LinkedHashMap::new, 
  Collectors.toList())): Map<String, List<Employee>>
<A> A[] toArray(IntFunction<A[]> generator)
// int v -> R
public interface IntFunction<R> {
  R apply(int value);
}
The most common call is toArray(TYPE[]::new) (constructor reference)

Another Example

This is a simplified version of a real example. Suppose there are two entities, Report and Group, with a many-to-many association that is navigable from Report to Group (see the following code). When a list of Report objects is fetched from the database, each Report knows its Groups; for presentation, however, the reverse relation is needed, i.e. the Reports must be grouped by Group. The following code shows the solution:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import org.junit.Test;

import java.util.*;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;

public class TestManyToMany {
  /** Inverts the Report -> Group association into a Group -> List<Report> map. */
  @Test
  public void test() {
    Group scm = new Group("SCM");
    Group mtc = new Group("MTC");
    Group hse = new Group("HSE");

    Report stock = new Report("Stock Inventory", scm, mtc);
    Report incid = new Report("Incidents", hse);
    Report artcl = new Report("Accounting Articles", scm, mtc, hse);
    Report mttr = new Report("MTTR", mtc);

    List<Report> reports = Arrays.asList(stock, incid, artcl, mttr);

    Map<Group, List<Report>> expected = new TreeMap<>();
    expected.put(scm, Arrays.asList(stock, artcl));
    expected.put(mtc, Arrays.asList(stock, artcl, mttr));
    expected.put(hse, Arrays.asList(incid, artcl));

    // Flatten into (Group, Report) tuples, then collect each group's reports into a TreeMap.
    Map<Group, List<Report>> result = reports.stream()
      .flatMap(report ->
        report.getGroups().stream().map(dataGroup ->
          new AbstractMap.SimpleEntry<>(dataGroup, report)
        )
      )
      .collect(Collectors.groupingBy(
        AbstractMap.SimpleEntry::getKey,
        TreeMap::new,
        Collectors.mapping(
          AbstractMap.SimpleEntry::getValue,
          Collectors.toList()))
      );

    assertEquals(expected, result);
  }

  /** A report linked to one or more groups; the association is navigable Report -> Group only. */
  private static final class Report {
    private final String name;
    private final List<Group> groups;

    Report(String name, Group... groups) {
      this.name = name;
      // Copy the varargs array so later writes to a caller-supplied array cannot leak in.
      this.groups = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(groups)));
    }

    String getName() {
      return name;
    }

    List<Group> getGroups() {
      return groups;
    }

    @Override
    public String toString() {
      return getName();
    }
  }

  /** A group ordered by its name, which is how the TreeMaps in test() key it.
   * Note: equals() is inherited (identity), so compareTo() is not consistent with equals(). */
  private static final class Group implements Comparable<Group> {
    private final String name;

    Group(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }

    @Override
    public int compareTo(Group o) {
      return getName().compareTo(o.getName());
    }

    @Override
    public String toString() {
      return getName();
    }
  }
}
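  • Lines 30 to 34 (the flatMap() call) act like a Cartesian product of each Report with its own Groups, producing one (Group, Report) tuple per association
  • AbstractMap.SimpleEntry is used as the data structure for a tuple
  • Lines 35 to 37 group the tuples by Group into a TreeMap (ordered by Group name), and lines 38 to 40 map each AbstractMap.SimpleEntry<Group, Report> to its Report, so every Group ends up with a List<Report>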