본문 바로가기

개발 일지

[k8s] Kafka Connect 를 이용해서 MQTT 메세지를 Kafka Broker 로 Produce 하기

이전 게시글에서 EMQX 를 이용해서 MQTT Broker Cluster 를 설정하였다. 이번 게시글에서는 Kafka Cluster 를 Kubernetes 클러스터 위에 구성하고 MQTT 로 들어오는 메세지를 Kafka Cluster 로 Produce 되도록 설정해보자. Kafka Connect 에서 MQTT Connector 를 지원하는 방법은 굉장히 많은데, Confluent Platform 의 공식 MQTT Source Connector 는 Enterprise 버전 이상에서만 작동하고, Lenses.io 에서 제공하는 MQTT Source Connector 가 있긴 하지만, 이번 게시글에서는 Apache Camel 을 이용한 구현을 해 보겠다.

 

[k8s] MQTT Broker Cluster 와 Kafka Cluster 를 이용한 Scalable 아키텍쳐 구성

IoT 를 위한 백엔드 아키텍쳐를 구성할 때에, MQTT 브로커는 IoT 디바이스로부터 오는 정보를 서버의 가장 앞에서 받아오는 역할을 한다. MQTT 는 굉장히 적은 전력과 리소스로 작동하는 통신 프로토

dev-seb.tistory.com

Prerequisite

지난 게시글에서 설정한 것처럼 emqx mqtt broker cluster 가 설정되어 있는 상태인지 확인한다.

$ kubectl get pods -n mqtt
emqx-core-944bb84db-0             1/1     Running   0          100s
emqx-core-944bb84db-1             1/1     Running   0          100s
emqx-core-944bb84db-2             1/1     Running   0          100s
emqx-replicant-699944997b-6rjtq   1/1     Running   0          47s
emqx-replicant-699944997b-9kf5t   1/1     Running   0          47s
emqx-replicant-699944997b-j5h7z   1/1     Running   0          47s
emqx-replicant-699944997b-jhltf   1/1     Running   0          47s
emqx-replicant-699944997b-r5wsw   1/1     Running   0          47s
emqx-replicant-699944997b-tx5ws   1/1     Running   0          47s
emqx-replicant-699944997b-wst48   1/1     Running   0          47s

현재는 core node 3개와 replicant node 7개로 이루어져 있다.

Kafka Broker Cluster 배포하기

Kafka 클러스터는 Helm 을 이용해서 배포해 보자. Bitnami 의 Kafka 차트를 이용해서 배포해 보자. 실습을 위해 간단한 설정으로 배포하기 위해 value 를 튜닝할 것인데, kafka-values.yaml 파일을 생성해서 다음과 같이 작성한다.

# kafka-values.yaml
fullnameOverride: my-kafka

extraConfig: |
  num.partitions=5
  auto.create.topics.enable=true

controller:
  replicaCount: 3
  persistence:
    enabled: false

listeners:
  client:
    protocol: PLAINTEXT

provisioning:
  enabled: false

bitnami 차트를 이용해서 kafka cluster 를 배포한다.

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install my-kafka bitnami/kafka -f kafka-values.yaml -n kafka --create-namespace --version 26.4.3

 

배포가 성공하면 broker pod 들이 생성될 때까지 다음 커맨드로 기다린다.

kubectl wait --for=condition=Ready -l helm.sh/chart=kafka-26.4.3 pods -n kafka

 

생성이 완료되면, 다음 커맨드로 pod 들을 확인해본다.

$ kubectl get pods -n kafka
NAME                    READY   STATUS    RESTARTS   AGE
my-kafka-controller-0   1/1     Running   0          3m32s
my-kafka-controller-1   1/1     Running   0          3m32s
my-kafka-controller-2   1/1     Running   0          3m32s

Strimzi Operator 설치하기

Kafka Connect 는 별도의 pod 으로 배포할 수 있다. Strimzi Operator 를 이용해서 Connect 를 배포해 보자. 먼저, Strimzi Operator 를 설치한다. Operator 도 Helm 을 이용해서 설치해 보자.

helm repo add strimzi https://strimzi.io/charts
helm repo update
helm install strimzi strimzi/strimzi-kafka-operator -n kafka --version 0.39.0

 

strimzi operator 가 생성될 때까지 기다려 보자.

kubectl wait --for=condition=Ready -l name=strimzi-cluster-operator pods -n kafka

 

strimzi operator 가 생성되면, KafkaConnect 리소스를 생성해 보자. KafkaConnect 에서 우리는 Camel MQTT source kafka connector 을 이용해서 MQTT 메세지를 kafka cluster 로 produce 할 것이다. Camel Kafka Connector 는 Apache Camel 프로젝트의 스택을 Kafka Connector 로 이용할 수 있는 기능인데, Kafka Connect 의 플러그인으로 설치할 수 있다. Camel Kafka Connector 를 이용할 수 있도록 커스텀 이미지를 사용해야 하는데, 두 가지 방법 중 하나를 선택할 수 있다.

Option A: Dockerhub 에서 public image 사용하기

k2sebeom/kafka-connect-with-camel-mqtt:strimzi-latest 를 사용할 수 있다. 필자가 직접 생성한 Docker 이미지이다.

