kafka 9

DataPipeline - Kafka Connector를 이용한 CDC Pipeline

우선 CDC 파이프라인이 무엇을 의미하는지 알아봅시다! Caht GPT를 활용해서 내용을 한번 적어보았습니다. CDC (Change Data Capture) 파이프라인은 데이터베이스 시스템에서 변경된 데이터를 실시간으로 감지하고 캡처하여 다른 시스템으로 전달하는 과정을 자동화하는 방법입니다. 1. 로그 모니터링: 데이터베이스 시스템의 로그를 모니터링하여 변경 사항을 감지합니다. 로그에는 데이터베이스에 수행된 삽입, 업데이트, 삭제 등의 작업이 기록됩니다. 2. 변경 사항 캡처: 모니터링된 로그에서 변경 사항을 식별하고 캡처합니다. 이 단계에서 변경된 데이터의 내용과 유형에 따라 필요한 처리가 수행될 수 있습니다. 3. 변경 데이터 저장: 캡처된 변경 데이터는 일시적으로 저장되거나, 대규모 시스템에서는 분..

DataPipeline 2023.05.28

kafka - Debezium kafka Connector K8s에 설치 및 확인

오늘은 Kafka Connector 중 하나인 Debezium Kafka Connector를 한번 설치해보고 어디에다 써먹는지 알아보겠습니다. 우선 Kafka Connector는 크게 두종류로 나눕니다. 1. Kafka Source Connector Source 즉 원본 데이터가 있을때 해당 데이터를 kafka로 전송해 주는 역할을 합니다. ex) DB Souce Connector : DB의 변경사항을 Catch 하여 Kafka로 변경사항을 Message로 전송합니다. 2. Kafka Sync Connector Kafka에 있는 데이터를 어딘가에 전송하거나 실시간으로 가공하여 전송하는 기능을 합니다. ex) S3 Sync Connector : Kafka의 특정 Topic을 구독하여 메세지를 micro B..

kafka 2023.05.27

Kafka - UI (UI for Apache Kafka) K8s에 설치

우리가 Kafka를 사용할 때 CLI를 사용해서 Topic을 컨트롤 하거나 아니면 상태를 보거나 하는일을 하게되는데 이게 생각보다 많이 불편함을 느낄 수 있습니다. 회사에서 운영하고 있는 카프카 클러스터를 관리하는데 많이 유용하게 사용되고 있어서 한번 소개 해 봅니다. 우선 우리 회사는 kafka를 k8s위에서 관리하고 있고, 그렇기 때문에 이번에 Kafka UI를 올리는 과정을 소개함에 있어서 Yaml파일을 작성할겁니다. 준비물이 있습니다. 1. 운영중 혹은 개발용인 카프카가 있어야 합니다. 2. K8s가 있어야 합니다. 3. 개발 환경에 kubectl과 같은 k8s CLI Tool이 있어야 합니다. 첫번째 Kafka UI는 어떤걸 사용하는가 이 툴에 대해서 다른 툴과 비교한 내용이 있는데 참고 해보기..

kafka 2023.05.25

kafka - Topic Partition 개수 설계

카프카에 파티션 수가 늘어날 수록 처리량을 올라가지만, 그에 따른 반작용이 있다. 바로 Latency 증가하고 recovery time 이 늘어난다. 그래서 무작정 파티션 수를 늘리는 것보다, 적절한 목표 처리량을 설정하고 그에 맞춰 파티션 개수를 산정해야한다. 이어 기능상에 대해서도 고려를 해야한다. 카프카 파티션 수 설계 가이드 파티션 수를 증가하면 처리량이 높아짐 (분산효과) 프로듀서, 브로커 : write 처리량이 증가, hardware 자원 사용 증가 컨슈머 : 파티션 수만큼 병렬 쓰레드 구동해서 처리량 증가 목표 처리량 (t) 이 있다면, 파티션 개수를 정할 수 있음 단일 파티션 쓰기 속도 계산 (p) 단일 파티션 읽기 속도 계산 (c) 파티션 개수 = max (t/p, t/c) 고려사항 ba..

