Посібник із Stream.reduce ()

1. Огляд

API Stream надає багатий репертуар проміжних, редукційних та термінальних функцій, які також підтримують розпаралелювання.

Більш конкретно, операції скорочення потоку дозволяють нам отримати один єдиний результат із послідовності елементів , повторно застосовуючи операцію комбінування до елементів у послідовності.

У цьому уроці ми розглянемо загальне призначення Stream.reduce () операція і побачити його в деяких випадках застосування бетону.

2. Ключові поняття: ідентичність, накопичувач та комбінатор

Перш ніж ми глибше розглянемо використання операції Stream.reduce () , давайте розберемо елементи учасника операції на окремі блоки. Таким чином, ми легше зрозуміємо роль, яку виконує кожен з них:

  • Ідентичність - елемент, який є початковим значенням операції скорочення та результатом за замовчуванням, якщо потік порожній
  • Акумулятор - функція, яка приймає два параметри: частковий результат операції скорочення і наступний елемент потоку
  • Combiner - функція, що використовується для об'єднання часткового результату операції скорочення при паралельному скороченні або коли є невідповідність між типами аргументів накопичувача та типами реалізації накопичувача

3. Використання Stream.reduce ()

Щоб краще зрозуміти функціональність елементів ідентифікатора, накопичувача та об’єднання, давайте розглянемо кілька основних прикладів:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);

В цьому випадку, ціле значення 0 тотожно. Він зберігає початкове значення операції зменшення, а також результат за замовчуванням, коли потік цілочисельних значень порожній.

Так само, лямбда-вираз :

subtotal, element -> subtotal + element

є накопичувачем , оскільки він бере часткову суму значень Integer і наступного елемента в потоці.

Щоб зробити код ще більш стислим, ми можемо використовувати посилання на метод замість лямбда-виразу:

int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);

Звичайно, ми можемо використовувати операцію зменшення () для потоків, що містять інші типи елементів.

Наприклад, ми можемо використовувати reduce () для масиву елементів String і об'єднати їх в один результат:

List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");

Аналогічним чином ми можемо перейти на версію, яка використовує посилання на метод:

String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");

Давайте використаємо операцію reduce () для приєднання великих елементів масиву літер :

String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");

Окрім цього, ми можемо використовувати redu () у паралелізованому потоці (про це далі):

List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Коли потік виконується паралельно, середовище виконання Java розбиває потік на кілька підпотоків. У таких випадках нам потрібно використовувати функцію для об’єднання результатів підпотоків в єдину . Це роль комбінатора - у наведеному фрагменті це посилання на метод Integer :: sum .

Як не дивно, цей код не компілюється:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge()); 

У цьому випадку ми маємо потік об’єктів User , а типами аргументів накопичувача є Integer та User. Однак реалізація накопичувача - це сума цілих чисел, тому компілятор просто не може зробити висновок про тип користувацького параметра.

Ми можемо вирішити цю проблему за допомогою комбайнера:

int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);

Простіше кажучи, якщо ми використовуємо послідовні потоки, а типи типів аргументів акумулятора та типи його реалізації збігаються, нам не потрібно використовувати комбінатор .

4. Зменшення паралельно

Як ми вже дізналися раніше, ми можемо використовувати redu () на паралелізованих потоках.

Коли ми використовуємо паралелізовані потоки, нам слід переконатись, що reduce () або будь-які інші сукупні операції, що виконуються над потоками:

  • асоціативний : порядок операндів не впливає на результат
  • не заважає : операція не впливає на джерело даних
  • бездержавний і детермінований : операція не має стану і видає однакові результати для даного входу

Ми повинні виконати всі ці умови, щоб запобігти непередбачуваним результатам.

Як і слід було очікувати, операції, що виконуються на паралелізованих потоках, включаючи зменшення (), виконуються паралельно, отже, використовуючи переваги багатоядерних апаратних архітектур.

З очевидних причин паралелізовані потоки набагато ефективніші, ніж послідовні аналоги . Незважаючи на це, вони можуть бути надмірними, якщо операції, застосовані до потоку, не є дорогими або кількість елементів у потоці невелика.

Звичайно, паралелізовані потоки - це правильний шлях, коли нам потрібно працювати з великими потоками та виконувати дорогі сукупні операції.

Давайте створимо простий тест тесту JMH (Java Microbenchmark Harness) та порівняємо відповідні терміни виконання при використанні операції зменшення () на послідовному та паралелізованому потоці:

@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } 

In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).

These are our benchmark results:

Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Throwing and Handling Exceptions While Reducing

In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.

For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider); 

This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.

We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:

public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }

While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.

To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:

private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result } 

Now, the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); } 

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Finally, let's test the method implementation when the divider is 0, too:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.

Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.

First, let's start with our Review object. Each Review should contain a simple comment and score:

public class Review { private int points; private String review; // constructor, getters and setters }

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }

We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.

Next, let's define a list of Users, each with their own sets of reviews.

User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie); 

Тепер, коли враховано Джона та Джулі, давайте використаємо Stream.reduce () для обчислення середньої оцінки для обох користувачів. Як ідентичність , давайте повернемо новий рейтинг, якщо наш список введення порожній :

Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);

Якщо ми підраховуємо, ми повинні виявити, що середній бал становить 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Висновок

У цьому посібнику ми дізналися, як використовувати операцію Stream.reduce () . Крім того, ми дізналися, як виконувати скорочення на послідовних та паралелізованих потоках, і як обробляти винятки при зменшенні .

Як завжди, усі зразки коду, показані в цьому посібнику, доступні на GitHub.