Xcoms

XComcross-communication을 의미하며 작업 간에 메시지 또는 소량의 데이터교환하기 위한 Airflow의 메커니즘입니다. XCom은 다음 Airflow UI 화면에서처럼 Key, Value 및 Timestamp 필드가 있는 작은 오브젝트로 생각 할 수 있습니다. XCom은 Admin->XComs를 통해 액세스 할 수 있습니다 Xcoms

위 화면을 보면 Basic_DAG의 hello_task에서 이미 XCom을 생성했던 흔적이 있습니다. XCom이 어떻게 동작하는지 실습을 통해 알아 보겠습니다.

  1. Cloud9 workspace에서 다음 스크립트를 copy/paste 하여 xcom_DAG.py 파일을 dags 폴더에 생성하십시오.
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
}

dag = DAG(
    "xcom_DAG", 
    default_args=default_args,
    schedule_interval=None,
)

def push_function(**kwargs):
    message='pushed XCom message.'
    # ti is Task Instance in the provide_context
    ti = kwargs['ti']
    ti.xcom_push(key="message", value=message)

def pull_function(**kwargs):
    ti = kwargs['ti']
    message = ti.xcom_pull(key="message", task_ids='push_task') 
    print("Pulled Message: '%s'" % message)

t1 = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag)

t2 = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)
    
t1 >> t2
  1. DAG 스크립트에는 2개의 Python 함수 (push_function, pull_function)가 구현되어 있습니다. 그리고 두 함수는 PythonOperator의 python_callable 파라미터를 통해 호출 됩니다. 작업은 명시적으로 xcom_push()와 xcom_pull() 메소드를 호출하여 XCom을 push 또는 pull 할 수 있습니다. Xcom 메소드는 작업 인스턴스 (Task Instance) 오브젝트를 통해서만 액세스 할 수 있으며, 본 실습에서 사용하는 Airflow 1.10.x 버전은 **kwargs argument을 통해 제공되는 context 정보에서 작업 인스턴스 오브젝트를 액세스 할 수 있습니다. xcom_push() 메소드는 XCom을 생성하기 위해 key와 value를 파라미터로 제공합니다. xcom_pull() 메소드는 XCom을 조회하기 위해 key와 task_id를 제공합니다. Xcoms

  2. 새로 생성 한 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Xcoms

  1. Airflow UI를 reload 하면 xcom_DAG가 DAG 목록에 나타납니다. Xcoms

  2. Graph View로 가서 toggle switch의 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오.

v1.10.12: Xcoms

v2.0.2: Xcoms

  1. Workflow가 성공적으로 수행 되었다면 Airflow UI의 Admin->XComs로 가서 push_task 작업이 생성한 XCom을 확인 해 보십시오. Xcoms

  2. Graph View로 가서 pull_task로그를 확인해 보십시오. 성공적으로 수행 되었다면 pull_function이 출력한 XCom 메시지를 로그에서 확인 할 수 있습니다. Xcoms

생각해보면 Basic_DAG.py에서 xcom_push() 메소드가 사용 된 흔적을 발견 할 수 없는 데 어떻게 XCom (key=return_value, value=Hello world!)을 생성 했던 걸까요? 실제로 XCom을 생성하는 방법에는 몇 가지가 있습니다. 그 중에 하나는 명시적으로 xcom_push() 메소드를 호출하는 방법이 있고, 다른 하나는 작업의 Python 함수에서 return 키워드를 통해 value를 리턴하는 방법입니다. 즉, PythonOperator의 작업인 경우 python_callable 파라미터에 명시 된 Python 함수에서 return value가 있으면 그 value는 자동으로 XCom으로 push 됩니다. 이 때 XCom의 key는 기본적으로 return_value로 정해집니다.

  1. xcom_DAG 스크립트의 Python 함수를 다음과 같이 변경 한 다음 재 실행 해 보십시오. 변경 한 스크립트는 Airflow S3 bucket의 dags 폴더로 재차 업로드 해야 합니다. Xcoms

  2. 변경 된 스크립트가 Airflow UI에 잘 반영 되었는 지 Code View를 통해 확인 하십시오. 바로 반영 되지 않았을 경우 Refresh 버튼을 클릭하고 브라우저를 reload 해 보십시오. Xcoms

  3. DAG를 수동으로 트리거하고 XComs 화면에 XCom이 예상대로 생성 되었는 지 확인 하십시오. Xcoms

  4. 확인 된 결과와 같이 명시적으로 xcom_push() 메소드를 호출하지 않아도 Python 함수의 return value로 인해 XCom이 자동으로 생성 됩니다. 그리고 pull_task의 로그를 보면 성공적으로 xcom value를 조회 한 것을 알 수 있습니다. Xcoms