그러나 Flow가 복잡하다면 Task 관리 도구 (oozie, luigi, airflow 등)을 사용해 관리할 수 있습니다
본 문서에선 Airflow를 활용한 내용을 알려드리겠습니다
(왼쪽부터 설명드리자면)
In [1]:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# start_date를 현재날자보다 과거로 설정하면, backfill(과거 데이터를 채워넣는 액션)이 진행됩니다
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 10, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill', # Only celery option
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# dag 객체 생성
dag = DAG('test', description='First DAG',
schedule_interval = '55 14 * * *',
default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
# BashOperator를 사용
# task_id는 unique한 이름이어야 합니다
# bash_command는 bash에서 date를 입력한다는 뜻
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
# set_upstream은 t1 작업이 끝나야 t2가 진행된다는 뜻
t2.set_upstream(t1)
# t1.set_downstream(t2)와 동일한 표현입니다
# t1 >> t2 와 동일 표현
t3.set_upstream(t1)
위 소스를 [Airflow Home]/dags/ 에 test.py로 저장해주세요!
DAGs는 각 Workflow를 뜻하고, Operator는 DAG 내에서 정의되는 작업 함수입니다. Operator가 DAG에서 호출되는 것이 Task입니다
airflow list_dags
- airflow의 dags 폴더 아래에 *.py 파일을 넣은 후, 위 명령어를 입력하면 DAGs의 리스트를 알 수 있습니다
- 여기에 나오는 dags의 이름은 코드에서 DAG 객체를 생성할 때 넣은 이름이 나타납니다
airflow list_tasks test
- test라는 dags안에 있는 tasks의 리스트를 알 수 있습니다
airflow list_tasks test --tree
- test라는 dags안에 있는 tasks를 tree 형태로 알 수 있습니다
airflow test [DAG id] [Task id] [date]
예시) airflow test test print_date 2017-10-01
- DAG의 Task 단위로 test해볼 수 있습니다
airflow scheduler
- Test를 모두 완료한 후, 스케쥴러를 실행해줍니다. DAG 코드에 정의된 스케쥴에 따라 실행해줍니다
airflow -h
- airflow 관련 help 명령어입니다
In [8]:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 10, 1),
'email': ['yourmail@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': False,
'retry_delay': timedelta(minutes=2),
}
dag = DAG('airflow_bigquery', default_args=default_args)
t1 = BigQueryOperator(
task_id='bigquery_test',
bql='SELECT COUNT(vendor_id) FROM [nyc-tlc:yellow.trips]',
destination_dataset_table='temp.airflow_test1',
bigquery_conn_id='bigquery_default',
delegate_to=True,
udf_config=False,
dag=dag,
)
def print_hello():
return 'Hello Airflow'
t3 = PythonOperator(
task_id='python_operator',
python_callable = print_hello,
dag = dag)
t4 = BigQueryOperator(
task_id='bigquery_test2',
bql='SELECT COUNT(vendor_id) as user FROM [nyc-tlc:yellow.trips]',
destination_dataset_table='temp.airflow_test2',
bigquery_conn_id='bigquery_default',
delegate_to=True,
udf_config=False,
dag=dag,
)
t1 >> t3 >> t4
Out[8]: