1. Install the required libraries (Dockerfile)
FROM apache/airflow:2.7.1-python3.11

USER root
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-11-jdk && \
    apt-get clean

# Set JAVA_HOME environment variable
ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-arm64

USER airflow
RUN pip install apache-airflow pyspark apache-airflow-providers-databricks
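The docker-compose file in the next step references this image as databricks-airflow, so build and tag it first. A minimal sketch, run from the directory containing the Dockerfile:

docker build -t databricks-airflow .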
2. Docker-compose
version: '3'

x-airflow-common: &airflow-common
  image: databricks-airflow
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
  depends_on:
    - postgres
  networks:
    - code-with-yu

services:
  postgres:
    image: postgres
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - code-with-yu

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - scheduler

  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db init && airflow db migrate && airflow users create --username admin --firstname Yusuf --lastname Ganiyu --role Admin --email airscholar@gmail.com --password admin && airflow scheduler"

networks:
  code-with-yu:
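With the image built and an airflow.env file present next to the compose file, the stack can be brought up in the background; the webserver is then reachable on port 8080 with the admin/admin account created by the scheduler command above. A sketch:

docker compose up -d
# open http://localhost:8080 and log in as admin / admin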
3. Define the Airflow connection
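The DAG in the next step refers to the connection id databricks_default. It can be created in the UI under Admin → Connections (connection type Databricks, host set to the workspace URL, password set to a personal access token), or with the Airflow CLI. A sketch with placeholder values, not taken from this post:

docker compose exec scheduler airflow connections add databricks_default \
    --conn-type databricks \
    --conn-host https://<your-workspace>.azuredatabricks.net \
    --conn-password <your-personal-access-token>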
4. Define the DAG
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(
    dag_id='databricks_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=days_ago(1),
) as dag:

    # Cluster definition: a single-node Photon cluster created for this run
    new_cluster = {
        "num_workers": 0,
        "spark_version": "15.1.x-scala2.12",
        "spark_conf": {
            "spark.master": "local[*, 4]",
            "spark.databricks.cluster.profile": "singleNode"
        },
        "azure_attributes": {
            "first_on_demand": 1,
            "availability": "ON_DEMAND_AZURE",
            "spot_bid_max_price": -1
        },
        "node_type_id": "Standard_DS3_v2",
        "ssh_public_keys": [],
        "custom_tags": {
            "ResourceClass": "SingleNode"
        },
        "spark_env_vars": {
            "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        },
        "enable_elastic_disk": True,
        "init_scripts": [],
        "single_user_name": "matkimchi@sju.ac.kr",
        "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
        "runtime_engine": "PHOTON"
    }

    # Notebook path to run on the cluster
    notebook_task = {
        'notebook_path': '/Users/matkimchi@sju.ac.kr/test-notebook'
    }

    # Submit the notebook run to Databricks
    trigger_databricks_notebook = DatabricksSubmitRunOperator(
        task_id='test_notebook',
        databricks_conn_id="databricks_default",
        new_cluster=new_cluster,
        notebook_task=notebook_task
    )

    trigger_databricks_notebook
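To exercise the task once without waiting for the daily schedule, the DAG can be run ad hoc from inside the scheduler container. A sketch, assuming the file above was saved under ./dags and the connection above exists:

docker compose exec scheduler airflow dags test databricks_example_dag 2024-06-01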
5. Check the result
Reference link