Этот текст был взят из springboot-guide: https://github.com/Snailclimb/springboot-guide (Сборник основных知识点整理。基于Spring Boot 2.19+。)
В этой статье вы узнаете следующие知识点:
ThreadPoolTaskExecutor
;Асинхронное программирование очень полезно при работе с долгими операциями и обработкой множества задач одновременно. Это позволяет лучше использовать процессор и память системы, повышая их производительность. Существует множество паттернов многопоточного программирования, и паттерн Future является одним из самых распространенных в многопоточном программировании. В этой статье мы будем использовать этот паттерн для объяснения асинхронного программирования на SpringBoot.
Перед практическими примерами я кратко опишу основные идеи паттерна Future.Основная идея паттерна Future заключается в асинхронном вызове. Когда мы вызываем метод, содержащий несколько долгих задач, которые нужно выполнить одновременно, и нам не нужно ждать результата сразу, мы можем позволить клиенту немедленно вернуться, а затем продолжить вычисления в фоновом режиме. Конечно, вы также можете выбрать дождаться выполнения всех задач и только затем вернуть результат клиенту. В Java есть хорошая поддержка для этого, и я подробно сравню эти два подхода в последующих примерах.## Практическое применение асинхронного программирования на SpringBoot
Если вам нужно реализовать асинхронное программирование на SpringBoot, две аннотации, предоставляемые Spring, делают это очень простым.
@EnableAsync
: добавьте эту аннотацию к конфигурационному классу или основному классу, чтобы включить поддержку асинхронных методов.@Async
может быть применена к классу или методу. Если она применяется к классу, это означает, что все методы этого класса являются асинхронными.TaskExecutor
Многие люди не очень хорошо знакомы с TaskExecutor
, поэтому мы потратим немного времени на его описание. Из названия можно понять, что это исполнитель задач, который управляет потоками для выполнения задач, как командующий, а потоки — как отдельные армии, которые могут асинхронно атаковать противника👊.
Spring предоставляет интерфейс TaskExecutor
как абстракцию исполнителя задач, который очень похож на интерфейс Executor
из пакета java.util.concurrent
. Сlightly different, the TaskExecutor
interface uses Java 8 syntax @FunctionalInterface
to declare this interface as a functional interface.org.springframework.core.task.TaskExecutor
@FunctionalInterface
public interface TaskExecutor extends Executor {
void execute(Runnable var1);
}
Если не задан пользовательский Executor, Spring создаст SimpleAsyncTaskExecutor
и будет использовать его.```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/** @author shuang.kou */ @Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer {
private static final int CORE_POOL_SIZE = 6; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100;
@Bean public Executor taskExecutor() { // По умолчанию Spring настраивает количество ядерных потоков на 1, максимальное количество потоков не ограничено, а также размер очереди. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Количество ядерных потоков executor.setCorePoolSize(CORE_POOL_SIZE); // Максимальное количество потоков executor.setMaxPoolSize(MAX_POOL_SIZE); // Размер очереди executor.setQueueCapacity(QUEUE_CAPACITY); // Когда очередь заполнена, эта политика гарантирует, что запросы задач не будут потеряны, но это может повлиять на общую производительность приложения. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-"); executor.initialize(); return executor; } }
**Основные понятия `ThreadPoolTaskExecutor`:**- **Core Pool Size :** Количество ядерных потоков определяет минимальное количество одновременно выполняемых потоков.
- **Queue Capacity :** Когда поступает новая задача, сначала проверяется, достигло ли количество текущих потоков значения ядерных потоков. Если достигнуто, задача будет помещена в очередь.
- **Maximum Pool Size :** Когда количество задач в очереди достигает максимального размера очереди, количество одновременно выполняемых потоков становится максимальным. **Что произойдет, если очередь уже заполнена, а количество одновременно выполняющихся потоков достигло максимального значения, и поступит новая задача?**По умолчанию Spring использует `ThreadPoolExecutor.AbortPolicy`. В этом случае `ThreadPoolExecutor` выбросит исключение `RejectedExecutionException`, отклонив новую задачу. Это означает, что обработка этой задачи будет потеряна. Для масштабируемых приложений рекомендуется использовать `ThreadPoolExecutor.CallerRunsPolicy`. Эта стратегия выполняет задачу в том потоке, который её отправил, что обеспечивает масштабируемость при заполненной максимальной очереди.
**Определение стратегии насыщения `ThreadPoolTaskExecutor`:**
Если количество одновременно выполняющихся потоков достигло максимального значения, `ThreadPoolTaskExecutor` определяет следующие стратегии:
- **ThreadPoolExecutor.AbortPolicy**: Выбрасывает исключение `RejectedExecutionException`, чтобы отклонить обработку новой задачи.
- **ThreadPoolExecutor.CallerRunsPolicy**: Задача выполняется в том потоке, который её отправил. Вы не потеряете запрос на задачу. Однако эта стратегия может замедлить скорость подачи новых задач, что может негативно сказаться на производительности программы. Кроме того, эта стратегия предпочитает увеличение размера очереди. Если ваше приложение может выдержать эту задержку и вы не можете позволить себе потерю даже одного запроса на задачу, вы можете выбрать эту стратегию.
- **ThreadPoolExecutor.DiscardPolicy**: Новая задача не обрабатывается и просто отбрасывается.
- **ThreadPoolExecutor.DiscardOldestPolicy**: Эта стратегия отбрасывает самую старую необработанную задачу.### 2. Написание асинхронного метода
Ниже представлен пример метода, который ищет фильмы, начинающиеся с определённой буквы. Мы добавили аннотацию `@Async`, чтобы указать Spring, что это асинхронный метод. Возвращаемое значение метода `CompletableFuture.completedFuture(results)` означает, что нам нужно вернуть результат, то есть программа должна завершить выполнение задачи перед тем, как вернуть результат пользователю.
**Обратите внимание на первую строку лога в методе `completableFutureTask`, эта строка будет использована в анализе программы и является очень важной!**
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/** @author shuang.kou */
@Service
public class AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
private List<String> movies =
new ArrayList<>(Arrays.asList(
"Forrest Gump",
"Titanic",
"Spirited Away",
"The Shawshank Redemption",
"Zootopia",
"Farewell",
"Joker",
"Crawl"));
``` /**
* Пример использования: найти фильмы, начинающиеся с определенного символа/строки
*/
@Async
public CompletableFuture<List<String>> completableFutureTask(String start) {
// Вывод лога
logger.warn(Thread.currentThread().getName() + " начал выполнение задачи!");
// Найти фильмы, начинающиеся с определенного символа/строки
List<String> results =
movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
// Моделирование долгой задачи
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Возвращение нового CompletableFuture, завершенного заданным значением
return CompletableFuture.completedFuture(results);
}
}```
### 3. Тестирование написанного асинхронного метода
```java
/** @author shuang.kou */
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
AsyncService asyncService;
@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
// Время начала
long start = System.currentTimeMillis();
// Начало выполнения большого количества асинхронных задач
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
// Метод CompletableFuture.join() позволяет получить результаты и объединить их
List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// Вывод результата и времени выполнения программы
System.out.println("Время выполнения: " + (System.currentTimeMillis() - start));
return results.toString();
}
}
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService : Начал выполнение задачи ThreadPoolTaskExecutor-1! 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService : Начал выполнение задачи ThreadPoolTaskExecutor-6! 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService : Начал выполнение задачи ThreadPoolTaskExecutor-5! 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService : Начал выполнение задачи ThreadPoolTaskExecutor-4! 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService : Начал выполнение задачи ThreadPoolTaskExecutor-3! 2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService : Начал выполнение задачи ThreadPoolTaskExecutor-2! Затраченное время: 1010
Сначала мы можем заметить, что время, затраченное на выполнение всех задач, составляет примерно 1 с. Это связано с нашим пользовательским `ThreadPoolTaskExecutor`, в котором мы настроили количество ядерных потоков на 6, а затем с помощью следующего кода симулировали распределение 6 задач для выполнения системой. Таким образом, каждый поток получает по одной задаче, и время выполнения каждой задачи составляет 1 с, поэтому общее время выполнения 6 задач составляет 1 с.```java
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<Void>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
Вы можете самостоятельно проверить это, попробовав изменить количество ядерных потоков на 3, и снова запросить этот интерфейс. Вы заметите, что время, затраченное на выполнение всех задач, составляет примерно 2 с.
Кроме того, из вышеупомянутого результата можно заключить, что результаты возвращаются только после завершения всех задач. В этом случае мы должны вернуть результаты выполнения задач клиенту. А что, если нам не нужно возвращать результаты выполнения задач клиенту? Например, если мы загружаем большой файл в систему, и после загрузки проверяем, соответствует ли он требованиям. В обычной ситуации нам нужно ждать завершения загрузки файла, прежде чем вернуть сообщение пользователю, что это может занять很长时间。采用异步的方式,用户上传文件后立即返回消息给用户,然后系统默默地处理上传任务。这也带来了一些麻烦,因为文件可能会上传失败,因此系统也需要一些机制来补偿这个问题,比如当上传出现问题时,发送消息通知用户。
Вот пример ситуации, когда клиенту не нужно возвращать результаты:
Измените метод completableFutureTask
на void тип:
@Async
public void completableFutureTask(String start) {
......
// Здесь может быть обработка результата выполнения задачи, например, запись в базу данных и т.д.
// doSomeThingWithResults(results);
}
```Измените код контроллера следующим образом:
```java
@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
// Запуск таймера
long start = System.currentTimeMillis();
// Запуск нескольких асинхронных поисков
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
words.stream()
.forEach(word -> asyncService.completableFutureTask(word));
// Ожидание завершения всех задач
// Вывод результатов, включая затраченное время
System.out.println("Затраченное время: " + (System.currentTimeMillis() - start));
return "Завершено";
}
Запросите этот интерфейс, и в консоли будет выведено следующее содержимое:
Затраченное время: 0
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
Можно заметить, что система сразу возвращает результат пользователю, а затем начинает выполнять задачу.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )