J2Executor — это оболочка для пула потоков Java, изначально предназначенная для предоставления модели вторичного пула потоков. Ниже описывается использование вторичного пула потоков.
При реализации бизнес-логики часто возникают ситуации с разветвлёнными задачами. Мы можем поместить несколько несвязанных задач в несколько пулов потоков и выполнить их параллельно, а затем объединить результаты после завершения всех ветвей. Благодаря параллельной обработке ветвей можно сократить время обработки запросов.
Однако это также приводит к проблеме: поскольку задачи асинхронны, они могут быть отправлены в пул потоков, но не обязательно будут выполнены немедленно (из-за настройки самого пула, задача может потребоваться в очереди), что может привести к замедлению выполнения задачи.
Кроме того, из-за проблемы с очередью, возможность немедленного выполнения задачи зависит от того, занимают ли другие задачи слишком много ресурсов пула потоков. Из-за ненадёжности «других задач» мы часто реализуем «изоляцию пула потоков».
Мы надеемся, что каждая основная ветка будет выполняться отдельно в своём собственном пуле потоков, чтобы избежать внезапного всплеска некоторых задач канала (внезапный всплеск может вызвать сбой, например, неудачная обработка запроса, большое количество повторных попыток вверх по течению, удвоение количества запросов), что повлияет на задачи других каналов.
Но после изоляции пула потоков возникла новая проблема. Нам нужно настроить отдельные параметры потока для каждого канала. Если конфигурация неразумна, ресурсы будут потрачены впустую (много холостого хода) или некоторые запросы канала станут слишком большими, что приведёт к блокировке задач, и, самое главное, после изоляции потока каждый тип задачи ограничен использованием фиксированных ресурсов, что делает способность противостоять внезапным всплескам слабее (если есть общий пул потоков, то при внезапном всплеске трафика можно использовать больше ресурсов потока, чего нельзя сделать после изоляции).
Поэтому, чтобы снизить риски ручной настройки пула потоков и повысить способность справляться с внезапными всплесками трафика, я разработал эту структуру вторичного пула потоков.
J2executor решает проблему переполнения задач и может в определённой степени противостоять внезапному увеличению трафика, одновременно обеспечивая изоляцию задач каждого бизнеса (даже если происходит внезапное увеличение трафика, это не повлияет на другие бизнесы и не приведёт к сбою всей системы), кроме того, J2executor также может противостоять проблеме неравномерного распределения ресурсов и блокировки задач или холостого хода при изоляции пула потоков.
/**
* Created by virjar on 2018/2/25.<br>
* Использование примера
*/
public class J2ExecutorTest {
public static void main(String[] args) {
// Бизнес-пул потоков
// Пул потоков №1 использует LinkedBlockingDeque, не отклоняет задачи, но когда задачи блокируются, это вызовет тайм-аут
ThreadPoolExecutor pool1 = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(), new PrintLogRejectHandler("Пул потоков №1"));
// Пул потоков №2 использует ArrayBlockingQueue, задачи будут отклонены, и будет напечатан журнал
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10), new PrintLogRejectHandler("Пул потоков №2"));
// 1, 2 пулы потоков, их задачи будут отправлены в первичный пул потоков для выполнения (когда их собственные пулы заняты), чтобы противостоять проблемам внезапного увеличения трафика
// Пул потоков №3 использует LinkedBlockingDeque, скорость отправки задач меньше скорости потребления, пул потоков находится в нормальном состоянии
ThreadPoolExecutor pool3 = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(), new PrintLogRejectHandler("Пул потоков №3"));
// Пул потоков №4 использует ArrayBlockingQueue, скорость отправки задач меньше скорости потребления, пул потоков находится в нормальном состоянии
ThreadPoolExecutor pool4 = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10), new PrintLogRejectHandler("Пул потоков №4"));
final List<ThreadPoolExecutor> pools = new ArrayList<>();
pools.add(pool1);
pools.add(pool2);
pools.add(pool3);
pools.add(pool4);
// Использовать двухуровневый пул потоков для связывания каждого бизнес-пула потоков
new J2Executor(new ThreadPoolExecutor(20, 30, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10)))
.registrySubThreadPoolExecutors(pools);
// Запустить четыре задачи и одновременно отправить задачи в каждый собственный пул потоков
for (int i = 0; i < pools.size(); i++) {
new WorkThread(pools.get(i), (i + 1)).start();
}
}
private static class WorkThread extends Thread {
private ThreadPoolExecutor poolExecutor;
private int index;
public WorkThread(ThreadPoolExecutor poolExecutor, int index) {
this.poolExecutor = poolExecutor;
this.index = index;
}
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
final long startTimestamp = System.currentTimeMillis();
poolExecutor.submit(new Runnable() {
@Override
public void run() {
// Каждая задача выполняется 2 секунды
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Задача группы " + index + " выполняется, время завершения: "
+ (System.currentTimeMillis() - startTimestamp) / 1000 + "s");
}
});
// Задачи групп 1 и 2 заполняются напрямую, остальные задачи отправляют одну задачу каждые 3 секунды
long sleepTime;
if (index == 1 || index == 2) {
sleepTime = 50;
} else {
sleepTime = 1500;
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
poolExecutor.shutdown();
}
}
private static class
``` **Класс PrintLogRejectHandler реализует RejectedExecutionHandler**
private String tag;
public PrintLogRejectHandler(String tag) { this.tag = tag; }
@Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(tag + "任务被拒绝"); }
**Тестирование эффекта**
На изображении (sample.png) показано:
1. В первом потоке задач при резком увеличении нагрузки используется стратегия кэширования задач, поэтому задачи не отбрасываются, но это может привести к серьёзному превышению времени ожидания выполнения задач.
2. Во втором потоке задач при резком увеличении нагрузки применяется стратегия отбрасывания задач, что приводит к отклонению некоторых задач и сохранению в очереди до 10 задач. Превышение времени ожидания происходит относительно редко.
3. Третий и четвёртый потоки задач работают при нормальной нагрузке, и задачи выполняются успешно.
4. Резкое увеличение нагрузки в первом и втором потоках задач не влияет на выполнение задач в третьем и четвёртом потоках, которые всегда успешно завершаются в течение 2 секунд.
5. Часть задач из первого и второго потоков, которые не были выполнены, обрабатываются с помощью основного потока.
**Рекомендации по настройке параметров**
Рекомендуется увеличить размер основного пула потоков и уменьшить размер дополнительного пула. В большинстве случаев дополнительный пул будет переполняться. Это позволит максимально использовать ресурсы и повысить устойчивость к резким увеличениям нагрузки. Дополнительный пул должен обеспечивать независимость основных бизнес-процессов от внешних воздействий.
**Использование в работе**
Эта модель была использована в системе ценообразования. Код для дополнительного пула был выпущен 18 марта 2018 года. После этого в мониторинге не наблюдалось резких скачков времени обработки запросов. Это демонстрирует способность дополнительного пула справляться с резкими увеличениями нагрузки.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )