Dask is a flexible parallel computing library for analytic computing.
Dask is composed of two components:

1. Dynamic task scheduling optimized for interactive computational workloads.
2. "Big Data" collections like parallel arrays, dataframes, and lists that extend common interfaces such as NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments, and which run on top of the task schedulers.
Running pip3 install dask installs only the basic Dask package.
We do this so that users of the lightweight core dask scheduler aren’t required to download the more exotic dependencies of the collections (numpy, pandas, etc.).
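Installing the bare package keeps the core scheduler lightweight; the complete extra pulls in the collections and the distributed scheduler as well:

pip3 install dask              # core task scheduler only
pip3 install "dask[complete]"  # adds dask.array, dask.dataframe, dask.bag, distributed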
In [2]:
import dask
import pandas as pd
In [34]:
df = pd.read_csv('./user_log_2018_01_01.csv')
In [35]:
df
In [7]:
import dask.dataframe as dd
In [36]:
dask_df = dd.read_csv('./user_log_2018_01_01.csv')
In [37]:
dask_df
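Unlike pd.read_csv, dd.read_csv is lazy: it samples the top of the file to infer dtypes and splits the rest into partitions, each an ordinary pandas DataFrame. A quick sketch of what that looks like (the blocksize value here is only illustrative):

In [ ]:
# Nothing is loaded yet; the frame is a task graph plus an inferred schema.
dask_df = dd.read_csv('./user_log_2018_01_01.csv', blocksize='16MB')
dask_df.npartitions   # how many pandas partitions back this frame
dask_df.dtypes        # dtypes inferred from a sample of the file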
In [38]:
dir(dask_df)
In [13]:
dask_df["0"]
In [18]:
dask_df.index
In [21]:
len(dask_df.index)
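len() has to return a plain int, so it triggers a full pass over the file immediately. The lazy equivalent keeps control over when that work happens:

In [ ]:
n_rows = dask_df.shape[0]   # lazy: a dd.Scalar, no work done yet
n_rows.compute()            # now the rows are actually counted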
In [22]:
dask_df.info()   # without the parentheses this only shows the bound method
In [52]:
dask_df.head()
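head() is one of the few eager operations: by default it pulls rows from just the first partition, so it stays cheap even on a huge file:

In [ ]:
dask_df.head(10)                 # reads only the first partition
dask_df.head(10, npartitions=2)  # widen the search if the first partition is short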
In [42]:
dask_df["user_id"].sum().compute()
In [43]:
dask_df["event_cnt"].sum().compute()
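Each of these reductions only builds a task graph; .compute() is what actually executes it. Splitting the two steps makes the laziness visible:

In [ ]:
total = dask_df["event_cnt"].sum()  # lazy: a dd.Scalar, not a number
total                               # repr shows a pending task, no result yet
total.compute()                     # runs the graph partition by partition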
In [47]:
dask_df[dask_df["event_cnt"]>1].sum().compute()
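Boolean filtering mirrors pandas and stays lazy end to end, so filters and reductions chain into a single .compute():

In [ ]:
active = dask_df[dask_df["event_cnt"] > 1]   # lazy filtered frame
active["event_cnt"].mean().compute()         # mean event count among active users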
In [56]:
from dask.distributed import Client
In [61]:
client2 = Client()
# client2 = Client(processes=False)  # threads-only variant, no worker processes
In [62]:
dask_df["event_cnt"].sum().compute(scheduler='client2')
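Work can also be handed to the client explicitly; client.compute returns a Future that you block on only when the value is needed:

In [ ]:
future = client2.compute(dask_df["event_cnt"].sum())  # non-blocking, returns a Future
future.result()                                       # block until the result arrives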
In [ ]:
# with: use the threaded scheduler only inside this block
with dask.config.set(scheduler='threads'):
    x.compute()   # x and y stand for any lazy dask objects
    y.compute()
In [ ]:
# global setting: every .compute() from here on uses the threaded scheduler
dask.config.set(scheduler='threads')
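The built-in schedulers are selectable by name: 'synchronous' (single-threaded, easiest to debug), 'threads' (the default for dask.dataframe and dask.array), and 'processes' (the default for dask.bag). A per-call override avoids touching global config:

In [ ]:
dask_df["event_cnt"].sum().compute(scheduler='processes')  # one-off override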
In [63]:
from dask.distributed import Client, LocalCluster
In [64]:
cluster = LocalCluster()
# For reference, the constructor signature (from the distributed docs):
# class distributed.deploy.local.LocalCluster(n_workers=None, threads_per_worker=None,
#     processes=True, loop=None, start=None, ip=None, scheduler_port=0, silence_logs=30,
#     diagnostics_port=8787, services={}, worker_services={}, service_kwargs=None,
#     asynchronous=False, **worker_kwargs)
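A minimal sketch with the cluster sized explicitly instead of taking the defaults (the worker counts here are just an example); the diagnostic dashboard is served on diagnostics_port, 8787 by default:

In [ ]:
cluster = LocalCluster(n_workers=4, threads_per_worker=2)  # 4 processes x 2 threads each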
In [66]:
client = Client(cluster)
In [67]:
cluster
In [68]:
client
### Add a new worker to the cluster
In [74]:
w = cluster.start_worker(ncores=2)   # older LocalCluster API; newer versions use cluster.scale(n)
In [75]:
cluster.stop_worker(w)

A worker on another machine can also join by pointing the dask-worker CLI at the scheduler's address:

dask-worker tcp://192.0.0.100:8786
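The counterpart command on the scheduler side is dask-scheduler; it prints the tcp:// address that workers connect to:

dask-scheduler    # run on the scheduler host; workers join the printed address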