기본적으로 Workflow는 직전 업스트림 작업이 모두 성공해야만 그 다음 작업을 트리거하도록 동작합니다. Airflow는 더 복잡한 종속성(dependency) 설정을 구성 할 수 있습니다. 즉, 모든 Operators는 trigger_rule 파라미터를 통해 작업을 트리거 하기 위한 규칙을 설정할 수 있습니다. trigger_rule의 기본 값은 all_success이며, 즉 직전 업스트림 작업이 모두 성공한 경우에만 다음 작업을 트리거 한다는 의미입니다. 그 밖에도 작업에 적용할 수 있는 여러가지 trigger rule 이 다음과 같이 제공 됩니다.
Trigger_rule을 적용해서 DAG가 어떻게 동작하는 지 실습 해 보겠습니다.
trigger_rule_DAG.py 파일을 dags 폴더에 생성 하십시오.# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
"owner": "airflow",
"start_date": days_ago(1),
}
dag = DAG(
"trigger_rule_DAG",
default_args=default_args,
schedule_interval=None,
)
t1 = BashOperator(
task_id="sleep5",
bash_command="sleep 5",
dag=dag
)
t2 = BashOperator(
task_id="sleep10",
bash_command="sleep 10",
dag=dag
)
t3 = BashOperator(
task_id='sleep15',
bash_command="sleep 15",
dag=dag
)
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule='all_success',
dag=dag
)
[t1, t2, t3] >> t4
DAG 스크립트는 각 5초, 10초, 15초 동안 sleep 하도록 구성 된 작업 (sleep5, sleep10, sleep15)을 동시에 트리거 하며 마지막에 final_task를 트리거 하도록 설정 된 Workflow 입니다. final_tasks의 trigger_rule이 기본 값인 all_success로 설정 된 것을 확인 하십시오.

새로 생성 한 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.
CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Airflow UI를 reload 하면 trigger_rule_DAG가 DAG 목록에 나타납니다.

Graph View로 가서 toggle switch 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오. 그리고 즉시 Graph View의 workflow 상태를 확인하면 sleep5, sleep10, sleep15 작업이 동시에 트리거 되는 걸 볼 수 있습니다.

모든 sleep 작업이 성공 한 후에 final_task가 트리거 됩니다. 이유는 final_task의 trigger_rule이 all_success로 설정 되었기 때문입니다.
t2 = BashOperator(
task_id="sleep10",
bash_command="This task will fail!!",
dag=dag
)
final_task는 parent 작업 하나의 실패만으로 트리거 되지 못하고 upstream_failed 상태가 됩니다.

다음은 final_task의 trigger_rule을 one_success로 변경하고 DAG를 다시 트리거 해 보십시오.
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule='one_success',
dag=dag
)

final_task는 parent 작업 중에 한 개만 성공해도 바로 트리거 됩니다. 이유는 final_task의 trigger_rule이 one_success로 설정 되었기 때문입니다.