Airflow는 스케줄링 할 작업을 DAG 단위로 구분합니다. DAG를 구성하다보면 반복되는 패턴들이 존재하여 workflow가 overload 되어 복잡 또는 난잡 해 질 수 있습니다. SubDAG는 반복 패턴을 단조롭게 하는 Airflow 도구입니다. Airflow는 DAG 오브젝트를 Modular 하게 디자인하여 사용하는 패턴을 권장합니다.
예를 들면, 아래와 같은 DAG를 고려해 본다면:

모든 병렬 작업 Operator를 단일 SubDAG로 결합하여 다음과 같이 단조로워진 DAG가 되도록 만들 수 있습니다.

Airflow에서 SubDAG를 활용하는 방법을 실습 해 보겠습니다.
SubDAG는 DAG를 구성하는 방법과 동일하며, 단지 Python 함수를 통해 sub 작업을 포함하는 DAG 오브젝트를 Main DAG로 반환하도록 구현하는 방식입니다.
Cloud9 workspace로 가서 dags 폴더안에 mySubDAG 폴더를 생성 하십시오.
다음 스크립트를 copy/paste 하여 simple_subDAG.py 파일을 dags/mySubDAG 폴더에 생성 하십시오.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
def simple_subdag(parent_dag_id, child_dag_id, default_args):
subdag = DAG(
dag_id=f'{parent_dag_id}.{child_dag_id}',
default_args=default_args,
start_date=days_ago(1),
schedule_interval="@daily",
)
for i in range(5):
DummyOperator(
task_id='{}-task-{}'.format(child_dag_id, i + 1),
default_args=default_args,
dag=subdag,
)
return subdag
subDAG라고 해서 뭔가 특별한 작업이 필요 할 것 같지만 지금까지 봐왔던 DAG와 다르지 않습니다. 단지 subDAG 스크립트의 Python 함수가 Main DAG로부터 parent_dag_id와 child_dag_id 그리고 DAG의 기본 설정을 포함하는 default_args를 인자로 받아서 DAG 오브젝트를 생성하는 것입니다. 실습 코드는 For Loop를 통해 5개의 DummyOperator의 작업을 생성합니다. 그리고 최종적으로 함수의 return 값을 통해 구성 된 DAG 오브젝트를 Main DAG로 반환합니다.

Python runtime 환경에서 새로 생성 한 mySubDAG 폴더를 Python 패키지로 취급하기 위해서 mySubDAG 폴더 안에 __init__.py 파일을 생성 해야합니다. __init__.py은 내용 물이 없는 빈 파일이므로 그냥 새로운 파일을 생성해서 이름만 동일하게 변경해 주십시오.

다음은 subDAG를 사용 할 Main DAG를 생성하겠습니다. 다음 스크립트를 copy/paste 하여 main_DAG.py 파일을 dags 폴더에 생성 하십시오.
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from mySubDAG.simple_subDAG import simple_subdag
DAG_NAME = 'main_DAG'
default_args = {
'owner': 'Airflow',
'start_date': days_ago(1),
}
dag = DAG(
dag_id=DAG_NAME,
default_args=default_args,
schedule_interval=None
)
start = DummyOperator(
task_id='start',
dag=dag
)
section_1 = SubDagOperator(
task_id='section-1',
subdag=simple_subdag(DAG_NAME, 'section-1', default_args),
dag=dag
)
some_other_task = DummyOperator(
task_id='some-other-task',
dag=dag
)
section_2 = SubDagOperator(
task_id='section-2',
subdag=simple_subdag(DAG_NAME, 'section-2', default_args),
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
start >> section_1 >> some_other_task >> section_2 >> end
Main DAG 스크립트는 Airflow library의 SubDagOperator와 subDAG를 구현 한 simple_subDAG.py 파일을 python package로 import 합니다. 그 밑에는 DAG 오브젝트를 생성하고 각 Airflow Operator를 통해 작업을 구성합니다. 반복 패턴을 가지고 있는 Section_1와 Section_2 작업은 SubDagOperator의 subdag 파라미터에 정의 된 simple_subdag 함수를 통해 작업을 구성합니다.

새로 생성 된 DAG와 subDAG 파일을 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 변경 된 파일을 S3로 업로드 하십시오.
CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Airflow UI를 reload 하면 main_DAG가 DAG 목록에 나타납니다.

Graph View로 가서 Workflow의 작업이 어떤 Operator의 작업인지 구분 해 보십시오. 즉, Section-1과 Section-2는 SubDagOperator의 작업이고 나머지는 DummyOperator의 작업입니다.

Graph View에서 section-1 작업을 선택하고 Zoom Into Sub DAG를 클릭하십시오.

Graph View에서 SubDAG 오브젝트에 구성 된 5개의 작업을 볼 수 있습니다.

Section-2 작업도 동일한 방법으로 확인 해 보십시오.

Main DAG의 Graph View에서 toggle switch의 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오. 그리고 Main DAG의 작업이 Workflow 순서대로 트리거 되는 지 확인하십시오.

Workflow가 수행 되는 동안 section-1과 section-2의 sub 작업의 상태도 확인 해 보십시오.
