Demo 1

Simple startup: python -m kitchensink.scripts.start --num-workers 2

This automatically starts Redis and runs 2 workers for you, pointing the data storage directory at /tmp


In [1]:
from kitchensink import setup_client, client
from kitchensink import do, dp, du
from kitchensink import settings
from kitchensink.admin import timethis
setup_client('http://localhost:6323/')
c = client()

In [2]:
def add(x, y):
    return x + y
c.cs(add, 1, 2)


Out[2]:
3

Here, cs is short for call_single: it executes a single function remotely and returns the result.


In [3]:
def double(x):
    return 2 * x
c.map(double, [1,2,3,4,5])


Out[3]:
[2, 4, 6, 8, 10]

We also have a map function, duplicating the builtin map; note that the function you pass into map needs to take exactly one parameter. The most flexible pattern for using kitchensink is to use bc (bulk_call), then execute, and finally br (bulk_results).


In [4]:
for counter in range(10):
    c.bc(add, 1, 2)
c.execute()
c.br()


Out[4]:
[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

bc queues up a function call (locally); execute sends all the queued calls to the server; br pulls down the results.

Error Handling


In [5]:
import time
def long_running_computation_with_some_progress_output():
    print 'starting'
    time.sleep(1)
    print 'finished step1'
    time.sleep(1)
    print 'done'
c.cs(long_running_computation_with_some_progress_output)


starting
finished step1
done

In [6]:
c.cs(add, "some string", 1)


---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-6-1aa819721224> in <module>()
----> 1 c.cs(add, "some string", 1)

/home/hugoshi/work/kitchensink/kitchensink/clients/http.pyc in call_single(self, func, *args, **kwargs)
    291         self.bc(func, *args, **kwargs)
    292         self.execute()
--> 293         return self.br()[0]
    294     cs = call_single
    295 

/home/hugoshi/work/kitchensink/kitchensink/clients/http.pyc in bulk_results(self, profile)
    205             return self.bulk_results_local()
    206         try:
--> 207             retval = self.bulk_async_result(self.jids)
    208         except KeyboardInterrupt as e:
    209             self.bulk_cancel(self.jids)

/home/hugoshi/work/kitchensink/kitchensink/clients/http.pyc in bulk_async_result(self, job_ids, timeout)
    169                 if metadata['status'] == Status.FAILED:
    170                     self.bulk_cancel(to_query)
--> 171                     raise Exception(data)
    172                 elif metadata['status'] == Status.FINISHED:
    173                     results[job_id] = data

Exception: Traceback (most recent call last):
  File "kitchensink/taskqueue/objs.py", line 260, in perform_job
    rv = job.perform()
  File "kitchensink/taskqueue/objs.py", line 147, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "kitchensink/rpc/__init__.py", line 291, in execute_msg
    result = _execute_msg(msg)
  File "kitchensink/rpc/__init__.py", line 241, in _execute_msg
    result = func(*args, **kwargs)
  File "<ipython-input-2-ce8645436ac5>", line 2, in add
TypeError: cannot concatenate 'str' and 'int' objects

Parallel Computation


In [10]:
from kitchensink.admin import timethis

def dummy_slow_function():
    print settings.rpc_url
    time.sleep(1)
with timethis('slow'):
    dummy_slow_function()
    dummy_slow_function()


http://localhost:6323/
http://localhost:6323/
slow : 2.002 seconds

In [11]:
with timethis('parallel'):
    for counter in range(4):
        c.bc(dummy_slow_function)
    c.execute()
    c.br()


http://localhost:6234/
http://localhost:6323/
http://localhost:6323/
http://localhost:6234/
parallel : 1.033 seconds

In [12]:
c.hosts()


Out[12]:
{'http://localhost:6234/': 'http://localhost:6234/',
 'http://localhost:6323/': 'http://localhost:6323/'}

In [13]:
import numpy as np
import pandas as pd
size = 1000000
dummy_data1 = pd.DataFrame({'a' : np.random.random(size), 'b' : np.random.random(size)})
dummy_data2 = pd.DataFrame({'a' : np.random.random(size), 'b' : np.random.random(size)})
from kitchensink import do, dp, du
c.reducetree('*')
obj1 = do(dummy_data1)
obj1.rpc_url = 'http://localhost:6323/'
obj1.save(url='dummy1')
obj2 = do(dummy_data2)
obj2.rpc_url = 'http://localhost:6234/'
obj2.save(url='dummy2')


finished

In [14]:
c.path_search('*')


finished
Out[14]:
['dummy1', 'dummy2']

In [15]:
c.data_info(['dummy1', 'dummy2'])


Out[15]:
({'http://localhost:6234/': 'http://localhost:6234/',
  'http://localhost:6323/': 'http://localhost:6323/'},
 {'dummy1': ({'http://localhost:6323/'},
   {'data_type': 'object',
    'fmt': 'cloudpickle',
    'size': 24000647,
    'state': 'ready'}),
  'dummy2': ({'http://localhost:6234/'},
   {'data_type': 'object',
    'fmt': 'cloudpickle',
    'size': 24000647,
    'state': 'ready'})})

In [32]:
def calc_sum(obj):
    with timethis('extracting df'):
        df = obj.obj()
    print settings.rpc_url, obj.data_url
    with timethis('sum'):
        return df.sum()
c.bc(calc_sum, du('dummy1'))
c.bc(calc_sum, du('dummy2'))
c.execute()
c.br()


http://localhost:6234/ dummy2
http://localhost:6323/ dummy1
Out[32]:
[a    499943.678429
 b    499843.238526
 dtype: float64, a    499998.93295
 b    500212.42850
 dtype: float64]

In [36]:
c.bc(calc_sum, du('dummy1'), _no_route_data=True)
c.bc(calc_sum, du('dummy2'), _no_route_data=True)
c.execute()
c.br()


1416592859.397874:14777:kitchensink.data.catalog:retrieving dummy2 from http://localhost:6234/
1416592859.399016:14776:kitchensink.data.catalog:retrieving dummy1 from http://localhost:6323/
http://localhost:6234/ dummy1
http://localhost:6323/ dummy2
Out[36]:
[a    499943.678429
 b    499843.238526
 dtype: float64, a    499998.93295
 b    500212.42850
 dtype: float64]

In [37]:
c.bc(calc_sum, du('dummy1'))
c.bc(calc_sum, du('dummy2'))
c.execute()
c.br(profile='sum')


http://localhost:6234/ dummy2
http://localhost:6323/ dummy1
sum took 0.0762531757355
extracting df    0.051909
result save      0.000367
sum              0.070048
start_spread     0.000519
end_spread       0.002318
dtype: float64
sum unmeasured_overhead 0.0150913000107
sum runtime_overhead 0.00864958763123
sum result delay 0.00341200828552
sum complete 1416592937.32
Out[37]:
[a    499943.678429
 b    499843.238526
 dtype: float64, a    499998.93295
 b    500212.42850
 dtype: float64]

In [ ]: