Airflow DAG 개발

Cloud9 workspace 환경에서 새 파일을 만들고 Airflow DAG 코드를 개발 하겠습니다. DAG를 정의하는 첫 번째 단계로 데이터 파이프라인에서 사용할 모든 필수 모듈(Operator 및 Sensor)를 import 합니다.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

Airflow variables 및 connections을 DAG의 로컬 변수로 불러 옵니다.

# Airflow connections
AWS_CONN='aws_default'
REDSHIFT_CONN='redshift_conn'

# Airflow variables
ODS_DB=Variable.get('ODS_DB')
DWH_DB=Variable.get('DWH_DB')
CURRENT_YEAR=Variable.get('CURRENT_YEAR')
CURRENT_MONTH=Variable.get('CURRENT_MONTH')
DATALAKE_BUCKET=Variable.get('DATALAKE_BUCKET')
PROJECT_BUCKET=Variable.get('PROJECT_BUCKET')

다음은 ODS 데이터 카탈로그 구축에 필요한 Athena SQL 쿼리를 구성 합니다.

ods_table_list = [ 'artist', 'album', 'mediatype', 'genre', 'track', 'customer', 'invoice', 'invoiceline' ]

# ODS SQLs
ODS_DROP_ARTIST_TABLE=\
"DROP TABLE IF EXISTS artist"

ODS_CREATE_ARTIST_TABLE=\
f"""
CREATE EXTERNAL TABLE `artist`(`ArtistId` int, `Name` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/artist/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file');
"""

ODS_DROP_ALBUM_TABLE=\
"DROP TABLE IF EXISTS album"

