홍카나의 공부방

[Airflow Special] 비트코인 가격 수집 DAG 예시로 알아보는 Trigger Rule 본문

[Special Articles]

[Airflow Special] 비트코인 가격 수집 DAG 예시로 알아보는 Trigger Rule

홍문관카페나무 2024. 3. 23. 17:36

 

some_task = PythonOperator(
    task_id = "some_task",
    trigger_rule = "none_failed"
    )

 

 

 

먼저 Airflow가 DAG 내에서 작업(task)을 실행하는 방법을 알아봅시다. Airflow는 DAG를 실행할 때, 각 task를 지속적으로 확인하여 실행 가능 여부를 확인합니다. task 실행이 가능하다고 판단하면, 스케쥴러가 작업을 선택한 후에 실행을 예약합니다. 그렇다면 task 실행이 가능한지는 어떻게 판단할 수 있을까요?

 

기본적으로 하나의 task는 업스트림(up-stream) task들이 모두 수행되면 다운스트림(down-stream) task들이 실행되는 선형적인 실행 구조를 가지고 있습니다. 이 구조는 기본적인 트리거 규칙(Trigger Rule)에 의거합니다.

 

 

Trigger Rule

 

트리거 규칙은 task가 어떠한 로직에 의거하여 실행이 되는지를 나타내는 하나의 규칙입니다. Airflow의 기본 트리거 규칙은 all_success입니다. 즉, Task가 실행되려면 의존성을 가진 다른 태스크들이 모두 성공적으로 완료되어야 하죠.

 

아래에서 예시를 들어보겠습니다. 비트코인 가격 데이터를 1분마다 수집하는 DAG가 있다고 가정하겠습니다. 바이낸스에서는 BTCUSDT 가격을 수집하고, 업비트에서는 BTCKRW 가격을 수집합니다. 가격 데이터는 ETL(Extract-Transform-Load) 파이프라인 방식으로 수집되며, PostgresDB에 가격 데이터가 적재된다고 가정합니다. 그리고 데이터 적재가 완료되면, SVM 같은 머신러닝 기법을 이용하여 비트코인 미래 가격을 예측하는 task까지 수행된다고 하겠습니다.

 

...말은 참 거창하게 했지만 task의 모든 코드를 하나하나 구현하기에는 Trigger Rule 소개라는 글의 주제에 벗어납니다. 그리고 1분마다 가격을 수집해서 ML을 계속 돌리겠다는 것도 의문이 듭니다. 그냥 추상적으로만 위 workflow의 DAG 설정 및 task를 간단하게 구현해보겠습니다.

 

import airflow
import datetime as dt
import time

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# from binance
def _extract_binance(**context):
    time.sleep(4)
    print("Fetching BTCUSDT price data from Binance...")

# from upbit
def _extract_upbit(**context):
    time.sleep(2)
    print("Fetching BTCKRW price data from Upbit...")

# transform BTCUSDT data
def _transform_binance(**context):
    print("transform BTCUSDT price data to be stored in Postgres...")

# transform BTCKRW data
def _transform_upbit(**context):
    print("transform BTCKRW price data to be stored in Postgres...")

# Load BTC price data to Postgres
def _load_coin_price(**context):
    print("load BTC price data to Postgres...")

# something special..?
def _train_model(**context):
    print("Train BTC price predict model...")

