1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/yuluo-zkclient

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

["+path+"]"); }

        @Override
        public void handleDataDeleted(String path) throws Exception {
            System.out.println("узел удалён [" + path + "]");
        }

        @Override
        public void handleDataCreated(String path, Object data) throws Exception {
            System.out.println("создан узел [" + path + "]");
        }

        @Override
        public void handleDataChanged(String path, Object data) throws Exception {
            System.out.println("изменён узел [" + path + "]");
        }
    });

Подсчёт количества дочерних узлов

String path = "/parent";
ZKClient zkClient = ZKClientBuilder.newZKClient("localhost:2181").build();
// Регистрация прослушивателя
zkClient.listenChildCountChanges(path, new ZKChildCountListener() {
    
    @Override
    public void handleSessionExpired(String path, List<String> children) throws Exception {// сессия истекла
        System.out.println("children:" + children);
    }

    @Override
    public void handleChildCountChanged(String path, List<String> children) throws Exception {// изменилось количество дочерних узлов
         System.out.println("children:" + children);
    }
});

Прослушивание изменений в количестве дочерних узлов и их данных

String path = "/test";
ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();

// Регистрация прослушивателя
zkClient.listenChildDataChanges(path, new ZKChildDataListener() {
    @Override
    public void handleSessionExpired(String path, Object data) throws Exception {// истекла сессия
       System.out.println("children:"+children);
    }

    @Override
    public void handleChildDataChanged(String path, Object data) throws Exception {// данные дочернего узла изменились
        System.out.println("данные дочернего узла изменены:[путь:" + path + ", данные:" + data + "]");
    }

    @Override
    public void handleChildCountChanged(String path, List<String> children) throws Exception {// количество дочерних узлов изменилось
           System.out.println("children:"+children);
    }
});

Мониторинг состояния клиента

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
// Регистрация прослушивателя
zkClient.listenStateChanges(new ZKStateListener() {
        
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {// состояние клиента изменилось
            System.out.println("состояние " + state);
        }

        @Override
        public void handleSessionError(Throwable error) throws Exception {// ошибка при создании сессии
            // игнорируем
        }

        @Override
        public void handleNewSession() throws Exception {// создана сессия
            System.out.println("новая сессия");
        }
    });

Четыре, расширенные функции

Распределённая блокировка

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
final String lockPath = "/zk/lock";
zkClient.createRecursive(lockPath, null, CreateMode.PERSISTENT);
// Создаём распределённую блокировку, не потокобезопасный класс, каждый поток должен создавать отдельный экземпляр.
ZKDistributedLock lock = ZKDistributedLock.newInstance(zkClient,lockPath);

lock.lock(); // получаем блокировку

// выполняем действия

lock.unlock();// снимаем блокировку

Отложенное получение распределённой блокировки

Сеть может кратковременно отключаться, что приводит к очень коротким перерывам в работе сети. Однако это может вызвать серьёзные проблемы с распределёнными блокировками.

Например, поток 1 получил распределённую блокировку, но произошло кратковременное отключение сети. Если за это время сервер ZooKeeper удалит временный узел, то распределённая блокировка будет снята, хотя на самом деле работа потока 1 продолжается и он не завис.

Очевидно, что снятие блокировки из-за кратковременного отключения сети обычно нежелательно. Поэтому предоставляется распределённая блокировка с функцией задержки.

Если поток 1 получает блокировку и происходит кратковременное отключение сети, а затем сервер ZooKeeper удаляет временный узел, другие потоки не будут пытаться получить блокировку немедленно, а будут ждать некоторое время. Если в течение этого времени поток 1 успешно подключится, он продолжит удерживать блокировку. Получение и освобождение распределённой блокировки с задержкой

ZKDistributedDelayLock lock = ZKDistributedDelayLock.newInstance(zkClient1, lockPach);
lock.lock(); // Получение блокировки.  

// Выполняем какие-либо действия.

lock.unlock(); // Освобождение блокировки.

Выборы лидера

