Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- redshift
- 데브코스
- 데이터 웨어하우스
- PYTHON
- 가상환경
- linux
- 데이터엔지니어링
- Go
- AWS
- 데이터베이스
- Docker
- 종류
- TCP
- airflow
- S3
- 데이터 파이프라인
- 파이썬
- dockerfile
- 데이터 엔지니어링
- TIL
- Django
- sql
- http
- airflow.cfg
- 정리
- 컴퓨터네트워크
- HADOOP
- 컴퓨터 네트워크
- 자료구조
- 운영체제
Archives
- Today
- Total
홍카나의 공부방
[Airflow] Operator, Task, Scheduler, Worker, DAG 기초 예시 본문
Data Engineering/Airflow
[Airflow] Operator, Task, Scheduler, Worker, DAG 기초 예시
홍문관카페나무 2023. 11. 4. 13:14
Operator(오퍼레이터)
- 특정 행위를 할 수 있는 기능을 모아 놓은 클래스, 설계도
- Bash 오퍼레이터는 쉘 스크립트 명령을 수행하는 오퍼레이터
- Python, S3, GCS는 각각 파이썬, AWS S3, GCP의 GCS를 컨트롤 할 수 있게 만들어주는 오퍼레이터
Task(태스크)
- 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트
- Task는 방향성을 가지고 있고, 순환되지 않는 형태로 연결되어 있음(비순환 그래프의 특징)
Scheduler(스케쥴러)
- DAG 파일을 파싱하고 메타 DB에 정보를 저장하는 역할을 한다.
- start time을 확인하고 워커에게 실제 작업을 수행하라는 명령을 내린다.
Worker(워커)
- DAG 코드를 읽어 들인 후, 실제 작업을 수행한다.
- 메타 DB에 결과를 업데이트한다.
DAG 예시
import datetime
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_operator_v1", # 파이썬 파일명과 일치 권고
schedule="0 0 * * *", # Cron 스케줄 - 분, 시, 일, 월, 요일로 읽는다.
start_date=pendulum.datetime(2021, 1, 1, tz="Asia/Seoul"), # DAG의 시작 날짜. timezone이 UTC로 두면 9시간 늦게 돌음.
catchup=False, # Backfill(이전에 실행되지 않은 작업을 돌리는 것)을 할 것인지 여부
#dagrun_timeout=datetime.timedelta(minutes=60), # 60분 이상 돌면 실패
# tags=["hongcana_exmample"], # 해쉬태그 설정(optional)
# params={"example_key": "example_value"}, # task들에 공통적으로 넘겨줄 파라미터(optional)
) as dag:
bash_t1 = BashOperator(
task_id = "bash_t1", # Web UI에 표출되는 값. 객체명과 일치하도록 표준 권고
bash_command= "echo whoami", # bash 명령 입력
)
bash_t2 = BashOperator(
task_id = "bash_t2",
bash_command= "echo $HOSTNAME",
)
bash_t1 >> bash_t2
위 DAG을 수행하고, bash_t2 TASK의 로그를 살펴보면
echo $HOSTNAME의 output으로 특정 16진수가 나온 것을 볼 수가 있는데,
이는 Airflow Task 작업을 수행한 Worker 컨테이너의 ID다.
docker exec -it bd01a2666574 bash
echo $HOSTNAME
터미널에서 worker 컨테이너로 들어가고 위 명령어를 쳐도 결과 값이 같다.
개념 출처 : Airflow 마스터 클래스(인프런)
반응형
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] 쉘 스크립트(Shell) 실행하기 (8) | 2023.11.12 |
---|---|
[Airflow] Cron Schedule 정리 (1) | 2023.11.04 |
[Airflow] WSL에서 pip install로 Airflow를 사용하지 않는 이유 (0) | 2023.11.04 |
[Airflow] cfg에서의 Timezone (0) | 2023.06.21 |
[Airflow] task 실패시 분석하기 (1) | 2023.06.06 |