In [2]:
# Build a small sample DataFrame to demonstrate duplicate handling:
# the row with id 3 appears twice verbatim, ids 1 and 4 share all
# non-id values, and id 5 appears twice with different values.
rows = [
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
]
columns = ['id', 'weight', 'height', 'age', 'gender']
df = spark.createDataFrame(rows, columns)
Check for duplicates.
In [3]:
# Compare total row count against distinct row count; a gap means the
# DataFrame contains rows that are exact copies of each other.
total_rows = df.count()
distinct_rows = df.distinct().count()
print('Count of rows: {0}'.format(total_rows))
print('Count of distinct rows: {0}'.format(distinct_rows))
If these two numbers differ, you have rows that are exact copies of each other. We can drop such rows by using the .dropDuplicates(...)
method.
In [4]:
# Drop rows that are exact copies of each other (all columns equal).
df = df.dropDuplicates()
df.show()
Let's confirm.
In [5]:
# Check for rows that duplicate each other in every column except 'id':
# compare total count with the distinct count over the non-id columns.
id_count = df.count()
non_id_columns = [colname for colname in df.columns if colname != 'id']
distinct_non_id = df.select(non_id_columns).distinct().count()
print('Count of ids: {0}'.format(id_count))
print('Count of distinct ids: {0}'.format(distinct_non_id))
We still have one more duplicate. We will use the .dropDuplicates(...)
method again, but this time add the subset
parameter.
In [6]:
# Deduplicate on every column except 'id', so rows identical apart from
# their id collapse to one.
non_id_columns = [colname for colname in df.columns if colname != 'id']
df = df.dropDuplicates(subset=non_id_columns)
df.show()
To calculate the total and distinct number of IDs in one step we can use the .agg(...)
method.
In [7]:
import pyspark.sql.functions as fn

