프로그래밍/Spark

[spark] 캐싱, 영속화, 조인

장경훈 2022. 11. 23. 16:45
이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.

 

*데이터 .cache(캐싱)과 .persist(영속화)

데이터 캐싱과 영속화는 거의 유사하지만 영속화는 조금 더 세밀한 설정을 하여 더 나은 성능을 보여준다. 

# cache 예제
from pyspark.sql.functions import col
df=spark.range(1*100000).toDF("id").withColumn("square",col("id"))
df.cache()
df.count()

#persist 예제
from pyspark.sql.functions import col
df=spark.range(1*100000).toDF("id").withColumn("square",col("id"))
df.persist(StorageLevel.DISK_ONLY)## 데이터를 직렬화해서 디스크에 저장
df.count()

예제를 보면 캐싱, 영속화를 호출한 뒤 .count()를 호출해줬는데 이렇게 해야 완전하게 캐싱, 영속화가 적용된다.

또한 영속화 같은 경우에는 함수안에 들어가는 StorageLevel이 있는데 이는 다음 표와 같다.

StorageLevel 설명
MEMORY_ONLY 데이터가 바로 객체 형태로 메모리에 저장
MEMORY_ONLY_SER 데이터가 직렬화되어 용량이 최소화된 바이트 배열 형태로 메모리에 저장 but 사용시 역질렬화를 위한 비용 소모
MEMORY_AND_DISK 데이터가 바로 메모리에 객체 형태로 저장되지만 부족한 경우 직렬화되어 디스크에 저장
DISK_ONLY 데이터가 직렬화되어 디스크에 저장
OFF_HEAP 데이터가 OFF_HEAP 메모리에 저장된다.
MEMORY_AND_DISK_SER MENORY_AND_DISK와 비슷하지만 메모리에 저장되는 데이터가 직렬화된다.

 

*캐시, 영속화를 사용해야 하는 경우

  • 반복적인 머신러닝 학습을 하기 위해 접근해야 하는 데이터 프레임
  • ETL, 데이터 파이프라인 구축 시 빈도 높은 트랜스포메이션 연산으로 자주 접근 해야하는 데이터 프레임들

 

*캐시, 영속화를 사용하면 안되는 경우

  • 메모리에 들어가는 데이터 프레임이 너무 큰 경우
  • 크기에 상관 없이 자주 쓰지 않는 데이터 프레임에 대해 비용이 크지 않은 트랜스포메이션 수행

*Spark 조인 (BHJ, SMJ)

 

 

*브로드캐스트 해시 조인(BHJ) 

  • 한쪽은 작고 다른 한 쪽은 큰 경우 특정 조건이나 칼럼을 기준으로 조인
  • 기본적으로 작은 쪽의 데이터가 10MB이하일 때 브로드캐스트 조인을 사용. 이 설정 값은 spark.sql.autoBroadcastJoinThreshold에 저장. 메모리 크기에 대해 자신감이 있으면 10MB 이상도 도전 가능
  • 어떤 셔플도 일어나지 않기 때문에 가장 쉽고 빠른 조인 형태이다.

 

*브로드캐스트 해시 조인 사용 상황

  • 양쪽 데이터세트의 각 키가 스파크에서 동일한 파티션 안에 해시될 때
  • 한 데이터가 다른쪽 데이터보다 많이 작은 경우
  • 정렬되지 않은 키들로 Equal join 할 때
  • 더 작은 쪽의 데이터가 브로드캐스트될 때 발생하는 과도한 네트워크 대역폭이나 OOM 오류에 대한 걱정이 없을 때

 

*셔플 소트 머지 조인(SMJ)

  • 큰 두 종류의 데이터세트를 합칠 수 있는 효과적인 방법. 
  • 소트 머지라는 이름과 같이 2단계로 정렬 후 병합하는 수행을 한다.
  • spark.sql.join.preferSortMergeJoin 설정에 의해 활성화 된다.

 

*셔플 소트 머지 조인 최적화 방법

  • 공통의 정렬된 키나 칼럼을 위한 파티션된 버킷을 만들면 변환 단계를 지울수 있어서 성능을 올릴수 있다
#예시코드
from pyspark.sql.functions import col, asc
first_df=spark.range(1*100000).toDF("id").withColumn("square",col("id"))
second_df=spark.range(1*100000).toDF("sec_id").withColumn("square",col("sec_id"))

first_df.orderBy(asc("id"))\
    .write.format("parquet")\
    .bucketBy(8, "id")\
    .mode("overwrite")\
    .saveAsTable("first_tbl")
    
second_df.orderBy(asc("sec_id"))\
    .write.format("parquet")\
    .bucketBy(8, "sec_id")\
    .mode("overwrite")\
    .saveAsTable("second_tbl")

spark.sql("CACHE TABLE first_tbl")
spark.sql("CACHE TABLE second_tbl")

new_first_df= spark.table("first_tbl")
new_second_df= spark.table("second_tbl")

join_df= new_first_df.join(new_second_df, col("id")==col("sec_id"))

책을 참고해서 예시를 만들어봤다. sort merge join의 최적화 방법은 이 예시처럼 먼저 정렬된 데이터로 버킷을 만들어 저장한 후 다시 데이터를 가져와서 join 시킴으로써 정렬작업을 다시 안해도 되기 때문에 효율적으로 작동한다

 

*셔플 소트 머지 조인을 사용하는 경우

  • 두 큰 데이터세트가 정렬 및 해시되어 스파크에서 동일 파티션에 위치할 수 있을 때
  • 동일 조건 조인만을 사용하여 두 세트를 조합하기를 원할 때
  • 네트워크 간 큰 셔플을 일으키는 Exchange와 sort 연산을 피하고 싶을 때