In [2]:
from dask.distributed import Client
c = Client()
import dask.dataframe as dd
In [3]:
s = dd.read_parquet('/bigdata/subway.parquet')
In [6]:
s = c.persist(s.set_index('ca', npartitions=200))
In [7]:
s.to_parquet('/bigdata/subway2.parquet')
In [8]:
s.head()
Out[8]:
unit
scp
station
linename
division
description
cumul_entries
cumul_exits
ca
55
R508
00-00-01
NULL
NULL
NULL
REGULAR
24
1
55
R508
00-00-01
NULL
NULL
NULL
REGULAR
24
1
55
R508
00-00-01
NULL
NULL
NULL
REGULAR
24
1
55
R508
00-00-01
NULL
NULL
NULL
REGULAR
24
1
55
R508
00-00-01
NULL
NULL
NULL
REGULAR
24
1
In [19]:
s[s.index == '55']
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-19-364a0d2d13bd> in <module>()
----> 1 s[s.index == '55']
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in <lambda>(self, other)
970 return lambda self, other: elemwise(op, other, self)
971 else:
--> 972 return lambda self, other: elemwise(op, self, other)
973
974 def rolling(self, window, min_periods=None, freq=None, center=False,
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
2817 meta = _emulate(op, *args, **kwargs)
2818
-> 2819 return new_dd_object(dsk, _name, meta, divisions)
2820
2821
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in new_dd_object(dsk, _name, meta, divisions)
77 Decides the appropriate output class based on the type of `meta` provided.
78 """
---> 79 return _get_return_type(meta)(dsk, _name, meta, divisions)
80
81
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in __init__(self, dsk, name, meta, divisions)
101 self.dask = dsk
102 self._name = name
--> 103 meta = make_meta(meta)
104 if isinstance(meta, (pd.DataFrame, pd.Series, pd.Index)):
105 raise TypeError("Expected meta to specify scalar, got "
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/utils.py in make_meta(x, index)
304 return _nonempty_scalar(x)
305
--> 306 raise TypeError("Don't know how to create metadata from {0}".format(x))
307
308
TypeError: Don't know how to create metadata from [False False]
In [13]:
s.index.unique().compute()
Out[13]:
0 55
1 A002
2 A006
3 A007
4 A010
5 A011
6 A013
7 A014
8 A015
9 A016
10 A021
11 A022
12 A025
13 A027
14 A029
15 A030
16 A031
17 A033
18 A034
19 A035
20 A037
21 A038
22 A039
23 A041
24 A042
25 A043
26 A046
27 A047
28 A049
29 A050
...
724 R624
725 R625
726 R626
727 R627
728 R628
729 R629
730 R630
731 R632
732 R633
733 R634
734 R635
735 R636
736 R637
737 R639
738 R641
739 R643
740 R644
741 R645
742 R646
743 R647
744 R726
745 R727
746 R728
747 R729
748 R730
749 S101
750 S101A
751 S102
752 TRAM1
753 TRAM2
Name: ca, dtype: object
In [4]:
zz = s['ca unit scp station'.split()].groupby(['ca', 'unit', 'scp']).count().compute()
In [9]:
k0 = s[(s.ca=='A002') & (s.unit=='R051') & (s.scp=='02-00-00')].compute()
distributed.batched - INFO - Batched Comm Closed: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.utils - ERROR - ("('getitem-dd827194738d781422cddd0e8dc97ff0', 38)", 'tcp://127.0.0.1:44991')
Traceback (most recent call last):
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/utils.py", line 193, in f
result[0] = yield gen.maybe_future(func(*args, **kwargs))
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 1486, in _get
result = yield self._gather(packed)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 939, in _gather
traceback)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/six.py", line 686, in reraise
raise value
distributed.scheduler.KilledWorker: ("('getitem-dd827194738d781422cddd0e8dc97ff0', 38)", 'tcp://127.0.0.1:44991')
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
<ipython-input-9-7f13d1cc3ea8> in <module>()
----> 1 k0 = s[(s.ca=='A002') & (s.unit=='R051') & (s.scp=='02-00-00')].compute()
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
93 Extra keywords to forward to the scheduler ``get`` function.
94 """
---> 95 (result,) = compute(self, traverse=False, **kwargs)
96 return result
97
/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
200 dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
201 keys = [var._keys() for var in variables]
--> 202 results = get(dsk, keys, **kwargs)
203
204 results_iter = iter(results)
/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, **kwargs)
1523 return sync(self.loop, self._get, dsk, keys, restrictions=restrictions,
1524 loose_restrictions=loose_restrictions,
-> 1525 resources=resources)
1526
1527 def _optimize_insert_futures(self, dsk, keys):
/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
202 e.wait(1000000)
203 if error[0]:
--> 204 six.reraise(*error[0])
205 else:
206 return result[0]
/home/shekhar/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/utils.py in f()
191 raise RuntimeError("sync() called from thread of running loop")
192 yield gen.moment
--> 193 result[0] = yield gen.maybe_future(func(*args, **kwargs))
194 except Exception as exc:
195 logger.exception(exc)
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1013
1014 try:
-> 1015 value = future.result()
1016 except Exception:
1017 self.had_exception = True
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
235 return self._result
236 if self._exc_info is not None:
--> 237 raise_exc_info(self._exc_info)
238 self._check_done()
239 return self._result
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1019
1020 if exc_info is not None:
-> 1021 yielded = self.gen.throw(*exc_info)
1022 exc_info = None
1023 else:
/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py in _get(self, dsk, keys, restrictions, loose_restrictions, resources, raise_on_error)
1484 packed = pack_data(keys, futures)
1485 try:
-> 1486 result = yield self._gather(packed)
1487 except Exception as e:
1488 if raise_on_error:
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1013
1014 try:
-> 1015 value = future.result()
1016 except Exception:
1017 self.had_exception = True
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
235 return self._result
236 if self._exc_info is not None:
--> 237 raise_exc_info(self._exc_info)
238 self._check_done()
239 return self._result
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1019
1020 if exc_info is not None:
-> 1021 yielded = self.gen.throw(*exc_info)
1022 exc_info = None
1023 else:
/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py in _gather(self, futures, errors)
937 six.reraise(type(exception),
938 exception,
--> 939 traceback)
940 if errors == 'skip':
941 bad_keys.add(key)
/home/shekhar/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
KilledWorker: ("('getitem-dd827194738d781422cddd0e8dc97ff0', 38)", 'tcp://127.0.0.1:44991')
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf807e730>, <tornado.concurrent.Future object at 0x7f2bf8002438>)
Traceback (most recent call last):
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
ret = callback()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
self.add_future(ret, lambda f: f.result())
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
yield self.instantiate()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
yielded = next(result)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
self.process.start()
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
return Popen(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
super().__init__(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf8070950>, <tornado.concurrent.Future object at 0x7f2bf806d5f8>)
Traceback (most recent call last):
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
ret = callback()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
self.add_future(ret, lambda f: f.result())
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
yield self.instantiate()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
yielded = next(result)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
self.process.start()
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
return Popen(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
super().__init__(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf8077158>, <tornado.concurrent.Future object at 0x7f2bf8079080>)
Traceback (most recent call last):
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
ret = callback()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
self.add_future(ret, lambda f: f.result())
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
yield self.instantiate()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
yielded = next(result)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
self.process.start()
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
return Popen(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
super().__init__(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf8077bf8>, <tornado.concurrent.Future object at 0x7f2bf8079b38>)
Traceback (most recent call last):
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
ret = callback()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
self.add_future(ret, lambda f: f.result())
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
yield self.instantiate()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
yielded = next(result)
File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
self.process.start()
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
return Popen(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
super().__init__(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
In [5]:
zz
Out[5]:
station
ca
unit
scp
A002
R051
02-00-00
15997
02-00-01
16200
02-03-00
15921
02-03-01
16112
02-03-02
15861
02-03-03
15884
02-03-04
15874
02-03-05
16027
02-03-06
15908
02-05-00
15815
02-05-01
15713
02-06-00
16140
A006
R079
00-00-00
16300
00-00-01
16078
00-00-02
16223
00-00-03
16044
00-00-04
16033
00-03-00
16058
00-03-01
16089
00-03-02
16069
A007
R079
01-05-00
15752
01-05-01
15725
01-06-00
16152
01-06-01
16126
01-06-02
16121
01-06-03
16363
A010
R080
00-00-00
17654
00-00-01
17187
00-00-02
17117
00-00-03
16938
...
...
...
...
PTH22
R540
02-02-01
291
02-02-02
288
02-02-03
292
02-02-04
291
02-02-05
293
02-02-06
292
02-02-07
291
02-03-00
291
02-03-01
291
02-03-02
284
02-03-03
292
02-03-04
291
02-03-05
291
02-03-06
290
02-03-07
284
02-04-00
291
02-04-02
258
02-04-03
257
02-04-04
257
02-04-05
256
02-04-06
257
02-04-07
257
02-05-00
257
02-05-01
257
02-05-02
256
02-05-03
258
02-05-05
250
TRAM1
R468
00-03-00
3
00-03-01
3
00-05-01
3
5217 rows × 1 columns
In [ ]:
Content source: r-shekhar/NYC-transport
Similar notebooks: