In this script, we load the model from the Code/3_model_building.ipynb
Jupyter notebook and the labeled feature data set constructed in the Code/2_feature_engineering.ipynb
notebook in order to build the model deployment artifacts. We create deployment functions, which we test locally in the notebook. We package a model schema file, the deployment run functions file, and the model created in the previous notebook into a deployment file. We load this package onto our Azure blob storage for deployment.
The remainder of this notebook details steps required to deploy and operationalize the model using Azure Machine Learning Model Management environment for use in production in realtime.
Note: This notebook will take about 1 minute to execute all cells, depending on the compute configuration you have setup.
In [1]:
## setup our environment by importing required libraries
import json
import os
import shutil
import time
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
# for creating pipelines and model
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
# setup the pyspark environment
from pyspark.sql import SparkSession
from azureml.api.schema.dataTypes import DataTypes
from azureml.api.schema.sampleDefinition import SampleDefinition
from azureml.api.realtime.services import generate_schema
# For Azure blob storage access
from azure.storage.blob import BlockBlobService
from azure.storage.blob import PublicAccess
# For logging model evaluation parameters back into the
# AML Workbench run history plots.
import logging
from azureml.logging import get_azureml_logger
amllog = logging.getLogger("azureml")
amllog.level = logging.INFO
# Turn on cell level logging.
%azureml history on
%azureml history show
# Time the notebook execution.
# This will only make sense if you "Run all cells"
tic = time.time()
logger = get_azureml_logger() # logger writes to AMLWorkbench runtime view
spark = SparkSession.builder.getOrCreate()
# Telemetry
logger.log('amlrealworld.predictivemaintenance.operationalization','true')
Out[1]:
We need to load the feature data set from memory to construct the operationalization schema. We again will require your storage account name and account key to connect to the blob storage.
In [2]:
# Enter your Azure blob storage details here
ACCOUNT_NAME = "<your blob storage account name>"
# You can find the account key under the _Access Keys_ link in the
# [Azure Portal](portal.azure.com) page for your Azure storage container.
ACCOUNT_KEY = "<your blob storage account key>"
#-------------------------------------------------------------------------------------------
# We will create this container to hold the results of executing this notebook.
# If this container name already exists, we will use that instead, however
# This notebook will ERASE ALL CONTENTS.
CONTAINER_NAME = "featureengineering"
FE_DIRECTORY = 'featureengineering_files.parquet'
MODEL_CONTAINER = 'modeldeploy'
# Connect to your blob service
az_blob_service = BlockBlobService(account_name=ACCOUNT_NAME, account_key=ACCOUNT_KEY)
# Create a new container if necessary, otherwise you can use an existing container.
# This command creates the container if it does not already exist. Else it does nothing.
az_blob_service.create_container(CONTAINER_NAME,
fail_on_exist=False,
public_access=PublicAccess.Container)
# create a local path where to store the results later.
if not os.path.exists(FE_DIRECTORY):
os.makedirs(FE_DIRECTORY)
# download the entire parquet result folder to local path for a new run
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
if CONTAINER_NAME in blob.name:
local_file = os.path.join(FE_DIRECTORY, os.path.basename(blob.name))
az_blob_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)
fedata = spark.read.parquet(FE_DIRECTORY)
fedata.limit(5).toPandas().head(5)
Out[2]:
The init() function initializes your web service, loading in any data or models that you need to score your inputs. In the example below, we load in the trained model. This command is run when the Docker container containing your service initializes.
The run() function defines what is executed on a scoring call. In our simple example, we simply load in the input as a data frame, and run our pipeline on the input, and return the prediction.
Start by defining the init() and run() functions, test them with example data. Then write them to the score.py
file for deployment.
In [3]:
# Initialize the deployment environment
def init():
# read in the model file
from pyspark.ml import PipelineModel
global pipeline
pipeline = PipelineModel.load(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']+'pdmrfull.model')
# Run the model and return the scored result.
def run(input_df):
import json
response = ''
try:
#Get prediction results for the dataframe
# We'll use the known label, key variables and
# a few extra columns we won't need.
key_cols =['label_e','machineID','dt_truncated', 'failure','model_encoded','model' ]
# Then get the remaing feature names from the data
input_features = input_df.columns
# Remove the extra stuff if it's in the input_df
input_features = [x for x in input_features if x not in set(key_cols)]
# Vectorize as in model building
va = VectorAssembler(inputCols=(input_features), outputCol='features')
data = va.transform(input_df).select('machineID','features')
score = pipeline.transform(data)
predictions = score.collect()
#Get each scored result
preds = [str(x['prediction']) for x in predictions]
response = ",".join(preds)
except Exception as e:
print("Error: {0}",str(e))
return (str(e))
# Return results
print(json.dumps(response))
return json.dumps(response)
In [4]:
# We'll use the known label, key variables and
# a few extra columns we won't need. (machineID is required)
key_cols =['label_e','dt_truncated', 'failure','model_encoded','model' ]
# Then get the remaining feature names from the data
input_features = fedata.columns
# Remove the extra stuff if it's in the input_df
input_features = [x for x in input_features if x not in set(key_cols)]
# define the input data frame
inputs = {"input_df": SampleDefinition(DataTypes.SPARK,
fedata.select(input_features))}
json_schema = generate_schema(run_func=run, inputs=inputs, filepath='service_schema.json')
We can then test the init()
and run()
functions right here in the notebook. It's about impossible to debug after publish a web service.
First we get a sample test observation that we can score. For this, we can randomly select a single record from the test data we've loaded from Azure blob.
In [5]:
# Randomly select a record from the loaded test data.
smple = fedata.sample(False, .8).limit(1).select(input_features)
smple.toPandas().head()
Out[5]:
The deployment requires first initializing (init()
) the environment, then running the model with the supplied data fields (run(<data>)
). The run()
function returns the predicted label, 0.0
indicates a healthy record, other values correspond to the component predicted to fail within the next 7 days (1.0, 2.0, 3.0, 4.0
).
In [6]:
# test init() in local notebook
init()
# test run() in local notebook
run(smple)
Out[6]:
The model returned a 0.0
, indicating a healthy prediction. Comparing this to the actual value of the label_e
variable for this record would determine how the model actually did. However we did not include this feature in the sampled data, as it would not be available in the production environment.
In the following code block, we use the filter
function to select 10 records with a specific failure label (4.0
) indicating a failure for component 4 is probable within the next 7 days. You can see this by scrolling to the right to find the label_e
variable.
In [7]:
smple_f = fedata.filter(fedata.label_e == 4.0).sample(False, .8).limit(10)
smple_f.toPandas().head()
Out[7]:
Since we have already initialized the environment, we can submit this new record to the model for scoring. We need the record to align with the specified scheme, so we select out the features according to the input_features
vector.
In [8]:
run(smple_f.select(input_features))
Out[8]:
Comparing the output of this to the actual value indicates a mismatch in the failure prediction.
In [9]:
# save the schema file for deployment
out = json.dumps(json_schema)
with open(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY'] + 'service_schema.json', 'w') as f:
f.write(out)
We will use %%writefile
meta command to save the init()
and run()
functions to the pdmscore.py
file. Because of how the %%writefile
command works, we have to copy these functions from the tested versions above into this code block.
In [10]:
%%writefile {os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']}/pdmscore.py
import json
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
# for creating pipelines and model
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
def init():
# read in the model file
from pyspark.ml import PipelineModel
# read in the model file
global pipeline
pipeline = PipelineModel.load('pdmrfull.model')
def run(input_df):
response = ''
try:
# We'll use the known label, key variables and
# a few extra columns we won't need.
key_cols =['label_e','machineID','dt_truncated', 'failure','model_encoded','model' ]
# Then get the remaing feature names from the data
input_features = input_df.columns
# Remove the extra stuff if it's in the input_df
input_features = [x for x in input_features if x not in set(key_cols)]
# Vectorize as in model building
va = VectorAssembler(inputCols=(input_features), outputCol='features')
data = va.transform(input_df).select('machineID','features')
score = pipeline.transform(data)
predictions = score.collect()
#Get each scored result
preds = [str(x['prediction']) for x in predictions]
response = ",".join(preds)
except Exception as e:
print("Error: {0}",str(e))
return (str(e))
# Return results
print(json.dumps(response))
return json.dumps(response)
if __name__ == "__main__":
init()
run("{\"input_df\":[{\"machineID\":114,\"volt_rollingmean_3\":163.375732902,\"rotate_rollingmean_3\":333.149484586,\"pressure_rollingmean_3\":100.183951698,\"vibration_rollingmean_3\":44.0958812638,\"volt_rollingmean_24\":164.114723991,\"rotate_rollingmean_24\":277.191815232,\"pressure_rollingmean_24\":97.6289110707,\"vibration_rollingmean_24\":50.8853505161,\"volt_rollingstd_3\":21.0049565219,\"rotate_rollingstd_3\":67.5287259378,\"pressure_rollingstd_3\":12.9361526861,\"vibration_rollingstd_3\":4.61359760918,\"volt_rollingstd_24\":15.5377738062,\"rotate_rollingstd_24\":67.6519885441,\"pressure_rollingstd_24\":10.528274633,\"vibration_rollingstd_24\":6.94129487555,\"error1sum_rollingmean_24\":0.0,\"error2sum_rollingmean_24\":0.0,\"error3sum_rollingmean_24\":0.0,\"error4sum_rollingmean_24\":0.0,\"error5sum_rollingmean_24\":0.0,\"comp1sum\":489.0,\"comp2sum\":549.0,\"comp3sum\":549.0,\"comp4sum\":564.0,\"age\":18.0}]}")
These files are stored in the ['AZUREML_NATIVE_SHARE_DIRECTORY']
location on the kernel host machine with the model stored in the 3_model_building.ipynb
notebook. In order to share these assets and operationalize the model, we create a new blob container and store a compressed file containing those assets for later retrieval from the deployment location.
In [11]:
# Compress the operationalization assets for easy blob storage transfer
MODEL_O16N = shutil.make_archive('o16n', 'zip', os.environ['AZUREML_NATIVE_SHARE_DIRECTORY'])
# Create a new container if necessary, otherwise you can use an existing container.
# This command creates the container if it does not already exist. Else it does nothing.
az_blob_service.create_container(MODEL_CONTAINER,
fail_on_exist=False,
public_access=PublicAccess.Container)
# Transfer the compressed operationalization assets into the blob container.
az_blob_service.create_blob_from_path(MODEL_CONTAINER, "o16n.zip", str(MODEL_O16N) )
# Time the notebook execution.
# This will only make sense if you "Run All" cells
toc = time.time()
print("Full run took %.2f minutes" % ((toc - tic)/60))
logger.log("Operationalization Run time", ((toc - tic)/60))
Out[11]:
Once the assets are stored, we can download them into a deployment compute context for operationalization on an Azure web service. For this scenario, we will deploy this on our local context.
We demonstrate how to setup this web service this through a CLI window opened in the AML Workbench application.
To download the model we've saved, follow these instructions on a local computer.
Once downloaded, unzip the file into the directory of your choosing. The zip file contains three deployment assets:
pdmscore.py
filepdmrfull.model
directoryservice_schema.json
fileCreate a modelmanagement under your account. We will call this pdmmodelmanagement
. The remaining defaults are acceptable.
az ml account modelmanagement create --location <ACCOUNT_REGION> --resource-group <RESOURCE_GROUP> --name pdmmodelmanagement
If you get a ResourceGroupNotFound
error, you may need to set the correct subscription. This is typically only an issue if your Azure login connects to multiple subscritpions.
az account set -s '<subscription name>'
You can find the subscription name
or subscription id
through the (https://portal.azure.com) under the resource group you'd like to use.
Show what environment is currently active:
az ml env show
If nothing is set, we setup the environment with the existing model management context first:
az ml env setup --location <ACCOUNT_REGION> --resource-group <RESOURCE_GROUP> --name pdmmodelmanagement
using the same <ACCOUNT_REGION>
and <RESOURCE_GROUP>
in the previous section. Then set the current environment:
az ml env set --resource-group <RESOURCE_GROUP> --cluster-name pdmmodelmanagement
Check that the environment is now set:
az ml env show
Once the model management environment is setup, we'll deploy the web service from the CLI to a local docker container for this demonstration. This assumes you have docker installed localy (https://www.docker.com/get-docker).
Once docker is installed and running, you will need to prepare the local docker container, just as we didi the remote container.
az ml experiment prepare -c docker
Now deploy the solution to this container.
These commands assume the current directory contains the webservice assets we created in throughout the notebooks in this scenario (pdmscore.py
, service_schema.json
and pdmrfull.model
). Change to the directory where the zip file was unpacked.
The command to create a web service (<SERVICE_ID>
) with these operationalization assets in the current directory is:
az ml service create realtime -f <filename> -r <TARGET_RUNTIME> -m <MODEL_FILE> -s <SCHEMA_FILE> -n <SERVICE_ID> --cpu 0.1
The default cluster has only 2 nodes with 2 cores each. Some cores are taken for system components. AMLWorkbench asks for 1 core per service. To deploy multiple services into this cluster, we specify the cpu requirement in the service create command as (--cpu 0.1) to request 10% of a core.
For this example, we will call our webservice amlworkbenchpdmwebservice
. This SERVICE_ID
must be all lowercase, with no spaces:
az ml service create realtime -f pdmscore.py -r spark-py -m pdmrfull.model -s service_schema.json --cpu 0.1 -n amlworkbenchpdmwebservice
This command will take some time to execute.
Once complete, the az ml service create
command returns sample usage commands to test the service for both PowerShell and the cmd prompt. We can test this deployment by executing these commands from the command line. For our example:
az ml service run realtime -i amlworkbenchpdmwebservice --% -d "{\"input_df\": [{\"rotate_rollingmean_36\": 450.0384342542265, \"rotate_rollingstd_36\": 0.0, \"volt_rollingmean_24\": 166.69782028530955, \"volt_rollingmean_36\": 166.5072079613422, \"comp1sum\": 504.0, \"comp2sum\": 564.0, \"error3sum_rollingmean_24\": 0.0, \"vibration_rollingmean_24\": 40.302192663278625, \"pressure_rollingstd_24\": 0.0, \"vibration_rollingstd_12\": 0.0, \"pressure_rollingstd_12\": 0.0, \"rotate_rollingmean_12\": 445.7130438343768, \"volt_rollingstd_24\": 0.0, \"comp3sum\": 444.0, \"error1sum_rollingmean_24\": 0.0, \"vibration_rollingstd_36\": 0.0, \"rotate_rollingstd_24\": 0.0, \"volt_rollingstd_36\": 0.0, \"rotate_rollingmean_24\": 444.92430808877185, \"error2sum_rollingmean_24\": 0.0, \"pressure_rollingstd_36\": 0.0, \"comp4sum\": 399.0, \"machineID\": 27, \"pressure_rollingmean_24\": 100.42784289855126, \"volt_rollingmean_12\": 162.37456132546583, \"error4sum_rollingmean_24\": 0.0, \"pressure_rollingmean_12\": 103.46853199581041, \"age\": 9, \"pressure_rollingmean_36\": 99.1626730910439, \"volt_rollingstd_12\": 0.0, \"rotate_rollingstd_12\": 0.0, \"vibration_rollingmean_36\": 39.86004229336383, \"vibration_rollingstd_24\": 0.0, \"error5sum_rollingmean_24\": 0.0, \"vibration_rollingmean_12\": 39.69610732198209}, {\"rotate_rollingmean_36\": 452.58602482190344, \"rotate_rollingstd_36\": 1.3063227195446807, \"volt_rollingmean_24\": 168.8315798036505, \"volt_rollingmean_36\": 166.8633264221902, \"comp1sum\": 504.0, \"comp2sum\": 564.0, \"error3sum_rollingmean_24\": 0.0, \"vibration_rollingmean_24\": 39.8762193116053, \"pressure_rollingstd_24\": 0.5506261833397947, \"vibration_rollingstd_12\": 0.5581845837178677, \"pressure_rollingstd_12\": 1.3059590035299573, \"rotate_rollingmean_12\": 448.82482383859184, \"volt_rollingstd_24\": 1.1327450423992658, \"comp3sum\": 444.0, \"error1sum_rollingmean_24\": 0.0, \"vibration_rollingstd_36\": 0.12802019423837702, \"rotate_rollingstd_24\": 6.2252625510326345, \"volt_rollingstd_36\": 1.2113288898088435, \"rotate_rollingmean_24\": 455.68853459771736, \"error2sum_rollingmean_24\": 0.0, \"pressure_rollingstd_36\": 0.360813923769749, \"comp4sum\": 399.0, \"machineID\": 27, \"pressure_rollingmean_24\": 98.84197839575184, \"volt_rollingmean_12\": 169.6342364499553, \"error4sum_rollingmean_24\": 0.0, \"pressure_rollingmean_12\": 100.13428527324218, \"age\": 9, \"pressure_rollingmean_36\": 99.18126302139088, \"volt_rollingstd_12\": 1.7162303092954838, \"rotate_rollingstd_12\": 7.358009183124642, \"vibration_rollingmean_36\": 39.83194043387068, \"vibration_rollingstd_24\": 0.26866456414969686, \"error5sum_rollingmean_24\": 0.0, \"vibration_rollingmean_12\": 40.534215611846555}, {\"rotate_rollingmean_36\": 452.6366978657443, \"rotate_rollingstd_36\": 0.726203655443797, \"volt_rollingmean_24\": 165.47787140830766, \"volt_rollingmean_36\": 164.9839282666808, \"comp1sum\": 503.0, \"comp2sum\": 563.0, \"error3sum_rollingmean_24\": 0.0, \"vibration_rollingmean_24\": 39.48080284488274, \"pressure_rollingstd_24\": 0.43573594568766316, \"vibration_rollingstd_12\": 0.33150005427630586, \"pressure_rollingstd_12\": 0.30398746497620055, \"rotate_rollingmean_12\": 462.5522453568429, \"volt_rollingstd_24\": 1.388783538126311, \"comp3sum\": 443.0, \"error1sum_rollingmean_24\": 0.0, \"vibration_rollingstd_36\": 0.06733738203927228, \"rotate_rollingstd_24\": 2.2615583783043336, \"volt_rollingstd_36\": 0.4066137169118576, \"rotate_rollingmean_24\": 454.4666253135592, \"error2sum_rollingmean_24\": 0.0, \"pressure_rollingstd_36\": 0.40800640702349306, \"comp4sum\": 398.0, \"machineID\": 27, \"pressure_rollingmean_24\": 98.70475189546528, \"volt_rollingmean_12\": 168.0289231573457, \"error4sum_rollingmean_24\": 0.0, \"pressure_rollingmean_12\": 97.5496715182615, \"age\": 9, \"pressure_rollingmean_36\": 99.92595364177775, \"volt_rollingstd_12\": 1.9026812928919759, \"rotate_rollingstd_12\": 12.545522310840685, \"vibration_rollingmean_36\": 39.16084871098736, \"vibration_rollingstd_24\": 0.2757178837764945, \"error5sum_rollingmean_24\": 0.0, \"vibration_rollingmean_12\": 39.21822301136402}]}"
This submits 3 records to the model through the web service, and returns predictioned output labels for each of the three rows:
"0.0,0.0,0.0"
Indicating that these records are predicted to be healthy with in the requested 7 day time window.
We can view additional service usage information with the following command.
az ml service usage realtime -i amlworkbenchpdmwebservice
Which indicates how the service is currently deployed:
Scoring URL:
http://127.0.0.1:32770/score
Headers:
Content-Type: application/json
Swagger URL:
http://127.0.0.1:32770/swagger.json
Sample CLI command:
...
Working through all of these notebooks, we have completed:
Code/1_data_aquisition.ipynb
notebook.Code/2_feature_engineering.ipynb
notebook.Code/3_model_building.ipynb
notebook.Code/4_operationalization.ipynb
notebook.