이번 실습은 몇 가지 기본적인 Airflow 개념에 대해 알아 보고 첫 번째 파이프라인을 작성해 보겠습니다. Airflow는 Python programming을 통해 DAG (Directed Acyclic Graph)로 작업을 정의하여 Workflow를 생성합니다. 즉, Workflow는 영원히 루프 (Loop)에 구속되지 않아야 하기 때문에 비 순환성 방향성 그래프 구조로 구성합니다.
Airflow 파이프라인의 Python 스크립트는 Airflow DAG 오브젝트를 정의하기 위해 필요한 library 모듈을 Import 해야 합니다.
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
DAG 오브젝트와 몇 가지 작업 (task)을 만들기 전에 각 작업을 설정하기 위해 필요한 default DAG 설정을 Python dictionary로 미리 구성하여 사용할 수 있습니다
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
‘owner’: ‘airflow’,
‘depends_on_past’: False,
‘email’: [‘airflow@example.com’],
‘email_on_failure’: False,
‘email_on_retry’: False,
‘retries’: 1,
‘retry_delay’: timedelta(minutes=5)
}
작업을 만들기 전에 DAG 오브젝트를 생성해야 합니다. DAG 오브젝트는 DAG의 고유 식별자 역할을 하는 dag_id (e.g. “Basic_DAG”)을 포함하여 몇몇 기본 설정을 정의할 수 있습니다.
dag = DAG(
'Basic_DAG',
default_args=default_args,
description='A basic DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
)
작업은 Airflow Operator 오브젝트를 인스턴스화하여 생성합니다. 즉, Operator에서 인스턴스화 된 오브젝트를 작업이라고 합니다. Operator의 첫번째 인수인 task_id는 작업의 고유 식별자 역할을 합니다. 다음 t1 및 t2 작업은 BashOperator를 통해 생성 된 작업이며, 각 각 Bash shell에서 bash_command에 정의 된 명령어를 실행합니다.
t1 = BashOperator(
task_id=’print_date’,
bash_command=’date’,
dag=dag,
)
t2 = BashOperator(
task_id=’sleep’,
depends_on_past=False,
bash_command=’sleep 5’,
retries=3,
dag=dag,
)
Bash Operator는 모든 Operator의 기반이 되는 BaseOperator로부터 상속 된 공통적인 인수 (e.g. retries)와 BashOperator에서 제공하는 특정 인수 (e.g. bash_command)를 조합하여 작업을 구성합니다. BaseOperator의 인수와 그 기능에 대한 자세한 내용은 airflow.models.BaseOperator 문서를 참조하십시오.
작업에 인수가 적용되는 우선 순위는 다음과 같습니다.
이번에는 PythonOperator를 사용해서 작업을 구성해 보겠습니다. PythonOperator는 python_callable 인수에 정의 된 print_hello() 함수를 실행합니다.
def print_hello():
return “Hello world!”
t3 = PythonOperator(
task_id=’hello_task’,
python_callable=print_hello,
dag=dag
)
Airflow는 Jinja templating의 기능을 사용하여 파이프라인 작업을 구성하기 위해 built-in parameter 및 macros를 활용 할 수 있습니다. 또한, 파이프라인 작성자가 Airflow의 Hooks 기능을 통해 자체적으로 parameter, macros 및 template를 직접 커스텀 개발하여 사용할 수도 있습니다.
이 가이드는 Airflow에서 template으로 수행 할 수 있는 작업에 대해 간략히 설명하여 이러한 기능이 존재 함을 여러분들이 인지하는 것이 목표 입니다.
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t4 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
templated_command는 코드 로직을 {% %} 블록에 포함하고, {{ds}}와 같은 파라미터를 참조하고, {{macros.ds_add(ds, 7)}}와 같은 함수를 호출하고, {{params.my_param}}와 같은 BashOperator의 정의 된 user-defined 파라미터를 참조 할 수 있습니다.
Jinja Template에 대한 자세한 내용은 Jinja Template 문서를 참고하십시오. 그리고 Jinja Template에서 참조할 수 있는 parameter 및 Macros에 대한 자세한 내용은 Macros 문서를 참고 하십시오.
앞서 서로 종속성이 없는 t1, t2, t3 작업을 정의하였습니다. 다음은 Airflow에서 작업 간의 종속성을 정의하는 몇 가지 방법을 알아보겠습니다.
# This means that t2 will depend on t1
# and t3 depend on t2
# and t4 depend on t2
t1.set_downstream(t2)
t2.set_downstream(t3)
t2.set_downstream(t4)
# Or you can set it in opposite way
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t2)
# Alternatively, you can use double bit shift operator to chain the tasks
t1 >> t2
t2 >> t3
t2 >> t4
# Or using the upstream dependency with the bit shift operator
t2 << t1
t3 << t2
t4 << t2
# A list of tasks can also be set as dependencies.
# These operations all have the same effect:
t1.set_downstream(t2)
t2.set_downstream([t3, t4])
t1 >> t2 >> [t3, t4]
[t3, t4] << t2 << t1
DAG를 실행 할 때 DAG에 루프 (loop)가 존재하거나, 작업의 종속성이 두 번 이상 참조 된 경우 Exception 에러가 발행합니다.
지금까지 기본적인 Airflow 개념을 바탕으로 빌드 업 해온 최종 코드는 다음과 같습니다.
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'Basic_DAG',
default_args=default_args,
description='A basic DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
def print_hello():
return 'Hello world!'
t3 = PythonOperator(
task_id='hello_task',
python_callable=print_hello,
dag=dag
)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t4 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
t1 >> t2 >> [t3, t4]
이제 완성 된 DAG를 실행 해 보기 위해 위 스크립트를 Cloud9 workspace로 copy/paste 하여 Python 파일 (.py)로 저장하고, Airflow S3 bucket의 dags 폴더로 업로드 하겠습니다.


