["+path+"]"); }
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println("узел удалён [" + path + "]");
}
@Override
public void handleDataCreated(String path, Object data) throws Exception {
System.out.println("создан узел [" + path + "]");
}
@Override
public void handleDataChanged(String path, Object data) throws Exception {
System.out.println("изменён узел [" + path + "]");
}
});
String path = "/parent";
ZKClient zkClient = ZKClientBuilder.newZKClient("localhost:2181").build();
// Регистрация прослушивателя
zkClient.listenChildCountChanges(path, new ZKChildCountListener() {
@Override
public void handleSessionExpired(String path, List<String> children) throws Exception {// сессия истекла
System.out.println("children:" + children);
}
@Override
public void handleChildCountChanged(String path, List<String> children) throws Exception {// изменилось количество дочерних узлов
System.out.println("children:" + children);
}
});
String path = "/test";
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
// Регистрация прослушивателя
zkClient.listenChildDataChanges(path, new ZKChildDataListener() {
@Override
public void handleSessionExpired(String path, Object data) throws Exception {// истекла сессия
System.out.println("children:"+children);
}
@Override
public void handleChildDataChanged(String path, Object data) throws Exception {// данные дочернего узла изменились
System.out.println("данные дочернего узла изменены:[путь:" + path + ", данные:" + data + "]");
}
@Override
public void handleChildCountChanged(String path, List<String> children) throws Exception {// количество дочерних узлов изменилось
System.out.println("children:"+children);
}
});
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
// Регистрация прослушивателя
zkClient.listenStateChanges(new ZKStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {// состояние клиента изменилось
System.out.println("состояние " + state);
}
@Override
public void handleSessionError(Throwable error) throws Exception {// ошибка при создании сессии
// игнорируем
}
@Override
public void handleNewSession() throws Exception {// создана сессия
System.out.println("новая сессия");
}
});
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
final String lockPath = "/zk/lock";
zkClient.createRecursive(lockPath, null, CreateMode.PERSISTENT);
// Создаём распределённую блокировку, не потокобезопасный класс, каждый поток должен создавать отдельный экземпляр.
ZKDistributedLock lock = ZKDistributedLock.newInstance(zkClient,lockPath);
lock.lock(); // получаем блокировку
// выполняем действия
lock.unlock();// снимаем блокировку
Сеть может кратковременно отключаться, что приводит к очень коротким перерывам в работе сети. Однако это может вызвать серьёзные проблемы с распределёнными блокировками.
Например, поток 1 получил распределённую блокировку, но произошло кратковременное отключение сети. Если за это время сервер ZooKeeper удалит временный узел, то распределённая блокировка будет снята, хотя на самом деле работа потока 1 продолжается и он не завис.
Очевидно, что снятие блокировки из-за кратковременного отключения сети обычно нежелательно. Поэтому предоставляется распределённая блокировка с функцией задержки.
Если поток 1 получает блокировку и происходит кратковременное отключение сети, а затем сервер ZooKeeper удаляет временный узел, другие потоки не будут пытаться получить блокировку немедленно, а будут ждать некоторое время. Если в течение этого времени поток 1 успешно подключится, он продолжит удерживать блокировку. Получение и освобождение распределённой блокировки с задержкой
ZKDistributedDelayLock lock = ZKDistributedDelayLock.newInstance(zkClient1, lockPach);
lock.lock(); // Получение блокировки.
// Выполняем какие-либо действия.
lock.unlock(); // Освобождение блокировки.
Выборы лидера
Выборы лидера происходят асинхронно. Для запуска и участия в выборах достаточно вызвать метод selector.start(). Если текущий сервис становится главным, то будет запущен прослушиватель ZKLeaderSelectorListener.
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
final String lockPath = "/zk/leader";
final ZKLeaderSelector selector = new ZKLeaderSelector("service1", true, zkClient1, leaderPath,
new ZKLeaderSelectorListener() {
// Функция обратного вызова после того, как текущий сервис стал лидером.
@Override
public void takeLeadership(ZKClient client, ZKLeaderSelector selector) {
// Здесь можно написать код, который должен быть выполнен главным сервисом.
System.out.println("I am the leader-" + selector.getLeader());
}
});
// Запуск и участие в выборах лидера.
selector.start();
// Получение идентификатора текущего главного сервиса.
selector.getLeader();
// Завершение участия в выборах лидера.
selector.close();
Задержка выборов лидера
Например, поток 1 был выбран лидером, но произошло кратковременное отключение сети. Если сервер ZooKeeper удаляет временный узел, другие потоки будут считать, что лидер вышел из строя, и начнут новый раунд выборов. Однако поток 1 продолжает работать без сбоя.
Очевидно, что такая ситуация не является желательной.
Проблема решается с помощью задержки выборов лидера. Если поток 1 становится лидером, а затем происходит кратковременное отключение сети, другие потоки не сразу начинают конкурировать за лидерство, а ждут некоторое время.
Если за это время поток 1 успешно подключится, он сохранит роль лидера.
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
String lockPath = "/zk/delayleader";
// Задержка на 3 секунды перед началом выборов.
LeaderSelector selector = new ZKLeaderDelySelector("server1", true,3000, zkClient, leaderPath, new ZKLeaderSelectorListener() {
@Override
public void takeLeadership(ZKClient client, LeaderSelector selector) {
msgList.add("server1 I am the leader");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("server1: I am the leader-" + selector.getLeader());
zkClient.reconnect();
}
});
// Запуск и участие в выборах лидера.
selector.start();
// Получение текущего идентификатора главного сервиса.
selector.getLeader();
// Завершение участия в выборах лидера.
selector.close();
Распределённая очередь
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
final String rootPath = "/zk/queue";
zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);
// Создание объекта распределённой очереди.
ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
queue.offer("123"); // Добавление элемента.
String value = queue.poll(); // Удаление и получение верхнего элемента.
String value = queue.peek(); // Получение верхнего элемента без удаления.
Главный и подчинённый сервисы с блокировкой
ZKClient zkClient = ZKClientBuilder.newZKClient()
.servers("localhost:2181")
.build();
final String lockPath = "/zk/halock";
zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);
// Создаём блокировку, которая не является потокобезопасной. Каждый поток должен создавать свой экземпляр.
ZKHALock lock = ZKHALock.newInstance(zkClient, lockPach);
lock.lock(); // Попытка получить блокировку.
// Если блокировка получена, текущий поток становится главным сервисом. Блокировка будет удерживаться до тех пор, пока главный сервис не выйдет из строя или не разорвёт соединение с сервером ZooKeeper. В этом случае подчинённый сервис попытается получить блокировку и стать новым главным сервисом.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )