1. Przegląd
Stream API zapewnia bogaty repertuar funkcji pośrednich, redukcyjnych i terminalowych, które również obsługują równoległość.
Mówiąc dokładniej, operacje na strumieniu redukcji pozwalają nam uzyskać jeden wynik z sekwencji elementów , poprzez wielokrotne stosowanie operacji łączenia do elementów w sekwencji.
W tym samouczku przyjrzymy się operacji Stream.reduce () ogólnego przeznaczenia i zobaczymy ją w niektórych konkretnych przypadkach użycia.
2. Kluczowe pojęcia: tożsamość, akumulator i łącznik
Zanim przyjrzymy się dokładniej używaniu operacji Stream.reduce () , podzielmy elementy uczestników operacji na osobne bloki. W ten sposób łatwiej zrozumiemy rolę, jaką każdy z nich odgrywa:
- Tożsamość - element będący początkową wartością operacji redukcji i domyślnym wynikiem, jeśli strumień jest pusty
- Akumulator - funkcja przyjmująca dwa parametry: częściowy wynik operacji redukcji oraz kolejny element strumienia
- Combiner - funkcja używana do łączenia częściowego wyniku operacji redukcji, gdy redukcja jest równoległa lub gdy występuje niezgodność między typami argumentów akumulatora a typami implementacji akumulatora
3. Korzystanie z Stream.reduce ()
Aby lepiej zrozumieć działanie elementów tożsamości, akumulatora i sumatora, spójrzmy na kilka podstawowych przykładów:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);
W tym przypadku, całkowita wartość 0 jest tożsamość. Przechowuje początkową wartość operacji redukcji, a także domyślny wynik, gdy strumień wartości Integer jest pusty.
Podobnie wyrażenie lambda :
subtotal, element -> subtotal + element
jest akumulatorem , ponieważ przyjmuje częściową sumę wartości całkowitych i następny element w strumieniu.
Aby uczynić kod jeszcze bardziej zwięzłym, możemy użyć odwołania do metody zamiast wyrażenia lambda:
int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);
Oczywiście możemy zastosować operację redukuj () na strumieniach zawierających inne typy elementów.
Na przykład, możemy użyć redukuj () na tablicy elementów String i połączyć je w jeden wynik:
List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");
Podobnie możemy przełączyć się na wersję, która używa odwołania do metody:
String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");
Użyjmy operacji redukuj () do łączenia elementów tablicy z dużymi literami :
String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");
Oprócz tego możemy użyć redukuj () w równoległym strumieniu (więcej o tym później):
List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);
Gdy strumień jest wykonywany równolegle, środowisko wykonawcze Java dzieli strumień na wiele strumieni podrzędnych. W takich przypadkach musimy użyć funkcji, aby połączyć wyniki podstrumieni w jeden . To jest rola sumatora - w powyższym fragmencie jest to referencja do metody Integer :: sum .
Co zabawne, ten kod się nie skompiluje:
List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());
W tym przypadku mamy strumień obiektów User , a typy argumentów akumulatorów to Integer i User. Jednak implementacja akumulatora jest sumą liczb całkowitych, więc kompilator po prostu nie może wywnioskować typu parametru użytkownika .
Możemy rozwiązać ten problem za pomocą sumatora:
int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);
Mówiąc prościej, jeśli używamy strumieni sekwencyjnych, a typy argumentów akumulatora i typy jego implementacji są zgodne, nie musimy używać sumatora .
4. Równoległe zmniejszanie
Jak dowiedzieliśmy się wcześniej, na równoległych strumieniach możemy używać funkcji redukuj () .
Kiedy używamy strumieni równoległych, powinniśmy upewnić się, że redukcja () lub inne zagregowane operacje wykonywane na strumieniach są:
- asocjacyjny : kolejność operandów nie wpływa na wynik
- niezakłócający : operacja nie wpływa na źródło danych
- bezstanowe i deterministyczne : operacja nie ma stanu i wytwarza te same dane wyjściowe dla danego wejścia
Powinniśmy spełnić wszystkie te warunki, aby zapobiec nieprzewidywalnym skutkom.
Zgodnie z oczekiwaniami operacje wykonywane na równoległych strumieniach, w tym redukcja (), są wykonywane równolegle, dzięki czemu wykorzystuje się wielordzeniowe architektury sprzętowe.
Z oczywistych powodów równoległe strumienie są znacznie bardziej wydajne niż ich odpowiedniki sekwencyjne . Mimo to mogą być przesadą, jeśli operacje stosowane w strumieniu nie są drogie lub liczba elementów w strumieniu jest niewielka.
Oczywiście równoległe strumienie są właściwym sposobem, gdy musimy pracować z dużymi strumieniami i wykonywać kosztowne operacje agregujące.
Stwórzmy prosty test porównawczy JMH (Java Microbenchmark Harness) i porównajmy odpowiednie czasy wykonania podczas korzystania z operacji redukuj () na strumieniu sekwencyjnym i równoległym:
@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); }
In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).
These are our benchmark results:
Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op
5. Throwing and Handling Exceptions While Reducing
In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.
For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider);
This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.
We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:
public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }
While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.
To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:
private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result }
Now, the implementation of the divideListElements() method is again clean and streamlined:
public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); }
Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:
List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Finally, let's test the method implementation when the divider is 0, too:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);
6. Complex Custom Objects
We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.
Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.
First, let's start with our Review object. Each Review should contain a simple comment and score:
public class Review { private int points; private String review; // constructor, getters and setters }
Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:
public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }
We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.
Next, let's define a list of Users, each with their own sets of reviews.
User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie);
Teraz, gdy John i Julie są uwzględnieni, użyjmy Stream.reduce (), aby obliczyć średnią ocenę dla obu użytkowników. Jako tożsamość zwróćmy nową ocenę, jeśli nasza lista wejściowa jest pusta :
Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);
Jeśli zrobimy matematykę, powinniśmy stwierdzić, że średni wynik to 3,6:
assertThat(averageRating.getPoints()).isEqualTo(3.6);
7. Wnioski
W tym samouczku dowiedzieliśmy się, jak używać operacji Stream.reduce () . Ponadto nauczyliśmy się, jak wykonywać redukcje w strumieniach sekwencyjnych i równoległych oraz jak obsługiwać wyjątki podczas redukcji .
Jak zwykle wszystkie przykłady kodu pokazane w tym samouczku są dostępne w serwisie GitHub.