홍카나의 공부방

[Airflow] Operator, Task, Scheduler, Worker, DAG 기초 예시 본문

Data Engineering/Airflow

[Airflow] Operator, Task, Scheduler, Worker, DAG 기초 예시

홍문관카페나무 2023. 11. 4. 13:14

 

 

Operator(오퍼레이터)

 

- 특정 행위를 할 수 있는 기능을 모아 놓은 클래스, 설계도

- Bash 오퍼레이터는 쉘 스크립트 명령을 수행하는 오퍼레이터

- Python, S3, GCS는 각각 파이썬, AWS S3, GCP의 GCS를 컨트롤 할 수 있게 만들어주는 오퍼레이터

 

Task(태스크)

 

- 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트

- Task는 방향성을 가지고 있고, 순환되지 않는 형태로 연결되어 있음(비순환 그래프의 특징)

 

Scheduler(스케쥴러)

 

- DAG 파일을 파싱하고 메타 DB에 정보를 저장하는 역할을 한다.

- start time을 확인하고 워커에게 실제 작업을 수행하라는 명령을 내린다.

 

Worker(워커)

 

- DAG 코드를 읽어 들인 후, 실제 작업을 수행한다.

- 메타 DB에 결과를 업데이트한다.

 

DAG 예시

import datetime
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_operator_v1", # 파이썬 파일명과 일치 권고
    schedule="0 0 * * *", # Cron 스케줄 - 분, 시, 일, 월, 요일로 읽는다.
    start_date=pendulum.datetime(2021, 1, 1, tz="Asia/Seoul"), # DAG의 시작 날짜. timezone이 UTC로 두면 9시간 늦게 돌음.
    catchup=False, # Backfill(이전에 실행되지 않은 작업을 돌리는 것)을 할 것인지 여부
    #dagrun_timeout=datetime.timedelta(minutes=60), # 60분 이상 돌면 실패
    # tags=["hongcana_exmample"], # 해쉬태그 설정(optional)
    # params={"example_key": "example_value"}, # task들에 공통적으로 넘겨줄 파라미터(optional)
) as dag:
    bash_t1 = BashOperator(
        task_id = "bash_t1", # Web UI에 표출되는 값. 객체명과 일치하도록 표준 권고
        bash_command= "echo whoami", # bash 명령 입력
    )

    bash_t2 = BashOperator(
        task_id = "bash_t2",
        bash_command= "echo $HOSTNAME",
    )

    bash_t1 >> bash_t2

 

 

위 DAG을 수행하고, bash_t2 TASK의 로그를 살펴보면

 

bash_t2 log

 

echo $HOSTNAME의 output으로 특정 16진수가 나온 것을 볼 수가 있는데,

이는 Airflow Task 작업을 수행한 Worker 컨테이너의 ID다.

 

docker exec -it bd01a2666574 bash
echo $HOSTNAME

 

터미널에서 worker 컨테이너로 들어가고 위 명령어를 쳐도 결과 값이 같다.

 

 

 

 

 

 

 

 


개념 출처 : Airflow 마스터 클래스(인프런)

 

반응형