# Prepare and understand data for modeling

### Duplicates

Consider the following example.

``````

In [2]:

df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

``````

Check for duplicates.

``````

In [3]:

print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))

``````
``````

Count of rows: 7
Count of distinct rows: 6

``````

If these two numbers differ, you have rows that are exact copies of each other. We can drop such rows with the `.dropDuplicates(...)` method.

``````

In [4]:

df = df.dropDuplicates()
df.show()

``````
``````

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  4| 144.5|   5.9| 33|     M|
|  1| 144.5|   5.9| 33|     M|
|  5| 129.2|   5.3| 42|     M|
|  5| 133.2|   5.7| 54|     F|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+

``````

Let's confirm, this time comparing the row count with the count of distinct rows when the `id` column is excluded.

``````

In [5]:

print('Count of ids: {0}'.format(df.count()))
print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))

``````
``````

Count of ids: 6
Count of distinct ids: 5

``````

We still have one duplicate when the `id` column is ignored. To remove it, we use `.dropDuplicates(...)` again, but restrict the comparison with the `subset` parameter to all columns except `id`.

``````

In [6]:

df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()

``````
``````

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+

``````
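The `subset` logic is easy to verify without Spark: build a key from every column except `id` and keep only the first row seen for each key. Below is a minimal plain-Python sketch of the same idea (note that Spark gives no guarantee about *which* of the duplicates survives):

```python
# Rows as they came out of the first dropDuplicates() pass.
rows = [
    (4, 144.5, 5.9, 33, 'M'),
    (1, 144.5, 5.9, 33, 'M'),  # duplicates id 4 on every non-id column
    (5, 129.2, 5.3, 42, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
]

seen = set()
deduped = []
for row in rows:
    key = row[1:]  # every column except 'id'
    if key not in seen:
        seen.add(key)
        deduped.append(row)
```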

To calculate the total and distinct number of IDs in one step we can use the `.agg(...)` method.

``````

In [7]:

import pyspark.sql.functions as fn

df.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct')
).show()

``````
``````

+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+

``````

Give each row a unique ID.

``````

In [8]:

df.withColumn('new_id', fn.monotonically_increasing_id()).show()

``````
``````

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  4| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+

``````
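The generated values look odd but are not random: per Spark's documented layout, the upper 31 bits hold the partition index and the lower 33 bits hold the record number within that partition, which is why the IDs are unique but not consecutive. A small sketch that decodes them, assuming that layout:

```python
def decode_monotonic_id(new_id):
    """Split a monotonically_increasing_id value into (partition, offset),
    assuming Spark's documented bit layout."""
    return new_id >> 33, new_id & ((1 << 33) - 1)

# The first ID in the output above, 25769803776, came from
# partition 3, record 0 within that partition.
```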

### Missing observations

Consider a similar example to the one we presented above.

``````

In [9]:

df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28,   'M',  100000),
    (2, 167.2, 5.4, 45,   'M',  None),
    (3, None , 5.2, None, None, None),
    (4, 144.5, 5.9, 33,   'M',  None),
    (5, 133.2, 5.7, 54,   'F',  None),
    (6, 124.1, 5.2, None, 'F',  None),
    (7, 129.2, 5.3, 42,   'M',  76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])

``````

To find the number of missing observations per row we can use the following snippet.

``````

In [10]:

df_miss.rdd.map(
    lambda row: (row['id'], sum([c is None for c in row]))
).collect()

``````
``````

Out[10]:

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

``````
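The per-row logic is plain Python underneath: for every row, count the fields that are `None`. A Spark-free sketch on the first three rows of the same data:

```python
rows = [
    (1, 143.5, 5.6, 28,   'M',  100000),
    (2, 167.2, 5.4, 45,   'M',  None),
    (3, None , 5.2, None, None, None),
]

# (id, number of missing fields) for each row
missing_per_row = [(row[0], sum(v is None for v in row)) for row in rows]
```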

Let's check which values are missing, so that when we count missing observations per column we can decide whether to drop an observation altogether or impute some of its values.

``````

In [11]:

df_miss.where('id == 3').show()

``````
``````

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+

``````

What is the percentage of missing observations we see in each column?

``````

In [12]:

df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()

``````
``````

+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+

``````
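The same fractions can be cross-checked without Spark: for each column, one minus the share of non-null values. A sketch over the full example data:

```python
rows = [
    (1, 143.5, 5.6, 28,   'M',  100000),
    (2, 167.2, 5.4, 45,   'M',  None),
    (3, None , 5.2, None, None, None),
    (4, 144.5, 5.9, 33,   'M',  None),
    (5, 133.2, 5.7, 54,   'F',  None),
    (6, 124.1, 5.2, None, 'F',  None),
    (7, 129.2, 5.3, 42,   'M',  76000),
]
columns = ['id', 'weight', 'height', 'age', 'gender', 'income']

n = len(rows)
# fraction of missing values per column
missing_fraction = {
    col: 1 - sum(row[i] is not None for row in rows) / n
    for i, col in enumerate(columns)
}
```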

We will drop the `'income'` feature as most of its values are missing.

``````

In [13]:

df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
df_miss_no_income.show()

``````
``````

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+

``````

To drop the observations instead, you can use the `.dropna(...)` method; the `thresh` parameter keeps only the rows that have at least that many non-missing values.

``````

In [14]:

df_miss_no_income.dropna(thresh=3).show()

``````
``````

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+

``````
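`thresh=3` keeps every row with at least three non-missing values; the row with `id == 3` has only two, so it is the one dropped. The rule in plain Python:

```python
rows = [
    (1, 143.5, 5.6, 28,   'M'),
    (3, None,  5.2, None, None),
    (6, 124.1, 5.2, None, 'F'),
]

thresh = 3
# keep rows with at least `thresh` non-missing values
kept = [row for row in rows if sum(v is not None for v in row) >= thresh]
```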

To impute a mean, median, or other calculated value, you first need to calculate the values, build a dictionary out of them, and then pass it to the `.fillna(...)` method.

``````

In [15]:

means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()

``````
``````

+---+-------------+------+---+-------+
| id|       weight|height|age| gender|
+---+-------------+------+---+-------+
|  1|        143.5|   5.6| 28|      M|
|  2|        167.2|   5.4| 45|      M|
|  3|140.283333333|   5.2| 40|missing|
|  4|        144.5|   5.9| 33|      M|
|  5|        133.2|   5.7| 54|      F|
|  6|        124.1|   5.2| 40|      F|
|  7|        129.2|   5.3| 42|      M|
+---+-------------+------+---+-------+

``````
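Note that `age` was filled with `40` rather than the exact mean `40.4`, which suggests `.fillna(...)` coerced the fill value to the column's integer type. The imputation logic itself, sketched without Spark: compute each numeric column's mean over the non-missing values, then substitute it wherever a value is `None` (and a placeholder string for the categorical column):

```python
rows = [
    [1, 143.5, 5.6, 28,   'M'],
    [2, 167.2, 5.4, 45,   'M'],
    [3, None,  5.2, None, None],
    [4, 144.5, 5.9, 33,   'M'],
    [5, 133.2, 5.7, 54,   'F'],
    [6, 124.1, 5.2, None, 'F'],
    [7, 129.2, 5.3, 42,   'M'],
]
numeric_cols = (1, 2, 3)  # weight, height, age positions

# per-column means over the non-missing values
means = {}
for i in numeric_cols:
    vals = [row[i] for row in rows if row[i] is not None]
    means[i] = sum(vals) / len(vals)

# fill in place: numeric columns get the mean, others get 'missing'
for row in rows:
    for i, v in enumerate(row):
        if v is None:
            row[i] = means[i] if i in means else 'missing'
```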

### Outliers

Consider another simple example.

``````

In [16]:

df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])

``````

First, we calculate the lower and upper cutoff points for each feature using the interquartile range (IQR).

``````

In [17]:

cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

``````

The `bounds` dictionary holds the lower and upper bounds for each feature.

``````

In [18]:

bounds

``````
``````

Out[18]:

{'age': [-11.0, 92.0],
 'height': [4.499999999999999, 6.1000000000000005],
 'weight': [91.69999999999999, 191.7]}

``````
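`approxQuantile` trades accuracy for speed, but on a dataset this small an exact nearest-rank quantile reproduces the same bounds. A plain-Python sketch for the `weight` column:

```python
import math

def quantile(values, q):
    """Nearest-rank quantile: smallest value with at least q*n values <= it."""
    s = sorted(values)
    return s[max(math.ceil(q * len(s)) - 1, 0)]

weights = [143.5, 154.2, 342.3, 144.5, 133.2, 124.1, 129.2]

q1, q3 = quantile(weights, 0.25), quantile(weights, 0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
```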

Let's now use it to flag our outliers.

``````

In [19]:

outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()
outliers.show()

``````
``````

+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+

``````

We have one outlier in the `weight` feature and one in the `age` feature, both in the same record. Let's extract them.

``````

In [20]:

df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()

``````
``````

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+

``````

### Descriptive statistics

Let's now load a larger dataset and convert it to a Spark `DataFrame`. First, we import the module with the SQL data types.

``````

In [21]:

import pyspark.sql.types as typ

``````

Next, we read the data in.

``````

In [22]:

fraud = sc.textFile('ccFraud.csv.gz')
header = fraud.first()

fraud = fraud \
    .filter(lambda row: row != header) \
    .map(lambda row: [int(elem) for elem in row.split(',')])

``````
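The filter-and-parse step is ordinary Python inside the lambdas: drop the line equal to the header, split each remaining line on commas, and cast the fields to `int`. A Spark-free check on a hypothetical three-line CSV:

```python
# Hypothetical mini-CSV with the same shape as ccFraud.csv
lines = ['"custID","gender","state"', '1,1,35', '2,2,2']
header = lines[0]

parsed = [
    [int(elem) for elem in row.split(',')]
    for row in lines if row != header
]
```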

Next, we create the schema for our `DataFrame`.

``````

In [23]:

fields = [
    *[
        typ.StructField(h[1:-1], typ.IntegerType(), True)
        for h in header.split(',')
    ]
]

schema = typ.StructType(fields)

``````

Finally, we create our `DataFrame`.

``````

In [24]:

fraud_df = spark.createDataFrame(fraud, schema)

``````

Now that the `DataFrame` is ready, we can start exploring it. First, let's verify its schema.

``````

In [25]:

fraud_df.printSchema()

``````
``````

root
 |-- custID: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- state: integer (nullable = true)
 |-- cardholder: integer (nullable = true)
 |-- balance: integer (nullable = true)
 |-- numTrans: integer (nullable = true)
 |-- numIntlTrans: integer (nullable = true)
 |-- creditLine: integer (nullable = true)
 |-- fraudRisk: integer (nullable = true)

``````

For categorical columns, we count the frequencies of their values using the `.groupby(...)` method.

``````

In [26]:

fraud_df.groupby('gender').count().show()

``````
``````

+------+-------+
|gender|  count|
+------+-------+
|     1|6178231|
|     2|3821769|
+------+-------+

``````
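Frequency counting is the same operation `collections.Counter` performs locally, which can be handy for sanity-checking a `groupby` result on a small sample:

```python
from collections import Counter

# hypothetical sample of the gender column
genders = [1, 2, 1, 1, 2]
freq = Counter(genders)
```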

For the truly numerical features, we can use the `.describe()` method.

``````

In [27]:

numerical = ['balance', 'numTrans', 'numIntlTrans']

``````
``````

In [28]:

desc = fraud_df.describe(numerical)
desc.show()

``````
``````

+-------+-----------------+------------------+-----------------+
|summary|          balance|          numTrans|     numIntlTrans|
+-------+-----------------+------------------+-----------------+
|  count|         10000000|          10000000|         10000000|
|   mean|     4109.9199193|        28.9351871|        4.0471899|
| stddev|3996.847309737077|26.553781024522852|8.602970115863767|
|    min|                0|                 0|                0|
|    max|            41485|               100|               60|
+-------+-----------------+------------------+-----------------+

``````

Here's how you check skewness (we will do it for the `'balance'` feature only).

``````

In [29]:

fraud_df.agg({'balance': 'skewness'}).show()

``````
``````

+------------------+
| skewness(balance)|
+------------------+
|1.1818315552995033|
+------------------+

``````
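Skewness is, in its standard definition, the third central moment divided by the 3/2 power of the second (the variance); a positive value, as here, indicates a right-skewed distribution. A plain-Python sketch of that formula:

```python
def skewness(values):
    """Population skewness: m3 / m2**1.5."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5
```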

### Correlations

Calculating correlations in PySpark is very easy once your data is in a DataFrame form.

``````

In [29]:

fraud_df.corr('balance', 'numTrans')

``````
``````

Out[29]:

0.00044523140172659576

``````

In order to create a correlation matrix, you can use the script below.

``````

In [30]:

n_numerical = len(numerical)

corr = []

for i in range(0, n_numerical):
    temp = [None] * i

    for j in range(i, n_numerical):
        temp.append(fraud_df.corr(numerical[i], numerical[j]))
    corr.append(temp)

corr

``````
``````

Out[30]:

[[1.0, 0.00044523140172659576, 0.00027139913398184604],
 [None, 1.0, -0.0002805712819816179],
 [None, None, 1.0]]

``````
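`.corr(...)` computes Pearson's correlation coefficient. The upper-triangular loop above can be mirrored locally to see the shape of the result; `pearson` here is a hypothetical helper, not a Spark API:

```python
def pearson(x, y):
    """Pearson correlation of two equal-length sequences."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    vx = sum((a - mx) ** 2 for a in x)
    vy = sum((b - my) ** 2 for b in y)
    return cov / (vx * vy) ** 0.5

cols = {'a': [1, 2, 3, 4], 'b': [2, 4, 6, 8], 'c': [4, 3, 2, 1]}
names = list(cols)

# upper-triangular correlation matrix, same layout as the Spark version
corr = []
for i in range(len(names)):
    temp = [None] * i
    for j in range(i, len(names)):
        temp.append(pearson(cols[names[i]], cols[names[j]]))
    corr.append(temp)
```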

### Visualization

First, let's load the modules and set them up.

``````

In [30]:

%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

import bokeh.charts as chrt
from bokeh.io import output_notebook

output_notebook()

``````