
[Airflow] Azure Databricks Spark submit

장경훈 2024. 6. 5. 23:49

1. Dockerfile for installing the required libraries

FROM apache/airflow:2.7.1-python3.11

USER root
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-11-jdk && \
    apt-get clean

# Set JAVA_HOME (arm64 base image; on amd64 the path is /usr/lib/jvm/java-11-openjdk-amd64)
ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-arm64

USER airflow

# apache-airflow is already in the base image; pin it to the image version so pip does not replace it
RUN pip install apache-airflow==2.7.1 pyspark apache-airflow-providers-databricks
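
The compose file in step 2 references this image as databricks-airflow, so build the Dockerfile with that tag first (the tag itself is only a convention and just has to match the image: value):

docker build -t databricks-airflow .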

 

2. Docker-compose

version: '3'

x-airflow-common: &airflow-common
  image: databricks-airflow
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
  depends_on:
    - postgres
  networks:
    - code-with-yu

services:
  postgres:
    image: postgres
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - code-with-yu
  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - scheduler
  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db init && airflow db migrate && airflow users create --username admin --firstname Yusuf --lastname Ganiyu --role Admin --email airscholar@gmail.com --password admin && airflow scheduler"

networks:
  code-with-yu:
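
The env_file entry above points at an airflow.env that is not shown in the compose file. A minimal sketch (the values are assumptions for this local setup, matching the Postgres service above) could look like:

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__WEBSERVER__SECRET_KEY=change-me

With the image built and the env file in place, docker compose up -d brings up Postgres, the scheduler, and the webserver on port 8080.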

 

3. Define the Airflow connection
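
The DAG below uses databricks_conn_id="databricks_default", so a connection with that id has to exist. In the Airflow UI (Admin > Connections) set the Connection Type to Databricks, the Host to the workspace URL, and the Password to an Azure Databricks personal access token. The same connection can also be created from the CLI; the host and token below are placeholders:

airflow connections add databricks_default \
    --conn-type databricks \
    --conn-host https://adb-0000000000000000.0.azuredatabricks.net \
    --conn-password <personal-access-token>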

4. Define the DAG

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(
    dag_id='databricks_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=days_ago(1),
) as dag:

    # Job cluster spec: num_workers=0 plus the singleNode profile gives a single-node cluster
    new_cluster = {
        "num_workers": 0,
        "spark_version": "15.1.x-scala2.12",
        "spark_conf": {
            "spark.master": "local[*, 4]",
            "spark.databricks.cluster.profile": "singleNode"
        },
        "azure_attributes": {
            "first_on_demand": 1,
            "availability": "ON_DEMAND_AZURE",
            "spot_bid_max_price": -1
        },
        "node_type_id": "Standard_DS3_v2",
        "ssh_public_keys": [],
        "custom_tags": {
            "ResourceClass": "SingleNode"
        },
        "spark_env_vars": {
            "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        },
        "enable_elastic_disk": True,
        "init_scripts": [],
        "single_user_name": "matkimchi@sju.ac.kr",
        "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
        "runtime_engine": "PHOTON"
    }
    # Workspace path of the notebook to run
    notebook_task = {
        'notebook_path': '/Users/matkimchi@sju.ac.kr/test-notebook'
    }

    # Submit a one-time run of the notebook on the cluster defined above
    trigger_databricks_notebook = DatabricksSubmitRunOperator(
        task_id='test_notebook',
        databricks_conn_id="databricks_default",
        new_cluster=new_cluster,
        notebook_task=notebook_task
    )

    trigger_databricks_notebook

5. Check the result
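
Trigger the DAG from the Airflow UI. DatabricksSubmitRunOperator submits a one-time run through the Databricks Runs Submit API and waits for it to reach a terminal state, so the task stays running while the cluster spins up. The run should appear under Workflows > Job runs in the Databricks workspace, and the Airflow task log prints the run page URL for the submitted run.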

Reference

https://medium.com/gumgum-tech/integrating-apache-airflow-with-databricks-a-step-by-step-guide-a4e4202590ee