Branching

때로는 Workflow의 업스트림 작업과 관련 된 임의의 조건에 따라 특정 경로의 작업을 뛰어넘는 branching 기능이 필요 할 수 있습니다. Airflow에서 이를 쉽게 수행하는 방법은 BranchPythonOperator를 사용하는 것입니다. BranchPythonOperator는 PythonOperator와 유사하게도 python_callable의 정의 된 함수를 호출하지만 그 함수가 반환하는 task_id(s)를 통해 일치하는 작업 경로로 branching 되어 작업을 수행합니다. 그럼 branching 기능을 사용하는 방법을 실습 해 보겠습니다.

  1. Cloud9 workspace로 다음 스크립트를 copy/paste 하여 branch_DAG.py 파일을 dags 폴더에 생성 하십시오.
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
}

dag = DAG(
    "branch_DAG", 
    default_args=default_args,
    schedule_interval=None,
)

def push_function(**kwargs):
    pushed_value = 0
    ti = kwargs['ti']
    ti.xcom_push(key="pushed_value", value=pushed_value)

def branch_function(**kwargs):
    ti = kwargs['ti']
    pulled_value = int(ti.xcom_pull(key='pushed_value', task_ids='start_task'))
    
    if pulled_value == 0:
        return 'stop_task'
    else:
        return 'continue_task'


t1 = PythonOperator(
    task_id='start_task', 
    python_callable=push_function, 
    provide_context=True,
    dag=dag
)

t2 = BranchPythonOperator(
    task_id='branch_task', 
    python_callable=branch_function, 
    provide_context=True,
    dag=dag
)

t3 = BashOperator(
    task_id='stop_task', 
    bash_command='echo "Executing stop_task...."',
    dag=dag
)

t4 = BashOperator(
    task_id='continue_task', 
    bash_command='echo "Executing continue_task...."',
    dag=dag
)

t1 >> t2 >> [t3, t4]
  1. DAG 스크립트에서 새로운 부분은 BranchPythonOperator 입니다. BranchPythonOperator는 python_calllable에 정의 한 branch_function()을 호출하며, branch_function()은 start_task에서 push한 XCom 값을 기반으로 stop_task 또는 continue_task 문자열의 task_id를 반환합니다. BranchPythonOperator는 반환 된 task_id와 일치하는 작업으로 이동하여 해당 작업을 수행 합니다. Branch

  2. 새로 생성 한 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Branch

  1. Airflow UI를 reload 하면 branch_DAG가 DAG 목록에 나타납니다. Branch

  2. Graph View로 가서 Workflow를 확인 하십시오. 그리고 DAG를 트리거 하기 전에 branch_task 이 후에 stop_task와 continue_task어떤 작업이 트리거 될지 예상해 보십시오. Branch

  3. Graph View에서 toggle switch 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오. 어떤 작업이 마지막에 트리거 되었나요?

Branch

DAG 스크립트의 push_function()을 보면 XComs로 push 한 값이 0이며, branch_function()은 값이 0 일 경우 task_id로 stop_task를 반환합니다. 따라서, branch 조건에 따라 stop_task가 트리거되고 continue_task는 스킵 되는 것 입니다.

  1. XComs 화면으로 가서 생성 된 XCom을 확인 해 보십시오. branch_task가 어떤 task_id를 반환하였나요? Branch

  2. [Optional] DAG 스크립트의 push_function()에서 pushed_value 변수를 1로 변경하고 DAG를 다시 실행해 보십시오. 이번에는 어떤 결과가 나오나요? Branch