basic_DAG.py으로 입력하고 dags 폴더에 저장하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/
5. basic_DAG.py 파일이 S3 bucket에 성공적으로 업로드 되었는 지 S3 콘솔에서도 확인 하십시오.
6. Airflow UI를 액세스하면 Basic_DAG가 화면에 나타납니다. DAG는 스케줄링이 켜져 있지 않은 Off 상태이며 On 상태로 변경하면 DAG 설정 되로 UTC 시간으로 매일 12:00am에 트리거 됩니다. DAG가 실행되면 Recent Tasks 컬럼과 DAG Runs 컬럼을 통해 Task 및 DAG의 수행 상태가 표시 됩니다. Links 컬럼에는 DAG의 작업 로그 및 다양한 View를 볼 수 있는 기능이 제공됩니다.
아래 스크린샷은 Airflow v1.10.12 입니다. v2.0.2에서는 UI가 개선되어 조금 세련 된 느낌을 제공하지만 전반적으로 기능과 네비게이션 경험 크게 다르지 않습니다. 만약 메뉴/기능을 찾는 데 어려움이 있다면 강사에게 문의 해 주십시오.

v1.10.12:

v2.0.2:

DAG 스크립트에서 정의 한 대로 작업의 종속성 (dependency) 및 수행 순서대로 workflow가 정렬 되어 있습니다. 그리고 왼쪽 상단과 오른쪽 상단에 밑줄로 표신 된 부분에는 각 작업이 어떤 Operator의 작업이고 어떤 상태인지 표시합니다. 즉, print_date, sleep 및 templated 작업은 BashOperator의 작업이고, hello_task는 PythonOperator의 작업 임을 나타냅니다. 현재 모든 작업은 실행 된 적이 없는 no_status 상태 입니다.

다음은 Code View를 클릭 하십시오. Code View는 Airflow에서 현재 사용 중인 DAG 스크립트를 확인 할 수 있습니다. DAG 스크립트는 Code View에서 변경 할 수 없는 read-only 입니다.

DAG 스크립트를 변경하고 Airflow에 반영해 보겠습니다. Cloud9에서 basic_DAG.py 스크립트 첫번째 줄에 주석(comment)을 추가하고 저장하십시오. 그리고 변경 된 스크립트를 Airflow S3 bucket의 dags 폴더로 다시 업로드 하십시오.

Airflow UI에 Code View로 돌아와서 Refresh를 클릭하십시오. 그리고 첫번째 줄에 추가 한 주석이 Code View에 반영 되었는 지 확인 하십시오.
Airflow의 DAG 동기화 주기로 인해 즉시 업데이트 되지 않을 수도 있으니 바로 반영 안될 경우 Refresh 버튼을 여러 번 시도해 보십시오.
v1.10.12:

v2.0.2:

Airflow v2.0.2에는 Auto-refresh 기능이 UI에 추가되어 해당 기능을 활성화만하면 수동으로 Refresh를 클릭하지 않아도 됩니다.
v1.10.12:

v2.0.2:









