Data Engineering/Airflow
[Airflow] python operator - 템플릿 사용이 가능한 파라미터
홍문관카페나무
2023. 11. 19. 15:00
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
airflow.operators.python — Airflow Documentation
airflow.apache.org
파이썬 오퍼레이터 docs를 들어가보면
아래와 같이 (templated)라고 붙은 Parameter가 있는 것을 볼 수 있다.
해당 키워드가 붙으면, Jinja Template과 같은 템플릿을 사용할 수 있는 파라미터다.
op_args, op_kwargs는 (templated) 태그가 붙어있지 않지만,
아래 template_fields에 해당 파라미터들이 포함되어 있기 때문에, 사실상 템플릿을 사용할 수 있다.
다음은 예시 코드다.
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
with DAG(
dag_id = 'dags_python_template',
schedule = "0 15 * * *",
start_date = pendulum.datetime(2023, 11, 1, tz='Asia/Tokyo'),
catchup = False
) as dag:
def python_function1(start_date, end_date, **kwargs):
print(start_date)
print(end_date)
python_t1 = PythonOperator(
task_id = 'python_t1',
python_callable=python_function1,
op_kwargs = {'start_date':'{{data_interval_start | ds}}', 'end_date' : '{{data_interval_end | ds}}'}
)
그리고 파이썬 오퍼레이터는 **kwargs에서 원하는 템플릿을 지정하여 사용할 수 있다.
task decorator를 이용하여 사용하면 되는데, 다음 예시코드를 참조하면 된다.
@task(task_id='python_t2')
def python_function2(**kwargs):
print(kwargs)
print('ds:' + kwargs['ds'])
print('ts:' + kwargs['ts'])
print('data_interval_start: ' + str(kwargs['data_interval_start']))
print('data_interval_end: ' + str(kwargs['data_interval_end']))
print('task instance: '+ str(kwargs['ti']))
python_task_2 = python_function2()
반응형