""" Statistical process control charts to emulate and enhance functionality provided by MiniTab Written Bryce Lakamp, unless otherwise stated """
In [19]:
import numpy as np
import scipy.stats
import pandas as pd
from collections import OrderedDict
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
#import matplotlib.table as tbl
import matplotlib.gridspec as gridspec
import matplotlib.ticker as tkr
%matplotlib inline
In [2]:
def unbias_const(constant, observation):
    '''Moving range unbiasing constant; depends on distance between range points.

    Tabular look-up for small observation counts, polynomial approximation past
    the end of the table (d2 table covers 2-50, d3/d4 tables cover 2-25).

    constant: one of 'd2', 'd3', 'd4'
    observation: subgroup size, must be in [2, 100]
    Raises AssertionError for an unknown constant or out-of-range observation.

    ref: http://support.minitab.com/en-us/minitab-express/1/help-and-how-to/control-charts/how-to/variables-data-in-subgroups/xbar-r-chart/methods-and-formulas/unbiasing-constants-d2-d3-and-d4/
    '''
    # tables indexed by observation - 2 (first entry is observation == 2)
    d2 = [1.128, 1.693, 2.059, 2.326, 2.534, 2.704, 2.847, 2.97, 3.078, 3.173, 3.258, 3.336, 3.407, 3.472, 3.532, 3.588, 3.64, 3.689, 3.735, 3.778, 3.819, 3.858, 3.895, 3.931, 3.964, 3.997, 4.027, 4.057, 4.086, 4.113, 4.139, 4.165, 4.189, 4.213, 4.236, 4.259, 4.28, 4.301, 4.322, 4.341, 4.361, 4.379, 4.398, 4.415, 4.433, 4.45, 4.466, 4.482, 4.498]
    d3 = [0.8525, 0.8884, 0.8798, 0.8641, 0.848, 0.8332, 0.8198, 0.8078, 0.7971, 0.7873, 0.7785, 0.7704, 0.763, 0.7562, 0.7499, 0.7441, 0.7386, 0.7335, 0.7287, 0.7242, 0.7199, 0.7159, 0.7121, 0.7084]
    d4 = [0.954, 1.588, 1.978, 2.257, 2.472, 2.645, 2.791, 2.915, 3.024, 3.121, 3.207, 3.285, 3.356, 3.422, 3.482, 3.538, 3.591, 3.64, 3.686, 3.73, 3.771, 3.811, 3.847, 3.883]
    assert constant in ('d2', 'd3', 'd4')
    # BUG FIX: the original condition used `or` (always true for any number);
    # the intent is to reject observation counts outside [2, 100].
    assert 2 <= int(observation) <= 100
    if constant == 'd2':
        if observation <= 50:
            return d2[observation - 2]
        return 3.4873 + 0.0250141 * observation - 0.00009823 * observation ** 2
    if constant == 'd3':
        if observation <= 25:
            return d3[observation - 2]
        return 0.80818 - 0.0051871 * observation + 0.00005098 * observation ** 2 - 0.00000019 * observation ** 3
    if constant == 'd4':
        if observation <= 25:
            return d4[observation - 2]
        return 2.88606 + 0.051313 * observation - 0.00049243 * observation ** 2 + 0.00000188 * observation ** 3

# To do: review source material for calculation
# D. J. Wheeler and D. S. Chambers. (1992). Understanding Statistical Process Control, Second Edition, SPC Press, Inc.
# H. Leon Harter (1960). "Tables of Range and Studentized Range". The Annals of Mathematical Statistics, Vol. 31, No. 4, Institute of Mathematical Statistics, 1122-1147.
In [85]:
class ControlChart():
    '''
    Base container for control chart data, spec limits and special-cause
    testing shared by the chart subclasses.

    Tests for special cause;
    ruleset investigation inspired by Michal Nowikowski <godfryd@gmail.com>
    '''
    # Descriptions of the supported special-cause tests, keyed by the test id
    # that callers place in test_group.
    tests = {
        0: "1 point > n standard deviations from centerline",
        1: "n points in a row on same side of centerline",
        2: "n points in a row, all increasing or all decreasing",
        3: "n points in a row, alternating up and down",
        4: "n of n+1 > 2 standard deviations from center line on same side",
        5: "n of n+1 > 1 standard deviations from center line on same side",
        6: "n points in a row within 1 standard deviation of centerline on either side",
        7: "n points in a row greater than 1 standard deviation from centerline on either side"
    }
    # Popular rule groupings, for reference (rule names from M. Nowikowski's spc):
    #   BASIC:  1 beyond 3*sigma; 7 on one side
    #   PMI:    1 beyond 3*sigma; 8 on one side
    #   WECO:   1 beyond 3*sigma; 2 of 3 beyond 2*sigma; 4 of 5 beyond 1*sigma;
    #           8 on one side; 6 trending; 14 up/down
    #   NELSON: WECO set plus 9 on one side; 15 below 1*sigma;
    #           8 beyond 1*sigma on both sides

    def __init__(self
            , data
            , *args
            , spec_max = None
            , spec_min = None
            , sigma_level = 3
            , on_same_side = 9
            , trending = 6
            , alternating = 14
            , beyond_2sigma = 2
            , beyond_1sigma = 4
            , within_1sigma = 15
            , outside_1sigma = 8
            , test_group = None
            , **kwargs
            ):
        '''
        data: observations, coerced to a pandas Series
        spec_max / spec_min: optional upper / lower specification limits
        sigma_level: rejection level for special-cause test 0
        on_same_side .. outside_1sigma: parameters for tests 1-7
        test_group: list of test ids (keys of `tests`) to run, or None
        '''
        self.data = pd.Series(data)
        self.usl = spec_max
        self.lsl = spec_min
        self.sigma_level = sigma_level
        self.on_same_side = on_same_side
        self.trending = trending
        self.alternating = alternating
        self.beyond_2sigma = beyond_2sigma
        self.beyond_1sigma = beyond_1sigma
        self.within_1sigma = within_1sigma
        self.outside_1sigma = outside_1sigma
        self.test_group = test_group

    def _test_list(self):
        '''Run each configured special-cause test and collect the points that
        fail any of them into self.fail_series.'''
        self.test_list = self.test_group if self.test_group else []
        test_matrix = pd.DataFrame(index = self.data.index)
        runners = {
            0: self._test0_run,
            1: self._test1_run,
            2: self._test2_run,
            3: self._test3_run,
            4: self._test4_run,
            5: self._test5_run,
            6: self._test6_run,
            7: self._test7_run
        }
        for i in self.test_list:
            test_matrix[i] = runners[i]()
        self.fail_series = self.data[test_matrix.any(axis = 1)]

    def _test_spec_run(self):
        '''Flag out-of-spec points and build the spec-limit plot series.
        BUG FIX: compares against None instead of truthiness so a spec limit
        of exactly 0 is honored rather than silently ignored.'''
        if self.usl is not None:
            above = self.data > self.usl
            self.usl_series = pd.Series(np.full(len(self.data.index), self.usl, dtype=np.float64), name = 'usl', index = self.data.index)
        else:
            above = np.array([False] * len(self.data.index))
        if self.lsl is not None:
            below = self.data < self.lsl
            self.lsl_series = pd.Series(np.full(len(self.data.index), self.lsl, dtype=np.float64), name = 'lsl', index = self.data.index)
        else:
            below = np.array([False] * len(self.data.index))
        self.oos_series = pd.Series(self.data[above | below], name = 'oos', index = self.data.index)
        if self.usl is not None or self.lsl is not None:
            self._cpk_calc()

    def _cpk_calc(self):
        '''Compute Cpk (min of CPU/CPL) from whichever spec limits exist.
        Requires self.centerline and self.stddev to be set by a subclass.'''
        # None marks a missing side so a legitimate CPU/CPL of 0.0 is kept
        # (the original used truthiness and would discard a 0 value).
        cpu = (self.usl - self.centerline) / 3 / self.stddev if self.usl is not None else None
        cpl = (self.centerline - self.lsl) / 3 / self.stddev if self.lsl is not None else None
        if cpu is not None and cpl is not None:
            self.cpk = min(cpu, cpl)
        elif cpu is not None:
            self.cpk = cpu
        elif cpl is not None:
            self.cpk = cpl

    def _test0_run(self):
        "1 point > n standard deviations from centerline, returns True if out"
        above = self.data > (self.centerline + self.sigma_level * self.stddev)
        below = self.data < (self.centerline - self.sigma_level * self.stddev)
        return above | below

    # Tests 1-7 are placeholders: they never flag a point. See `tests` above
    # for the rule each is intended to implement.
    def _test1_run(self):
        return [False] * len(self.data.index)

    def _test2_run(self):
        return [False] * len(self.data.index)

    def _test3_run(self):
        return [False] * len(self.data.index)

    def _test4_run(self):
        return [False] * len(self.data.index)

    def _test5_run(self):
        return [False] * len(self.data.index)

    def _test6_run(self):
        return [False] * len(self.data.index)

    def _test7_run(self):
        return [False] * len(self.data.index)

    ###need to concatenate plot points based on the series index entering the class
    def get_plot_points(self):
        '''Returns pandas dataframe for individual data, center line value, upper and lower control limits, upper and lower spec limits, as applicable'''
        plots = pd.concat(
            [
                self.data.rename('pts'),
                self.centerline_series,
                self.lcl_series,
                self.ucl_series
            ], axis = 1)
        # `is not None` keeps zero-valued spec limits in the output
        if self.usl is not None: plots['usl'] = self.usl_series
        if self.lsl is not None: plots['lsl'] = self.lsl_series
        if self.usl is not None or self.lsl is not None: plots['oos'] = self.oos_series
        if self.test_group: plots['fail'] = self.fail_series
        return plots
In [144]:
class MovingRange(ControlChart):
    '''
    Shewhart moving-range control chart data built from positional element
    data, including the approximate sigma estimate. Plot using other packages.

    observation_range: how large of a subset to take the range over; default 2
    sigma_level: test for special cause 1 rejection level; default 3
    '''

    def __init__(self, *args, observation_range = 2, stddev_only = False, **kwargs):
        super(MovingRange, self).__init__(*args, **kwargs)
        self.observation_range = observation_range
        self._moving_range_array()
        self._mean()
        self._stddev()
        if stddev_only:
            # caller only wants the sigma estimate; skip chart construction
            return
        self._centerline()
        self._lcl()
        self._ucl()
        self._test_list()
        self._test_spec_run()

    def _moving_range_array(self):
        '''Replace self.data with the moving range over observation_range points.
        The first observation_range - 1 positions have no complete window and
        are filled with NaN. Overwrites input data, for Class consistency.'''
        width = self.observation_range
        ranges = []
        for pos in range(len(self.data.index)):
            if pos < width - 1:
                ranges.append(np.NaN)
            else:
                window = self.data[pos - width + 1 : pos + 1]
                ranges.append(max(window) - min(window))
        self.data = pd.Series(ranges, index = self.data.index, name = 'pts')

    def _mean(self):
        # MR-bar: mean of the moving ranges
        self.mean = self.data.mean()

    def _stddev(self):
        # sigma-hat = MR-bar / d2(observation_range)
        self.stddev = self.mean / unbias_const('d2', self.observation_range)

    def _centerline(self, index = 0):
        self.centerline = self.mean
        self.centerline_series = pd.Series(
            np.full(len(self.data.index), self.centerline, dtype=np.float64),
            name = 'cen', index = self.data.index)

    def _lcl(self):
        # lower limit is floored at zero: a range cannot be negative
        raw = self.mean - self.sigma_level * unbias_const('d3', self.observation_range) * self.stddev
        self.lcl = max(0, raw)
        self.lcl_series = pd.Series(
            np.full(len(self.data.index), self.lcl, dtype=np.float64),
            name = 'lcl', index = self.data.index)

    def _ucl(self):
        self.ucl = self.mean + self.sigma_level * unbias_const('d3', self.observation_range) * self.stddev
        self.ucl_series = pd.Series(
            np.full(len(self.data.index), self.ucl, dtype=np.float64),
            name = 'ucl', index = self.data.index)

    def get_S(self):
        '''Return the d2-unbiased sigma estimate.'''
        return self.stddev
