이전 게시글에서 EMQX 를 이용해서 MQTT Broker Cluster 를 설정하였다. 이번 게시글에서는 Kafka Cluster 를 Kubernetes 클러스터 위에 구성하고 MQTT 로 들어오는 메세지를 Kafka Cluster 로 Produce 되도록 설정해보자. Kafka Connect 에서 MQTT Connector 를 지원하는 방법은 굉장히 많은데, Confluent Platform 의 공식 MQTT Source Connector 는 Enterprise 버전 이상에서만 작동하고, Lenses.io 에서 제공하는 MQTT Source Connector 가 있긴 하지만, 이번 게시글에서는 Apache Camel 을 이용한 구현을 해 보겠다.
[k8s] MQTT Broker Cluster 와 Kafka Cluster 를 이용한 Scalable 아키텍쳐 구성
IoT 를 위한 백엔드 아키텍쳐를 구성할 때에, MQTT 브로커는 IoT 디바이스로부터 오는 정보를 서버의 가장 앞에서 받아오는 역할을 한다. MQTT 는 굉장히 적은 전력과 리소스로 작동하는 통신 프로토
dev-seb.tistory.com
Prerequisite
지난 게시글에서 설정한 것처럼 emqx mqtt broker cluster 가 설정되어 있는 상태인지 확인한다.
$ kubectl get pods -n mqtt
emqx-core-944bb84db-0 1/1 Running 0 100s
emqx-core-944bb84db-1 1/1 Running 0 100s
emqx-core-944bb84db-2 1/1 Running 0 100s
emqx-replicant-699944997b-6rjtq 1/1 Running 0 47s
emqx-replicant-699944997b-9kf5t 1/1 Running 0 47s
emqx-replicant-699944997b-j5h7z 1/1 Running 0 47s
emqx-replicant-699944997b-jhltf 1/1 Running 0 47s
emqx-replicant-699944997b-r5wsw 1/1 Running 0 47s
emqx-replicant-699944997b-tx5ws 1/1 Running 0 47s
emqx-replicant-699944997b-wst48 1/1 Running 0 47s
현재는 core node 3개와 replicant node 7개로 이루어져 있다.
Kafka Broker Cluster 배포하기
Kafka 클러스터는 Helm 을 이용해서 배포해 보자. Bitnami 의 Kafka 차트를 이용해서 배포해 보자. 실습을 위해 간단한 설정으로 배포하기 위해 value 를 튜닝할 것인데, kafka-values.yaml 파일을 생성해서 다음과 같이 작성한다.
# kafka-values.yaml
fullnameOverride: my-kafka
extraConfig: |
num.partitions=5
auto.create.topics.enable=true
controller:
replicaCount: 3
persistence:
enabled: false
listeners:
client:
protocol: PLAINTEXT
provisioning:
enabled: false
bitnami 차트를 이용해서 kafka cluster 를 배포한다.
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install my-kafka bitnami/kafka -f kafka-values.yaml -n kafka --create-namespace --version 26.4.3
배포가 성공하면 broker pod 들이 생성될 때까지 다음 커맨드로 기다린다.
kubectl wait --for=condition=Ready -l helm.sh/chart=kafka-26.4.3 pods -n kafka
생성이 완료되면, 다음 커맨드로 pod 들을 확인해본다.
$ kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
my-kafka-controller-0 1/1 Running 0 3m32s
my-kafka-controller-1 1/1 Running 0 3m32s
my-kafka-controller-2 1/1 Running 0 3m32s
Strimzi Operator 설치하기
Kafka Connect 는 별도의 pod 으로 배포할 수 있다. Strimzi Operator 를 이용해서 Connect 를 배포해 보자. 먼저, Strimzi Operator 를 설치한다. Operator 도 Helm 을 이용해서 설치해 보자.
helm repo add strimzi https://strimzi.io/charts
helm repo update
helm install strimzi strimzi/strimzi-kafka-operator -n kafka --version 0.39.0
strimzi operator 가 생성될 때까지 기다려 보자.
kubectl wait --for=condition=Ready -l name=strimzi-cluster-operator pods -n kafka
strimzi operator 가 생성되면, KafkaConnect 리소스를 생성해 보자. KafkaConnect 에서 우리는 Camel MQTT source kafka connector 을 이용해서 MQTT 메세지를 kafka cluster 로 produce 할 것이다. Camel Kafka Connector 는 Apache Camel 프로젝트의 스택을 Kafka Connector 로 이용할 수 있는 기능인데, Kafka Connect 의 플러그인으로 설치할 수 있다. Camel Kafka Connector 를 이용할 수 있도록 커스텀 이미지를 사용해야 하는데, 두 가지 방법 중 하나를 선택할 수 있다.
Option A: Dockerhub 에서 public image 사용하기
k2sebeom/kafka-connect-with-camel-mqtt:strimzi-latest 를 사용할 수 있다. 필자가 직접 생성한 Docker 이미지이다.
Option B: Image 직접 생성하기
Github Repo 를 clone 해서 Dockerfile 을 확인한다. plugins 디렉토리 안에 camel-mqtt-source-kafka-connector 를 다운받은 후에, docker build 과정에서 plugin 안의 jar 파일을 복사해 넣을 것이다. plugin 은 링크 에서도 찾아서 다운받을 수 있다.
GitHub - k2sebeom/kafka-connect-with-camel-mqtt: Kafka Connect Image with Apache Camel MQTT Source Plugin
Kafka Connect Image with Apache Camel MQTT Source Plugin - k2sebeom/kafka-connect-with-camel-mqtt
github.com
cd plugins
wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-mqtt-source-kafka-connector/3.20.6/camel-mqtt-source-kafka-connector-3.20.6-package.tar.gz
tar xfz camel-mqtt-source-kafka-connector-3.20.6-package.tar.gz
rm camel-mqtt-source-kafka-connector-3.20.6-package.tar.gz
plugins 에 다운을 완료했으면 docker 이미지를 생성하면 된다.
docker build -t kafka-connect-with-camel-mqtt -f dockerfiles/Dockerfile.streamzi .
Kafka Connect 배포하기
Kafka Connect pod 를 생성하자. kafka-connect.yaml 파일을 생성하고 다음과 같이 작성한다.
# kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-kafka-connect
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true"
spec:
replicas: 1
image: k2sebeom/kafka-connect-with-camel-mqtt:strimzi-latest
bootstrapServers: my-kafka.kafka:9092
config:
config.storage.replication.factor: 1
config.storage.topic: my-connect-configs
group.id: my-connect
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.replication.factor: 1
offset.storage.topic: my-connect-offsets
status.storage.replication.factor: 1
status.storage.topic: my-connect-status
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
해당 manifest 를 이용해서 Connect 를 생성한다.
kubectl create -f kafka-connect.yaml
kubectl wait --for=condition=Ready pods -l strimzi.io/kind=KafkaConnect -n kafka
connect pod 이 생성 완료되면 connector 도 생성해 보자. 공식문서 에 필요한 config 이 설정되어 있다.
mqtt-connector.yaml 파일을 생성한다.
# mqtt-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mqtt-source
namespace: kafka
labels:
strimzi.io/cluster: my-kafka-connect
spec:
tasksMax: 1
class: org.apache.camel.kafkaconnector.mqttsource.CamelMqttsourceSourceConnector
config:
topics: mqtt
camel.kamelet.mqtt-source.topic: test
camel.kamelet.mqtt-source.brokerUrl: tcp://emqx-listeners.mqtt:1883
camel.kamelet.mqtt-source.clientId: kafka-connector
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
mqtt source 로부터 test 라는 topic 을 구독하고, 들어오는 메세지를 mqtt 라는 kafka topic 으로 produce 하겠다는 뜻이다. 해당 manifest 로 생성해 보자.
kubectl create -f mqtt-connector.yaml
생성이 잘 되었다면, emqx dashboard 에서 위에서 설정한 것과 같이 kafka-connector 라는 구독자가 생긴 것을 확인할 수 있다.
MQTT 메세지가 Kafka Record 로 들어오는지 확인하기
먼저 mqtt listener 서비스를 localhost 로 노출시키자.
minikube service emqx-listeners -n mqtt
이전 게시글 처럼 test 토픽으로 메세지를 생성해 보자. mqttx 를 사용해도 좋고, 원하는 producer cli 를 사용해도 된다.
mqttx bench pub -c 10 -h localhost -p 52415 -t test -m 'Hello World'
EMQX Dashboard 를 보면, 초당 10개의 메세지가 잘 들어오는 것을 확인할 수 있다.
이제 생성된 메세지가 잘 Kafka Cluster 로 들어오는지 확인한다. Kafka Script 들을 사용하기 위해서 kafka connect container 안으로 접속해 보자.
kubectl exec -it -n kafka my-kafka-connect-connect-0 -- /bin/bash
cd bin
bin 디렉토리 안에 다양한 스크립트 파일이 있는 것을 알 수 있다. 그 중 kafka-console-consumer 를 사용해 보자.
$ ./kafka-console-consumer.sh --bootstrap-server my-kafka.kafka:9092 --topic mqtt
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
Hello World
...
mqtt 로 생성한 Hello World 라는 이미지가 들어오는 것이 확인되면 성공이다!
결론
MQTT 메세지를 Kafka 로 들어오게 하는 데에 성공했다. 사실 Kafka 를 제대로 쓰려면, Key 나 Partition 을 잘 활용하면 좋은데, Camel Connector 는 key 가 항상 null 이라서 구체적인 튜닝은 다소 어렵다. 하지만 open source 로 mqtt connector 을 구현할 수 있다는 점이 굉장한 장점인 것 같다.
Appendix
전체 코드는 Github Repo에서 확인할 수 있습니다.
GitHub - k2sebeom/kafka-connect-mqtt-camel-example
Contribute to k2sebeom/kafka-connect-mqtt-camel-example development by creating an account on GitHub.
github.com
'개발 일지' 카테고리의 다른 글
[Next.js] Next 앱 도커 이미지 다이어트하기 (1) | 2024.02.13 |
---|---|
[k8s] MQTT Broker Cluster 와 Kafka Cluster 를 이용한 Scalable 아키텍쳐 구성 (0) | 2024.01.25 |
[k8s] minikube 로 k8s 클러스터 실습하기 (0) | 2024.01.12 |
[Windows] NSIS 를 이용해서 Forms Application 배포하기 (0) | 2024.01.08 |
[Windows] NSIS 를 이용해서 설치 파일 패키징하기 (0) | 2024.01.08 |