In [1]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:33705")
client


/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/site-packages/dask/config.py:168: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  data = yaml.load(f.read()) or {}
/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/site-packages/distributed/config.py:20: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  defaults = yaml.load(f)
Out[1]:

Client

Cluster

  • Workers: 4
  • Cores: 12
  • Memory: 33.35 GB

In [2]:
import dask
import dask.dataframe as dd

from random import randint
from time import sleep

In [3]:
# This generally seems terrible pretty sure I shouldn't be using "sleep" for this demonstration

def short_running_func():
    print('short') # If using dask these print statements are likely in your console
    sleep(0.1)
    return 'short'

def time_out_func():
    print('long')  # If using dask these print statements are likely in your console
    result = ''
    try:
        sleep(0.5)
        raise TimeoutError()
        result = 'long'
    except TimeoutError as e:
        result = 'timeout'
    return result
    
def process_column(value):
    result = None
    
    # Randomly pick short or long to demonstrate
    pick = randint(0, 1) 
    if pick == 0:
        result = short_running_func()
    else:
        result = time_out_func()

    return result

In [8]:
# Keep it small for now
df = dask.datasets.timeseries().reset_index()
df = df.loc[0:100,:]
df.head()


/home/bird/miniconda3/envs/ovscrptd/lib/python3.6/site-packages/dask/dataframe/utils.py:391: FutureWarning: Creating a DatetimeIndex by passing range endpoints is deprecated.  Use `pandas.date_range` instead.
  tz=idx.tz, name=idx.name)
Out[8]:
timestamp id name x y
0 2000-01-01 00:00:00 1009 Bob -0.656071 -0.505284
1 2000-01-01 00:00:01 1086 Oliver 0.286447 0.681899
2 2000-01-01 00:00:02 1010 Dan -0.290703 -0.728478
3 2000-01-01 00:00:03 1018 Frank 0.973330 0.356958
4 2000-01-01 00:00:04 998 Oliver -0.849845 0.802483

In [9]:
df['process'] = df['name'].apply(process_column, meta='O')
%time df.head()


CPU times: user 74.6 ms, sys: 26.5 ms, total: 101 ms
Wall time: 28.2 s
Out[9]:
timestamp id name x y process
0 2000-01-01 00:00:00 1009 Bob -0.656071 -0.505284 short
1 2000-01-01 00:00:01 1086 Oliver 0.286447 0.681899 timeout
2 2000-01-01 00:00:02 1010 Dan -0.290703 -0.728478 timeout
3 2000-01-01 00:00:03 1018 Frank 0.973330 0.356958 short
4 2000-01-01 00:00:04 998 Oliver -0.849845 0.802483 timeout

In [ ]: