Feature Store using Feast

Feast (Feature Store) is a tool for managing and serving machine learning features.

To execute this notebook, you'll first need to install Feast and connect to an existing deployment. To get started, follow the instructions in the Feast Getting Started guide. In short, you will need to:

  1. Install docker-compose if it isn't already on your machine. (You can also deploy Feast on GKE, which is better suited for production.)
  2. Install feast
    • pip install feast
      
  3. Clone the Feast repository and navigate to the infra/docker-compose sub-directory
    • git clone https://github.com/feast-dev/feast.git
      
    • cd feast/infra/docker-compose
      
  4. Make a copy of the .env.sample file
    • cp .env.sample .env
      
  5. Create a GCP service account key (JSON) and copy it into the infra/docker-compose/gcp-service-accounts folder
  6. Create a GCS bucket to use for staging
    • gsutil mb gs://my-feast-staging-bucket
      
  7. Configure the following fields in the .env file so they reference your service account key:

    • FEAST_CORE_GCP_SERVICE_ACCOUNT_KEY
    • FEAST_BATCH_SERVING_GCP_SERVICE_ACCOUNT_KEY
    • FEAST_JUPYTER_GCP_SERVICE_ACCOUNT_KEY
  8. Configure the following fields in the feast/infra/docker-compose/serving/batch-serving.yml file:

    • feast.stores.config.project_id
    • feast.stores.config.dataset_id
    • feast.stores.config.staging_location
  9. Start Feast (a quick connectivity check is sketched after this command):

    docker-compose \
        -f docker-compose.yml \
        -f docker-compose.online.yml \
        -f docker-compose.batch.yml \
        up -d
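
Once the containers are up, it can help to sanity-check connectivity before running the rest of the notebook. A minimal sketch, assuming Feast Core is exposed on its default port 6565 by the docker-compose setup:

    import grpc

    # Open a gRPC channel to Feast Core and block until it is ready
    # (raises grpc.FutureTimeoutError if the service is unreachable)
    channel = grpc.insecure_channel('localhost:6565')
    grpc.channel_ready_future(channel).result(timeout=10)
    print("Feast Core is reachable")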
    

Configuration

Set the endpoints for Feast Core and for the online and batch (historical) serving clients used later for feature retrieval.


In [1]:
import os

# Feast Core acts as the central feature registry
FEAST_CORE_URL = os.getenv('FEAST_CORE_URL', 'localhost:6565')

# Feast Online Serving allows for the retrieval of real-time feature data
FEAST_ONLINE_SERVING_URL = os.getenv('FEAST_ONLINE_SERVING_URL', 'localhost:6566')

# Feast Batch Serving allows for the retrieval of historical feature data
FEAST_BATCH_SERVING_URL = os.getenv('FEAST_BATCH_SERVING_URL', 'localhost:6567')

Import libraries and modules


In [43]:
#!pip install --user feast

In [44]:
#!pip install --user xgboost

In [2]:
import pandas as pd
import numpy as np
from pytz import timezone, utc
from feast import Client, FeatureSet, Entity, ValueType
from feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
from feast.types.Value_pb2 import Value as Value
from google.protobuf.duration_pb2 import Duration
from datetime import datetime, timedelta
from random import randrange
import random

from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor

Ingesting features into Feast

Read in taxifare data

Read in the taxifare data from the taxi-train.csv file and inspect it with pandas.


In [3]:
!head taxi-train.csv


5.5,2020-04-01 22:32:37,-73.99022674560547,40.749820709228516,-73.99752807617188,40.744781494140625,4,0
5.0,2020-06-14 00:21:24,-73.9786605834961,40.78569030761719,-73.9762191772461,40.77631378173828,1,0
5.0,2020-02-28 10:50:48,-73.99897003173828,40.745609283447266,-73.9891357421875,40.747825622558594,1,0
8.5,2020-01-20 16:25:38,-73.987548828125,40.72443389892578,-73.977783203125,40.7481803894043,1,0
6.0,2020-03-04 19:24:49,-73.98148345947266,40.76824188232422,-73.98226165771484,40.75939178466797,3,0
7.0,2020-08-23 11:36:04,-73.93275451660156,40.79555892944336,-73.94598388671875,40.777591705322266,1,0
8.5,2020-04-13 19:24:08,-73.98275756835938,40.767696380615234,-73.98104095458984,40.78606414794922,1,0
9.0,2020-04-07 12:31:59,-73.87153625488281,40.75267028808594,-73.88893127441406,40.748294830322266,1,0
6.0,2020-09-05 18:36:26,-73.9574203491211,40.76603317260742,-73.95464324951172,40.7779426574707,1,0
6.0,2020-02-24 23:28:34,-73.99248504638672,40.72425842285156,-73.98728942871094,40.73808670043945,1,0

In [4]:
COL_NAMES = ['fare_amount', 'pickup_datetime', 'pickup_longitude',
             'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
             'passenger_count', 'taxi_id']

taxi_df = pd.read_csv('taxi-train.csv', names=COL_NAMES)

In [5]:
taxi_df.head()


Out[5]:
fare_amount pickup_datetime pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude passenger_count taxi_id
0 5.5 2020-04-01 22:32:37 -73.990227 40.749821 -73.997528 40.744781 4 0
1 5.0 2020-06-14 00:21:24 -73.978661 40.785690 -73.976219 40.776314 1 0
2 5.0 2020-02-28 10:50:48 -73.998970 40.745609 -73.989136 40.747826 1 0
3 8.5 2020-01-20 16:25:38 -73.987549 40.724434 -73.977783 40.748180 1 0
4 6.0 2020-03-04 19:24:49 -73.981483 40.768242 -73.982262 40.759392 3 0

In [6]:
# Feast requires a timezone-aware datetime64[ns] event timestamp column;
# derive a "datetime" field from the pickup_datetime strings
taxi_datetime = pd.to_datetime(taxi_df.pickup_datetime, utc=True)
taxi_df.insert(2, "datetime", taxi_datetime, True)
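
Before ingesting, it's worth confirming the new column has the timezone-aware dtype Feast expects; a quick check using pandas' dtype predicate:

    # The new "datetime" column should be timezone-aware datetime64
    assert pd.api.types.is_datetime64tz_dtype(taxi_df.datetime)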

In [7]:
taxi_df.head()


Out[7]:
fare_amount pickup_datetime datetime pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude passenger_count taxi_id
0 5.5 2020-04-01 22:32:37 2020-04-01 22:32:37+00:00 -73.990227 40.749821 -73.997528 40.744781 4 0
1 5.0 2020-06-14 00:21:24 2020-06-14 00:21:24+00:00 -73.978661 40.785690 -73.976219 40.776314 1 0
2 5.0 2020-02-28 10:50:48 2020-02-28 10:50:48+00:00 -73.998970 40.745609 -73.989136 40.747826 1 0
3 8.5 2020-01-20 16:25:38 2020-01-20 16:25:38+00:00 -73.987549 40.724434 -73.977783 40.748180 1 0
4 6.0 2020-03-04 19:24:49 2020-03-04 19:24:49+00:00 -73.981483 40.768242 -73.982262 40.759392 3 0

Create a new feature: Euclidean distance

Engineer an additional feature that captures the Euclidean distance (in raw degrees of latitude/longitude) from the pickup location to the dropoff location.


In [8]:
def compute_dist(row):
    """Straight-line (Euclidean) distance between pickup and dropoff, in degrees."""
    lat1, lon1 = row.pickup_latitude, row.pickup_longitude
    lat2, lon2 = row.dropoff_latitude, row.dropoff_longitude
    londiff = lon2 - lon1
    latdiff = lat2 - lat1
    return np.sqrt(londiff*londiff + latdiff*latdiff)

In [9]:
taxi_df['euclid_dist'] = taxi_df.apply(compute_dist, axis=1)
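
As a side note, for larger dataframes a vectorized computation avoids the per-row apply; an equivalent sketch using the same columns:

    # Vectorized equivalent of the apply above (same degrees-based distance)
    taxi_df['euclid_dist'] = np.sqrt(
        (taxi_df.dropoff_longitude - taxi_df.pickup_longitude) ** 2
        + (taxi_df.dropoff_latitude - taxi_df.pickup_latitude) ** 2
    )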

In [10]:
taxi_df.head()


Out[10]:
fare_amount pickup_datetime datetime pickup_longitude pickup_latitude dropoff_longitude dropoff_latitude passenger_count taxi_id euclid_dist
0 5.5 2020-04-01 22:32:37 2020-04-01 22:32:37+00:00 -73.990227 40.749821 -73.997528 40.744781 4 0 0.008871
1 5.0 2020-06-14 00:21:24 2020-06-14 00:21:24+00:00 -73.978661 40.785690 -73.976219 40.776314 1 0 0.009689
2 5.0 2020-02-28 10:50:48 2020-02-28 10:50:48+00:00 -73.998970 40.745609 -73.989136 40.747826 1 0 0.010081
3 8.5 2020-01-20 16:25:38 2020-01-20 16:25:38+00:00 -73.987549 40.724434 -73.977783 40.748180 1 0 0.025676
4 6.0 2020-03-04 19:24:49 2020-03-04 19:24:49+00:00 -73.981483 40.768242 -73.982262 40.759392 3 0 0.008884

Connect to Feast and create a FeatureSet from this dataframe


In [11]:
# Connect to Feast Core
client = Client(core_url=FEAST_CORE_URL)

In [12]:
client.list_feature_sets()


Out[12]:
[]

In [13]:
FS_NAME = "taxirides"
taxi_fs = FeatureSet("taxirides")

In [14]:
taxi_fs.infer_fields_from_df(taxi_df,
                             entities=[Entity(name='taxi_id', dtype=ValueType.INT64)],
                             replace_existing_features=True)


Entity taxi_id(ValueType.INT64) manually updated (replacing an existing field).
Feature fare_amount (ValueType.DOUBLE) added from dataframe.
Feature pickup_longitude (ValueType.DOUBLE) added from dataframe.
Feature pickup_latitude (ValueType.DOUBLE) added from dataframe.
Feature dropoff_longitude (ValueType.DOUBLE) added from dataframe.
Feature dropoff_latitude (ValueType.DOUBLE) added from dataframe.
Feature passenger_count (ValueType.INT64) added from dataframe.
Feature euclid_dist (ValueType.DOUBLE) added from dataframe.


In [15]:
client.apply(taxi_fs)


Feature set created: "taxirides"

In [16]:
client.list_feature_sets()


Out[16]:
[default/taxirides]

In [17]:
print(client.get_feature_set('taxirides'))


{
  "spec": {
    "name": "taxirides",
    "entities": [
      {
        "name": "taxi_id",
        "valueType": "INT64"
      }
    ],
    "features": [
      {
        "name": "pickup_longitude",
        "valueType": "DOUBLE"
      },
      {
        "name": "dropoff_latitude",
        "valueType": "DOUBLE"
      },
      {
        "name": "dropoff_longitude",
        "valueType": "DOUBLE"
      },
      {
        "name": "fare_amount",
        "valueType": "DOUBLE"
      },
      {
        "name": "euclid_dist",
        "valueType": "DOUBLE"
      },
      {
        "name": "passenger_count",
        "valueType": "INT64"
      },
      {
        "name": "pickup_latitude",
        "valueType": "DOUBLE"
      }
    ],
    "maxAge": "0s",
    "source": {
      "type": "KAFKA",
      "kafkaSourceConfig": {
        "bootstrapServers": "kafka:9092,localhost:9094",
        "topic": "feast-features"
      }
    },
    "project": "default"
  },
  "meta": {
    "createdTimestamp": "2020-06-26T18:45:45Z",
    "status": "STATUS_PENDING"
  }
}

In [18]:
client.ingest(taxi_fs, taxi_df)


Waiting for feature set to be ready for ingestion...
100%|██████████| 28247/28247 [00:01<00:00, 19406.32rows/s]
Ingestion complete!

Ingestion statistics:
Success: 28247/28247
Removing temporary file(s)...
Out[18]:
'29c921f6-7462-3b73-92c6-50c02aeedc45'

Retrieving features from Feast

Get batch features for training

To access historical (offline) features, we'll set up a Feast batch serving client.


In [19]:
_feast_batch_client = Client(serving_url=FEAST_BATCH_SERVING_URL,
                             core_url=FEAST_CORE_URL)

In [20]:
model_features = ['pickup_latitude',
                  'pickup_longitude',
                  'dropoff_latitude',
                  'dropoff_longitude',
                  'passenger_count',
                  'euclid_dist']

target = 'fare_amount'

In [21]:
# Add the target variable to our feature list
features = model_features + [target]

To pull batch features, we provide an entity dataframe that contains the entities and timestamps we want to retrieve. We'll provide every (datetime, taxi_id) pairing to get all offline features for training; a more compact way to build this cross join is sketched after the next two cells.


In [22]:
taxis = taxi_df.taxi_id.unique()
days = taxi_df.datetime.unique()  # ride event timestamps, not calendar days

entity_df = pd.DataFrame(
    {
        "datetime": [day for day in days for taxi in taxis],
        "taxi_id": [taxi for day in days for taxi in taxis],
    }
)

In [23]:
entity_df.shape


Out[23]:
(109193, 2)
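
For reference, the same cross join can be built more compactly with pandas; an equivalent sketch:

    # Equivalent cross join of all (datetime, taxi_id) pairs
    entity_df = pd.MultiIndex.from_product(
        [days, taxis], names=["datetime", "taxi_id"]
    ).to_frame(index=False)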

In [24]:
FS_NAME = "taxirides"

# Retrieve training dataset from Feast
dataset = _feast_batch_client.get_batch_features(
    feature_refs=[FS_NAME + ":" + feature for feature in features],
    entity_rows=entity_df).to_dataframe()
dataset.dropna(inplace=True)  # not every (datetime, taxi_id) pairing has an entry

In [25]:
dataset.head()


Out[25]:
event_timestamp taxi_id taxirides__pickup_latitude taxirides__pickup_longitude taxirides__dropoff_latitude taxirides__dropoff_longitude taxirides__passenger_count taxirides__euclid_dist taxirides__fare_amount
1044 2020-04-10 14:31:50+00:00 0 40.75267 -73.871536 40.748295 -73.888931 1.0 0.017937 9.0
1045 2020-04-08 10:59:21+00:00 0 40.75267 -73.871536 40.748295 -73.888931 1.0 0.017937 9.0
1046 2020-04-09 06:13:34+00:00 0 40.75267 -73.871536 40.748295 -73.888931 1.0 0.017937 9.0
1047 2020-04-12 14:27:28+00:00 0 40.75267 -73.871536 40.748295 -73.888931 1.0 0.017937 9.0
1048 2020-04-09 22:36:21+00:00 0 40.75267 -73.871536 40.748295 -73.888931 1.0 0.017937 9.0

In [28]:
x_train, x_test, y_train, y_test = \
    train_test_split(dataset[[FS_NAME + "__" + feature for feature in model_features]],
                     dataset[FS_NAME + "__" + target],
                     test_size=0.25, random_state=42)

In [29]:
model = XGBRegressor(base_score=0.5, booster='gbtree', colsample_bylevel=1,
       colsample_bynode=1, colsample_bytree=1, gamma=0,
       importance_type='gain', learning_rate=0.1, max_delta_step=0,
       max_depth=3, min_child_weight=1, missing=None, n_estimators=100,
       n_jobs=1, nthread=None, objective='reg:squarederror', random_state=0,
       reg_alpha=0, reg_lambda=1, scale_pos_weight=1, seed=None,
       silent=None, subsample=1, verbosity=1)

# Next, we'll fit the model with training data.
model.fit(x_train, y_train)


Out[29]:
XGBRegressor(base_score=0.5, booster='gbtree', colsample_bylevel=1,
             colsample_bynode=1, colsample_bytree=1, gamma=0, gpu_id=-1,
             importance_type='gain', interaction_constraints='',
             learning_rate=0.1, max_delta_step=0, max_depth=3,
             min_child_weight=1, missing=None, monotone_constraints='()',
             n_estimators=100, n_jobs=1, nthread=1, num_parallel_tree=1,
             objective='reg:squarederror', random_state=0, reg_alpha=0,
             reg_lambda=1, scale_pos_weight=1, seed=0, silent=None, subsample=1,
             tree_method='exact', validate_parameters=1, verbosity=1)

In [30]:
train_score = model.score(x_train, y_train)  
test_score = model.score(x_test, y_test)  
print("Training score: ", train_score)
print("Testing score: ", test_score)


Training score:  0.9999999890464406
Testing score:  0.9999999891959381
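
The near-perfect R² here is expected rather than impressive: the cross join duplicated each ride's features across many timestamps, so train and test contain identical rows and the scores largely reflect memorization. For a fare model, RMSE in dollars is also easier to interpret; a quick check using scikit-learn's metrics module:

    from sklearn.metrics import mean_squared_error

    # Root-mean-squared error of the held-out predictions, in dollars
    rmse = np.sqrt(mean_squared_error(y_test, model.predict(x_test)))
    print("Test RMSE ($):", rmse)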

Predict with online features


In [31]:
_feast_online_client = Client(serving_url=FEAST_ONLINE_SERVING_URL)

In [32]:
# for a single taxi_id
taxi_id = 1

online_features = _feast_online_client.get_online_features(
    feature_refs=[FS_NAME + ":" + feature for feature in model_features],
    entity_rows=[
        GetOnlineFeaturesRequest.EntityRow(
            fields={
                "taxi_id": Value(
                    int64_val=taxi_id)
            }
        )
    ],
)

In [33]:
print(online_features)


field_values {
  fields {
    key: "taxi_id"
    value {
      int64_val: 1
    }
  }
  fields {
    key: "taxirides:dropoff_latitude"
    value {
      double_val: 40.78923797607422
    }
  }
  fields {
    key: "taxirides:dropoff_longitude"
    value {
      double_val: -73.96871948242188
    }
  }
  fields {
    key: "taxirides:euclid_dist"
    value {
      double_val: 0.015991973949749216
    }
  }
  fields {
    key: "taxirides:passenger_count"
    value {
      int64_val: 1
    }
  }
  fields {
    key: "taxirides:pickup_latitude"
    value {
      double_val: 40.77964401245117
    }
  }
  fields {
    key: "taxirides:pickup_longitude"
    value {
      double_val: -73.95592498779297
    }
  }
}


In [34]:
# Collect the response values into a dict (converted to a dataframe below)
features_dict = dict.fromkeys([FS_NAME + "__" + feature for feature in model_features])

for row in online_features.field_values:
    for feature in model_features:
        if features_dict[FS_NAME + "__" + feature] is None:
            features_dict[FS_NAME + "__" + feature] = []    
        if feature in ['passenger_count']:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].int64_val)
        else:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].double_val)

In [35]:
features_dict


Out[35]:
{'taxirides__pickup_latitude': [40.77964401245117],
 'taxirides__pickup_longitude': [-73.95592498779297],
 'taxirides__dropoff_latitude': [40.78923797607422],
 'taxirides__dropoff_longitude': [-73.96871948242188],
 'taxirides__passenger_count': [1],
 'taxirides__euclid_dist': [0.015991973949749216]}

In [36]:
predict_df = pd.DataFrame.from_dict(features_dict)

In [37]:
model.predict(predict_df)


Out[37]:
array([4.9998827], dtype=float32)
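
In a serving path, the online lookup and prediction would typically be wrapped into a single function. A minimal sketch assembled from the pieces above (the predict_fare name is ours):

    def predict_fare(taxi_id):
        """Fetch the latest online features for one taxi and predict its fare."""
        response = _feast_online_client.get_online_features(
            feature_refs=[FS_NAME + ":" + f for f in model_features],
            entity_rows=[GetOnlineFeaturesRequest.EntityRow(
                fields={"taxi_id": Value(int64_val=taxi_id)})],
        )
        fields = response.field_values[0].fields
        row = {FS_NAME + "__" + f: (fields[FS_NAME + ":" + f].int64_val
                                    if f == 'passenger_count'
                                    else fields[FS_NAME + ":" + f].double_val)
               for f in model_features}
        return float(model.predict(pd.DataFrame([row]))[0])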

Batch predict job for all taxi_ids


In [38]:
# Prepare a dict to collect feature values (plus taxi_id) for all taxis
features_dict = dict.fromkeys([FS_NAME + "__" + feature for feature in model_features] + ['taxi_id'])

# all taxi_ids
taxi_ids = taxi_df.taxi_id.unique()

entity_rows = []
for taxi_id in taxi_ids.tolist():
    entity_rows.append(
        GetOnlineFeaturesRequest.EntityRow(fields={'taxi_id': Value(int64_val=taxi_id)})
    )

In [39]:
data = _feast_online_client.get_online_features(
    feature_refs=[FS_NAME + ":" + feature for feature in model_features],
    entity_rows=entity_rows)

In [40]:
for row in data.field_values:
    # capture taxi_id
    if features_dict['taxi_id'] is None:
        features_dict['taxi_id'] = []
    features_dict['taxi_id'].append(row.fields['taxi_id'].int64_val)
    
    # get all feature values
    for feature in model_features:
        if features_dict[FS_NAME + "__" + feature] is None:
            features_dict[FS_NAME + "__" + feature] = []    
        if feature in ['passenger_count']:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].int64_val)
        else:
            features_dict[FS_NAME + "__" + feature].append(row.fields[FS_NAME + ":" + feature].double_val)
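
The two conversion loops above have the same shape, so they could be factored into one helper. A hedged sketch (the helper name, and the assumption that passenger_count is the only INT64 feature, are ours):

    def response_to_dict(response, feature_set, features, include_entity=False):
        """Convert a Feast online response into a dict of feature-value lists."""
        out = {feature_set + "__" + f: [] for f in features}
        if include_entity:
            out['taxi_id'] = []
        for fv in response.field_values:
            if include_entity:
                out['taxi_id'].append(fv.fields['taxi_id'].int64_val)
            for f in features:
                value = fv.fields[feature_set + ":" + f]
                out[feature_set + "__" + f].append(
                    value.int64_val if f == 'passenger_count' else value.double_val)
        return out

With it, the cell above reduces to features_dict = response_to_dict(data, FS_NAME, model_features, include_entity=True).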

In [41]:
predict_df = pd.DataFrame.from_dict(features_dict)
predict_df.head()


Out[41]:
taxirides__pickup_latitude taxirides__pickup_longitude taxirides__dropoff_latitude taxirides__dropoff_longitude taxirides__passenger_count taxirides__euclid_dist taxi_id
0 40.748283 -73.985092 40.773758 -73.870667 1 0.117227 7
1 40.722446 -73.987366 40.648792 -73.977425 1 0.074322 18
2 40.779644 -73.955925 40.789238 -73.968719 1 0.015992 1
3 40.752628 -74.004257 40.747871 -74.000458 2 0.006088 11
4 40.728329 -73.999672 40.761410 -73.968971 1 0.045132 5

In [42]:
pd.DataFrame.from_dict({'taxi_id': predict_df.taxi_id,
                        'prediction': model.predict(predict_df.drop('taxi_id', axis=1))})


Out[42]:
taxi_id prediction
0 7 8.999670
1 18 8.999670
2 1 4.999883
3 11 8.999670
4 5 8.999670
5 16 5.999835
6 10 8.999670
7 3 8.999670
8 14 8.499792
9 8 8.499792
10 2 8.999670
11 17 5.999835
12 12 8.999670
13 6 4.999883
14 0 8.999670
15 15 8.999670
16 9 8.999670
17 4 8.999670
18 13 8.999670

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.