In [13]:
from pyspark import SparkContext
from nltk.tokenize import word_tokenize  # requires the NLTK 'punkt' data (nltk.download('punkt'))
from nltk.stem import WordNetLemmatizer
import json
import os
sc = SparkContext()

In [14]:
def get_rdd(base, input_file, num_part):
    """Load a JSON-lines file into an RDD of dicts (one record per line)."""
    file_name = os.path.join(base, input_file)
    rdd = sc.textFile(file_name, num_part)
    rdd_j = rdd.map(json.loads)
    rdd_j.cache()
    return rdd_j

In [15]:
def tf(tokens):
    """Term frequency: the fraction of the token list accounted for by each token."""
    res = dict()
    addon = 1.0 / len(tokens)
    for tok in tokens:
        res[tok] = res.setdefault(tok, 0) + addon
    return res
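
As a quick sanity check (a hypothetical cell, not part of the original run), tf spreads a unit of weight evenly over the token list, so repeated tokens accumulate:

In [ ]:
# hypothetical example: 'spark' appears twice out of three tokens
tf(['spark', 'rdd', 'spark'])   # {'spark': 0.666..., 'rdd': 0.333...}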

In [16]:
def idfs(corpus):
    """Inverse document frequency: N / (number of documents containing the token)."""
    N = float(corpus.count())
    # count each token at most once per document, then sum across documents
    tokenCountPairTuple = corpus.flatMap(lambda x: set(x[1])).map(lambda x: (x, 1))
    tokenSumPairTuple = tokenCountPairTuple.reduceByKey(lambda a, b: a + b)
    return tokenSumPairTuple.map(lambda (tok, num): (tok, N / num))
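
This IDF is the raw ratio N/df rather than the more common log-scaled log(N/df); if the log variant were wanted, it would be a one-line change (a sketch, not part of the original run):

In [ ]:
from math import log

def idfs_log(corpus):
    # same document-frequency counting as idfs(), but log-scaled
    N = float(corpus.count())
    df = corpus.flatMap(lambda x: set(x[1])).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    return df.map(lambda (tok, num): (tok, log(N / num)))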

In [17]:
def tfidf(tokens, idfs):
    """ Compute TF-IDF
    Args:
        tokens (list of str): input list of tokens from tokenize
        idfs (dictionary): token to IDF value
    Returns:
        dictionary: a dictionary of tokens to TF-IDF values
    """
    tfs = tf(tokens)
    tfIdfDict = {tok: tfs[tok] * idfs[tok] for tok in tfs}
    return tfIdfDict
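
A toy invocation with a hand-built IDF dictionary (hypothetical values, for illustration only):

In [ ]:
# 'spark' has tf 2/3 and idf 2.0; 'rdd' has tf 1/3 and idf 10.0
tfidf(['spark', 'rdd', 'spark'], {'spark': 2.0, 'rdd': 10.0})
# -> {'spark': 1.333..., 'rdd': 3.333...}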

In [18]:
num_part = 4
revs = get_rdd('../data', 'reviews_electronics5000.json', num_part)
# keep only (asin, reviewText) pairs
rev_texts = revs.map(lambda x: (x['asin'], x['reviewText']))
# concatenate all review texts for each product, then tokenize the combined text
rev_agg_texts = rev_texts.map(lambda (asin, text): (asin, [text])).reduceByKey(lambda x, y: x + y)
rev_agg = rev_agg_texts.map(lambda (asin, revs): (asin, ' '.join(revs)))
rev_agg = rev_agg.map(lambda (asin, rev): (asin, word_tokenize(rev)))
rev_agg.cache()


Out[18]:
PythonRDD[7] at RDD at PythonRDD.scala:43

In [26]:
# tf
tfs = rev_agg.map(lambda (asin, toks): (asin, tf(toks)))

In [20]:
# idf
# use the whole category as idf corpus
idfs_cat = idfs(rev_agg)

In [22]:
idfs_cat.take(5)


Out[22]:
[(u'DVD+R', 299.0),
 (u'1,2', 299.0),
 (u'four', 13.0),
 (u'gag', 299.0),
 (u'recommended.UPDATE', 299.0)]

In [23]:
idfs_cat.lookup('1,2')


Out[23]:
[299.0]

In [21]:
rev_agg.map(lambda (asin, toks): (asin, tfidf(toks, idfs_cat))).take(10)


---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-21-5fc72a66dedb> in <module>()
----> 1 rev_agg.map(lambda (asin, toks): (asin, tfidf(toks, idfs_cat))).take(10)

/home/cs598rk/spark/python/pyspark/rdd.pyc in take(self, num)
   1297 
   1298             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1299             res = self.context.runJob(self, takeUpToNumLeft, p)
   1300 
   1301             items += res

/home/cs598rk/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    914         # SparkContext#runJob.
    915         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 916         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    917         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    918 

[... pickle / cloudpickle serialization frames elided: the map closure's globals include idfs_cat, and pickling that RDD reaches RDD.__getnewargs__ below ...]

/home/cs598rk/spark/python/pyspark/rdd.pyc in __getnewargs__(self)
    204         # This method is called when attempting to pickle an RDD, which is always an error:
    205         raise Exception(
--> 206             "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
    207             "action or transformation. RDD transformations and actions can only be invoked by the "
    208             "driver, not inside of other transformations; for example, "

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
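
The exception is SPARK-5063: idfs_cat is itself an RDD, and referencing it inside the map closure would require shipping the RDD to the executors, which Spark forbids. Since tfidf expects a plain dictionary, one way to proceed (a sketch, not part of the original run; the names idfs_dict and idfs_bc are made up here) is to collect the IDF pairs to the driver and broadcast them:

In [ ]:
# pull the (token, idf) pairs into a local dict on the driver,
# then distribute it to the workers as a read-only broadcast variable
idfs_dict = idfs_cat.collectAsMap()
idfs_bc = sc.broadcast(idfs_dict)

tfidfs = rev_agg.map(lambda (asin, toks): (asin, tfidf(toks, idfs_bc.value)))
tfidfs.take(2)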

In [ ]: