프로그래밍/Spark

[Spark] Spark 스트리밍 쿼리

장경훈 2022. 11. 28. 11:16

 

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.

*스트리밍 쿼리 처리 단계

1. 입력 소스 지정

스트리밍 처리도 배치 처리와 동일하게 데이터 프레임을 지정해야 한다. 배치에서는 spark.read를 사용 했지만 스트리밍에서는 spark.readStream을 사용해주면 된다.

#파이썬 예제

stream_df=(spark
	.readStream.format("socket")
    .option("host" ,"localhost")
    .option("port", 9999)
    .load())

 

2. 데이터 변형

데이터 변형 작업은 말 그대로 입력받은 데이터를 어떻게 처리할지를 프로그래밍하는 단계이다. 

#예시
from pyspark.sql.functions import *

words=stream_df.select(split(col("value"),"\\s").alias("word"))
counts = words.groupBy("word").count()

위의 예시처럼 입력받은 데이터 즉 들어온 데이터를 뛰어쓰기를 기준으로 분리한 후 그룹핑하는 코드를 사용할 수 있다.

 

3.출력 싱크와 모드 결정

데이터 변환을 한 뒤에는 출력을 어떻게 할지 설정 할 수 있는 옵션이 있다.

 

*출력 방식

  • 추가 모드 - 기본 모드이며 출력된 모든 행이 이후의 쿼리에 의해 변경되지 않는다.
  • 전체 모드 - 모든 행이 매번 트리거 될 때 마지막 출력 대상이 된다. 보통 결과 테이블이 이전 테이블에 비해서 현저히 작아 메모리 유지가 되는 경우에 사용할 수 있다.
  • 업데이트 모드 - 출력 행이 추후에 쿼리에 의해서 수정될 수 있음. 추가 모드의 반대.
#전체모드 예시코드

writer = counts.writeStream.format("console").outputMode("complete")

 

4. 처리 세부사항 지정

데이터를 처리하는 방법을 정하는 단계이다. 예시부터 보면 다음과 같다.

check_dir=""
new_write=( writer
	.trigger(processingTime = "1 second")
   	.option("checkpointLocation", check_dir))

 

*트리거링 옵션 

  • 기본(default) - 따로 지정하지 앟는다면 앞선 배치가 완료되자 마자 다음 배치가 실행되는 곳부터 데이터를 실행
  • processingTime - 트리거 간격에 따른 시간을 설정할 수 있다
  • once(한번 실행) - 한번의 배치에서 모든 새로운 데이터를 처리한 뒤 멈춘다. 보통 외부 스케줄러와 같이 사용
  • continuos(연속 실행) - 배치 단위 대신 연속적으로 데이터를 처리하는 모드

 

*체크포인트 위치 옵션

  • 진행 중인 데이터 상황을 저장하는 디렉토리를 명시하는 옵션이다. 실패한 쿼리가 다시 재시작하기 위해 사용한다.

 

5. 쿼리 시작

모든 것을 설정 완료 했다면 쿼리를 실행한다.

Streaming=new_write2.start()
  • start()는 실행 즉시 객체를 리턴한다
  • start() 대신 awaitTermination()을 사용하면 ctrl+c로 중지시킬 수 있다.
  • stop()을 사용하면 진행중인 쿼리를 중단시킬 수 있다.

 

*동작 중인 쿼리 모니터링 방법

  • lastProgress() - 가장 마지막에 완료된 마이크로 배치에 대한 정보를 볼 수 있음
  • status() - 현재 상태를 볼 수 있음.
  • Dropwizard Metrics 라이브러리를 사용하서 수치들을 다른 모니터링 프레임워크에 제공 할 수 있음 (사용시 streaming.metricsEnabled 옵션을 true로 설정)
  • 자체 제작한 StreamingQueryListener을 사용하여 모니터링 할 수 있음