In [130]:
class Individual(ControlChart):
    '''
    Generates plot data for base individual data. Accepts modifications to
    mean, stddev, UCL & LCL for historical comparison.

    observation_range: subset size for the moving-range sigma estimate; default 2
    sigma_level: test for special cause 1 rejection level; default 3
    '''
    def __init__(self, *args, process_mean = None, stddev = None, observation_range = 2, **kwargs):
        super(Individual, self).__init__(*args, **kwargs)
        self.observation_range = observation_range
        self._mean(process_mean)
        # `is not None` so an explicitly supplied historical stddev of 0.0 is
        # honored instead of being replaced by the moving-range estimate
        self.stddev = stddev if stddev is not None else self.get_S()
        self._centerline()
        self._lcl()
        self._ucl()
        self._test_list()
        self._test_spec_run()
        self._ADnormality()

    def _mean(self, process_mean):
        # `is not None` so a historical process mean of 0.0 is honored
        self.mean = process_mean if process_mean is not None else self.data.mean()

    def _centerline(self):
        self.centerline = self.mean #add future use of median if necessary
        self.centerline_series = pd.Series(np.full(len(self.data.index), self.centerline, dtype=np.float64)
                , name = 'cen'
                , index = self.data.index
                )

    def _lcl(self):
        self.lcl = self.centerline - self.sigma_level * self.stddev
        self.lcl_series = pd.Series(np.full(len(self.data.index), self.lcl, dtype=np.float64)
                , name = 'lcl'
                , index = self.data.index
                )

    def _ucl(self):
        self.ucl = self.centerline + self.sigma_level * self.stddev
        self.ucl_series = pd.Series(np.full(len(self.data.index), self.ucl, dtype=np.float64)
                , name = 'ucl'
                , index = self.data.index
                )

    def get_S(self):
        '''Estimate sigma from a moving-range pass over this chart's data.'''
        # BUG FIX: the original passed the module-level global `data` instead
        # of self.data, so sigma was computed from unrelated data whenever the
        # two differed.
        mr = MovingRange(self.data, observation_range = self.observation_range,
                sigma_level = self.sigma_level, stddev_only = True)
        return mr.get_S()

    def _ADnormality(self):
        # Anderson-Darling statistic only; the critical values and significance
        # levels returned by scipy.stats.anderson are discarded.
        self.ADnorm, _crit, _sig_lvl = scipy.stats.anderson(self.data)
In [161]:
'''
random datasets to trial out for unit tests
'''
np.random.seed(2)
# uniform random data
data = np.random.uniform(0, 20, size=50)
# normal random data
data_norm = np.random.normal(loc=10, scale=3, size=50)
# normal random data, with stage column: three consecutive blocks of `size` rows
size = 5

def _stage_frame(loc, label, start):
    # one staged block: `size` normal draws tagged with a stage label,
    # indexed to continue where the previous block ended
    return pd.DataFrame({'data': np.random.normal(loc=loc, scale=3, size=size),
                         'stage': [label] * size},
                        index=range(start, start + size))

df1 = _stage_frame(10, 'stage0', 0)
df2 = _stage_frame(20, 'stage1', size)
df3 = _stage_frame(10, 'stage0', size * 2)
data_stage = pd.concat([df1, df2, df3])
In [162]:
## test code: pd.concat behaviour with overlapping / disjoint indexes
def _tagged(prefix, idx):
    # four labelled values like 'A0'..'A3' on the given index
    return pd.Series(['%s%d' % (prefix, i) for i in range(4)], name=prefix, index=idx)