ODS_CREATE_ALBUM_TABLE=\
f"""
CREATE EXTERNAL TABLE `album`(`AlbumId` int, `Title` string, `ArtistId` int) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/album/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_MEDIATYPE_TABLE=\
"DROP TABLE IF EXISTS mediatype"

ODS_CREATE_MEDIATYPE_TABLE=\
f"""
CREATE EXTERNAL TABLE `mediatype`(`MediaTypeId` int, `Name` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/mediatype/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_GENRE_TABLE=\
"DROP TABLE IF EXISTS genre"

ODS_CREATE_GENRE_TABLE=\
f"""
CREATE EXTERNAL TABLE `genre`(`GenreId` int, `Name` string) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/genre/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_TRACK_TABLE=\
"DROP TABLE IF EXISTS track"

ODS_CREATE_TRACK_TABLE=\
f"""
CREATE EXTERNAL TABLE `track`(`TrackId` int, `Name` string, `AlbumId` int, `MediaTypeId` int, `GenreId` int, `Composer` string, `Milliseconds` int, `Bytes` int, `UnitPrice` decimal(10,2)) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/track/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_CUSTOMER_TABLE=\
"DROP TABLE IF EXISTS customer"

ODS_CREATE_CUSTOMER_TABLE=\
f"""
CREATE EXTERNAL TABLE `customer`(`CustomerId` int, `FirstName` string, `LastName` string, `Company` string, `Address` string, `City` string, `State` string, `Country` string, `PostalCode` string, `Phone` string, `Fax` string, `Email` string, `SupportRepId` int) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/customer/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_INVOICE_TABLE=\
"DROP TABLE IF EXISTS invoice"

ODS_CREATE_INVOICE_TABLE=\
f"""
CREATE EXTERNAL TABLE `invoice`(`InvoiceId` int, `CustomerId` int, `InvoiceDate` timestamp, `BillingAddress` string, `BillingCity` string, `BillingState` string, `BillingCountry` string, `BillingPostalCode` string, `Total` decimal(10,2)) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/invoice/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ODS_DROP_INVOICELINE_TABLE=\
"DROP TABLE IF EXISTS invoiceline"

ODS_CREATE_INVOICELINE_TABLE=\
f"""
CREATE EXTERNAL TABLE `invoiceline`(`InvoiceLineId` int, `InvoiceId` int, `TrackId` int, `UnitPrice` decimal(10,2), `Quantity` int) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION  's3://{DATALAKE_BUCKET}/ods/invoiceline/{CURRENT_YEAR}/{CURRENT_MONTH}/' 
TBLPROPERTIES ('classification'='parquet', 'compressionType'='snappy', 'typeOfData'='file')
"""

ods_drop_sql_dict = { 
                  'artist': ODS_DROP_ARTIST_TABLE, \
                  'album': ODS_DROP_ALBUM_TABLE, \
                  'mediatype': ODS_DROP_MEDIATYPE_TABLE, \
                  'genre': ODS_DROP_GENRE_TABLE, \
                  'track': ODS_DROP_TRACK_TABLE, \
                  'customer': ODS_DROP_CUSTOMER_TABLE, \
                  'invoice': ODS_DROP_INVOICE_TABLE, \
                  'invoiceline': ODS_DROP_INVOICELINE_TABLE
                }
                
ods_create_sql_dict = { 
                  'artist': ODS_CREATE_ARTIST_TABLE, \
                  'album': ODS_CREATE_ALBUM_TABLE, \
                  'mediatype': ODS_CREATE_MEDIATYPE_TABLE, \
                  'genre': ODS_CREATE_GENRE_TABLE, \
                  'track': ODS_CREATE_TRACK_TABLE, \
                  'customer': ODS_CREATE_CUSTOMER_TABLE, \
                  'invoice': ODS_CREATE_INVOICE_TABLE, \
                  'invoiceline': ODS_CREATE_INVOICELINE_TABLE
                }

다음은 DW dimension 데이터 및 카탈로그 구축에 필요한 Athena SQL 쿼리를 구성 합니다.

# DW SQLs
DWH_DROP_DIM_TRACK_TABLE="DROP TABLE IF EXISTS dim_track"

DWH_CREATE_DIM_TRACK_TABLE=\
f"""
CREATE TABLE dim_track
WITH (
      external_location = 's3://{DATALAKE_BUCKET}/dwh/dim_track/',
      format = 'Parquet',
      parquet_compression = 'SNAPPY'
    )
AS 
SELECT 
tr.trackid as "track_id", 
tr.name, tr.composer, 
tr.milliseconds, 
tr.bytes, tr.unitprice, 
ar.name as "artist", 
al.title as "album", 
ge.name as "genre", 
me.name as "media_type"
FROM {ODS_DB}.artist ar
  INNER JOIN {ODS_DB}.album al ON ar.artistid = al.artistid
  INNER JOIN {ODS_DB}.track tr ON al.albumid = tr.albumid
  INNER JOIN {ODS_DB}.genre ge ON tr.genreid = ge.genreid
  INNER JOIN {ODS_DB}.mediatype me ON tr.mediatypeid = me.mediatypeid
"""

DWH_DROP_DIM_CUSTOMER_TABLE="DROP TABLE IF EXISTS dim_customer"

DWH_CREATE_DIM_CUSTOMER_TABLE=\
f"""
CREATE TABLE dim_customer
WITH (
      external_location = 's3://{DATALAKE_BUCKET}/dwh/dim_customer/',
      format = 'Parquet',
      parquet_compression = 'SNAPPY'
    )
AS 
SELECT customerid as "customer_id", firstname as "first_name", lastname as "last_name", 
       company, address, city, state, country, postalcode as "postal_code", phone, 
       fax, email, supportrepid as "support_rep_id"
FROM {ODS_DB}.customer
"""

DWH_DROP_DIM_INVOICE_TABLE="DROP TABLE IF EXISTS dim_invoice"

DWH_CREATE_DIM_INVOICE_TABLE=\
f"""
CREATE TABLE dim_invoice
WITH (
      external_location = 's3://{DATALAKE_BUCKET}/dwh/dim_invoice/',
      format = 'Parquet',
      parquet_compression = 'SNAPPY'
    )
AS 
SELECT invoiceid as "invoice_id", billingaddress as "billing_address", billingcity as "billing_city", 
       billingstate as "billing_state", billingcountry as "billing_country", billingpostalcode as "billing_postal_code"
FROM {ODS_DB}.invoice
"""

dw_table_list = [ 'dim_track', 'dim_customer', 'dim_invoice' ]

dw_drop_sql_dict = { 
                  'dim_track': DWH_DROP_DIM_TRACK_TABLE, \
                  'dim_customer': DWH_DROP_DIM_CUSTOMER_TABLE, \
                  'dim_invoice': DWH_DROP_DIM_INVOICE_TABLE
                }
                
dw_create_sql_dict = { 
                  'dim_track': DWH_CREATE_DIM_TRACK_TABLE, \
                  'dim_customer': DWH_CREATE_DIM_CUSTOMER_TABLE, \
                  'dim_invoice': DWH_CREATE_DIM_INVOICE_TABLE
                }

다음은 DW fact 데이터 및 카탈로그 구축에 사용 할 EMR job 환경을 구성 합니다.

# EMR Cluster configuration
JOB_FLOW_OVERRIDES = {
    'Name': 'demo-spark-cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [
        {
            'Name': 'Spark'
        }
    ],
    "Configurations": [
        {
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
        }
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,                
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole-mwaa-workshop',
    'ServiceRole': 'EMR_DefaultRole-mwaa-workshop',
    'Tags': [
        {
            'Key': 'Project',
            'Value': 'Airflow Demo'
        }
    ]
}

# EMR Spark Step configuration
EMR_STEPS = [
    {
        'Name': 'Invoice Fact Table Processing Step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster', \
                     f's3://{PROJECT_BUCKET}/scripts/spark/create_dwh_fact_invoice_table.py', \
                     DATALAKE_BUCKET, ODS_DB, DWH_DB, CURRENT_YEAR, CURRENT_MONTH]
        },
    }
]

