프로그래밍/Spark 14

[Spark] Cluster mode vs Client mode

Spark Driver란?애플리케이션 실행의 핵심 구성 요소로, Spark 애플리케이션의 전체 실행을 지휘하고 제어하는 역할을 한다. Spark Driver는 클러스터 내 작업의 분배와 실행 상태를 관리하며, 사용자로부터 애플리케이션 로직을 전달받아 클러스터와의 상호작용을 수행한다.Client Mode(default)driver가 클러스터 외부에 있는 형태Application Master는 node manager에게 자원 요청만을 하고 Spark에서 사용중인 리소스를 표시주로 개발 환경에서 디버깅 할 때 의미가 있는 형태장점: 로컬 머신에서 실행되기 때문에 디버깅과 실시간 로그 모니터링이 용이하다.단점: 클러스터의 자원을 효율적으로 사용하지 못할 수 있다.Cluster Modedriver가 클러스터 내부..

[Spark] K8s로 spark cluster 구축하기

관련 깃허브https://github.com/bitnami/charts/tree/main/bitnami/spark charts/bitnami/spark at main · bitnami/chartsBitnami Helm Charts. Contribute to bitnami/charts development by creating an account on GitHub.github.com1.  Helm 다운로드 helm install my-spark-release oci://registry-1.docker.io/bitnamicharts/spark 2.  상태 확인helm status my-spark-release 3. 웹 ui 확인# localhost:80로 ui 포트포워딩 kubectl port-forward..

[Spark] MLlib 정리

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *Spark MLlib의 장점 데이터를 시스템에 맞게 다운샘플링 하지 않고 데이터 준비 및 모델 구축을 하나의 에코시스템으로 구축 가능 선형으로 확장되는 O(n)으로 방대한 양의 데이터를 확장할 수 있다. *MLlib 파이프라인 구성 #파이프라인에 적용할 샘플 데이터 8대2로 분리 filePath="""/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet""" airbnbDF=spark.read.parquet(filePath) airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms..

[Spark] 델타 레이크 구축

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *델타 레이크란 ? 리눅스 파운데이션에서 호스팅하는 오픈소스 프로젝트이며. 다른 것들과 유사하게 트랜잭션 보증을 제공하고 스키마 시행 및 발전을 가능하게 하는 오픈 데이터 저장 형식이다. *델타 레이크가 지원하는 것 정형화 스트리밍 소스 및 싱크를 사용하여 테이블 읽기 및 쓰기 이전 버전으로 롤백 가능 스키마를 명시적으로 변경하거나 데이터 프레임의 쓰기 중에도 임시적으로 스키마를 병합하여 변경가능 여러 동시 작성기 간의 직렬화 가능한 격리 *Spark로 델타 레이크 구성 내용들 1.대화형 쉘을 사용하여 연결 pyspark --packages io.delta:delta-core_2.12:0.7.0 이와 같이 터미넬에서..

[Spark] 데이터 베이스, 데이터 레이크 , 레이크 하우스

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *스토리지 솔루션 기본적으로 데이터 파이프라인을 구축할 때에는 스토리지 솔루션의 선택에 따라서 파이프라인의 견고함과 성능을 결정한다. 이런 스토리지 솔루션은 데이터베이스에서 데이터 레이크, 더 나아가서 데이터 레이크하우스로 넘어가게 되었다. *최적의 스토리지 솔루션에 필요한 속성 확장및 성능 트랜젝션 지원 다양한 데이터 형식 지원 다양한 워크로드 지원 개방성 *데이터베이스 *데이터 베이스 특징 데이터베이스는 구조화된 테이블을 SQL쿼리를 사용하여 읽을 수 있으며 데이터 저장 및 처리를 모두 최적화 가능 온라인 트랜잭션 처리 워크로드 ->간단한 쿼리로 높은 동시성, 짧은 지연 시간이 특징 온라인 분석 처리 -> OLA..

[Spark] 스트리밍 성능 튜닝 시 고려할 점

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. 기본적으로 정형화 스트리밍을 할때 성능튜닝은 기존 포스팅에 썼던 방법을 사용해도 되지만 몇가지 고려해야 할 점들이 있다. *고려할 점 1. 클러스터의 자원 배치 스트리밍 쿼리를 실행하는 클러스터들은 24/7로 돌아가기 때문에 자원을 적절하게 배치해야 한다 스트리밍 쿼리의 특성에 맞춰 자원 할당 해야한다. 무상태 쿼리(코어 필요), 상태 정보 유지 쿼리(메모리 필요) 2. 셔플을 위한 파티션 숫자 스트리밍의 경우 셔플 숫자가 정형화에 비해 적지만 너무 잘개 쪼개면 오버헤드가 발생. 상태 정보 유지 연산에 의한 셔플은 훨씬 큰 오버헤드를 발생. 기본 셔플 파티션인 200을 코어 숫자 대비 두세 배 많은 정도로 수정 3...

[Spark] 스트리밍 집계, 조인

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. * 스트리밍 집계 방법 스트리밍의 집계 방법으로는 크게 시간과 연관이 있거나 없거나로 2가지의 집계가 있다. 1. 전체 집계(시간과 상관 없음) 이름 그대로 스트림으로 생기는 데이터를 모두 집계하는 방법이다. stream_count=sensor.groupBy().count() 이 예시는 sensor라는 스트리밍 데이터 프레임의 집계를 한 예시이다. *여기서 주의해야 하는 점은 정적 데이터 같은 경우에는 .count()를 바로 사용해도 되지만 스트리밍 데이터는 집계 결과를 지속적으로 업데이트 해야 하므로 .groupBy() or .groupByKey()와 같은 함수와 같이 사용 해야한다. 2. 그룹화 집계(시간과 상관..

[Spark] Spark 스트리밍 쿼리

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *스트리밍 쿼리 처리 단계 1. 입력 소스 지정 스트리밍 처리도 배치 처리와 동일하게 데이터 프레임을 지정해야 한다. 배치에서는 spark.read를 사용 했지만 스트리밍에서는 spark.readStream을 사용해주면 된다. #파이썬 예제 stream_df=(spark .readStream.format("socket") .option("host" ,"localhost") .option("port", 9999) .load()) 2. 데이터 변형 데이터 변형 작업은 말 그대로 입력받은 데이터를 어떻게 처리할지를 프로그래밍하는 단계이다. #예시 from pyspark.sql.functions import * words=..

[spark] 캐싱, 영속화, 조인

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *데이터 .cache(캐싱)과 .persist(영속화) 데이터 캐싱과 영속화는 거의 유사하지만 영속화는 조금 더 세밀한 설정을 하여 더 나은 성능을 보여준다. # cache 예제 from pyspark.sql.functions import col df=spark.range(1*100000).toDF("id").withColumn("square",col("id")) df.cache() df.count() #persist 예제 from pyspark.sql.functions import col df=spark.range(1*100000).toDF("id").withColumn("square",col("id")) df.p..

[Spark] 스파크 최적화 및 튜닝

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *Spark 설정 세팅 방법 1. 파일 수정 방법 우선 기본적으로 설정파일은 spark가 다운되어 있는 폴더의 conf안에 .template형식으로 저장되어 있다. 이 파일을 수정하고 .template부분을 지우고 저장하면 spark가 새로운 값을 인식한다. 2. 프로그래밍 방법 >>>spark.conf.isModifiable("spark.sql.shuffle.partitison") #수정가능한 설정필드인지 확인 >>>spark.conf.get("spark.sql.shuffle.partitions") #현재필드 값 확인 >>>spark.conf.set("spark.sql.shuffle.partitions", 5) #..