ser1 = _tagged('A', [0, 1, 2, 3])
ser2 = _tagged('B', [1, 2, 3, 4])
ser3 = _tagged('C', [1, 2, 3, 4])
ser4 = _tagged('D', [0, 1, 2, 4])
ser5 = _tagged('D', [5, 6, 7, 8])
ser6 = pd.Series(10.0, name='E', index=ser2.index)
a = pd.concat([ser2, ser3, ser6], axis=1)
In [160]:
class ControlChartFigureWrapper():
    '''
    Builds the requested matplotlib figure(s) from raw data.

    Data, and stage must be provided as a single pandas series, or dataframe
    with only one column.
    '''
    #formatting defaults (class-level templates; instances get their own copies)
    fmt_pts = {'color': 'black', 'linestyle': 'solid', 'marker': 's'} #data
    fmt_fail = {'color': 'green', 'linestyle': None, 'marker': 's'} #failed points
    fmt_spec = {'color': 'red', 'linestyle': None, 'marker': 's'} #out_of_spec
    fmt_cen = {'color': 'green', 'linestyle': 'dashed', 'marker': None} #centerlines
    fmt_cl = {'color': 'green', 'linestyle': 'solid', 'marker': None} #control limits
    fmt_sl = {'color': 'red', 'linestyle': 'solid', 'marker': None} #spec limits
    stats = {'centerline': True, 'stddev': True, 'ucl': True, 'lcl': True, 'usl': False, 'lsl': False, 'cpk': False,
             'A-D normality': False
             }

    def __init__(self, data, charts = 'H-I-MR', *args, **kwargs):
        '''
        data: observations (series or single-column frame)
        charts: layout key -- 'H-I-MR', 'I' or 'I-MR'
        kwargs of note: figsize, fmt_* overrides, stats flags, plus everything
        forwarded to _stage_data and the chart classes.
        '''
        self.data_stages, self.stage_breaks = self._stage_data(data, *args, **kwargs)
        #NOTE(review): this recomputes and overrides the breaks returned by
        #_stage_data (making _unique's break computation dead code) -- confirm
        #which computation is intended
        self.stage_breaks = [max(data.index) + 0.5 for stage, data in self.data_stages][:-1]
        figsize = kwargs.get('figsize', (10,10))
        self.charts = charts
        #BUG FIX: calling .update() on the class-level dicts mutated shared
        #class state, so formatting overrides leaked between instances.
        #Build per-instance copies instead.
        cls = ControlChartFigureWrapper
        self.fmt_pts = dict(cls.fmt_pts, **kwargs.get('fmt_pts', {}))
        self.fmt_fail = dict(cls.fmt_fail, **kwargs.get('fmt_fail', {}))
        self.fmt_spec = dict(cls.fmt_spec, **kwargs.get('fmt_spec', {}))
        self.fmt_cen = dict(cls.fmt_cen, **kwargs.get('fmt_cen', {}))
        self.fmt_cl = dict(cls.fmt_cl, **kwargs.get('fmt_cl', {}))
        self.fmt_sl = dict(cls.fmt_sl, **kwargs.get('fmt_sl', {}))
        self.stats = dict(cls.stats, **kwargs.get('stats', {}))
        #dispatch to the figure builder for the requested layout
        {
            'H-I-MR': self._figure_HIMR,
            'I': self._figure_Individual,
            'I-MR': self._figure_IMR
            #'XR'
            #'XS'
            #'H'
        }[charts](figsize = figsize, *args, **kwargs)

    def _figure_HIMR(self, figsize, *args, **kwargs):
        '''Histogram + Individual + Moving Range layout (2x2 grid).'''
        self.gs = gridspec.GridSpec(2, 2,
                width_ratios=[1,3],
                height_ratios=[5,3]
                )
        plt.figure(1, figsize=figsize)
        #moving range first: its sigma estimates feed the Individual charts
        self._plot_mr(*args, gs_pos = 3, **kwargs)
        self._plot_individual(*args, gs_pos = 1, **kwargs)
        self._plot_hist(*args, gs_pos = 0, **kwargs)
        self._plot_stats(*args, gs_pos = 2, **kwargs)

    def _figure_Individual(self, figsize, *args, **kwargs):
        '''Individual chart only.'''
        self.gs = gridspec.GridSpec(1,1)
        plt.figure(2, figsize=figsize)
        self._plot_individual(*args, gs_pos = 0, **kwargs)

    def _figure_IMR(self, figsize, *args, **kwargs):
        '''Individual above Moving Range.'''
        self.gs = gridspec.GridSpec(2, 1,
                height_ratios=[5,3]
                )
        plt.figure(3, figsize=figsize)
        self._plot_mr(*args, gs_pos = 1, **kwargs)
        self._plot_individual(*args, gs_pos = 0, **kwargs)

    def _plot_individual(self, gs_pos, *args, **kwargs):
        '''Plot one Individual chart per stage in grid slot gs_pos.'''
        #Initialize plot, grab kwargs
        spec_min = kwargs.get('spec_min', False)
        spec_max = kwargs.get('spec_max', False)
        ylabel = kwargs.get('ylabel', False)
        special_cause_tests = kwargs.get('test_group', False)
        out_of_spec = spec_min or spec_max
        #BUG FIX: the original fallback caught ValueError (which nothing here
        #raises) and referenced the undefined name `arg`; the actual failure
        #mode is a missing self.mr_stddev (no moving-range pass ran, e.g.
        #charts='I'), which raises AttributeError. Use getattr instead.
        mr_stddev = getattr(self, 'mr_stddev', None)
        if mr_stddev is not None:
            self.individuals = [Individual(data, stddev = stddev, *args, **kwargs)
                    for (stage, data), stddev in zip(self.data_stages, mr_stddev)
                    ]
        else:
            self.individuals = [Individual(data, *args, **kwargs)
                    for stage, data in self.data_stages
                    ]
        plot_points = [ind.get_plot_points() for ind in self.individuals]
        ind = pd.concat(plot_points)
        #insert NaN rows so the plotted line breaks between stages
        for index in self.stage_breaks:
            ind.loc[index] = np.NaN
        self.ind = ind.sort_index()
        self.ind_axes = plt.subplot(self.gs[gs_pos])
        plt.plot(self.ind['lcl'], **self.fmt_cl)
        plt.plot(self.ind['ucl'], **self.fmt_cl)
        if spec_min:
            plt.plot(self.ind['lsl'], **self.fmt_sl)
        if spec_max:
            plt.plot(self.ind['usl'], **self.fmt_sl)
        plt.plot(self.ind['cen'], **self.fmt_cen)
        plt.plot(self.ind['pts'], **self.fmt_pts)
        if special_cause_tests:
            plt.plot(self.ind['fail'], **self.fmt_fail)
        if out_of_spec:
            plt.plot(self.ind['oos'], **self.fmt_spec)
        plt.title('Individual')

    def _plot_mr(self, *args, gs_pos, **kwargs):
        '''Plot one MovingRange chart per stage; stores per-stage sigma in
        self.mr_stddev for the Individual charts.'''
        #Initialize plot, grab kwargs
        ylabel = kwargs.get('ylabel', False)
        special_cause_tests = kwargs.get('test_group', False)
        moving_ranges = [MovingRange(data, *args, **kwargs) for stage, data in self.data_stages]
        self.mr_stddev = [mr.get_S() for mr in moving_ranges]
        plot_points = [mr.get_plot_points() for mr in moving_ranges]
        mr = pd.concat(plot_points)
        #insert NaN rows so the plotted line breaks between stages
        for index in self.stage_breaks:
            mr.loc[index] = np.NaN
        self.mr = mr.sort_index()
        plt.subplot(self.gs[gs_pos])
        plt.plot(self.mr['lcl'], **self.fmt_cl)
        plt.plot(self.mr['ucl'], **self.fmt_cl)
        plt.plot(self.mr['cen'], **self.fmt_cen)
        plt.plot(self.mr['pts'], **self.fmt_pts)
        if special_cause_tests:
            plt.plot(self.mr['fail'], **self.fmt_fail)
        plt.title('Moving Range')
        if ylabel:
            plt.ylabel(ylabel)

    def _plot_hist(self, *args, gs_pos, num_bins = 10, orientation = 'horizontal', **kwargs):
        '''Histogram of all data (stages pooled) with a fitted normal curve.'''
        #Initialize plot, grab kwargs
        spec_min = kwargs.get('spec_min', False)
        spec_max = kwargs.get('spec_max', False)
        ylabel = kwargs.get('ylabel', False)
        #NOTE(review): hist(normed=...) was removed in matplotlib >= 3.1;
        #switch to density= when upgrading matplotlib.
        if self.charts == 'H-I-MR':
            #share the y axis with the Individual chart so they line up
            self.hist_axes = plt.subplot(self.gs[gs_pos], sharey = self.ind_axes)
            #single histogram for all data, need to implement something that plots groups on each other, maybe.
            data = np.array([item for stage, group in self.data_stages for item in group])
            plt.hist(data, num_bins, normed=True, orientation = orientation, color = 'orange')
            x_array = np.linspace(*self.hist_axes.get_ylim(), num = 50)
            y = scipy.stats.norm.pdf(x_array, data.mean(), data.std())
            plt.plot(y, x_array, **self.fmt_cen)
            plt.xticks(np.linspace(*self.hist_axes.get_xlim(), num = 5) )
            if spec_min:
                plt.axhline(y=spec_min, **self.fmt_sl)
            if spec_max:
                plt.axhline(y=spec_max, **self.fmt_sl)
            if ylabel:
                plt.ylabel(ylabel)
            formatter = tkr.FormatStrFormatter('%1.2f')
            self.hist_axes.xaxis.set_major_formatter(formatter)
            plt.gca().invert_xaxis()
        else:
            #BUG FIX: `gs` was an undefined name; the grid lives on self.gs
            self.hist_axes = plt.subplot(self.gs[gs_pos])
            plt.hist(self.data_stages, num_bins, normed=1, orientation = 'vertical', color = 'orange')

    def _plot_stats(self, *args, gs_pos, **kwargs):
        '''Render the enabled summary statistics as text in grid slot gs_pos.
        Each enabled line lists one value per stage, comma separated.'''
        plt.subplot(self.gs[gs_pos])
        plt.axis('off')
        stat_result = []
        if self.stats['centerline']:
            cen = ['{:1.4g}'.format(ind.mean) for ind in self.individuals]
            stat_result.append('Centerline: ' + ', '.join(cen))
        if self.stats['stddev']:
            stddev = ['{:1.4f}'.format(ind.stddev) for ind in self.individuals]
            stat_result.append('Std Dev: ' + ', '.join(stddev))
        if self.stats['ucl']:
            ucl = ['{:1.4g}'.format(ind.ucl) for ind in self.individuals]
            stat_result.append('UCL: ' + ', '.join(ucl))
        if self.stats['lcl']:
            lcl = ['{:1.4g}'.format(ind.lcl) for ind in self.individuals]
            stat_result.append('LCL: ' + ', '.join(lcl))
        if self.stats['usl']:
            usl = ['{:1.4g}'.format(ind.usl) for ind in self.individuals]
            stat_result.append('USL: ' + ', '.join(usl))
        if self.stats['lsl']:
            lsl = ['{:1.4g}'.format(ind.lsl) for ind in self.individuals]
            stat_result.append('LSL: ' + ', '.join(lsl))
        if self.stats['cpk']:
            cpk = ['{:1.4g}'.format(ind.cpk) for ind in self.individuals]
            stat_result.append('CPk: ' + ', '.join(cpk))
        if self.stats['A-D normality']:
            norm = ['{:1.4g}'.format(ind.ADnorm) for ind in self.individuals]
            stat_result.append('Normality: ' + ', '.join(norm))
        plt.text(0.0, 0.5, '\n'.join(stat_result))

    def _stage_data(self, data, *args, stages = None, split_method = None, **kwargs):
        '''
        Restructures the data given into a list of (stage name, data subset)
        tuples, plus the axis positions where stages change.

        split_method: None (single stage), 'unique' (group equal labels
        together) or 'change' (new stage on every label change).
        '''
        #BUG FIX: `stages = pd.Series()` as a default argument is a shared
        #mutable default; create a fresh empty Series per call instead.
        if stages is None:
            stages = pd.Series(dtype = object)
        if split_method and (len(data) != len(stages)):
            raise ValueError('Provided data and Stage length differ')
        if stages.any() and not split_method:
            raise NameError('No split method provided with stage list')
        data = pd.Series(data)
        stages = pd.Series(stages)
        stage_frames = []
        def _none():
            #single pseudo-stage covering all data; no breaks
            stage_frames.append((data.name, data))
            return stage_frames, []
        def _unique():
            #group by unique stage label, regardless of position
            for key in stages.unique():
                stage_frames.append((key, data[stages == key]))
            #locate positions where the stage label changes
            init = True
            key = None
            values = None
            stage_breaks = []
            for n in range(len(data.index)):
                if init: #init only true once
                    key = stages[n]
                    values = data[n:n+1]
                    init = False
                elif key == stages[n]:
                    values = data[n:n+1]
                else:
                    stage_breaks.append(max(values.index) + 0.5)
                    key = stages[n]
                    values = data[n:n+1]
            return stage_frames, stage_breaks
        def _change():
            #start a new stage every time the label changes
            init = True
            key = None
            values = None
            stage_breaks = []
            for n in range(len(data.index)):
                if init: #init only true once
                    key = stages[n]
                    values = pd.Series(data[n:n+1])
                    init = False
                elif key == stages[n]:
                    #BUG FIX: Series.append was removed in pandas 2.x;
                    #pd.concat is the documented equivalent
                    values = pd.concat([values, data[n:n+1]])
                else:
                    stage_frames.append((key, values)) #save last round data
                    stage_breaks.append(max(values.index) + 0.5)
                    key = stages[n]
                    values = pd.Series(data[n:n+1])
            stage_frames.append((key, values)) #save last data point
            return stage_frames, stage_breaks
        return {
            None: _none,
            'unique': _unique,
            'change': _change
        }[split_method]()