다음은 Airflow DAG에 대한 기본 변수를 정의합니다.

redshift_table_list = [ 'dim_track', 'dim_customer', 'dim_invoice', 'dim_date', 'fact_invoice' ]

# DAG default configurations
default_args = {
    "owner" : "airflow",
    "start_date" : days_ago(1)
}

DAG_NAME="chinook_taskgroup_DAG"

with DAG(dag_id=DAG_NAME, 
        schedule_interval=None, 
        default_args=default_args, 
        catchup=False
    ) as dag:

ODS 작업 그룹

다음은 RDS 데이터 소스를 S3 데이터레이크로 수집하기 위한 작업 그룹을 구성합니다. Airflow v2에서는 다수의 작업을 그룹핑하고 병렬로 실행하기 위해 TaskGroup을 사용할 수 있습니다. 첫번째 S3DeleteObjectsOperator는 소스 데이터 수집 작업이 여러 번 수행 되도 중복 데이터가 S3 Data Lake bucket에 생성되는 것을 방지 하기 위해 기존에 수집 된 S3 오브젝트를 삭제하는 작업을 정의합니다. 두번째 AwsGlueJobOperator는 Glue ETL job을 트리거 하는 작업을 정의합니다.

각 테이블 별로 t1, t2 작업 세트가 생성 됩니다.

    # ingest data from RDS to S3 ODS tables
    with TaskGroup(group_id='glue_transfer_rds_to_s3_group') as tg1:
        for table_name in ods_table_list:
            t1 = S3DeleteObjectsOperator(
                    task_id=f'clear_{table_name}_data_in_s3',
                    bucket=DATALAKE_BUCKET,
                    prefix=f'ods/{table_name}/{CURRENT_YEAR}/{CURRENT_MONTH}',
                    aws_conn_id=AWS_CONN
                )
            
            t2 = AwsGlueJobOperator(
                    task_id=f'transfer_{table_name}_data_to_s3',    
                    job_name=f'chinook-{table_name}-table-to-s3', 
                    script_args={"--datalake_bucket": DATALAKE_BUCKET, \
                                 "--year_partition_key": CURRENT_YEAR, \
                                 "--month_partition_key": CURRENT_MONTH},
                    num_of_dpus=5
                )
            
            t1 >> t2

