Feast (Feature Store) is a tool for managing and serving machine learning features.
To execute this notebook, you'll first need to install Feast and connect to an existing deployment. To get started, follow the instructions in the Feast Getting Started guide. In short, you will need to:
pip install feast
git clone https://github.com/feast-dev/feast.git
cd feast/infra/docker-compose
cp .env.sample .env
Place your GCP service account key in the infra/docker-compose/gcp-service-accounts folder, then create a staging bucket:
gsutil mb gs://my-feast-staging-bucket
Configure the .env file to reference your service key:
FEAST_CORE_GCP_SERVICE_ACCOUNT_KEY
FEAST_BATCH_SERVING_GCP_SERVICE_ACCOUNT_KEY
FEAST_JUPYTER_GCP_SERVICE_ACCOUNT_KEY
Configure the following fields in the feast/infra/docker-compose/serving/batch-serving.yml file:
feast.stores.config.project_id
feast.stores.config.dataset_id
feast.stores.config.staging_location
Start Feast:
docker-compose \
-f docker-compose.yml \
-f docker-compose.online.yml \
-f docker-compose.batch.yml \
up -d
In [1]:
import os
# Feast Core acts as the central feature registry
FEAST_CORE_URL = os.getenv('FEAST_CORE_URL', 'localhost:6565')
# Feast Online Serving allows for the retrieval of real-time feature data
FEAST_ONLINE_SERVING_URL = os.getenv('FEAST_ONLINE_SERVING_URL', 'localhost:6566')
# Feast Batch Serving allows for the retrieval of historical feature data
FEAST_BATCH_SERVING_URL = os.getenv('FEAST_BATCH_SERVING_URL', 'localhost:6567')
In [43]:
#!pip install --user feast
In [44]:
#!pip install --user xgboost
In [2]:
import pandas as pd
import numpy as np
from pytz import timezone, utc
from feast import Client, FeatureSet, Entity, ValueType
from feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
from feast.types.Value_pb2 import Value as Value
from google.protobuf.duration_pb2 import Duration
from datetime import datetime, timedelta
from random import randrange
import random
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
In [3]:
!head taxi-train.csv
In [4]:
COL_NAMES = ['fare_amount', 'pickup_datetime', 'pickup_longitude', \
'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'taxi_id']
taxi_df = pd.read_csv('taxi-train.csv', names=COL_NAMES)
In [5]:
taxi_df.head()
Out[5]:
In [6]:
# Feast requires an event timestamp column named "datetime" in datetime64[ns] format,
# so create one from pickup_datetime
taxi_datetime = pd.to_datetime(taxi_df.pickup_datetime, utc=True)
taxi_df.insert(2, "datetime", taxi_datetime, True)
In [7]:
taxi_df.head()
Out[7]:
In [8]:
def compute_dist(row):
    # Euclidean distance (in degrees) between pickup and dropoff coordinates
    lat1, lon1 = row.pickup_latitude, row.pickup_longitude
    lat2, lon2 = row.dropoff_latitude, row.dropoff_longitude
    londiff = lon2 - lon1
    latdiff = lat2 - lat1
    return np.sqrt(londiff*londiff + latdiff*latdiff)
In [9]:
taxi_df['euclid_dist'] = taxi_df.apply(compute_dist, axis=1)
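The row-wise `apply` above can be slow on large frames. A vectorized sketch of the same distance feature (assuming the same column names as this notebook's dataframe):

```python
import numpy as np
import pandas as pd

def compute_dist_vectorized(df: pd.DataFrame) -> pd.Series:
    """Euclidean distance in degrees between pickup and dropoff, computed column-wise."""
    londiff = df.dropoff_longitude - df.pickup_longitude
    latdiff = df.dropoff_latitude - df.pickup_latitude
    return np.sqrt(londiff ** 2 + latdiff ** 2)

# Tiny frame to sanity-check against the row-wise version (a 3-4-5 triangle)
sample = pd.DataFrame({
    "pickup_latitude": [0.0], "pickup_longitude": [0.0],
    "dropoff_latitude": [3.0], "dropoff_longitude": [4.0],
})
dists = compute_dist_vectorized(sample)
```

As in the notebook, this is a flat-plane distance in degrees, a rough proxy for trip length rather than a true great-circle distance.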
In [10]:
taxi_df.head()
Out[10]:
In [11]:
# Connect to FEAST core
client = Client(core_url=FEAST_CORE_URL)
In [12]:
client.list_feature_sets()
Out[12]:
In [13]:
FS_NAME = "taxirides"
taxi_fs = FeatureSet("taxirides")
In [14]:
taxi_fs.infer_fields_from_df(taxi_df,
entities=[Entity(name='taxi_id', dtype=ValueType.INT64)],
replace_existing_features=True)
In [15]:
client.apply(taxi_fs)
In [16]:
client.list_feature_sets()
Out[16]:
In [17]:
print(client.get_feature_set('taxirides'))
In [18]:
client.ingest(taxi_fs, taxi_df)
Out[18]:
In [19]:
_feast_batch_client = Client(serving_url=FEAST_BATCH_SERVING_URL,
core_url=FEAST_CORE_URL)
In [20]:
model_features = ['pickup_latitude',
'pickup_longitude',
'dropoff_latitude',
'dropoff_longitude',
'passenger_count',
'euclid_dist']
target = 'fare_amount'
In [21]:
# Add the target variable to our feature list
features = model_features + [target]
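Feast addresses features in two formats used throughout this notebook: requests take references of the form `<feature_set>:<feature>`, while the returned dataframes name columns `<feature_set>__<feature>`. A small sketch of both conventions:

```python
FS_NAME = "taxirides"
features = ["pickup_latitude", "passenger_count", "fare_amount"]

# References used when requesting features from Feast Serving
feature_refs = [FS_NAME + ":" + f for f in features]

# Column names Feast uses in the dataframe it returns
column_names = [FS_NAME + "__" + f for f in features]
```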
To pull batch features, we provide an entity dataframe that contains the entities and timestamps we want to retrieve. We'll provide every pairing to get all offline features for training.
In [22]:
taxis = taxi_df.taxi_id.unique()
days = taxi_df.datetime.unique()
entity_df = pd.DataFrame(
    {
        "datetime": [day for day in days for taxi in taxis],
        "taxi_id": [taxi for day in days for taxi in taxis],
    }
)
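The nested list comprehensions above build the full cross-product of days and taxis. An equivalent sketch using `pd.MultiIndex.from_product` (with stand-in values in place of the notebook's `taxi_df.datetime.unique()` and `taxi_df.taxi_id.unique()`):

```python
import pandas as pd

days = pd.to_datetime(["2020-01-01", "2020-01-02"], utc=True)  # stand-in for taxi_df.datetime.unique()
taxis = [1, 2, 3]                                              # stand-in for taxi_df.taxi_id.unique()

# Cross-product of (datetime, taxi_id), one row per pairing
entity_df = (
    pd.MultiIndex.from_product([days, taxis], names=["datetime", "taxi_id"])
    .to_frame(index=False)
)
```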
In [23]:
entity_df.shape
Out[23]:
In [24]:
FS_NAME = "taxirides"
# Retrieve training dataset from Feast
dataset = _feast_batch_client.get_batch_features(
feature_refs=[FS_NAME + ":" + feature for feature in features],
entity_rows=entity_df).to_dataframe()
dataset.dropna(inplace=True) # not every (datetime, taxi_id) pairing has an entry
In [25]:
dataset.head()
Out[25]:
In [28]:
x_train, x_test, y_train, y_test = train_test_split(
    dataset[[FS_NAME + "__" + feature for feature in model_features]],
    dataset[FS_NAME + "__" + target],
    test_size=0.25, random_state=42)
In [29]:
model = XGBRegressor(objective='reg:squarederror', n_estimators=100,
                     learning_rate=0.1, max_depth=3, random_state=0)
# Next, we'll fit the model with training data.
model.fit(x_train, y_train)
Out[29]:
In [30]:
train_score = model.score(x_train, y_train)
test_score = model.score(x_test, y_test)
print("Training score: ", train_score)
print("Testing score: ", test_score)
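For a regressor, `model.score` returns the coefficient of determination, R² = 1 − SS_res / SS_tot. A minimal pure-Python sketch of that computation:

```python
def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((yt - yp) ** 2 for yt, yp in zip(y_true, y_pred))
    ss_tot = sum((yt - mean_y) ** 2 for yt in y_true)
    return 1.0 - ss_res / ss_tot
```

Perfect predictions give 1.0; always predicting the mean gives 0.0; worse-than-mean predictions go negative.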
In [31]:
_feast_online_client = Client(serving_url=FEAST_ONLINE_SERVING_URL)
In [32]:
# for a single taxi_id
taxi_id = 1
online_features = _feast_online_client.get_online_features(
    feature_refs=[FS_NAME + ":" + feature for feature in model_features],
    entity_rows=[
        GetOnlineFeaturesRequest.EntityRow(
            fields={"taxi_id": Value(int64_val=taxi_id)}
        )
    ],
)
In [33]:
print(online_features)
In [34]:
# Convert to Pandas dataframe
features_dict = dict.fromkeys([FS_NAME + "__" + feature for feature in model_features])
for row in online_features.field_values:
    for feature in model_features:
        if features_dict[FS_NAME + "__" + feature] is None:
            features_dict[FS_NAME + "__" + feature] = []
        if feature in ['passenger_count']:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].int64_val)
        else:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].double_val)
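The loop above hard-codes which features are integer-valued. One way to keep that decision in a single place is a per-feature dispatch table. This sketch uses plain dicts to stand in for the protobuf rows (the real Feast response objects expose `.fields[...]` values with `int64_val`/`double_val` attributes instead):

```python
FS_NAME = "taxirides"
INT_FEATURES = {"passenger_count"}  # features stored as int64 in Feast

def value_of(field, feature):
    """Pick the proto value slot that holds this feature's data."""
    return field["int64_val"] if feature in INT_FEATURES else field["double_val"]

# Mocked field_values rows: {feature_ref: {"int64_val": ..., "double_val": ...}}
mock_rows = [
    {FS_NAME + ":passenger_count": {"int64_val": 2, "double_val": 0.0},
     FS_NAME + ":euclid_dist": {"int64_val": 0, "double_val": 1.5}},
]

features_dict = {}
for row in mock_rows:
    for feature in ["passenger_count", "euclid_dist"]:
        col = FS_NAME + "__" + feature
        features_dict.setdefault(col, []).append(value_of(row[FS_NAME + ":" + feature], feature))
```

Using `setdefault` also removes the need to pre-seed the dict with `None` values as the notebook does.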
In [35]:
features_dict
Out[35]:
In [36]:
predict_df = pd.DataFrame.from_dict(features_dict)
In [37]:
model.predict(predict_df)
Out[37]:
In [38]:
# Create a Pandas dataframe
features_dict = dict.fromkeys([FS_NAME + "__" + feature for feature in model_features] + ['taxi_id'])
# all taxi_ids
taxi_ids = taxi_df.taxi_id.unique()
entity_rows = []
for taxi_id in taxi_ids.tolist():
    entity_rows.append(
        GetOnlineFeaturesRequest.EntityRow(fields={'taxi_id': Value(int64_val=taxi_id)})
    )
In [39]:
data = _feast_online_client.get_online_features(
feature_refs=[FS_NAME + ":" + feature for feature in model_features],
entity_rows=entity_rows)
In [40]:
for row in data.field_values:
    # capture taxi_id
    if features_dict['taxi_id'] is None:
        features_dict['taxi_id'] = []
    features_dict['taxi_id'].append(row.fields['taxi_id'].int64_val)
    # get all feature values
    for feature in model_features:
        if features_dict[FS_NAME + "__" + feature] is None:
            features_dict[FS_NAME + "__" + feature] = []
        if feature in ['passenger_count']:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].int64_val)
        else:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].double_val)
In [41]:
predict_df = pd.DataFrame.from_dict(features_dict)
predict_df.head()
Out[41]:
In [42]:
pd.DataFrame.from_dict({'taxi_id': predict_df.taxi_id,
                        'prediction': model.predict(predict_df.drop('taxi_id', axis=1))})
Out[42]:
Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License