Airflow DAG 개발

다음은 ML 워크플로우 구축을 위한 Airflow DAG 코드를 개발 하겠습니다. DAG를 정의하는 첫 번째 단계로 데이터 파이프라인에서 사용할 모든 필수 모듈(Operator 및 Sensor)를 import 합니다.

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.models import DAG
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

# airflow operators
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# airflow sagemaker operators
from airflow.providers.amazon.aws.operators.sagemaker_training \
    import SageMakerTrainingOperator
from airflow.providers.amazon.aws.operators.sagemaker_tuning \
    import SageMakerTuningOperator
from airflow.providers.amazon.aws.operators.sagemaker_transform \
    import SageMakerTransformOperator

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.utils.trigger_rule import TriggerRule

# sagemaker sdk
import boto3
import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner

# airflow sagemaker configuration
from sagemaker.workflow.airflow import training_config
#from sagemaker.workflow.airflow import tuning_config
from sagemaker.workflow.airflow import transform_config_from_estimator

이 전 단계에서 만든 Python 파일을 DAG 라이브러리로 import 합니다.

# Import preprocess file.
from preprocess import preprocess

# Import config file.
import config as cfg

def get_sagemaker_role_arn(role_name, region_name):
    iam = boto3.client('iam', region_name=region_name)
    response = iam.get_role(RoleName=role_name)
    return response["Role"]["Arn"]

다음은 데이터와 스크립트가 저장될 S3 버킷 이름과 리전을 지정하고 Airflow DAG에 대한 기본 변수를 정의합니다.

아래 코드 스니펫의 S3_BUCKET_NAMEREGION_NAME을 워크숍 버킷 및 지역으로 수정하십시오.

S3_BUCKET_NAME = "mwaa-sagemaker-bucket-<your_aws_account_id>"
REGION_NAME = "<AWS Region>" # Seoul: ap-northeast-2, Tokyo: ap-northeast-1, Virgina: us-east-1
  
default_args = {  
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ['airflow@iloveairflow.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

다음은 DAG에서 ML 학습과 예측을 수행하기 위해 필요한 estimator를 설정합니다.

#-----------
### Setup Train and Tunning Tasks. 
#-----------

# read config file
config = cfg.get_config(S3_BUCKET_NAME, REGION_NAME)

# set configuration for tasks
hook = AwsBaseHook(aws_conn_id='airflow-sagemaker', resource_type="sagemaker")
region = config["job_level"]["region_name"]
sess = hook.get_session(region_name=region)
role = get_sagemaker_role_arn(
    config["train_model"]["sagemaker_role"],
    sess.region_name)
container = get_image_uri(sess.region_name, 'xgboost', repo_version='1.0-1')

hpo_enabled = False

train_input = config["train_model"]["inputs"]["train"]
csv_train_input = sagemaker.session.s3_input(train_input, content_type='csv')

validation_input = config["train_model"]["inputs"]["validation"]
csv_validation_input = sagemaker.session.s3_input(validation_input, content_type='csv')

training_inputs = {"train": csv_train_input, "validation": csv_validation_input}

output_path = config["train_model"]["estimator_config"]["output_path"]

fm_estimator = Estimator(image_name=container, 
    role=role,
    train_instance_count=1, 
    train_instance_type='ml.m5.2xlarge', 
    train_volume_size=5, # 5 GB 
    output_path=output_path,
    sagemaker_session=sagemaker.session.Session(sess),
    #train_use_spot_instances=True,
    #train_max_run=300,
    #train_max_wait=600
    )

fm_estimator.set_hyperparameters(max_depth=5,
    eta=0.2,
    #gamma=4,
    #min_child_weight=300,
    #subsample=0.8,
    #silent=0,
    objective='reg:linear',
    early_stopping_rounds=10,
    num_round=150)


# train_config specifies SageMaker training configuration
train_config = training_config(
    estimator=fm_estimator,
    inputs=training_inputs)

# create transform config
transform_config = transform_config_from_estimator(
    estimator=fm_estimator,
    task_id="model_tuning" if hpo_enabled else "model_training",
    task_type="tuning" if hpo_enabled else "training",
    **config["batch_transform"]["transform_config"]
)

다음은 기본 설정을 적용하여 DAG 개체를 만들고 DAG가 실행되어야 하는 스케줄 간격을 지정합니다.

#-------
### Start creating DAGs
#-------

dag = DAG(  
    'ml_pipeline',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2)
    #schedule_interval='0 3 * * *'
)

s3_sensor = S3PrefixSensor(
    task_id='s3_sensor',
    bucket_name=S3_BUCKET_NAME,
    prefix='raw/',
    dag=dag
)

Preprocessing

학습 및 테스트 데이터 세트를 생성할 전처리 작업을 추가합니다.

# Create python operator to call our preprocess function (preprocess.py file).
preprocess_task = PythonOperator(
    task_id='preprocessing',
    python_callable=preprocess,
    op_kwargs={'bucket_name': S3_BUCKET_NAME},
    dag=dag)

Model training

Airflow Amazon SageMaker Operators를 사용하면 Amazon SageMaker 작업을 만들 수 있습니다. 다음과 같이 SageMakerTrainingOperator를 사용하여 Amazon SageMaker XG-Boost 알고리즘을 학습합니다.

# launch sagemaker training job and wait until it completes
train_model_task = SageMakerTrainingOperator(
    task_id='model_training',
    dag=dag,
    config=train_config,
    aws_conn_id='airflow-sagemaker',
    wait_for_completion=True,
    check_interval=30
)

Model inference

Airflow SageMakerTransformOperator로 Amazon SageMaker 배치 추론 작업을 생성하여 테스트 데이터 세트를 기반으로 모델의 성능을 평가합니다.

# launch sagemaker batch transform job and wait until it completes
batch_transform_task = SageMakerTransformOperator(
    task_id='predicting',
    dag=dag,
    config=transform_config,
    aws_conn_id='airflow-sagemaker',
    wait_for_completion=True,
    check_interval=30,
    trigger_rule=TriggerRule.ONE_SUCCESS
)

또한 SageMakerTuningOperator를 사용하면 하이퍼파라미터 튜닝 작업을 실행할 수 있어 최상의 모델을 찾을 수도 있습니다. 관련 내용은 블로그 게시물을 참조 하시길 바랍니다.

여기까지 ML 파이프라인의 일부 작업을 구성했으며 다음 단계는 이 모든 작업을 연결하고 DAG를 Airflow에 배포하고 DAG를 실행 하겠습니다.