프로그래밍/airflow

[Docker]Airflow과 mysql 연동하기

장경훈 2022. 7. 22. 16:24

Airflow를 사용하여 데이터 파이프라인을 구축하기 위해 mysql과 연동시키는 작업을 하려고 했었다. 아직 docker-compose에 대한 개념이 완벽하지 않은 상황에서 Airflow, mysql에 대한 지식 모두 완벽하지는 않아서 이 연동을 구축하는 데에 다양한 오류를 만나면서 대략 12시간 만에 구글링을 통해 연동을 성공시켰다...  이 글이 docker로 Airflow+mysql을 구축하려는 사람에게 도움이 되었으면 좋겠다.

 

1. 우선 docker-compose.yaml을 내가 설치하려는 폴더에 만들어준 후 터미널로 이 폴더에 진입한 후 아래의 명령어를 입력해주자.

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'

2. 위의 명령어를 입력하면 docker-compose.yaml이라는 도커 파일이 생성될 것이다.  우선 services: 항목에 아래의 코드를  입력해주자

 mysql:

    image: mysql:8
    environment:
      - MYSQL_ROOT_PASSWORD=root
    volumes:
      - ./mysql_data:/var/lib/mysql-files/
      - ./mysql.cnf:/etc/mysql/mysql.cnf
    ports:
      3306:3306

여기서 -MYSQL_ROOT_PASSWORD =root 이부분은 초기 비밀번호를 root로 설정해준것이니 각자 바꾸고 싶은 비밀번호로 바꿔주면 된다

3.  이대로 docker compsoe up -d 을 하게 된다면 아마 차후에 No module ~ mysqldb라는 에러가 나타나게 될것이다. 이것의 이유는 mysqldb라이브러리가 없다는 것인데 여기서 우리는 pymysql을  사용하여 mysql을 대체하는 방향으로 설치할 것이다. 

FROM apache/airflow:2.3.3
COPY lib.txt /lib.txt
RUN pip install --user --upgrade pip
RUN pip install --no-cache-dir --user -r /lib.txt

우선 docker-compose.yaml이 있는 폴더 안에 Dockerfile을 만들어 준 후 위의 코드를 집어 넣어주고 여기서 lib.txt 또한 같은 폴더에 만들어 준다.

PyMySQL==1.0.2

lib.txt의 내용은 위와 같다 여기서도 마찬가지로 lib이라는 이름은 바꿔도 되는것이니 각자 바꾸고 싶은데로 하면 될것 같다

만약 이 내용이 이해가 안간다면 https://www.youtube.com/watch?v=0UepvC9X4HY 이 영상을 참고하면 쉽게 해결할 수 있다.

4. 이제 위에서 만든 도커파일의 이미지를 만들어 줄 것이다.

docker build . --tag extending_airflow:second

터미널에 위의 명령어를 입력해주면 이미지가 만들어 질 것이다. 각자 테그와 이름은 원하는 것으로 바꿔줘도 된다. 성공적으로 만들어 졌다면 이제 docker-compose파일의 이미지를 새로만든 이미지로 수정 해줘야 한다. 

image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:second}

위의 내용과 같이 airflow 이미지를 자신이 만든 이미지 파일로 바꿔주면 된다. 이후에 터미널에 docker compose up -d 을 하게 되면 아마 UID 오류가 발생한다. 이를 방지하기 위해 .env파일을 폴더에 만들어 준다. 파일 내용은 다음과 같다

AIRFLOW_UID=50000
AIRFLOW_GID=0

5. 이제  localhost:8080을 들어가면 정상적으로 작동하는것을 알 수 있다. 초기 아이디 비밀번호는 airflow/airflow이므로 입력하고 들어가면 다음과 같은 이미지가 나올 것이다.

여기서 위의 상단에 있는 Admin->connections로 들어가서 mysql 서버와 연결 해 주자.

위의 connection ID 는 차후에 자신이 DAG에 쓰려고 하는 이름이기 때문에 각자 자신이 원하는 이름으로 설정 해 주면되며 Login과 password 또한 위에서 자신이 설정한 것으로 넣어주면 된다. 나머지는 위의 사진과 동일하게 해준 뒤 save를 해주자.

 

6.이제 설정이 완료되었으니 테스트를 해보자.

from airflow.operators.mysql_operator import MySqlOperator
from airflow import DAG
from datetime import datetime,timedelta
import pymysql
pymysql.install_as_MySQLdb()

default_args={"owner":"airflow", "start_date":datetime(2021,3,7)}
with DAG(dag_id="workflow", default_args=default_args, schedule_interval='@daily') as dag:
    
    create_table =MySqlOperator(
        task_id ="create_table",
        mysql_conn_id="mysql_db" ,
        sql="CREATE DATABASE qweql",
    )

    create_table

위와 같은 dag파일을 작성하여 저장하자 여기서 import pymysql과 pymysql.install_as_MySQLdb()이 부분이 중요하다 이 부분을 위해서 앞에서 lib파일을 만들고 라이브러리를 다운 해준것이다. 만약에 이게 없다면 No module ~ MYSQLdb의 오류가 발생하게 된다 그렇기 때문에 mysql을 사용하는 DAG에는 꼭 저 2줄을 넣어줘야 한다. 이 기능에 대해서 설명하자면 pymysql이 mysql과 같은 역할을 하게되고 구글에서 여기저기 찾아보니 pymysql이 mysqldb인것처럼 속이는 것이라고 한다.. 위의 dag를 실행하면 처음 1번은 성공하게 될 것이다 하지만 그 이후에는 실패하게 되는데 잘못된게 아니라 데이터베이스가 이미 있기 때문에 안만들어 지는것이므로 안심하자!

7.테스트 결과

대략적으로 보면 빨간색이 많은 것처럼 보이지만 테스트를 여러번 하게 되었을때 이런 경우가 나오게 되며 한번만 실행하게 되면 성공적으로 만들어 진다. 여기서 빨간 색들은 위에서 설명한 이미 있는 데이터베이스를 만든 경우다. 여기서 mysql workbench에도 잘 나타나는지 확인 해보면 

 

정상적으로 데이터베이스가 만들어진걸 볼 수 있다!

 

 

 

 

이 연동을 구축하는데 정말 많은 오류를 경험하였다. ERROR 2003 (HY000): Can't connect to MySQL server on '127.0.0.1' (111), No module ~ 등등등.. 정말 많이 힘들었다 만약 이 글을 보는 사람들이 있다면 나 보다는 조금 덜 힘들게 연동에 성공 했으면 좋겠다.