with DAG(
    dag_id = "05_trigger_practice_BTC",
    start_date = dt.datetime(2023,3,22),
    schedule_interval = "*/1 * * * *", # 매 1분마다 실행
    catchup=False,
) as dag:
    start = EmptyOperator(
        task_id="start"
    )

    extract_binance = PythonOperator(
        task_id = "extract_binance",
        python_callable=_extract_binance
    )

    extract_upbit = PythonOperator(
        task_id = "extract_upbit",
        python_callable=_extract_upbit
    )

    transform_binance = PythonOperator(
        task_id = "transform_binance",
        python_callable=_transform_binance
    )

    transform_upbit = PythonOperator(
        task_id = "transform_upbit",
        python_callable=_transform_upbit
    )

    load_coin_price = PythonOperator(
        task_id = "load_coin_price",
        python_callable=_load_coin_price,
        # trigger_rule = "all_success"
    )

    train_model = PythonOperator(
        task_id = "train_model",
        python_callable=_train_model
    )

    start >> [extract_binance, extract_upbit]
    extract_binance >> transform_binance
    extract_upbit >> transform_upbit
    [transform_binance, transform_upbit] >> load_coin_price
    load_coin_price >> train_model

 

Graph UI

 

 

위 DAG를 그래프 형태로 나타나면 이미지와 같은 형태로 나타납니다.

 

1. 먼저 start task는 EmptyOperator를 이용한 dummy task로, Fan-out 방식으로 다운스트림 task인 extract task들을 실행시킬 수 있도록 만들었습니다. 위와 같이 dummy task를 이용하면, 추후 팬아웃 방식으로 여러 다운스트림 task들이 추가될 때 확장성을 확보하는데 유리할 수 있습니다.

 

2. extract_upbit와 extract_binance task는 각각 업비트, 바이낸스에서 BTC 원화 가격, BTC 테더(USDT) 가격을 수집하는 task가 될 것입니다. 내부 비즈니스 로직은 구현하지 않았으며, 필요하다면 Upbit는 스크래핑을 이용하거나 API에서 가져오는 방식을 택할 수 있겠으며, Binance는 API를 이용하거나 이를 Wrapping한 패키지(ccxt 등)를 이용해서 가격 데이터를 가져오면 될 것입니다.ㅈ

 

3. transform task는 추후 데이터베이스로 가격 데이터 적재시, 변환 로직이 필요할 것을 대비해서 만들어둔 task입니다. 역시나 내부 로직은 구현하지 않았으며, 필요에 따라서 transform 단계를 스킵하고 바로 DB에 적재할 수 있겠죠.

 

4. load task는 데이터베이스로 가격을 적재하는 task입니다. load_coin_price를 보시면 trigger_rule 변수를 만들어 뒀는데 주석처리 했습니다. 그 이유는 trigger_rule은 default 값으로 all_success를 가지기 때문입니다. 즉, 모든 업스트림 태스크가 수행되면 그때 의존성이 해결되어 해당 task가 실행되는 구조입니다.

 

task 실패의 경우, 하위 task들은 upstream_failed 상태로 기록됨.

 

만약 binance API에 장애가 발생하여 데이터 수집을 못하는 경우가 발생한다면, extract_binance task가 실패하겠죠. 그렇다면 다운 스트림 task인 transfrom_binance는 당연히 upstream_failed 상태로 넘어갈 것이며, load 이후의 단계도 upstream_failed 단계로 남게 되어 결과적으로 DAG 수행은 실패하게 될 것입니다.

 

load_coin_price의 트리거 규칙을 all_success에서 따로 수정하지 않았으므로, 모든 상위 task가 성공적으로 완료되면 작업이 수행되는 all_success 규칙에 의거하여 load_coin_price도 수행되지 않을 것입니다. 만약 이때 업비트에서 원화 가격만이라도 수집하여 적재할 수 있도록 하고 싶다면, load task의 트리거 규칙은 어떻게 수정해야할까요?

 

먼저 trigger rule에 어떤 값을 넣을 수 있는지 알아야겠죠. 공식 문서를 보면 다음과 같이 나와있습니다.

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html

 

DAGs — Airflow Documentation

 

airflow.apache.org

 

 

 

airflow docs

 

 

 

기본적으로, Airflow는 task를 수행하기 전에 모든 Upstream tasks(상위 작업들)이 성공하기까지 기다리게 된다고 나와있네요. trigger_rule argument를 바꿔주면 이 로직을 수정할 수 있다고 합니다. 기본 값은 모든 업스트림이 성공해야 하는 all_success임을 알아두고, 다른 case들을 살펴봅시다.

 

