1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/knative-docs

Клонировать/Скачать
README.md 12 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 27.05.2025 06:04 d330922

Руководство по сборке и развертыванию KafkaSource источника Eventing с использованием службы Knative Serving.

Предварительные требования

Тема Apache Kafka (необязательно)

  1. Если вы используете Strimzi, вы можете установить тему, модифицируя source/kafka-topic.yaml с вашими предпочтениями:
  • Тема

  • Имя кластера

  • Разделы

  • Реплики

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
  1. Разверните KafkaTopic

    $ kubectl apply -f strimzi-topic.yaml
    kafkatopic.kafka.strimzi.io/knative-demo-topic created
  2. Убедитесь, что KafkaTopic работает.

    $ kubectl -n kafka get kafkatopics.kafka.strimzi.io
    NAME                 AGE
    knative-demo-topic   16s

Создание службы Event Display

  1. Скачайте копию кода:

    git clone -b "{{< branch >}}" https://github.com/knative/docs knative-docs
    cd knative-docs/docs/eventing/samples/kafka/source
  2. Соберите службу Event Display (event-display.yaml)

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # Это соответствует
              # https://github.com/knative/eventing-contrib/tree/master/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
    ```1. Разверните службу Event Display
    

    $ kubectl apply --filename event-display.yaml ... service.serving.knative.dev/event-display created

  3. Убедитесь, что паддинг службы работает. Имя паддинга будет иметь префикс event-display.

    $ kubectl get pods
    NAME                                            READY     STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2       Running   0          72s
    ...

Источник событий Apache Kafka

  1. Измените source/event-source.yaml в соответствии с серверами-запускателями, темами и т.д.:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # обратите внимание на пространство имен kafka
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  2. Разверните источник событий.

    $ kubectl apply -f event-source.yaml
    ...
    kafkasource.sources.knative.dev/kafka-source created
  3. Проверьте, что под источника событий запущен. Имя пода будет иметь префикс kafka-source.

    $ kubectl get pods
    NAME                                  READY     STATUS    RESTARTS   AGE
    kafka-source-xlnhq-5544766765-dnl5s   1/1       Running   0          40m
  4. Убедитесь, что источник событий Apache Kafka запущен с необходимой конфигурацией.

    $ kubectl logs --selector='knative-eventing-source-name=kafka-source'
    {"level":"info","ts":"2020-05-28T10:39:42.104Z","caller":"adapter/adapter.go:81","msg":"Starting with config: ","Topics":".","ConsumerGroup":"...","SinkURI":"...","Name":".","Namespace":"."}
    ```### Проверка1. Отправьте сообщение (`{"msg": "Это тест!"}`) в топик Apache Kafka, как показано ниже:

    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic Если командная строка не отображается, попробуйте нажать Enter.

    {"msg": "Это тест!"}

    
    
  5. Проверьте, что Apache Kafka Event Source приняло сообщение и правильно передало его в синк. Поскольку эти логи собираются на уровне отладки, отредактируйте ключ level конфигмапа config-logging в пространстве имен knative-sources, чтобы он выглядел так:

    data:
      loglevel.controller: info
      loglevel.webhook: info
      zap-logger-config: |
        {
          "level": "debug",
          "development": false,
          "outputPaths": ["stdout"],
          "errorOutputPaths": ["stderr"],
          "encoding": "json",
          "encoderConfig": {
            "timeKey": "ts",
            "levelKey": "level",
            "nameKey": "logger",
            "callerKey": "caller",
            "messageKey": "msg",
            "stacktraceKey": "stacktrace",
            "lineEnding": "",
            "levelEncoder": "",
            "timeEncoder": "iso8601",
            "durationEncoder": "",
            "callerEncoder": ""
          }
        }
    ```   ```
    Теперь вручную удалите развертывание kafkasource и дайте развертыванию `kafka-controller-manager`, которое работает в пространстве имен `knative-sources`, возможность перезапустить его. Теперь должны быть видны журналы уровня отладки.
    ```   ```
    $ kubectl logs --selector='knative-eventing-source-name=kafka-source'
    ...
    
    {"level":"debug","ts":"2020-05-28T10:40:29.400Z","caller":"kafka/consumer_handler.go:77","msg":"Сообщение взято","topic":".","value":"."}
    {"level":"debug","ts":"2020-05-28T10:40:31.722Z","caller":"kafka/consumer_handler.go:89","msg":"Сообщение помечено","topic":".","value":"."}
  6. Убедитесь, что Event Display получил сообщение, отправленное Event Source.

    $ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    
    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
        {
          "msg": "This is a test!"
        }

