In [1]:
sc.master
Out[1]:
'yarn'
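The Out value confirms the SparkContext is attached to a YARN cluster rather than a local master. In this transcript the context is pre-built by the PySpark shell; the lines below are a minimal sketch of constructing one by hand, where the app name "letter-count-example" is an assumption rather than something from the original notebook:

from pyspark import SparkConf, SparkContext

# Hedged sketch: build a YARN-backed context when no shell provides one.
# Requires HADOOP_CONF_DIR/YARN_CONF_DIR to point at the cluster config;
# "letter-count-example" is a hypothetical app name.
conf = SparkConf().setMaster("yarn").setAppName("letter-count-example")
sc = SparkContext(conf=conf)
print(sc.master)  # 'yarn', matching the Out[1] above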
In [1]:
logFile = "hdfs:///user/hadooper/"
In [2]:
logData = sc.textFile(logFile).cache()
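Both textFile() and cache() are lazy: nothing is read from HDFS and nothing is cached until an action runs. Before committing to a full scan, a couple of cheap probes can sanity-check the input; this is a sketch, not part of the original run:

# getNumPartitions() reports how many input splits Spark found;
# take(5) pulls only the first few lines instead of the whole dataset.
print(logData.getNumPartitions())
print(logData.take(5))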
In [3]:
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a:{} , lines with b: {}".format(numAs, numBs))
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-3-644c3d36df0c> in <module>()
----> 1 numAs = logData.filter(lambda s: 'a' in s).count()
2 numBs = logData.filter(lambda s: 'b' in s).count()
3 print("Lines with a:{} , lines with b: {}".format(numAs, numBs))
/opt/spark/python/pyspark/rdd.py in count(self)
1071 3
1072 """
-> 1073 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1074
1075 def stats(self):
/opt/spark/python/pyspark/rdd.py in sum(self)
1062 6.0
1063 """
-> 1064 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
1065
1066 def count(self):
/opt/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
933 # zeroValue provided to each partition is unique from the one provided
934 # to the final reduce call
--> 935 vals = self.mapPartitions(func).collect()
936 return reduce(op, vals, zeroValue)
937
/opt/spark/python/pyspark/rdd.py in collect(self)
832 """
833 with SCCallSiteSync(self.context) as css:
--> 834 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
835 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
836
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1253 proto.END_COMMAND_PART
1254
-> 1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
1257 answer, self.gateway_client, self.target_id, self.name)
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
983 connection = self._get_connection()
984 try:
--> 985 response = connection.send_command(command)
986 if binary:
987 return response, self._create_connection_guard(connection)
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command)
1150
1151 try:
-> 1152 answer = smart_decode(self.stream.readline()[:-1])
1153 logger.debug("Answer received: {0}".format(answer))
1154 if answer.startswith(proto.RETURN_MESSAGE):
~/anaconda3/lib/python3.6/socket.py in readinto(self, b)
584 while True:
585 try:
--> 586 return self._sock.recv_into(b)
587 except timeout:
588 self._timeout_occurred = True
KeyboardInterrupt:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/home/hadooper/anaconda3/lib/python3.6/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/home/hadooper/anaconda3/lib/python3.6/threading.py", line 295, in wait
waiter.acquire()
File "/opt/spark/python/pyspark/context.py", line 245, in signal_handler
self.cancelAllJobs()
File "/opt/spark/python/pyspark/context.py", line 983, in cancelAllJobs
self._jsc.sc().cancelAllJobs()
AttributeError: 'NoneType' object has no attribute 'sc'
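The count() above never returned: logFile points at a whole HDFS home directory, so every action rescans every file under it, and the run was cancelled with Ctrl-C, producing the KeyboardInterrupt while Py4J was blocked waiting on the JVM socket. The trailing AttributeError is a shutdown side effect: the interrupt handler calls cancelAllJobs() after the gateway has been torn down, so self._jsc is already None. Independent of the input problem, the two counts can be computed in a single pass instead of two; the aggregate() call below is a sketch of that idea, not code from the original notebook:

# Single pass over the data: seqOp folds one line into the running
# (a_count, b_count) pair; combOp merges per-partition partials.
counts = logData.aggregate(
    (0, 0),
    lambda acc, s: (acc[0] + ('a' in s), acc[1] + ('b' in s)),
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
print("Lines with a: {}, lines with b: {}".format(counts[0], counts[1]))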