In [1]:
# Imports and Spark SQL setup.
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
# print SPARK HOME
spark_home = os.environ.get('SPARK_HOME', None)
print (spark_home)
# `sc` is the SparkContext injected by the pyspark driver/kernel;
# it is not created in this notebook.
sqlContext = SQLContext(sc)
In [2]:
# Load the raw AOL query-log collection (tab-separated text, one search
# event per line; the glob matches every collection file).
# NOTE(review): hardcoded absolute local path — consider a configurable
# DATA_DIR so the notebook runs on other machines.
rdd_txt = sc.textFile("/home/jjgarcia/clase/spark/AOL-user-ct-collection/user-ct-test-collection-*.txt")
In [4]:
# Parallel (~2 min): total number of raw lines across all files.
rdd_txt.count()
In [5]:
# Peek at the first 5 raw lines (still includes the header row here).
rdd_txt.take(5)
Out[5]:
In [6]:
# First line of the data — the header row (it starts with "AnonID",
# which the filter below relies on).
header = rdd_txt.first()
header
Out[6]:
In [7]:
# Drop header rows. Filtering (rather than subtracting the single first
# line) handles one header per input file in the glob. Idiom fix:
# `not x` instead of `x == False`.
rdd_txt = rdd_txt.filter(lambda l: not l.startswith("AnonID"))
In [11]:
# Parallel (~2 min): recount after removing the header rows.
rdd_txt.count()
Out[11]:
In [10]:
# Number of distinct users (AnonID is the first tab-separated field).
rdd_txt.map(lambda l: l.split("\t")[0]).distinct().count()
Out[10]:
In [15]:
# Number of groups when grouping by user id. Note: groupBy().count()
# counts keys (groups), not rows per key — so this should equal the
# distinct().count() above, at a higher shuffle cost.
rdd_txt.groupBy(lambda l: l.split("\t")[0]).count()
Out[15]:
In [19]:
# Distinct users whose log line contains "australia" (case-insensitive;
# matches anywhere in the line, not only the query field).
rdd_txt.filter(lambda l: "australia" in l.lower()).map(lambda l: l.split("\t")[0]).distinct().count()
Out[19]:
In [20]:
# Split each line into its tab-separated fields.
# NOTE(review): `df` is still an RDD of lists here, not a DataFrame —
# it only becomes one in the createDataFrame cell below.
df = rdd_txt.map(lambda l: l.split("\t"))
In [21]:
from datetime import datetime
def raise_err(r):
    """Return True when a split record is malformed, False otherwise.

    A record is malformed when r[0] is not parseable as an integer user
    id, r[2] is not a "%Y-%m-%d %H:%M:%S" timestamp, or the row has
    fewer than three fields.
    """
    try:
        _ = int(r[0])
        _ = datetime.strptime(r[2], "%Y-%m-%d %H:%M:%S")
        return False
    # Catch only the parse-related errors; the original bare `except:`
    # would also swallow KeyboardInterrupt/SystemExit.
    except (ValueError, TypeError, IndexError):
        return True
# Records that fail parsing — count how many malformed rows exist.
err = df.filter(lambda r: raise_err(r))
err.count()
Out[21]:
In [22]:
err.take(5)
Out[22]:
In [23]:
# Keep only the well-formed records. Idiom fix: `not x` instead of
# `x == False`.
df = df.filter(lambda r: not raise_err(r))
In [24]:
# Convert each record to typed values: (int user id, query string,
# datetime timestamp). Safe now that malformed rows were filtered out.
df = df.map(lambda r: (int(r[0]), r[1], datetime.strptime(r[2], "%Y-%m-%d %H:%M:%S")))
In [25]:
# Sanity-check the first typed record.
df.first()
Out[25]:
In [26]:
# Explicit imports instead of `from pyspark.sql.types import *` — star
# imports pollute the namespace and hide where names come from.
from pyspark.sql.types import (IntegerType, StringType, StructField,
                               StructType, TimestampType)

# Schema for the typed tuples: (user id, raw query text, query time).
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("query", StringType(), True),
    StructField("time", TimestampType(), True)
])
df = sqlContext.createDataFrame(df, schema)
# Register for the SQL queries below. NOTE(review): registerTempTable is
# the Spark 1.x API; on Spark >= 2 use createOrReplaceTempView.
df.registerTempTable("aol")
In [27]:
# All logged queries for one user, in chronological order.
sqlContext.sql("select * from aol where id = 711391 order by time").show()
In [28]:
# Earliest timestamp in the dataset.
df.agg({"time": "min"}).collect()
Out[28]:
In [29]:
# Pull one user's history to the driver as a pandas DataFrame
# (safe only because a single user's rows fit in driver memory).
user_71845 = df[df.id == 71845]
pd_71845 = user_71845.toPandas()
In [30]:
# Preview the user's first few queries.
pd_71845.head()
Out[30]:
In [31]:
# Daily query volume: number of log rows per calendar date.
sql_by_dates = """
select cast(time as Date) as dt
,count(*) as cnt
from aol
group by cast(time as Date)
order by dt
"""
by_dates = sqlContext.sql(sql_by_dates)
In [53]:
# Collect the (small) per-day aggregate to the driver for plotting.
pd_by_dates = by_dates.toPandas()
pd_by_dates.head()
Out[53]:
In [50]:
# NOTE(review): bokeh.charts was deprecated and later removed from
# bokeh — this cell only works on an old bokeh release; verify the
# pinned version (or port to bokeh.plotting).
from bokeh.charts import TimeSeries, Bar, output_notebook, show
from bokeh.models import PrintfTickFormatter
output_notebook()
In [54]:
# BUG FIX: DataFrame.set_index returns a NEW frame; the original cell
# discarded the result, so pd_by_dates kept its default RangeIndex and
# the plotting cell below read the wrong index. Assign it back, then
# display the re-indexed frame (same rendered output as before).
pd_by_dates = pd_by_dates.set_index("dt")
pd_by_dates
Out[54]:
In [56]:
# Data for plotting: daily counts keyed by date.
# NOTE(review): this assumes pd_by_dates is indexed by "dt" — confirm the
# set_index("dt") result in the previous cell was assigned back, otherwise
# Date is just the default integer RangeIndex.
data = dict(count=pd_by_dates["cnt"], Date=pd_by_dates.index)
In [ ]: