Trigger Rules

기본적으로 Workflow는 직전 업스트림 작업이 모두 성공해야만 그 다음 작업을 트리거하도록 동작합니다. Airflow는 더 복잡한 종속성(dependency) 설정을 구성 할 수 있습니다. 즉, 모든 Operatorstrigger_rule 파라미터를 통해 작업을 트리거 하기 위한 규칙을 설정할 수 있습니다. trigger_rule의 기본 값은 all_success이며, 즉 직전 업스트림 작업이 모두 성공한 경우에만 다음 작업을 트리거 한다는 의미입니다. 그 밖에도 작업에 적용할 수 있는 여러가지 trigger rule 이 다음과 같이 제공 됩니다.

  • all_success: (default) parent 작업이 모두 성공 한 경우
  • all_failed: parent 작업이 모두 실패 한 경우
  • all_done: parent 작업이 모두 실행 완료를 한 경우
  • one_failed: parent 작업 중 최소 1개가 실패 한 경우
  • one_success: parent 작업 중 최소 1개가 성공 한 경우
  • none_failed: parent 작업 중 실패가 없고 성공 또는 스킵 상태 인 경우
  • none_failed_or_skipped: parent 작업 중 실패 또는 스킵 상태가 없고, 최소 1개가 성공 한 경우
  • none_skipped: parent 작업 중 스킵 상태가 없는 경우

Trigger_rule을 적용해서 DAG가 어떻게 동작하는 지 실습 해 보겠습니다.

  1. Cloud9 workspace로 다음 스크립트를 copy/paste 하여 trigger_rule_DAG.py 파일을 dags 폴더에 생성 하십시오.
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
}

dag = DAG(
    "trigger_rule_DAG", 
    default_args=default_args,
    schedule_interval=None,
)

t1 = BashOperator(
    task_id="sleep5", 
    bash_command="sleep 5",
    dag=dag
)

t2 = BashOperator(
    task_id="sleep10", 
    bash_command="sleep 10",
    dag=dag
)

t3 = BashOperator(
    task_id='sleep15', 
    bash_command="sleep 15",
    dag=dag
)

t4 = BashOperator(
    task_id='final_task',
    bash_command='echo DONE!',
    trigger_rule='all_success',
    dag=dag
)
[t1, t2, t3] >> t4
  1. DAG 스크립트는 각 5초, 10초, 15초 동안 sleep 하도록 구성 된 작업 (sleep5, sleep10, sleep15)을 동시에 트리거 하며 마지막에 final_task를 트리거 하도록 설정 된 Workflow 입니다. final_tasks의 trigger_rule이 기본 값인 all_success로 설정 된 것을 확인 하십시오. Trigger

  2. 새로 생성 한 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Trigger

  1. Airflow UI를 reload 하면 trigger_rule_DAG가 DAG 목록에 나타납니다. Trigger

  2. Graph View로 가서 toggle switch 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오. 그리고 즉시 Graph View의 workflow 상태를 확인하면 sleep5, sleep10, sleep15 작업이 동시에 트리거 되는 걸 볼 수 있습니다.

Trigger

모든 sleep 작업이 성공 한 후에 final_task가 트리거 됩니다. 이유는 final_task의 trigger_rule이 all_success로 설정 되었기 때문입니다.

  1. 다음은 sleep10 작업이 실패하도록 DAG 스크립트의 bash_command를 다음과 같이 변경하고 DAG를 다시 트리거 해 보십시오.
t2 = BashOperator(
    task_id="sleep10", 
    bash_command="This task will fail!!",
    dag=dag
)
  1. final_task는 parent 작업 하나의 실패만으로 트리거 되지 못하고 upstream_failed 상태가 됩니다. Trigger

  2. 다음은 final_task의 trigger_rule을 one_success로 변경하고 DAG를 다시 트리거 해 보십시오.

t4 = BashOperator(
    task_id='final_task',
    bash_command='echo DONE!',
    trigger_rule='one_success',
    dag=dag
)

Trigger

final_task는 parent 작업 중에 한 개만 성공해도 바로 트리거 됩니다. 이유는 final_task의 trigger_rule이 one_success로 설정 되었기 때문입니다.

  1. [Optional] 다른 trigger_rule도 적용하여 테스트 해 보시길 바랍니다.