Tutorial on how to build and deploy a `KafkaSource` eventing source using a Knative Serving Service.
Modify `source/kafka-topic.yaml` with your desired:

- Topic
- Cluster Name
- Partitions
- Replicas
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: knative-demo-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
```
Deploy the `KafkaTopic`:

```bash
$ kubectl apply -f strimzi-topic.yaml
kafkatopic.kafka.strimzi.io/knative-demo-topic created
```
Ensure the `KafkaTopic` was created:

```bash
$ kubectl -n kafka get kafkatopics.kafka.strimzi.io
NAME                 AGE
knative-demo-topic   16s
```
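Optionally, you can inspect the topic object to confirm that the partition and replica settings were applied; a quick check, assuming the Strimzi CRD's `kafkatopic` singular name resolves on your cluster:

```bash
kubectl -n kafka describe kafkatopic knative-demo-topic
```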
Download a copy of the code:
```bash
git clone -b "{{< branch >}}" https://github.com/knative/docs knative-docs
cd knative-docs/docs/eventing/samples/kafka/source
```
Build the Event Display Service (`event-display.yaml`):
```yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: default
spec:
  template:
    spec:
      containers:
        - # This corresponds to
          # https://github.com/knative/eventing-contrib/tree/master/cmd/event_display/main.go
          image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
```
Deploy the Event Display Service:

```bash
$ kubectl apply --filename event-display.yaml
...
service.serving.knative.dev/event-display created
```
Ensure that the Service pod is running. The pod name will be prefixed with `event-display`:

```bash
$ kubectl get pods
NAME                                            READY   STATUS    RESTARTS   AGE
event-display-00001-deployment-5d5df6c7-gv2j4   2/2     Running   0          72s
...
```
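Optionally, you can also check that the Knative Service itself reports `Ready` (`ksvc` is the short name for Knative Services):

```bash
kubectl get ksvc event-display
```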
Modify `source/event-source.yaml` accordingly with your bootstrap servers, topics, and so on:
```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```
Deploy the event source:

```bash
$ kubectl apply -f event-source.yaml
...
kafkasource.sources.knative.dev/kafka-source created
```
Check that the event source pod is running. The pod name will be prefixed with `kafka-source`:

```bash
$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS   AGE
kafka-source-xlnhq-5544766765-dnl5s   1/1     Running   0          40m
```
Ensure the Apache Kafka Event Source started with the necessary configuration:

```bash
$ kubectl logs --selector='knative-eventing-source-name=kafka-source'
{"level":"info","ts":"2020-05-28T10:39:42.104Z","caller":"adapter/adapter.go:81","msg":"Starting with config: ","Topics":".","ConsumerGroup":"...","SinkURI":"...","Name":".","Namespace":"."}
```
Produce a message (`{"msg": "This is a test!"}`) to the Apache Kafka topic, as shown below:

```bash
kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
If you don't see a command prompt, try pressing enter.
>{"msg": "This is a test!"}
```
Check that the Apache Kafka Event Source consumed the message and sent it to its sink properly. Because these logs are emitted at debug level, edit the `config-logging` ConfigMap in the `knative-sources` namespace so that the `level` key of `zap-logger-config` is set to `debug`, as shown here:
```yaml
data:
  loglevel.controller: info
  loglevel.webhook: info
  zap-logger-config: |
    {
      "level": "debug",
      "development": false,
      "outputPaths": ["stdout"],
      "errorOutputPaths": ["stderr"],
      "encoding": "json",
      "encoderConfig": {
        "timeKey": "ts",
        "levelKey": "level",
        "nameKey": "logger",
        "callerKey": "caller",
        "messageKey": "msg",
        "stacktraceKey": "stacktrace",
        "lineEnding": "",
        "levelEncoder": "",
        "timeEncoder": "iso8601",
        "durationEncoder": "",
        "callerEncoder": ""
      }
    }
```
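One way to make that change is to edit the ConfigMap in place, for example:

```bash
kubectl -n knative-sources edit configmap config-logging
```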
Now manually delete the `kafkasource` deployment and allow the `kafka-controller-manager` deployment running in the `knative-sources` namespace to recreate it. Debug-level logs should now be visible.
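As a sketch, using the deployment name implied by the pod listing above (your generated name will differ):

```bash
# List deployments in the namespace where the source runs (default here)
kubectl get deployments
# Delete the receive adapter deployment; the controller recreates it
kubectl delete deployment kafka-source-xlnhq
```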
```bash
$ kubectl logs --selector='knative-eventing-source-name=kafka-source'
...
{"level":"debug","ts":"2020-05-28T10:40:29.400Z","caller":"kafka/consumer_handler.go:77","msg":"Message claimed","topic":".","value":"."}
{"level":"debug","ts":"2020-05-28T10:40:31.722Z","caller":"kafka/consumer_handler.go:89","msg":"Message marked","topic":".","value":"."}
```
Ensure the Event Display received the message sent to it by the Event Source.
```bash
$ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container

☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
  subject: partition:0#564
  id: partition:0/offset:564
  time: 2020-02-10T18:10:23.861866615Z
  datacontenttype: application/json
Extensions,
  key:
Data,
  {
    "msg": "This is a test!"
  }
```
Remove the Apache Kafka Event Source:

```bash
$ kubectl delete -f source/source.yaml
kafkasource.sources.knative.dev "kafka-source" deleted

$ kubectl delete -f source/event-display.yaml
service.serving.knative.dev "event-display" deleted

$ kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
serviceaccount "kafka-controller-manager" deleted
clusterrole.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
clusterrolebinding.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
customresourcedefinition.apiextensions.k8s.io "kafkasources.sources.knative.dev" deleted
service "kafka-controller" deleted
statefulset.apps "kafka-controller-manager" deleted
```
(Optional) Remove the Apache Kafka Topic:

```bash
$ kubectl delete -f kafka-topic.yaml
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
```
When `KafkaSource` receives a message from Kafka, it dumps the key into the event extension called `Key` and dumps the Kafka message headers into extensions starting with `kafkaheader`.
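For example, to see the key extension populated, you can produce a keyed message; a minimal sketch reusing the same Strimzi producer image as above (the console producer splits each line on the first separator character, and `my-key` is just an illustrative key):

```bash
kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- \
  bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic \
  --property parse.key=true --property key.separator=:
>my-key:{"msg": "This is a keyed test!"}
```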
You can specify the key deserializer among four types:

- `string` (default) for UTF-8 encoded strings
- `int` for 32-bit & 64-bit signed integers
- `float` for 32-bit & 64-bit floating points
- `byte-array` for a Base64 encoded byte array

To specify it, add the label `kafkasources.sources.knative.dev/key-type` to the `KafkaSource` definition, for example:
```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
  labels:
    kafkasources.sources.knative.dev/key-type: int
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```
The `KafkaSource` supports TLS and SASL authentication methods. To enable TLS authentication, you need the following files:

- CA certificate
- Client certificate and key

`KafkaSource` expects these files to be in PEM format. If they are in another format, such as JKS, convert them to PEM first.
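For instance, if your client credentials live in a Java keystore, one way to convert them is via an intermediate PKCS#12 file; a sketch assuming hypothetical file names `keystore.jks` and `truststore.jks`:

```bash
# Export the client certificate and key to PKCS#12, then to PEM
keytool -importkeystore -srckeystore keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out certificate.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out key.pem

# Export the CA certificate from the truststore to PEM
keytool -exportcert -keystore truststore.jks -alias <ca-alias> -rfc -file caroot.pem
```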
Create the certificate files as secrets in the namespace where the `KafkaSource` is going to be set up:

```bash
$ kubectl create secret generic cacert --from-file=caroot.pem
secret/cacert created

$ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
secret/kafka-secret created
```
Apply the `KafkaSource`, changing `bootstrapServers` and `topics` accordingly:
```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-with-tls
spec:
  net:
    tls:
      enable: true
      cert:
        secretKeyRef:
          key: tls.crt
          name: kafka-secret
      key:
        secretKeyRef:
          key: tls.key
          name: kafka-secret
      caCert:
        secretKeyRef:
          key: caroot.pem
          name: cacert
  consumerGroup: knative-group
  bootstrapServers:
    - my-secure-kafka-bootstrap.kafka:443
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
```
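The sample above only covers TLS. For SASL, the `KafkaSource` spec exposes a `net.sasl` block alongside `net.tls`; the fragment below is only a sketch under that assumption (the secret name `kafka-sasl-secret` and its `user`/`password` keys are hypothetical placeholders, so check the CRD schema of your installed `KafkaSource` version):

```yaml
spec:
  net:
    sasl:
      enable: true
      user:
        secretKeyRef:
          name: kafka-sasl-secret # hypothetical secret holding SASL credentials
          key: user
      password:
        secretKeyRef:
          name: kafka-sasl-secret
          key: password
```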