ODS SubDAG

Workflow의 첫 단계는 RDS 소스를 S3 Data Lake으로 수집하기 위해 AWS Glue ETL job을 실행합니다. 해당 작업을 수행하기 위해 필요한 Glue Catalog의 database table과 Glue ETL job은 실습 환경을 구축하는 과정에서 CloudFormation template을 통해 이미 생성 되었습니다.

  1. AWS Glue 콘솔로 가서 chinook_mysql 데이터베이스에 다음 화면과 같은 11개의 테이블이 존재하는 지 확인 하십시오. 해당 테이블은 RDS의 chinook 데이터베이스를 Glue Crawler로 크롤링하여 생성 되었습니다. OdsSubdag

  2. Glue ETL Spark job도 존재하는 지 확인 하십시오. OdsSubdag

Glue ETL job은 소스 데이터베이스와 1:1로 매핑 된 테이블의 데이터를 추출하여 parquet 포맷으로 S3 bucket에 저장하는 작업을 수행 합니다.

  1. ETL job 하나를 선택하고 ETL 스크립트를 살펴 보십시오. 모든 ETL 스크립트에는 공통적으로 datalake_bucket, month_partition_key, year_partition_keyJob 파라미터로 받아서 데이터를 저장 할 S3 path를 설정합니다. 파라미터는 이 후 Airflow 작업을 통해 전달 됩니다. OdsSubdag

  2. Airflow에서 트리거 할 수 있는 Glue ETL job이 준비 된 상태이므로 Airflow 작업을 만들겠습니다. Airflow에서 반복적인 패턴의 작업은 main DAG에 구성하는 것 보다 modular하게 sub DAG로 분리하여 구성하면 main DAG를 단조롭게 할 수 있습니다. 따라서, Glue ETL job을 트리거 하는 작업과 Glue Catalog에 테이블을 생성하는 작업하나의 DAG 오브젝트로 구성하겠습니다. 다음 스크립트를 Cloud9 workspacecopy/paste하여 dags/chinookSubDAG 폴더 안에 chinook_ods_subDAG.py를 생성 하십시오.

SubDAG 파일을 저장 할 chinookSubDAG 폴더가 아직 없을 것 이므로 먼저 생성하고 SubDAG 파일을 저장 해야합니다.

from airflow.models import DAG
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator

# Airflow connections
AWS_CONN='aws_default'

# UI variables
ODS_DB=Variable.get('ODS_DB')
CURRENT_YEAR=Variable.get('CURRENT_YEAR')
CURRENT_MONTH=Variable.get('CURRENT_MONTH')
DATALAKE_BUCKET=Variable.get('DATALAKE_BUCKET')
PROJECT_BUCKET=Variable.get('PROJECT_BUCKET')
               
table_list = [ 'artist', 'album', 'mediatype', 'genre', 'track', 'customer', 'invoice', 'invoiceline' ]

# Helper Function: Returns DAG that ingest data from RDS to S3 ODS tables
def transfer_rds_to_s3_subdag(parent_dag_name, child_dag_name, default_args):

    with DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args
    ) as dag:

        for table_name in table_list:
            t1 = S3DeleteObjectsOperator(
                    task_id=f'clear_{table_name}_data_in_s3',
                    bucket=DATALAKE_BUCKET,
                    prefix=f'ods/{table_name}/{CURRENT_YEAR}/{CURRENT_MONTH}',
                    aws_conn_id=AWS_CONN
                )
            
            t2 = AwsGlueJobOperator(
                    task_id=f'transfer_{table_name}_data_to_s3',    
                    job_name=f'chinook-{table_name}-table-to-s3', 
                    script_args={"--datalake_bucket": DATALAKE_BUCKET, \
                                 "--year_partition_key": CURRENT_YEAR, \
                                 "--month_partition_key": CURRENT_MONTH},
                    num_of_dpus=5
                )
            
            t1 >> t2
            
    return dag
    

# ODS SQLs
ODS_DROP_ARTIST_TABLE=\
"DROP TABLE IF EXISTS artist"

