Simple Aggregation

Thanks, Monte!


In [88]:
import numpy as np

# 100 rows x 10 columns, values 0..999
data = np.arange(1000).reshape(100, 10)
print data.shape


(100, 10)

Pandas


In [89]:
import pandas as pd

pand_tmp = pd.DataFrame(data, 
             columns=['x{0}'.format(i) for i in range(data.shape[1])])
pand_tmp.head()


Out[89]:
   x0  x1  x2  x3  x4  x5  x6  x7  x8  x9
0   0   1   2   3   4   5   6   7   8   9
1  10  11  12  13  14  15  16  17  18  19
2  20  21  22  23  24  25  26  27  28  29
3  30  31  32  33  34  35  36  37  38  39
4  40  41  42  43  44  45  46  47  48  49

What is the row sum?


In [90]:
pand_tmp.sum(axis=1)


Out[90]:
0       45
1      145
2      245
3      345
4      445
5      545
6      645
7      745
8      845
9      945
10    1045
11    1145
12    1245
13    1345
14    1445
15    1545
16    1645
17    1745
18    1845
19    1945
20    2045
21    2145
22    2245
23    2345
24    2445
25    2545
26    2645
27    2745
28    2845
29    2945
      ... 
70    7045
71    7145
72    7245
73    7345
74    7445
75    7545
76    7645
77    7745
78    7845
79    7945
80    8045
81    8145
82    8245
83    8345
84    8445
85    8545
86    8645
87    8745
88    8845
89    8945
90    9045
91    9145
92    9245
93    9345
94    9445
95    9545
96    9645
97    9745
98    9845
99    9945
dtype: int64

Column sum?


In [91]:
pand_tmp.sum(axis=0)


Out[91]:
x0    49500
x1    49600
x2    49700
x3    49800
x4    49900
x5    50000
x6    50100
x7    50200
x8    50300
x9    50400
dtype: int64
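
For comparison, the same aggregations can be computed directly on the underlying NumPy array: axis=1 collapses across the columns of each row, and axis=0 collapses down each column. A minimal sketch, reusing the data array defined above:

print(data.sum(axis=1)[:5])  # row sums: 45, 145, 245, 345, 445
print(data.sum(axis=0))      # column sums: 49500 .. 50400, matching pand_tmp.sum(axis=0)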

In [92]:
# write the frame to disk so Spark can read it back as a text file
pand_tmp.to_csv('numbers.csv', index=False)

Spark


In [93]:
# read the CSV back as an RDD of lines; the second argument is the minimum number of partitions
lines = sc.textFile('numbers.csv', 18)
for l in lines.take(3):
    print l


x0,x1,x2,x3,x4,x5,x6,x7,x8,x9
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19

In [94]:
type(lines.take(1))


Out[94]:
list
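
Note that take(n) always returns a Python list, even for n=1; first() returns the element itself. A quick sketch, reusing the lines RDD:

print(type(lines.first()))  # the header string itself, not a list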

How do we skip the header? One option is str.find(): it returns the character index of the match (or -1 if there is no match) rather than a Boolean, so any line that starts with 'x' has find('x') == 0 and can be filtered out.


In [95]:
lines = lines.filter(lambda x: x.find('x') != 0)
for l in lines.take(2):
    print l


0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
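
The find() trick works because the header happens to start with 'x'. A more general idiom, sketched here with a hypothetical raw_lines RDD (the file as read by sc.textFile, before any filtering), is to grab the first line and filter it out by value:

header = raw_lines.first()                                 # raw_lines is hypothetical: the unfiltered RDD of lines
no_header = raw_lines.filter(lambda line: line != header)  # drop the header wherever it appears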

In [96]:
# split each comma-separated line into a list of string fields
data = lines.map(lambda x: x.split(','))
data.take(3)


Out[96]:
[[u'0', u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'8', u'9'],
 [u'10', u'11', u'12', u'13', u'14', u'15', u'16', u'17', u'18', u'19'],
 [u'20', u'21', u'22', u'23', u'24', u'25', u'26', u'27', u'28', u'29']]

Row Sum

Cast to integer and sum!


In [97]:
def row_sum(x):
    # cast every field to int, then sum across the row
    int_x = map(lambda v: int(v), x)
    return sum(int_x)

data_row_sum = data.map(row_sum)

print data_row_sum.collect()
print data_row_sum.count()


[45, 145, 245, 345, 445, 545, 645, 745, 845, 945, 1045, 1145, 1245, 1345, 1445, 1545, 1645, 1745, 1845, 1945, 2045, 2145, 2245, 2345, 2445, 2545, 2645, 2745, 2845, 2945, 3045, 3145, 3245, 3345, 3445, 3545, 3645, 3745, 3845, 3945, 4045, 4145, 4245, 4345, 4445, 4545, 4645, 4745, 4845, 4945, 5045, 5145, 5245, 5345, 5445, 5545, 5645, 5745, 5845, 5945, 6045, 6145, 6245, 6345, 6445, 6545, 6645, 6745, 6845, 6945, 7045, 7145, 7245, 7345, 7445, 7545, 7645, 7745, 7845, 7945, 8045, 8145, 8245, 8345, 8445, 8545, 8645, 8745, 8845, 8945, 9045, 9145, 9245, 9345, 9445, 9545, 9645, 9745, 9845, 9945]
100
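
The same row sum can be written without a named helper; a compact sketch, reusing the data RDD:

# cast each field to int and sum, all in one map
data_row_sum = data.map(lambda row: sum(int(v) for v in row))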

Column Sum

This one's a bit trickier, and it portends ill for large, complex data sets (like example 4). Let's enumerate each RDD "line" so that every value is keyed by its column number; then we can group and sum by that key.


In [98]:
def col_key(x):
    # emit a (column_index, value) pair for every field in the row
    for i, value in enumerate(x):
        yield (i, int(value))

# flatMap flattens each row's generator into one RDD of (column, value) pairs
tmp = data.flatMap(col_key)
tmp.take(12)


Out[98]:
[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9),
 (0, 10),
 (1, 11)]

In [99]:
tmp.take(3)


Out[99]:
[(0, 0), (1, 1), (2, 2)]

In [100]:
# group all (column, value) pairs by column index; the values become a ResultIterable
tmp = tmp.groupByKey()
for i in tmp.take(2):
    print i, type(i)


(0, <pyspark.resultiterable.ResultIterable object at 0x7f6ea1f42d50>) <type 'tuple'>
(8, <pyspark.resultiterable.ResultIterable object at 0x7f6ea1f42650>) <type 'tuple'>
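
The grouped values come back as a ResultIterable; to peek at the actual numbers, one option is to materialize them with mapValues. A quick sketch:

# turn each ResultIterable into a plain list for inspection
print(tmp.mapValues(list).take(1))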

In [101]:
# x[1] is the iterable of values for one column; sum it
data_col_sum = tmp.map(lambda x: sum(x[1]))
for i in data_col_sum.take(2):
    print i


49500
50300

In [102]:
print data_col_sum.collect()
print data_col_sum.count()


[49500, 50300, 49700, 49900, 50100, 49600, 49800, 50400, 50000, 50200]
10
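
Note that the collected sums come back in whatever order the keys land, not in column order. groupByKey also shuffles every individual value across the cluster; reduceByKey sums within each partition first, and sortByKey restores column order. A sketch, reusing data and col_key from above:

data_col_sum = data.flatMap(col_key) \
                   .reduceByKey(lambda a, b: a + b) \
                   .sortByKey()
print(data_col_sum.collect())  # 49500, 49600, ..., 50400 in column order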

Column Sum with pyspark.sql.DataFrame


In [103]:
from pyspark.sql.types import *

In [104]:
# build a Spark DataFrame directly from the pandas frame
pyspark_df = sqlCtx.createDataFrame(pand_tmp)

In [105]:
pyspark_df.take(2)


Out[105]:
[Row(x0=0, x1=1, x2=2, x3=3, x4=4, x5=5, x6=6, x7=7, x8=8, x9=9),
 Row(x0=10, x1=11, x2=12, x3=13, x4=14, x5=15, x6=16, x7=17, x8=18, x9=19)]

In [106]:
# one aggregation job per column
for i in pyspark_df.columns:
    print pyspark_df.groupBy().sum(i).collect()


[Row(SUM(x0)=49500)]
[Row(SUM(x1)=49600)]
[Row(SUM(x2)=49700)]
[Row(SUM(x3)=49800)]
[Row(SUM(x4)=49900)]
[Row(SUM(x5)=50000)]
[Row(SUM(x6)=50100)]
[Row(SUM(x7)=50200)]
[Row(SUM(x8)=50300)]
[Row(SUM(x9)=50400)]
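
The loop above launches one aggregation job per column. Calling sum() with no arguments on the grouped DataFrame aggregates every numeric column in a single pass; a sketch, reusing pyspark_df:

# all column sums in one job
pyspark_df.groupBy().sum().collect()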