1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-KSQL

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
klip-26-java-client-interfaces.md 57 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 28.11.2024 07:33 e6dfb69

KLIP 26 — Java-клиентские интерфейсы

Автор: Виктория Ся (@vcrfxia) | Целевая версия: 0.10.0; 6.0.0 | Статус: объединён | Обсуждение: GitHub PR

Краткое содержание: KLIP 15 уже привёл аргументы в пользу введения Java-клиента для ksqlDB. Этот KLIP предлагает интерфейсы для клиента.

Мотивация и предыстория

См. KLIP 15.

Что входит в рамки проекта

Java-клиент будет поддерживать следующие операции:

  • Push-запрос;
  • Pull-запрос;
  • Операции DDL SELECT:
    • CREATE STREAM;
    • CREATE TABLE;
    • CREATE STREAM... AS SELECT;
    • CREATE TABLE... AS SELECT;
    • DROP STREAM;
    • DROP TABLE;
    • INSERT INTO;
    • TERMINATE ;
  • Операции администрирования SELECT:
    • SHOW TOPICS (нерасширенные);
    • SHOW STREAMS (нерасширенные);
    • SHOW TABLES (нерасширенные);
    • SHOW QUERIES (нерасширенные);
  • Вставка значений, то есть вставка строк в существующий поток;
  • Завершение push-запроса (через конечную точку /close-query). Цель этого KLIP — достичь соглашения по интерфейсам / публичным API. Детали реализации не рассматриваются.

Что не входит в рамки проекта

Этот KLIP не охватывает поддержку Java-клиентом следующих функций:

  • DESCRIBE <stream/table>, DESCRIBE FUNCTION;
  • EXPLAIN ;
  • PRINT ;
  • SHOW TOPICS EXTENDED, SHOW <STREAMS/TABLES> EXTENDED, SHOW QUERIES EXTENDED;
  • CREATE CONNECTOR, DROP CONNECTOR, SHOW CONNECTORS, DESCRIBE CONNECTOR;
  • CREATE TYPE, DROP TYPE, SHOW TYPES;
  • SHOW FUNCTIONS, SHOW PROPERTIES;
  • RUN SCRIPT;
  • Использование других конечных точек (info, healthcheck, terminate cluster, status и т. д.). Мы всегда можем добавить поддержку этих операций в будущем, если потребуется. Как и выше, детали реализации выходят за рамки, поскольку цель этого KLIP состоит в том, чтобы достичь согласия по интерфейсам / публичным API.

Значение/возврат

См. KLIP 15.

Публичные API

В следующих подразделах описаны методы интерфейса Client:

public interface Client {
    ...
}

Конструкторы

static Client create(ClientOptions clientOptions) {
  return new ClientImpl(clientOptions);
}

static Client create(ClientOptions clientOptions, Vertx vertx) {
  return new ClientImpl(clientOptions, vertx);
}

Java-клиент будет реализован как Vert.x HttpClient. Мы предоставляем конструктор, который позволяет пользователям предоставлять свой собственный экземпляр Vertx, чтобы при желании воспользоваться общим пулом соединений и другими свойствами.

ClientOptions изначально будет выглядеть следующим образом:

public interface ClientOptions {

  ClientOptions setHost(String host);

  ClientOptions setPort(int port);

  ClientOptions setUseTls(boolean useTls);
  
  ClientOptions setVerifyHost(boolean verifyHost);
  
  ClientOptions setUseAlpn(boolean useAlpn);

  ClientOptions setTrustStore(String trustStorePath);

  ClientOptions setTrustStorePassword(String trustStorePassword);

  ClientOptions setKeyStore(String keyStorePath);

  ClientOptions setKeyStorePassword(String keyStorePassword);

  ClientOptions setBasicAuthCredentials(String username, String password);
  
  ClientOptions setExecuteQueryMaxResultRows(int maxRows);

  String getHost();

  int getPort();

  boolean isUseTls();
  
  boolean isVerifyHost();
  
  boolean isUseAlpn();

  boolean isUseBasicAuth();

  String getTrustStore();

  String getTrustStorePassword();

  String getKeyStore();

  String getKeyStorePassword();

  String getBasicAuthUsername();

  String getBasicAuthPassword();

  ClientOptions copy();

  static ClientOptions create() {
    return new ClientOptionsImpl();
  }
}

