---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-21-5fc72a66dedb> in <module>()
----> 1 rev_agg.map(lambda (asin, toks): (asin, tfidf(toks, idfs_cat))).take(10)
/home/cs598rk/spark/python/pyspark/rdd.pyc in take(self, num)
1297
1298 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1299 res = self.context.runJob(self, takeUpToNumLeft, p)
1300
1301 items += res
/home/cs598rk/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
914 # SparkContext#runJob.
915 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 916 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
917 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
918
/home/cs598rk/spark/python/pyspark/rdd.pyc in _jrdd(self)
2386 command = (self.func, profiler, self._prev_jrdd_deserializer,
2387 self._jrdd_deserializer)
-> 2388 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
2389 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2390 bytearray(pickled_cmd),
/home/cs598rk/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
2306 # the serialized command will be compressed by broadcast
2307 ser = CloudPickleSerializer()
-> 2308 pickled_command = ser.dumps(command)
2309 if len(pickled_command) > (1 << 20): # 1M
2310 # The broadcast will have same life cycle as created PythonRDD
/home/cs598rk/spark/python/pyspark/serializers.pyc in dumps(self, obj)
426
427 def dumps(self, obj):
--> 428 return cloudpickle.dumps(obj, 2)
429
430
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
644
645 cp = CloudPickler(file,protocol)
--> 646 cp.dump(obj)
647
648 return file.getvalue()
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
105 self.inject_addons()
106 try:
--> 107 return Pickler.dump(self, obj)
108 except RuntimeError as e:
109 if 'recursion' in e.args[0]:
/opt/anaconda/lib/python2.7/pickle.pyc in dump(self, obj)
222 if self.proto >= 2:
223 self.write(PROTO + chr(self.proto))
--> 224 self.save(obj)
225 self.write(STOP)
226
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
560 write(MARK)
561 for element in obj:
--> 562 save(element)
563
564 if id(obj) in memo:
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/opt/anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
631 write(MARK)
632 for x in tmp:
--> 633 save(x)
634 write(APPENDS)
635 elif n:
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
197 klass = getattr(themodule, name, None)
198 if klass is None or klass is not obj:
--> 199 self.save_function_tuple(obj)
200 return
201
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
234 # create a skeleton function object and memoize it
235 save(_make_skel_func)
--> 236 save((code, closure, base_globals))
237 write(pickle.REDUCE)
238 self.memoize(func)
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/anaconda/lib/python2.7/pickle.pyc in save_tuple(self, obj)
546 if n <= 3 and proto >= 2:
547 for element in obj:
--> 548 save(element)
549 # Subtle. Same as in the big comment below.
550 if id(obj) in memo:
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/anaconda/lib/python2.7/pickle.pyc in save_list(self, obj)
598
599 self.memoize(obj)
--> 600 self._batch_appends(iter(obj))
601
602 dispatch[ListType] = save_list
/opt/anaconda/lib/python2.7/pickle.pyc in _batch_appends(self, items)
634 write(APPENDS)
635 elif n:
--> 636 save(tmp[0])
637 write(APPEND)
638 # else tmp is empty, and we're done
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
191 if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
192 #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193 self.save_function_tuple(obj)
194 return
195 else:
/home/cs598rk/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
239
240 # save the rest of the func data needed by _fill_function
--> 241 save(f_globals)
242 save(defaults)
243 save(dct)
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
284 f = self.dispatch.get(t)
285 if f:
--> 286 f(self, obj) # Call unbound method with explicit self
287 return
288
/opt/anaconda/lib/python2.7/pickle.pyc in save_dict(self, obj)
647
648 self.memoize(obj)
--> 649 self._batch_setitems(obj.iteritems())
650
651 dispatch[DictionaryType] = save_dict
/opt/anaconda/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
679 for k, v in tmp:
680 save(k)
--> 681 save(v)
682 write(SETITEMS)
683 elif n:
/opt/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
304 reduce = getattr(obj, "__reduce_ex__", None)
305 if reduce:
--> 306 rv = reduce(self.proto)
307 else:
308 reduce = getattr(obj, "__reduce__", None)
/home/cs598rk/spark/python/pyspark/rdd.pyc in __getnewargs__(self)
204 # This method is called when attempting to pickle an RDD, which is always an error:
205 raise Exception(
--> 206 "It appears that you are attempting to broadcast an RDD or reference an RDD from an "
207 "action or transformation. RDD transformations and actions can only be invoked by the "
208 "driver, not inside of other transformations; for example, "
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
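The root cause is the reference to `idfs_cat` inside the lambda passed to `map` on the first line of the traceback. When PySpark pickles that closure to ship it to the executors, it finds `idfs_cat` among the captured globals; if `idfs_cat` is itself an RDD (which the SPARK-5063 message strongly suggests), pickling fails, because RDDs are driver-side handles and cannot be serialized into a task. The usual fix is to materialize the IDF weights as a plain Python dict on the driver and distribute that dict as a broadcast variable. A minimal sketch, assuming `idfs_cat` is a pair RDD of (token, idf weight), that `tfidf(tokens, idfs)` accepts any dict-like mapping, and that `sc` is the usual SparkContext handle:

    # Materialize the IDF weights on the driver as a plain dict.
    # collectAsMap() is safe only if the result fits in driver memory.
    idfs_map = sc.broadcast(idfs_cat.collectAsMap())

    # The closure now captures the broadcast handle, not an RDD,
    # so it pickles cleanly; workers read the dict via .value.
    rev_agg.map(lambda pair: (pair[0], tfidf(pair[1], idfs_map.value))).take(10)

If the weight table is small, a bare `idfs_cat.collectAsMap()` (without `sc.broadcast`) also works, since the resulting dict is an ordinary picklable object; broadcasting simply avoids re-shipping it with every task.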