#unit test? exercise the wrapper end-to-end on the staged random data
#data_df = pd.DataFrame(data)
stats = {'centerline': True, 'stddev': True, 'ucl': True, 'lcl': True, 'usl': False, 'lsl': False, 'cpk': False,
         'A-D normality': True
         }
chart_options = dict(
    charts = 'H-I-MR',
    ylabel = 'Random',
    stats = stats,
    test_group = [0],
    spec_min = 5,
    stages = data_stage['stage'],
    split_method = 'change',
    observation_range = 2,
)
ControlChartFigureWrapper(data_stage['data'], **chart_options)
Out[160]:
In [ ]:
""" SPC Statistical Process Control provides means to monitor process behaviour using statistical tools defined by Shewhart and others. The process run is shown as Quality Control Charts (QCC).
Author: Michal Nowikowski godfryd@gmail.com
License: MIT """
import numpy as np
# Chart type identifiers accepted by Spc (restored to one statement per line;
# the pasted source had them fused onto single lines).
CHART_X_BAR_R_X = "x_bar R - X"
CHART_X_BAR_R_R = "x_bar R - R"
CHART_X_BAR_S_X = "x_bar S - X"
CHART_X_BAR_S_S = "x_bar S - S"
CHART_X_MR_X = "X mR - X"
CHART_X_MR_MR = "X mR - mR"
CHART_P = "p"
CHART_NP = "np"
CHART_C = "c"
CHART_U = "u"
CHART_EWMA = "EWMA"
CHART_CUSUM = "CUSUM"
CHART_THREE_WAY = "three way"
CHART_TIME_SERIES = "time series"

