Руководство по сборке и развертыванию KafkaSource
источника Eventing с использованием службы Knative Serving.
source/kafka-topic.yaml
с вашими предпочтениями:Тема
Имя кластера
Разделы
Реплики
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: knative-demo-topic
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 3
replicas: 1
config:
retention.ms: 7200000
segment.bytes: 1073741824
Разверните KafkaTopic
$ kubectl apply -f strimzi-topic.yaml
kafkatopic.kafka.strimzi.io/knative-demo-topic created
Убедитесь, что KafkaTopic
работает.
$ kubectl -n kafka get kafkatopics.kafka.strimzi.io
NAME AGE
knative-demo-topic 16s
Скачайте копию кода:
git clone -b "{{< branch >}}" https://github.com/knative/docs knative-docs
cd knative-docs/docs/eventing/samples/kafka/source
Соберите службу Event Display (event-display.yaml
)
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
namespace: default
spec:
template:
spec:
containers:
- # Это соответствует
# https://github.com/knative/eventing-contrib/tree/master/cmd/event_display/main.go
image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
```1. Разверните службу Event Display
$ kubectl apply --filename event-display.yaml ... service.serving.knative.dev/event-display created
Убедитесь, что паддинг службы работает. Имя паддинга будет иметь префикс event-display
.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
event-display-00001-deployment-5d5df6c7-gv2j4 2/2 Running 0 72s
...
Измените source/event-source.yaml
в соответствии с серверами-запускателями, темами и т.д.:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # обратите внимание на пространство имен kafka
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
Разверните источник событий.
$ kubectl apply -f event-source.yaml
...
kafkasource.sources.knative.dev/kafka-source created
Проверьте, что под источника событий запущен. Имя пода будет иметь префикс kafka-source
.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-source-xlnhq-5544766765-dnl5s 1/1 Running 0 40m
Убедитесь, что источник событий Apache Kafka запущен с необходимой конфигурацией.
$ kubectl logs --selector='knative-eventing-source-name=kafka-source'
{"level":"info","ts":"2020-05-28T10:39:42.104Z","caller":"adapter/adapter.go:81","msg":"Starting with config: ","Topics":".","ConsumerGroup":"...","SinkURI":"...","Name":".","Namespace":"."}
```### Проверка1. Отправьте сообщение (`{"msg": "Это тест!"}`) в топик Apache Kafka, как показано ниже:
kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic Если командная строка не отображается, попробуйте нажать Enter.
{"msg": "Это тест!"}
Проверьте, что Apache Kafka Event Source приняло сообщение и правильно передало его в синк. Поскольку эти логи собираются на уровне отладки, отредактируйте ключ level
конфигмапа config-logging
в пространстве имен knative-sources
, чтобы он выглядел так:
data:
loglevel.controller: info
loglevel.webhook: info
zap-logger-config: |
{
"level": "debug",
"development": false,
"outputPaths": ["stdout"],
"errorOutputPaths": ["stderr"],
"encoding": "json",
"encoderConfig": {
"timeKey": "ts",
"levelKey": "level",
"nameKey": "logger",
"callerKey": "caller",
"messageKey": "msg",
"stacktraceKey": "stacktrace",
"lineEnding": "",
"levelEncoder": "",
"timeEncoder": "iso8601",
"durationEncoder": "",
"callerEncoder": ""
}
}
``` ```
Теперь вручную удалите развертывание kafkasource и дайте развертыванию `kafka-controller-manager`, которое работает в пространстве имен `knative-sources`, возможность перезапустить его. Теперь должны быть видны журналы уровня отладки.
``` ```
$ kubectl logs --selector='knative-eventing-source-name=kafka-source'
...
{"level":"debug","ts":"2020-05-28T10:40:29.400Z","caller":"kafka/consumer_handler.go:77","msg":"Сообщение взято","topic":".","value":"."}
{"level":"debug","ts":"2020-05-28T10:40:31.722Z","caller":"kafka/consumer_handler.go:89","msg":"Сообщение помечено","topic":".","value":"."}
Убедитесь, что Event Display получил сообщение, отправленное Event Source.
$ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
subject: partition:0#564
id: partition:0/offset:564
time: 2020-02-10T18:10:23.861866615Z
datacontenttype: application/json
Extensions,
key:
Data,
{
"msg": "This is a test!"
}
Удалите Apache Kafka Event Source
$ kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
"kafka-source" deleted
Удалите Event Display
$ kubectl delete -f source/event-display.yaml service.serving.knative.dev
"event-display" deleted
Удалите Apache Kafka Event Controller
$ kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
serviceaccount "kafka-controller-manager" deleted
clusterrole.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
clusterrolebinding.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
customresourcedefinition.apiextensions.k8s.io "kafkasources.sources.knative.dev" deleted
service "kafka-controller" deleted
statefulset.apps "kafka-controller-manager" deleted
(Необязательно) Удалите Apache Kafka Topic
$ kubectl delete -f kafka-topic.yaml
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
KafkaSource
получает сообщение из Kafka, он сохраняет ключ в расширении события, называемом Key
, а заголовки сообщений Kafka сохраняет в расширениях, начинающихся с kafkaheader
.Вы можете указать десериализатор ключа среди четырёх типов:
string
(по умолчанию) для строк, закодированных в UTF-8int
для 32-битных и 64-битных знаковых целых чиселfloat
для 32-битных и 64-битных чисел с плавающей запятойbyte-array
для закодированного в Base64 массива байтовЧтобы указать его, добавьте метку kafkasources.sources.knative.dev/key-type
к определению KafkaSource
, например:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # обратите внимание на пространство имен kafka
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
KafkaSource
поддерживает методы аутентификации TLS и SASL. Для включения аутентификации TLS вам потребуются следующие файлы:
KafkaSource
ожидает, что эти файлы будут в формате pem, если они в другом формате, например jks, пожалуйста, преобразуйте в pem.
Создайте файлы сертификатов как секреты в пространстве имен, где будет установлен KafkaSource
$ kubectl create secret generic cacert --from-file=caroot.pem
secret/cacert created
``` $ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
secret/kafka-secret created
Примените KafkaSource
, измените bootstrapServers
и topics
соответственно.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source-with-tls
spec:
net:
tls:
enable: true
cert:
secretKeyRef:
key: tls.crt
name: kafka-secret
key:
secretKeyRef:
key: tls.key
name: kafka-secret
caCert:
secretKeyRef:
key: caroot.pem
name: cacert
consumerGroup: knative-group
bootstrapServers:
- my-secure-kafka-bootstrap.kafka:443
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )