File ingestion
In [ ]:
rdd = sc.textFile("file:///home/bigdata/training/datasets/users.csv")
In [ ]:
rdd.count()
In [ ]:
rdd.take(5)
In [ ]:
payments = sc.parallelize([50,50,100,100,100,100])
In [ ]:
payments.collect()
Partitioning, caching and immutability
In [ ]:
users = rdd.repartition(4)
In [ ]:
rdd.getNumPartitions()
In [ ]:
users.getNumPartitions()
In [ ]:
users.persist(StorageLevel.MEMORY_ONLY)
Filtering
In [ ]:
budapestiek = users.filter(lambda x: 'Budapest' in x)
In [ ]:
budapestiek.count()
In [ ]:
users.filter(lambda x: 'Budapest' in x).filter(lambda x: ',22' in x).take(5)
Mapping
In [ ]:
users.map(lambda x: x.split(",")).take(2)
In [ ]:
records = users.map(lambda x: x.split(","))
In [ ]:
records.map(lambda x: int(x[2])).take(2)
In [ ]:
records.map(lambda x: int(x[2])).filter(lambda x: x > 30).count()
In [ ]:
records.map(lambda x: x[1]).distinct().collect()
Sum and Average
In [ ]:
payments.sum()
In [ ]:
payments.reduce(lambda x, y: x+y)
In [ ]:
users.map(lambda x: int(x.split(",")[2])).aggregate((0,0), lambda x,y: (x[0]+1,x[1]+y), lambda x,y: (x[0]+y[0] , x[1]+y[1]))
In [ ]:
users.count()
Key-value (pair) RDDs
In [ ]:
payments = sc.textFile("payments.csv")
In [ ]:
u = records.map(lambda x: (int(x[0]), x))
In [ ]:
pr = payments.map(lambda x: x.split(","))
p = payments.map(lambda x: (int(x.split(",")[1]), x.split(",")))
In [ ]:
p.take(5)
In [ ]:
pr.map(lambda x: (x[0],int(x[2]))).groupByKey().map(lambda x: (x[0],list(x[1]))).take(2)
In [ ]:
pr.map(lambda x: (x[0],int(x[2]))).reduceByKey(lambda x, y: x + y).collect()
In [ ]:
u.join(p).take(5)
Broadcast variables
In [ ]:
d = {
'Budapest': 'Budapest',
'Debrecen': 'Hajdu-Bihar',
'Gyor': 'Gyor-Moson-Sopron',
'Sopron': 'Gyor-Moson-Sopron'
}
megyek = sc.broadcast(d)
In [ ]:
pr.filter(lambda x: x[3] == 'MasterCard').map(lambda x: (int(x[1]),int(x[2]))).join(u)\
.map(lambda x: (megyek.value[ x[1][1][1] ], int(x[1][0]))).reduceByKey(lambda x, y: x + y).collect()
In [ ]:
ucleansed = records.map(lambda x: [int(x[0]), x[1], int(x[2])])
DataFrames
In [ ]:
df = ucleansed.toDF(['id','city','age'])
In [ ]:
df.schema
In [ ]:
df.printSchema()
In [ ]:
df.first()
In [ ]:
df.take(1)
In [ ]:
l = df.limit(10)
In [ ]:
l.show()
In [ ]:
l.toPandas()
Grouping
In [ ]:
p = l.toPandas()
p.groupby("city").mean()
In [ ]:
df.count()
In [ ]:
df.filter(df["age"] >= 30).count()
In [ ]:
df.select(df['city'], df['age'])
In [ ]:
df.select(df['city']).distinct().toPandas()
In [ ]:
g = df.groupBy(df["city"])
g.avg().toPandas()
SQL
In [ ]:
df.registerTempTable("users");
In [ ]:
result = sqlContext.sql("SELECT city, count(*) as cnt FROM users GROUP BY city ORDER BY cnt DESC")
In [ ]:
result.toPandas()
In [ ]:
result.write.json()