Data Engineering/프로그래머스 데브코스
[DE 데브코스] 06.05 TIL - Airflow 예제
홍문관카페나무
2023. 6. 6. 15:38
Hello World DAG 만들어보기
- Python Operator를 이용하여 Airflow dag를 만들어본다.
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow import DAG
dag = DAG(
dag_id ="helloWorld",
start_date = datetime(2021,8,26),
catchup=False,
tags = ['example'],
schedule = '0 2 * * *'
)
def print_hello():
print("hello!")
return "hello!"
def print_world():
print("world!")
return "world!"
print_hello = PythonOperator(
task_id = 'print_hello',
python_callable= print_hello,
dag = dag
)
print_world = PythonOperator(
task_id = 'print_world',
python_callable= print_world,
dag = dag
)
# 이렇게 순서를 정하지 않으면, 동시에 독립적으로 실행
print_hello() >> print_world()
첫 번째는 PythonOperator를 이용하여 DAG를 구성하는 방법이다.
task를 함수 형태로 각각 만들고, PythonOperator 객체를 만들어줘서 python_callable 변수에 함수 이름을 기입한다.
맨 마지막으로 task간의 순서를 지정한다.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_world():
print("world!")
return "world!"
with DAG(
dag_id ="helloWorld",
start_date = datetime(2021,8,26),
catchup=False,
tags = ['example'],
schedule = '0 2 * * *'
) as dag:
# 이렇게 순서를 정하지 않으면, 동시에 독립적으로 실행
print_hello() >> print_world()
두 번째 방법은 이렇게 task decorator를 만들면 PythonOperator를 따로 지정해주지 않아도 된다.
task_id를 따로 지정하지 않았기 때문에, 이때는 함수 이름이 task_id가 되는 것이다.
코드가 직관적이고, 타이핑 소요도 적어진다.
DAG를 작성할 때는 with ~ as 구조로 작성한다.
DAG에 지정할 수 있는 파라미터들
- 위 dag를 살펴보면 dag_id, start_date 등의 파라미터를 지정해줬는데, 기타 파라미터들도 살펴본다.
- max_active_runs는 한 번에 동시에 실행될 수 있는 해당 dag의 수를 지칭한다. backfill을 할 때 의미가 있는 것이다.
- max_active_tasks는 동시에 실행될 수 있는 tasks를 지정하는 것이다.
- catchup은 start_date이 지금 시점보다 과거라면, 그 때부터 밀린 날짜를 catchup할 것인지 정하는 파라미터다.
- max_active_tasks에 아무리 큰 값을 지정해도, 현 airflow 워커 노드에 배정된 CPU 갯수로 상한선이 적용된다. 만약 워커 노드가 4개의 CPU를 할당 받았는데 값을 100을 줘도 실제로 적용되는 값은 4라는 뜻이다.
반응형