1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/wizardforcel-modern-java-zh

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
ch6.md 20 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 29.11.2024 19:52 2387d6d

Java 8: параллельное программирование. Атомарные переменные и ConcurrentMap

Перевод:

Приветствую вас в третьей части серии обучающих материалов по многопоточному программированию на Java 8! В этом уроке мы рассмотрим две важные составляющие параллельного API: атомарные переменные и ConcurrentMap. Благодаря недавно выпущенным лямбда-выражениям и функциональному программированию в Java 8, обе эти концепции получили значительные улучшения. Все эти новые функции будут описаны с помощью простых и понятных примеров кода. Надеюсь, вам понравится.

Для простоты в примерах кода этого урока используются две вспомогательные функции, определённые в этом месте: sleep(seconds) и stop(executor).

AtomicInteger

Пакет java.concurrent.atomic содержит множество полезных классов для выполнения атомарных операций. Если вы можете выполнять определённую операцию одновременно и безопасно в нескольких потоках без необходимости использования ключевого слова synchronized или блокировок, как описано в предыдущей главе, то эта операция является атомарной.

По сути, атомарные операции сильно зависят от сравнения и обмена (CAS), которые напрямую поддерживаются большинством современных процессоров в виде атомарных инструкций. Эти инструкции обычно быстрее, чем синхронизированные блоки. Поэтому, если вам нужно только параллельно изменять одну изменяемую переменную, я рекомендую использовать атомарный класс вместо блокировок, показанных в предыдущей главе.

Примечание переводчика: Для других языков атомарные операции могут быть реализованы с использованием блокировок вместо атомарных инструкций.

Теперь давайте выберем атомарный класс, например AtomicInteger:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 1000

Используя AtomicInteger вместо Integer, мы можем безопасно увеличивать значения в параллельном режиме без синхронизации доступа к переменной. Метод incrementAndGet() является атомарной операцией, поэтому мы можем вызывать его безопасно из нескольких потоков.

AtomicInteger поддерживает различные атомарные операции. Метод updateAndGet() принимает лямбда-выражение, чтобы выполнить любую операцию над целым числом:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 2000

Метод accumulateAndGet() принимает другой тип лямбда-выражения IntBinaryOperator. В следующем примере мы используем этот метод для параллельного вычисления суммы всех значений от 0 до 1000:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 499500

Другие полезные атомарные классы включают AtomicBoolean, AtomicLong и AtomicReference.

LongAdder

LongAdder — это альтернатива AtomicLong, предназначенная для последовательного добавления значений к некоторому числу.

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 1000

Методы add() и increment(), предоставляемые LongAdder, являются потокобезопасными, как и атомарные числовые классы. Однако этот класс внутренне поддерживает ряд переменных для уменьшения конкуренции между потоками, а не для вычисления единого результата. Фактический результат можно получить, вызвав методы sum() или sumThenReset().

Когда обновления из нескольких потоков происходят чаще, чем чтения, этот класс обычно работает лучше, чем атомарные числовые классы. Такая ситуация часто возникает при сборе статистических данных, например, когда вы хотите подсчитать количество запросов на веб-сервере. Недостатком LongAdder является более высокое использование памяти, поскольку он хранит ряд переменных в памяти.

LongAccumulator

LongAccumulator представляет собой более универсальную версию LongAdder. LongAccumulator строится с использованием типа LongBinaryOperator лямбда-выражений, а не просто выполняет операции сложения, как показано в этом коде:

LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 2539

Мы создали LongAccumulator, используя функцию 2 * x + y, с начальным значением 1. Каждый раз, когда вызывается accumulate(i), текущее значение и значение i передаются в качестве аргументов лямбда-выражению.

Как и LongAdder, LongAccumulator внутренне поддерживает ряд переменных, чтобы уменьшить конкуренцию между потоками.

ConcurrentMap

Интерфейс ConcurrentMap наследуется от интерфейса Map и определяет один из наиболее полезных типов параллельных коллекций. Java 8 вводит функциональное программирование, добавляя новые методы в этот интерфейс.

В следующем коде мы используем пример отображения для демонстрации этих методов:

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

Метод forEach() принимает тип BiConsumer лямбда-выражения, передавая ключ и значение в качестве параметров. Он может служить заменой цикла for-each для перебора элементов параллельной карты. Итерация выполняется последовательно в текущем потоке.

map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
``` **Новый метод putIfAbsent()** добавляет новое значение в отображение только в том случае, если предоставленный ключ отсутствует. По крайней мере, в реализации ConcurrentHashMap этот метод является потокобезопасным, поэтому при параллельном доступе к отображению не требуется никакой синхронизации.

```java
String value = map.putIfAbsent("c3", "p1");
System.out.println(value);    // p0

Метод getOrDefault() возвращает значение указанного ключа. Если предоставленный ключ не существует, возвращается указанное значение по умолчанию:

String value = map.getOrDefault("hi", "there");
System.out.println(value);    // there

