Java streams

Introduced in Java 8, streams provide a concise, declarative way to process collections and other data sources without altering the underlying data source.

What are Java streams and why are they used?

  • Java streams support functional-style operations on sequences of objects or primitives drawn from sources such as a List<Monitor>, an int[] (via Arrays.stream), or the entrySet() of a HashMap<String, String>.
  • Streams are not a storage data structure. Streams operate on a source to produce results using functional operations, which can be pipelined. The original storage is never altered by streams.
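  • Streams are AutoCloseable (BaseStream extends AutoCloseable). Most streams need no explicit closing, but streams backed by I/O resources, such as the one returned by Files.lines, should be closed, typically with try-with-resources.
  • Unlike Iterable.forEach, which always iterates sequentially, a stream can be executed either in parallel or sequentially (in serial order).
  • A stream pipeline consists of a source, zero or more intermediate operations, and one terminal operation that ends the pipeline and produces a result or side effect. Each intermediate operation returns a new stream, which is what allows operations to be chained (pipelined). Intermediate operations are lazy: no element is processed until the terminal operation is invoked.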
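To make the pipeline structure concrete, here is a minimal sketch using hypothetical names (PipelineAnatomy, lengthsOfLongWords) and a made-up word list. It shows a source, two intermediate operations, and a terminal operation, checks that the source list is left unchanged, and uses the real BaseStream.onClose hook to show that try-with-resources closes a stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PipelineAnatomy {

    // Source -> two intermediate operations -> terminal operation.
    // The source list is only read, never modified.
    static List<Integer> lengthsOfLongWords(List<String> source) {
        return source.stream()                     // source
                .filter(word -> word.length() > 3) // intermediate: returns a Stream<String>
                .map(String::length)               // intermediate: returns a Stream<Integer>
                .collect(Collectors.toList());     // terminal: ends the pipeline
    }

    // Streams are AutoCloseable: try-with-resources runs any onClose handlers.
    static boolean closesWithTryWithResources() {
        boolean[] closed = {false};
        try (Stream<String> stream = Stream.of("a", "b").onClose(() -> closed[0] = true)) {
            stream.count();
        }
        return closed[0];
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>(Arrays.asList("stream", "map", "filter", "sum"));
        System.out.println(lengthsOfLongWords(words));    // [6, 6]
        System.out.println(words);                        // [stream, map, filter, sum]
        System.out.println(closesWithTryWithResources()); // true
    }
}
```

In this sketch, closing the stream only runs the registered handler; for a stream backed by a file, such as Files.lines, closing also releases the underlying file handle.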
Example
int totalVMwareCPU = monitors.stream()
        .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
        .mapToInt(monitor -> monitor.getCPU())
        .sum();

In the above example, we compute the total CPU of the VMware monitors. filter() and mapToInt() are intermediate operations chained (pipelined) one after another, and sum() is the terminal operation. The source Collection<Monitor> is never altered.

Streams are available from Java Development Kit (JDK) version 8 onward. The following code is an equivalent variant without streams, for earlier JDK versions.

int totalVMwareCPU = 0;
for (Monitor monitor : monitors) {
    if (monitor.getMonitorType() == MonitorType.VMWARE) {
        totalVMwareCPU += monitor.getCPU();
    }
}
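The following self-contained sketch runs both versions side by side. Monitor and MonitorType are not defined in this article, so the nested types below are hypothetical stand-ins with only the two getters the examples use, and the CPU values are made up.

```java
import java.util.Arrays;
import java.util.List;

class VmwareCpuTotals {

    // Hypothetical stand-ins for the article's Monitor and MonitorType types.
    enum MonitorType { VMWARE, SERVER }

    static final class Monitor {
        private final MonitorType monitorType;
        private final int cpu;

        Monitor(MonitorType monitorType, int cpu) {
            this.monitorType = monitorType;
            this.cpu = cpu;
        }

        MonitorType getMonitorType() { return monitorType; }
        int getCPU() { return cpu; }
    }

    // Made-up sample data: two VMware monitors (40 + 35) and one other monitor.
    static List<Monitor> sampleMonitors() {
        return Arrays.asList(
                new Monitor(MonitorType.VMWARE, 40),
                new Monitor(MonitorType.SERVER, 25),
                new Monitor(MonitorType.VMWARE, 35));
    }

    // Stream version: filter (intermediate) -> mapToInt (intermediate) -> sum (terminal).
    static int withStream(List<Monitor> monitors) {
        return monitors.stream()
                .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
                .mapToInt(Monitor::getCPU)
                .sum();
    }

    // Pre-Java 8 version: an explicit loop with a mutable running total.
    static int withLoop(List<Monitor> monitors) {
        int total = 0;
        for (Monitor monitor : monitors) {
            if (monitor.getMonitorType() == MonitorType.VMWARE) {
                total += monitor.getCPU();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Monitor> monitors = sampleMonitors();
        System.out.println(withStream(monitors)); // 75
        System.out.println(withLoop(monitors));   // 75
    }
}
```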

Parallel streams

Fig. 1. Parallel Streams in Java
  • Any pipelined stream operations are performed sequentially (in serial) by default.
  • To execute operations in parallel, you must request it explicitly, either by calling the parallel() method on an existing sequential stream or by creating the stream with the collection's parallelStream() method.
  • Because the work runs in parallel, each thread produces a partial result, and the final output is obtained by combining those partial results.
  • Parallel execution works using the fork/join framework. (https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html)
  • Although parallel execution can take advantage of multiple cores, it also has overhead: coordinating multiple threads, splitting the source, and merging the results.
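As a sketch of the two ways to request parallel execution, the class below (hypothetical name, made-up data) sums the same values with parallel() and with parallelStream(). Because integer addition is associative, combining the partial results gives the same total as the sequential version.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelVariants {

    // Made-up data: the values 1..1000 as a List<Integer>.
    static List<Integer> sampleCpuValues() {
        return IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
    }

    // Option 1: switch an existing sequential stream to parallel.
    static int sumWithParallelMethod(List<Integer> values) {
        return values.stream().parallel().mapToInt(Integer::intValue).sum();
    }

    // Option 2: ask the collection for a parallel stream directly.
    static int sumWithParallelStream(List<Integer> values) {
        return values.parallelStream().mapToInt(Integer::intValue).sum();
    }

    // Sequential baseline for comparison.
    static int sumSequential(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = sampleCpuValues();
        System.out.println(values.parallelStream().isParallel()); // true
        System.out.println(values.stream().isParallel());         // false
        // sum is associative, so all three print 500500
        System.out.println(sumSequential(values) + " " + sumWithParallelMethod(values)
                + " " + sumWithParallelStream(values));
    }
}
```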
Example
monitors.stream()
        .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
        .parallel()
        .mapToInt(Monitor::getCPU)
        .sum();

We have used the method reference Monitor::getCPU inside the mapToInt operation. The equivalent lambda is mapToInt(monitor -> monitor.getCPU()).

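In the above code, we have explicitly requested parallel processing with parallel(). Its position in the chain does not matter: the execution mode applies to the whole pipeline, including the filter() call before it. The following code prints the threads involved in parallel processing.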

monitors.stream()
        .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
        .parallel()
        .mapToInt(monitor -> {
            System.out.println("Thread name: " + Thread.currentThread().getName());
            return monitor.getCPU();
        })
        .sum();
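Sample output (thread names, their number, and their order vary from run to run; the calling thread, for example main, may also appear):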
Thread name: ForkJoinPool.commonPool-worker-9
Thread name: ForkJoinPool.commonPool-worker-2
Thread name: ForkJoinPool.commonPool-worker-3
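Printing from inside a lambda interleaves output from several threads. A minimal alternative sketch, assuming only the JDK and hypothetical data, records each thread's name in a thread-safe set and prints the set once the stream finishes. The exact names depend on the JVM and the machine, so nothing here promises a particular set of names.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelThreadNames {

    // Hypothetical CPU values standing in for the VMware monitors.
    static List<Integer> sampleCpuValues() {
        return IntStream.rangeClosed(1, 10_000).boxed().collect(Collectors.toList());
    }

    // Sums the values with a parallel stream and returns the names of all threads
    // that processed at least one element. The set must be thread-safe because
    // several threads add to it concurrently.
    static Set<String> threadsUsed(List<Integer> cpuValues) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        int total = cpuValues.parallelStream()
                .mapToInt(cpu -> {
                    threadNames.add(Thread.currentThread().getName()); // side effect for illustration only
                    return cpu;
                })
                .sum();
        System.out.println("Total CPU: " + total);
        return threadNames;
    }

    public static void main(String[] args) {
        // Usually a mix of ForkJoinPool.commonPool-worker-N names, possibly including "main"
        threadsUsed(sampleCpuValues()).forEach(name -> System.out.println("Thread name: " + name));
    }
}
```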

Fork/join framework

Parallel streams run on the fork/join framework, using the worker threads of the shared common pool (ForkJoinPool.commonPool()). The framework splits the source data into chunks, distributes them across the worker threads, and combines the partial results as the subtasks complete.

The number of threads in the common pool is derived from the number of processor cores on the machine (in OpenJDK, the default parallelism is one less than Runtime.getRuntime().availableProcessors()). To use a different value, pass the following system property as a JVM argument:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=3
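To check which value is in effect, the sketch below (hypothetical class name) reads the common pool's target parallelism at runtime with ForkJoinPool.getCommonPoolParallelism() and ForkJoinPool.commonPool().getParallelism(), alongside the processor count and the raw system property. Started with the -D flag above, it should report a parallelism of 3.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    // Target parallelism of the shared pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    // The same information through the static accessor (available since Java 8).
    static int commonPoolParallelismStatic() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
        System.out.println("Via static accessor: " + commonPoolParallelismStatic());
        // Raw property value, or null when it was not set on the command line
        System.out.println("Property: "
                + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
    }
}
```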

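The parallelism property is a global setting at the JVM level: it affects every parallel stream and every other task that uses the common pool. The following example uses the reduce operation: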

monitors.stream()
        .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
        .parallel()
        .reduce(0, (accumulatedCPU, monitor1) -> accumulatedCPU + monitor1.getCPU(), Integer::sum);

Reduce operation in Java streams

The reduce operation produces a single value by repeatedly applying the specified operation to the elements of the stream and accumulating the result. In the above example:

  • Identity: 0 is the identity, i.e., the initial CPU value. The identity element is both the initial value of the reduction and the default result if there are no elements in the stream.
  • Accumulator: The lambda function (accumulatedCPU, monitor1) -> accumulatedCPU + monitor1.getCPU() is the accumulator.
    1. accumulatedCPU is the partial CPU total (an Integer) computed so far, passed in on each invocation.
    2. monitor1 is the next element from the stream.
  • Combiner: Because the stream is executed in parallel, it is split into several sub-streams that are reduced separately. Integer::sum combines the partial results from those sub-streams.

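The accumulator lambda takes two arguments of different types: an Integer partial result and a Monitor. The two-argument reduce(identity, accumulator) overload cannot express this, because it requires the identity, both accumulator arguments, and the result to have the stream's element type (Monitor), so such code fails to compile. The three-argument overload, reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner), allows the result type U to differ from the element type T. Here the compiler infers U as Integer from the identity 0 and the combiner Integer::sum, so accumulatedCPU is treated as an Integer.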
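The following sketch puts identity, accumulator, and combiner together in a runnable form. As before, Monitor and MonitorType are hypothetical nested stand-ins with made-up CPU values. It also shows a simpler equivalent: mapping each monitor to its CPU first keeps the element type and the result type the same, so no separate combiner is needed.

```java
import java.util.Arrays;
import java.util.List;

class ReduceCpu {

    // Hypothetical stand-ins for the article's Monitor and MonitorType types.
    enum MonitorType { VMWARE, SERVER }

    static final class Monitor {
        private final MonitorType monitorType;
        private final int cpu;

        Monitor(MonitorType monitorType, int cpu) {
            this.monitorType = monitorType;
            this.cpu = cpu;
        }

        MonitorType getMonitorType() { return monitorType; }
        int getCPU() { return cpu; }
    }

    static List<Monitor> sampleMonitors() {
        return Arrays.asList(
                new Monitor(MonitorType.VMWARE, 40),
                new Monitor(MonitorType.SERVER, 25),
                new Monitor(MonitorType.VMWARE, 35));
    }

    // Three-argument reduce: result type (Integer) differs from element type (Monitor).
    static int totalVmwareCpu(List<Monitor> monitors) {
        return monitors.parallelStream()
                .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
                .reduce(0,                                                              // identity
                        (accumulatedCPU, monitor) -> accumulatedCPU + monitor.getCPU(), // accumulator
                        Integer::sum);                                                  // combiner
    }

    // Same result: map to int first, then reduce with a single int operator.
    static int totalVmwareCpuMapFirst(List<Monitor> monitors) {
        return monitors.parallelStream()
                .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
                .mapToInt(Monitor::getCPU)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(totalVmwareCpu(sampleMonitors()));         // 75
        System.out.println(totalVmwareCpuMapFirst(sampleMonitors())); // 75
    }
}
```

For a plain total, mapToInt(...).sum() is the shortest form; the three-argument reduce is mainly useful when the accumulated value cannot be produced by a simple mapping.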

Serial streams

This is the default execution mode for streams. A single thread (the calling thread) processes the pipeline operations.

Fig. 2. Serial Streams in Java
Example
int totalVMwareCPU = monitors.stream()
        .filter(monitor -> monitor.getMonitorType() == MonitorType.VMWARE)
        .mapToInt(monitor -> monitor.getCPU())
        .sum();

Since a single thread processes the operations, the order in which the elements are processed is predictable.
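A small sketch (hypothetical class and method names, made-up values) makes both points checkable: a sequential stream runs entirely on the calling thread, and forEachOrdered processes the elements of an ordered source such as a List in encounter order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SerialProcessing {

    // Records which threads touch the elements of a sequential stream.
    static Set<String> threadsUsedSequentially(List<Integer> values) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        values.stream().forEach(v -> threadNames.add(Thread.currentThread().getName()));
        return threadNames;
    }

    // Copies the elements in the order they are processed;
    // forEachOrdered guarantees encounter order for an ordered source.
    static List<Integer> visitInOrder(List<Integer> values) {
        List<Integer> visited = new ArrayList<>();
        values.stream().forEachOrdered(visited::add);
        return visited;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(5, 3, 8, 1);
        System.out.println(threadsUsedSequentially(values)); // only the calling thread, e.g. [main]
        System.out.println(visitInOrder(values));            // [5, 3, 8, 1]
    }
}
```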

Example using the reduce operation
List<String> words = Arrays.asList("Site24x7", "-", "All", "in", "one", "monitoring", "solution");
String sentence = words.stream()
        .reduce("", (concat, word) -> concat + word + " ");
System.out.println(sentence);
Output (the string ends with a trailing space after the last word):
Site24x7 - All in one monitoring solution

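The following code snippet uses a method reference (String::concat) instead of a lambda expression. Since String::concat adds no separator, each word except the last includes its own trailing space.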

List<String> words = Arrays.asList("Site24x7 ", "- ", "All ", "in ", "one ", "monitoring ", "solution");
String sentence = words.stream()
        .reduce("", String::concat);
System.out.println(sentence);
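Both reduce versions build a new String at every step. As an alternative sketch (hypothetical class name, same words as above), Collectors.joining performs a mutable reduction and inserts the separator only between elements, so the words need no trailing spaces and the result has none.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoinWords {

    // reduce with String::concat: each word must carry its own trailing space.
    static String withReduce(List<String> wordsWithSpaces) {
        return wordsWithSpaces.stream().reduce("", String::concat);
    }

    // Collectors.joining adds the delimiter only between elements.
    static String withJoining(List<String> words) {
        return words.stream().collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Site24x7", "-", "All", "in", "one", "monitoring", "solution");
        System.out.println(withJoining(words)); // Site24x7 - All in one monitoring solution
    }
}
```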

Summing up

A key advantage of Java streams is that a pipeline can be switched between serial and parallel execution with a single method call. Whether to use a serial or a parallel stream depends on the workload and the developer's requirements.

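If you have a large data set to process and processing each element takes significant time, a parallel stream can spread the work across multiple CPU cores. For small data sets or cheap operations, the overhead of splitting the work and merging the results can make a parallel stream slower than a serial one.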
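As a rough sketch of the kind of workload that can benefit, the hypothetical class below counts primes by trial division, where each element is comparatively expensive to test. The serial and parallel versions must return the same count; only the timing differs, and single-run timings like these are indicative at best.

```java
import java.util.stream.IntStream;

class PrimeCount {

    // Deliberately CPU-heavy per-element work: trial-division primality test.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    static long countPrimesSerial(int limit) {
        return IntStream.rangeClosed(1, limit).filter(PrimeCount::isPrime).count();
    }

    static long countPrimesParallel(int limit) {
        return IntStream.rangeClosed(1, limit).parallel().filter(PrimeCount::isPrime).count();
    }

    public static void main(String[] args) {
        int limit = 2_000_000;

        // Rough single-run timings: JIT warm-up, core count, and machine load
        // all affect these numbers, so use a benchmark harness for real decisions.
        long start = System.nanoTime();
        long serial = countPrimesSerial(limit);
        long serialMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long parallel = countPrimesParallel(limit);
        long parallelMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("serial:   " + serial + " primes in " + serialMillis + " ms");
        System.out.println("parallel: " + parallel + " primes in " + parallelMillis + " ms");
    }
}
```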