The common data elements for predictive maintenance problems can be summarized as follows:
It is possible that failure history is contained within maintenance history, either in the form of special error codes or as order dates for spare parts. In those cases, failures can be extracted from the maintenance data. Additionally, different business domains may have a variety of other data sources that influence failure patterns, which are not listed exhaustively here. These should be identified by consulting domain experts when building predictive models.
Some examples of the above data elements from use cases are:
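As a minimal sketch of extracting failures from maintenance data, the following assumes a hypothetical record layout in which each maintenance record carries a machine ID, a date, and a code, and in which codes beginning with `FAIL` mark failure-driven work. The field names and the code prefix are illustrative assumptions, not part of the data sets used in this notebook.

```python
from datetime import date

# Hypothetical maintenance records; field names and codes are assumptions.
maintenance_log = [
    {"machineID": 1, "date": date(2015, 1, 5), "code": "INSPECT"},
    {"machineID": 1, "date": date(2015, 2, 9), "code": "FAIL-COMP2"},
    {"machineID": 2, "date": date(2015, 3, 1), "code": "FAIL-COMP4"},
    {"machineID": 2, "date": date(2015, 3, 20), "code": "LUBRICATE"},
]


def extract_failures(records, failure_prefix="FAIL"):
    """Return only the records whose code marks a failure."""
    return [r for r in records if r["code"].startswith(failure_prefix)]


failure_history = extract_failures(maintenance_log)
for record in failure_history:
    print(record["machineID"], record["date"], record["code"])
```

In a real project the rule that separates failures from routine work would come from the domain experts rather than from a string prefix.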
Machine conditions and usage: Flight routes and times, sensor data collected from aircraft engines, sensor readings from ATM transactions, train events data, sensor readings from wind turbines, elevators and connected cars.
Machine features: Circuit breaker technical specifications such as voltage levels, geolocation or car features such as make, model, engine size, tire types, production facility etc.
Failure history: Flight delay dates, aircraft component failure dates and types, ATM cash withdrawal transaction failures, train/elevator door failures, brake disk replacement order dates, wind turbine failure dates and circuit breaker command failures.
Maintenance history: Flight error logs, ATM transaction error logs, train maintenance records including maintenance type, short description etc. and circuit breaker maintenance records.
Given the above data sources, the two main data types we observe in the predictive maintenance domain are temporal data and static data. Failure history, machine conditions, repair history and usage history are time series, indicated by the timestamp of data collection. Machine-specific and operator-specific features are more static, since they usually describe the technical specifications of machines or the operator's properties.
For this scenario, we use a relatively large-scale data set to walk the user through the main steps: data ingestion (this Jupyter notebook), feature engineering, model building, and model operationalization and deployment. The code for the entire process is written in PySpark and implemented using Jupyter notebooks within Azure ML Workbench. The selected model is operationalized using Azure Machine Learning Model Management for use in a production environment that simulates making real-time failure predictions.
This data acquisition notebook downloads the simulated predictive maintenance data sets from our GitHub data store. We do some preliminary data cleaning and verification, and store the results as a Spark data frame in an Azure Blob storage container for use in the remaining notebook steps of this analysis.
Note: This notebook will take about 10-15 minutes to execute all cells, depending on the compute configuration you have set up. Most of this time is spent handling the telemetry data set, which contains about 8.7 million records.
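The distinction between temporal and static data can be sketched with plain Python structures. Static features have one row per machine and no timestamp, while temporal records have many timestamped rows per machine. The values below are made up, and `attach_static_features` is a hypothetical helper name used only for this illustration.

```python
from datetime import datetime

# Static data: one row per machine, no timestamp (hypothetical values).
machine_features = {
    1: {"model": "model3", "age": 18},
    2: {"model": "model4", "age": 7},
}

# Temporal data: many timestamped rows per machine (hypothetical values).
sensor_readings = [
    {"machineID": 1, "datetime": datetime(2015, 1, 1, 6), "volt": 176.2},
    {"machineID": 1, "datetime": datetime(2015, 1, 1, 7), "volt": 162.9},
    {"machineID": 2, "datetime": datetime(2015, 1, 1, 6), "volt": 170.0},
]


def attach_static_features(readings, features):
    """Copy each machine's static features onto its timestamped rows."""
    return [{**row, **features[row["machineID"]]} for row in readings]


combined = attach_static_features(sensor_readings, machine_features)
```

Each combined row keeps its own timestamp and sensor value, while the static columns simply repeat for every row of the same machine.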
In [1]:
## Setup our environment by importing required libraries
import time
import os
import glob
import urllib.request
# Read csv file from URL directly
import pandas as pd
# For Azure blob storage access
from azure.storage.blob import BlockBlobService
from azure.storage.blob import PublicAccess
# For creating some preliminary EDA plots.
%matplotlib inline
import matplotlib.pyplot as plt
from ggplot import *
from datetime import datetime
# Setup the pyspark environment
from pyspark.sql import SparkSession
# For logging model evaluation parameters back into the
# AML Workbench run history plots.
import logging
from azureml.logging import get_azureml_logger
amllog = logging.getLogger("azureml")
amllog.setLevel(logging.INFO)
# Turn on cell level logging.
%azureml history on
%azureml history show
# Time the notebook execution.
# This will only make sense if you "Run All" cells
tic = time.time()
logger = get_azureml_logger() # logger writes to AMLWorkbench runtime view
spark = SparkSession.builder.getOrCreate()
# Telemetry
logger.log('amlrealworld.predictivemaintenance.data_ingestion','true')
Out[1]:
We will be storing intermediate results for use between these Jupyter notebooks in an Azure Blob Storage container. Instructions for setting up your Azure Storage account are available at this link (https://docs.microsoft.com/en-us/azure/storage/blobs/storage-python-how-to-use-blob-storage). You will need to copy your account name and account key from the Access Keys area in the portal into the following code block. These credentials will be reused in all four Jupyter notebooks.
We will handle creating the containers and writing the data to these containers for each notebook. Further instructions for using Azure Blob storage with AML Workbench are available (https://github.com/Azure/ViennaDocs/blob/master/Documentation/UsingBlobForStorage.md).
You will need to enter the ACCOUNT_NAME as well as the ACCOUNT_KEY in order to access the Azure Blob storage account you have created. This notebook will create and store all the resulting data files in a blob container under this account.
In [2]:
# Enter your Azure blob storage details here
ACCOUNT_NAME = "<your blob storage account name>"
# You can find the account key under the _Access Keys_ link in the
# [Azure Portal](portal.azure.com) page for your Azure storage container.
ACCOUNT_KEY = "<your blob storage account key>"
#-------------------------------------------------------------------------------------------
# We will create this container to hold the results of executing this notebook.
# If this container name already exists, we will use that instead. However,
# this notebook will ERASE ALL CONTENTS.
CONTAINER_NAME = "dataingestion"
# Connect to your blob service
az_blob_service = BlockBlobService(account_name=ACCOUNT_NAME, account_key=ACCOUNT_KEY)
# Create a new container if necessary, otherwise you can use an existing container.
# This command creates the container if it does not already exist. Otherwise it does nothing.
az_blob_service.create_container(CONTAINER_NAME,
                                 fail_on_exist=False,
                                 public_access=PublicAccess.Container)
Out[2]:
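Pasting the account key into the notebook makes it easy to commit the key by accident. One alternative, sketched here, reads the credentials from environment variables. The variable names `BLOB_ACCOUNT_NAME` and `BLOB_ACCOUNT_KEY` and the helper `read_storage_credentials` are assumptions chosen for this example, not names that Azure or AML Workbench defines.

```python
import os


def read_storage_credentials(env=os.environ):
    """Return (account_name, account_key) from the environment.

    The variable names are hypothetical; pick whatever fits your setup.
    """
    try:
        return env["BLOB_ACCOUNT_NAME"], env["BLOB_ACCOUNT_KEY"]
    except KeyError as missing:
        raise RuntimeError(
            "Set the %s environment variable before running the notebook"
            % missing.args[0]
        ) from None


# Example with an explicit mapping standing in for the real environment.
example_env = {"BLOB_ACCOUNT_NAME": "mystorage", "BLOB_ACCOUNT_KEY": "not-a-real-key"}
account_name, account_key = read_storage_credentials(example_env)
```

The two returned values could then be assigned to ACCOUNT_NAME and ACCOUNT_KEY in place of the literal strings.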
We will be reusing the raw simulated data files from another tutorial. The notebook automatically downloads these files, which are stored in the Microsoft/SQL-Server-R-Services-Samples GitHub repository.
The five data files are:
To get an idea of what is contained in the data, we examine this machine schematic.
There are 1000 machines of four different models. Each machine contains four components of interest, and four sensors measuring voltage, pressure, vibration and rotation. A controller monitors the system and raises alerts for five different error conditions. Maintenance logs indicate when something is done to the machine which does not include a component replacement. A failure is defined by the replacement of a component.
This notebook does some preliminary data cleanup, creates summary graphics for each data set to verify the data downloaded correctly, and stores the resulting data sets in the Azure blob container created in the previous section.
In [2]:
# The raw data is stored on GitHub here:
basedataurl = "http://media.githubusercontent.com/media/Microsoft/SQL-Server-R-Services-Samples/master/PredictiveMaintanenceModelingGuide/Data/"
# We will store each of these data sets in blob storage in an
# Azure Storage Container on your Azure subscription.
# See https://github.com/Azure/ViennaDocs/blob/master/Documentation/UsingBlobForStorage.md
# for details.
# These file names detail which blob each file is stored under.
MACH_DATA = 'machines_files.parquet'
MAINT_DATA = 'maint_files.parquet'
ERROR_DATA = 'errors_files.parquet'
TELEMETRY_DATA = 'telemetry_files.parquet'
FAILURE_DATA = 'failure_files.parquet'
In [3]:
# load raw data from the GitHub URL
datafile = "machines.csv"
# Download the file once, and only once.
if not os.path.isfile(datafile):
    urllib.request.urlretrieve(basedataurl+datafile, datafile)
# Read into pandas
machines = pd.read_csv(datafile, encoding='utf-8')
print(machines.count())
machines.head(10)
Out[3]:
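The download guard above is repeated for each of the five files. A small helper along the following lines could wrap it. `fetch_once` is a hypothetical name, and the `retrieve` parameter is an assumption added here so the function can be exercised without network access; by default it falls back to `urllib.request.urlretrieve`, the call the notebook already uses.

```python
import os
import tempfile
import urllib.request


def fetch_once(url, local_path, retrieve=urllib.request.urlretrieve):
    """Download url to local_path unless the file already exists.

    Returns True if a download was attempted, False if it was skipped.
    """
    if os.path.isfile(local_path):
        return False
    retrieve(url, local_path)
    return True


# Stand-in retriever that writes a tiny local file instead of downloading.
def fake_retrieve(url, target):
    with open(target, "w") as handle:
        handle.write("machineID,model,age\n")


with tempfile.TemporaryDirectory() as workdir:
    local_file = os.path.join(workdir, "machines.csv")
    first_call = fetch_once("http://example.invalid/machines.csv", local_file, fake_retrieve)
    second_call = fetch_once("http://example.invalid/machines.csv", local_file, fake_retrieve)
```

In the notebook this would be called as `fetch_once(basedataurl + datafile, datafile)`. Note that a partially downloaded file still counts as present, so an interrupted download needs the file removed by hand.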
The following figure plots a histogram of machine age, colored by the specific model.
In [4]:
plt.figure(figsize=(8, 6))
_, bins, _ = plt.hist([machines.loc[machines['model'] == 'model1', 'age'],
                       machines.loc[machines['model'] == 'model2', 'age'],
                       machines.loc[machines['model'] == 'model3', 'age'],
                       machines.loc[machines['model'] == 'model4', 'age']],
                      20, stacked=True, label=['model1', 'model2', 'model3', 'model4'])
plt.xlabel('Age (yrs)')
plt.ylabel('Count')
plt.legend()
Out[4]:
The figure shows how long the machines have been in service. It indicates there are four model types, shown in different colors, and that all four models have been in service over the entire 20-year range. The machine age will be a feature in our analysis, since we expect older machines may have a different set of errors and failures than machines that have not been in service long.
Next, we convert the machines data to a Spark dataframe, and verify the data types have converted correctly.
In [5]:
# The data was read in using a Pandas data frame. We'll convert
# it to pyspark to ensure it is in a Spark usable form for later
# manipulations.
mach_spark = spark.createDataFrame(machines,
verifySchema=False)
# We no longer need the pandas dataframe, so we can release that memory.
del machines
# Check data type conversions.
mach_spark.printSchema()
Now we write the spark dataframe to an Azure blob storage container for use in the remaining notebooks of this scenario.
In [6]:
# Write the Machine data set to intermediate storage
mach_spark.write.mode('overwrite').parquet(MACH_DATA)
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
    if MACH_DATA in blob.name:
        az_blob_service.delete_blob(CONTAINER_NAME, blob.name)
# upload the entire folder into blob storage
for name in glob.iglob(MACH_DATA + '/*'):
    print(os.path.abspath(name))
    az_blob_service.create_blob_from_path(CONTAINER_NAME, name, name)
print("Machines files saved!")
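The delete-then-upload loop above is repeated for each data set. As a sketch, it could be factored into one function that takes the blob service as an argument. `replace_blobs` is a hypothetical name, and `FakeBlobService` is an in-memory stand-in written for this example; it mimics only the three calls the notebook uses (`list_blobs`, `delete_blob`, `create_blob_from_path`) so the logic can be tried without an Azure account.

```python
import glob
import os
import tempfile


def replace_blobs(blob_service, container, folder):
    """Delete blobs whose name contains `folder`, then upload its files."""
    for blob in list(blob_service.list_blobs(container)):
        if folder in blob.name:
            blob_service.delete_blob(container, blob.name)
    uploaded = []
    for path in sorted(glob.iglob(folder + "/*")):
        blob_service.create_blob_from_path(container, path, path)
        uploaded.append(path)
    return uploaded


class FakeBlob:
    def __init__(self, name):
        self.name = name


class FakeBlobService:
    """In-memory stand-in exposing only the three calls used above."""

    def __init__(self):
        self.blobs = {}

    def list_blobs(self, container):
        return [FakeBlob(name) for name in self.blobs]

    def delete_blob(self, container, name):
        del self.blobs[name]

    def create_blob_from_path(self, container, blob_name, file_path):
        self.blobs[blob_name] = file_path


with tempfile.TemporaryDirectory() as workdir:
    folder = os.path.join(workdir, "machines_files.parquet")
    os.mkdir(folder)
    for part in ("part-0", "part-1"):
        open(os.path.join(folder, part), "w").close()
    service = FakeBlobService()
    service.blobs[folder + "/stale-part"] = "old upload"
    uploaded = replace_blobs(service, "dataingestion", folder)
    remaining = sorted(service.blobs)
```

With the real service, the call would be `replace_blobs(az_blob_service, CONTAINER_NAME, MACH_DATA)`, and likewise for the other four data sets.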
The error log contains non-breaking errors recorded while the machine is still operational. These errors are not considered failures, though they may be predictive of a future failure event. The error datetime field is rounded to the closest hour, since the telemetry data (loaded later) is collected at an hourly rate.
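The error timestamps in this data set arrive already rounded. For raw timestamps, rounding to the closest hour could look like the following sketch. `round_to_nearest_hour` is a hypothetical helper, and the choice to round exactly 30 minutes upward is an assumption, not something the data set documents.

```python
from datetime import datetime, timedelta


def round_to_nearest_hour(stamp):
    """Round a datetime to the closest whole hour (30 minutes rounds up)."""
    floored = stamp.replace(minute=0, second=0, microsecond=0)
    if stamp - floored >= timedelta(minutes=30):
        return floored + timedelta(hours=1)
    return floored


early = round_to_nearest_hour(datetime(2015, 1, 3, 7, 12, 40))
late = round_to_nearest_hour(datetime(2015, 1, 3, 23, 47, 5))
```

Aligning both sources to whole hours lets error records be matched to telemetry rows on an exact timestamp.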
In [7]:
# load raw data from the GitHub URL
datafile = "errors.csv"
# Download the file once, and only once.
if not os.path.isfile(datafile):
    urllib.request.urlretrieve(basedataurl+datafile, datafile)
# Read into pandas
errors = pd.read_csv(datafile, encoding='utf-8')
print(errors.count())
errors.head(10)
Out[7]:
The following histogram details the distribution of the errors tracked in the log files.
In [8]:
# Quick plot to show structure
ggplot(aes(x="errorID"), errors) + geom_bar(fill="blue", color="black")
Out[8]:
The error data consists of a time series (datetime stamped) of error codes thrown by each machine (machineID). The figure shows how many errors occurred in each of the five error classes over the entire year. We could split this figure by individual machine, but with 1000 machines, the figure would not be very informative.
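When a per-machine plot is too crowded, one alternative is to summarize how many machines logged a given number of errors. The sketch below does this with `collections.Counter` on a handful of made-up (machineID, errorID) pairs; the records are hypothetical and only stand in for the error log.

```python
from collections import Counter

# Hypothetical (machineID, errorID) pairs standing in for the error log.
error_log = [
    (1, "error1"), (1, "error3"), (1, "error1"),
    (2, "error2"),
    (3, "error1"), (3, "error5"),
]

# Number of errors logged by each machine.
errors_per_machine = Counter(machine_id for machine_id, _ in error_log)
# How many machines logged exactly k errors, for each k.
machines_by_error_count = Counter(errors_per_machine.values())
```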
Next, we convert the errors data to a Spark dataframe, and verify the data types have converted correctly.
In [9]:
# The data was read in using a Pandas data frame. We'll convert
# it to pyspark to ensure it is in a Spark usable form for later
# manipulations.
error_spark = spark.createDataFrame(errors,
verifySchema=False)
# We no longer need the pandas dataframe, so we can release that memory.
del errors
# Check data type conversions.
error_spark.printSchema()
Now we write the spark dataframe to an Azure blob storage container for use in the remaining notebooks of this scenario.
In [11]:
# Write the Errors data set to intermediate storage
error_spark.write.mode('overwrite').parquet(ERROR_DATA)
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
    if ERROR_DATA in blob.name:
        az_blob_service.delete_blob(CONTAINER_NAME, blob.name)
# upload the entire folder into blob storage
for name in glob.iglob(ERROR_DATA + '/*'):
    print(os.path.abspath(name))
    az_blob_service.create_blob_from_path(CONTAINER_NAME, name, name)
print("Errors files saved!")
The maintenance log contains both scheduled and unscheduled maintenance records. Scheduled maintenance corresponds to regular inspection of components, while unscheduled maintenance may arise from mechanical failure or other performance degradation. A failure record is generated for a component replacement in either type of maintenance event. Because maintenance events can also be used to infer component life, the maintenance data has been collected over two years (2014, 2015) instead of only over the year of interest (2015).
In [10]:
# load raw data from the GitHub URL
datafile = "maint.csv"
# Download the file once, and only once.
if not os.path.isfile(datafile):
    urllib.request.urlretrieve(basedataurl+datafile, datafile)
# Read into pandas
maint = pd.read_csv(datafile, encoding='utf-8')
print(maint.count())
maint.head(20)
Out[10]:
In [11]:
# Quick plot to show structure
ggplot(aes(x="comp"), maint) + geom_bar(fill="blue", color="black")
Out[11]:
The figure shows a histogram of component replacements divided into the four component types over the entire maintenance history. It looks like these four components are replaced at similar rates.
There are many ways we might want to look at this data, including calculating how long each component type lasts, or the time history of component replacements within each machine. This will take some preprocessing of the data, which we are delaying until we do the feature engineering steps in the next example notebook.
Next, we convert the maintenance data to a Spark dataframe, and verify the data types have converted correctly.
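As a rough illustration of the component-life idea, the sketch below computes the number of days between successive replacements of the same component on the same machine. The records are made up, the (datetime, machineID, comp) layout is an assumption about the maintenance log, and `days_between_replacements` is a hypothetical helper rather than the feature engineering used in the next notebook.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical replacement records in an assumed (datetime, machineID, comp) layout.
replacements = [
    (datetime(2014, 6, 1, 6), 1, "comp2"),
    (datetime(2014, 9, 14, 6), 1, "comp2"),
    (datetime(2015, 1, 12, 6), 1, "comp2"),
    (datetime(2014, 7, 16, 6), 1, "comp4"),
    (datetime(2015, 2, 1, 6), 2, "comp2"),
]


def days_between_replacements(records):
    """Map (machineID, comp) to the gaps, in days, between replacements."""
    dates = defaultdict(list)
    for stamp, machine_id, comp in records:
        dates[(machine_id, comp)].append(stamp)
    gaps = {}
    for key, stamps in dates.items():
        stamps.sort()
        gaps[key] = [(later - earlier).days for earlier, later in zip(stamps, stamps[1:])]
    return gaps


component_life = days_between_replacements(replacements)
```

A component with a single replacement on record yields an empty list, which is one reason the maintenance history covers two years rather than one.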
In [12]:
# The data was read in using a Pandas data frame. We'll convert
# it to pyspark to ensure it is in a Spark usable form for later
# manipulations.
maint_spark = spark.createDataFrame(maint,
verifySchema=False)
# We no longer need the pandas dataframe, so we can release that memory.
del maint
# Check data type conversions.
maint_spark.printSchema()
Now we write the spark dataframe to an Azure blob storage container for use in the remaining notebooks of this scenario.
In [15]:
# Write the Maintenance data set to intermediate storage
maint_spark.write.mode('overwrite').parquet(MAINT_DATA)
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
    if MAINT_DATA in blob.name:
        az_blob_service.delete_blob(CONTAINER_NAME, blob.name)
# upload the entire folder into blob storage
for name in glob.iglob(MAINT_DATA + '/*'):
    print(os.path.abspath(name))
    az_blob_service.create_blob_from_path(CONTAINER_NAME, name, name)
print("Maintenance files saved!")
In [15]:
# GitHub has been having some timeout issues. Setting a socket timeout should help for this data set.
import socket
socket.setdefaulttimeout(30)
# load raw data from the GitHub URL
datafile = "telemetry.csv"
# Download the file once, and only once.
if not os.path.isfile(datafile):
    urllib.request.urlretrieve(basedataurl+datafile, datafile)
# Read into pandas
telemetry = pd.read_csv(datafile, encoding='utf-8')
# handle missing values
# define groups of features
features_datetime = ['datetime']
features_categorical = ['machineID']
features_numeric = list(set(telemetry.columns) - set(features_datetime) - set(features_categorical))
# Replace numeric NA with 0
telemetry[features_numeric] = telemetry[features_numeric].fillna(0)
# Replace categorical NA with 'Unknown'
telemetry[features_categorical] = telemetry[features_categorical].fillna("Unknown")
# Counts...
print(telemetry.count())
# Examine 10 rows of data.
telemetry.head(10)
Out[15]:
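Replacing missing numeric values with 0 hides how many readings were missing, and a zero can later be mistaken for a real sensor measurement. It may be worth counting the gaps per column before filling them. The sketch below does that on a tiny in-memory CSV with made-up values; the sample and the helper name `count_missing` are assumptions for illustration only.

```python
import csv
import io
from collections import Counter

# A tiny in-memory sample standing in for telemetry.csv (made-up values).
sample_csv = io.StringIO(
    "datetime,machineID,volt,rotate\n"
    "2015-01-01 06:00:00,1,176.2,418.5\n"
    "2015-01-01 07:00:00,1,,402.7\n"
    "2015-01-01 08:00:00,1,170.9,\n"
    "2015-01-01 09:00:00,1,,346.1\n"
)


def count_missing(rows):
    """Count empty cells per column."""
    missing = Counter()
    for row in rows:
        for column, value in row.items():
            if value == "":
                missing[column] += 1
    return missing


missing_per_column = count_missing(csv.DictReader(sample_csv))
```

If the counts turn out to be zero for every column, the fill step is a harmless safeguard; if not, the affected columns deserve a closer look before modeling.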
In [17]:
# Check the incoming schema. The datetime field comes in as a string,
# so we will want to convert it to the correct type.
telemetry.dtypes
Out[17]:
Rather than plot 8.7 million data points, this figure plots a month of measurements for a single machine. This is representative of each feature repeated for every machine over the entire year of sensor data collection.
In [18]:
# Copy the slice so the datetime conversion below does not modify a view of telemetry.
plt_data = telemetry.loc[telemetry['machineID'] == 1].copy()
# format datetime field which comes in as string
plt_data['datetime'] = pd.to_datetime(plt_data['datetime'], format="%Y-%m-%d %H:%M:%S")
# Quick plot to show structure
plot_df = plt_data.loc[(plt_data['datetime'] >= pd.to_datetime('2015-02-01')) &
                       (plt_data['datetime'] <= pd.to_datetime('2015-03-01'))]
plt_data = pd.melt(plot_df, id_vars=['datetime', 'machineID'])
ggplot(aes(x="datetime", y="value", color = "variable", group="variable"), plt_data) +\
    geom_line() +\
    scale_x_date(labels=date_format('%m-%d')) +\
    facet_grid('variable', scales='free_y')
Out[18]:
The figure shows one month's worth of telemetry sensor data for one machine. Each sensor is shown in its own panel.
Next, we convert the telemetry data to a Spark dataframe, and verify the data types have converted correctly.
In [19]:
# The data was read in using a Pandas data frame. We'll convert
# it to pyspark to ensure it is in a Spark usable form for later
# manipulations.
# This line takes about 9.5 minutes to run.
telemetry_spark = spark.createDataFrame(telemetry, verifySchema=False)
# We no longer need the pandas dataframes, so we can release that memory.
del telemetry
del plt_data
del plot_df
# Check data type conversions.
telemetry_spark.printSchema()
Now we write the spark dataframe to an Azure blob storage container for use in the remaining notebooks of this scenario.
In [20]:
# Write the telemetry data set to intermediate storage
telemetry_spark.write.mode('overwrite').parquet(TELEMETRY_DATA)
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
    if TELEMETRY_DATA in blob.name:
        az_blob_service.delete_blob(CONTAINER_NAME, blob.name)
# upload the entire folder into blob storage
for name in glob.iglob(TELEMETRY_DATA + '/*'):
    print(os.path.abspath(name))
    az_blob_service.create_blob_from_path(CONTAINER_NAME, name, name)
print("Telemetry files saved!")
In [21]:
# load raw data from the GitHub URL
datafile = "failures.csv"
# Download the file once, and only once.
if not os.path.isfile(datafile):
    urllib.request.urlretrieve(basedataurl+datafile, datafile)
# Read into pandas
failures = pd.read_csv(datafile, encoding='utf-8')
print(failures.count())
failures.head(10)
Out[21]:
The following histogram details the distribution of the failure records obtained from the failure log. This log was originally built from component replacements in the maintenance log file.
In [22]:
# Plot failures
ggplot(aes(x="failure"), failures) + geom_bar(fill="blue", color="black")
Out[22]:
The figure shows that failure-related replacements occurred for each of the four component types over the entire year.
Next, we convert the failures data to PySpark and store it in an Azure blob.
In [23]:
# The data was read in using a Pandas data frame. We'll convert
# it to pyspark to ensure it is in a Spark usable form for later
# manipulations.
failures_spark = spark.createDataFrame(failures,
verifySchema=False)
# Check data type conversions.
failures_spark.printSchema()
Now we write the spark dataframe to an Azure blob storage container for use in the remaining notebooks of this scenario.
In [24]:
# Write the failures data set to intermediate storage
failures_spark.write.mode('overwrite').parquet(FAILURE_DATA)
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
    if FAILURE_DATA in blob.name:
        az_blob_service.delete_blob(CONTAINER_NAME, blob.name)
# upload the entire folder into blob storage
for name in glob.iglob(FAILURE_DATA + '/*'):
    print(os.path.abspath(name))
    az_blob_service.create_blob_from_path(CONTAINER_NAME, name, name)
print("Failure files saved!")
# Time the notebook execution.
# This will only make sense if you "Run All" cells
toc = time.time()
print("Full run took %.2f minutes" % ((toc - tic)/60))
logger.log("Data Ingestion Run time", ((toc - tic)/60))
Out[24]:
We have now downloaded the required data files in CSV format. We read the data into pandas data frames so we could generate a few graphs to help us understand what was in each data file, then saved them to an Azure Blob storage container as Spark data frames for use in the remaining analysis steps. The Code\2_feature_engineering.ipynb Jupyter notebook will read these Spark data frames from Azure blob storage and generate the modeling features for our predictive maintenance machine learning model.