# Rule names. The '*' characters were stripped by markup mangling in the
# pasted source (paired asterisks consumed as italics); restored to match the
# doctest output in Spc ('1 beyond 3*sigma').
RULES_1_BEYOND_3SIGMA = "1 beyond 3*sigma"
RULES_2_OF_3_BEYOND_2SIGMA = "2 of 3 beyond 2*sigma"
RULES_4_OF_5_BEYOND_1SIGMA = "4 of 5 beyond 1*sigma"
RULES_7_ON_ONE_SIDE = "7 on one side"
RULES_8_ON_ONE_SIDE = "8 on one side"
RULES_9_ON_ONE_SIDE = "9 on one side"
RULES_6_TRENDING = "6 trending"
RULES_14_UP_DOWN = "14 up down"
RULES_15_BELOW_1SIGMA = "15 below 1*sigma"
RULES_8_BEYOND_1SIGMA_BOTH_SIDES = "8 beyond 1*sigma on both sides"

# Common rule groupings.
RULES_BASIC = [RULES_1_BEYOND_3SIGMA,
               RULES_7_ON_ONE_SIDE]
RULES_PMI = [RULES_1_BEYOND_3SIGMA,
             RULES_8_ON_ONE_SIDE]
RULES_WECO = [RULES_1_BEYOND_3SIGMA,
              RULES_2_OF_3_BEYOND_2SIGMA,
              RULES_4_OF_5_BEYOND_1SIGMA,
              RULES_8_ON_ONE_SIDE,
              RULES_6_TRENDING,
              RULES_14_UP_DOWN]
RULES_NELSON = [RULES_1_BEYOND_3SIGMA,
                RULES_9_ON_ONE_SIDE,
                RULES_6_TRENDING,
                RULES_14_UP_DOWN,
                RULES_2_OF_3_BEYOND_2SIGMA,
                RULES_4_OF_5_BEYOND_1SIGMA,
                RULES_15_BELOW_1SIGMA,
                RULES_8_BEYOND_1SIGMA_BOTH_SIDES]
RULES_ALL = [RULES_1_BEYOND_3SIGMA,
             RULES_2_OF_3_BEYOND_2SIGMA,
             RULES_4_OF_5_BEYOND_1SIGMA,
             RULES_7_ON_ONE_SIDE,
             RULES_8_ON_ONE_SIDE,
             RULES_6_TRENDING,
             RULES_14_UP_DOWN,
             RULES_15_BELOW_1SIGMA,
             RULES_8_BEYOND_1SIGMA_BOTH_SIDES]
def test_beyond_limits(data, center, lcl, ucl): return data[0] > ucl or data[0] < lcl
def test_violating_runs(data, center, lcl, ucl): for i in xrange(1, len(data)): if (data[i-1] - center)*(data[i] - center) < 0: return False return True
# Shewhart chart constants indexed by subgroup size n (indexes 0 and 1 unused).
A2 = [0, 0, 1.880, 1.023, 0.729, 0.577, 0.483, 0.419, 0.373, 0.337, 0.308]
D3 = [0, 0, 0, 0, 0, 0, 0, 0.076, 0.136, 0.184, 0.223]
D4 = [0, 0, 3.267, 2.575, 2.282, 2.115, 2.004, 1.924, 1.864, 1.816, 1.777]
c4 = [0, 0, 0.7979, 0.8862, 0.9213, 0.9400, 0.9515, 0.9594, 0.9650, 0.9693, 0.9727, 0.9754, 0.9776, 0.9794, 0.9810, 0.9823]  # 0.9869, 0.9896]
B3 = [0, 0, 0, 0, 0, 0, 0.030, 0.118, 0.185, 0.239, 0.284, 0.321, 0.354, 0.382, 0.406, 0.428]  # 0.510, 0.565]
B4 = [0, 0, 3.267, 2.568, 2.266, 2.089, 1.970, 1.882, 1.815, 1.761, 1.716, 1.679, 1.646, 1.618, 1.594, 1.572]  # 1.490, 1.435]
B5 = [0, 0, 0, 0, 0, 0, 0.029, 0.113, 0.179, 0.232, 0.276, 0.313, 0.346, 0.374, 0.399, 0.421]  # 0.504, 0.559]
B6 = [0, 0, 2.606, 2.276, 2.088, 1.964, 1.874, 1.806, 1.751, 1.707, 1.669, 1.637, 1.610, 1.585, 1.563, 1.544]  # 1.470, 1.420]
A3 = [0, 0, 2.659, 1.954, 1.628, 1.427, 1.287, 1.182, 1.099, 1.032, 0.975, 0.927, 0.886, 0.850, 0.817, 0.789]  # 0.680, 0.606]

def get_stats_x_bar_r_x(data, size):
    """Center line and control limits for the X part of an Xbar-R chart.

    data: list of subgroups, each of length `size` (2..10).
    Returns (center, lcl, ucl).
    """
    n = size
    assert 2 <= n <= 10
    r_sum = 0
    for xset in data:
        assert len(xset) == n
        r_sum += max(xset) - min(xset)
    r_bar = r_sum / len(data)
    x_bar = np.mean(data)
    center = x_bar
    lcl = center - A2[n] * r_bar
    ucl = center + A2[n] * r_bar
    return center, lcl, ucl

def get_stats_x_bar_r_r(data, size):
    """Center line and control limits for the R part of an Xbar-R chart.

    data: list of subgroups, each of length `size` (2..10).
    Returns (center, lcl, ucl).
    """
    n = size
    assert 2 <= n <= 10
    r_sum = 0
    for xset in data:
        assert len(xset) == n
        r_sum += max(xset) - min(xset)
    r_bar = r_sum / len(data)
    center = r_bar
    lcl = D3[n] * r_bar
    ucl = D4[n] * r_bar
    return center, lcl, ucl

def get_stats_x_bar_s_x(data, size):
    """Center line and control limits for the X part of an Xbar-S chart.

    data: list of subgroups, each of length `size` (2..10).
    Returns (center, lcl, ucl).
    """
    n = size
    assert 2 <= n <= 10
    # mean of the per-subgroup sample standard deviations (ddof=1)
    s_bar = np.mean(np.std(data, 1, ddof=1))
    x_bar = np.mean(data)
    center = x_bar
    lcl = center - A3[n] * s_bar
    ucl = center + A3[n] * s_bar
    return center, lcl, ucl

