In [1]:
from edx.idea.data_frame import DataFrame
from edx.idea.sql import sql_query
from collections import namedtuple

Loading data to work with


In [2]:
df = DataFrame.from_url('gibberish.txt')

Note that the following command returns the first 5 lines without reading the entire file.


In [3]:
df.take(5)


Out[3]:
[u'Lorem ipsum dolor sit amet, consectetur adipiscing elit.',
 u'Integer euismod lacus nec mi dignissim porta.',
 u'Aenean vel libero ac nulla sodales lacinia in vitae ex.',
 u'Fusce vitae orci id erat pretium aliquet et vel augue.',
 u'Aenean cursus nisl vitae facilisis vehicula.']
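The partial-read behaviour of take can be mimicked in plain Python with itertools.islice over a file-like stream (a sketch of the idea, using an in-memory StringIO rather than the real gibberish.txt):

```python
from io import StringIO
from itertools import islice

# Simulated file contents; the notebook above reads 'gibberish.txt' instead.
stream = StringIO("line 1\nline 2\nline 3\nline 4\nline 5\nline 6\n")

# islice consumes only the first 5 lines; the rest of the stream is never read.
first_five = [line.rstrip("\n") for line in islice(stream, 5)]
print(first_five)
```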

Word Count (obligatory)

Below is the classic word count problem implemented using this API. Note that the wc_mapper and wc_reducer closures are serialized and shipped to worker nodes, where they execute over the various partitions of the data.


In [4]:
def wc_mapper(line):
    for word in line.split(' '):
        yield (word.rstrip(".,\r\n").lower(), 1)

def wc_reducer(word, counts):
    yield (word, sum(counts))

counted_words = df.map_reduce(wc_mapper, wc_reducer)
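The contract between the mapper and reducer can be mimicked locally (a sketch of the semantics, not the library's implementation): apply the mapper to every line, group the resulting (key, value) pairs by key as the shuffle phase would, then feed each group to the reducer.

```python
from collections import defaultdict

def local_map_reduce(lines, mapper, reducer):
    # Group mapper output by key, as the cluster's shuffle phase would.
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    # Run the reducer over each key's values and flatten the results.
    return [pair for key, values in groups.items() for pair in reducer(key, values)]

# Same closures as in the cell above.
def wc_mapper(line):
    for word in line.split(' '):
        yield (word.rstrip(".,\r\n").lower(), 1)

def wc_reducer(word, counts):
    yield (word, sum(counts))

counts = dict(local_map_reduce(["a b a.", "b c"], wc_mapper, wc_reducer))
print(counts)  # {'a': 2, 'b': 2, 'c': 1}
```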

After executing the previous cell, no computation has actually been done. Only when we request the results is the promise resolved: the data is computed and returned to the driver.


In [5]:
dict(counted_words.collect()[:10])


Out[5]:
{u'ac': 18,
 u'auctor': 14,
 u'felis': 14,
 u'justo': 8,
 u'quam': 6,
 u'sagittis': 6,
 u'semper': 8,
 u'suscipit': 9,
 u'urna': 17,
 u'varius': 4}
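The deferred-execution behaviour described above can be sketched with a trivial promise class (an illustration of the pattern only, not edx.idea's actual machinery):

```python
class Lazy(object):
    """Defers a computation until its result is first requested, then caches it."""
    def __init__(self, compute):
        self._compute = compute
        self._done = False
        self._result = None

    def collect(self):
        # Resolve the promise on first access only.
        if not self._done:
            self._result = self._compute()
            self._done = True
        return self._result

calls = []

def expensive_job():
    calls.append('ran')
    return [('word', 3)]

pipeline = Lazy(expensive_job)
print(calls)                 # nothing has run yet: []
result = pipeline.collect()  # now the work actually happens
print(calls, result)
```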

SQL


In [6]:
TextLine = namedtuple('TextLine', ['line'])

To give the table's columns names, the data needs some structure. We use a namedtuple to provide it, assigning a name to each value.


In [7]:
def convert(l):
    yield TextLine(line=l)
df2 = df.map(convert)
df2.take(1)


Out[7]:
[TextLine(line=u'Lorem ipsum dolor sit amet, consectetur adipiscing elit.')]
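The namedtuple's field names are what supply the column names; a schema could plausibly be derived from `_fields` (an assumption about the library, but the inspection below is plain Python):

```python
from collections import namedtuple

TextLine = namedtuple('TextLine', ['line'])
row = TextLine(line=u'Lorem ipsum dolor sit amet.')

# _fields exposes the column names; _asdict maps each name to its value.
print(TextLine._fields)
print(dict(row._asdict()))
```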

We can then save the DataFrame as a table. Note that this overwrites any existing table with that name.


In [8]:
df2.to_table(table_name='gibberish')


Deleted file:///tmp/spark/warehouse/gibberish
Out[8]:
<edx.idea.data_frame.DataFrame at 0x7f2dde515fd0>

In [9]:
res = sql_query('SELECT line FROM gibberish')

Again, this SQL query has not yet been executed; it only runs once we actually try to use the results.


In [10]:
res.take(10)


Out[10]:
[Row(line=u'Nullam ut enim in urna hendrerit luctus et ut leo.'),
 Row(line=u'Aenean lacinia metus a ipsum bibendum egestas.'),
 Row(line=u'Phasellus nec arcu dapibus, elementum ex sit amet, dapibus sem.'),
 Row(line=u'Integer feugiat magna eget urna porta, at faucibus eros molestie.'),
 Row(line=u'Nullam faucibus odio porttitor, fermentum felis eget, consectetur augue.'),
 Row(line=u'Vivamus in massa sed sem vulputate pellentesque.'),
 Row(line=u'Nunc nec orci eget purus ullamcorper auctor eget eu justo.'),
 Row(line=u'Maecenas nec turpis ac nisl pharetra condimentum at sed sem.'),
 Row(line=u'Proin viverra turpis at blandit sollicitudin.'),
 Row(line=u'Vestibulum et risus feugiat, molestie dolor nec, ultrices sapien.')]

Tables can be partitioned using a primary key. When writing to a partitioned table, only the modified partitions are overwritten.

Note that if a partition is written to, it is entirely overwritten, so the replacement data must be complete.
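These overwrite semantics can be pictured as a dict of partitions, where a write replaces whole entries rather than merging into them (a sketch of the behaviour, not the storage layer):

```python
# Each partition lives as an independent unit keyed by the primary key value,
# much like the partition=N directories in the warehouse path.
table = {
    0: ['old row A', 'old row B'],
    1: ['old row C'],
}

# A write that touches only partition 0 replaces that partition wholesale;
# partition 1 is left untouched.
table.update({0: ['new row A']})

print(table)  # {0: ['new row A'], 1: ['old row C']}
```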


In [11]:
PartitionedTextLine = namedtuple('PartitionedTextLine', ['partition', 'line'])

In [12]:
def part_convert(l):
    yield PartitionedTextLine(partition=(len(l) % 5), line=l)
part_df = df.map(part_convert)

In [13]:
part_df.to_table(table_name='partitioned', primary_key='partition')


Deleted file:///tmp/spark/warehouse/partitioned/partition=3
Deleted file:///tmp/spark/warehouse/partitioned/partition=2
Deleted file:///tmp/spark/warehouse/partitioned/partition=4
Deleted file:///tmp/spark/warehouse/partitioned/partition=1
Deleted file:///tmp/spark/warehouse/partitioned/partition=0
Out[13]:
<edx.idea.data_frame.DataFrame at 0x7f2dde4cdc50>

Note the first few records of partition 0.


In [14]:
sql_query("SELECT line FROM partitioned WHERE partition=0 LIMIT 10").collect()


Out[14]:
[Row(line=u'Nullam ut enim in urna hendrerit luctus et ut leo.'),
 Row(line=u'Integer feugiat magna eget urna porta, at faucibus eros molestie.'),
 Row(line=u'Maecenas nec turpis ac nisl pharetra condimentum at sed sem.'),
 Row(line=u'Proin viverra turpis at blandit sollicitudin.'),
 Row(line=u'Vestibulum et risus feugiat, molestie dolor nec, ultrices sapien.'),
 Row(line=u'Quisque at ante faucibus, pellentesque velit elementum, ullamcorper tellus.'),
 Row(line=u'Cras et turpis non augue porta vehicula.'),
 Row(line=u'Etiam ac sem commodo, rutrum urna id, elementum turpis.'),
 Row(line=u'Nam eget quam bibendum, aliquam nunc vitae, consequat massa.'),
 Row(line=u'Etiam non justo convallis, sollicitudin erat vel, suscipit justo.')]

Now we create a new DataFrame containing only the records in partition 0, with every string converted to upper case.

Note that when we call to_table on this DataFrame, only partition 0 is replaced; all other partitions are untouched.


In [15]:
def part_convert_upper(l):
    part = len(l) % 5
    if part == 0:
        yield PartitionedTextLine(partition=part, line=l.upper())

df.map(part_convert_upper).to_table(table_name='partitioned', primary_key='partition')


Deleted file:///tmp/spark/warehouse/partitioned/partition=0
Out[15]:
<edx.idea.data_frame.DataFrame at 0x7f2dde4dd610>

Partition 0 records are now all uppercase.


In [16]:
sql_query("SELECT line FROM partitioned WHERE partition=0 LIMIT 10").collect()


Out[16]:
[Row(line=u'NULLAM UT ENIM IN URNA HENDRERIT LUCTUS ET UT LEO.'),
 Row(line=u'INTEGER FEUGIAT MAGNA EGET URNA PORTA, AT FAUCIBUS EROS MOLESTIE.'),
 Row(line=u'MAECENAS NEC TURPIS AC NISL PHARETRA CONDIMENTUM AT SED SEM.'),
 Row(line=u'PROIN VIVERRA TURPIS AT BLANDIT SOLLICITUDIN.'),
 Row(line=u'VESTIBULUM ET RISUS FEUGIAT, MOLESTIE DOLOR NEC, ULTRICES SAPIEN.'),
 Row(line=u'QUISQUE AT ANTE FAUCIBUS, PELLENTESQUE VELIT ELEMENTUM, ULLAMCORPER TELLUS.'),
 Row(line=u'CRAS ET TURPIS NON AUGUE PORTA VEHICULA.'),
 Row(line=u'ETIAM AC SEM COMMODO, RUTRUM URNA ID, ELEMENTUM TURPIS.'),
 Row(line=u'NAM EGET QUAM BIBENDUM, ALIQUAM NUNC VITAE, CONSEQUAT MASSA.'),
 Row(line=u'ETIAM NON JUSTO CONVALLIS, SOLLICITUDIN ERAT VEL, SUSCIPIT JUSTO.')]

Other partitions are still there and still lowercase.


In [17]:
sql_query("SELECT line FROM partitioned WHERE partition=1 LIMIT 10").collect()


Out[17]:
[Row(line=u'Aenean lacinia metus a ipsum bibendum egestas.'),
 Row(line=u'Quisque et nisi nec ipsum dictum lobortis at vitae urna.'),
 Row(line=u'Duis vitae erat tempus dui fringilla accumsan.'),
 Row(line=u'Donec porttitor neque at nulla rutrum blandit.'),
 Row(line=u'Nunc at est et leo mollis tristique.'),
 Row(line=u'Donec lobortis metus et mi dignissim suscipit.'),
 Row(line=u'Duis et mi nec erat elementum egestas vel ut nulla.'),
 Row(line=u'Integer et nunc non augue rutrum vulputate ut in tellus.'),
 Row(line=u'Fusce quis eros eu urna elementum efficitur id eget dui.'),
 Row(line=u'Morbi ut tortor eu felis gravida lacinia euismod ut dui.')]