# Compute the total and distinct number of ids in one aggregation pass.
id_stats = df.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct'),
)
id_stats.show()
Give each row a unique ID.
In [8]:
# Assign each row a unique 64-bit id; values are monotonically increasing
# but not necessarily consecutive.
df.withColumn('new_id', fn.monotonically_increasing_id()).show()
In [9]:
# Sample DataFrame with missing values scattered across the columns;
# 'income' is missing for most rows, and the id-3 row is mostly empty.
rows_miss = [
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
]
miss_columns = ['id', 'weight', 'height', 'age', 'gender', 'income']
df_miss = spark.createDataFrame(rows_miss, miss_columns)
To find the number of missing observations per row we can use the following snippet.
In [10]:
# Count the missing (None) values in each row, keyed by the row's id.
# Fix: compare to None with `is`, not `==` (PEP 8) — equality can be
# intercepted by overloaded __eq__; identity cannot. Also pass a
# generator to sum() instead of building an intermediate list.
df_miss.rdd.map(
    lambda row: (row['id'], sum(c is None for c in row))
).collect()
Out[10]:
Let's see what values are missing so when we count missing observations in columns we can decide whether to drop the observation altogether or impute some of the observations.
In [11]:
# Inspect the row with id 3 — per the data above it is missing
# weight, age, gender, and income.
df_miss.where('id == 3').show()
What is the percentage of missing observations we see in each column?
In [12]:
# Fraction of missing observations per column. fn.count(column) skips
# nulls while fn.count('*') counts every row, so 1 - ratio is the
# missing fraction.
missing_fractions = [
    (1 - (fn.count(colname) / fn.count('*'))).alias(colname + '_missing')
    for colname in df_miss.columns
]
df_miss.agg(*missing_fractions).show()
We will drop the 'income'
feature as most of its values are missing.
In [13]:
# Drop the 'income' column (mostly missing) by selecting everything else.
kept_columns = [colname for colname in df_miss.columns if colname != 'income']
df_miss_no_income = df_miss.select(kept_columns)
df_miss_no_income.show()
To drop the observations instead, you can use the .dropna(...)
method.
In [14]:
# Drop rows that have fewer than 3 non-null values.
df_miss_no_income.dropna(thresh=3).show()
To impute a mean, median or other calculated value you need to first calculate the value, create a dict with such values, and then pass it to the .fillna(...)
method.
In [15]:
# Impute: numeric columns get their column mean, the categorical
# 'gender' gets the placeholder string 'missing'. The means are computed
# in Spark, pulled to the driver via toPandas(), and converted to a
# single {column: mean} dict for fillna.
# (A mean for 'id' is computed too, but 'id' has no nulls so it is unused.)
means = df_miss_no_income.agg(
*[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]
means['gender'] = 'missing'
df_miss_no_income.fillna(means).show()
In [16]:
# Sample DataFrame for outlier detection; the id-3 row carries an
# extreme weight (342.3) and age (99).
df_outliers = spark.createDataFrame([
(1, 143.5, 5.3, 28),
(2, 154.2, 5.5, 45),
(3, 342.3, 5.1, 99),
(4, 144.5, 5.5, 33),
(5, 133.2, 5.4, 54),
(6, 124.1, 5.1, 21),
(7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
First, we calculate the lower and upper cut off points for each feature.
In [17]:
# Compute the classic IQR fences per feature: [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
# approxQuantile is approximate (relative error 0.05) but runs without
# shuffling the whole column to the driver.
cols = ['weight', 'height', 'age']
bounds = {}
for feature in cols:
    q1, q3 = df_outliers.approxQuantile(feature, [0.25, 0.75], 0.05)
    iqr = q3 - q1
    bounds[feature] = [q1 - 1.5 * iqr, q3 + 1.5 * iqr]
The bounds
dictionary holds the lower and upper bounds for each feature.
In [18]:
# Display the computed [lower, upper] cut-off points per feature.
bounds
Out[18]:
Let's now use it to flag our outliers.
In [19]:
# Build one boolean column '<feature>_o' per feature, flagging values
# that fall outside the IQR fences computed above.
flag_columns = [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o')
    for c in cols
]
outliers = df_outliers.select(['id'] + flag_columns)
outliers.show()
We have one outlier in the weight
feature and one in the age
feature.
In [20]:
# Join the outlier flags back onto the data by 'id', then list the
# flagged weight and age values.
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()
Load our data and convert it to a Spark DataFrame.
In [21]:
import pyspark.sql.types as typ
Next, we read the data in.
In [22]:
# Read the gzipped CSV as an RDD of text lines, remember the header,
# drop it from the data, and parse every remaining line into ints.
fraud = sc.textFile('ccFraud.csv.gz')
header = fraud.first()
fraud = fraud \
.filter(lambda row: row != header) \
.map(lambda row: [int(elem) for elem in row.split(',')])
Following that, we create the schema for our DataFrame.
In [23]:
# Build the schema: every column in this dataset is an integer.
# Fixes: drop the redundant [*[ ... ]] wrapping around the list
# comprehension, and strip the surrounding quotes from the header names
# with .strip('"') instead of the fragile h[1:-1] slice (which would
# mangle any unquoted header field).
fields = [
    typ.StructField(h.strip('"'), typ.IntegerType(), True)
    for h in header.split(',')
]
schema = typ.StructType(fields)
Finally, we create our DataFrame.
In [24]:
# Materialize the parsed RDD as a DataFrame using the explicit schema.
fraud_df = spark.createDataFrame(fraud, schema)
Now that the dataframe is ready we can calculate the basic descriptive statistics for our dataset.
In [25]:
# Confirm the structure — every column was declared IntegerType above.
fraud_df.printSchema()
For categorical columns we will count the frequencies of their values using the .groupby(...)
method.
In [26]:
# Frequency of each gender code (categorical feature).
fraud_df.groupby('gender').count().show()
For the truly numerical features we can use the .describe()
method.
In [27]:
# The truly numerical features, described and correlated below.
numerical = ['balance', 'numTrans', 'numIntlTrans']
In [28]:
# Basic descriptive statistics (count, mean, stddev, min, max) for the
# numerical columns.
desc = fraud_df.describe(numerical)
desc.show()
Here's how you check skewness (we will do it for the 'balance'
feature only).
In [29]:
# Skewness of 'balance', using the {column: aggregate-name} dict form of .agg(...).
fraud_df.agg({'balance': 'skewness'}).show()
Calculating correlations in PySpark is very easy once your data is in a DataFrame form.
In [29]:
# Pairwise (Pearson) correlation between two columns; returns a float to the driver.
fraud_df.corr('balance', 'numTrans')
Out[29]:
In order to create a correlations matrix you can use the script below.
In [30]:
# Upper-triangular correlation matrix over the numerical features; the
# lower triangle is left as None since it would mirror the upper one.
n_numerical = len(numerical)
corr = [
    [None] * left + [
        fraud_df.corr(numerical[left], numerical[right])
        for right in range(left, n_numerical)
    ]
    for left in range(n_numerical)
]
corr
Out[30]:
First, let's load the modules and set them up.
In [30]:
%matplotlib inline
import matplotlib.pyplot as plt
# Use the ggplot-like style for all matplotlib figures below.
plt.style.use('ggplot')
# NOTE(review): bokeh.charts was deprecated and later removed from Bokeh;
# this import only works with old Bokeh releases — confirm the pinned version.
import bokeh.charts as chrt
from bokeh.io import output_notebook
# Route Bokeh output into inline notebook cells.
output_notebook()
In [31]:
# Aggregate a 20-bucket histogram of 'balance' on the workers; only the
# bucket boundaries and counts come back to the driver.
hists = fraud_df.select('balance').rdd.flatMap(lambda row: row).histogram(20)
To plot the histogram you can simply call matplotlib as shown below.
In [39]:
# RDD.histogram returns (bucket_boundaries, counts); the boundaries list
# has one more entry than counts, so drop the last boundary to get the
# bars' left edges.
data = {
    'bins': hists[0][:-1],
    'freq': hists[1],
}
figure = plt.figure(figsize=(12, 9))
axes = figure.add_subplot(1, 1, 1)
axes.bar(data['bins'], data['freq'], width=2000)
axes.set_title("Histogram of 'balance'")
plt.savefig('B05793_05_22.png', dpi=300)
In a similar manner, a histogram can be created with Bokeh.
In [41]:
# The same pre-aggregated histogram rendered with (legacy) Bokeh charts.
b_hist = chrt.Bar(data, values='freq', label='bins', title='Histogram of \'balance\'')
chrt.show(b_hist)
If your data is small enough to fit on the driver (although we would argue it would normally be faster to use the method showed above) you can bring the data and use the .hist(...)
(from Matplotlib) or .Histogram(...)
(from Bokeh) methods.
In [42]:
# Pull every 'balance' value to the driver — only safe for small data.
data_driver = {'obs': fraud_df.select('balance').rdd.flatMap(lambda row: row).collect()}
In [44]:
# Driver-side histogram: matplotlib does the binning on the collected values.
figure = plt.figure(figsize=(12, 9))
axes = figure.add_subplot(1, 1, 1)
axes.hist(data_driver['obs'], bins=20)
axes.set_title("Histogram of 'balance' using .hist()")
plt.savefig('B05793_05_24.png', dpi=300)
In [37]:
# Driver-side histogram with Bokeh's (legacy) Histogram chart.
b_hist_driver = chrt.Histogram(data_driver, values='obs', title='Histogram of \'balance\' using .Histogram()', bins=20)
chrt.show(b_hist_driver)
In this example we will sample our fraud dataset at 0.02% given gender as strata.
In [38]:
# Stratified sample by 'gender' (codes 1 and 2), 0.02% per stratum.
# NOTE(review): no seed is passed, so the sample differs between runs —
# consider sampleBy(..., seed=...) for reproducibility.
data_sample = fraud_df.sampleBy('gender', {1: 0.0002, 2: 0.0002}).select(numerical)
To plot multiple 2D charts in one go, you can use the following.
In [39]:
# Collect each numerical column to the driver and draw a scatter plot of
# balance vs numTrans. Use a dict comprehension (idiomatic) instead of
# dict() over a list of (key, value) tuples — same mapping, less noise.
data_multi = {
    colname: data_sample.select(colname).rdd.flatMap(lambda row: row).collect()
    for colname in numerical
}
sctr = chrt.Scatter(data_multi, x='balance', y='numTrans')
chrt.show(sctr)