Шаги разборки

  1. Удалите Apache Kafka Event Source

    $ kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
    "kafka-source" deleted
  2. Удалите Event Display

    $ kubectl delete -f source/event-display.yaml service.serving.knative.dev
    "event-display" deleted
  3. Удалите Apache Kafka Event Controller

    $ kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
    serviceaccount "kafka-controller-manager" deleted
    clusterrole.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
    clusterrolebinding.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
    customresourcedefinition.apiextensions.k8s.io "kafkasources.sources.knative.dev" deleted
    service "kafka-controller" deleted
    statefulset.apps "kafka-controller-manager" deleted
  4. (Необязательно) Удалите Apache Kafka Topic

    $ kubectl delete -f kafka-topic.yaml
    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted

(Необязательно) Укажите десериализатор ключаКогда KafkaSource получает сообщение из Kafka, он сохраняет ключ в расширении события, называемом Key, а заголовки сообщений Kafka сохраняет в расширениях, начинающихся с kafkaheader.

Вы можете указать десериализатор ключа среди четырёх типов:

  • string (по умолчанию) для строк, закодированных в UTF-8
  • int для 32-битных и 64-битных знаковых целых чисел
  • float для 32-битных и 64-битных чисел с плавающей запятой
  • byte-array для закодированного в Base64 массива байтов

Чтобы указать его, добавьте метку kafkasources.sources.knative.dev/key-type к определению KafkaSource, например:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
 name: kafka-source
 labels:
   kafkasources.sources.knative.dev/key-type: int
spec:
 consumerGroup: knative-group
 bootstrapServers:
 - my-cluster-kafka-bootstrap.kafka:9092 # обратите внимание на пространство имен kafka
 topics:
 - knative-demo-topic
 sink:
   ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: event-display

Подключение к брокеру Kafka, поддерживающему TLS

KafkaSource поддерживает методы аутентификации TLS и SASL. Для включения аутентификации TLS вам потребуются следующие файлы:

  • Сертификат корневого сертификата (CA)
  • Сертификат клиента и ключ

KafkaSource ожидает, что эти файлы будут в формате pem, если они в другом формате, например jks, пожалуйста, преобразуйте в pem.

  1. Создайте файлы сертификатов как секреты в пространстве имен, где будет установлен KafkaSource

    $ kubectl create secret generic cacert --from-file=caroot.pem
    secret/cacert created
    ```   $ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    secret/kafka-secret created
  2. Примените KafkaSource, измените bootstrapServers и topics соответственно.

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
     name: kafka-source-with-tls
    spec:
     net:
       tls:
         enable: true
         cert:
           secretKeyRef:
             key: tls.crt
             name: kafka-secret
         key:
           secretKeyRef:
             key: tls.key
             name: kafka-secret
         caCert:
           secretKeyRef:
             key: caroot.pem
             name: cacert
    consumerGroup: knative-group
    bootstrapServers:
    - my-secure-kafka-bootstrap.kafka:443
    topics:
    - knative-demo-topic
    sink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/knative-docs.git
git@api.gitlife.ru:oschina-mirror/knative-docs.git
oschina-mirror
knative-docs
knative-docs
master