Мы всегда сможем добавить дополнительные параметры конфигурации позже. Также мы можем захотеть предоставить пользователям доступ к Vert.x HttpClientOptions для опытных пользователей, которые хотят предоставить собственные настройки. Выполняет запрос (push или pull) и возвращает результаты по одной строке за раз.

  • Если от сервера получен ответ, отличный от 200, то CompletableFuture будет неудачным.

  • По умолчанию запросы push, отправленные через этот метод, возвращают результаты, начиная с начала потока или таблицы. Чтобы переопределить это поведение, используйте метод {@link #streamQuery(String, Map)}, чтобы передать в запросе свойство {@code auto.offset.reset} со значением, установленным в {@code latest}.

@param sql оператор запроса для выполнения @return будущее, которое завершается после получения ответа сервера и содержит результат запроса в случае успеха

Выполняет запрос (push или pull) и возвращает результаты по одной строке за раз.

  • Если от сервера получен ответ, отличный от 200, то CompletableFuture будет неудачным.

  • Запросы push, отправленные с помощью этого метода, по умолчанию возвращают результаты, начинающиеся с начала потока или таблицы. Для переопределения этого поведения передайте в запросе свойство {@code auto.offset.reset}, установив значение {@code latest}.

@param sql оператор запроса для выполнения @param properties свойства запроса @return будущее, которое завершается после получения ответа сервера и содержит результат запроса в случае успеха

где StreamedQueryResult выглядит следующим образом:

Результат запроса (push или pull), передаваемый по потоку по одной строке за раз. Записи могут быть использованы либо путём подписки на издателя, либо путём опроса (блокировки) по одной записи за раз. Эти два способа потребления являются взаимоисключающими; может использоваться только один метод (для каждого StreamedQueryResult).

  • Метод {@code subscribe()} не может быть вызван, если {@link #isFailed} истинно.

public interface StreamedQueryResult extends Publisher {

List columnNames();

List columnTypes();

/**

  • Возвращает идентификатор основного запроса, если запрос является запросом push. В противном случае возвращает null.
  • @return идентификатор запроса */ String queryID();

/**

  • Возвращает следующую строку. Блокируется до тех пор, пока она не станет доступна или основной запрос не будет завершён (либо корректно, либо из-за ошибки).
  • @return строка или null, если запрос был завершён. */ Row poll();

/**

  • Возвращает следующую строку. Блокируется до тех пор, пока она не станет доступна, не истечёт указанный тайм-аут или основной запрос не будет завершён (либо корректно, либо из-за ошибки).
  • @param timeout время ожидания строки. Неположительное значение приведёт к блокировке этого метода до получения строки или завершения запроса.
  • @return строка, или null, если истекло время ожидания или запрос был завершён. */ Row poll(Duration timeout);

/**

  • Определяет, завершён ли {@code StreamedQueryResult}.
  • {@code StreamedQueryResult} считается завершённым, если HTTP-соединение, связанное с этим запросом, было корректно закрыто. После завершения {@code StreamedQueryResult} продолжит доставлять любые оставшиеся строки, а затем вызовет {@code onComplete()} у подписчика, если таковой имеется.
  • @return завершённость {@code StreamedQueryResult}. */ boolean isComplete();

/**

  • Определяет, завершился ли {@code StreamedQueryResult} неудачно.
  • Если {@code StreamedQueryResult} завершился неудачно, то от сервера была получена ошибка. После неудачи {@code onError()} вызывается у подписчика, если он есть, все существующие вызовы {@code poll()} вернут null, а новые вызовы {@link #poll} и {@code subscribe()} будут отклонены.
  • @return неудачное завершение {@code StreamedQueryResult}. */ boolean isFailed(); }

Обратите внимание, что StreamedQueryResult — это издатель Reactive Streams, поэтому пользователи могут передавать результаты. Пользователи также могут вызывать poll(), чтобы... Вот перевод текста на русский язык:

  /**
   * Возвращает значение для определённого столбца {@code Row} как long.
   */
  Long getLong(String columnName);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде double.
   *
   * @param columnIndex индекс столбца (отсчёт с 1)
   * @return значение столбца
   * @throws ClassCastException если значение столбца не является числом {@code Number}
   * @throws IndexOutOfBoundsException если индекс недопустим
   */
  Double getDouble(int columnIndex);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде double.
   *
   * @param columnName имя столбца
   * @return значение столбца
   * @throws ClassCastException если значение столбца не является числом {@code Number}
   * @throws IllegalArgumentException если имя столбца недопустимо
   */
  Double getDouble(String columnName);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде boolean.
   *
   * @param columnIndex индекс столбца (отсчёт с 1)
   * @return значение столбца
   * @throws ClassCastException если значение столбца не является логическим значением
   * @throws IndexOutOfBoundsException если индекс недопустим
   */
  Boolean getBoolean(int columnIndex);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде boolean.
   *
   * @param columnName имя столбца
   * @return значение столбца
   * @throws ClassCastException если значение столбца не является логическим значением
   * @throws IllegalArgumentException если имя столбца недопустимо
   */
  Boolean getBoolean(String columnName);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде BigDecimal.
   *
   * @param columnIndex индекс столбца (отсчёт с 1)
   * @return значение столбца
   * @throws ClassCastException если значение столбца не является числом {@code Number}
   * @throws IndexOutOfBoundsException если индекс недопустим
   */
  BigDecimal getDecimal(int columnIndex);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде BigDecimal.
   *
   * @param columnName имя столбца
   * @return значение столбца
   * @throws ClassCastException если значение столбца не является числом {@code Number}
   * @throws IllegalArgumentException если имя столбца недопустимо
   */
  BigDecimal getDecimal(String columnName);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде KsqlObject.
   * Полезно для типов столбцов {@code MAP} и {@code STRUCT}.
   *
   * @param columnIndex индекс столбца (отсчёт с 1)
   * @return значение столбца
   * @throws ClassCastException если значение столбца нельзя преобразовать в карту
   * @throws IndexOutOfBoundsException если индекс недопустим
   */
  KsqlObject getKsqlObject(int columnIndex);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде KsqlObject.
   * Полезно для типов столбцов {@code MAP} и {@code STRUCT}.
   *
   * @param columnName имя столбца
   * @return значение столбца
   * @throws ClassCastException если значение столбца нельзя преобразовать в карту
   * @throws IllegalArgumentException если имя столбца недопустимо
   */
  KsqlObject getKsqlObject(String columnName);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде KsqlArray.
   * Полезно для типа столбца {@code ARRAY}.
   *
   * @param columnIndex индекс столбца (отсчёт с 1)
   * @return значение столбца
   * @throws ClassCastException если значение столбца нельзя преобразовать в список
   * @throws IndexOutOfBoundsException если индекс недопустим
   */
  KsqlArray getKsqlArray(int columnIndex);

  /**
   * Возвращает значение определённого столбца {@code Row} в виде KsqlArray.
   * Полезно для типа столбца {@code ARRAY}.
   *
   * @param columnName имя столбца
   * @return значение столбца
   * @throws ClassCastException если значение столбца нельзя преобразовать в список
   * @throws IllegalArgumentException если имя столбца недопустимо
   */
  KsqlaArray getKsqlArray(String columnName);
}
``` **Сериализация в и из JSON**) — это то, что мы можем использовать в будущем.

Мы решили ввести новые типы вместо того, чтобы напрямую использовать `List<Object>` и `Map<String, Object>`, чтобы обеспечить большую гибкость для развития этих API в будущем.
(Мне не нравятся названия `KsqlArray` и `KsqlObject`, но я не смог придумать ничего лучше. Буду признателен за предложения!)

Методы, предоставляемые каждым типом, следующие:

/**

  • Представление массива значений. */ public KsqlArray {

/**

  • Создаёт пустой экземпляр. */ public KsqlArray() { delegate = new JsonArray(); }

/**

  • Создает экземпляр с указанными значениями.
  • @param list значения */ public KsqlArray(final List<?> list) { delegate = new JsonArray(list); }

/**

  • Возвращает размер (количество значений) массива.
  • @return размер */ public int size() { return delegate.size(); }

/**

  • Проверяет, пуст ли массив.
  • @return является ли массив пустым */ public boolean isEmpty() { return delegate.isEmpty(); }

/**

  • Возвращает значения массива в виде списка.
  • @return список значений */ public List<?> getList() { return delegate.getList(); }

/**

  • Возвращает итератор по значениям массива.
  • @return итератор */ public Iterator iterator() { return delegate.iterator(); }

/**

  • Возвращает поток значений массива.
  • @return поток */ public java.util.stream.Stream stream() { return delegate.stream(); }

/**

  • Возвращает значение по указанному индексу как объект.
  • @param pos индекс
  • @return значение */ public Object getValue(final int pos) { return delegate.getValue(pos); }

/**

  • Возвращает значение по указанному индексу в виде строки.
  • @param pos индекс
  • @return значение
  • @throws ClassCastException если значение не является строкой
  • @throws IndexOutOfBoundsException если индекс недействителен */ public String getString(final int pos) { return delegate.getString(pos); }

/**

  • Возвращает значение по указанному индексу в виде целого числа.
  • @param pos индекс
  • @return значение
  • @throws ClassCastException если значение не относится к типу {@code Number}
  • @throws IndexOutOfBoundsException если индекс недействителен */ public Integer getInteger(final int pos) { return delegate.getInteger(pos); }

/**

  • Возвращает значение по указанному индексу в виде длинного числа.
  • @param pos индекс
  • @return значение
  • @throws ClassCastException если значение не относится к типу {@code Number}
  • @throws IndexOutOfBoundsException если индекс недействителен */ public Long getLong(final int pos) { return delegate.getLong(pos); }

/**

  • Возвращает значение по указанному индексу в виде двойного числа.
  • @param pos индекс
  • @return значение
  • @throws ClassCastException если значение не относится к типу {@code Number}
  • @throws IndexOutOfBoundsException если индекс недействителен */ public Double getDouble(final int pos) { return delegate.getDouble(pos); }

/**

  • Возвращает значение по указанному индексу в виде логического значения.
  • @param pos индекс
  • @return значение
  • @throws ClassCastException если значение не является логическим
  • @throws IndexOutOfBoundsException если индекс недействителен */ public Boolean getBoolean(final int pos) { return delegate.getBoolean(pos); }

/**

  • Возвращает значение по указанному индексу в виде BigDecimal.
  • @param pos индекс
  • @return значение
  • @throws ClassCastException если значение не относится к типу {@code Number}
  • @throws IndexOutOfBoundsException если индекс недействителен */ public BigDecimal getDecimal(final int pos) { return new BigDecimal(getValue(pos).toString()); }

/**

  • Возвращает значение по указанному индексу в виде KsqlArray.

  • @param pos индекс

  • @return значение

  • @throws ClassCastException если значение нельзя преобразовать в список

  • @throws IndexOutOfBoundsException */ Текст 1:

  • @return the JSON string

    public String toJsonString() { return delegate.toString(); }

    /** Возвращает строку JSON.

    @return строка JSON

    @Override

    public String toString() { return toJsonString(); } }*

Текст 2:

/** Представление карты строковых ключей и значений. Полезно для представления объекта JSON.

public KsqlObject {

public KsqlObject() { delegate = new JsonObject(); }**

public KsqlObject(final Map<String, Object> map) { delegate = new JsonObject(map); }**

KsqlObject(final JsonObject jsonObject) { delegate = Objects.requireNonNull(jsonObject); }**

public boolean containsKey(final String key) { return delegate.containsKey(key); }**

public Set fieldNames() { return delegate.fieldNames(); }**

public int size() { return delegate.size(); }**

public boolean isEmpty() { return delegate.isEmpty(); }**

public Map<String, Object> getMap() { return delegate.getMap(); }**

public Iterator<Entry<String,Object>> iterator() { return delegate.iterator(); }**

public java.util.stream.Stream<Map.Entry<String,Object>> stream() { return delegate.stream(); }**

public Object getValue(final String key) { return delegate.getValue(key); }**

public String getString(final String key) { return delegate.getString(key); }**

public Integer getInteger(final String key) { return delegate.getInteger(key); }**

public Long getLong(final String key) { return delegate.getLong(key); }**

public Double getDouble(final String key) { return delegate.getDouble(key); }**

public Boolean getBoolean(final String key) { return delegate.getBoolean(key); }**

public void put(final String key, final Object value) { Objects.requireNonNull(key); if (value == null) { throw new NullPointerException("value"); }** delegate.put(key, value); }**

public void remove(final String key) { Objects.requireNonNull(key); delegate.remove(key); }**

public static class Builder { private final Map<String, Object> entries = new HashMap<>();

 ***public Builder put(final String key, final Object value) {***
 *entries.put(key, value);*
 *return this;*

}**

public Builder remove(final String key) { entries.remove(key); return this; }**

public KsqlObject build() { return new KsqlObject(entries); }}}*

В запросе представлен исходный код на языке Java, который описывает класс KsqlObject, представляющий собой карту строковых ключей и значений, полезных для представления объектов JSON. Класс содержит методы для работы с картой, такие как добавление, удаление и получение значений по ключу. Также в коде представлен конструктор класса и статический внутренний класс Builder, который позволяет удобно создавать экземпляры KsqlObject с заданными значениями. Как BigDecimal. Возвращает null, если:

  • ключ отсутствует;

  • @param key ключ;

  • @return значение;

  • @throws ClassCastException если значение не является числом.

    public BigDecimal getDecimal(final String key) { return new BigDecimal(getValue(key).toString()); }

Возвращает значение, связанное с указанным ключом как KsqlArray. Возвращает null, если ключ отсутствует.

  • @param key ключ;

  • @return значение;

  • @throws ClassCastException если значение нельзя преобразовать в список.

    public KsqlArray getKsqlArray(final String key) { return new KsqlArray(delegate.getJsonArray(key)); }

Возвращает значение, связанное с указанным ключом, как KsqlObject. Возвращает null, если ключ отсутствует.

  • @param key ключ;

    • @return значение;
  • @throws ClassCastException если значение невозможно преобразовать в карту.

    public KsqlObject getKsqlObject(final String key) { return new Ksqlobject(delegate.getJsonObject(key)); }

Удаляет значение, связанное с заданным ключом.

  • @param key ключ;

  • @return удалённое значение или null, если ключа не было.

    public Object remove(final String key) { return delegate.remove(key); }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final Integer value) { delegate.put(key, value); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final Long value) { delegate.put(key, value); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final String value) { delegate.put(key, value); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final Double value) { delegate.put(key, value); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final Boolean value) { delegate.put(key, value); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final BigDecimal value) { // Vert.x JsonObject не принимает значения BigDecimal. Вместо этого мы сохраняем значение в виде строки, чтобы не потерять точность. delegate.put(key, value.toString()); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final KsqlArray value) { delegate.put(key, KsqlArray.toJsonArray(value)); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final KsqlObject value) { delegate.put(key, KsqlObject.toJsonObject(value)); return this; }

Добавляет запись для указанного ключа и значения в карту.

  • @param key ключ;

  • @param value значение;

  • @return ссылка на этот объект.

    public KsqlObject put(final String key, final Object value) { delegate.put(key, value); return this; null значение в карту.

  • @param key ключ

  • @return ссылка на этот объект */ public KsqlObject putNull(final String key) { delegate.putNull(key); return this; }

/**

  • Добавляет записи из указанного {@code KsqlObject} в этот экземпляр.
  • @param other записи для добавления
  • @return ссылка на этот объект */ public KsqlObject mergeIn(final KsqlObject other) { delegate.mergeIn(toJsonObject(other)); return this; }

/**

  • Возвращает копию этого объекта.
  • @return копия */ public KsqlObject copy() { return new KsqlObject(delegate.copy()); }

/**

  • Возвращает строку JSON, представляющую записи на карте.
  • @return строка JSON */ public String toJsonString() { return delegate.toString(); }

/**

  • Возвращает строку JSON, представляющую записи на карте. То же самое, что и {@link #toJsonString()}.
  • @return строка JSON */ @Override public String toString() { return toJsonString(); } Наконец, ColumnType:
/**
* Тип столбца, возвращаемого как часть результата запроса.
*/
public interface ColumnType {

  enum Type { STRING, INTEGER, BIGINT, DOUBLE, BOOLEAN, DECIMAL, ARRAY, MAP, STRUCT }

  Type getType();

}

На данный момент ColumnType просто оборачивает тип, представленный в виде перечисления. В будущем мы можем добавить дополнительные методы, если захотим предоставить более подробную информацию о типе (например, десятичную шкалу и точность или внутренние типы для вложенных/сложных типов). (В настоящее время серверные API возвращают информацию о типах только в виде строк, что означает предоставление полностью определённого типа требует либо клиентского анализа, либо изменения на стороне сервера. Последнее предпочтительнее, но это довольно много дополнительной работы.)

Как указано в javadocs, запросы push, отправленные через этот метод, по умолчанию используют auto.offset.reset=earliestt (как следствие использования нового /query-stream endpoint), что является отклонением от старого REST API и ksqlDB CLI.

Для getDecimal(...) в интерфейсе Row вместо попытки проанализировать тип столбца для извлечения точности и масштаба мы просто преобразуем значение в BigDecimal без явного указания точности и масштаба. Мы также могли бы добавить опцию для пользователей, чтобы указать масштаб и точность в геттере, если мы считаем, что это будет полезно.

Интересно, что метод getKsqlObject() в интерфейсе Row используется для представления обоих типов ksqlDB: MAP и STRUCT. Я не уверен, какой может быть лучшая альтернатива, чтобы избежать этой путаницы.

Транзиентные запросы — Не потоковые

Интерфейс Client также предоставит следующие методы для получения результатов транзиентного запроса (push или pull) в одном пакете (не потоковом), как только запрос завершится:

  /**
   * Выполняет запрос (push или pull) и возвращает все строки результатов в одном пакете, как только запрос
   * завершится.
   *
   * <p>По умолчанию запросы push, выданные с помощью этого метода, возвращают результаты, начиная с начала
   * потока или таблицы. Чтобы переопределить это поведение, используйте метод
   * {@link #executeQuery(String, Map)} для передачи в свойстве запроса {@code auto.offset.reset}
   * со значением, установленным в {@code latest}.
   *
   * @param sql оператор запроса для выполнения
   * @return результат запроса
   */
  BatchedQueryResult executeQuery(String sql);

  /**
   * Выполняет запрос (push или pull) и возвращает все строки результатов в одном пакете, как только запрос
   * завершится.
   *
   * <p>По умолчанию запросы push, выданные с помощью этого метода, возвращают результаты, начиная с начала
   * потока или таблицы. Чтобы переопределить это поведение, передайте в свойстве запроса
   * {@code auto.offset.reset} со значением, установленным в {@code latest}.
   *
   * @param sql оператор запроса для выполнения
   * @param properties свойства запроса
   * @return результат запроса
   */
  BatchedQueryResult executeQuery(String sql, Map<String, Object> properties);

где

/**
 * Результат запроса (push или pull), возвращаемый в виде одного пакета после завершения выполнения запроса или его завершения. Для **Непрерывающие push-запросы**

* {@link StreamedQueryResult} следует использовать вместо этого.

*
Если с сервера получен ответ, не равный 200, это будущее завершится исключением.

*
Максимальное количество строк {@code Row}, которые могут быть возвращены из {@code BatchedQueryResult}, по умолчанию равно {@link ClientOptions#DEFAULT_EXECUTE_QUERY_MAX_RESULT_ROWS} и может быть настроено через {@link ClientOptions#setExecuteQueryMaxResultRows(int)}.

public abstract class BatchedQueryResult extends CompletableFuture<List> { /**

  • Возвращает {@code CompletableFuture}, содержащий идентификатор базового запроса, если запрос является push-запросом, иначе null. Будущее завершается после получения ответа от сервера.
  • @return будущее, содержащее идентификатор запроса (или null в случае pull-запросов) */ public abstract CompletableFuture queryID(); }
Завершение запроса может означать:
* Запрос является pull-запросом;
* Запрос — это push-запрос с предложением limit, и предел достигнут;
* Запрос — это push-запрос, который был завершён.

Подобно методам `streamQuery()`, описанным выше, push-запросы, отправленные через этот метод, по умолчанию используют `auto.offset.reset=earliestt` (вследствие использования новой конечной точки `/query-stream`), что отличается от старого REST API и ksqlDB CLI.

### Вставка значений

Метод для вставки одной строки за раз:

/**

  • Вставляет строку в поток ksqlDB.
  • Будущее {@code CompletableFuture} будет неудачным, если с сервера будет получен ответ, отличный от 200, или если сервер столкнётся с ошибкой при обработке вставки.

  • @param streamName имя целевого потока
  • @param row строка для вставки. Ключи — имена столбцов, а значения — значения столбцов.
  • @return будущее, которое завершается после получения ответа сервера */ CompletableFuture insertInto(String streamName, KsqlObject row);

Метод потоковой вставки (через конечную точку `/inserts-stream`):

CompletableFuture<Publisher> streamInserts(String streamName, Publisher insertsPublisher); }

где `InsertResponse` выглядит следующим образом:

public interface InsertResponse {

/**

  • Была ли строка успешно вставлена или нет. */ boolean isSuccessful();

/**

  • Уникальный порядковый номер строки в потоке вставок. */ int getSequenceNum();

/**

  • Если неудачно, сообщение об ошибке. */ String getErrorMessage();

/**

  • Если неудачно, код ошибки. */ int getErrorCode();

}

Обратите внимание, что эти методы позволяют вставлять строки только в потоки ksqlDB, но не в таблицы.
Мы сознательно решили не предоставлять метод для пакетной вставки строк в не потоковом режиме, поскольку отсутствие гарантий транзакций означает, что некоторые строки могут быть вставлены, а другие — нет (например, если на полпути возникает ошибка).

### DDL-операции

#### `CREATE <STREAM/TABLE>`, `CREATE <STREAM/TABLE> ... AS SELECT`, `INSERT INTO`

Методы клиента:

/**

  • Выполнить оператор DDL: CREATE STREAM, CREATE TABLE, CREATE STREAM ... AS SELECT, CREATE TABLE ... AS SELECT, INSERT INTO */ CompletableFuture executeStatement(String sql);

/**

  • Выполнить оператор DDL: CREATE STREAM, CREATE TABLE, CREATE STREAM ... AS SELECT, CREATE TABLE ... AS SELECT, INSERT INTO */ CompletableFuture executeStatement(String sql, Map<String, Object> properties);

/**

  • Выполнить оператор DDL: CREATE STREAM, CREATE TABLE, CREATE STREAM ... AS SELECT, CREATE TABLE ... AS SELECT, INSERT INTO */ CompletableFuture executeStatement(String sql, Map<String, Object> properties, long commandSequenceNumber);
с

public interface ExecuteStatementResponse {

long getCommandSequenceNumber();

}

Номер последовательности команд раскрывается, так как это механизм, используемый ksqlDB для обеспечения того, чтобы сервер, получающий новый запрос, выполнил более ранние операторы, от которых зависит новый запрос. В будущем мы... Можно рассмотреть введение настройки «pipeline запросов» на клиенте, аналогичной той, что используется в CLI ksqlDB, которая автоматически отслеживает и передаёт последний номер последовательности команд в последующие команды (вместо того, чтобы пользователь делал это самостоятельно). Это будет немного сложнее, чем для CLI ksqlDB, поскольку клиент может использоваться несколькими потоками.

Интересно, что новые серверные API (используемые `streamQuery()` и `executeQuery()`) не поддерживают опцию номера последовательности команд, поэтому мы должны либо добавить её для согласованности (и затем добавить её также в клиент), либо отказаться от поддержки номера последовательности команд для старых серверных API (включая конечную точку `/ksql`, используемую командами DDL и admin).

Считаем ли мы целесообразным вообще избегать отображения номера последовательности команд на клиенте для обеспечения согласованности?

В более ранней версии этого KLIP предлагалось иметь отдельные методы для `executeDdl()` и `executeDml()`, а не один метод `executeStatement()`, поскольку название `executeStatement()` потенциально вводит в заблуждение своей широтой. Однако термины DDL и DML не используются в наших документах, и была некоторая путаница относительно того, какие утверждения относятся к какой категории, так что наличие одного метода `executeStatement()` кажется наиболее простым решением.

Другие рассмотренные альтернативы включают:

CompletableFuture createStream(String sql);

CompletableFuture createStream(String sql, Map<String, Object> properties);

CompletableFuture createStream(String sql, Map<String, Object> properties, long commandSequenceNumber);

CompletableFuture createTable(String sql);

CompletableFuture createTable(String sql, Map<String, Object> properties);

CompletableFuture createTable(String sql, Map<String, Object> properties, long commandSequenceNumber);

CompletableFuture insertIntoSource(String sql);

CompletableFuture insertIntoSource(String sql, Map<String, Object> properties);

CompletableFuture insertIntoSource(String sql, Map<String, Object> properties, long commandSequenceNumber);


В этой версии реализации `createStream(...)` и `createTable(...)` будут идентичными и могут быть заменены одним методом `createSource(...)`. Однако название этого метода вызывает путаницу. `CreateSourceResponse` и `InsertIntoResponse` идентичны `ExecuteStatementResponse`, описанному выше.

Среди других альтернатив — разделение `CREATE <STREAM/TABLE>` и `CREATE <STREAM/TABLE> ... AS SELECT` на отдельные методы, но это кажется излишне сложным.

#### `DROP <STREAM/TABLE>`

Методы клиента:

/**

  • Drop stream. The underlying Kafka topic will not be deleted. */ CompletableFuture dropStream(String streamName);

/**

  • Drop stream. The underlying Kafka topic may optionally be deleted. */ CompletableFuture dropStream(String streamName, boolean deleteTopic);

/**

  • Drop table. The underlying Kafka topic will not be deleted. */ CompletableFuture dropTable(String tableName);

/**

  • Drop table. The underlying Kafka topic may optionally be deleted. */ CompletableFuture dropTable(String tableName, boolean deleteTopic);
где `DropSourceResponse` фактически совпадает с `ExecuteStatementResponse`/`CreateSourceResponse`/`InsertIntoResponse`, описанными выше.

Опять же, реализации `dropStream(...)` и `dropTable(...)` были бы одинаковыми, поэтому вместо этого мы могли бы иметь один метод `dropSource(...)`, но название могло бы вызвать путаницу.

Если мы решим оставить их отдельными, остаётся открытым вопрос о том, должен ли клиент проверять, что `dropStream(...)` не используется для удаления таблицы, и наоборот. На мой взгляд, такая проверка добавит сложности без особой пользы, хотя этот момент неоднозначности заставляет меня предпочесть единый метод `dropSource(...)` если мы сможем договориться о названии метода, которое не вызовет путаницы.

Обратите внимание, что пользователи могут... Также выполняйте запросы `DROP <STREAM/TABLE>` через `executeStatement()`, чтобы мы могли полностью избавиться от этих дополнительных методов.

Решение о том, делать это или нет, зависит от того, видим ли мы ценность в предоставлении метода, который пользователь может вызвать с именем потока/таблицы напрямую, вместо передачи полной строки SQL для команды.

#### `TERMINATE <queryId>`

CompletableFuture terminatePersistentQuery(String queryId);

где `TerminateQueryResponse` такой же, как `ExecuteStatementResponse`/`CreateSourceResponse`/`InsertIntoResponse`/`DropSourceResponse`, описанные выше.

Метод `terminatePersistenQuery(...)` предназначен для отличия от `terminatePushQuery(...)` ниже.

Как и выше, пользователи также могут завершать постоянные запросы через `executeStatement()`, поэтому мы можем удалить `terminatePersistentQuery()`, если не видим ценности в предоставлении удобного метода, требующего только идентификатор запроса, а не полную строку SQL для команды.

### Операции администратора

#### `SHOW TOPICS`

CompletableFuture<List> listTopics();

с

public interface TopicInfo { String getName(); int getPartitions(); List getReplicasPerPartition(); }


#### `SHOW <STREAMS/TABLES>`

CompletableFuture<List> listStreams(); CompletableFuture<List> listTables();

с

public interface StreamInfo { String getName(); String getTopic(); String getFormat(); }

и

public interface TableInfo { String getName(); String getTopic(); String getFormat(); boolean isWindowed(); }


Рассматривая вопрос о том, имеет ли больше смысла для `StreamInfo#getFormat()` и `TableInfo#getFormat()` возвращать строку или значение перечисления, последнее облегчило бы пользователю знание возможных значений, но нам пришлось бы поддерживать список в актуальном состоянии, а также пожертвовать прямой совместимостью. Алмог также отмечает, что возврат формата в виде строки позволяет в будущем поддерживать пользовательские форматы, что является огромным плюсом, поэтому пока мы будем использовать строки.

#### `SHOW QUERIES`

CompletableFuture<List> listQueries();

с

public interface QueryInfo { boolean isPersistentQuery(); boolean isPushQuery();

/**

  • Идентификатор запроса, используемый для операций управления, таких как завершение запроса */ String getId(); String getSql();

/**

  • Имя приёмника для постоянного запроса. Иначе пусто. */ Optional getSink();

/**

  • Тема приёмника для постоянного запроса. Иначе пуста. */ Optional getSinkTopic(); }

### Завершение push-запроса

/**

  • Завершает push-запрос с указанным идентификатором запроса.
  • Если от сервера получен ответ, отличный от 200, будущее будет неудачным.

  • @param queryId Идентификатор завершаемого запроса
  • @return Будущее, которое завершается после получения ответа сервера */ CompletableFuture terminatePushQuery(String queryId);

### Разное

Интерфейс `Client` также будет иметь следующее:

/**

  • Закрывает базовый HTTP-клиент. */ void close();
## Дизайн

Реализация клиента выходит за рамки данного документа. Этот KLIP касается только интерфейсов.

## План тестирования

N/A. Данный KLIP касается только интерфейсов.

## LOE и этапы поставки

В выпуске 0.10.0 будут доступны следующие клиентские методы:
* Push и pull-запросы;
* Завершение push-запросов;
* Вставка значений — не потоковая (т. е. только `insertInto()`).

Следующие методы предназначены для версии 0.11.0:
* Вставка значений — потоковая (т.е. `streamInserts()`);
* DDL-операторы;
* Операции администрирования.

## Обновление документации

HTML-документы API, созданные на основе javadocs на клиентских интерфейсах, будут размещены на микросайте ksqlDB. У нас также будет отдельная страница документации с примером использования каждого из методов. (См. https://github.com/confluentinc/ksql/pull/5434 для черновика в разработке.)

## Последствия совместимости

N/A

## Последствия для безопасности

N/A

## Отклоненные альтернативы

### Соединители (вне рамок) **Область действия**

В более ранней версии этого KLIP предлагалось реализовать поддержку `CREATE CONNECTOR`, `DROP CONNECTOR` и `SHOW CONNECTORS` в рамках первой версии Java-клиента, рассматриваемой в этом KLIP. Однако мы решили отложить поддержку коннекторов в этой первой версии клиента, поскольку более предпочтительным долгосрочным решением было бы для Connect представить Java-клиент, вместо того чтобы пользователи отправляли команды через клиент ksqlDB для пересылки в Connect. Тем временем пользователи могут при желании использовать REST API для управления коннекторами через ksqlDB.

Ниже я записал предложенные интерфейсы для управления коннекторами для справки, если мы захотим вернуться к этому вопросу в будущем.

#### `CREATE CONNECTOR`, `DROP CONNECTOR`

CompletableFuture createSourceConnector(String name, Map<String, String> properties);

CompletableFuture createSinkConnector(String name, Map<String, String> properties);

CompletableFuture dropConnector(String name);

где `ConnectorInfo` — это модуль Apache Kafka ([ссылка](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java)). Возможно, мы захотим обернуть этот тип, чтобы избежать зависимости от Apache Kafka в клиенте ksqlDB.

#### `SHOW CONNECTORS`

CompletableFuture listConnectors();

с

public interface ConnectorList {

List getConnectors();

/**

  • Любые предупреждения, возвращаемые сервером в результате перечисления коннекторов. */ List getWarnings();

}

и

public interface ConnectorInfo {

enum ConnectorType { SOURCE, SINK, UNKNOWN; }

String getName();

ConnectorType getType();

String getClassName();

String getState();

}


Не очень хорошо, что введение `ConnectorList`, обёрнутого вокруг `List<ConnectorInfo>`, нарушает шаблон, установленный `SHOW TOPICS`/`SHOW <STREAMS/TABLES>`/`SHOW QUERIES`, но кажется важным передавать любые предупреждения сервера пользователю, так что компромисс стоит того, IMO.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-KSQL.git
git@api.gitlife.ru:oschina-mirror/mirrors-KSQL.git
oschina-mirror
mirrors-KSQL
mirrors-KSQL
master