다음은 ML 워크플로우 구축을 위한 Airflow DAG 코드를 개발 하겠습니다. DAG를 정의하는 첫 번째 단계로 데이터 파이프라인에서 사용할 모든 필수 모듈(Operator 및 Sensor)를 import 합니다.
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.models import DAG
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor
# airflow operators
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
# airflow sagemaker operators
from airflow.providers.amazon.aws.operators.sagemaker_training \
import SageMakerTrainingOperator
from airflow.providers.amazon.aws.operators.sagemaker_tuning \
import SageMakerTuningOperator
from airflow.providers.amazon.aws.operators.sagemaker_transform \
import SageMakerTransformOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.utils.trigger_rule import TriggerRule
# sagemaker sdk
import boto3
import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.tuner import HyperparameterTuner
# airflow sagemaker configuration
from sagemaker.workflow.airflow import training_config
#from sagemaker.workflow.airflow import tuning_config
from sagemaker.workflow.airflow import transform_config_from_estimator
이 전 단계에서 만든 Python 파일을 DAG 라이브러리로 import 합니다.
# Import preprocess file.
from preprocess import preprocess
# Import config file.
import config as cfg
def get_sagemaker_role_arn(role_name, region_name):
iam = boto3.client('iam', region_name=region_name)
response = iam.get_role(RoleName=role_name)
return response["Role"]["Arn"]
다음은 데이터와 스크립트가 저장될 S3 버킷 이름과 리전을 지정하고 Airflow DAG에 대한 기본 변수를 정의합니다.
아래 코드 스니펫의 S3_BUCKET_NAME 및 REGION_NAME을 워크숍 버킷 및 지역으로 수정하십시오.
S3_BUCKET_NAME = "mwaa-sagemaker-bucket-<your_aws_account_id>"
REGION_NAME = "<AWS Region>" # Seoul: ap-northeast-2, Tokyo: ap-northeast-1, Virgina: us-east-1
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
'retry_delay': timedelta(minutes=2),
'provide_context': True,
'email': ['airflow@iloveairflow.com'],
'email_on_failure': True,
'email_on_retry': False
}
다음은 DAG에서 ML 학습과 예측을 수행하기 위해 필요한 estimator를 설정합니다.
#-----------
### Setup Train and Tunning Tasks.
#-----------
# read config file
config = cfg.get_config(S3_BUCKET_NAME, REGION_NAME)
# set configuration for tasks
hook = AwsBaseHook(aws_conn_id='airflow-sagemaker', resource_type="sagemaker")
region = config["job_level"]["region_name"]
sess = hook.get_session(region_name=region)
role = get_sagemaker_role_arn(
config["train_model"]["sagemaker_role"],
sess.region_name)
container = get_image_uri(sess.region_name, 'xgboost', repo_version='1.0-1')
hpo_enabled = False
train_input = config["train_model"]["inputs"]["train"]
csv_train_input = sagemaker.session.s3_input(train_input, content_type='csv')
validation_input = config["train_model"]["inputs"]["validation"]
csv_validation_input = sagemaker.session.s3_input(validation_input, content_type='csv')
training_inputs = {"train": csv_train_input, "validation": csv_validation_input}
output_path = config["train_model"]["estimator_config"]["output_path"]
fm_estimator = Estimator(image_name=container,
role=role,
train_instance_count=1,
train_instance_type='ml.m5.2xlarge',
train_volume_size=5, # 5 GB
output_path=output_path,
sagemaker_session=sagemaker.session.Session(sess),
#train_use_spot_instances=True,
#train_max_run=300,
#train_max_wait=600
)
fm_estimator.set_hyperparameters(max_depth=5,
eta=0.2,
#gamma=4,
#min_child_weight=300,
#subsample=0.8,
#silent=0,
objective='reg:linear',
early_stopping_rounds=10,
num_round=150)
# train_config specifies SageMaker training configuration
train_config = training_config(
estimator=fm_estimator,
inputs=training_inputs)
# create transform config
transform_config = transform_config_from_estimator(
estimator=fm_estimator,
task_id="model_tuning" if hpo_enabled else "model_training",
task_type="tuning" if hpo_enabled else "training",
**config["batch_transform"]["transform_config"]
)
다음은 기본 설정을 적용하여 DAG 개체를 만들고 DAG가 실행되어야 하는 스케줄 간격을 지정합니다.
#-------
### Start creating DAGs
#-------
dag = DAG(
'ml_pipeline',
default_args=default_args,
dagrun_timeout=timedelta(hours=2)
#schedule_interval='0 3 * * *'
)
s3_sensor = S3PrefixSensor(
task_id='s3_sensor',
bucket_name=S3_BUCKET_NAME,
prefix='raw/',
dag=dag
)
학습 및 테스트 데이터 세트를 생성할 전처리 작업을 추가합니다.
# Create python operator to call our preprocess function (preprocess.py file).
preprocess_task = PythonOperator(
task_id='preprocessing',
python_callable=preprocess,
op_kwargs={'bucket_name': S3_BUCKET_NAME},
dag=dag)
Airflow Amazon SageMaker Operators를 사용하면 Amazon SageMaker 작업을 만들 수 있습니다. 다음과 같이 SageMakerTrainingOperator를 사용하여 Amazon SageMaker XG-Boost 알고리즘을 학습합니다.
# launch sagemaker training job and wait until it completes
train_model_task = SageMakerTrainingOperator(
task_id='model_training',
dag=dag,
config=train_config,
aws_conn_id='airflow-sagemaker',
wait_for_completion=True,
check_interval=30
)
Airflow SageMakerTransformOperator로 Amazon SageMaker 배치 추론 작업을 생성하여 테스트 데이터 세트를 기반으로 모델의 성능을 평가합니다.
# launch sagemaker batch transform job and wait until it completes
batch_transform_task = SageMakerTransformOperator(
task_id='predicting',
dag=dag,
config=transform_config,
aws_conn_id='airflow-sagemaker',
wait_for_completion=True,
check_interval=30,
trigger_rule=TriggerRule.ONE_SUCCESS
)
또한 SageMakerTuningOperator를 사용하면 하이퍼파라미터 튜닝 작업을 실행할 수 있어 최상의 모델을 찾을 수도 있습니다. 관련 내용은 블로그 게시물을 참조 하시길 바랍니다.
여기까지 ML 파이프라인의 일부 작업을 구성했으며 다음 단계는 이 모든 작업을 연결하고 DAG를 Airflow에 배포하고 DAG를 실행 하겠습니다.