[DE 데브코스] 06.05 TIL - Airflow 예제

[DE 데브코스] 06.05 TIL - Airflow 예제

2023. 6. 6.

Hello World DAG 만들어보기

  • Python Operator를 이용하여 Airflow dag를 만들어본다.
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id ="helloWorld",
    start_date = datetime(2021,8,26),
    tags = ['example'],
    schedule = '0 2 * * *'

def print_hello():
    return "hello!"

def print_world():
    return "world!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    python_callable= print_hello,
    dag = dag

print_world = PythonOperator(
    task_id = 'print_world',
    python_callable= print_world,
    dag = dag

# 이렇게 순서를 정하지 않으면, 동시에 독립적으로 실행
print_hello() >> print_world()

첫 번째는 PythonOperator를 이용하여 DAG를 구성하는 방법이다.

task를 함수 형태로 각각 만들고, PythonOperator 객체를 만들어줘서 python_callable 변수에 함수 이름을 기입한다.

맨 마지막으로 task간의 순서를 지정한다.


from datetime import datetime
from airflow import DAG
from airflow.decorators import task

def print_hello():
    return "hello!"

def print_world():
    return "world!"

with DAG(
    dag_id ="helloWorld",
    start_date = datetime(2021,8,26),
    tags = ['example'],
    schedule = '0 2 * * *'
) as dag:

    # 이렇게 순서를 정하지 않으면, 동시에 독립적으로 실행
    print_hello() >> print_world()

두 번째 방법은 이렇게 task decorator를 만들면 PythonOperator를 따로 지정해주지 않아도 된다.

task_id를 따로 지정하지 않았기 때문에, 이때는 함수 이름이 task_id가 되는 것이다.

코드가 직관적이고, 타이핑 소요도 적어진다.

DAG를 작성할 때는 with ~ as 구조로 작성한다.


DAG에 지정할 수 있는 파라미터들

  • 위 dag를 살펴보면 dag_id, start_date 등의 파라미터를 지정해줬는데, 기타 파라미터들도 살펴본다.
  • max_active_runs는 한 번에 동시에 실행될 수 있는 해당 dag의 수를 지칭한다. backfill을 할 때 의미가 있는 것이다.
  • max_active_tasks는 동시에 실행될 수 있는 tasks를 지정하는 것이다.
  • catchup은 start_date이 지금 시점보다 과거라면, 그 때부터 밀린 날짜를 catchup할 것인지 정하는 파라미터다.
  • max_active_tasks에 아무리 큰 값을 지정해도, 현 airflow 워커 노드에 배정된 CPU 갯수로 상한선이 적용된다. 만약 워커 노드가 4개의 CPU를 할당 받았는데 값을 100을 줘도 실제로 적용되는 값은 4라는 뜻이다.

