In [1]:
from edx.idea.data_frame import DataFrame
from edx.idea.sql import sql_query
from collections import namedtuple
In [2]:
df = DataFrame.from_url('gibberish.txt')
Note that the following command reads only enough of the file to return the first 5 lines; it need not read the entire file.
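The same short-circuit behavior can be sketched in plain Python: `itertools.islice` stops pulling lines from the file object after the requested count, so the rest of the file is never read (this is a standalone analogue of `take(5)`, not the edx.idea implementation; the path argument is hypothetical):

```python
from itertools import islice

def take_lines(path, n):
    """Read at most n lines from path without loading the whole file."""
    with open(path) as f:
        # islice stops iteration after n lines; later lines are never read.
        return [line.rstrip('\n') for line in islice(f, n)]
```

Because file objects are lazy iterators, this works the same on a 10-line file and a 10-gigabyte one.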
In [3]:
df.take(5)
Out[3]:
In [4]:
def wc_mapper(line):
    for word in line.split(' '):
        yield (word.rstrip(".,\r\n").lower(), 1)

def wc_reducer(word, counts):
    yield (word, sum(counts))

counted_words = df.map_reduce(wc_mapper, wc_reducer)
After the previous code was executed, no computation has actually been done. Once the results of the computation are requested, the promise is resolved: the data is computed and then returned to the driver.
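The computation itself can be modelled in plain Python: apply the mapper to every line, group the emitted pairs by key (the "shuffle"), then feed each key's values to the reducer. This is a single-process sketch of what `map_reduce` computes, reusing the mapper and reducer defined above, not the edx.idea execution engine:

```python
from collections import defaultdict

def local_map_reduce(lines, mapper, reducer):
    """Single-process model of map_reduce: map, shuffle by key, reduce."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)          # shuffle: collect values per key
    results = []
    for key, values in groups.items():
        results.extend(reducer(key, values))   # reduce each key's value list
    return results

def wc_mapper(line):
    for word in line.split(' '):
        yield (word.rstrip(".,\r\n").lower(), 1)

def wc_reducer(word, counts):
    yield (word, sum(counts))
```

For example, `dict(local_map_reduce(['a b a', 'b c'], wc_mapper, wc_reducer))` produces `{'a': 2, 'b': 2, 'c': 1}`.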
In [5]:
dict(counted_words.collect()[:10])
Out[5]:
In [6]:
TextLine = namedtuple('TextLine', ['line'])
In order to know what to call the columns, the data must have some structure. We use a namedtuple to provide that structure, assigning names to values.
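A quick standalone illustration: a namedtuple behaves like a plain tuple but also exposes each value by field name, and those field names are what supply the column names.

```python
from collections import namedtuple

# A one-field record type; the field name becomes the column name.
TextLine = namedtuple('TextLine', ['line'])

row = TextLine(line='hello world')
assert row.line == 'hello world'   # access by field name
assert row[0] == 'hello world'     # or by position, like a plain tuple
assert row._fields == ('line',)    # field names, available for column naming
```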
In [7]:
def convert(l):
    yield TextLine(line=l)

df2 = df.map(convert)
df2.take(1)
Out[7]:
We can then save the DataFrame as a table. Note that this overwrites any existing table with that name.
In [8]:
df2.to_table(table_name='gibberish')
Out[8]:
In [9]:
res = sql_query('SELECT line FROM gibberish')
Again, this SQL query has not yet been executed; it does not run until we actually try to use the results.
In [10]:
res.take(10)
Out[10]:
Tables can be partitioned using a primary key. When writing to a partitioned table, only the modified partitions are overwritten.
Note that if a partition is written to, it is replaced in its entirety, so the replacement data must be complete.
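These overwrite semantics can be modelled with a dict keyed by partition: a write replaces only the partitions present in the new data, but each replaced partition is swapped out wholesale. This is a sketch of the behavior described above, not the edx.idea storage layer:

```python
from collections import namedtuple

def write_partitioned(table, new_rows, key):
    """Replace only the partitions that appear in new_rows, each in full."""
    incoming = {}
    for row in new_rows:
        incoming.setdefault(getattr(row, key), []).append(row)
    for partition, rows in incoming.items():
        table[partition] = rows   # whole partition replaced; others untouched
    return table

Row = namedtuple('Row', ['partition', 'line'])
table = {0: [Row(0, 'old a')], 1: [Row(1, 'old b')]}
write_partitioned(table, [Row(0, 'NEW A')], 'partition')
assert table[0] == [Row(0, 'NEW A')]   # partition 0 fully replaced
assert table[1] == [Row(1, 'old b')]   # partition 1 untouched
```

Note that because partition 0 is replaced wholesale, any of its old rows not present in `new_rows` are gone, which is why the replacement data must be complete.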
In [11]:
PartitionedTextLine = namedtuple('PartitionedTextLine', ['partition', 'line'])
In [12]:
def part_convert(l):
    yield PartitionedTextLine(partition=(len(l) % 5), line=l)

part_df = df.map(part_convert)
In [13]:
part_df.to_table(table_name='partitioned', primary_key='partition')
Out[13]:
Note the first few records of partition 0.
In [14]:
sql_query("SELECT line FROM partitioned WHERE partition=0 LIMIT 10").collect()
Out[14]:
Now we create a new DataFrame containing only the records in partition 0 and convert all of the strings to upper case.
Note that when we call to_table on this DataFrame, it replaces only partition 0; all other partitions are untouched.
In [15]:
def part_convert_upper(l):
    part = len(l) % 5
    if part == 0:
        yield PartitionedTextLine(partition=part, line=l.upper())

df.map(part_convert_upper).to_table(table_name='partitioned', primary_key='partition')
Out[15]:
Partition 0 records are now all uppercase.
In [16]:
sql_query("SELECT line FROM partitioned WHERE partition=0 LIMIT 10").collect()
Out[16]:
Other partitions are still there and still lowercase.
In [17]:
sql_query("SELECT line FROM partitioned WHERE partition=1 LIMIT 10").collect()
Out[17]: