1. Översikt
Stream API ger en rik repertoar av mellan-, reduktions- och terminalfunktioner, som också stöder parallellisering.
Mer specifikt tillåter reduktionsströmoperationer oss att producera ett enda resultat från en sekvens av element , genom att upprepade gånger applicera en kombinerande operation på elementen i sekvensen.
I den här guiden kommer vi att titta på det allmänna ändamål Stream.reduce () drift och ser det i vissa konkreta användningsfall.
2. Nyckelbegreppen: Identitet, Ackumulator och Combiner
Innan vi tittar djupare på användningen av Stream.reduce () , låt oss dela upp operationens deltagarelement i separata block. På det sättet förstår vi lättare vilken roll var och en spelar:
- Identitet - ett element som är det initiala värdet för minskningsoperationen och standardresultatet om strömmen är tom
- Ackumulator - en funktion som tar två parametrar: ett delvis resultat av reduktionsoperationen och nästa element i strömmen
- Combiner - en funktion som används för att kombinera det partiella resultatet av reduktionsoperationen när reduktionen är parallelliserad, eller när det finns en obalans mellan typerna av ackumulatorargumenten och typerna av ackumulatorimplementeringen
3. Använda Stream.reduce ()
För att bättre förstå funktionaliteten hos identitets-, ackumulator- och kombinationselementen, låt oss titta på några grundläggande exempel:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);
I det här fallet, det Integer är värdet 0 identitet. Det lagrar det inledande värdet av reduktionsåtgärden och även standardresultatet när strömmen av heltalsvärden är tom.
På samma sätt lambdauttrycket :
subtotal, element -> subtotal + element
är ackumulatorn , eftersom den tar den partiella summan av heltalsvärden och nästa element i strömmen.
För att göra koden ännu mer kortfattad kan vi använda en metodreferens istället för ett lambdauttryck:
int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);
Naturligtvis kan vi använda en reducera () operation på strömmar som innehåller andra typer av element.
Till exempel kan vi använda reducera () på en rad strängelement och gå med dem i ett enda resultat:
List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");
På samma sätt kan vi växla till den version som använder en metodreferens:
String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");
Låt oss använda reducera () för att sammanfoga de stora bokstäverna i bokstäverna :
String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");
Dessutom kan vi använda reducera () i en parallelliserad ström (mer om detta senare):
List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);
När en ström körs parallellt delar Java-körningen strömmen i flera delströmmar. I sådana fall måste vi använda en funktion för att kombinera resultaten av delströmmarna till en enda . Det här är rollen som kombineraren - i ovanstående kod är det Integer :: sum- metodreferensen.
Roligt nog kommer den här koden inte att sammanställas:
List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());
I det här fallet har vi en ström av användarobjekt, och vilka typer av ackumulatorn argumenten är Integer och användare. Ackumulatorimplementeringen är dock en summan av heltal, så kompilatorn kan bara inte dra slutsatsen om typen av användarparameter .
Vi kan åtgärda problemet genom att använda en kombinerare:
int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);
För att uttrycka det enkelt, om vi använder sekventiella strömmar och typerna av ackumulatorargumenten och typerna av dess implementering matchar behöver vi inte använda en kombinerare .
4. Minska parallellt
Som vi lärde oss tidigare kan vi använda reducera () på parallelliserade strömmar.
When we use parallelized streams, we should make sure that reduce() or any other aggregate operations executed on the streams are:
- associative: the result is not affected by the order of the operands
- non-interfering: the operation doesn't affect the data source
- stateless and deterministic: the operation doesn't have state and produces the same output for a given input
We should fulfill all these conditions to prevent unpredictable results.
As expected, operations performed on parallelized streams, including reduce(), are executed in parallel, hence taking advantage of multi-core hardware architectures.
For obvious reasons, parallelized streams are much more performant than the sequential counterparts. Even so, they can be overkill if the operations applied to the stream aren't expensive, or the number of elements in the stream is small.
Of course, parallelized streams are the right way to go when we need to work with large streams and perform expensive aggregate operations.
Let's create a simple JMH (the Java Microbenchmark Harness) benchmark test and compare the respective execution times when using the reduce() operation on a sequential and a parallelized stream:
@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); }
In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).
These are our benchmark results:
Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op
5. Throwing and Handling Exceptions While Reducing
In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.
For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider);
This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.
We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:
public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }
While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.
To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:
private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result }
Now, the implementation of the divideListElements() method is again clean and streamlined:
public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); }
Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:
List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Finally, let's test the method implementation when the divider is 0, too:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);
6. Complex Custom Objects
We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.
Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.
First, let's start with our Review object. Each Review should contain a simple comment and score:
public class Review { private int points; private String review; // constructor, getters and setters }
Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:
public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }
We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.
Next, let's define a list of Users, each with their own sets of reviews.
User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie);
Nu när John och Julie redovisas, låt oss använda Stream.reduce () för att beräkna ett genomsnittligt betyg för båda användarna. Som en identitet , låt oss returnera ett nytt betyg om vår inmatningslista är tom :
Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);
Om vi gör matte borde vi upptäcka att medelvärdet är 3,6:
assertThat(averageRating.getPoints()).isEqualTo(3.6);
7. Slutsats
I den här handledningen lärde vi oss hur man använder Stream.reduce () . Dessutom lärde vi oss att utföra reduktioner på sekventiella och parallelliserade strömmar, och hur man hanterar undantag samtidigt som man minskar .
Som vanligt är alla kodprover som visas i denna handledning tillgängliga på GitHub.