1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/kekingcn-kkbinlog

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
浅析jetcd中的KeepAlive实现.md 8.2 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 26.11.2024 19:33 e516fda

Введение

У Java-клиента Etcd есть множество открытых реализаций. Jetcd — это официальный клиент для Java из репозитория Etcd, который похож на официальный go-клиент по общему дизайну и реализации API. Среди прочего, в нём есть два интерфейса для продления срока аренды: keepAliveOnce и keepAlive. Как следует из их названий, keepAliveOnce — это интерфейс для однократного продления аренды, которым практически не пользуются, так как для сохранения аренды его нужно активировать вручную. А keepAlive — это интерфейс автоматического продления аренды. В большинстве случаев достаточно использовать keepAlive, но для разных сценариев необходимо учитывать ещё несколько вопросов, таких как настройка времени жизни аренды (TTL) и обработка исключений при продлении аренды.

Проблема

Есть приложение, которое основано на MySQL и подписывается на изменения данных в двоичном журнале. Это критически важное приложение, потому что оно работает с данными, которые нельзя потерять. Чтобы избежать отказа единственного узла, позже был использован механизм блокировки и продления аренды с помощью Jetcd для обеспечения переключения между основным и резервным сервером за секунды. Более подробно об этом можно прочитать в статье «Etcd выбирает основной узел, реализация отказоустойчивости основного и резервного серверов с переключением за секунды». После запуска системы в эксплуатацию было обнаружено, что переключение между основным и резервным серверами происходит часто, хотя до этого не было случаев сбоя службы двоичного журнала. В итоге проблема была связана с настройкой времени жизни аренды (TTL). Здесь мы просто обозначим проблему и её местоположение, а затем рассмотрим реализацию keepAlive в Jetcd и проанализируем, почему возникла эта проблема.

Реализация KeepAlive

Сначала рассмотрим использование keepAlive:

    private long acquireActiveLease() throws InterruptedException, ExecutionException {
        long leaseId = leaseClient.grant(leaseTTL).get().getID();
        logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
        this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
            }
            @Override
            public void onError(Throwable t) {
                logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace());
                cancelTask();
            }
            @Override
            public void onCompleted() {
                logger.info("LeaderSelector lease renewal completed! start canceling task.");
                cancelTask();
            }
        });
        return leaseId;
    }

Все реализации аренды находятся в классе LeaseImpl. Получив экземпляр LeaseImpl от EtcdClient, сначала вызывается метод grant для установки TTL и получения идентификатора аренды. Затем аренда передаётся в качестве аргумента методу keepAlive, вторым аргументом которого является наблюдатель. Этот наблюдатель имеет три метода: onNext, который вызывается после определения времени следующего продления аренды, onError, который вызывается при возникновении исключения при продлении аренды, и onCompleted, который вызывается, когда срок аренды истекает.

Метод keepAlive выглядит следующим образом:

  public synchronized CloseableClient keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer) {
    if (this.closed) {
      throw newClosedLeaseClientException();
    }

    KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
    keepAlive.addObserver(observer);

    if (!this.hasKeepAliveServiceStarted) {
      this.hasKeepAliveServiceStarted = true;
      this.start();
    }

    return new CloseableClient() {
      @Override
      public void close() {
        keepAlive.removeObserver(observer);
      }
    };
  }

В классе LeaseImpl поддерживается карта, где ключом является идентификатор аренды (leaseId), а значением — объект KeepAlive. Класс KeepAlive содержит набор наблюдателей, время истечения срока действия (deadLine), следующее время продления (nextKeepAlive) и идентификатор аренды для продления (leaseId). При первом вызове метода keepAlive запускается поток для продления аренды (sendKeepAliveExecutor()) и проверки истечения срока аренды (deadLineExecutor()).

  private void sendKeepAliveExecutor() {
    this.keepAliveResponseObserver = Observers.observer(
      response -> processKeepAliveResponse(response),
      error -> processOnError()
    );
    this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
    this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            // send keep alive req to the leases whose next keep alive is before now.
            this.keepAlives.entrySet().stream()
                .filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
                .map(Entry::getKey)
                .map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
                .forEach(keepAliveRequestObserver::onNext);
        },
        0,
        500,
        TimeUnit.MILLISECONDS
    );
  }

Метод sendKeepAliveExecutor является ключевым для реализации функции keepAlive. Он запускается только один раз в экземпляре LeaseImpl и запускает задачу, которая выполняется каждые 500 миллисекунд. Каждый раз задача проверяет, какие объекты KeepAlive имеют время следующего продления меньше текущего времени, и отправляет запрос на продление аренды для этих объектов. Время следующего продления устанавливается в методе processKeepAliveResponse, который обновляет значение nextKeepAlive в объекте KeepAlive.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/kekingcn-kkbinlog.git
git@api.gitlife.ru:oschina-mirror/kekingcn-kkbinlog.git
oschina-mirror
kekingcn-kkbinlog
kekingcn-kkbinlog
master