File ingestion


In [ ]:
# Read users.csv from the local filesystem (file:// scheme) as an RDD of text lines.
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
rdd = sc.textFile("file:///home/bigdata/training/datasets/users.csv")

In [ ]:
# Action: number of elements (text lines) in the RDD.
rdd.count()

In [ ]:
# Peek at the first five raw lines without collecting the whole RDD.
rdd.take(5)

In [ ]:
# Build a small RDD from an in-memory Python list of integers.
# NOTE(review): the name `payments` is rebound to a text-file RDD in the KeyRDDs
# section, so the Sum/Average cells must run before that point.
payments = sc.parallelize([50,50,100,100,100,100])

In [ ]:
# Action: bring every element of the (tiny) RDD back to the driver as a list.
payments.collect()

Partitioning, Caching and immutability


In [ ]:
# repartition() returns a NEW RDD with 4 partitions; `rdd` itself is left as it was,
# which the two getNumPartitions() cells below make visible.
users = rdd.repartition(4)

In [ ]:
# Partition count of the original RDD (not affected by the repartition call).
rdd.getNumPartitions()

In [ ]:
# Partition count of the repartitioned RDD.
users.getNumPartitions()

In [ ]:
# Explicit import: `StorageLevel` is not defined anywhere in this notebook, so the
# cell would otherwise depend on the kernel's PySpark bootstrap having injected it.
from pyspark import StorageLevel

# Mark `users` to be kept in memory once it is first computed, so the later
# filter/map/count cells reuse it instead of re-reading the file.
users.persist(StorageLevel.MEMORY_ONLY)

Filtering


In [ ]:
# Keep only the raw lines that contain the substring 'Budapest'.
budapestiek = users.filter(lambda line: 'Budapest' in line)

In [ ]:
# Action: how many lines matched the 'Budapest' filter.
budapestiek.count()

In [ ]:
# Chain two substring filters on the raw lines and show up to five matches.
# NOTE(review): ',22' is a plain substring test on the unparsed line, so it matches
# any field that merely starts with "22" after a comma (e.g. ",220"), not only 22.
(users
    .filter(lambda line: 'Budapest' in line)
    .filter(lambda line: ',22' in line)
    .take(5))

Mapping


In [ ]:
# Split each CSV line on commas and preview the first two resulting field lists.
users.map(lambda line: line.split(",")).take(2)

In [ ]:
# RDD of field lists: one list of strings per input line.
records = users.map(lambda line: line.split(","))

In [ ]:
# Convert the third field of each record to int and preview two values.
records.map(lambda fields: int(fields[2])).take(2)

In [ ]:
# Count the records whose third field, read as an int, is greater than 30.
records.map(lambda fields: int(fields[2])).filter(lambda value: value > 30).count()

In [ ]:
# Distinct values of the second field, collected to the driver.
records.map(lambda fields: fields[1]).distinct().collect()

Sum and Average


In [ ]:
# Built-in action: sum of all elements of the numeric RDD.
payments.sum()

In [ ]:
# Same total as sum(), expressed as an explicit pairwise reduction.
payments.reduce(lambda total, amount: total + amount)

In [ ]:
# One-pass (count, sum) over the third CSV field:
#   zero value     -> (0, 0)
#   within a part  -> bump the count, add the value to the running sum
#   across parts   -> add the two (count, sum) pairs element-wise
users.map(lambda line: int(line.split(",")[2])).aggregate(
    (0, 0),
    lambda acc, value: (acc[0] + 1, acc[1] + value),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
)

In [ ]:
# Cross-check: should equal the count component returned by the aggregate above.
users.count()

KeyRDDs


In [ ]:
# Load payments.csv as an RDD of text lines.
# NOTE(review): this rebinds `payments`, which earlier held the parallelized list of
# integers — re-running the Sum/Average cells after this point will fail.
# NOTE(review): relative path with no scheme, unlike the file:// URI used for
# users.csv; where it resolves depends on the cluster's default filesystem — confirm.
payments = sc.textFile("payments.csv")

In [ ]:
# Pair RDD of users: key = first field as int, value = the full field list.
u = records.map(lambda fields: (int(fields[0]), fields))

In [ ]:
# Parse each payments line exactly once into a list of string fields.
pr = payments.map(lambda line: line.split(","))
# Pair RDD keyed by int(field 1), so it can be joined with `u` (keyed by user id).
# Built from `pr` rather than re-splitting every raw line twice.
p = pr.map(lambda fields: (int(fields[1]), fields))