다음은 S3 데이터레이크에 적재 된 ODS 데이터의 대한 Glue data catalog를 생성합니다. Glue crawler를 사용하는 방법이 있지만 본 실습은 보다 직관적인 Athena DDL SQL를 통해 Glue data catalog을 구축합니다. 첫번째 AthenaOperator는 Glue data catalog의 존재하는 기존 테이블을 Drop table 쿼리 문을 사용하여 삭제하는 작업을 정의합니다. 두번째 AthenaOperatorCreate table 쿼리 문을 사용하여 Glue Catalog에 새 테이블을 생성하기 위한 작업을 정의합니다.

각 테이블 별로 t1, t2 작업 세트가 생성 됩니다.

    # ingest data from RDS to S3 ODS tables
    with TaskGroup(group_id='athena_create_ods_table_group') as tg2:            
        for table_name in ods_table_list:
            t1 = AWSAthenaOperator(
                    task_id=f'drop_{table_name}_table',
                    query=ods_drop_sql_dict[table_name],
                    database=ODS_DB,
                    output_location=f"s3://{PROJECT_BUCKET}/athena_query_results/",
                    aws_conn_id=AWS_CONN,
                    workgroup="primary"
                )
            
            t2 = AWSAthenaOperator(
                    task_id=f'create_{table_name}_table',
                    query=ods_create_sql_dict[table_name],
                    database=ODS_DB,
                    output_location=f"s3://{PROJECT_BUCKET}/athena_query_results/",
                    aws_conn_id=AWS_CONN,
                    workgroup="primary"
                )  
            
            t1 >> t2

DW 작업 그룹

다음은 ODS 데이터를 소스로 DW의 dimension table을 구축합니다. 첫번째 S3DeleteObjectsOperator는 dimension table의 데이터를 생성하는 작업이 여러 번 수행 되도 중복 데이터가 S3 데이터레이크에 생성되는 것을 방지 하기 위해 기존에 생성 된 S3 데이터를 삭제하는 작업을 정의합니다. 그리고 다음 두 개의 AwsAthenaOperator는 각 각 Drop table 및 Create table 쿼리 문을 사용하여 최신 dimension table을 생성하기 위한 작업을 정의합니다.

각 테이블 별로 t1, t2, t2 작업 세트가 생성 됩니다.

    with TaskGroup(group_id='athena_create_dwh_dim_table_group') as tg3a:    
        for table_name in dw_table_list:
            if table_name == 'fact_invoice':
                s3_prefix = f'dwh/{table_name}/year={CURRENT_YEAR}/month={CURRENT_MONTH}/'
            else:
                s3_prefix = f'dwh/{table_name}/'
                
            t1 = S3DeleteObjectsOperator(
                    task_id=f'clear_{table_name}_data',
                    bucket=DATALAKE_BUCKET,
                    prefix=s3_prefix,
                    aws_conn_id=AWS_CONN
                )
            
            t2 = AWSAthenaOperator(
                    task_id=f'drop_{table_name}_table',
                    query=dw_drop_sql_dict[table_name],
                    database=DWH_DB,
                    output_location=f"s3://{PROJECT_BUCKET}/athena_query_results/",
                    aws_conn_id=AWS_CONN,
                    workgroup="primary"
                )
            
            t3 = AWSAthenaOperator(
                    task_id=f'create_{table_name}_table',
                    query=dw_create_sql_dict[table_name],
                    database=DWH_DB,
                    output_location=f"s3://{PROJECT_BUCKET}/athena_query_results/",
                    aws_conn_id=AWS_CONN,
                    workgroup="primary"
                )
                
            t1 >> t2 >> t3