트리거 규칙 작동 로직 사례
all_failed 모든 업스트림 태스크들이 failed 혹은 upstream_failed 상태일 때 상위 태스크에서 하나 이상 실패가 예상될 때 오류 처리 코드를 trigger하기 위함
all_done 모든 업스트림 태스크들이 결과에 상관없이 실행을 완료했을 때 모든 태스크가 완료되었을 때, clean-up code 실행을 위함
all_skipped 모든 업스트림 태스크들이 skipped 상태일 때 태스크 스킵에 대비한 코드를 trigger하기 위함
one_failed 최소 하나의 업스트림 태스크가 실패했을 때
(모든 업스트림 태스크의 완료를 기다리지 않음) 
task 실패에 대비한 slack 알림 코드 Trigger
one_success 최소 하나의 업스트림 태스크가 성공했을 때
(모든 업스트림 태스크의 성공을 기다리지 않음)
하나의 task에서 나온 결과를 즉각 사용할 수 있을 경우
one_done 최소 하나의 업스트림 태스크가 succeeded 또는 failed 일 때 하나의 task라도 성공하면 Trigger하되, 수행을 모두 기다리기 위함.
none_failed 모든 업스트림 태스크가 failed, upstream_failed 상태가 아닐 때, 즉 모든 상위 task가 skip or success. 조건부 Branch 코드로 특정 상위 task가 skip되었을 때 분기 처리 가능
none_failed_min_one_success 모든 업스트림 태스크가 failed, upstream_failed 상태가 아니고, 하나의 태스크는 성공했을 때 위와 같으나 상위 task가 하나는 성공해야 할 때
none_skipped 모든 업스트림 태스크가 skipped state가 아닐 때 모든 업스트림 태스크가 수행된 경우, 결과를 무시하고 수행해야 할 때
always 의존성 없이 그냥 항상 수행 테스트 용 or 알림 전송용

 

 

업비트나 바이낸스 중 한 곳이라도 가격 데이터 수집이 가능하다면 그 데이터를 적재하고 싶습니다. 그래서 트리거 규칙을 all_done으로 변경하였습니다. all_done은 모든 업스트림 태스크들이 결과에 상관없이 수행을 완료했을 때, 해당 task가 실행되게끔 만드는 트리거 규칙입니다.

 

 

load_coin_price의 trigger_rule을 수정한 결과

 

 

이번엔 upbit에서 가격 데이터 수집을 실패했다고 가정하고 DAG를 수행해봤는데 적재 task 수행에 성공한 것으로 볼 수 있습니다. 다만, 실제로 위 DAG를 설계할 때 load task의 트리거 규칙을 all_done으로 설정하면 DB 데이터 적재 로직에 결함이 발생할 수 있습니다. 가격 데이터 수집에 모두 실패하게 되더라도 all_done 트리거 규칙에 의거하여 load task가 수행되게 됩니다. 이상하지 않나요? 

 

이 논리적 결함을 고쳐보고자 trigger_rule을 none_failed로 설정하면 upstream_failed를 기록한 상위 태스크에 막혀서 load 과정이 수행이 되지 않을 것이고, one_success로 설정하면 load_coin_price 태스크가 모든 업스트림 태스크의 성공을 기다리지 않기 때문에 적재 과정에서 DB 데이터 무결성을 보장할 수 없습니다. 이 논리적 결함을 해결하기 위해선 단순한 trigger_rule 수정뿐만 아니라 branch 전략을 시도하거나, task를 따로 더 생성해서 설계를 달리해 볼 수도 있겠습니다.

 

 

5. 설명이 늦었지만 train_model은 ML 모델을 훈련할 경우를 대비해서 만들어둔 task입니다.

반응형