Option B: Image 직접 생성하기

Github Repo 를 clone 해서 Dockerfile 을 확인한다. plugins 디렉토리 안에 camel-mqtt-source-kafka-connector 를 다운받은 후에, docker build 과정에서 plugin 안의 jar 파일을 복사해 넣을 것이다. plugin 은 링크 에서도 찾아서 다운받을 수 있다.

 

GitHub - k2sebeom/kafka-connect-with-camel-mqtt: Kafka Connect Image with Apache Camel MQTT Source Plugin

Kafka Connect Image with Apache Camel MQTT Source Plugin - k2sebeom/kafka-connect-with-camel-mqtt

github.com

cd plugins
wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-mqtt-source-kafka-connector/3.20.6/camel-mqtt-source-kafka-connector-3.20.6-package.tar.gz
tar xfz camel-mqtt-source-kafka-connector-3.20.6-package.tar.gz
rm camel-mqtt-source-kafka-connector-3.20.6-package.tar.gz

 

plugins 에 다운을 완료했으면 docker 이미지를 생성하면 된다.

docker build -t kafka-connect-with-camel-mqtt -f dockerfiles/Dockerfile.streamzi .

Kafka Connect 배포하기

Kafka Connect pod 를 생성하자. kafka-connect.yaml 파일을 생성하고 다음과 같이 작성한다.

# kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-kafka-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  image: k2sebeom/kafka-connect-with-camel-mqtt:strimzi-latest
  bootstrapServers: my-kafka.kafka:9092
  config:
    config.storage.replication.factor: 1
    config.storage.topic: my-connect-configs
    group.id: my-connect
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    offset.storage.replication.factor: 1
    offset.storage.topic: my-connect-offsets
    status.storage.replication.factor: 1
    status.storage.topic: my-connect-status
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

 

해당 manifest 를 이용해서 Connect 를 생성한다.

kubectl create -f kafka-connect.yaml
kubectl wait --for=condition=Ready pods -l strimzi.io/kind=KafkaConnect -n kafka

 

connect pod 이 생성 완료되면 connector 도 생성해 보자. 공식문서 에 필요한 config 이 설정되어 있다.

mqtt-connector.yaml 파일을 생성한다.

# mqtt-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mqtt-source
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.mqttsource.CamelMqttsourceSourceConnector
  config:
    topics: mqtt
    camel.kamelet.mqtt-source.topic: test
    camel.kamelet.mqtt-source.brokerUrl: tcp://emqx-listeners.mqtt:1883
    camel.kamelet.mqtt-source.clientId: kafka-connector
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

 

mqtt source 로부터 test 라는 topic 을 구독하고, 들어오는 메세지를 mqtt 라는 kafka topic 으로 produce 하겠다는 뜻이다. 해당 manifest 로 생성해 보자.

kubectl create -f mqtt-connector.yaml

 

생성이 잘 되었다면, emqx dashboard 에서 위에서 설정한 것과 같이 kafka-connector 라는 구독자가 생긴 것을 확인할 수 있다.

어머 구독자가 늘었어요

MQTT 메세지가 Kafka Record 로 들어오는지 확인하기

먼저 mqtt listener 서비스를 localhost 로 노출시키자.

minikube service emqx-listeners -n mqtt

 

이전 게시글 처럼 test 토픽으로 메세지를 생성해 보자. mqttx 를 사용해도 좋고, 원하는 producer cli 를 사용해도 된다.

mqttx bench pub -c 10 -h localhost -p 52415 -t test -m 'Hello World'

 

EMQX Dashboard 를 보면, 초당 10개의 메세지가 잘 들어오는 것을 확인할 수 있다.

 

mqtt 메세지를 생성 중이다

 

이제 생성된 메세지가 잘 Kafka Cluster 로 들어오는지 확인한다. Kafka Script 들을 사용하기 위해서 kafka connect container 안으로 접속해 보자.

kubectl exec -it -n kafka my-kafka-connect-connect-0 -- /bin/bash
cd bin

 

bin 디렉토리 안에 다양한 스크립트 파일이 있는 것을 알 수 있다. 그 중 kafka-console-consumer 를 사용해 보자.

$ ./kafka-console-consumer.sh --bootstrap-server my-kafka.kafka:9092 --topic mqtt
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
...

 

mqtt 로 생성한 Hello World 라는 이미지가 들어오는 것이 확인되면 성공이다!

결론

MQTT 메세지를 Kafka 로 들어오게 하는 데에 성공했다. 사실 Kafka 를 제대로 쓰려면, Key 나 Partition 을 잘 활용하면 좋은데, Camel Connector 는 key 가 항상 null 이라서 구체적인 튜닝은 다소 어렵다. 하지만 open source 로 mqtt connector 을 구현할 수 있다는 점이 굉장한 장점인 것 같다.

 

Appendix

전체 코드는 Github Repo에서 확인할 수 있습니다.

 

GitHub - k2sebeom/kafka-connect-mqtt-camel-example

Contribute to k2sebeom/kafka-connect-mqtt-camel-example development by creating an account on GitHub.

github.com