In [ ]:
# Preview five (key, field-list) pairs of the keyed payments RDD.
p.take(5)

In [ ]:
# Group int(field 2) values by field 0 and materialize each group as a list so the
# preview shows the actual values instead of an iterable object.
pr.map(lambda fields: (fields[0], int(fields[2]))).groupByKey().mapValues(list).take(2)

In [ ]:
# Sum int(field 2) per distinct field-0 key and collect the totals to the driver.
pr.map(lambda fields: (fields[0], int(fields[2]))).reduceByKey(lambda total, amount: total + amount).collect()

In [ ]:
# Inner join on the integer key: each result is (key, (user_fields, payment_fields)).
u.join(p).take(5)

Broadcast variables


In [ ]:
# Lookup table mapping a city name to a region name (presumably its county),
# wrapped in a read-only broadcast variable for use inside RDD lambdas.
d = dict(
    Budapest='Budapest',
    Debrecen='Hajdu-Bihar',
    Gyor='Gyor-Moson-Sopron',
    Sopron='Gyor-Moson-Sopron',
)
megyek = sc.broadcast(d)

In [ ]:
# Sum of MasterCard payments per region, using the broadcast city -> region table.
# Cities missing from the table are grouped under 'Unknown' instead of raising
# KeyError inside the executor, which would abort the whole job.
(pr.filter(lambda fields: fields[3] == 'MasterCard')
   .map(lambda fields: (int(fields[1]), int(fields[2])))   # (user id, value of field 2)
   .join(u)                                                # (user id, (value, user_fields))
   .map(lambda kv: (megyek.value.get(kv[1][1][1], 'Unknown'), kv[1][0]))  # value is already int
   .reduceByKey(lambda total, amount: total + amount)
   .collect())

In [ ]:
# Typed user rows: first and third fields cast to int, second kept as a string.
ucleansed = records.map(lambda fields: [int(fields[0]), fields[1], int(fields[2])])

DataFrames


In [ ]:
# Turn the typed RDD into a DataFrame, naming the three columns positionally.
df = ucleansed.toDF(['id','city','age'])

In [ ]:
# Schema as a Python object (programmatic form).
df.schema

In [ ]:
# Same schema, printed as a human-readable tree.
df.printSchema()

In [ ]:
# First row as a single Row object.
df.first()

In [ ]:
# First row again, this time as a one-element list of Row objects.
df.take(1)

In [ ]:
# New DataFrame restricted to at most 10 rows (used as a small sample below).
l = df.limit(10)

In [ ]:
# Print the sample as a plain-text table.
l.show()

In [ ]:
# Convert the small sample to a pandas DataFrame for rich notebook display.
l.toPandas()

Grouping


In [ ]:
# Convert the 10-row sample to pandas and average the numeric columns per city.
# Bound to its own name rather than `p`, which already holds the keyed payments
# RDD used by the join cell — reusing `p` here would break re-running that cell.
l_pdf = l.toPandas()
l_pdf.groupby("city").mean()

In [ ]:
# Total number of rows in the DataFrame.
df.count()

In [ ]:
# Number of rows whose age column is at least 30.
df.filter(df.age >= 30).count()

In [ ]:
# Column projection only: no action is called, so the cell displays the resulting
# DataFrame object rather than any rows.
df.select('city', 'age')

In [ ]:
# Distinct city values, converted to pandas for display.
df.select('city').distinct().toPandas()

In [ ]:
# Group rows by city, then average every numeric column within each group.
g = df.groupBy('city')
g.avg().toPandas()

SQL


In [ ]:
# Expose the DataFrame to SQL queries under the table name "users".
# NOTE(review): registerTempTable is deprecated from Spark 2.0 in favour of
# createOrReplaceTempView — switch once the cluster's Spark version is confirmed.
df.registerTempTable("users")

In [ ]:
# Row count per city, largest first, queried from the registered "users" table.
result = sqlContext.sql(
    "SELECT city, count(*) as cnt FROM users GROUP BY city ORDER BY cnt DESC"
)

In [ ]:
# Run the query and show the per-city counts as a pandas DataFrame.
result.toPandas()

In [ ]:
# Persist the per-city counts as JSON. DataFrameWriter.json() requires an output
# path; calling it with no arguments raises TypeError. Spark writes a directory of
# part files at this path. mode="overwrite" lets the cell be re-run without failing
# because the output already exists.
# NOTE(review): the output location is a placeholder — point it at the intended
# results directory.
result.write.json("users_by_city_json", mode="overwrite")