프로그래밍/Spark

[Spark] 델타 레이크 구축

장경훈 2022. 12. 6. 14:47

 

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.

*델타 레이크란 ?

리눅스 파운데이션에서 호스팅하는 오픈소스 프로젝트이며. 다른 것들과 유사하게 트랜잭션 보증을 제공하고 스키마 시행 및 발전을 가능하게 하는 오픈 데이터 저장 형식이다.

 

*델타 레이크가 지원하는 것

  • 정형화 스트리밍 소스 및 싱크를 사용하여 테이블 읽기 및 쓰기
  • 이전 버전으로 롤백 가능
  • 스키마를 명시적으로 변경하거나 데이터 프레임의 쓰기 중에도 임시적으로 스키마를 병합하여 변경가능
  • 여러 동시 작성기 간의 직렬화 가능한 격리

 

*Spark로 델타 레이크 구성 내용들 

 

1.대화형 쉘을 사용하여 연결 

pyspark --packages io.delta:delta-core_2.12:0.7.0

이와 같이 터미넬에서 실행시켜 주면 된다. 여기서 주의해야 할 점은 스파크 2.4를 실행해주는 경우에는 0.7.0이 아닌 0.6.0을 사용하여야 한다.

 

2. 델타 레이크 테이블에 데이터 로드

sourcePath="/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
deltaPath="/tmp/loans_delta"
(spark.read.format("parquet").load(sourcePath).write.format("delta").save(deltaPath))
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")

#저장된 데이터 출력 
spark.sql("SELECT count(*) FROM loans_delta").show()

간단하게 기존에 있던 format만 "delta"로 변경하여 사용하면 된다.

 

3. 델타 레이크 테이블에 데이터 스트림 로드

stream_df=
checkpoint_dir=
streaming=(stream_df.writeStream
           .format("delta")
           .option("checkpointLocation", checkpoint_dir)
           .trigger(processingTime= "10 seconds")
           .start(deltaPath))

 

기존의 스트리밍하는 것과 형식은 거의 동일하다고 보면 된다. format부분만 변경해서 실행했다. 이 델타 레이크 형식은 다른 기존 형식에 비해 추가적인 이점이 있다.

  • 배치 처리 및 스트리밍 작업 시 모두 동일한 테이블에 쓰기 가능
  • 여러 스트리밍 작업에서 동일한 테이블에 데이터 추가 가능
  • 동시 쓰기에서도 ACID 보장을 제공

 

4.델타 레이크 테이블에 새로운 열 추가방법

test.write.format("delta").mode("append")
	.option("mergeSchema", "true")
	.save(deltaPath)

위와 같이 옵션에서 mergeSchema를 true로 설정해주면 가능하다

 

5. 델타 레이크 또한 SQL과 동일하게 DML 작업을 사용할 수 있다. 

간단한 예시로 merge에 대한 예제를 보면 다음과 같다.

(deltaTb
	.alias("t")
	.merge(loan.alias("s"), "t.load_id=s.load_id")
    .whenMatched.updateAll()
    .whenNotMatchedInsertAll()
    .execute())

 

6.작업 내용 확인

델타 레이크 테이블의 모든 변경사항은 트랜젝션 로그에 남는다. 이것을 확인 하는 방법이다.

(deltaTable
    .history(3)
    .select("version", "timestamp", "operation", "operationParameters")
    .show(truncate=False))

 

7. 테이블의 이전 스냅샷 쿼리 방법

DataFrameReader 옵션에서 "versionAsOf", "timestampAsOf"를 사용하여 이전 버전 스냅샷을 쿼리 할 수 있다.

(spark.read
    .format("delta")
    .option("timestampAsOf", "2022-01-01")
	.load(deltaPath))
    
(spark.read
    .format("delta")
    .option("versionAsof","4")
    .load(deltaPath))

이 기능을 사용할 때

  • 특정 테이블 버전에서 작업을 다시 실행하고 싶을때
  • 감사를 위해 서로 다른 버전 간 데이터 비교
  • 잘못된 변경사항 롤백시 사용