kafka - Debezium kafka Connector K8s에 설치 및 확인

2023. 5. 27. 19:29kafka

오늘은 Kafka Connector 중 하나인 Debezium Kafka Connector를 한번 설치해보고 어디에다 써먹는지 알아보겠습니다.

 

우선 Kafka Connector는 크게 두종류로 나눕니다.

 

1.  Kafka Source Connector 

   Source 즉 원본 데이터가 있을때 해당 데이터를 kafka로 전송해 주는 역할을 합니다.

    ex) DB Souce Connector : DB의 변경사항을 Catch 하여 Kafka로 변경사항을 Message로 전송합니다.

2. Kafka Sync Connector

   Kafka에 있는 데이터를 어딘가에 전송하거나 실시간으로 가공하여 전송하는 기능을 합니다. 

    ex) S3 Sync Connector : Kafka의 특정 Topic을 구독하여 메세지를 micro Batch 형식으로 끊어서 S3에 파일로 저장해주는 기능을가지고 있습니다.

이중에서 Debezium Kafka Connector는 첫번째 경우에 속하며 

 

RDS, Postgresql, Mysql, MariaDB등의 Source로 부터 데이터 변경점을 Catch하여 Message를 발행하는 Connector로 각각 DB에 Bin_log를 참고하여 동작합니다.

 

그러면 이것을 어디에 사용하느냐! -> 아래 링크에서 확인하세요!

<<링크 삽입>>

 

이제 설치를 한번 해봅시다.

 

준비물

1. K8s Cluster

2. Kubectl CLI 설정 완료

3. kafka Cluster 

4. 원활한 확인을 위한 kafka ui. -> 만약 없다면! 다음 링크를 참고하세요 https://todaycodeplus.tistory.com/58

 

Kafka - UI (UI for Apache Kafka)설치

우리가 Kafka를 사용할 때 CLI를 사용해서 Topic을 컨트롤 하거나 아니면 상태를 보거나 하는일을 하게되는데 이게 생각보다 많이 불편함을 느낄 수 있습니다. 회사에서 운영하고 있는 카프카 클러

todaycodeplus.tistory.com

 

자 그럼 마찬가지로 처음 할 일은 Yaml 파일을 작성하는 일 입니다. 

!!! 저는 2.1 버전을 사용했습니다. 참고하세요

 

debezium-kafka-con.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafcon
  name: kafcon
  namespace: cdc-pipe
spec:
  serviceName: kafcon
  replicas: 2
  selector:
    matchLabels:
      app: kafcon
  template:
    metadata:
      labels:
        app: kafcon
    spec:
      containers:
      - name: kafka-connector-rds
        image: quay.io/debezium/connect:2.1
        imagePullPolicy: Always
        env:
        # 이부분에 Kafka Cluster의 URL이 들어갑니다.
        - name: BOOTSTRAP_SERVERS
          value: kafka-dev-cp-kafka-0.kafka:9092, kafka-dev-cp-kafka-1.kafka:9092, kafka-dev-cp-kafka-2.kafka:9092
        # kafka Connector의 Group ID를 설정합니다.
        - name: GROUP_ID
          value: cdc-pipeline
        #내가 connector에 저장한 config값 저장할 Topic이름 설정
        - name: CONFIG_STORAGE_TOPIC
          value: cdc_pipeline_config
        #내가 connector가 읽어오는 bin_log의 offset을 저장할 Topic이름 설정
        - name: OFFSET_STORAGE_TOPIC
          value: cdc_pipeline_offsets
        #내가 connector에 저장한 status값을 저장할 Topic이름 설정  
        - name: STATUS_STORAGE_TOPIC
          value: cdc_pipeline_statuses
        # 위에서 설정한 3개의 Topic의 ReplicaSet 설정
        - name: CONFIG_STORAGE_REPLICATION_FACTOR
          value: "3"
        - name: OFFSET_STORAGE_REPLICATION_FACTOR
          value: "3"
        - name: STATUS_STORAGE_REPLICATION_FACTOR
          value: "3"
        ports:
        - containerPort: 8083
          name: kafcon
        readinessProbe:
          httpGet:
            port: 8083
            path: /connectors
          initialDelaySeconds: 15
          periodSeconds: 15
        livenessProbe:
          httpGet:
            port: 8083
            path: /connectors
          initialDelaySeconds: 15
          periodSeconds: 15
  volumeClaimTemplates:
  - metadata:
      name: kafcon-rds-data
    spec:
      accessModes:
        - "ReadWriteOnce"
      resources:
        requests:
          storage: 1Gi
      storageClassName: gp2
      volumeMode: Filesystem

두번째로 할일은 배포 하는일 입니다.

kubectl apply -f ./debezium-kafka-con.yaml -n cdc-pipe

이제 배포가 완료됐습니다. 그러면 Pod/StatefulSet/PV등이 모두 잘 있는 지 확인 해 주시고

 

 

kafka ui를 들어가서 설정한 Config, Offset, Status에 대한 Topic이 생성 됐는지 확인해 봅시다.

생성된 Topic 확인

 

이렇게 Connector 자체에 대한 배포는 완료가 됐습니다. 

 

실제로 CDC-Pipeline을 배포하는것은 다음글을 참고하세요!

'kafka' 카테고리의 다른 글

Kafka - UI (UI for Apache Kafka) K8s에 설치  (0) 2023.05.25
kafka - Topic Partition 개수 설계  (0) 2021.07.21
Kafka 기본 명령어  (0) 2021.06.26
아파치 카프카 기본개념 (Apache Kafka)  (0) 2021.05.28