Можно вернуть результат типа Future, который можно использовать для получения фактического результата в будущем.
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
После того как мы отправим callable исполнителю, мы сначала проверяем, завершился ли он, с помощью вызова isDone(). Я уверен, что это произойдёт, потому что перед возвратом целого числа callable будет спать одну минуту.
При вызове метода get() текущий поток блокируется до тех пор, пока не завершится выполнение callable и не вернётся результат 123. Теперь, когда будущее выполнено, мы можем увидеть следующий результат на консоли:
future done? false
future done? true
result: 123
Future тесно связан с базовым исполнителем. Помните, что если вы закроете исполнителя, все незавершённые futures будут генерировать исключения.
executor.shutdownNow();
future.get();
Вы могли заметить, что мы создали исполнителя немного иначе, чем в предыдущем примере. Мы использовали newFixedThreadPool (1), чтобы создать однопоточный пул потоков для службы исполнителя. Это эквивалентно использованию newSingleThreadExecutor, но с последним мы можем позже увеличить размер пула, просто передав большее значение.
Любой вызов future.get() будет блокироваться и ждать завершения callable. В худшем случае callable может выполняться бесконечно, и ваша программа перестанет отвечать. Мы можем передать время ожидания, чтобы избежать этой ситуации.
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});
future.get(1, TimeUnit.SECONDS);
Выполнение этого кода вызовет исключение TimeoutException:
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
Возможно, вы уже догадались, почему возникло это исключение. Мы указали максимальное время ожидания в одну минуту, а callable потребовалось две минуты, чтобы вернуть результат.
Executors поддерживает одновременную отправку нескольких callables с помощью invokeAll(). Этот метод возвращает список futures.
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");
executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);
В этом примере мы используем функции потока Java8 (stream) для обработки всех futures, возвращаемых invokeAll(). Сначала мы сопоставляем каждый future с его результатом, а затем выводим каждый результат в консоль. Если вы ещё не знакомы с потоками, вы можете прочитать мой учебник по Java8 Stream.
Другой способ отправки нескольких callables — invokeAny(), который работает немного иначе, чем invokeAll(). Во время ожидания future этот метод блокируется, пока один из callables не завершит работу и не вернёт результат.
Чтобы проверить это поведение, мы используем вспомогательный метод для имитации callables с разным временем выполнения. Этот метод возвращает callable, который спит указанное количество секунд, прежде чем вернуть заданный результат.
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}
Мы используем этот метод для создания набора callables с разными временами выполнения, от одной до трёх минут. С помощью invokeAny() мы отправляем эти callables исполнителю и получаем самый быстрый результат — в данном случае задача 2:
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));
String result = executor.invokeAny(callables);
System.out.println(result);
// => task2
Этот пример также использует другой способ создания исполнителя — вызов newWorkStealingPool(). Этот фабричный метод был представлен в Java8 и возвращает тип ForkJoinPool, который отличается от других распространённых исполнителей. Вместо использования фиксированного размера пула потоков, ForkJoinPools используют параллельный фактор для создания, по умолчанию равный количеству доступных ядер процессора на хост-машине.
ForkJoinPools были представлены в Java7 и будут подробно рассмотрены в следующих уроках. Давайте углубимся в понимание запланированных исполнителей, чтобы завершить этот урок.
Теперь, когда мы изучили, как отправлять и выполнять задачи в исполнителе, мы можем использовать запланированные исполнители для многократного выполнения обычных задач.
ScheduledExecutorService поддерживает планирование задач для непрерывного или отложенного выполнения.
Следующий пример планирует задачу, которая будет выполнена через три минуты:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(1337);
long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
Планирование задачи создаёт специальный тип future — ScheduleFuture, который предоставляет все методы Future и метод getDelay() для получения оставшегося времени задержки. Когда задержка истечёт, задача будет выполнена параллельно.
Для планирования задач для постоянного выполнения исполнители предоставляют... Предоставили два метода scheduleAtFixedRate()
и scheduleWithFixedDelay()
. Первый метод используется для выполнения задачи с фиксированной частотой, например, в следующем примере — каждую минуту:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
Также этот метод принимает начальную задержку, чтобы указать время ожидания перед первым выполнением задачи.
Обратите внимание: scheduleAtFixedRate()
не учитывает фактическое время выполнения задачи. Поэтому, если вы укажете период в одну минуту, а задача будет выполняться две минуты, пул потоков будет выполнять её быстрее для повышения производительности.
В этом случае следует рассмотреть использование scheduleWithFixedDelay()
, который работает аналогично описанному выше методу. Разница заключается в том, что задержка применяется между завершением одной задачи и началом следующей. Например:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
};
executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
Этот пример планирует задачу и устанавливает фиксированную задержку в одну минуту между завершением одного выполнения и началом следующего. Начальная задержка равна нулю, время выполнения задачи также равно нулю. Таким образом, мы завершаем выполнение задачи через 0 секунд, 3 секунды, 6 секунд, 9 секунд и так далее. Как видите, scheduleWithFixedDelay()
полезен, когда вы не можете предсказать продолжительность выполнения запланированной задачи.
Это первая часть серии руководств по параллелизму. Я рекомендую вам самостоятельно попробовать примеры кода, представленные выше. Вы можете найти все примеры кода из этой статьи на GitHub, поэтому я приглашаю вас разветвить этот репозиторий и отслеживать его.
Надеюсь, вам понравится эта статья. Если у вас есть какие-либо вопросы, вы можете оставить комментарий ниже или связаться со мной через Twitter.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )