Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- PYTHON
- 컴퓨터네트워크
- linux
- Django
- airflow
- AWS
- 컴퓨터 네트워크
- http
- 데이터 파이프라인
- 운영체제
- S3
- TIL
- 데이터엔지니어링
- 데이터 엔지니어링
- 데브코스
- dockerfile
- 데이터베이스
- airflow.cfg
- HADOOP
- 종류
- sql
- 데이터 웨어하우스
- Docker
- TCP
- redshift
- 정리
- Go
- 자료구조
- 파이썬
- 가상환경
Archives
- Today
- Total
홍카나의 공부방
[DE 데브코스] 06.05 TIL - Airflow 예제 본문
Hello World DAG 만들어보기
- Python Operator를 이용하여 Airflow dag를 만들어본다.
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow import DAG
dag = DAG(
dag_id ="helloWorld",
start_date = datetime(2021,8,26),
catchup=False,
tags = ['example'],
schedule = '0 2 * * *'
)
def print_hello():
print("hello!")
return "hello!"
def print_world():
print("world!")
return "world!"
print_hello = PythonOperator(
task_id = 'print_hello',
python_callable= print_hello,
dag = dag
)
print_world = PythonOperator(
task_id = 'print_world',
python_callable= print_world,
dag = dag
)
# 이렇게 순서를 정하지 않으면, 동시에 독립적으로 실행
print_hello() >> print_world()
첫 번째는 PythonOperator를 이용하여 DAG를 구성하는 방법이다.
task를 함수 형태로 각각 만들고, PythonOperator 객체를 만들어줘서 python_callable 변수에 함수 이름을 기입한다.
맨 마지막으로 task간의 순서를 지정한다.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_world():
print("world!")
return "world!"
with DAG(
dag_id ="helloWorld",
start_date = datetime(2021,8,26),
catchup=False,
tags = ['example'],
schedule = '0 2 * * *'
) as dag:
# 이렇게 순서를 정하지 않으면, 동시에 독립적으로 실행
print_hello() >> print_world()
두 번째 방법은 이렇게 task decorator를 만들면 PythonOperator를 따로 지정해주지 않아도 된다.
task_id를 따로 지정하지 않았기 때문에, 이때는 함수 이름이 task_id가 되는 것이다.
코드가 직관적이고, 타이핑 소요도 적어진다.
DAG를 작성할 때는 with ~ as 구조로 작성한다.
DAG에 지정할 수 있는 파라미터들
- 위 dag를 살펴보면 dag_id, start_date 등의 파라미터를 지정해줬는데, 기타 파라미터들도 살펴본다.
- max_active_runs는 한 번에 동시에 실행될 수 있는 해당 dag의 수를 지칭한다. backfill을 할 때 의미가 있는 것이다.
- max_active_tasks는 동시에 실행될 수 있는 tasks를 지정하는 것이다.
- catchup은 start_date이 지금 시점보다 과거라면, 그 때부터 밀린 날짜를 catchup할 것인지 정하는 파라미터다.
- max_active_tasks에 아무리 큰 값을 지정해도, 현 airflow 워커 노드에 배정된 CPU 갯수로 상한선이 적용된다. 만약 워커 노드가 4개의 CPU를 할당 받았는데 값을 100을 줘도 실제로 적용되는 값은 4라는 뜻이다.
반응형
'Data Engineering > 프로그래머스 데브코스' 카테고리의 다른 글
[DE 데브코스] 06.04 TIL - 트랜잭션과 Airflow 설치 (0) | 2023.06.04 |
---|---|
[DE 데브코스] 05.25 TIL - Redshift 심화, Fact/Dimension/외부테이블과 View 차이점 등 (0) | 2023.05.25 |
[DE 데브코스] 05.23 TIL - Redshift, Serverless, ELT 예시 (0) | 2023.05.24 |
[DE 데브코스] 05.22 TIL - 데이터 팀에 대하여, DevOps 등 (0) | 2023.05.22 |
[DE 데브코스] 05.19 TIL - Lambda, Docker (0) | 2023.05.19 |