2021. 7. 13. 15:27ㆍSpark
저번글에서는 Kafka와 Spark를 연결하기 위한 준비작업을 진행했는데 이번에는 직접
Spark 코드를 이용하여 kafka와 연동을 해보도록 하겠다.
만약 이글이 처음이라면 아래 링크에서 준비를 마치고 다시 진행해보도록 한다.
링크 : https://todaycodeplus.tistory.com/27
PySpark - Kafka Structured Streaming 설정
Spark 환경 Spark 3.1 Docker를 활용한 환경 구성 특이사항 - Docker와 Jupyter Notebook으로 구성된 image를 다운받아 생성 PySpark 환경에서 새로운 무언가를 실행할때 항상 여러가지 확인할 것들이 생기는데..
todaycodeplus.tistory.com
Spark sql에서 kafka와 연동하여 진행하는 Structured Streaming은 두가지 방식으로 진행할 수 있다.
1. read : kafka massege를 Batch처리하여 가져옴 실시간성 X
2. readstream : kafka에서 Massage를 실시간으로 Micro Batch처리하여 가져옴 실시간성 O
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, pandas_udf, split
#sparkSession 생성
spark = SparkSession \
.builder \
.appName("service") \
.getOrCreate()
# spark structured streaming을 이용
# kafka에서 데이터를 읽어와 dataframe 생성
df = spark \
.readStream\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("failOnDataLoss","False") \
.option("subscribe", "Topic Name") \
.load()
지금 이 코드에서는 .readstream 으로 실행하여 바로 스트리밍 처리를 위한 코드로 선언을 했다.
이렇게 할경우 Kafka에서 가져온 데이터를 확인 할 수 없다. 단순히 스키마의 구조만 확인할 수 있다.
만약 이상태로 데이터의 유무를 확인하고 싶다면
# row 개수 확인
df.count()
로 확인할 수 있다. 실제 내부 데이터를확인하기 위해서는 read로 처리하여 df.show()를 이용해서 데이터를 확인 할 수 있다.
df = spark \
.read\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("failOnDataLoss","False") \
.option("subscribe", "Topic Name") \
.load()
df.show()
이후에 데이터를 처리하는 방법에 대해서는 천천히 다른글에서 더 알아보도록 하고
마지막 이 Batch혹은 Micro Batch를 다시 다른 프로그램 혹은 콘솔로 내보내는 법을 보도록 하자.
ddf = df1.select(to_json(<<your columns data>>)).alias("value"))\
.writeStream\
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "<<Topic Name>>") \
.option("checkpointLocation", "data/checkpoint/")\
.start()
ddf.awaitTermination()
처리된 DataFrame에서 내가 내보낼 Column값들을 선택하고 이를 Json형태로 변환하여 Value라는 Column으로 묶어서 kafka로 보낼 수 있다.
데이터를 내보낼때 이를 어디까지 보냈는지 확인하기위한 CheckPoint를 지정하는데 특정 디렉토리를 지정해 주면 된다.
'Spark' 카테고리의 다른 글
Spark - Apache Spark란.. (0) | 2021.07.21 |
---|---|
Spark - Docker로 Spark Cluster + Jupyterlab 구성 (2) | 2021.07.21 |
PySpark - Kafka Structured Streaming 설정 (0) | 2021.07.13 |
Spark SQL - DataFrame Row 개수 구하기 (0) | 2021.07.13 |
PySpark - Azure Event Hub Structured Streaming 설정 (0) | 2021.07.05 |