Simple Aggregation


In [ ]:
import numpy as np

data = np.arange(1000).reshape(100,10)
print(data.shape)

Pandas


In [ ]:
import pandas as pd

pand_tmp = pd.DataFrame(data,
                        columns=['x{0}'.format(i) for i in range(data.shape[1])])
pand_tmp.head()

What is the row sum? (axis=1 aggregates across the columns, giving one value per row.)


In [ ]:
pand_tmp.sum(axis=1)

And the column sum? (axis=0 aggregates down the rows, giving one value per column.)


In [ ]:
pand_tmp.sum(axis=0)

In [ ]:
pand_tmp.to_csv('numbers.csv', index=False)

Spark


In [ ]:
import findspark
import os
findspark.init()  # must be called before importing pyspark

import pyspark
sc = pyspark.SparkContext('local[4]', 'pyspark')

In [ ]:
lines = sc.textFile('numbers.csv', 18)  # 18 = minimum number of partitions
for l in lines.take(3):
    print(l)

lines.take(3)

In [ ]:
type(lines.take(1))

How do we skip the header? One option is str.find(), which returns the index of the first match and -1 when the substring is not found, so we can keep only the lines that do not start with 'x'.
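
As a quick sanity check in plain Python (the strings below are just samples mimicking the header and a data row), find() returns the starting index of the match, or -1 when the substring is absent:


In [ ]:
# str.find() returns the match index, or -1 if the substring is not found
print('x0,x1,x2'.find('x'))  # 0  -> a header line starts with 'x'
print('0,1,2'.find('x'))     # -1 -> a data line does not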


In [ ]:
lines = lines.filter(lambda x: x.find('x') != 0)  # drop the header (its fields start with 'x')
for l in lines.take(2):
    print(l)

In [ ]:
data = lines.map(lambda x: x.split(','))
data.take(3)

Row Sum

Cast each value to an integer, then sum.


In [ ]:
def row_sum(x):
    int_x = [int(v) for v in x]  # cast each field from string to int
    return sum(int_x)

data_row_sum = data.map(row_sum)

print(data_row_sum.collect())

print(data_row_sum.count())
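
Equivalently, the cast-and-sum can be written inline; this is just an alternative spelling of the same map, not a different method:


In [ ]:
# same result as row_sum above, written as a one-liner
data.map(lambda row: sum(int(v) for v in row)).take(3)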

Column Sum

This one's a bit trickier, and portends ill for large, complex data sets (like example 5)...

Let's enumerate the values in each RDD row so that each value is keyed by its column index.


In [ ]:
def col_key(x):
    # emit a (column_index, value) pair for every field in the row
    for i, value in enumerate(x):
        yield (i, int(value))

tmp = data.flatMap(col_key)
tmp.take(15)

Notice how flatMap works here: col_key returns a generator of (column, value) pairs for each row, and flatMap flattens those generators into a single RDD of pairs, so the first element of the tuples cycles through the column indices 0 through 9.


In [ ]:
tmp.take(3)
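
To see the map/flatMap difference in isolation, here is a tiny toy comparison (the nested list is made up purely for illustration):


In [ ]:
toy = sc.parallelize([[1, 2], [3, 4]])
print(toy.map(lambda x: x).collect())      # [[1, 2], [3, 4]] -- one output element per input element
print(toy.flatMap(lambda x: x).collect())  # [1, 2, 3, 4]     -- each returned iterable is flattened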

In [ ]:
tmp = tmp.groupByKey()
for i in tmp.take(2):
    print(i, type(i))
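
groupByKey() yields a (key, ResultIterable) pair per key, and the ResultIterable does not display its contents; mapValues(list) is a handy way to peek inside (it materializes each group, so use it only for inspection):


In [ ]:
tmp.mapValues(list).take(1)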

In [ ]:
data_col_sum = tmp.map(lambda x: sum(x[1]))
for i in data_col_sum.take(2):
    print(i)

In [ ]:
print(data_col_sum.collect())
print(data_col_sum.count())
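
As an aside, groupByKey() shuffles every single value across the network. For a plain sum, reduceByKey() combines values within each partition before shuffling, which scales far better; a sketch of the same column sum using the col_key function from above:


In [ ]:
from operator import add

col_sums = data.flatMap(col_key).reduceByKey(add).sortByKey()
col_sums.collect()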

Column sum with a pyspark.sql DataFrame


In [ ]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [ ]:
sc

In [ ]:
pyspark_df = sqlContext.createDataFrame(pand_tmp)

In [ ]:
pyspark_df.take(2)
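
take() returns a list of Row objects; show() prints the DataFrame as a formatted table, which is often easier to read:


In [ ]:
pyspark_df.show(2)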

groupBy() without arguments puts the whole DataFrame into a single group, so the aggregation runs over every row.


In [ ]:
for i in pyspark_df.columns:
    print(pyspark_df.groupBy().sum(i).collect())
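
The loop above launches one Spark job per column. Calling sum() with no arguments on the grouped data aggregates every numeric column in a single pass, which is usually preferable:


In [ ]:
pyspark_df.groupBy().sum().collect()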

In [ ]: