In [2]:
from dask.distributed import Client
c = Client()
import dask.dataframe as dd

In [3]:
s = dd.read_parquet('/bigdata/subway.parquet')

In [6]:
s = c.persist(s.set_index('ca', npartitions=200))

In [7]:
s.to_parquet('/bigdata/subway2.parquet')

In [8]:
s.head()


Out[8]:
unit scp station linename division description cumul_entries cumul_exits
ca
55 R508 00-00-01 NULL NULL NULL REGULAR 24 1
55 R508 00-00-01 NULL NULL NULL REGULAR 24 1
55 R508 00-00-01 NULL NULL NULL REGULAR 24 1
55 R508 00-00-01 NULL NULL NULL REGULAR 24 1
55 R508 00-00-01 NULL NULL NULL REGULAR 24 1

In [19]:
s[s.index == '55']


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-19-364a0d2d13bd> in <module>()
----> 1 s[s.index == '55']

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in <lambda>(self, other)
    970             return lambda self, other: elemwise(op, other, self)
    971         else:
--> 972             return lambda self, other: elemwise(op, self, other)
    973 
    974     def rolling(self, window, min_periods=None, freq=None, center=False,

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
   2817         meta = _emulate(op, *args, **kwargs)
   2818 
-> 2819     return new_dd_object(dsk, _name, meta, divisions)
   2820 
   2821 

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in new_dd_object(dsk, _name, meta, divisions)
     77     Decides the appropriate output class based on the type of `meta` provided.
     78     """
---> 79     return _get_return_type(meta)(dsk, _name, meta, divisions)
     80 
     81 

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/core.py in __init__(self, dsk, name, meta, divisions)
    101         self.dask = dsk
    102         self._name = name
--> 103         meta = make_meta(meta)
    104         if isinstance(meta, (pd.DataFrame, pd.Series, pd.Index)):
    105             raise TypeError("Expected meta to specify scalar, got "

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/dataframe/utils.py in make_meta(x, index)
    304         return _nonempty_scalar(x)
    305 
--> 306     raise TypeError("Don't know how to create metadata from {0}".format(x))
    307 
    308 

TypeError: Don't know how to create metadata from [False False]

In [13]:
s.index.unique().compute()


Out[13]:
0         55
1       A002
2       A006
3       A007
4       A010
5       A011
6       A013
7       A014
8       A015
9       A016
10      A021
11      A022
12      A025
13      A027
14      A029
15      A030
16      A031
17      A033
18      A034
19      A035
20      A037
21      A038
22      A039
23      A041
24      A042
25      A043
26      A046
27      A047
28      A049
29      A050
       ...  
724     R624
725     R625
726     R626
727     R627
728     R628
729     R629
730     R630
731     R632
732     R633
733     R634
734     R635
735     R636
736     R637
737     R639
738     R641
739     R643
740     R644
741     R645
742     R646
743     R647
744     R726
745     R727
746     R728
747     R729
748     R730
749     S101
750    S101A
751     S102
752    TRAM1
753    TRAM2
Name: ca, dtype: object

In [4]:
zz = s['ca unit scp station'.split()].groupby(['ca', 'unit', 'scp']).count().compute()

In [9]:
k0 = s[(s.ca=='A002') & (s.unit=='R051') & (s.scp=='02-00-00')].compute()


distributed.batched - INFO - Batched Comm Closed: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.utils - ERROR - ("('getitem-dd827194738d781422cddd0e8dc97ff0', 38)", 'tcp://127.0.0.1:44991')
Traceback (most recent call last):
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/utils.py", line 193, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 1486, in _get
    result = yield self._gather(packed)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 939, in _gather
    traceback)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/six.py", line 686, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('getitem-dd827194738d781422cddd0e8dc97ff0', 38)", 'tcp://127.0.0.1:44991')
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-9-7f13d1cc3ea8> in <module>()
----> 1 k0 = s[(s.ca=='A002') & (s.unit=='R051') & (s.scp=='02-00-00')].compute()

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
     93             Extra keywords to forward to the scheduler ``get`` function.
     94         """
---> 95         (result,) = compute(self, traverse=False, **kwargs)
     96         return result
     97 

/home/shekhar/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    200     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    201     keys = [var._keys() for var in variables]
--> 202     results = get(dsk, keys, **kwargs)
    203 
    204     results_iter = iter(results)

/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, **kwargs)
   1523         return sync(self.loop, self._get, dsk, keys, restrictions=restrictions,
   1524                     loose_restrictions=loose_restrictions,
-> 1525                     resources=resources)
   1526 
   1527     def _optimize_insert_futures(self, dsk, keys):

