PySpark - SparkSQL Structured Streaming Kafka

2021. 7. 13. 15:27Spark

저번글에서는 Kafka와 Spark를 연결하기 위한 준비작업을 진행했는데 이번에는 직접

Spark 코드를 이용하여 kafka와 연동을 해보도록 하겠다. 

 

만약 이글이 처음이라면 아래 링크에서 준비를 마치고 다시 진행해보도록 한다.

링크 : https://todaycodeplus.tistory.com/27

 

PySpark - Kafka Structured Streaming 설정

Spark 환경 Spark 3.1 Docker를 활용한 환경 구성 특이사항 - Docker와 Jupyter Notebook으로 구성된 image를 다운받아 생성 PySpark 환경에서 새로운 무언가를 실행할때 항상 여러가지 확인할 것들이 생기는데..

todaycodeplus.tistory.com

 

Spark sql에서 kafka와 연동하여 진행하는  Structured Streaming은 두가지 방식으로 진행할 수 있다.

1. read : kafka massege를 Batch처리하여 가져옴 실시간성 X 

2. readstream : kafka에서 Massage를 실시간으로 Micro Batch처리하여 가져옴 실시간성 O

 

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, pandas_udf, split

#sparkSession 생성
spark = SparkSession \
    .builder \
    .appName("service") \
    .getOrCreate()

# spark structured streaming을 이용
# kafka에서 데이터를 읽어와 dataframe 생성
df = spark \
    .readStream\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("failOnDataLoss","False") \
    .option("subscribe", "Topic Name") \
    .load()

지금 이 코드에서는  .readstream 으로 실행하여 바로 스트리밍 처리를 위한 코드로 선언을 했다. 

 

이렇게 할경우 Kafka에서 가져온 데이터를 확인 할 수 없다. 단순히 스키마의 구조만 확인할 수 있다. 

만약 이상태로 데이터의 유무를 확인하고 싶다면

# row 개수 확인
df.count()

로 확인할 수 있다. 실제 내부 데이터를확인하기 위해서는 read로 처리하여 df.show()를 이용해서 데이터를 확인 할 수 있다.

df = spark \
    .read\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("failOnDataLoss","False") \
    .option("subscribe", "Topic Name") \
    .load()
    
df.show()

이후에 데이터를 처리하는 방법에 대해서는 천천히 다른글에서 더 알아보도록 하고 

마지막 이 Batch혹은 Micro Batch를 다시 다른 프로그램 혹은 콘솔로 내보내는 법을 보도록 하자.

 

ddf = df1.select(to_json(<<your columns data>>)).alias("value"))\
    .writeStream\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "<<Topic Name>>") \
    .option("checkpointLocation", "data/checkpoint/")\
    .start()

ddf.awaitTermination()

처리된 DataFrame에서 내가 내보낼 Column값들을 선택하고 이를 Json형태로 변환하여 Value라는 Column으로 묶어서 kafka로 보낼 수 있다.

 

데이터를 내보낼때 이를 어디까지 보냈는지 확인하기위한 CheckPoint를 지정하는데 특정 디렉토리를 지정해 주면 된다.