프로그래밍/Spark

[Spark] MLlib 정리

장경훈 2022. 12. 7. 19:36
이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다.

*Spark MLlib의 장점

  • 데이터를 시스템에 맞게 다운샘플링 하지 않고 데이터 준비 및 모델 구축을 하나의 에코시스템으로 구축 가능
  • 선형으로 확장되는 O(n)으로 방대한 양의 데이터를 확장할 수 있다.

 

*MLlib 파이프라인 구성

#파이프라인에 적용할 샘플 데이터 8대2로 분리
filePath="""/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet"""
airbnbDF=spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms", "number_of_reviews", "price").show(5)

trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
# 데이터 변환기를 사용하여 데이터 변환 
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol= "features")
vecTrainDF= vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)
# 추정기를 사용하여 모델을 반환 
from pyspark.ml.regression import LinearRegression
lr= LinearRegression(featuresCol= "features", labelCol="price")
lrModel= lr.fit(vecTrainDF)
#파이프라인 생성 및 데이터 세트에 적용 
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

predDF= pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "price", "prediction").show(10)

위의 예시와 같이 머신러닝 파이프라인은  데이터 수집 - 변환 - 추정- 파이프라인 구축 순으로 만들면 된다.

 

*모델 저장 및 로드 방법

# 모델 저장
pipelinePath = "저장할 경로"
pipelineModel.write().overwrite().save(pipelinePath)

# 모델 불러오기 
from pyspark.ml import PipelineModel
save_model= PipelineModel.load(pipelinePath)

 

*파이프라인 최적화 방법

 

1.parallesim

spark.ml은 기본적으로 병렬이 아닌 순차적으로 모델 컬렉션을 훈련하는데 이 문제점을 해결하기 위해서 parallelism 매개변수를 사용하여 병렬로 훈련할 모델 수를 결정할 수 있다.

*여기서 parallesim값은 최대한 신중하게 설정해야 하며 항상 값이 크다고 성능이 향상되는 것은 아니다. 일반적으로 10이면 충분하다.

#parallelism 설정 방법
cvmodel= cv.setParallelism(4).fit(trainDF)

 

2.파이프라인 내부에 교차 검증기를 배치

#예시
cv = CrossValidator(estimator =rf,
                    evaluator=evalutor,
                    numFolds=3,
                    parallelism=4,
                    seed=42)
pipeline=Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)

위의 예시코드와 같이 Pipeline인자에 직접 교차검증기를 배치하여 조금이라도 훈련 시간을 단축하는 방법을 사용할 수 있다.