이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.
* 스트리밍 집계 방법
스트리밍의 집계 방법으로는 크게 시간과 연관이 있거나 없거나로 2가지의 집계가 있다.
1. 전체 집계(시간과 상관 없음)
이름 그대로 스트림으로 생기는 데이터를 모두 집계하는 방법이다.
stream_count=sensor.groupBy().count()
이 예시는 sensor라는 스트리밍 데이터 프레임의 집계를 한 예시이다.
*여기서 주의해야 하는 점은 정적 데이터 같은 경우에는 .count()를 바로 사용해도 되지만 스트리밍 데이터는 집계 결과를 지속적으로 업데이트 해야 하므로 .groupBy() or .groupByKey()와 같은 함수와 같이 사용 해야한다.
2. 그룹화 집계(시간과 상관 없음)
앞서 전체를 집계 했다면 이번에는 키 별로 집게하는 방법이다.
sum_value = sensor.groupBy("sensor_id").sum("value")
위의 예시처럼 사용하면 된다. 이 집계를 제외하고 다양한 집계들도 있다. ex) mean, stddev, countDistinct 등
3. 이벤트 타임 집계 (시간과 관련된 집계)
스트리밍을 실행중에 특정 시간 간격에 따른 집계를 보고 싶을때 사용한다.
from pyspark.sql.functions import (*
(sensor.groupBy("sensor_id", windows("eventTime", "10 minute"))
.count())
이 예제는 10분 단위의 결과를 동적으로 그룹화 하여 집계하는 방법이다.
4.워터마크
지연된 데이터 값을 얼마나 보관할 지 정해주는 방법. 즉 늦게 도착하는 데이터를 엔진이 얼마나 기다릴 수 있는지를 정해준다.
(sensor
.withWatermark("eventTime", "10 minutes")
.groupBy("sensor_id", window("eventTime", "10 minutes", "5 minutes"))
.mean("value"))
이 예제에서는 워터마크를 10분으로 지정했다. 즉 10분 이상 늦은 데이터는 무시되고 상태 정보에서 제거 된다. 여기서 코드를 작성할때 주의점은 groupBy()를 호출하기 전에 withWatermark()를 호출 해야하고 시간 간격을 위한 컬럼을 동일하게 사용해야 한다. ex)이 예시에서는 eventTime을 동일하게 사용했다.
사용시 고려할 점: 워터마크는 설정된 시간 이내로 지연된 데이터는 절대 버리지 않지만 그렇다고 해서 10분 이상의 데이터가 항상 삭제되지는 않는다. 따라서 이런 데이터가 집계 될지 안될지는 정확한 레코트 도착 타이밍과 배치 처리 시작 순간에 달려있다.
5. 지원되는 출력모드
- 갱신 모드 - 집계가 갱신된 부분의 열만 출력, 모든타입에 사용 가능, 스트리밍 집계를 하면서 쿼리 실행 시 가장 유용하고 효과적이다. 하지만 이 모드는 파케이나 ORC와 같은 파일 기반 포맷 같은 추가 전용 스트리밍 집계를 출력할 때는 사용 불가능.
- 전체 모드 -모든 갱신된 집계를 변화에 상관없이 출력한다. 모든 타입에 사용 가능 하지만 윈도우 집계에서는 정보가 초기화 되지 않는다. 상태 정보 크기와 메모리가 무한하게 증가할 여지가 있으므로 윈도우 집계에서는 주의깊게 사용해야 한다.
- 추가 모드 - 워터마크를 쓰는 이벤트 타임 윈도우 집계에서만 사용가능. 워터마크가 집계를 더 이상 갱신하지 않는 것이 확실한 시점에 각 키와 최종 집계값을 출력. 이 모드의 장점은 추가 전용 스트리밍 싱크에 집계 내용을 사용 가능 단점은 워터마크 시간만큼 출력도 늦어지게 된다.
*스트리밍 조인
*스트림-정적 데이터 조인
대부분의 사용 패턴은 정적 데이터에 데이터 스트림을 조인하는 경우이다.
#정적 데이터 프레임 생성
static_df= spark.read ...
#스트림 데이터 프레임 생성
stream_df = spark.readStream ...
#두 데이터 프레임 조인 공통 칼럼 ID를 예로 들었을 때 내부 조인
join_df = stream_df.join(static_df, "ID")
#외부조인 방법 왼쪽이 스트림 데이터인 경우
join_df = stream_df.join(static_df, "ID", "leftOuter")
*스트림-정적 조인에서 알아야할 사항
- 무상태 연산이므로 어떤 종류의 워터마킹도 필요 없다.
- 처리속도를 올리고 싶다면 캐시를 해야 한다.
- 정적 데이터 프레임이 정의된 데이터가 변경되는 경우, 그 변경이 스트리밍에서 보일지는 지정된 정책에 달려있다.
*스트림-스트림 데이터 조인
스트림-스트림 데이터 사이에서 지속적인 조인은 문제점이 있을 수 있다. 왜냐하면 서로 데이터가 얼마나 지연될지도 모르며 항상 완전한 상태라고 보장 할 수 없기 때문이다.
*워터마킹을 사용한 내부 조인
- 구현 방식 : 양쪽 스트림에 워터마크 지연을 정의한 후 두 입력간 이벤트 타임을 조정하여 한쪽의 오래된 기록이 다른쪽에서 언제 필요 없어질지 알 수 있게 한다.
first_stream_Watermark = (first_stream
.selectExpr("id AS first_id", "first_streamTime")
.withWatermark("frist_streamTime", "2 hours"))
second_stream_Watermark = (second_stream
.selectExpr("id AS second_id", "second_streamTime")
.withWatermark("second_streamTime", "3 hours"))
(first_stream_Watermark.join(second_stream_Watermark,
expr("""
second_id = first_id AND
second_streamTime BETWEEN first_streamTIME AND
first_streamTime + interval 1 hour""")))
- 내부 조인시 기억해야할 key point
- 내부 조인에서 워터마크를 지정하는 것은 선택 사항, 하지만 설정하지 않으면 위험을 감수해야함
- 워터마크 지연은 지정된 시간까지는 보장되지만 지정된 시간 이후 부터는 보장할 수 없다.
*워터마킹을 사용한 외부 조인
외부 조인은 앞선 내부 조인에서 outer옵션만 추가 해주면 된다.
(first_stream_Watermark.join(second_stream_Watermark,
expr("""
second_id = first_id AND
second_streamTime BETWEEN first_streamTIME AND
first_streamTime + interval 1 hour"""),
"leftOuter"))
- 외부 조인시 기억해야할 key point
- 외부 조인에서는 워터마크 지연이나 이벤트 타임 제한이 필수이다. NULL을 출력하기 위해서는 어느 시점부터 매칭이 되지 않았는지를 알아야 하기 때문이다.
- NULL 결과 출력을 위해 매치되는 정보가 확실히 없다는 것을 알기 위해 대기 해야하며 이 지연은 최대 버퍼링 시간이다.
'프로그래밍 > Spark' 카테고리의 다른 글
[Spark] 데이터 베이스, 데이터 레이크 , 레이크 하우스 (0) | 2022.12.03 |
---|---|
[Spark] 스트리밍 성능 튜닝 시 고려할 점 (0) | 2022.12.02 |
[Spark] Spark 스트리밍 쿼리 (0) | 2022.11.28 |
[spark] 캐싱, 영속화, 조인 (0) | 2022.11.23 |
[Spark] 스파크 최적화 및 튜닝 (0) | 2022.11.22 |