/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    202         e.wait(1000000)
    203     if error[0]:
--> 204         six.reraise(*error[0])
    205     else:
    206         return result[0]

/home/shekhar/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/utils.py in f()
    191                 raise RuntimeError("sync() called from thread of running loop")
    192             yield gen.moment
--> 193             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    194         except Exception as exc:
    195             logger.exception(exc)

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py in _get(self, dsk, keys, restrictions, loose_restrictions, resources, raise_on_error)
   1484         packed = pack_data(keys, futures)
   1485         try:
-> 1486             result = yield self._gather(packed)
   1487         except Exception as e:
   1488             if raise_on_error:

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/client.py in _gather(self, futures, errors)
    937                             six.reraise(type(exception),
    938                                         exception,
--> 939                                         traceback)
    940                     if errors == 'skip':
    941                         bad_keys.add(key)

/home/shekhar/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

KilledWorker: ("('getitem-dd827194738d781422cddd0e8dc97ff0', 38)", 'tcp://127.0.0.1:44991')
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf807e730>, <tornado.concurrent.Future object at 0x7f2bf8002438>)
Traceback (most recent call last):
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
    ret = callback()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
    self.add_future(ret, lambda f: f.result())
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
    yield self.instantiate()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
    yielded = next(result)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
    self.process.start()
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
    super().__init__(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
    self.sentinel, w = forkserver.connect_to_new_process(self._fds)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
    client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf8070950>, <tornado.concurrent.Future object at 0x7f2bf806d5f8>)
Traceback (most recent call last):
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
    ret = callback()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
    self.add_future(ret, lambda f: f.result())
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
    yield self.instantiate()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
    yielded = next(result)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
    self.process.start()
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
    super().__init__(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
    self.sentinel, w = forkserver.connect_to_new_process(self._fds)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
    client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf8077158>, <tornado.concurrent.Future object at 0x7f2bf8079080>)
Traceback (most recent call last):
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
    ret = callback()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
    self.add_future(ret, lambda f: f.result())
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
    yield self.instantiate()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
    yielded = next(result)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
    self.process.start()
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
    super().__init__(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
    self.sentinel, w = forkserver.connect_to_new_process(self._fds)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
    client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f2bf8077bf8>, <tornado.concurrent.Future object at 0x7f2bf8079b38>)
Traceback (most recent call last):
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 604, in _run_callback
    ret = callback()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/ioloop.py", line 619, in <lambda>
    self.add_future(ret, lambda f: f.result())
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 286, in _watch
    yield self.instantiate()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 285, in wrapper
    yielded = next(result)
  File "/home/shekhar/anaconda3/lib/python3.5/site-packages/distributed/nanny.py", line 214, in instantiate
    self.process.start()
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 36, in __init__
    super().__init__(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/popen_forkserver.py", line 52, in _launch
    self.sentinel, w = forkserver.connect_to_new_process(self._fds)
  File "/home/shekhar/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 66, in connect_to_new_process
    client.connect(self._forkserver_address)
ConnectionRefusedError: [Errno 111] Connection refused

In [5]:
zz


Out[5]:
station
ca unit scp
A002 R051 02-00-00 15997
02-00-01 16200
02-03-00 15921
02-03-01 16112
02-03-02 15861
02-03-03 15884
02-03-04 15874
02-03-05 16027
02-03-06 15908
02-05-00 15815
02-05-01 15713
02-06-00 16140
A006 R079 00-00-00 16300
00-00-01 16078
00-00-02 16223
00-00-03 16044
00-00-04 16033
00-03-00 16058
00-03-01 16089
00-03-02 16069
A007 R079 01-05-00 15752
01-05-01 15725
01-06-00 16152
01-06-01 16126
01-06-02 16121
01-06-03 16363
A010 R080 00-00-00 17654
00-00-01 17187
00-00-02 17117
00-00-03 16938
... ... ... ...
PTH22 R540 02-02-01 291
02-02-02 288
02-02-03 292
02-02-04 291
02-02-05 293
02-02-06 292
02-02-07 291
02-03-00 291
02-03-01 291
02-03-02 284
02-03-03 292
02-03-04 291
02-03-05 291
02-03-06 290
02-03-07 284
02-04-00 291
02-04-02 258
02-04-03 257
02-04-04 257
02-04-05 256
02-04-06 257
02-04-07 257
02-05-00 257
02-05-01 257
02-05-02 256
02-05-03 258
02-05-05 250
TRAM1 R468 00-03-00 3
00-03-01 3
00-05-01 3

5217 rows × 1 columns


In [ ]: