이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.
*델타 레이크란 ?
리눅스 파운데이션에서 호스팅하는 오픈소스 프로젝트이며. 다른 것들과 유사하게 트랜잭션 보증을 제공하고 스키마 시행 및 발전을 가능하게 하는 오픈 데이터 저장 형식이다.
*델타 레이크가 지원하는 것
- 정형화 스트리밍 소스 및 싱크를 사용하여 테이블 읽기 및 쓰기
- 이전 버전으로 롤백 가능
- 스키마를 명시적으로 변경하거나 데이터 프레임의 쓰기 중에도 임시적으로 스키마를 병합하여 변경가능
- 여러 동시 작성기 간의 직렬화 가능한 격리
*Spark로 델타 레이크 구성 내용들
1.대화형 쉘을 사용하여 연결
pyspark --packages io.delta:delta-core_2.12:0.7.0
이와 같이 터미넬에서 실행시켜 주면 된다. 여기서 주의해야 할 점은 스파크 2.4를 실행해주는 경우에는 0.7.0이 아닌 0.6.0을 사용하여야 한다.
2. 델타 레이크 테이블에 데이터 로드
sourcePath="/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
deltaPath="/tmp/loans_delta"
(spark.read.format("parquet").load(sourcePath).write.format("delta").save(deltaPath))
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")
#저장된 데이터 출력
spark.sql("SELECT count(*) FROM loans_delta").show()
간단하게 기존에 있던 format만 "delta"로 변경하여 사용하면 된다.
3. 델타 레이크 테이블에 데이터 스트림 로드
stream_df=
checkpoint_dir=
streaming=(stream_df.writeStream
.format("delta")
.option("checkpointLocation", checkpoint_dir)
.trigger(processingTime= "10 seconds")
.start(deltaPath))
기존의 스트리밍하는 것과 형식은 거의 동일하다고 보면 된다. format부분만 변경해서 실행했다. 이 델타 레이크 형식은 다른 기존 형식에 비해 추가적인 이점이 있다.
- 배치 처리 및 스트리밍 작업 시 모두 동일한 테이블에 쓰기 가능
- 여러 스트리밍 작업에서 동일한 테이블에 데이터 추가 가능
- 동시 쓰기에서도 ACID 보장을 제공
4.델타 레이크 테이블에 새로운 열 추가방법
test.write.format("delta").mode("append")
.option("mergeSchema", "true")
.save(deltaPath)
위와 같이 옵션에서 mergeSchema를 true로 설정해주면 가능하다
5. 델타 레이크 또한 SQL과 동일하게 DML 작업을 사용할 수 있다.
간단한 예시로 merge에 대한 예제를 보면 다음과 같다.
(deltaTb
.alias("t")
.merge(loan.alias("s"), "t.load_id=s.load_id")
.whenMatched.updateAll()
.whenNotMatchedInsertAll()
.execute())
6.작업 내용 확인
델타 레이크 테이블의 모든 변경사항은 트랜젝션 로그에 남는다. 이것을 확인 하는 방법이다.
(deltaTable
.history(3)
.select("version", "timestamp", "operation", "operationParameters")
.show(truncate=False))
7. 테이블의 이전 스냅샷 쿼리 방법
DataFrameReader 옵션에서 "versionAsOf", "timestampAsOf"를 사용하여 이전 버전 스냅샷을 쿼리 할 수 있다.
(spark.read
.format("delta")
.option("timestampAsOf", "2022-01-01")
.load(deltaPath))
(spark.read
.format("delta")
.option("versionAsof","4")
.load(deltaPath))
이 기능을 사용할 때
- 특정 테이블 버전에서 작업을 다시 실행하고 싶을때
- 감사를 위해 서로 다른 버전 간 데이터 비교
- 잘못된 변경사항 롤백시 사용
'프로그래밍 > Spark' 카테고리의 다른 글
[Spark] K8s로 spark cluster 구축하기 (0) | 2024.06.05 |
---|---|
[Spark] MLlib 정리 (0) | 2022.12.07 |
[Spark] 데이터 베이스, 데이터 레이크 , 레이크 하우스 (0) | 2022.12.03 |
[Spark] 스트리밍 성능 튜닝 시 고려할 점 (0) | 2022.12.02 |
[Spark] 스트리밍 집계, 조인 (0) | 2022.11.29 |