kafka 2021.07.21

Python - Kafka Producer 구성

kafka에 message를 보내기 위해서 파이썬으로 프로듀서를 구성해보았다. 우선먼저 필요사항 Python kafka-python 그리하여 먼저 Kafak-python 라이브러리를 설치한다. !pip install kafka-python 아주 가뿐하게 설치를 하고 나면 이제야 비로서 코드를 한번 알아보도록 하자. from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') for _ in range(100): producer.send('foobar', b'some_message_bytes') 세상 너무 간단하다.. 정말 이것으로 우리가 원하는 메세지를 생성할 수 있나? 실제로 가능하지만.. 너무 기..

Python 2021.07.21

PySpark - Kafka Structured Streaming 설정

Spark 환경 Spark 3.1 Docker를 활용한 환경 구성 특이사항 - Docker와 Jupyter Notebook으로 구성된 image를 다운받아 생성 PySpark 환경에서 새로운 무언가를 실행할때 항상 여러가지 확인할 것들이 생기는데 그중하나가 Spark Dir에 있는 jars 내부 jar파일들이다. kafka와 연동하기 위한 jar 파일은 maven에서 구할수 있다. 아래 두가지 파일을 구하여 jars 디렉토리에 넣어준다. kafka-clients-2.6.0.jar spark-sql-kafka-0-10_2.12-3.1.1.jar spark-token-provider-kafka-0-10_2.12-3.1.1.jar 내 환경에서는 cd /usr/local/spark/jars 로 이동하면 위 링크..

Spark 2021.07.13

Logstash - Input Kafka

Data를 손실없이 처리하는 Massege Queue방식의 카프카로 부터 안정적으로 Data를 받아 처리 할 수 있도록 kafka로부터 Data를 받는 방법에 대하여 알아보자. input { kafka { bootstrap_servers => ["kafka:9092"] # kafka server & Port 지정 topics => [""] # Topic 이름 지정 codec => json #Data Codec 지정 auto_offset_reset => "earliest" #Offset 지정 consumer_threads => 1 # Consumer 처리 Thread갯수 지정 type => "kafka" # Plugin type지정 } } 이중 기능이 좀 필요한 것들 몇개만 더 파보자 1. Auto_off..

ELK Stack 2021.07.09

Logstash - Output Kafka

Logstash는 오픈소스 데이터 수집 프로그램으로 다양한 플러그인을 지원하여 여러가지 데이터 소스를 효율적으로 처리하고 처리된 데이터를 원하는 곳으로 전송하는 기능을 담당하고 있다. 이 데이터를 전송받는 프로그램중 가장 많이 사용하고 활용도가 높은것이 Kafka인데 Logstash를 활용하여 Kafka로 데이터를 전송해보자. 사실 너무 간단해서 소개할것도 없다. output { kafka { codec => json topic_id => "" bootstrap_servers => ["kafka:9092"] } } Logstsh의 Output plugin부분에 위와같은 코드로 사용할 수 있다. Bootstrap-server 같은경우에는 본인이 Docker Container로 구성했다면 해당 Contain..

ELK Stack 2021.07.09

Kafka 기본 명령어

카프카 구성후 사용하는 기본 명령어 List 1. Topic List 조회 kafka-topics --list --bootstrap-server :9092 2. 신규 Topic 생성 --> Streaming 처리중 Logstash 또는 Spark에 의해서 자동으로 Topic이 생성되나 토픽을 수동으로 생성할때 사용 kafka-topics --create --bootstrap-server :9092 --replication-factor --partitions --topic --> Replication : Cluster내부 복제 Topic의 개수 설정 Kafka 기본설정에서 Default 값 설정 가능 --> Partition : Topic 내부 Partition 초기 개수 설정 Kafka 기본설정에서 De..

kafka 2021.06.26