Выборы лидера происходят асинхронно. Для запуска и участия в выборах достаточно вызвать метод selector.start(). Если текущий сервис становится главным, то будет запущен прослушиватель ZKLeaderSelectorListener.

ZKClient zkClient = ZKClientBuilder.newZKClient()
                        .servers("localhost:2181")
                        .build();
final String lockPath = "/zk/leader";
final ZKLeaderSelector selector = new ZKLeaderSelector("service1", true, zkClient1, leaderPath, 
    new ZKLeaderSelectorListener() {
        
        // Функция обратного вызова после того, как текущий сервис стал лидером.
        @Override
        public void takeLeadership(ZKClient client, ZKLeaderSelector selector) {
            // Здесь можно написать код, который должен быть выполнен главным сервисом.
            System.out.println("I am the leader-" + selector.getLeader());
        }
    });
// Запуск и участие в выборах лидера.
selector.start();

// Получение идентификатора текущего главного сервиса.
selector.getLeader();

// Завершение участия в выборах лидера.
selector.close();

Задержка выборов лидера

Например, поток 1 был выбран лидером, но произошло кратковременное отключение сети. Если сервер ZooKeeper удаляет временный узел, другие потоки будут считать, что лидер вышел из строя, и начнут новый раунд выборов. Однако поток 1 продолжает работать без сбоя.

Очевидно, что такая ситуация не является желательной.

Проблема решается с помощью задержки выборов лидера. Если поток 1 становится лидером, а затем происходит кратковременное отключение сети, другие потоки не сразу начинают конкурировать за лидерство, а ждут некоторое время.

Если за это время поток 1 успешно подключится, он сохранит роль лидера.

ZKClient zkClient = ZKClientBuilder.newZKClient()
                        .servers("localhost:2181")
                        .build();
String lockPath = "/zk/delayleader";
// Задержка на 3 секунды перед началом выборов.
LeaderSelector selector = new ZKLeaderDelySelector("server1", true,3000, zkClient, leaderPath, new ZKLeaderSelectorListener() {
        
        @Override
        public void takeLeadership(ZKClient client, LeaderSelector selector) {
            msgList.add("server1 I am the leader");
           
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("server1: I am the leader-" + selector.getLeader());
            zkClient.reconnect();
        }
    });
// Запуск и участие в выборах лидера.
selector.start();

// Получение текущего идентификатора главного сервиса.
selector.getLeader();

// Завершение участия в выборах лидера.
selector.close();

Распределённая очередь

ZKClient zkClient = ZKClientBuilder.newZKClient()
                        .servers("localhost:2181")
                        .build();
final String rootPath = "/zk/queue";
zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);

// Создание объекта распределённой очереди.
ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);

queue.offer("123"); // Добавление элемента.

String value = queue.poll(); // Удаление и получение верхнего элемента.

String value =  queue.peek(); // Получение верхнего элемента без удаления.

Главный и подчинённый сервисы с блокировкой

ZKClient zkClient = ZKClientBuilder.newZKClient()
                        .servers("localhost:2181")
                        .build();
final String lockPath = "/zk/halock";
zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);

// Создаём блокировку, которая не является потокобезопасной. Каждый поток должен создавать свой экземпляр.
ZKHALock lock = ZKHALock.newInstance(zkClient, lockPach);

lock.lock(); // Попытка получить блокировку.

// Если блокировка получена, текущий поток становится главным сервисом. Блокировка будет удерживаться до тех пор, пока главный сервис не выйдет из строя или не разорвёт соединение с сервером ZooKeeper. В этом случае подчинённый сервис попытается получить блокировку и стать новым главным сервисом.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Это клиент ZooKeeper, который реализует восстановление после обрыва соединения, восстановление после истечения времени сеанса, постоянное наблюдение и отслеживание изменений данных в дочерних узлах. Также он включает в себя часто используемые функции, такие как распределённая блокировка, выбор лидера, блокировка ведущий-ведомый, распределённая о... Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/yuluo-zkclient.git
git@api.gitlife.ru:oschina-mirror/yuluo-zkclient.git
oschina-mirror
yuluo-zkclient
yuluo-zkclient
master