때로는 Workflow의 업스트림 작업과 관련 된 임의의 조건에 따라 특정 경로의 작업을 뛰어넘는 branching 기능이 필요 할 수 있습니다. Airflow에서 이를 쉽게 수행하는 방법은 BranchPythonOperator를 사용하는 것입니다. BranchPythonOperator는 PythonOperator와 유사하게도 python_callable의 정의 된 함수를 호출하지만 그 함수가 반환하는 task_id(s)를 통해 일치하는 작업 경로로 branching 되어 작업을 수행합니다. 그럼 branching 기능을 사용하는 방법을 실습 해 보겠습니다.
branch_DAG.py 파일을 dags 폴더에 생성 하십시오.# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago
default_args = {
"owner": "airflow",
"start_date": days_ago(1),
}
dag = DAG(
"branch_DAG",
default_args=default_args,
schedule_interval=None,
)
def push_function(**kwargs):
pushed_value = 0
ti = kwargs['ti']
ti.xcom_push(key="pushed_value", value=pushed_value)
def branch_function(**kwargs):
ti = kwargs['ti']
pulled_value = int(ti.xcom_pull(key='pushed_value', task_ids='start_task'))
if pulled_value == 0:
return 'stop_task'
else:
return 'continue_task'
t1 = PythonOperator(
task_id='start_task',
python_callable=push_function,
provide_context=True,
dag=dag
)
t2 = BranchPythonOperator(
task_id='branch_task',
python_callable=branch_function,
provide_context=True,
dag=dag
)
t3 = BashOperator(
task_id='stop_task',
bash_command='echo "Executing stop_task...."',
dag=dag
)
t4 = BashOperator(
task_id='continue_task',
bash_command='echo "Executing continue_task...."',
dag=dag
)
t1 >> t2 >> [t3, t4]
DAG 스크립트에서 새로운 부분은 BranchPythonOperator 입니다. BranchPythonOperator는 python_calllable에 정의 한 branch_function()을 호출하며, branch_function()은 start_task에서 push한 XCom 값을 기반으로 stop_task 또는 continue_task 문자열의 task_id를 반환합니다. BranchPythonOperator는 반환 된 task_id와 일치하는 작업으로 이동하여 해당 작업을 수행 합니다.

새로 생성 한 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.
CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Airflow UI를 reload 하면 branch_DAG가 DAG 목록에 나타납니다.

Graph View로 가서 Workflow를 확인 하십시오. 그리고 DAG를 트리거 하기 전에 branch_task 이 후에 stop_task와 continue_task 중 어떤 작업이 트리거 될지 예상해 보십시오.

Graph View에서 toggle switch 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오. 어떤 작업이 마지막에 트리거 되었나요?

DAG 스크립트의 push_function()을 보면 XComs로 push 한 값이 0이며, branch_function()은 값이 0 일 경우 task_id로 stop_task를 반환합니다. 따라서, branch 조건에 따라 stop_task가 트리거되고 continue_task는 스킵 되는 것 입니다.
XComs 화면으로 가서 생성 된 XCom을 확인 해 보십시오. branch_task가 어떤 task_id를 반환하였나요?

[Optional] DAG 스크립트의 push_function()에서 pushed_value 변수를 1로 변경하고 DAG를 다시 실행해 보십시오. 이번에는 어떤 결과가 나오나요?