Метод replaceAll() принимает лямбда-выражение типа BiFunction. BiFunction принимает два параметра и возвращает одно значение. Функция вызывается для каждого элемента с ключом и значением, и возвращается новое значение, которое должно быть сопоставлено с текущим ключом.

map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2"));    // d3

Метод compute() позволяет преобразовывать отдельные элементы, а не заменять все значения в отображении. Этот метод принимает ключ, который необходимо обработать, и BiFunction, которая используется для указания преобразования значения.

map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo"));   // barbar

Помимо compute(), есть ещё два варианта: computeIfAbsent() и computeIfPresent(). Эти методы вызывают функцию-параметр только тогда, когда ключ отсутствует или присутствует.

Наконец, метод merge() можно использовать для объединения нового значения с существующим значением в отображении. Этот метод принимает ключ, новое значение для объединения с существующим элементом и BiFunction для определения поведения слияния двух значений.

map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
System.out.println(map.get("foo"));   // boo was foo

Все эти методы являются частью интерфейса ConcurrentMap, поэтому они могут быть вызваны на всех реализациях этого интерфейса. Кроме того, наиболее важная реализация ConcurrentHashMap использует некоторые новые методы для улучшения выполнения параллельных операций над отображением.

Подобно параллельным потокам, эти методы используют определённый ForkJoinPool, предоставляемый Java8 через ForkJoinPool.commonPool(). Этот пул использует предопределённый механизм параллелизма, зависящий от количества доступных ядер. Мой компьютер имеет четыре ядра, что делает параллелизм равным 3:

System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

Это значение можно увеличить или уменьшить, установив следующий параметр JVM:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Мы используем тот же пример отображения, но на этот раз мы используем конкретную реализацию ConcurrentHashMap вместо интерфейса ConcurrentMap, чтобы мы могли получить доступ ко всем общедоступным методам этого класса:

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

Java8 представил три типа параллельных операций: forEach, search и reduce. Каждый из этих методов предоставляет четыре формы, которые принимают функции с параметрами, состоящими из ключей, значений, элементов или пар ключ-значение.

Первым параметром всех этих методов является общий parallelismThreshold. Это пороговое значение указывает размер минимальной коллекции, при котором операция будет выполняться параллельно. Например, если вы передадите пороговое значение 500, а фактическое количество элементов в отображении равно 499, операция будет выполнена последовательно в одном потоке. В следующем примере мы всегда используем пороговое значение 1, чтобы принудительно выполнять операции параллельно.

forEach

Метод forEach() может выполнять параллельную итерацию по парам ключ-значение в отображении. BiConsumer вызывается с текущим итерируемым элементом, его ключом и значением. Чтобы визуализировать параллельное выполнение, мы выводим имя текущего потока в консоль. Следует отметить, что подсистема ForkJoinPool использует максимум три потока.

map.forEach(1, (key, value) ->
    System.out.printf("key: %s; value: %s; thread: %s\n",
        key, value, Thread.currentThread().getName()));

// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main

search

Метод search() принимает BiFunction и возвращает непустой результат поиска для текущей пары ключ-значение или null, если текущий элемент не соответствует никаким условиям поиска. Как только возвращается непустое значение, поиск прекращается. Следует помнить, что ConcurrentHashMap неупорядочен. Функция поиска не должна зависеть от фактического порядка обработки отображения. Если несколько элементов отображения соответствуют указанной функции поиска, результат не определён.

String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("foo".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar

Вот ещё один пример, который ищет только значения в отображении:

String result = map.searchValues(1, value -> {
    System.out.println(Thread.currentThread().getName());
    if (value.length() > 3) {
        return value;
    }
    return null;
});

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo

reduce

Метод reduce() уже использовался в потоках данных Java 8. Он принимает два BiFunction типа lambda-выражений. Первая функция преобразует каждую пару ключ-значение в любое одиночное значение. Вторая функция объединяет все преобразованные значения в одно окончательное значение и игнорирует все возможные значения null.

String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar Я надеюсь, что вам понравится третья часть моего обучающего курса по параллелизму в Java 8. Примеры кода для этого курса размещены на GitHub (https://github.com/winterbe/java8-tutorial), там же можно найти и множество других фрагментов кода на Java 8. Вы можете воспользоваться ими или даже внести свой вклад, сделав форк моего репозитория.

Если вы хотите поддержать мою работу, пожалуйста, поделитесь этим курсом со своими друзьями. Также вы можете подписаться на меня в Twitter (@winterbe_), так как я регулярно публикую материалы, связанные с Java и программированием.

* Часть I: Поток и исполнитель (ch4.md)
* Часть II: Синхронизация и блокировки (ch5.md)
* Часть III: Атомарные переменные и ConcurrentMap (ch6.md)

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/wizardforcel-modern-java-zh.git
git@api.gitlife.ru:oschina-mirror/wizardforcel-modern-java-zh.git
oschina-mirror
wizardforcel-modern-java-zh
wizardforcel-modern-java-zh
master