이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.
*스트리밍 쿼리 처리 단계
1. 입력 소스 지정
스트리밍 처리도 배치 처리와 동일하게 데이터 프레임을 지정해야 한다. 배치에서는 spark.read를 사용 했지만 스트리밍에서는 spark.readStream을 사용해주면 된다.
#파이썬 예제
stream_df=(spark
.readStream.format("socket")
.option("host" ,"localhost")
.option("port", 9999)
.load())
2. 데이터 변형
데이터 변형 작업은 말 그대로 입력받은 데이터를 어떻게 처리할지를 프로그래밍하는 단계이다.
#예시
from pyspark.sql.functions import *
words=stream_df.select(split(col("value"),"\\s").alias("word"))
counts = words.groupBy("word").count()
위의 예시처럼 입력받은 데이터 즉 들어온 데이터를 뛰어쓰기를 기준으로 분리한 후 그룹핑하는 코드를 사용할 수 있다.
3.출력 싱크와 모드 결정
데이터 변환을 한 뒤에는 출력을 어떻게 할지 설정 할 수 있는 옵션이 있다.
*출력 방식
- 추가 모드 - 기본 모드이며 출력된 모든 행이 이후의 쿼리에 의해 변경되지 않는다.
- 전체 모드 - 모든 행이 매번 트리거 될 때 마지막 출력 대상이 된다. 보통 결과 테이블이 이전 테이블에 비해서 현저히 작아 메모리 유지가 되는 경우에 사용할 수 있다.
- 업데이트 모드 - 출력 행이 추후에 쿼리에 의해서 수정될 수 있음. 추가 모드의 반대.
#전체모드 예시코드
writer = counts.writeStream.format("console").outputMode("complete")
4. 처리 세부사항 지정
데이터를 처리하는 방법을 정하는 단계이다. 예시부터 보면 다음과 같다.
check_dir=""
new_write=( writer
.trigger(processingTime = "1 second")
.option("checkpointLocation", check_dir))
*트리거링 옵션
- 기본(default) - 따로 지정하지 앟는다면 앞선 배치가 완료되자 마자 다음 배치가 실행되는 곳부터 데이터를 실행
- processingTime - 트리거 간격에 따른 시간을 설정할 수 있다
- once(한번 실행) - 한번의 배치에서 모든 새로운 데이터를 처리한 뒤 멈춘다. 보통 외부 스케줄러와 같이 사용
- continuos(연속 실행) - 배치 단위 대신 연속적으로 데이터를 처리하는 모드
*체크포인트 위치 옵션
- 진행 중인 데이터 상황을 저장하는 디렉토리를 명시하는 옵션이다. 실패한 쿼리가 다시 재시작하기 위해 사용한다.
5. 쿼리 시작
모든 것을 설정 완료 했다면 쿼리를 실행한다.
Streaming=new_write2.start()
- start()는 실행 즉시 객체를 리턴한다
- start() 대신 awaitTermination()을 사용하면 ctrl+c로 중지시킬 수 있다.
- stop()을 사용하면 진행중인 쿼리를 중단시킬 수 있다.
*동작 중인 쿼리 모니터링 방법
- lastProgress() - 가장 마지막에 완료된 마이크로 배치에 대한 정보를 볼 수 있음
- status() - 현재 상태를 볼 수 있음.
- Dropwizard Metrics 라이브러리를 사용하서 수치들을 다른 모니터링 프레임워크에 제공 할 수 있음 (사용시 streaming.metricsEnabled 옵션을 true로 설정)
- 자체 제작한 StreamingQueryListener을 사용하여 모니터링 할 수 있음
'프로그래밍 > Spark' 카테고리의 다른 글
[Spark] 스트리밍 성능 튜닝 시 고려할 점 (0) | 2022.12.02 |
---|---|
[Spark] 스트리밍 집계, 조인 (0) | 2022.11.29 |
[spark] 캐싱, 영속화, 조인 (0) | 2022.11.23 |
[Spark] 스파크 최적화 및 튜닝 (0) | 2022.11.22 |
pyspark mysql 연동 (feat.docker) (0) | 2022.11.21 |