ODS_CREATE_ARTIST_TABLE=\
f"""
CREATE EXTERNAL TABLE `artist`(`ArtistId` int, `Name` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/artist/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file');
"""

ODS_DROP_ALBUM_TABLE=\
"DROP TABLE IF EXISTS album"

ODS_CREATE_ALBUM_TABLE=\
f"""
CREATE EXTERNAL TABLE `album`(`AlbumId` int, `Title` string, `ArtistId` int) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/album/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_MEDIATYPE_TABLE=\
"DROP TABLE IF EXISTS mediatype"

ODS_CREATE_MEDIATYPE_TABLE=\
f"""
CREATE EXTERNAL TABLE `mediatype`(`MediaTypeId` int, `Name` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/mediatype/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_GENRE_TABLE=\
"DROP TABLE IF EXISTS genre"

ODS_CREATE_GENRE_TABLE=\
f"""
CREATE EXTERNAL TABLE `genre`(`GenreId` int, `Name` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/genre/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_TRACK_TABLE=\
"DROP TABLE IF EXISTS track"

ODS_CREATE_TRACK_TABLE=\
f"""
CREATE EXTERNAL TABLE `track`(`TrackId` int, `Name` string, `AlbumId` int, `MediaTypeId` int, `GenreId` int, `Composer` string, `Milliseconds` int, `Bytes` int, `UnitPrice` decimal(10,2)) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/track/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_CUSTOMER_TABLE=\
"DROP TABLE IF EXISTS customer"

ODS_CREATE_CUSTOMER_TABLE=\
f"""
CREATE EXTERNAL TABLE `customer`(`CustomerId` int, `FirstName` string, `LastName` string, `Company` string, `Address` string, `City` string, `State` string, `Country` string, `PostalCode` string, `Phone` string, `Fax` string, `Email` string, `SupportRepId` int) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/customer/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_INVOICE_TABLE=\
"DROP TABLE IF EXISTS invoice"

ODS_CREATE_INVOICE_TABLE=\
f"""
CREATE EXTERNAL TABLE `invoice`(`InvoiceId` int, `CustomerId` int, `InvoiceDate` timestamp, `BillingAddress` string, `BillingCity` string, `BillingState` string, `BillingCountry` string, `BillingPostalCode` string, `Total` decimal(10,2)) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/invoice/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_INVOICELINE_TABLE=\
"DROP TABLE IF EXISTS invoiceline"

ODS_CREATE_INVOICELINE_TABLE=\
f"""
CREATE EXTERNAL TABLE `invoiceline`(`InvoiceLineId` int, `InvoiceId` int, `TrackId` int, `UnitPrice` decimal(10,2), `Quantity` int) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/invoiceline/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

drop_sql_dict = { 
                  'artist': ODS_DROP_ARTIST_TABLE, \
                  'album': ODS_DROP_ALBUM_TABLE, \
                  'mediatype': ODS_DROP_MEDIATYPE_TABLE, \
                  'genre': ODS_DROP_GENRE_TABLE, \
                  'track': ODS_DROP_TRACK_TABLE, \
                  'customer': ODS_DROP_CUSTOMER_TABLE, \
                  'invoice': ODS_DROP_INVOICE_TABLE, \
                  'invoiceline': ODS_DROP_INVOICELINE_TABLE
                }
                
create_sql_dict = { 
                  'artist': ODS_CREATE_ARTIST_TABLE, \
                  'album': ODS_CREATE_ALBUM_TABLE, \
                  'mediatype': ODS_CREATE_MEDIATYPE_TABLE, \
                  'genre': ODS_CREATE_GENRE_TABLE, \
                  'track': ODS_CREATE_TRACK_TABLE, \
                  'customer': ODS_CREATE_CUSTOMER_TABLE, \
                  'invoice': ODS_CREATE_INVOICE_TABLE, \
                  'invoiceline': ODS_CREATE_INVOICELINE_TABLE
                }
                
# Helper Function: Returns DAG that create ODS tables in Glue Catalog
def create_ods_table_subdag(parent_dag_name, child_dag_name, default_args):

    with DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args
    ) as dag:

        for table_name in table_list:
            t1 = AWSAthenaOperator(
                    task_id=f'drop_{table_name}_table',
                    query=drop_sql_dict[table_name],
                    database=ODS_DB,
                    output_location=f"s3://{PROJECT_BUCKET}/athena_query_results/",
                    aws_conn_id=AWS_CONN,
                    workgroup="primary"
                )
            
            t2 = AWSAthenaOperator(
                    task_id=f'create_{table_name}_table',
                    query=create_sql_dict[table_name],
                    database=ODS_DB,
                    output_location=f"s3://{PROJECT_BUCKET}/athena_query_results/",
                    aws_conn_id=AWS_CONN,
                    workgroup="primary"
                )  
            
            t1 >> t2
            
    return dag
  1. 스크립트 구성을 살펴 보겠습니다. 먼저 DAG를 구성하기 위해 필요한 Airflow library를 import 합니다. 특히 S3, Glue 및 Athena 작업이 필요하므로 AWS Operator library가 사용 됩니다. 그리고 Airflow 작업이 AWS 서비스와 연결 하기 위해 기본 구성 되 있는 aws_default 연결을 사용하며, 또한 Airflow UI에 정의 된 Variables (변수)을 Global 변수로 선언하여 사용합니다. OdsSubdag

  2. transfer_rds_to_s3 함수는 RDS의 데이터를 S3로 수집하기 위한 DAG 오브젝트를 구성합니다. 첫번째 S3DeleteObjectsOperator의 작업은 소스 데이터 수집 작업이 여러 번 수행 되도 중복 데이터가 S3 Data Lake bucket에 생성되는 것을 방지 하기 위해 기존에 수집 된 S3 오브젝트를 삭제하는 작업 입니다. 두번째 AwsGlueJobOperator의 작업은 Glue ETL job을 트리거 하는 작업 입니다. 따라서 AwsGlueJobOperator에 Glue job의 이름과 scripts_args를 정의하여 Glue job에서 요구하는 파리미터 정보를 제공합니다.

각 테이블 별로 clear_<table_name>_data_in_s3, transfer_<table_name>_data_to_s3 이름의 작업이 Airflow DAG에 생성 됩니다.

OdsSubdag

  1. Glue를 통해 S3 Data Lake bucket으로 적재 된 데이터를 Glue Catalog의 테이블로 정의해야 합니다. 본 실습에서는 Athena DDL SQL를 통해 직관적으로 Glue Catalog 테이블을 생성합니다. 따라서, DAG 스크립트에는 Athena에서 수행 할 Drop table, Create table 쿼리 세트가 각 테이블 별로 정의 되었습니다. OdsSubdag

  2. create_ods_table 함수는 S3 Data Lake bucket에 적재 된 데이터의 대한 Glue Catalog 테이블을 생성하는 DAG 오브젝트를 구성합니다. 첫번째 AthenaOperator는 앞서 정의 한 각 테이블의 Drop table 쿼리 문을 사용하여 Glue Catalog에 존재하는 기존 테이블을 삭제하기 위한 작업을 정의합니다. 두번째 AthenaOperator는 앞서 정의 한 각 테이블의 Create table 쿼리 문을 사용하여 Glue Catalog에 새 테이블을 생성하기 위한 작업을 정의합니다.

각 테이블 별로 drop_<table_name>_table, create_<table_name>_table 이름의 작업이 Airflow DAG에 생성 됩니다.

OdsSubdag

  1. Airflow UI의 Admin->Variables 화면으로 가서 DAG 스크립트에서 사용하는 Airflow 변수를 다음 테이블의 정보를 참조하여 생성 하십시오.

Airflow 변수를 생성하기 전에 <your_aws_account_id> 문구는 현재 사용하고 있는 12자리 AWS Account ID로 변경해야 합니다.

Key Val
ODS_DB chinook_ods
CURRENT_YEAR 2009
CURRENT_MONTH 01
DATALAKE_BUCKET mwaa-datalake-bucket-<your_aws_account_id>
PROJECT_BUCKET mwaa-project-bucket-<your_aws_account_id>

OdsSubdag