이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.
* 저수준 DSL과 고수준 DSL
여기서 저수준의 DSL은 spark 1.x 버전에 있던 RDD를 의미하고 그 이후에 2.x에서는 고수준 DSL을 사용하고 있다. 저수준과 고수준의 차이는 저수준 DSL보다 고수준 DSL이 훨씬 더 표현력이 높으며 이전보다 더 간단하게 사용할 수 있다는 것이다.
#기존 저수준 DSL의 표현방식
RDD=sc.parallelize([("JNAG", 23), ("KIM",25), ("PARK",22), ("AN",22)])
agesRDD= RDD.map(lambda x: (x[0], (x[1], 1)))\
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))\
.map(lambda x: (x[0], x[1][0]/x[1][1]))
#고수준 DSL 표현방식
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
spark =SparkSession.builder.appName("AuthorsAges").getOrCreate()
data_df= spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])
avg_df = data_df.groupBy("name").agg(avg("age"))
avg_df.show()
위의 두 코드를 비교해보면 확실히 기존에 길게 표현해서 사용하던 방식보다는 groupBy 등을 사용해서 표현하니 훨씬 더 간결하고 이해하기 쉽게 사용되는 모습을 볼 수 있다.
*Spark의 데이터 타입
- 기본 데이터 타입
- 정수형 - ByteType, ShortType, IntegerType, LongType
- 실수형 - FloatType, DoubleType
- 문자열 - StringType
- bool - BooleanType
- Decimal - DecimalType
- 정형화 데이터 타입
- BinaryType
- DateType
- TimestampType
- ArrayType
- MapType
- StructType
- StructField
*Spark Schema
스파크에서는 스키마를 정의하는 방법이 2가지 있는데 프로그래밍 스타일, DDL 스타일이 있다.
# 프로그래밍 스타일
from pyspark.sql.types import *
schema = StructType([StructField("title", StringType(), False),\
StructField("price", IntegerType(), False)])
#DDL스타일
schema = "title STRING, price INT"
이런 식으로 2가지의 방법으로 스키마를 만들 수 있는데 나는 개인적으로 조금 더 간편한 DDL 스타일을 더 많이 사용하게 될 것 같다.
*Spark 파일 입출력
#csv파일 불러오기 코드
file_path= "파일 위치 경로"
file_df= spark.read.csv(file_path, header= True, schema=앞서 선언한 스키마)
#parquet로 데이터 저장하기
path="저장하고 싶은 파일 위치"
file_df.write.format("parquet").save(path)
*Spark 필터링
spark에서는 sql과 매우 유사하게 데이터를 필터링할 수 있다. 이제부터 그 예시들을 정리해보려고 한다. 이후 모든 실습 환경은 데이터 브릭스 환경에서 실행시켰다.
#특정 column 필터링 예시 코드
from pyspark.sql.functions import *
new_fire_df= (fire_df
.select("Incident Number", "Available DtTm", "CallType")
.where(col("CallType") != "Medical Incident")
.distinct()
.show(5))
위의 코드를 실행시키면 sql에 있는 기능들로 spark에서 필요한 조건의 데이터를 조회를 시킬 수 있음을 알 수 있다.
#column의 이름 변경
new_fire_df= fire_df.withColumnRenamed("Delay", "ResponseDelayedTime")
칼럼의 이름이 가독성이 떨어지거나 간결하지 못한 경우에는 위의 코드처럼. withColumnRenamed("기존 이름 " , "변경할 이름") 형식으로 변경시킬 수 있다.
#날짜 데이터 월/일/연도 형식으로 변환
from pyspark.sql.functions import *
new_fire_df = (fire_df
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
.drop("CallDate"))
기존에 있던 날짜 데이터는 to_timestamp()을 사용해서 원하는 형식으로 변환 가능하다. 또한 변환된 데이터는 dayofmonth(), dayofyear()와 같은 함수를 사용하여 조회할 수 있다.
#집계연산 groupBy, orderBy, count 사용예제
from pyspark.sql.functions import *
(fire_df.select("CallType")
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(5))
여기서도 sql의 기능인 집계 기능들을 똑같이 사용할 수 있는 모습이다. 사용방법도 다른 게 없기 때문에 크게 어렵지 않을 것 같다. 이 예시와 같이 max, avg, min 등의 연산 또한 동일하게 사용할 수 있다.
*데이터 프레임 , 데이터 세트, RDD
데이터 프레임과 데이터 세트 , rdd는 각각 사용해야 할 때가 다를 것이다. 각각이 필요할 때를 정리해보면 다음과 같다
- 데이터 프레임
- 스파크에게 어떻게 하는지가 아닌 무엇을 해야 하는지 말하고 싶을 때 사용
- 풍부한 표현과 높은 추상화 및 DSL 연산이 필요하다면 사용
- 높은 수준의 연산이 필요하다면 사용
- SQL과 유사한 질의를 사용하고 싶다면 데이터 프레임을 사용
- 코드 최적화, API 단순화 등을 사용하고 싶다면 데이터 프레임 사용
- R 사용자는 데이터 프레임을 사용
- 파이썬 사용자면 데이터 프레임을 사용 but 제어권을 조금 더 가지고 싶다면 RDD 사용
- 공간/속도 효율성이 필요하다면 사용
- 데이터 세트
- 스파크에게 어떻게 하는지가 아닌 무엇을 해야하는지 말하고 싶을때 사용
- 풍부한 표현과 높은 추상화 및 DSL 연산이 필요하다면 사용
- 컴파일 시 엄격한 체크를 원하고 여러 개의 케이스 클래스를 만드는 것이 부담이 없다면 데이터 세트 사용
- 높은 수준의 연산이 필요하다면 사용
- 인코딩을 사용하여 이득을 보고 싶다면 데이터 세트 사용
- RDD
- RDD를 사용하도록 된 서드파티 패키지를 사용한다면 사용
- 데이터 프레임이나 데이터 세트에서 얻을 수 있는 코드 최적화, 퍼포먼스 이득 등을 포기할 수 있다면 사용
- 스파크가 어떻게 질의를 수행할지 정확하게 지정해주고 싶다면 사용
'프로그래밍 > Spark' 카테고리의 다른 글
[spark] 캐싱, 영속화, 조인 (0) | 2022.11.23 |
---|---|
[Spark] 스파크 최적화 및 튜닝 (0) | 2022.11.22 |
pyspark mysql 연동 (feat.docker) (0) | 2022.11.21 |
[Spark] sql + 데이터프레임 읽고 쓰기 (0) | 2022.11.17 |
Spark 기본 개념 및 용어정리 (0) | 2022.11.14 |