Main DAG

Sub DAG에 구성 한 작업을 기반으로 Main DAG의 파이프라인을 완성 합니다.

  1. 다음 스크립트를 Cloud9 workspacecopy/paste하여 dags 폴더 안에 chinook_DAG.py를 생성 하십시오.
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.celery_executor import CeleryExecutor
from chinookSubDAG.chinook_ods_subDAG import transfer_rds_to_s3_subdag, create_ods_table_subdag
from chinookSubDAG.chinook_dwh_subDAG import create_dwh_dim_table_subdag, create_dwh_fact_table_subdag
from chinookSubDAG.chinook_redshift_subDAG import load_redshift_table_subdag
from airflow.utils.dates import days_ago

# DAG default configurations
default_args = {
    "owner" : "airflow",
    "start_date" : days_ago(1)
}

DAG_NAME="chinook_DAG"

with DAG(dag_id=DAG_NAME, 
        schedule_interval=None, 
        default_args=default_args, 
        catchup=False
    ) as dag:
    
    # ODS task
    glue_transfer_rds_to_s3 = SubDagOperator(
        task_id='glue_transfer_rds_to_s3',
        subdag=transfer_rds_to_s3_subdag(DAG_NAME, 'glue_transfer_rds_to_s3', default_args),
        executor=CeleryExecutor()
    )

    athena_create_ods_table = SubDagOperator(
        task_id='athena_create_ods_table',
        subdag=create_ods_table_subdag(DAG_NAME, 'athena_create_ods_table', default_args),
        executor=CeleryExecutor()
    )   
    
    # DWH task
    athena_create_dwh_dim_table = SubDagOperator(
        task_id='athena_create_dwh_dim_table',
        subdag=create_dwh_dim_table_subdag(DAG_NAME, 'athena_create_dwh_dim_table', default_args),
        executor=CeleryExecutor(),
        trigger_rule="all_success"
    )
   
    emr_create_dwh_fact_table = SubDagOperator(
        task_id='emr_create_dwh_fact_table',
        subdag=create_dwh_fact_table_subdag(DAG_NAME, 'emr_create_dwh_fact_table', default_args),
        executor=CeleryExecutor(),
        trigger_rule="all_success"
    )
    
    # Redshift task
    load_redshift_table = SubDagOperator(
        task_id='load_redshift_table',
        subdag=load_redshift_table_subdag(DAG_NAME, 'load_redshift_table', default_args),
        executor=CeleryExecutor(),
        trigger_rule="all_success"
    )
    
    # Execution dependency
    glue_transfer_rds_to_s3 >> \
                athena_create_ods_table >> [athena_create_dwh_dim_table, \
                                                           emr_create_dwh_fact_table] >> load_redshift_table
  1. 스크립트 구성을 살펴 보겠습니다. Main DAG에 필요한 Airflow library를 import 합니다. 특히 SubDAG 파일에 구성 한 작업을 Main DAG에서 사용하려면 Airflow SubDagOperator librarySubDAG 파일의 각 DAG 함수를 Python package로 import 해야 합니다. 그리고 Main DAG 오브젝트를 구성하기 위해 필요한 기본 설정 (default_args)과 DAG 이름을 정의합니다. MainDag

  2. 다음은 DAG 오브젝트를 생성합니다. DAG 오브젝트의 작업은 SubDagOperator를 통해 정의합니다. 각 SubDagOperator의 작업은 ODS, DW 및 Redshift subDAG 스크립트에 작성 한 Python 함수를 subdag 파라미터에 정의하여 사용합니다. 그리고 SubDAG의 작업이 병렬 프로세싱 될 수 있게 executor 파라미터를 CeleryExecutor로 정의 합니다. 또한 일부 작업의 trigger_rule을 all_success로 설정하여 이 전 SubDAG의 작업이 모두 성공했을 경우에만 트리거 되도록 설정 합니다. 최종적으로 workflow의 작업 수행 순서가 어떻게 정의 되었는 지 확인 하십시오.

지금까지 수행했던 실습에서는 DAG 오브젝트를 생성하면 각 Operator의 작업 별로 dag 파라미터를 정의해야 했는 데, DAG 오브젝트 생성시 With as 절을 사용하고 그 밑에 Airflow 작업 정의를 indentation하여 작성하면 dag 파라미터를 제공 할 필요가 없어 코딩 작업을 줄 일수 있습니다.

MainDag

  1. Main DAG3개의 sub DAG 파일이 모두 준비 된 상태입니다. 이제 모든 DAG 파일을 Airflow S3 bucket의 dags 폴더로 업로드 해야합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 모든 DAG 파일을 S3로 업로드 하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

MainDag