Airflow DAG 실행

DAG 업로드

이제 완성 된 Python 코드를 chinook_taskgroup_DAG.py로 Cloud9의 dags 폴더 안에 저장하고 MWAA 환경으로 업로드 하겠습니다.

Cloud9 workspace의 terminal에서 다음 AWS CLI 명령어를 실행하여 DAG 파일을 S3로 업로드 하십시오.

CLI를 실행하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

cd ~/environment/dags
ls
aws s3 sync . s3://mwaa-workshop-<your_aws_account_id>/dags/

Dw_dagupload

DAG 스크립트가 배포되면 대략 1분 이내에 Airflow UI에서 chinook_taskgroup_DAG를 볼 수 있습니다.

Data Pipeline in Action

  1. Airflow UI를 reload 하십시오. 그러면 Airflow UI에 chinook_taskgroup_DAG가 DAG 목록에 나타납니다. 그리고 배포 된 DAG는 초기에 기본적으로 비활성화된 상태로 설정됩니다. Dagrun

  2. 해당 DAG의 Graph View로 가서 Workflow를 보면 다음 화면과 같은 모습압니다. Dagrun

  3. Toggle switch의 상태를 On으로 변경하고 DAG를 수동으로 트리거 하십시오. Auto-refresh를 활성화하여 진행 상태를 모니터링 할 수 있습니다. 각 각의 노드를 클릭하면 TaskGroup의 세부 작업을 볼 수도 있습니다.

첫번째 glue_transfer_rds_to_s3 작업이 시작되면 Glue Studio 콘솔의 Monitoring 화면을 통해서도 ETL job의 실행 상태를 모니터링 할 수 있습니다.

Dagrun

  1. 첫 번째 작업이 완료되면 S3 data lake bucket의 ODS 폴더 안에 RDS에서 추출 한 데이터가 각 테이블 별로 생성 됩니다. S3로 가서 각 테이블 폴더 안에 파일이 존재 하는지 확인 해 보십시오. Dagrun

  2. 두 번째 athena_create_ods_table 작업이 완료되면, Glue Catalog의 chinook_ods 데이터베이스에는 다음 화면과 같은 테이블들이 생성 됩니다. Glue Catalog에 다음 테이블들이 생성 되었는 지 확인 해 보십시오. Dagrun

  3. 두 번째 작업이 완료되면 athena_create_dwh_dim_tableemr_create_dwh_fact_table 작업은 동시에 실행됩니다. 두 작업이 완료되면 S3 data lake bucket의 dwh 폴더 안에 Star Schema 형태로 변환 된 데이터가 생성 됩니다. S3로 가서 각 테이블 폴더 안에 파일이 존재 하는지 확인 해 보십시오. Dagrun

  4. Glue Catalog의 chinook_dwh 데이터베이스에는 다음 화면과 같은 테이블들이 생성 됩니다. Glue Catalog에 다음 테이블들이 생성되어 있는지 확인 해 보십시오. Dagrun

  5. 마지막 load_redshift_table 작업이 완료되면 다음 화면과 같이 Workflow의 모든 작업이 성공 상태인 것을 볼 수 있습니다. Dagrun

  6. Redshift 테이블에 데이터가 제대로 로드 되었는 지 확인해 보겠습니다. EC2 콘솔로 가서 EC2-DB-Loader 인스턴스를 선택하고 Connect 버튼을 클릭 하여 인스턴스에 접속 하십시오. Dagrun Dagrun

  7. 설치되어 있는 psql 클라이언트를 사용하여 Redshift cluster 연결하고 다음 Count 쿼리를 실행하여 각 테이블에 데이터가 로드 되었는 지 확인 하십시오.

psql를 실행하기 전에 <redshift_cluster_endpoint> 문구는 CloudFormation stack의 Output 탭 화면에 출력 된 RedshiftClusterEndpoint로 변경해야 합니다.

# Connect to Redshift cluster
sudo psql -w -h <redshift_cluster_endpoint> -U awsuser -d dev -p 5439

# Display schemas
\dn

# Display tables in chinook_dwh schemas
\dt chinook_dwh.

# Display count of table rows
SELECT 
(SELECT count(*) FROM chinook_dwh.fact_invoice) as fact_count,
(SELECT count(*) FROM chinook_dwh.dim_customer) as customer_count,
(SELECT count(*) FROM chinook_dwh.dim_invoice) as invoice_count,
(SELECT count(*) FROM chinook_dwh.dim_track) as track_count;

Dagrun

  1. Redshift 테이블에는 2009년 1월 데이터가 로드 되었습니다. Airflow UI의 Admin->Variables 화면으로 가서 CURRENT_MONTH 변수 값을 02, 03, 04 등 순차적으로 변경하여 DAG를 재실행 해 보십시오. Dagrun

  2. DAG 실행이 완료 된 후 Count 쿼리를 재 실행하면 일부 테이블에 Count가 증가 한 것을 볼 수 있습니다. 즉, Airflow의 변수를 잘 활용하면 주 배치, 월 배치 등의 데이터 프로세싱 작업을 Airflow UI를 통해 쉽게 제어 할 수 있습니다. Dagrun