XCom은 cross-communication을 의미하며 작업 간에 메시지 또는 소량의 데이터를 교환하기 위한 Airflow의 메커니즘입니다. XCom은 다음 Airflow UI 화면에서처럼 Key, Value 및 Timestamp 필드가 있는 작은 오브젝트로 생각 할 수 있습니다. XCom은 Admin->XComs를 통해 액세스 할 수 있습니다

위 화면을 보면 Basic_DAG의 hello_task에서 이미 XCom을 생성했던 흔적이 있습니다. XCom이 어떻게 동작하는지 실습을 통해 알아 보겠습니다.
xcom_DAG.py 파일을 dags 폴더에 생성하십시오.# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
"owner": "airflow",
"start_date": days_ago(1),
}
dag = DAG(
"xcom_DAG",
default_args=default_args,
schedule_interval=None,
)
def push_function(**kwargs):
message='pushed XCom message.'
# ti is Task Instance in the provide_context
ti = kwargs['ti']
ti.xcom_push(key="message", value=message)
def pull_function(**kwargs):
ti = kwargs['ti']
message = ti.xcom_pull(key="message", task_ids='push_task')
print("Pulled Message: '%s'" % message)
t1 = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=dag)
t2 = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=dag)
t1 >> t2
DAG 스크립트에는 2개의 Python 함수 (push_function, pull_function)가 구현되어 있습니다. 그리고 두 함수는 PythonOperator의 python_callable 파라미터를 통해 호출 됩니다. 작업은 명시적으로 xcom_push()와 xcom_pull() 메소드를 호출하여 XCom을 push 또는 pull 할 수 있습니다. Xcom 메소드는 작업 인스턴스 (Task Instance) 오브젝트를 통해서만 액세스 할 수 있으며, 본 실습에서 사용하는 Airflow 1.10.x 버전은 **kwargs argument을 통해 제공되는 context 정보에서 작업 인스턴스 오브젝트를 액세스 할 수 있습니다. xcom_push() 메소드는 XCom을 생성하기 위해 key와 value를 파라미터로 제공합니다. xcom_pull() 메소드는 XCom을 조회하기 위해 key와 task_id를 제공합니다.

새로 생성 한 DAG를 Airflow S3 bucket의 dags 폴더로 업로드 해야 합니다. Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.
CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Airflow UI를 reload 하면 xcom_DAG가 DAG 목록에 나타납니다.

Graph View로 가서 toggle switch의 상태를 On으로 변경하고 수동으로 DAG를 트리거 하십시오.
v1.10.12:

v2.0.2:

Workflow가 성공적으로 수행 되었다면 Airflow UI의 Admin->XComs로 가서 push_task 작업이 생성한 XCom을 확인 해 보십시오.

Graph View로 가서 pull_task의 로그를 확인해 보십시오. 성공적으로 수행 되었다면 pull_function이 출력한 XCom 메시지를 로그에서 확인 할 수 있습니다.

생각해보면 Basic_DAG.py에서 xcom_push() 메소드가 사용 된 흔적을 발견 할 수 없는 데 어떻게 XCom (key=return_value, value=Hello world!)을 생성 했던 걸까요? 실제로 XCom을 생성하는 방법에는 몇 가지가 있습니다. 그 중에 하나는 명시적으로 xcom_push() 메소드를 호출하는 방법이 있고, 다른 하나는 작업의 Python 함수에서 return 키워드를 통해 value를 리턴하는 방법입니다. 즉, PythonOperator의 작업인 경우 python_callable 파라미터에 명시 된 Python 함수에서 return value가 있으면 그 value는 자동으로 XCom으로 push 됩니다. 이 때 XCom의 key는 기본적으로 return_value로 정해집니다.
xcom_DAG 스크립트의 Python 함수를 다음과 같이 변경 한 다음 재 실행 해 보십시오. 변경 한 스크립트는 Airflow S3 bucket의 dags 폴더로 재차 업로드 해야 합니다.

변경 된 스크립트가 Airflow UI에 잘 반영 되었는 지 Code View를 통해 확인 하십시오. 바로 반영 되지 않았을 경우 Refresh 버튼을 클릭하고 브라우저를 reload 해 보십시오.

DAG를 수동으로 트리거하고 XComs 화면에 XCom이 예상대로 생성 되었는 지 확인 하십시오.

확인 된 결과와 같이 명시적으로 xcom_push() 메소드를 호출하지 않아도 Python 함수의 return value로 인해 XCom이 자동으로 생성 됩니다. 그리고 pull_task의 로그를 보면 성공적으로 xcom value를 조회 한 것을 알 수 있습니다.
