PySpark - SparkSQL Structured Streaming Kafka

저번글에서는 Kafka와 Spark를 연결하기 위한 준비작업을 진행했는데 이번에는 직접
Spark 코드를 이용하여 kafka와 연동을 해보도록 하겠다.
만약 이글이 처음이라면 아래 링크에서 준비를 마치고 다시 진행해보도록 한다.
링크 : https://todaycodeplus.tistory.com/27
PySpark - Kafka Structured Streaming 설정
Spark 환경 Spark 3.1 Docker를 활용한 환경 구성 특이사항 - Docker와 Jupyter Notebook으로 구성된 image를 다운받아 생성 PySpark 환경에서 새로운 무언가를 실행할때 항상 여러가지 확인할 것들이 생기는데..
todaycodeplus.tistory.com
Spark sql에서 kafka와 연동하여 진행하는 Structured Streaming은 두가지 방식으로 진행할 수 있다.
1. read : kafka massege를 Batch처리하여 가져옴 실시간성 X
2. readstream : kafka에서 Massage를 실시간으로 Micro Batch처리하여 가져옴 실시간성 O
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, pandas_udf, split
#sparkSession 생성
spark = SparkSession \
.builder \
.appName("service") \
.getOrCreate()
# spark structured streaming을 이용
# kafka에서 데이터를 읽어와 dataframe 생성
df = spark \
.readStream\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("failOnDataLoss","False") \
.option("subscribe", "Topic Name") \
.load()
지금 이 코드에서는 .readstream 으로 실행하여 바로 스트리밍 처리를 위한 코드로 선언을 했다.
이렇게 할경우 Kafka에서 가져온 데이터를 확인 할 수 없다. 단순히 스키마의 구조만 확인할 수 있다.
만약 이상태로 데이터의 유무를 확인하고 싶다면
# row 개수 확인
df.count()
로 확인할 수 있다. 실제 내부 데이터를확인하기 위해서는 read로 처리하여 df.show()를 이용해서 데이터를 확인 할 수 있다.
df = spark \
.read\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("failOnDataLoss","False") \
.option("subscribe", "Topic Name") \
.load()
df.show()
이후에 데이터를 처리하는 방법에 대해서는 천천히 다른글에서 더 알아보도록 하고
마지막 이 Batch혹은 Micro Batch를 다시 다른 프로그램 혹은 콘솔로 내보내는 법을 보도록 하자.
ddf = df1.select(to_json(<<your columns data>>)).alias("value"))\
.writeStream\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "<<Topic Name>>") \
.option("checkpointLocation", "data/checkpoint/")\
.start()
ddf.awaitTermination()
처리된 DataFrame에서 내가 내보낼 Column값들을 선택하고 이를 Json형태로 변환하여 Value라는 Column으로 묶어서 kafka로 보낼 수 있다.
데이터를 내보낼때 이를 어디까지 보냈는지 확인하기위한 CheckPoint를 지정하는데 특정 디렉토리를 지정해 주면 된다.