다음은 ODS 데이터를 소스로 DW의 fact table을 구축합니다. 첫번째 EmrCreateJobFlowOperator는 EMR cluster configuration 정보를 기반으로 EMR cluster를 프로비져닝하기 위한 작업을 정의하고, 두번째 EmrAddStepsOperator는 Spark job을 EMR step에 추가하기 위한 작업을 정의합니다. 그리고 EmrStepSensor는 EMR step의 Spark job이 완료 될 때까지 대기하기 위한 작업을 정의합니다. 마지막으로 EmrTerminateJobFlowOperator는 EMR cluster를 종료하기 위한 작업을 정의합니다. 이러한 작업 순서대로 DW의 fact table을 구축하는 작업이 전개 됩니다.

    with TaskGroup(group_id='emr_create_dwh_fact_table_group') as tg3b:              
        # Create EMR cluster
        t1 = EmrCreateJobFlowOperator(
            task_id='create_emr_cluster',
            job_flow_overrides=JOB_FLOW_OVERRIDES,
            aws_conn_id=AWS_CONN,
        )
        
        # Add EMR Spark step    
        t2 = EmrAddStepsOperator(
            task_id='add_spark_step_for_fact_table',
            job_flow_id="{{ task_instance.xcom_pull(task_ids='emr_create_dwh_fact_table_group.create_emr_cluster', key='return_value') }}",
            aws_conn_id=AWS_CONN,
            steps=EMR_STEPS
        ) 
        
        # Wait step completion     
        t3 = EmrStepSensor(
            task_id='wait_for_step_complete',
            job_flow_id="{{ task_instance.xcom_pull(task_ids='emr_create_dwh_fact_table_group.create_emr_cluster', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(task_ids='emr_create_dwh_fact_table_group.add_spark_step_for_fact_table', key='return_value')[0] }}",
            aws_conn_id=AWS_CONN
        )

        # Terminate EMR cluster       
        t4 = EmrTerminateJobFlowOperator(
            task_id='terminate_emr_cluster',
            job_flow_id="{{ task_instance.xcom_pull(task_ids='emr_create_dwh_fact_table_group.create_emr_cluster', key='return_value') }}",
            aws_conn_id=AWS_CONN
        )
        
        t1 >> t2 >> t3 >> t4

Redshift 작업 그룹

다음은 S3 데이터레이크에 준비 된 DW 데이터를 Redshift 클러스터로 로딩합니다. 첫번째 PostgresOperator는 Redshift 테이블에 중복 데이터가 저장되지 않게 하기 위해 truncate table 쿼리를 수행하여 데이블을 비우기 위한 작업을 정의합니다. S3ToRedshiftOperator는 S3 Data Lake에 준비 된 DW 데이터를 Redshift 테이블로 로딩하기 위한 작업을 정의합니다.

각 테이블 별로 t1, t2 작업 세트가 생성 됩니다.

    with TaskGroup(group_id='load_redshift_table_group') as tg4:         
        for table_name in redshift_table_list:
            t1 = PostgresOperator(
                    task_id=f'truncate_{table_name}_table',
                    sql=f'TRUNCATE TABLE {DWH_DB}.{table_name};',
                    postgres_conn_id='redshift_conn'
                )    
            
            t2 = S3ToRedshiftOperator(
                    task_id=f'load_{table_name}_to_redshift',
                    redshift_conn_id=REDSHIFT_CONN,
                    aws_conn_id=AWS_CONN,
                    s3_bucket=DATALAKE_BUCKET,
                    s3_key=f'dwh/{table_name}/',
                    schema=DWH_DB,
                    table=table_name,
                    copy_options=['format as parquet']
                )
                
            t1 >> t2   

DAG 작업 그룹 연결

마지막으로 모든 작업 그룹의 순서와 종속성을 다음과 같이 정의합니다.

    # Execution dependency
    tg1 >> tg2 >> [tg3a, tg3b] >> tg4

이렇게 해서 DAG 정의가 모두 완료 되었습니다.