In [1]:
sc.master


Out[1]:
'yarn'
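
The `sc` object is the SparkContext that the PySpark shell creates at startup, and `sc.master` returning `'yarn'` confirms the driver is submitting work to a YARN cluster rather than running locally. A few other standard SparkContext attributes are worth a look when verifying the setup (a quick sketch, not part of the captured session):

print(sc.appName)   # application name registered with YARN
print(sc.version)   # Spark version the driver is running
for key, value in sc.getConf().getAll():
    print(key, value)   # every effective configuration setting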

In [1]:
logFile = "hdfs:///user/hadooper/"

In [2]:
logData = sc.textFile(logFile).cache()
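
`textFile` and `cache` are both lazy, so this cell reads nothing yet; it only records that, once computed, the RDD's partitions should be kept in memory. That matters in the next cell, where two separate actions scan the data: without the cache, each count would re-read HDFS. `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`; a sketch of the explicit form, using a level that spills to disk when memory is tight:

from pyspark import StorageLevel

logData = sc.textFile(logFile).persist(StorageLevel.MEMORY_AND_DISK)
logData.is_cached   # True as soon as persistence has been requested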

In [3]:
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a:{} , lines with b: {}".format(numAs, numBs))


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-3-644c3d36df0c> in <module>()
----> 1 numAs = logData.filter(lambda s: 'a' in s).count()
      2 numBs = logData.filter(lambda s: 'b' in s).count()
      3 print("Lines with a: {}, lines with b: {}".format(numAs, numBs))

/opt/spark/python/pyspark/rdd.py in count(self)
   1071         3
   1072         """
-> 1073         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1074 
   1075     def stats(self):

/opt/spark/python/pyspark/rdd.py in sum(self)
   1062         6.0
   1063         """
-> 1064         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1065 
   1066     def count(self):

/opt/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
    933         # zeroValue provided to each partition is unique from the one provided
    934         # to the final reduce call
--> 935         vals = self.mapPartitions(func).collect()
    936         return reduce(op, vals, zeroValue)
    937 

/opt/spark/python/pyspark/rdd.py in collect(self)
    832         """
    833         with SCCallSiteSync(self.context) as css:
--> 834             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    835         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    836 

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1253             proto.END_COMMAND_PART
   1254 
-> 1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
   1257             answer, self.gateway_client, self.target_id, self.name)

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
    983         connection = self._get_connection()
    984         try:
--> 985             response = connection.send_command(command)
    986             if binary:
    987                 return response, self._create_connection_guard(connection)

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command)
   1150 
   1151         try:
-> 1152             answer = smart_decode(self.stream.readline()[:-1])
   1153             logger.debug("Answer received: {0}".format(answer))
   1154             if answer.startswith(proto.RETURN_MESSAGE):

~/anaconda3/lib/python3.6/socket.py in readinto(self, b)
    584         while True:
    585             try:
--> 586                 return self._sock.recv_into(b)
    587             except timeout:
    588                 self._timeout_occurred = True

KeyboardInterrupt: 
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/hadooper/anaconda3/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/home/hadooper/anaconda3/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()
  File "/opt/spark/python/pyspark/context.py", line 245, in signal_handler
    self.cancelAllJobs()
  File "/opt/spark/python/pyspark/context.py", line 983, in cancelAllJobs
    self._jsc.sc().cancelAllJobs()
AttributeError: 'NoneType' object has no attribute 'sc'
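
`count()` is the first action in the session, so only here does Spark actually submit a job to the cluster. In this run the job never came back and was interrupted with Ctrl-C, which surfaces as a KeyboardInterrupt inside py4j's blocking socket read; on YARN, hanging at the first action usually means the application is still waiting for the ResourceManager to grant executor containers. The trailing AttributeError is a harmless side effect of the interrupt: PySpark's signal handler calls cancelAllJobs() after the JVM gateway has already been torn down, so `self._jsc` is gone. One way to tell a genuinely running job from an application stuck waiting for resources is to poll Spark's StatusTracker from a background thread while the action blocks (a diagnostic sketch using the real sc.statusTracker() API; the helper name and polling intervals are made up):

import threading, time

def watch_progress(polls=6, interval=5):
    # Poll the StatusTracker while the main thread is blocked in count().
    tracker = sc.statusTracker()
    for _ in range(polls):
        for stage_id in tracker.getActiveStageIds():
            info = tracker.getStageInfo(stage_id)
            if info:
                print("stage {}: {}/{} tasks done, {} running".format(
                    stage_id, info.numCompletedTasks,
                    info.numTasks, info.numActiveTasks))
        time.sleep(interval)

threading.Thread(target=watch_progress, daemon=True).start()
numAs = logData.filter(lambda s: 'a' in s).count()

A stage that sits at zero running tasks for minutes points at resource starvation rather than slow I/O; the YARN ResourceManager UI will typically show the application stuck in ACCEPTED or waiting on container allocation.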
