DataPipeline - Kafka Connector를 이용한 CDC Pipeline

2023. 5. 28. 23:14DataPipeline

우선 CDC 파이프라인이 무엇을 의미하는지 알아봅시다!

 

Caht GPT를 활용해서 내용을 한번 적어보았습니다.

CDC (Change Data Capture) 파이프라인은 데이터베이스 시스템에서 변경된 데이터를 실시간으로 감지하고 캡처하여 다른 시스템으로 전달하는 과정을 자동화하는 방법입니다.

1. 로그 모니터링: 데이터베이스 시스템의 로그를 모니터링하여 변경 사항을 감지합니다. 로그에는 데이터베이스에 수행된 삽입, 업데이트, 삭제 등의 작업이 기록됩니다.

2. 변경 사항 캡처: 모니터링된 로그에서 변경 사항을 식별하고 캡처합니다. 이 단계에서 변경된 데이터의 내용과 유형에 따라 필요한 처리가 수행될 수 있습니다.

3. 변경 데이터 저장: 캡처된 변경 데이터는 일시적으로 저장되거나, 대규모 시스템에서는 분산 데이터 저장소에 저장될 수 있습니다. 이는 변경 데이터의 안정적인 보존과 추적을 가능하게 합니다.

4. 변경 데이터 전달: 캡처된 변경 데이터는 다른 시스템이나 애플리케이션으로 전달됩니다. 이 단계에서는 데이터 변환, 필터링, 정렬 등의 작업이 수행될 수 있습니다. 변경 데이터는 실시간으로 전달되어 다른 시스템의 데이터를 업데이트하거나 분석 등의 용도로 활용될 수 있습니다.

CDC 파이프라인은 데이터베이스 간의 데이터 통합, 데이터 웨어하우징, 비즈니스 인텔리전스, 데이터 분석 등 다양한 시나리오에서 활용될 수 있습니다. 이를 통해 실시간으로 데이터를 동기화하고 변화를 감지하여 업데이트하는 프로세스를 자동화할 수 있습니다.

 

 

다음과 같은 내용에서 알 수 있듯 실시간으로 변경점을 반영하여 다른 시스템에 전달한다. 입니다.

 

우리는 Debezium Kafka Connector를 활용해서 실제로 RDS 혹은 MariaDB, MySql에서 데이터 변환점을 메세지로 Kafka로 전송하는 방법을 알아볼 것 입니다.

 

Debezium Kafka connector설치가 궁금하다면 -> https://todaycodeplus.tistory.com/59

 

kafka - Debezium kafka Connector K8s에 설치 및 확인

오늘은 Kafka Connector 중 하나인 Debezium Kafka Connector를 한번 설치해보고 어디에다 써먹는지 알아보겠습니다. 우선 Kafka Connector는 크게 두종류로 나눕니다. 1. Kafka Source Connector Source 즉 원본 데이터가

todaycodeplus.tistory.com

준비물 

  1. Jupyter Notebook + Python + Requests Lib
  2. Kafka Cluster
  3. Debezium kafka Connector
  4. DB ( MariaDB, MySql, RDS(mariaDB, MySql)
  5. DB Bin_log 설정이 완료된 상태
    Bin_log 설정은 반드시 필요합니다. 꼭꼭 확인 해 주세요!

자 준비물이 완료됐으면 시작합시다.

 

먼저 Jupyter Notebook에 코드를 작성합니다. 정상적으로 커넥터가 배포 됐다면 8083포트로 접근하여 API를 사용할 수 있습니다. 
API 정의는 -> https://docs.confluent.io/platform/current/connect/references/restapi.html

 

Connect REST Interface | Confluent Documentation

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can ma

docs.confluent.io

코드를 작성합니다.

import json
import requests

headers = {"Accept":"application/json"}

# Deployment Configuration
cdc_pipe ={
  "name": "test_cdc",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "mysql-db-url0",  
    "database.port": "3306",
    "database.user": "testuser",
    "database.password": "testuser123!",
    "database.server.id": "11223344",
    "database.history.kafka.bootstrap.servers": "kafka-0.kafka:9092, \
        kafka-1.kafka:9092, kafka-2.kafka:9092",
    "database.history.kafka.topic": "dbhistory.test",
    "topic.prefix": "DataCDC",  
    "table.include.list": "user.user_name",  
    "schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka:9092, \
        kafka-1.kafka:9092, kafka-2.kafka:9092",  
    "schema.history.internal.kafka.topic": "schema-changes.test",
    "snapshot.mode": "schema_only"
  }
}

headers = {"Accept":"application/json"}

# Deployment -- Post
rep = requests.post(url="http://rds-kafka-connector-0:8083/connectors", headers=headers,  json=cdc_pipe)

이렇게 하면 바로 배포가 이루어집니다. 

여기서 중요하게 생각해야할 변수 값들을 알아보면

  • database.history.kafka.bootstrap.server : DB변경점을 저장할 kafka Cluster를 지정합니다.
  • topic.prefix : 생성할 Topic앞에 붙을 prefix값을 지정합니다.
  • schema.history.kafka.bootstrap.server : Schema 변경점을 저장할 kafka Cluster를 지정합니다.
  • Snapshot.mode : cdc변경점을 어디까지 읽었는지 기록하는 Snapshot을 저장할 유형을 지정합니다.
    • schema_only를 지정하면 bin_log를 모두 읽지 않고 현재 가장 최근 시점의 변경점 부터 읽어옵니다. 그래서 앞서 쌓인 변경점이 많은경우 이 옵션을 지정하여 현재 시점부터 변경점을 읽어오고, 그 이전시점의 변경점은 DB로 부터 마이그레이션 해서 사용하게 됩니다. 

배포 하여주면 kafka UI에서 지정한 Topic들이 생성된것을 확인하면 완료가 됩니다. 배포를 직접 해보고 오류가 날 경우 디버깅하는 과정을 통해서 아마 사용방법을 더 깊게 배울 수 있을겁니다. 

 

단순히 글로는 다 표현이 안되네요 ㅠㅠ

 

추가로 저 config 설정에 들어가는 값에 대한 자세한 설명은 다음 링크를 참고하세요

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties

 

Debezium connector for MySQL :: Debezium Documentation

Time, date, and timestamps can be represented with different kinds of precision, including: adaptive_time_microseconds (the default) captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanos

debezium.io