A Pandas DataFrame is used as part of the data preparation. The data preparation job performs the steps shown below.
Create a preprocess.py file with the following code.
import pandas as pd
import numpy as np
import boto3
import os
def preprocess(bucket_name): # define the function that the main DAG file will call via PythonOperator
    my_region = boto3.session.Session().region_name # set the region of the instance
    # Set an output path where the trained model will be saved.
    prefix = 'xgboost'
    output_path = 's3://{}/{}/output'.format(bucket_name, prefix)
    # Download the file from the S3 bucket and load it into a DataFrame (df).
    prefix_1 = 'raw' # enter the folder where you uploaded your dataset file
    data_file = 'train_1.csv' # enter the name of your dataset file
    data_location = 's3://{}/{}/{}'.format(bucket_name, prefix_1, data_file)
    df = pd.read_csv(data_location)
    # Check for missing data.
    #df.isnull().sum()
    # The "pickup_datetime" column is an object, which we need to convert to a datetime type to use in ML algorithms.
    # Pandas can do that easily.
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], format='%Y-%m-%d %H:%M:%S UTC')
    # Latitude and longitude values outside the ranges below are not within NYC, so we drop the rows that fall outside them.
    # We can filter the data using df.loc in Pandas.
    df = df.loc[df['pickup_latitude'].between(40, 42)]
    df = df.loc[df['pickup_longitude'].between(-75, -72)]
    df = df.loc[df['dropoff_latitude'].between(40, 42)]
    df = df.loc[df['dropoff_longitude'].between(-75, -72)]
    # Now let's clean up "fare_amount" and "passenger_count".
    df = df.loc[df['fare_amount'] > 2.5] # US$ 2.50 is the minimum fare a taxi will charge, so we keep only rows above $2.50
    df = df.loc[df['passenger_count'] > 0]
    # Rows with unusually high passenger counts (e.g. 9 passengers) look incorrect, so let's drop those outliers.
    df = df.loc[df['passenger_count'] <= 6]
    # Let's create new columns 'year', 'month', 'day', etc. from the single "pickup_datetime" column.
    df['year'] = df.pickup_datetime.dt.year
    df['month'] = df.pickup_datetime.dt.month
    df['day'] = df.pickup_datetime.dt.day
    df['weekday'] = df.pickup_datetime.dt.weekday
    df['hour'] = df.pickup_datetime.dt.hour
    # Let's calculate the trip distance now.
    def haversine_np(lon1, lat1, lon2, lat2):
        """
        Calculate the great-circle distance between two points
        on the earth (specified in decimal degrees).
        All args must be of equal length.
        """
        lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2
        c = 2 * np.arcsin(np.sqrt(a))
        km = 6367 * c # approximate earth radius in km
        return km
    # Now let's create one more column, 'distance'.
    df['distance'] = haversine_np(df['pickup_longitude'], df['pickup_latitude'], df['dropoff_longitude'], df['dropoff_latitude'])
    # Some rows have a zero distance, so let's drop those rows.
    df = df.loc[df['distance'] > 0]
    # Before we pass our dataset to an algorithm to create a model, let's drop the features we don't need,
    # e.g. 'key' and 'pickup_datetime', because we have already extracted that information into other columns.
    df = df.drop(columns=['key', 'pickup_datetime'], errors='ignore') # errors='ignore' in case 'key' is not present in your file
    # Train / validation / test split
    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=1729), [int(.6*len(df)), int(.8*len(df))])
    print(train_data.shape, validation_data.shape, test_data.shape)
    # Let's create a CSV file from 'train_data' and upload it to the S3 bucket under the 'xgboost' prefix.
    train_data.to_csv('train.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
    # Let's perform the same steps for the validation data.
    validation_data.to_csv('validate.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'validate/validate.csv')).upload_file('validate.csv')
    # For the test data, drop the target column before uploading.
    del test_data['fare_amount']
    test_data.to_csv('test.csv', index=False, header=False)
    boto3.Session().resource('s3').Bucket(bucket_name).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')
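If you want to smoke-test the script locally before deploying it to Airflow, you could optionally append a guarded entry point like the minimal sketch below; the bucket name is a placeholder you would replace with your own, and this block is not required by the DAG.

# Optional: run the preprocessing locally for a quick test (not required by the DAG).
if __name__ == '__main__':
    preprocess('mwaa-sagemaker-bucket-<your_aws_account_id>')  # replace with your bucket name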
The data preparation step copies the CSV files for training, validation, and testing to the S3 path s3://mwaa-sagemaker-bucket-<your_aws_account_id>/xgboost/.
After this, we will deploy preprocess.py to Airflow together with the DAG.
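For reference, here is a minimal sketch of how the DAG might call preprocess through PythonOperator; the DAG id, start date, and schedule are illustrative assumptions, and the actual DAG file is covered in the deployment step.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from preprocess import preprocess  # the function defined above

# Hypothetical DAG definition; names and schedule are illustrative only.
with DAG(
    dag_id='sagemaker_xgboost_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # trigger manually
    catchup=False,
) as dag:
    preprocess_task = PythonOperator(
        task_id='preprocess',
        python_callable=preprocess,
        op_kwargs={'bucket_name': 'mwaa-sagemaker-bucket-<your_aws_account_id>'},  # replace with your bucket
    )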