Dask

  • Dask official documentation
  • Integrates with numpy, pandas, and sklearn
  • Dask is a flexible parallel computing library for analytic computing.

  • Dask is composed of two components:

    • Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
    • “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.
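
As a hypothetical illustration of the two components working together (assuming dask[array] is installed), the sketch below builds a NumPy-style parallel array whose chunked computation is executed by the task scheduler:

import dask.array as da

# A parallel array that follows the NumPy interface; the data is split
# into 1000x1000 chunks that the task scheduler can process in parallel.
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Building the expression only constructs a task graph; nothing runs yet.
result = (x + x.T).mean(axis=0)

# .compute() hands the graph to the scheduler and returns a NumPy array.
print(result.compute())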

Running pip3 install dask installs only the basic dask package.

Installation

  • pip install dask[complete]: Install everything
  • pip install dask[array]: Install dask and numpy
  • pip install dask[bag]: Install dask and cloudpickle
  • pip install dask[dataframe]: Install dask, numpy, and pandas
  • pip install dask: Install only dask, which depends only on the standard library. This is appropriate if you only want the task schedulers.

We do this so that users of the lightweight core dask scheduler aren’t required to download the more exotic dependencies of the collections (numpy, pandas, etc.).


In [2]:
import dask 
import pandas as pd

In [34]:
df = pd.read_csv('./user_log_2018_01_01.csv')

In [35]:
df


Out[35]:
   user_id  event_cnt  sex  use_phone  abc
0        1          0    0          1    0
1        2          1    0          0    1
2        3          0    1          0    0
3        4          0    1          0    0
4        5          1    0          0    1

In [7]:
import dask.dataframe as dd

In [36]:
dask_df = dd.read_csv('./user_log_2018_01_01.csv')

In [37]:
dask_df


Out[37]:
Dask DataFrame Structure:
              user_id event_cnt    sex use_phone    abc
npartitions=1
                int64     int64  int64     int64  int64
                  ...       ...    ...       ...    ...
Dask Name: from-delayed, 3 tasks
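
The file here fits in a single partition; for larger inputs read_csv splits the data into many partitions, and it also accepts glob patterns. A sketch (the blocksize value and the file pattern are illustrative only):

import dask.dataframe as dd

# Split a large file into roughly 64 MB partitions (blocksize is in bytes).
big_df = dd.read_csv('./user_log_2018_01_01.csv', blocksize=64_000_000)

# read_csv also accepts a glob pattern, giving one or more partitions per file.
many_df = dd.read_csv('./user_log_2018_01_*.csv')

print(big_df.npartitions, many_df.npartitions)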

In [38]:
dir(dask_df)


Out[38]:
['_HTML_FMT',
 '__abs__',
 '__add__',
 '__and__',
 '__array__',
 '__array_ufunc__',
 '__array_wrap__',
 '__bool__',
 '__class__',
 '__complex__',
 '__dask_graph__',
 '__dask_keys__',
 '__dask_optimize__',
 '__dask_postcompute__',
 '__dask_postpersist__',
 '__dask_scheduler__',
 '__dask_tokenize__',
 '__delattr__',
 '__delitem__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__float__',
 '__floordiv__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__int__',
 '__invert__',
 '__le__',
 '__len__',
 '__long__',
 '__lt__',
 '__mod__',
 '__module__',
 '__mul__',
 '__ne__',
 '__neg__',
 '__new__',
 '__nonzero__',
 '__or__',
 '__pow__',
 '__radd__',
 '__rand__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__rfloordiv__',
 '__rmod__',
 '__rmul__',
 '__ror__',
 '__rpow__',
 '__rsub__',
 '__rtruediv__',
 '__rxor__',
 '__setattr__',
 '__setitem__',
 '__setstate__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__sub__',
 '__subclasshook__',
 '__truediv__',
 '__weakref__',
 '__xor__',
 '_args',
 '_bind_comparison_method',
 '_bind_operator',
 '_bind_operator_method',
 '_constructor',
 '_contains_index_name',
 '_cum_agg',
 '_elemwise',
 '_get_binary_operator',
 '_get_numeric_data',
 '_get_unary_operator',
 '_is_column_label_reference',
 '_is_index_level_reference',
 '_meta',
 '_meta_nonempty',
 '_name',
 '_partition_type',
 '_reduction_agg',
 '_repr_data',
 '_repr_divisions',
 '_repr_html_',
 '_scalarfunc',
 '_select_columns_or_index',
 '_token_prefix',
 '_validate_axis',
 'abc',
 'abs',
 'add',
 'align',
 'all',
 'any',
 'append',
 'apply',
 'applymap',
 'assign',
 'astype',
 'bfill',
 'categorize',
 'clear_divisions',
 'clip',
 'clip_lower',
 'clip_upper',
 'columns',
 'combine',
 'combine_first',
 'compute',
 'copy',
 'corr',
 'count',
 'cov',
 'cummax',
 'cummin',
 'cumprod',
 'cumsum',
 'dask',
 'describe',
 'diff',
 'div',
 'divisions',
 'drop',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'eq',
 'eval',
 'event_cnt',
 'ffill',
 'fillna',
 'first',
 'floordiv',
 'ge',
 'get_dtype_counts',
 'get_ftype_counts',
 'get_partition',
 'groupby',
 'gt',
 'head',
 'idxmax',
 'idxmin',
 'index',
 'info',
 'isin',
 'isna',
 'isnull',
 'iterrows',
 'itertuples',
 'join',
 'known_divisions',
 'last',
 'le',
 'loc',
 'lt',
 'map_overlap',
 'map_partitions',
 'mask',
 'max',
 'mean',
 'memory_usage',
 'merge',
 'min',
 'mod',
 'mul',
 'ndim',
 'ne',
 'nlargest',
 'notnull',
 'npartitions',
 'nsmallest',
 'nunique_approx',
 'persist',
 'pipe',
 'pivot_table',
 'pow',
 'prod',
 'quantile',
 'query',
 'radd',
 'random_split',
 'rdiv',
 'reduction',
 'rename',
 'repartition',
 'resample',
 'reset_index',
 'rfloordiv',
 'rmod',
 'rmul',
 'rolling',
 'round',
 'rpow',
 'rsub',
 'rtruediv',
 'sample',
 'select_dtypes',
 'sem',
 'set_index',
 'sex',
 'shift',
 'size',
 'squeeze',
 'std',
 'sub',
 'sum',
 'tail',
 'to_bag',
 'to_csv',
 'to_delayed',
 'to_hdf',
 'to_html',
 'to_parquet',
 'to_records',
 'to_string',
 'to_timestamp',
 'truediv',
 'use_phone',
 'user_id',
 'values',
 'var',
 'visualize',
 'where']
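
The listing mixes the familiar pandas-style methods (groupby, merge, fillna, ...) with Dask-specific ones such as map_partitions, persist, visualize, and compute. A minimal sketch of map_partitions, which runs a plain pandas function on each partition (the added total column is purely for illustration):

# Each partition is a plain pandas DataFrame, so ordinary pandas code
# can be applied partition by partition.
def add_total(pdf):
    pdf = pdf.copy()
    pdf['total'] = pdf['event_cnt'] + pdf['use_phone']
    return pdf

with_total = dask_df.map_partitions(add_total)
print(with_total.head())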

In [13]:
dask_df["0"]


Out[13]:
Dask Series Structure:
npartitions=1
    int64
      ...
Name: 0, dtype: int64
Dask Name: getitem, 4 tasks

In [18]:
dask_df.index


Out[18]:
Dask Index Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: from-delayed, 4 tasks

In [21]:
len(dask_df.index)


Out[21]:
5
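
len() is one of the operations that computes immediately; an equivalent lazy formulation (a sketch, not part of the original notebook) counts rows per partition and only materializes at the end:

# len(dask_df) runs eagerly; the version below stays lazy until .compute().
rows_per_partition = dask_df.map_partitions(len)   # dask Series of partition sizes
total_rows = rows_per_partition.compute().sum()
print(total_rows)   # 5 for this tiny file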

In [22]:
dask_df.info


Out[22]:
<bound method DataFrame.info of Dask DataFrame Structure:
                   0      0      1    0.1    0.2
npartitions=1                                   
               int64  int64  int64  int64  int64
                 ...    ...    ...    ...    ...
Dask Name: from-delayed, 3 tasks>

Setup

  • Dask has two families of task schedulers (a selection sketch follows this list):
    • 1) Single-machine scheduler: the default; basic features, no setup, but does not scale beyond one machine
    • 2) Distributed scheduler: more sophisticated, more features, a bit more effort to set up
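
A minimal sketch of selecting a single-machine scheduler by name (the distributed scheduler is configured with a Client further below):

# Single-machine schedulers are chosen per call by name; no extra setup is needed.
dask_df["event_cnt"].sum().compute(scheduler='threads')      # thread pool (default for dataframes)
dask_df["event_cnt"].sum().compute(scheduler='processes')    # process pool
dask_df["event_cnt"].sum().compute(scheduler='synchronous')  # single thread, handy for debugging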

In [52]:
dask_df.head()


Out[52]:
   user_id  event_cnt  sex  use_phone  abc
0        1          0    0          1    0
1        2          1    0          0    1
2        3          0    1          0    0
3        4          0    1          0    0
4        5          1    0          0    1

In [42]:
dask_df["user_id"].sum().compute()


Out[42]:
15

In [43]:
dask_df["event_cnt"].sum().compute()


Out[43]:
2

In [47]:
dask_df[dask_df["event_cnt"]>1].sum().compute()


Out[47]:
user_id      0
event_cnt    0
sex          0
use_phone    0
abc          0
dtype: int64
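
Each .compute() call above evaluates its graph independently; dask.compute can evaluate several lazy results in a single pass so the CSV is only read once (a sketch using the same dataframe):

import dask

# Build two lazy results, then compute them together to share the common read.
total_users = dask_df["user_id"].sum()
total_events = dask_df["event_cnt"].sum()

users, events = dask.compute(total_users, total_events)
print(users, events)   # 15 and 2 for this file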

In [56]:
from dask.distributed import Client

In [61]:
client2 = Client()
# client2 = Client(processes=False)  # keep workers as threads in this process instead of separate processes

In [62]:
dask_df["event_cnt"].sum().compute(scheduler='client2')


Out[62]:
2
  • Single machine
    • Default scheduler: no setup; local threads or processes for larger-than-memory processing
    • Dask.distributed: a newer system that also runs on a single machine, with more advanced features
  • Distributed computing
    • Manual setup: dask-scheduler and dask-worker
    • SSH
    • High Performance Computers
    • Kubernetes
    • Python API
    • Docker
    • Cloud

In [ ]:
# Context manager: use the threaded scheduler only inside this block.
# (x and y stand for dask collections defined elsewhere.)
with dask.config.set(scheduler='threads'):
    x.compute()
    y.compute()

In [ ]:
# Global setting: make the threaded scheduler the default for every compute()
dask.config.set(scheduler='threads')

LocalCluster


In [63]:
from dask.distributed import Client, LocalCluster

In [64]:
cluster = LocalCluster()

class distributed.deploy.local.LocalCluster(n_workers=None, threads_per_worker=None, processes=True, loop=None, start=None, ip=None, scheduler_port=0, silence_logs=30, diagnostics_port=8787, services={}, worker_services={}, service_kwargs=None, asynchronous=False, **worker_kwargs)


In [66]:
client = Client(cluster)

In [67]:
cluster



In [68]:
client


Out[68]:

Client

  • Scheduler: tcp://127.0.0.1:65529

Cluster

  • Workers: 8
  • Cores: 8
  • Memory: 17.18 GB
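
The default LocalCluster above picked up 8 workers and 8 cores on this machine; a sketch of sizing the cluster explicitly instead (the numbers are arbitrary):

from dask.distributed import Client, LocalCluster

# Four worker processes with two threads each; the diagnostics dashboard
# stays on port 8787 as in the signature above.
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)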

Add a new worker to the cluster

In [74]:
w = cluster.start_worker(ncores=2)

In [75]:
cluster.stop_worker(w)
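
start_worker/stop_worker come from the version of distributed used here; newer releases typically resize a LocalCluster with scale or adapt instead (a sketch, availability depends on your version):

# Resize the cluster in newer versions of distributed.
cluster.scale(4)                       # grow or shrink to 4 workers
cluster.adapt(minimum=1, maximum=8)    # or let the cluster choose within bounds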

Command line

dask-worker tcp://192.0.0.100:8786
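
The command above assumes a dask-scheduler process is already running on 192.0.0.100 (default port 8786); from Python, a Client then connects to the same address (a sketch):

from dask.distributed import Client

# Connect to the manually started scheduler; workers attach with the
# dask-worker command shown above.
client = Client('tcp://192.0.0.100:8786')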

SSH

pip3 install paramiko
  • dask-ssh 192.168.0.1 192.168.0.2 192.168.0.3 192.168.0.4
  • dask-ssh 192.168.0.{1,2,3,4}
  • dask-ssh --hostfile hostfile.txt
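
dask-ssh launches the scheduler on the first host listed and one worker per host; afterwards a Client can connect to that scheduler (a sketch using the first address from the examples above):

from dask.distributed import Client

# The scheduler runs on the first host given to dask-ssh (port 8786 by default).
client = Client('tcp://192.168.0.1:8786')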