각 Sub DAG에 구성 한 작업을 기반으로 Main DAG의 파이프라인을 완성 합니다.
chinook_DAG.py를 생성 하십시오.from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.celery_executor import CeleryExecutor
from chinookSubDAG.chinook_ods_subDAG import transfer_rds_to_s3_subdag, create_ods_table_subdag
from chinookSubDAG.chinook_dwh_subDAG import create_dwh_dim_table_subdag, create_dwh_fact_table_subdag
from chinookSubDAG.chinook_redshift_subDAG import load_redshift_table_subdag
from airflow.utils.dates import days_ago
# DAG default configurations
default_args = {
"owner" : "airflow",
"start_date" : days_ago(1)
}
DAG_NAME="chinook_DAG"
with DAG(dag_id=DAG_NAME,
schedule_interval=None,
default_args=default_args,
catchup=False
) as dag:
# ODS task
glue_transfer_rds_to_s3 = SubDagOperator(
task_id='glue_transfer_rds_to_s3',
subdag=transfer_rds_to_s3_subdag(DAG_NAME, 'glue_transfer_rds_to_s3', default_args),
executor=CeleryExecutor()
)
athena_create_ods_table = SubDagOperator(
task_id='athena_create_ods_table',
subdag=create_ods_table_subdag(DAG_NAME, 'athena_create_ods_table', default_args),
executor=CeleryExecutor()
)
# DWH task
athena_create_dwh_dim_table = SubDagOperator(
task_id='athena_create_dwh_dim_table',
subdag=create_dwh_dim_table_subdag(DAG_NAME, 'athena_create_dwh_dim_table', default_args),
executor=CeleryExecutor(),
trigger_rule="all_success"
)
emr_create_dwh_fact_table = SubDagOperator(
task_id='emr_create_dwh_fact_table',
subdag=create_dwh_fact_table_subdag(DAG_NAME, 'emr_create_dwh_fact_table', default_args),
executor=CeleryExecutor(),
trigger_rule="all_success"
)
# Redshift task
load_redshift_table = SubDagOperator(
task_id='load_redshift_table',
subdag=load_redshift_table_subdag(DAG_NAME, 'load_redshift_table', default_args),
executor=CeleryExecutor(),
trigger_rule="all_success"
)
# Execution dependency
glue_transfer_rds_to_s3 >> \
athena_create_ods_table >> [athena_create_dwh_dim_table, \
emr_create_dwh_fact_table] >> load_redshift_table
스크립트 구성을 살펴 보겠습니다. Main DAG에 필요한 Airflow library를 import 합니다. 특히 SubDAG 파일에 구성 한 작업을 Main DAG에서 사용하려면 Airflow SubDagOperator library와 SubDAG 파일의 각 DAG 함수를 Python package로 import 해야 합니다. 그리고 Main DAG 오브젝트를 구성하기 위해 필요한 기본 설정 (default_args)과 DAG 이름을 정의합니다.

다음은 DAG 오브젝트를 생성합니다. DAG 오브젝트의 작업은 SubDagOperator를 통해 정의합니다. 각 SubDagOperator의 작업은 ODS, DW 및 Redshift subDAG 스크립트에 작성 한 Python 함수를 subdag 파라미터에 정의하여 사용합니다. 그리고 SubDAG의 작업이 병렬 프로세싱 될 수 있게 executor 파라미터를 CeleryExecutor로 정의 합니다. 또한 일부 작업의 trigger_rule을 all_success로 설정하여 이 전 SubDAG의 작업이 모두 성공했을 경우에만 트리거 되도록 설정 합니다. 최종적으로 workflow의 작업 수행 순서가 어떻게 정의 되었는 지 확인 하십시오.
지금까지 수행했던 실습에서는 DAG 오브젝트를 생성하면 각 Operator의 작업 별로 dag 파라미터를 정의해야 했는 데, DAG 오브젝트 생성시 With as 절을 사용하고 그 밑에 Airflow 작업 정의를 indentation하여 작성하면 dag 파라미터를 제공 할 필요가 없어 코딩 작업을 줄 일수 있습니다.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/
