In [1]:
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Print SPARK_HOME to confirm which Spark installation is in use
spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)

# Wrap the notebook's SparkContext (`sc`) for SQL queries
sqlContext = SQLContext(sc)


/home/jjgarcia/clase/spark
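
The `sc` used above is the SparkContext that the PySpark notebook driver injects automatically. Outside that environment you would have to create it yourself; a minimal sketch, where the application name "aol-analysis" is an illustrative choice and not from the original session:

In [ ]:
# Only needed outside the PySpark shell/notebook, where no `sc` is injected
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="aol-analysis")  # "aol-analysis" is a hypothetical app name
sqlContext = SQLContext(sc)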

In [2]:
# Load every AOL query-log file matched by the glob as one RDD of lines
rdd_txt = sc.textFile("/home/jjgarcia/clase/spark/AOL-user-ct-collection/user-ct-test-collection-*.txt")

In [4]:
# Count all lines across the files (parallel job; took about 2 minutes)
rdd_txt.count()

In [5]:
rdd_txt.take(5)


Out[5]:
[u'AnonID\tQuery\tQueryTime\tItemRank\tClickURL',
 u'997\tcarreermanagement.com\t2006-03-01 10:54:58\t\t',
 u'997\tbeauty and the geek\t2006-03-01 20:50:00\t2\thttp://thewb.warnerbros.com',
 u'997\tgamec\t2006-03-02 22:57:37\t\t',
 u'997\tgamecheats\t2006-03-02 22:57:45\t\t']

In [6]:
header = rdd_txt.first()
header


Out[6]:
u'AnonID\tQuery\tQueryTime\tItemRank\tClickURL'

In [7]:
# Drop the header lines; each file matched by the glob starts with one,
# so filtering on the prefix removes them all
rdd_txt = rdd_txt.filter(lambda l: not l.startswith("AnonID"))
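
Since each file matched by the glob carries its own header line, a quick sanity check is to count how many lines still start with the prefix after filtering; this check is a sketch, not part of the original session:

In [ ]:
# Should print 0 if every per-file header line was removed
rdd_txt.filter(lambda l: l.startswith("AnonID")).count()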

In [11]:
# Recount without the header lines (about 2 minutes)
rdd_txt.count()


Out[11]:
36389567

In [10]:
# Number of distinct users (the first tab-separated field is the anonymous user id)
rdd_txt.map(lambda l: l.split("\t")[0]).distinct().count()


Out[10]:
657427

In [15]:
# Same answer via groupBy: count() here counts groups, one per user.
# Note this shuffles every record, so distinct() above is the cheaper route.
rdd_txt.groupBy(lambda l: l.split("\t")[0]).count()


Out[15]:
657427
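
Both counts agree, but groupBy shuffles every record just to count keys. When an approximation is acceptable, the RDD API has a far cheaper estimator; a sketch, with the 5% relative error chosen purely for illustration:

In [ ]:
# HyperLogLog-based estimate of the number of distinct users; no full shuffle
rdd_txt.map(lambda l: l.split("\t")[0]).countApproxDistinct(relativeSD=0.05)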

In [19]:
# Distinct users with at least one line mentioning "australia"
rdd_txt.filter(lambda l: "australia" in l.lower()).map(lambda l: l.split("\t")[0]).distinct().count()


Out[19]:
3659
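
The same split-and-aggregate pattern answers other questions too; for instance, a sketch (not from the original session) of the ten most frequent terms across all queries:

In [ ]:
# Word count over the query field, then take the ten most frequent terms
(rdd_txt.map(lambda l: l.split("\t")[1])
        .flatMap(lambda q: q.split())
        .map(lambda w: (w, 1))
        .reduceByKey(lambda a, b: a + b)
        .takeOrdered(10, key=lambda kv: -kv[1]))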

In [20]:
# Split each line into its five tab-separated fields
df = rdd_txt.map(lambda l: l.split("\t"))

In [21]:
from datetime import datetime

def raise_err(r):
    """Return True for rows whose user id or timestamp fails to parse."""
    try:
        _ = int(r[0])
        _ = datetime.strptime(r[2], "%Y-%m-%d %H:%M:%S")
        return False
    except (ValueError, IndexError):
        return True

# Keep only the malformed rows so we can inspect them
err = df.filter(raise_err)
err.count()


Out[21]:
1

In [22]:
err.take(5)


Out[22]:
[[u'\x19403684',
  u'match.com',
  u'2006-03-31 06:55:53',
  u'2',
  u'http://www.match.com']]

In [23]:
# Discard the malformed row, keeping everything that parses cleanly
df = df.filter(lambda r: not raise_err(r))

In [24]:
# Cast the id to int and parse the timestamp; keep the query string as-is
df = df.map(lambda r: (int(r[0]), r[1], datetime.strptime(r[2], "%Y-%m-%d %H:%M:%S")))

In [25]:
df.first()


Out[25]:
(997, u'carreermanagement.com', datetime.datetime(2006, 3, 1, 10, 54, 58))

In [26]:
from pyspark.sql.types import *

# Explicit schema for the three parsed fields
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("query", StringType(), True),
    StructField("time", TimestampType(), True)
])

df = sqlContext.createDataFrame(df, schema)
df.registerTempTable("aol")  # expose the DataFrame to SQL as "aol"
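
The explicit StructType is the safest route. For reference, the same DataFrame could also be built from Row objects, letting Spark infer the schema; in this sketch `typed_rdd` is a name introduced here for the (id, query, time) tuple RDD from In [24]:

In [ ]:
# Alternative: schema inference from Row objects
# `typed_rdd` is hypothetical shorthand for the tuple RDD built in In [24]
from pyspark.sql import Row
rows = typed_rdd.map(lambda r: Row(id=r[0], query=r[1], time=r[2]))
inferred = sqlContext.createDataFrame(rows)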

In [27]:
sqlContext.sql("select * from aol where id = 711391 order by time").show()


+------+--------------------+--------------------+
|    id|               query|                time|
+------+--------------------+--------------------+
|711391|can not sleep wit...|2006-03-01 01:24:...|
|711391|cannot sleep with...|2006-03-01 01:24:...|
|711391|cannot sleep with...|2006-03-01 01:24:...|
|711391|cannot sleep with...|2006-03-01 01:33:...|
|711391|  jackie zeaman nude|2006-03-01 15:26:...|
|711391|   jackie zeman nude|2006-03-01 15:26:...|
|711391|      strange cosmos|2006-03-01 16:07:...|
|711391|mansfield first a...|2006-03-01 16:09:...|
|711391|mansfield first a...|2006-03-01 16:09:...|
|711391|reverend harry myers|2006-03-01 16:10:...|
|711391|reverend harry myers|2006-03-01 16:10:...|
|711391|   national enquirer|2006-03-01 17:13:...|
|711391|how to kill mocki...|2006-03-01 17:18:...|
|711391|how to kill mocki...|2006-03-01 17:18:...|
|711391|how to kill annoy...|2006-03-01 17:18:...|
|711391|how to kill annoy...|2006-03-01 17:19:...|
|711391|how to rid your y...|2006-03-01 17:23:...|
|711391|how to rid your y...|2006-03-01 17:23:...|
|711391|how to rid your y...|2006-03-01 17:24:...|
|711391|how do i get mock...|2006-03-01 17:27:...|
+------+--------------------+--------------------+
only showing top 20 rows
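
For comparison, the same query can be written with the DataFrame API instead of SQL; a sketch:

In [ ]:
# DataFrame-API equivalent of the SQL cell above
df.filter(df.id == 711391).orderBy("time").show()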


In [28]:
# Earliest timestamp in the collection
df.agg({"time": "min"}).collect()


Out[28]:
[Row(min(time)=datetime.datetime(2006, 3, 1, 0, 1, 3))]
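
The dict form of agg applies one function per column; pyspark.sql.functions lets you pull both ends of the time range in a single pass. A small sketch:

In [ ]:
# Earliest and latest timestamps in one aggregation
from pyspark.sql import functions as F
df.agg(F.min("time"), F.max("time")).collect()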

In [29]:
# Pull one user's full history down to the driver as a pandas DataFrame
user_71845 = df[df.id == 71845]

pd_71845 = user_71845.toPandas()

In [30]:
pd_71845.head()


Out[30]:
      id query                time
0  71845     - 2006-04-04 06:12:36
1  71845     - 2006-04-04 06:57:56
2  71845     - 2006-04-04 07:11:52
3  71845     - 2006-04-04 07:12:02
4  71845     - 2006-04-04 07:14:05
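
With the user's history local, pandas makes further exploration cheap; for example, a sketch of how that user's queries distribute over the hours of the day (assuming a pandas version with the `.dt` accessor):

In [ ]:
# Queries per hour of day for user 71845
pd_71845["time"].dt.hour.value_counts().sort_index()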

In [31]:
sql_by_dates = """
               select cast(time as Date) as dt
                     ,count(*) as cnt
               from aol
               group by cast(time as Date)
               order by dt
               """

by_dates = sqlContext.sql(sql_by_dates)

In [53]:
pd_by_dates = by_dates.toPandas()
pd_by_dates.head()


Out[53]:
           dt     cnt
0  2006-03-01  454226
1  2006-03-02  474107
2  2006-03-03  428053
3  2006-03-04  467858
4  2006-03-05  515973

In [50]:
from bokeh.charts import TimeSeries, Bar, output_notebook, show
from bokeh.models import PrintfTickFormatter
output_notebook()


BokehJS successfully loaded.

In [54]:
# set_index returns a new frame rather than modifying in place, so assign it
# back; In [56] below relies on the dates being in the index
pd_by_dates = pd_by_dates.set_index("dt")
pd_by_dates


Out[54]:
cnt
dt
2006-03-01 454226
2006-03-02 474107
2006-03-03 428053
2006-03-04 467858
2006-03-05 515973
2006-03-06 476371
2006-03-07 470863
2006-03-08 456205
2006-03-09 455997
2006-03-10 421557
2006-03-11 446699
2006-03-12 502261
2006-03-13 476551
2006-03-14 467592
2006-03-15 464170
2006-03-16 445552
2006-03-17 414549
2006-03-18 449462
2006-03-19 507810
2006-03-20 474882
2006-03-21 464915
2006-03-22 463175
2006-03-23 448475
2006-03-24 415906
2006-03-25 449595
2006-03-26 496550
2006-03-27 459611
2006-03-28 462682
2006-03-29 443084
2006-03-30 418926
... ...
2006-05-02 326934
2006-05-03 376976
2006-05-04 402279
2006-05-05 371252
2006-05-06 387758
2006-05-07 444050
2006-05-08 437745
2006-05-09 427462
2006-05-10 424377
2006-05-11 425270
2006-05-12 389550
2006-05-13 394545
2006-05-14 419483
2006-05-15 445306
2006-05-16 362028
2006-05-17 3349
2006-05-18 420486
2006-05-19 397767
2006-05-20 402975
2006-05-21 455153
2006-05-22 459142
2006-05-23 450667
2006-05-24 413170
2006-05-25 399451
2006-05-26 378689
2006-05-27 371069
2006-05-28 353203
2006-05-29 435089
2006-05-30 436799
2006-05-31 461281

92 rows × 1 columns


In [56]:
# Chart input: daily counts keyed by date (pd_by_dates is now indexed by dt)
data = dict(count=pd_by_dates["cnt"], Date=pd_by_dates.index)
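
The session ends before the chart is rendered. With the bokeh.charts API of that era (since removed from Bokeh), the daily volume could be drawn roughly as follows; a sketch assuming the old TimeSeries(data, index=...) signature, with the title and axis labels chosen for illustration:

In [ ]:
# Plot daily query volume; relies on the deprecated bokeh.charts API
ts = TimeSeries(data, index="Date", title="AOL queries per day",
                xlabel="Date", ylabel="Queries")
show(ts)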
