프로그래밍 33

[spark] 캐싱, 영속화, 조인

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *데이터 .cache(캐싱)과 .persist(영속화) 데이터 캐싱과 영속화는 거의 유사하지만 영속화는 조금 더 세밀한 설정을 하여 더 나은 성능을 보여준다. # cache 예제 from pyspark.sql.functions import col df=spark.range(1*100000).toDF("id").withColumn("square",col("id")) df.cache() df.count() #persist 예제 from pyspark.sql.functions import col df=spark.range(1*100000).toDF("id").withColumn("square",col("id")) df.p..

[Spark] 스파크 최적화 및 튜닝

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *Spark 설정 세팅 방법 1. 파일 수정 방법 우선 기본적으로 설정파일은 spark가 다운되어 있는 폴더의 conf안에 .template형식으로 저장되어 있다. 이 파일을 수정하고 .template부분을 지우고 저장하면 spark가 새로운 값을 인식한다. 2. 프로그래밍 방법 >>>spark.conf.isModifiable("spark.sql.shuffle.partitison") #수정가능한 설정필드인지 확인 >>>spark.conf.get("spark.sql.shuffle.partitions") #현재필드 값 확인 >>>spark.conf.set("spark.sql.shuffle.partitions", 5) #..

pyspark mysql 연동 (feat.docker)

책을 공부하며 spark와 mysql을 연동시키는 방법에 대해서 실습하려고 했다. 하지만 내 mac에 라이브러리, 버전 등이 너무 꼬여버려서 차라리 이럴 거면 docker로 구축해야겠다고 생각해서 docker로 구현한 것을 정리해 보았다. 1. docker- compsoe.yml 작성 version: '3' services: spark: image: docker.io/bitnami/spark:3.3 environment: - SPARK_MODE=master - SPARK_RPC_AUTHENTICATION_ENABLED=no - SPARK_RPC_ENCRYPTION_ENABLED=no - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no - SPARK_SSL_ENABLED=no p..

[Spark] sql + 데이터프레임 읽고 쓰기

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. *Spark sql spark에서는 Sparksession을 사용하여 sql을 사용할 수 있다. 예제 코드를 보면 다음과 같다. from pyspark.sql import SparkSession spark=(SparkSession .builder .appName("sqlExam") .getOrCreate()) csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv" df= (spark.read.format("csv") .option("inferSchema", "true") .option("header", "true") .load(c..

Spark 정형화 api

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용입니다. * 저수준 DSL과 고수준 DSL 여기서 저수준의 DSL은 spark 1.x 버전에 있던 RDD를 의미하고 그 이후에 2.x에서는 고수준 DSL을 사용하고 있다. 저수준과 고수준의 차이는 저수준 DSL보다 고수준 DSL이 훨씬 더 표현력이 높으며 이전보다 더 간단하게 사용할 수 있다는 것이다. #기존 저수준 DSL의 표현방식 RDD=sc.parallelize([("JNAG", 23), ("KIM",25), ("PARK",22), ("AN",22)]) agesRDD= RDD.map(lambda x: (x[0], (x[1], 1)))\ .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]..

Spark 기본 개념 및 용어정리

이 포스팅은 ⟪러닝 스파크(2판)⟫(제이펍, 2022년)을 공부하면서 정리한 내용 입니다. *Spark 애플리케이션 개념 SparkSession - 스파크의 코어 기능들을 사용할 수 있는 객체 job(잡) - 스파크 액션에 대한 응답으로 생성 (여러 task로 이루어진 병렬 연산) stage(스테이지) - 잡은 의존성을 가지는 다수의 태스크 모음으로 나뉘는데 이를 스테이지라고 한다. task(태스크) - 스파크의 작업 실행의 가장 기본적인 단위 드라이버-> 잡 -> 스테이지 -> 테스크 순으로 분산처리 된다. ex) 드라이버에서 3개의 job으로 나뉘고 각각의 job에서 스테이지로 스테이지에서 3개의 테스크로 나뉜다. *트랜스포메이션과 액션 스파크는 기본적으로 분산 데이터 연산으로 트랜스포메이션과 액션으..

Elastic search 쿼리문

* Elastic search의 검색은 크게 쿼리 컨텍스트와 필터 컨텍스트로 나누어진다. 이 둘의 차이점은 쿼리 컨텍스트는 정확하게 일치하는 목록을 검색하는 것이고 필터 컨텍스트는 검색조건에 맞는지 아닌지에 대해서만 표현하는 검색 방법이다. 또한 쿼리 컨텍스트는 유사도를 기준으로 더 정확한 결과를 먼저 알려준다. GET tmdb_5000_movie3/_search # 쿼리 컨텍스트 예시 { "query": { "match": { "category": "movie" } } } GET tmdb_5000_movie3/_search #필터 컨텍스트 예시 { "query": { "bool": { "filter": {"term": { "category": "movie" } } } } } 위의 예시를 보면 필드 컨텍..

프로그래밍/ELK 2022.09.23

Elastic Search 기본정리

1. 엘라스틱서치 요청과 응답 기본적으로 elastic search는 REST API를 사용한다 REST API는 http의 장점을 이용해서 리소스를 주고받는 형태이고 다음과 같은 메소드를 사용해 CRUD 작업을 진행한다 POST 리소스 추가 GET 리소스 조회 PUT 리소스 수정 DELETE 리소스 삭제 2. 시스템 상태 확인 하는법 위와 같이 시스템의 상태를 확인하는 방법으로는 cat API를 사용하면 되며 GET _cat을 사용해서 호출할 수 있다. 만약 내부 인덱스의 목록을 확인해보고 싶다면 GET _cat/indices?v를 요청하면 된다. 인덱스와 도큐먼트 도큐먼트란 엘라스틱에서 데이터가 저장되는 기본 단위로 JSON형태이다. { "name": "Jang", "age": 23, "gender"..

프로그래밍/ELK 2022.09.07

[DOCKER] ELK Stack 환경 구축하기

최근에 ELK 파이프라인을 구축해보기 위해서 책으로 공부하기 시작했다. 우선 실습을 하기 위해서 Docker를 사용하여 환경을 구축한 것을 포스팅 하려고한다. 1. docker-compose git clone으로 가져오기 git clone https://github.com/deviantony/docker-elk.git 위의 저장소를 클론하게 되면 가장 최신버전의 ELK를 clone 해오게 되는데 혹시 나처럼 elastic7 버전으로 실습환경을 구축하고 싶은 사람이라면 아래의 코드를 사용하면 될 것 같다. git clone -b release-7.x https://github.com/deviantony/docker-elk 혹시라도 다른버전을 사용하고 싶은 사람은 깃허브 저장소에 가게되면 다양한 버전의 브런..

프로그래밍/ELK 2022.09.06

[Python] selenium multiprocessing 병렬 처리 방법

최근에 팀 토이 프로젝트를 하면서 크롤링을 하고 있는데 크롤링해야 하는 데이터 양이 많다 보니 데이터 수집하는데 시간이 너무 오래 걸리는 것을 느꼈고 방법이 없을까 하다가 문득 머릿속에 병렬 처리 방법이 떠오르게 되었다. 그래서 바로 시도해봤고 나름 성공적으로 되어서 혹시 나와 같은 사람들이 있다면 python multiprocessing을 사용하여 병렬 처리를 해보자! *예제 코드 import multiprocessing def crawling(s,e,lists): divide=[(1,6),(6,11),(11,16),(16,21),(21,26),(26,31),(31,36)] manager=multiprocessing.Manager() lists=manager.list() process=[] for st..