프로그래밍/Spark

Spark 정형화 api

장경훈 2022. 11. 15. 13:42
이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.

 

* 저수준 DSL과 고수준 DSL

여기서 저수준의 DSL은 spark 1.x 버전에 있던 RDD를 의미하고 그 이후에 2.x에서는 고수준 DSL을 사용하고 있다. 저수준과 고수준의 차이는 저수준 DSL보다 고수준 DSL이 훨씬 더 표현력이 높으며 이전보다 더 간단하게 사용할 수 있다는 것이다. 

#기존 저수준 DSL의 표현방식

RDD=sc.parallelize([("JNAG", 23), ("KIM",25), ("PARK",22), ("AN",22)])
agesRDD= RDD.map(lambda x: (x[0], (x[1], 1)))\
            .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))\
            .map(lambda x: (x[0], x[1][0]/x[1][1]))
#고수준 DSL 표현방식
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark =SparkSession.builder.appName("AuthorsAges").getOrCreate()

data_df= spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])
avg_df = data_df.groupBy("name").agg(avg("age"))
avg_df.show()

위의 두 코드를 비교해보면 확실히 기존에 길게 표현해서 사용하던 방식보다는 groupBy 등을 사용해서 표현하니 훨씬 더 간결하고 이해하기 쉽게 사용되는 모습을 볼 수 있다.

 

*Spark의 데이터 타입

 

  • 기본 데이터 타입
    • 정수형 - ByteType, ShortType, IntegerType, LongType
    • 실수형 - FloatType, DoubleType
    • 문자열 - StringType
    • bool - BooleanType
    • Decimal - DecimalType
  • 정형화 데이터 타입
    • BinaryType 
    • DateType
    • TimestampType
    • ArrayType
    • MapType
    • StructType
    • StructField

 

*Spark Schema

스파크에서는 스키마를 정의하는 방법이 2가지 있는데 프로그래밍 스타일, DDL 스타일이 있다.

# 프로그래밍 스타일
from pyspark.sql.types import *
schema = StructType([StructField("title", StringType(), False),\
                     StructField("price", IntegerType(), False)])

 

#DDL스타일
schema = "title STRING, price INT"

이런 식으로 2가지의 방법으로 스키마를 만들 수 있는데 나는 개인적으로 조금 더 간편한 DDL 스타일을 더 많이 사용하게 될 것 같다.

*Spark 파일 입출력

#csv파일 불러오기 코드
file_path= "파일 위치 경로"
file_df= spark.read.csv(file_path, header= True, schema=앞서 선언한 스키마)

#parquet로 데이터 저장하기
path="저장하고 싶은 파일 위치"
file_df.write.format("parquet").save(path)

 

*Spark 필터링

spark에서는 sql과 매우 유사하게 데이터를 필터링할 수 있다. 이제부터 그 예시들을 정리해보려고 한다. 이후 모든 실습 환경은 데이터 브릭스 환경에서 실행시켰다.

#특정 column 필터링 예시 코드
from pyspark.sql.functions import *
new_fire_df= (fire_df
             .select("Incident Number", "Available DtTm", "CallType")
             .where(col("CallType") != "Medical Incident")
             .distinct()
             .show(5))

위의 코드를 실행시키면 sql에 있는 기능들로 spark에서 필요한 조건의 데이터를 조회를 시킬 수 있음을 알 수 있다.

#column의 이름 변경 
new_fire_df= fire_df.withColumnRenamed("Delay", "ResponseDelayedTime")

칼럼의 이름이 가독성이 떨어지거나 간결하지 못한 경우에는 위의 코드처럼. withColumnRenamed("기존 이름 " , "변경할 이름") 형식으로 변경시킬 수 있다.

#날짜 데이터 월/일/연도 형식으로 변환
from pyspark.sql.functions import *

new_fire_df = (fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
              .drop("CallDate"))

기존에 있던 날짜 데이터는 to_timestamp()을 사용해서 원하는 형식으로 변환 가능하다. 또한 변환된 데이터는 dayofmonth(), dayofyear()와 같은 함수를 사용하여 조회할 수 있다.

#집계연산  groupBy, orderBy, count 사용예제
from pyspark.sql.functions import *

(fire_df.select("CallType")
       .groupBy("CallType")
       .count()
       .orderBy("count", ascending=False)
       .show(5))

여기서도 sql의 기능인  집계 기능들을 똑같이 사용할 수 있는 모습이다. 사용방법도 다른 게 없기 때문에 크게 어렵지 않을 것 같다. 이 예시와 같이 max, avg, min 등의 연산 또한 동일하게 사용할 수 있다.

 

*데이터 프레임 ,  데이터 세트, RDD

데이터 프레임과 데이터 세트 , rdd는 각각 사용해야 할 때가 다를 것이다. 각각이 필요할 때를 정리해보면 다음과 같다

 

  • 데이터 프레임 
    • 스파크에게 어떻게 하는지가 아닌 무엇을 해야 하는지 말하고 싶을 때 사용
    • 풍부한 표현과 높은 추상화 및 DSL 연산이 필요하다면 사용
    • 높은 수준의 연산이 필요하다면 사용
    • SQL과 유사한 질의를 사용하고 싶다면 데이터 프레임을 사용
    • 코드 최적화, API 단순화 등을 사용하고 싶다면 데이터 프레임 사용
    • R 사용자는 데이터 프레임을 사용
    • 파이썬 사용자면 데이터 프레임을 사용 but 제어권을 조금 더 가지고 싶다면 RDD 사용
    • 공간/속도 효율성이 필요하다면 사용
  • 데이터 세트
    • 스파크에게 어떻게 하는지가 아닌 무엇을 해야하는지 말하고 싶을때 사용
    • 풍부한 표현과 높은 추상화 및 DSL 연산이 필요하다면 사용
    • 컴파일 시 엄격한 체크를 원하고 여러 개의 케이스 클래스를 만드는 것이 부담이 없다면 데이터 세트 사용
    • 높은 수준의 연산이 필요하다면 사용
    • 인코딩을 사용하여 이득을 보고 싶다면 데이터 세트 사용
  • RDD
    • RDD를 사용하도록 된 서드파티 패키지를 사용한다면 사용
    • 데이터 프레임이나 데이터 세트에서 얻을 수 있는 코드 최적화, 퍼포먼스 이득 등을 포기할 수 있다면 사용
    • 스파크가 어떻게 질의를 수행할지 정확하게 지정해주고 싶다면 사용