def get_stats_x_bar_s_s(data, size):
    """Center line and control limits for the S part of an Xbar-S chart.

    data: list of subgroups, each of length `size` (2..10).
    Returns (center, lcl, ucl).
    """
    n = size
    assert 2 <= n <= 10
    s_bar = np.mean(np.std(data, 1, ddof=1))
    center = s_bar
    lcl = B3[n] * s_bar
    ucl = B4[n] * s_bar
    return center, lcl, ucl
def get_stats_p(data, size):
    """Center line and control limits for a p (proportion defective) chart.

    data: defective counts per subgroup; size: subgroup size n (> 1).
    Returns (center, lcl, ucl), with limits clamped to [0, 1].
    """
    n = size
    assert n > 1
    pbar = float(sum(data)) / (n * len(data))
    sd = np.sqrt(pbar * (1 - pbar) / n)
    center = pbar
    lcl = center - 3 * sd
    if lcl < 0:
        lcl = 0
    ucl = center + 3 * sd
    if ucl > 1:
        ucl = 1.0
    return center, lcl, ucl
def get_stats_np(data, size):
    """Center line and control limits for an np (defective count) chart.

    data: defective counts per subgroup; size: subgroup size n (> 1).
    Returns (center, lcl, ucl), with limits clamped to [0, n].
    """
    n = size
    assert n > 1
    pbar = float(sum(data)) / (n * len(data))
    sd = np.sqrt(n * pbar * (1 - pbar))
    center = n * pbar
    lcl = center - 3 * sd
    if lcl < 0:
        lcl = 0
    ucl = center + 3 * sd
    if ucl > n:
        ucl = n
    return center, lcl, ucl
def get_stats_c(data, size):
    """Center line and control limits for a c (count of defects) chart.

    data: defect counts per unit; size is unused (kept for the common signature).
    Returns (center, lcl, ucl), lcl clamped at 0.
    """
    cbar = np.mean(data)
    center = cbar
    lcl = center - 3 * np.sqrt(cbar)
    if lcl < 0:
        lcl = 0
    ucl = center + 3 * np.sqrt(cbar)
    return center, lcl, ucl
def get_stats_u(data, size):
    """Center line and control limits for a u (defects per unit) chart.

    data: defect counts per subgroup; size: subgroup size n (> 1).
    Returns (center, lcl, ucl), lcl clamped at 0.
    """
    n = size
    assert n > 1
    cbar = float(sum(data)) / (len(data) * n)
    center = cbar
    lcl = center - 3 * np.sqrt(cbar / n)
    if lcl < 0:
        lcl = 0
    ucl = center + 3 * np.sqrt(cbar / n)
    return center, lcl, ucl
def get_stats_cusum(data, size):
    """Find the data for a cusum graph.

    Only returns 0 as the center as the data is moved by
    its mean; ucl and lcl are not reported.
    """
    return 0, None, None
def prepare_data_none(data, size):
    """Pass-through: the chart plots the raw data unchanged."""
    return data

def prepare_data_x_bar_rs_x(data, size):
    """Collapse each subgroup to its mean for the X part of Xbar-R/S charts."""
    data2 = []
    for xset in data:
        data2.append(np.mean(xset))
    return data2

def prepare_data_x_bar_r_r(data, size):
    """Collapse each subgroup to its range for the R chart."""
    data2 = []
    for xset in data:
        data2.append(max(xset) - min(xset))
    return data2

def prepare_data_x_bar_s_s(data, size):
    """Collapse each subgroup to its sample standard deviation for the S chart."""
    data2 = []
    for xset in data:
        data2.append(np.std(xset, ddof=1))
    return data2

def prepare_data_x_mr(data, size):
    """Moving ranges of consecutive points; the leading 0 stands in for the
    undefined first range. xrange -> range for Python 3."""
    data2 = [0]
    for i in range(len(data) - 1):
        data2.append(abs(data[i] - data[i + 1]))
    return data2

def prepare_data_p(data, size):
    """Defect counts -> proportions.
    NOTE(review): the leading 0 shifts the series by one point relative to the
    raw data -- looks suspicious; confirm against the upstream spc package."""
    data2 = [0]
    for d in data:
        data2.append(float(d) / size)
    return data2

def prepare_data_u(data, size):
    """Counts -> defects per unit.
    NOTE(review): same suspicious leading 0 as prepare_data_p."""
    data2 = [0]
    for d in data:
        data2.append(float(d) / size)
    return data2
def prepare_data_cusum(data, size, target=None):
    r"""Prepares the data for a CUSUM graph.

    Subtracts the target from each data point, then calculates the
    cumulative sum of each:

        $S_m=\sum_{i=1}^m (x_i-\mu)$

    where $x_i$ is the data point and $\mu$ is the target value;
    if $\mu$ is not provided the mean of the sample is used.
    (Raw docstring: the LaTeX contains backslash escapes.)
    """
    data2 = []
    if target is None:
        target = np.mean(data)
    for d in data:
        data2.append(float(d) - target)
    # leading 0 is the empty sum, then running totals; range replaces Py2 xrange
    return [sum(data2[:i]) for i in range(len(data2) + 1)]
# chart type -> (stats function, data preparation function)
STATS_FUNCS = {
    CHART_X_BAR_R_X: (get_stats_x_bar_r_x, prepare_data_x_bar_rs_x),
    CHART_X_BAR_R_R: (get_stats_x_bar_r_r, prepare_data_x_bar_r_r),
    CHART_X_BAR_S_X: (get_stats_x_bar_s_x, prepare_data_x_bar_rs_x),
    CHART_X_BAR_S_S: (get_stats_x_bar_s_s, prepare_data_x_bar_s_s),
    CHART_X_MR_X: (get_stats_x_mr_x, prepare_data_none),
    CHART_X_MR_MR: (get_stats_x_mr_mr, prepare_data_x_mr),
    CHART_P: (get_stats_p, prepare_data_p),
    CHART_NP: (get_stats_np, prepare_data_none),
    CHART_C: (get_stats_c, prepare_data_none),
    CHART_U: (get_stats_u, prepare_data_u),
    CHART_EWMA: (None, prepare_data_none),
    CHART_CUSUM: (get_stats_cusum, prepare_data_cusum),
    CHART_THREE_WAY: (None, prepare_data_none),
    CHART_TIME_SERIES: (None, prepare_data_none)}

