2023. 5. 27. 19:29ㆍkafka
오늘은 Kafka Connector 중 하나인 Debezium Kafka Connector를 한번 설치해보고 어디에다 써먹는지 알아보겠습니다.
우선 Kafka Connector는 크게 두종류로 나눕니다.
1. Kafka Source Connector
Source 즉 원본 데이터가 있을때 해당 데이터를 kafka로 전송해 주는 역할을 합니다.
ex) DB Souce Connector : DB의 변경사항을 Catch 하여 Kafka로 변경사항을 Message로 전송합니다.
2. Kafka Sync Connector
Kafka에 있는 데이터를 어딘가에 전송하거나 실시간으로 가공하여 전송하는 기능을 합니다.
ex) S3 Sync Connector : Kafka의 특정 Topic을 구독하여 메세지를 micro Batch 형식으로 끊어서 S3에 파일로 저장해주는 기능을가지고 있습니다.
이중에서 Debezium Kafka Connector는 첫번째 경우에 속하며
RDS, Postgresql, Mysql, MariaDB등의 Source로 부터 데이터 변경점을 Catch하여 Message를 발행하는 Connector로 각각 DB에 Bin_log를 참고하여 동작합니다.
그러면 이것을 어디에 사용하느냐! -> 아래 링크에서 확인하세요!
<<링크 삽입>>
이제 설치를 한번 해봅시다.
준비물
1. K8s Cluster
2. Kubectl CLI 설정 완료
3. kafka Cluster
4. 원활한 확인을 위한 kafka ui. -> 만약 없다면! 다음 링크를 참고하세요 https://todaycodeplus.tistory.com/58
자 그럼 마찬가지로 처음 할 일은 Yaml 파일을 작성하는 일 입니다.
!!! 저는 2.1 버전을 사용했습니다. 참고하세요
debezium-kafka-con.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
labels:
app: kafcon
name: kafcon
namespace: cdc-pipe
spec:
serviceName: kafcon
replicas: 2
selector:
matchLabels:
app: kafcon
template:
metadata:
labels:
app: kafcon
spec:
containers:
- name: kafka-connector-rds
image: quay.io/debezium/connect:2.1
imagePullPolicy: Always
env:
# 이부분에 Kafka Cluster의 URL이 들어갑니다.
- name: BOOTSTRAP_SERVERS
value: kafka-dev-cp-kafka-0.kafka:9092, kafka-dev-cp-kafka-1.kafka:9092, kafka-dev-cp-kafka-2.kafka:9092
# kafka Connector의 Group ID를 설정합니다.
- name: GROUP_ID
value: cdc-pipeline
#내가 connector에 저장한 config값 저장할 Topic이름 설정
- name: CONFIG_STORAGE_TOPIC
value: cdc_pipeline_config
#내가 connector가 읽어오는 bin_log의 offset을 저장할 Topic이름 설정
- name: OFFSET_STORAGE_TOPIC
value: cdc_pipeline_offsets
#내가 connector에 저장한 status값을 저장할 Topic이름 설정
- name: STATUS_STORAGE_TOPIC
value: cdc_pipeline_statuses
# 위에서 설정한 3개의 Topic의 ReplicaSet 설정
- name: CONFIG_STORAGE_REPLICATION_FACTOR
value: "3"
- name: OFFSET_STORAGE_REPLICATION_FACTOR
value: "3"
- name: STATUS_STORAGE_REPLICATION_FACTOR
value: "3"
ports:
- containerPort: 8083
name: kafcon
readinessProbe:
httpGet:
port: 8083
path: /connectors
initialDelaySeconds: 15
periodSeconds: 15
livenessProbe:
httpGet:
port: 8083
path: /connectors
initialDelaySeconds: 15
periodSeconds: 15
volumeClaimTemplates:
- metadata:
name: kafcon-rds-data
spec:
accessModes:
- "ReadWriteOnce"
resources:
requests:
storage: 1Gi
storageClassName: gp2
volumeMode: Filesystem
두번째로 할일은 배포 하는일 입니다.
kubectl apply -f ./debezium-kafka-con.yaml -n cdc-pipe
이제 배포가 완료됐습니다. 그러면 Pod/StatefulSet/PV등이 모두 잘 있는 지 확인 해 주시고
kafka ui를 들어가서 설정한 Config, Offset, Status에 대한 Topic이 생성 됐는지 확인해 봅시다.
이렇게 Connector 자체에 대한 배포는 완료가 됐습니다.
실제로 CDC-Pipeline을 배포하는것은 다음글을 참고하세요!
'kafka' 카테고리의 다른 글
Kafka - UI (UI for Apache Kafka) K8s에 설치 (0) | 2023.05.25 |
---|---|
kafka - Topic Partition 개수 설계 (0) | 2021.07.21 |
Kafka 기본 명령어 (0) | 2021.06.26 |
아파치 카프카 기본개념 (Apache Kafka) (0) | 2021.05.28 |