Data Preparation

A Pandas DataFrame is used as part of data preparation. The data preparation task performs the following steps:

  • Identify missing/null data values
  • Detect outliers
  • Perform data transformations (e.g. date formats)
  • Split the clean dataset into training and test datasets
  • Copy the prepared files to an Amazon S3 bucket for model training

Create a preprocess.py file with the following code.

import pandas as pd
import numpy as np
import boto3
import os 

def preprocess(bucket_name):    # define a function that the main DAG file will call via the PythonOperator

    my_region = boto3.session.Session().region_name # set the region of the instance

    # set an output path where the trained model will be saved.
    prefix = 'xgboost'
    output_path ='s3://{}/{}/output'.format(bucket_name, prefix)

    # Download file from S3 bucket and load in dataframe (model_data)
    prefix_1 = 'raw'   # Enter the folder (prefix) where you uploaded your dataset file
    data_file = 'train_1.csv'    # Enter the name of your dataset file
    data_location = 's3://{}/{}/{}'.format(bucket_name,prefix_1,data_file)

    df = pd.read_csv(data_location)

    # Check for missing data
    #df.isnull().sum() 

    # The "pickup_datetime" column is loaded as an object (string) dtype -- we need to convert it to a
    # datetime so it can be used in ML algorithms. Pandas can do that easily.
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], format= '%Y-%m-%d %H:%M:%S UTC')


    # Latitude & longitude values outside the valid ranges for NYC are bad data -- we will drop the rows which fall outside these ranges.
    # we can filter the data using the df.loc function in Pandas.
    df = df.loc[df['pickup_latitude'].between(40,42)]
    df = df.loc[df['pickup_longitude'].between(-75,-72)]
    df = df.loc[df['dropoff_latitude'].between(40,42)]
    df = df.loc[df['dropoff_longitude'].between(-75,-72)]


    # Now let's fix "fare_amount" and "passenger_count".
    df = df.loc[df['fare_amount'] > 2.5]   # US$ 2.50 is the minimum fare a taxi will charge, so keep only rows above $2.50.
    df = df.loc[df['passenger_count'] > 0]


    # Passenger counts above 6 (e.g. 9) look like outliers.
    # Let's drop those rows.
    df = df.loc[df['passenger_count'] <=6]

    # Let's create new columns 'year', 'month', 'day', etc. from the single "pickup_datetime" column.
    df['year']=df.pickup_datetime.dt.year
    df['month']=df.pickup_datetime.dt.month
    df['day']=df.pickup_datetime.dt.day
    df['weekday']=df.pickup_datetime.dt.weekday
    df['hour']=df.pickup_datetime.dt.hour


    # Now let's calculate the trip distance.
    def haversine_np(lon1, lat1, lon2, lat2):

        """
        Calculate the great circle distance between two points
        on the earth (specified in decimal degrees)

        All args must be of equal length.    

        """
        lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

        dlon = lon2 - lon1
        dlat = lat2 - lat1

        a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2

        c = 2 * np.arcsin(np.sqrt(a))
        km = 6367 * c
        return km


    # Now let's create one more column, 'distance'.
    df['distance'] = haversine_np(df['pickup_longitude'],df['pickup_latitude'],df['dropoff_longitude'],df['dropoff_latitude'])


    # Some rows have a distance of zero -- let's drop those as well.
    df = df.loc[df['distance'] > 0]

    # Before we pass our dataset to the algorithm to create a model, let's drop the features we don't need.
    # For example 'pickup_datetime', because we have already extracted its information into other columns.

    del df['pickup_datetime']


    # Train/validation/test split (60/20/20)
    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=1729), [int(.6*len(df)), int(.8*len(df))])
    print(train_data.shape, validation_data.shape, test_data.shape)


    # Let's create a CSV file from 'train_data' and upload it to the S3 bucket under the 'xgboost' prefix.
    train_data.to_csv('train.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')

    # Let's perform the same steps for the validation data.
    validation_data.to_csv('validate.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'validate/validate.csv')).upload_file('validate.csv')


    # Drop the target column 'fare_amount' from the test set -- it is what the model will predict.
    del test_data['fare_amount']

    test_data.to_csv('test.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')
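As a quick sanity check, the haversine helper above can be exercised on scalar coordinates (it is vectorized, so NumPy arrays and DataFrame columns work the same way). The coordinates below are illustrative NYC points chosen for this sketch, not values from the dataset:

```python
import numpy as np

def haversine_np(lon1, lat1, lon2, lat2):
    # Same formula as in preprocess.py: great-circle distance in km.
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2
    c = 2 * np.arcsin(np.sqrt(a))
    return 6367 * c

# Roughly Times Square to JFK Airport -- expect a distance on the order of 20 km.
d = haversine_np(-73.9857, 40.7580, -73.7781, 40.6413)
print(round(float(d), 1))
```

Any trip whose pickup and dropoff coordinates are identical yields a distance of exactly 0, which is why the zero-distance filter above removes those rows.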

The data preparation step copies the CSV files for training, validation, and testing to the S3 path s3://mwaa-sagemaker-bucket-<your_aws_account_id>/xgboost/.
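The 60/20/20 split logic used above can be seen in isolation on a toy DataFrame: np.split cuts the shuffled frame at the 60% and 80% row marks. The column names and values here are made up purely for illustration:

```python
import numpy as np
import pandas as pd

# Toy frame with 10 rows; the values are arbitrary.
df = pd.DataFrame({'x': range(10), 'y': range(10, 20)})

# Shuffle with a fixed seed, then cut at the 60% and 80% row indices.
train, validation, test = np.split(
    df.sample(frac=1, random_state=1729),
    [int(.6 * len(df)), int(.8 * len(df))]
)
print(len(train), len(validation), len(test))  # 6 2 2
```

Because df.sample(frac=1) shuffles before the cut, each partition is a random sample rather than a contiguous slice of the original file.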

After this, we will deploy preprocess.py to Airflow along with the DAG.
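A minimal sketch of what that DAG file could look like, assuming preprocess.py sits alongside it in the MWAA dags folder so preprocess can be imported directly; the DAG id, schedule, start date, and bucket name below are placeholders, not values prescribed by this guide:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from preprocess import preprocess  # the function defined in the file created above

with DAG(
    dag_id='ml_pipeline',            # placeholder DAG id
    start_date=datetime(2023, 1, 1), # placeholder start date
    schedule_interval=None,          # trigger manually
    catchup=False,
) as dag:
    preprocess_task = PythonOperator(
        task_id='preprocess',
        python_callable=preprocess,
        op_kwargs={'bucket_name': 'mwaa-sagemaker-bucket-<your_aws_account_id>'},
    )
```

In a full pipeline, SageMaker training and deployment tasks would be chained after preprocess_task; those steps are covered when the DAG is deployed.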