У Java-клиента Etcd есть множество открытых реализаций. Jetcd — это официальный клиент для Java из репозитория Etcd, который похож на официальный go-клиент по общему дизайну и реализации API. Среди прочего, в нём есть два интерфейса для продления срока аренды: keepAliveOnce и keepAlive. Как следует из их названий, keepAliveOnce — это интерфейс для однократного продления аренды, которым практически не пользуются, так как для сохранения аренды его нужно активировать вручную. А keepAlive — это интерфейс автоматического продления аренды. В большинстве случаев достаточно использовать keepAlive, но для разных сценариев необходимо учитывать ещё несколько вопросов, таких как настройка времени жизни аренды (TTL) и обработка исключений при продлении аренды.
Есть приложение, которое основано на MySQL и подписывается на изменения данных в двоичном журнале. Это критически важное приложение, потому что оно работает с данными, которые нельзя потерять. Чтобы избежать отказа единственного узла, позже был использован механизм блокировки и продления аренды с помощью Jetcd для обеспечения переключения между основным и резервным сервером за секунды. Более подробно об этом можно прочитать в статье «Etcd выбирает основной узел, реализация отказоустойчивости основного и резервного серверов с переключением за секунды». После запуска системы в эксплуатацию было обнаружено, что переключение между основным и резервным серверами происходит часто, хотя до этого не было случаев сбоя службы двоичного журнала. В итоге проблема была связана с настройкой времени жизни аренды (TTL). Здесь мы просто обозначим проблему и её местоположение, а затем рассмотрим реализацию keepAlive в Jetcd и проанализируем, почему возникла эта проблема.
Сначала рассмотрим использование keepAlive:
private long acquireActiveLease() throws InterruptedException, ExecutionException {
long leaseId = leaseClient.grant(leaseTTL).get().getID();
logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse value) {
logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
}
@Override
public void onError(Throwable t) {
logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace());
cancelTask();
}
@Override
public void onCompleted() {
logger.info("LeaderSelector lease renewal completed! start canceling task.");
cancelTask();
}
});
return leaseId;
}
Все реализации аренды находятся в классе LeaseImpl. Получив экземпляр LeaseImpl от EtcdClient, сначала вызывается метод grant для установки TTL и получения идентификатора аренды. Затем аренда передаётся в качестве аргумента методу keepAlive, вторым аргументом которого является наблюдатель. Этот наблюдатель имеет три метода: onNext, который вызывается после определения времени следующего продления аренды, onError, который вызывается при возникновении исключения при продлении аренды, и onCompleted, который вызывается, когда срок аренды истекает.
Метод keepAlive выглядит следующим образом:
public synchronized CloseableClient keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer) {
if (this.closed) {
throw newClosedLeaseClientException();
}
KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
keepAlive.addObserver(observer);
if (!this.hasKeepAliveServiceStarted) {
this.hasKeepAliveServiceStarted = true;
this.start();
}
return new CloseableClient() {
@Override
public void close() {
keepAlive.removeObserver(observer);
}
};
}
В классе LeaseImpl поддерживается карта, где ключом является идентификатор аренды (leaseId), а значением — объект KeepAlive. Класс KeepAlive содержит набор наблюдателей, время истечения срока действия (deadLine), следующее время продления (nextKeepAlive) и идентификатор аренды для продления (leaseId). При первом вызове метода keepAlive запускается поток для продления аренды (sendKeepAliveExecutor()) и проверки истечения срока аренды (deadLineExecutor()).
private void sendKeepAliveExecutor() {
this.keepAliveResponseObserver = Observers.observer(
response -> processKeepAliveResponse(response),
error -> processOnError()
);
this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
() -> {
// send keep alive req to the leases whose next keep alive is before now.
this.keepAlives.entrySet().stream()
.filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
.map(Entry::getKey)
.map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
.forEach(keepAliveRequestObserver::onNext);
},
0,
500,
TimeUnit.MILLISECONDS
);
}
Метод sendKeepAliveExecutor является ключевым для реализации функции keepAlive. Он запускается только один раз в экземпляре LeaseImpl и запускает задачу, которая выполняется каждые 500 миллисекунд. Каждый раз задача проверяет, какие объекты KeepAlive имеют время следующего продления меньше текущего времени, и отправляет запрос на продление аренды для этих объектов. Время следующего продления устанавливается в методе processKeepAliveResponse, который обновляет значение nextKeepAlive в объекте KeepAlive.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )