이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.
*데이터 .cache(캐싱)과 .persist(영속화)
데이터 캐싱과 영속화는 거의 유사하지만 영속화는 조금 더 세밀한 설정을 하여 더 나은 성능을 보여준다.
# cache 예제
from pyspark.sql.functions import col
df=spark.range(1*100000).toDF("id").withColumn("square",col("id"))
df.cache()
df.count()
#persist 예제
from pyspark.sql.functions import col
df=spark.range(1*100000).toDF("id").withColumn("square",col("id"))
df.persist(StorageLevel.DISK_ONLY)## 데이터를 직렬화해서 디스크에 저장
df.count()
예제를 보면 캐싱, 영속화를 호출한 뒤 .count()를 호출해줬는데 이렇게 해야 완전하게 캐싱, 영속화가 적용된다.
또한 영속화 같은 경우에는 함수안에 들어가는 StorageLevel이 있는데 이는 다음 표와 같다.
StorageLevel | 설명 |
MEMORY_ONLY | 데이터가 바로 객체 형태로 메모리에 저장 |
MEMORY_ONLY_SER | 데이터가 직렬화되어 용량이 최소화된 바이트 배열 형태로 메모리에 저장 but 사용시 역질렬화를 위한 비용 소모 |
MEMORY_AND_DISK | 데이터가 바로 메모리에 객체 형태로 저장되지만 부족한 경우 직렬화되어 디스크에 저장 |
DISK_ONLY | 데이터가 직렬화되어 디스크에 저장 |
OFF_HEAP | 데이터가 OFF_HEAP 메모리에 저장된다. |
MEMORY_AND_DISK_SER | MENORY_AND_DISK와 비슷하지만 메모리에 저장되는 데이터가 직렬화된다. |
*캐시, 영속화를 사용해야 하는 경우
- 반복적인 머신러닝 학습을 하기 위해 접근해야 하는 데이터 프레임
- ETL, 데이터 파이프라인 구축 시 빈도 높은 트랜스포메이션 연산으로 자주 접근 해야하는 데이터 프레임들
*캐시, 영속화를 사용하면 안되는 경우
- 메모리에 들어가는 데이터 프레임이 너무 큰 경우
- 크기에 상관 없이 자주 쓰지 않는 데이터 프레임에 대해 비용이 크지 않은 트랜스포메이션 수행
*Spark 조인 (BHJ, SMJ)
*브로드캐스트 해시 조인(BHJ)
- 한쪽은 작고 다른 한 쪽은 큰 경우 특정 조건이나 칼럼을 기준으로 조인
- 기본적으로 작은 쪽의 데이터가 10MB이하일 때 브로드캐스트 조인을 사용. 이 설정 값은 spark.sql.autoBroadcastJoinThreshold에 저장. 메모리 크기에 대해 자신감이 있으면 10MB 이상도 도전 가능
- 어떤 셔플도 일어나지 않기 때문에 가장 쉽고 빠른 조인 형태이다.
*브로드캐스트 해시 조인 사용 상황
- 양쪽 데이터세트의 각 키가 스파크에서 동일한 파티션 안에 해시될 때
- 한 데이터가 다른쪽 데이터보다 많이 작은 경우
- 정렬되지 않은 키들로 Equal join 할 때
- 더 작은 쪽의 데이터가 브로드캐스트될 때 발생하는 과도한 네트워크 대역폭이나 OOM 오류에 대한 걱정이 없을 때
*셔플 소트 머지 조인(SMJ)
- 큰 두 종류의 데이터세트를 합칠 수 있는 효과적인 방법.
- 소트 머지라는 이름과 같이 2단계로 정렬 후 병합하는 수행을 한다.
- spark.sql.join.preferSortMergeJoin 설정에 의해 활성화 된다.
*셔플 소트 머지 조인 최적화 방법
- 공통의 정렬된 키나 칼럼을 위한 파티션된 버킷을 만들면 변환 단계를 지울수 있어서 성능을 올릴수 있다
#예시코드
from pyspark.sql.functions import col, asc
first_df=spark.range(1*100000).toDF("id").withColumn("square",col("id"))
second_df=spark.range(1*100000).toDF("sec_id").withColumn("square",col("sec_id"))
first_df.orderBy(asc("id"))\
.write.format("parquet")\
.bucketBy(8, "id")\
.mode("overwrite")\
.saveAsTable("first_tbl")
second_df.orderBy(asc("sec_id"))\
.write.format("parquet")\
.bucketBy(8, "sec_id")\
.mode("overwrite")\
.saveAsTable("second_tbl")
spark.sql("CACHE TABLE first_tbl")
spark.sql("CACHE TABLE second_tbl")
new_first_df= spark.table("first_tbl")
new_second_df= spark.table("second_tbl")
join_df= new_first_df.join(new_second_df, col("id")==col("sec_id"))
책을 참고해서 예시를 만들어봤다. sort merge join의 최적화 방법은 이 예시처럼 먼저 정렬된 데이터로 버킷을 만들어 저장한 후 다시 데이터를 가져와서 join 시킴으로써 정렬작업을 다시 안해도 되기 때문에 효율적으로 작동한다
*셔플 소트 머지 조인을 사용하는 경우
- 두 큰 데이터세트가 정렬 및 해시되어 스파크에서 동일 파티션에 위치할 수 있을 때
- 동일 조건 조인만을 사용하여 두 세트를 조합하기를 원할 때
- 네트워크 간 큰 셔플을 일으키는 Exchange와 sort 연산을 피하고 싶을 때
'프로그래밍 > Spark' 카테고리의 다른 글
[Spark] 스트리밍 집계, 조인 (0) | 2022.11.29 |
---|---|
[Spark] Spark 스트리밍 쿼리 (0) | 2022.11.28 |
[Spark] 스파크 최적화 및 튜닝 (0) | 2022.11.22 |
pyspark mysql 연동 (feat.docker) (0) | 2022.11.21 |
[Spark] sql + 데이터프레임 읽고 쓰기 (0) | 2022.11.17 |