# rule name -> (test function or None if unimplemented, window length)
RULES_FUNCS = {
    RULES_1_BEYOND_3SIGMA: (test_beyond_limits, 1),
    RULES_2_OF_3_BEYOND_2SIGMA: (None, 3),
    RULES_4_OF_5_BEYOND_1SIGMA: (None, 5),
    RULES_7_ON_ONE_SIDE: (test_violating_runs, 7),
    RULES_8_ON_ONE_SIDE: (test_violating_runs, 8),
    RULES_9_ON_ONE_SIDE: (test_violating_runs, 9),
    RULES_6_TRENDING: (None, 6),
    RULES_14_UP_DOWN: (None, 14),
    RULES_15_BELOW_1SIGMA: (None, 15),
    RULES_8_BEYOND_1SIGMA_BOTH_SIDES: (None, 8)}
class Spc(object):
    """ Main class that provides SPC analysis. It detects SPC rules violations.
    It can draw charts using matplotlib.

    :arguments:
      data
        user data as flat array

    **Usage**

    >>> s = Spc([1, 2, 3, 3, 2, 1, 3, 8], CHART_X_MR_X)
    >>> s.get_stats()
    (2.875, 0.21542553191489322, 5.5345744680851068)
    >>> s.get_violating_points()
    {'1 beyond 3*sigma': [7]}
    >>> s.get_chart()

    >>> s = Spc([1, 2, 3, 3, 2, 1, 3, 8], CHART_CUSUM)
    >>> s.get_stats()
    (0, None, None)
    >>> s.get_violating_points()
    {'7 on one side': [7, 8], '1 beyond 3*sigma': [1, 2, 3, 4, 5, 6, 7, 8]}
    >>> s.get_chart()
    """

    def __init__(self, data, chart_type, rules=RULES_BASIC, stats_custom=None, newdata=None, sizes=None):
        '''
        data: observations (flat list, or list of subgroups)
        chart_type: one of the CHART_* identifiers
        rules: rule names to evaluate (default RULES_BASIC)
        stats_custom: optional (center, lcl, ucl) override
        newdata: additional points appended after the baseline data
        sizes: explicit subgroup size; inferred from data[0] when None
        '''
        data = data if isinstance(data, list) else list(data)
        self.chart_type = chart_type
        self.rules = rules
        self.stats = []
        if newdata is None:
            newdata = []
        # stats function and data-preparation function for this chart type
        # (local renamed from `pd` so it cannot shadow pandas in this file)
        sf, prep = STATS_FUNCS[chart_type]
        if sizes is None:
            # infer subgroup size from the first element
            size = len(data[0]) if isinstance(data[0], (list, tuple)) else 1
        else:
            size = sizes
        if stats_custom is None:
            self.center, self.lcl, self.ucl = sf(data, size)
        else:
            self.center, self.lcl, self.ucl = stats_custom
        self._data = prep(data + newdata, size)
        self.violating_points = self._find_violating_points()

    def _find_violating_points(self, rules=None):
        '''Return {rule name: [indexes]} for every rule violation found.
        Uses self.rules unless a non-empty rules list is given.'''
        rs = rules if rules else self.rules
        points = {}
        for i in range(len(self._data)):  # range replaces Py2 xrange
            for r in rs:
                func, points_num = RULES_FUNCS[r]
                # skip unimplemented rules and windows that are not full yet
                if func is None or i <= points_num - 1:
                    continue
                if func(self._data[i - points_num + 1:i + 1], self.center, self.lcl, self.ucl):
                    points.setdefault(r, []).append(i)
        return points

    def _mark_violations(self, ax, rule, index, fmt, color, label):
        '''Overlay markers on every point that violates `rule`, plus one empty
        plot call to register the legend entry.'''
        if rule not in self.violating_points:
            return
        for i in self.violating_points[rule]:
            if index is not None:
                ax.plot([index[i]], [self._data[i]], fmt, ms=10)
            else:
                ax.plot([i], [self._data[i]], fmt, ms=10)
        ax.plot([], [], color=color, linestyle='', marker='o', ms=10, label=label)

    def get_chart(self, legend=True, title=None, index=None):
        """Generate chart using matplotlib."""
        try:
            import matplotlib
        except ImportError:
            raise Exception("matplotlib not installed")
        else:
            import matplotlib.pyplot as plt
            import matplotlib.lines as mlines
        if index is not None and not isinstance(index, list):
            index = list(index)
        plt.figure(figsize=(20, 10))
        ax = plt.subplot(111)
        if index is None:
            plt.plot(self._data, "bo-", ms=5, label='Data')
        else:
            plt.plot(index, self._data, "bo-", ms=5, label='Data')
        title = self.chart_type if title is None else title
        plt.title(title, fontsize=22)  # setting the title for the figure
        if self.center is not None:
            plt.axhline(self.center, color='k', linestyle='-', label='Center (%0.3f)' % self.center)
        if self.ucl is not None:
            plt.axhline(self.ucl, color='r', linestyle='-.', linewidth=4, label='UCL (%0.3f)' % self.ucl)
        if self.lcl is not None:
            plt.axhline(self.lcl, color='r', linestyle='-.', linewidth=4, label='LCL (%0.3f)' % self.lcl)
        # highlight violating points (three identical loops folded into a helper)
        self._mark_violations(ax, RULES_7_ON_ONE_SIDE, index, "yo", 'yellow', 'Run of 7')
        self._mark_violations(ax, RULES_8_ON_ONE_SIDE, index, "yo", 'yellow', 'Run of 8')
        self._mark_violations(ax, RULES_1_BEYOND_3SIGMA, index, "ro", 'red', 'Out of Limits')
        # readability improvements
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.get_xaxis().tick_bottom()
        ax.get_yaxis().tick_left()
        plt.xticks(fontsize=14)
        plt.yticks(fontsize=14)
        plt.grid(axis='y')
        ylim = plt.ylim()
        plt.ylim((ylim[0] - 1, ylim[1] + 1))
        legend_output = None
        if legend is True:
            legend_output = plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
        return ax, legend_output

    def get_violating_points(self):
        """Return points that violates rules of control chart"""
        return self.violating_points

    def get_stats(self):
        """Return basic statistics about data as tuple: (center, LCL, UCL)."""
        return self.center, self.lcl, self.ucl
In [ ]:
# Reference: http://docs.scipy.org/doc/scipy/reference/stats.html