Pools

동시에 많은 작업이 실행 되면 Airflow 클러스터의 Worker 리소스가 부족하여 문제가 발생 할 수 있습니다. Airflow는 사용자가 효율적으로 리소스를 제어 및 관리하기 위해 Pool을 정의하여 작업의 병렬 처리 수행을 제한 시킬 수 있습니다. 관리자는 Airflow UI의 Admins->Pools 화면에서 Pool을 구성하여 사용할 수 있는 Worker slots을 설정하고 작업을 Pool에 할당하여 효과적으로 리소스 사용을 제어 합니다.

  1. Airflow UI에서 Admin->Pools 화면으로 가면 다음과 같이 default_pool 하나가 존재합니다. Pools

  2. 새로운 Pool을 생성 하겠습니다. (+) 버튼 클릭 하십시오. Pools

  3. pool1은 3 slots, pool2는 1 slots을 할당하여 두 개의 pool을 생성 하십시오. Slots은 작업을 처리하기 위해 Pool이 사용 할 수 있는 Worker의 개 수를 의미합니다. Pools Pools Pools

  4. Cloud9 workspace로 다음 스크립트를 copy/paste 하여 pool_DAG.pydags 폴더에 생성 하십시오.

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
}

dag = DAG(
    "pool_DAG", 
    default_args=default_args,
    schedule_interval=None,
)

t1 = BashOperator(
    task_id="task1", 
    bash_command="sleep 5", 
    pool="pool1", 
    dag=dag
)

t2 = BashOperator(
    task_id="task2", 
    bash_command="sleep 5", 
    pool="pool1", 
    dag=dag
)

t3 = BashOperator(
    task_id="task3", 
    bash_command="sleep 5", 
    pool="pool2", 
    dag=dag
)

t4 = BashOperator(
    task_id="task4", 
    bash_command="sleep 5", 
    pool="pool2", 
    dag=dag
)
  1. DAG 스크립트를 살펴보면 4개의 BashOperator의 작업이 있는 데 각 각 5초동안 sleep하는 작업 입니다. 그리고 작업 간의 종속 관계가 없는 독립적인 작업입니다. 각 작업은 pool 파라미터를 통해 pool에 할당 됩니다. 따라서 t1과 t2는 pool1, t3과 t4 작업 인스턴스는 pool2에 할당 되었습니다. Pools

  2. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 새로 생성 된 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Pools

  1. Airflow UI를 reload 하면 pool_DAG가 DAG 목록에 나타납니다. Pools

  2. Graph View로 가서 toggle switch의 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오. 그리고 작업의 상태를 확인하기 위해 즉시 Graph View에서 refresh 버튼을 여러 번 클릭하십시오. 그러면 다음 화면과 같이 task1, task2와 task3이 먼저 트리거 되고 task4는 잠시 동안 스케줄링 상태로 남아 있습니다. Pools

작업의 종속성이 없는데도 불과하고 task4가 동시에 트리거 되지 않는 이유는 task1과 task2는 pool1에 할당 되었고 pool1은 동시에 3개의 worker 리소스를 사용할 수 있습니다. 반면에 task3과 task4는 pool2에 할당 되었고 동시에 1개의 worker 리소스만 사용할 수 있기 때문에 이 경우에 task3이 먼저 트리거 되어 작업 리소스를 릴리스 할 때까지 task4는 대기해야만 하는 것 입니다.

  1. DAG를 트리거 하고 즉시 Pools 화면으로 가서 확인하면 Pool1은 2개, Pool2는 1개의 작업이 Queue Slots에 대기하고 있는 걸 볼 수 있습니다. 워낙 빠르게 상태가 변경 되기때문에 확인을 원하실 경우 두 개의 Airflow UI 창을 열어 놓고 보는 걸 권장 드립니다. Pools

  2. 작업의 Pool 파라미터와 함께 priority_weight 파라미터를 추가 설정하면 할당 된 Pool에서 Task의 수행 우선 순위를 조정 할 수 있습니다. 기본 priority_weight 값은 1 이며 숫자가 높으면 우선 순위 또한 높아 집니다. Task4의 priority_weight 값을 2로 설정하여 DAG를 트리거 해 보겠습니다. DAG 스크립트의 task4 코드를 다음과 같이 변경 한 다음 Airflow S3 bucket의 dags 폴더로 업로드 하십시오.

이 전 DAG run에서 Airflow스케줄러의 의해 Task4가 Task3 보다 먼저 실행 되었던 경우라면 task4가 아닌 task3의 코드를 대신 변경 하십시오.

t4 = BashOperator(
    task_id="task4", 
    bash_command="sleep 5", 
    pool="pool2",
    priority_weight=2,
    dag=dag
)
  1. 변경 된 스크립트가 Airflow UI에 잘 반영 되었는 지 Code View를 통해 확인 하십시오. 바로 반영 되지 않았을 경우 Refresh 버튼을 클릭하고 브라우저를 reload 하십시오. Pools

  2. DAG를 수동으로 트리거 한 다음 바로 Graph View를 통해 확인하면 다음 그림과 같이 task4가 pool2의 작업 우선 순위로 인해 task3 보다 먼저 트리거 되고 task3는 스케줄링 상태에 머물고 있는 걸 볼 수 있습니다. Pools

  3. Pool2의 Slots0으로 변경하고 DAG를 트리거하면 어떻게 될까요? Pools

  4. 예상대로 Pool2에 가용한 slots이 없기 때문에 task3과 task4 모두 트리거 되지 못하고 스케줄링 상태에 머물게 됩니다. Pools

  5. Pool2의 Slots을 이번에 2로 업데이트하면 그제서야 task3과 task4는 동시에 트리거 됩니다. Pools

  6. Gantt View에서 task3와 task4가 트리거 된 시점이 Pool slots을 2로 변경 한 직후 인 것을 확인 할 수 있습니다. Pools