이제 완성 된 Python 코드를 chinook_taskgroup_DAG.py로 Cloud9의 dags 폴더 안에 저장하고 MWAA 환경으로 업로드 하겠습니다.
Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.
CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.
cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

DAG 스크립트가 배포되면 대략 1분 이내에 Airflow UI에서 chinook_taskgroup_DAG를 볼 수 있습니다.
Airflow UI를 reload 하십시오. 그러면 Airflow UI에 chinook_taskgroup_DAG가 DAG 목록에 나타납니다. 그리고 배포 된 DAG는 초기에 기본적으로 비활성화된 상태로 설정됩니다.

해당 DAG의 Graph View로 가서 Workflow를 보면 다음 화면과 같은 모습압니다.

Toggle switch의 상태를 On으로 변경하고 DAG를 수동으로 트리거 하십시오. Auto-refresh를 활성화하여 진행 상태를 모니터링 할 수 있습니다. 각 각의 노드를 클릭하면 TaskGroup의 세부 작업을 볼 수도 있습니다.
첫번째 glue_transfer_rds_to_s3 작업이 시작되면 Glue Studio 콘솔의 Monitoring 화면을 통해서도 ETL job의 실행 상태를 모니터링 할 수 있습니다.

첫 번째 작업이 완료되면 S3 data lake bucket의 ODS 폴더 안에 RDS에서 추출 한 데이터가 각 테이블 별로 생성 됩니다. S3로 가서 각 테이블 폴더 안에 파일이 존재 하는지 확인 해 보십시오.

두 번째 athena_create_ods_table 작업이 완료되면, Glue Catalog의 chinook_ods 데이터베이스에는 다음 화면과 같은 테이블들이 생성 됩니다. Glue Catalog에 다음 테이블들이 생성 되었는 지 확인 해 보십시오.

두 번째 작업이 완료되면 athena_create_dwh_dim_table과 emr_create_dwh_fact_table 작업은 동시에 실행됩니다. 두 작업이 완료되면 S3 data lake bucket의 dwh 폴더 안에 Star Schema 형태로 변환 된 데이터가 생성 됩니다. S3로 가서 각 테이블 폴더 안에 파일이 존재 하는지 확인 해 보십시오.

Glue Catalog의 chinook_dwh 데이터베이스에는 다음 화면과 같은 테이블들이 생성 됩니다. Glue Catalog에 다음 테이블들이 생성되어 있는지 확인 해 보십시오.

마지막 load_redshift_table 작업이 완료되면 다음 화면과 같이 Workflow의 모든 작업이 성공 상태인 것을 볼 수 있습니다.

Redshift 테이블에 데이터가 제대로 로드 되었는 지 확인해 보겠습니다. EC2 콘솔로 가서 EC2-DB-Loader 인스턴스를 선택하고 Connect 버튼을 클릭 하여 인스턴스에 접속 하십시오.

설치되어 있는 psql 클라이언트를 사용하여 Redshift cluster 연결하고 다음 Count 쿼리를 실행하여 각 테이블에 데이터가 로드 되었는 지 확인 하십시오.
psql를 실행하기 전에 <redshift_cluster_endpoint> 문구는 CloudFormation stack의 Output 탭 화면에 출력 된 RedshiftClusterEndpoint로 변경해야 합니다.
# Connect to Redshift cluster
sudo psql -w -h <redshift_cluster_endpoint> -U awsuser -d dev -p 5439
# Display schemas
\dn
# Display tables in chinook_dwh schemas
\dt chinook_dwh.
# Display count of table rows
SELECT
(SELECT count(*) FROM chinook_dwh.fact_invoice) as fact_count,
(SELECT count(*) FROM chinook_dwh.dim_customer) as customer_count,
(SELECT count(*) FROM chinook_dwh.dim_invoice) as invoice_count,
(SELECT count(*) FROM chinook_dwh.dim_track) as track_count;

Redshift 테이블에는 2009년 1월 데이터가 로드 되었습니다. Airflow UI의 Admin->Variables 화면으로 가서 CURRENT_MONTH 변수 값을 02, 03, 04 등 순차적으로 변경하여 DAG를 재실행 해 보십시오.

DAG 실행이 완료 된 후 Count 쿼리를 재 실행하면 일부 테이블에 Count가 증가 한 것을 볼 수 있습니다. 즉, Airflow의 변수를 잘 활용하면 주 배치, 월 배치 등의 데이터 프로세싱 작업을 Airflow UI를 통해 쉽게 제어 할 수 있습니다.
