In [1]:
# Import the Kotta module
from kotta import Kotta, KottaJob
from kotta.kotta_functions import *
# Create a Kotta connection using Login with Amazon credentials.
# The token from Kotta is stored in ../auth.file. Read it inside a `with`
# block so the file handle is closed as soon as the token has been read.
with open('../auth.file') as auth_file:
    konn = Kotta(auth_file.read())
In [2]:
def my_sum(items):
    """Return the sum of a list of numbers.

    A typical, purely local Python function. The cells below run the same
    computation remotely on Kotta.
    """
    return sum(items)


result = my_sum(range(100))
print(result)
Let's send the function to run remotely on Kotta.
The Kotta decorator @kottajob takes a connection to Kotta, the queue to send your jobs to, and the walltime for your application.
The decorated functions take and return Python objects as usual.
The functions block by default, which means that the next statement will not execute until my_sum() has completed execution.
In [3]:
# Queue 'Test', walltime 5 (units are not stated here -- confirm with Kotta).
@kottajob(konn, 'Test', 5)
def my_sum(items):
    """Add up ``items`` on Kotta; the call blocks until the job has finished."""
    total = sum(items)
    return total


result = my_sum(range(100))
print(result)
Let's run the same function, but with non-blocking behavior. When a function is decorated with `block=False`, calling it immediately returns a job object and Python moves on to the next statement. The job object has a `status(konn)` method, which returns the status of the job. Alternatively, you can call its `get_results(konn)` method, which returns the Python object returned by `my_sum()` once execution is complete.
In [4]:
# Now send the function to run remotely on Kotta without blocking.
# Blocking is the default; `block=False` must be requested explicitly. With it,
# calling the decorated function returns a job handle immediately instead of
# the function's result.
@kottajob(konn, 'Test', 5, block=False)
def my_sum(items):
    """Sum ``items`` after a 5 second pause, so the job stays in flight briefly."""
    # Imported inside the body, presumably so the import happens where the
    # function actually runs (on the remote worker).
    import time
    time.sleep(5)
    return sum(items)


job_hndl = my_sum(range(100))
print(job_hndl)
In [8]:
# Poll the non-blocking job once, then fetch its return value.
# Order matters: status() reports the job's status at the time of the call.
# get_results() returns the function's return value once execution is
# complete. Whether it waits or errors on an unfinished job is not shown
# here -- confirm against the Kotta API.
status = job_hndl.status(konn)
print("Status : ", status)
result = job_hndl.get_results(konn)
print("Result : ", result)
In [10]:
# Inspect the captured remote STDERR/STDOUT of the non-blocking job.
# NOTE(review): left commented out. A later cell marks STDOUT inspection as
# "Work in progress", so these attributes may not be available yet -- confirm
# before re-enabling.
#print(job_hndl.STDERR)
#print(job_hndl.STDOUT)
Code has bugs, and the people who write code make mistakes. Either way, we need to be able to tell easily what failed, especially when our code runs remotely. To aid debugging, kottajob always logs STDOUT and STDERR. These are accessible from the job objects, as shown below.
In [ ]:
# Example used to inspect a job's captured STDOUT/STDERR (see the markdown
# above).
# NOTE(review): `numpy` is not imported in this body, nor anywhere before this
# cell in what is visible. A later cell imports it only as `np` and passes
# requirements='numpy'. This job therefore presumably fails remotely with a
# NameError -- confirm whether that failure is the intended demonstration.
# NOTE(review): with block=True, an earlier cell shows the call returning the
# function's result rather than a job handle. Yet the next cell treats `job`
# as a job object (.outputs/.status/.STDOUT), so block=False may be intended
# -- confirm.
@kottajob(konn, 'Test', 5, block=True)
def numpy_sum(items):
    # Convert the input to a numpy array and reduce it to a single sum.
    np_array = numpy.array(items)
    return numpy.sum(np_array)
job = numpy_sum(range(0,100))
In [ ]:
# Check the outputs of the job from the previous cell: its outputs, status,
# and captured STDOUT.
# Work in progress!!!
# NOTE(review): these accesses assume `job` is a Kotta job object. If the
# previous call ran in blocking mode, `job` may instead hold the function's
# return value -- confirm.
print(job.outputs)
print(job.status(konn))
print(job.STDOUT)
In [ ]:
### Easy Parallelism
# Fan out: submit one non-blocking job per chunk of the numbers 0..99.
# Each call returns a job handle immediately, so the chunks can run concurrently.
CHUNK_SIZE = 20

@kottajob(konn, 'Test', 5, block=False)
def my_sum(items):
    """Sum ``items`` after a 2 second pause that stands in for real work."""
    # Imported inside the body, presumably so it resolves on the remote worker.
    import time
    time.sleep(2)
    return sum(items)


jobs = [my_sum(range(start, start + CHUNK_SIZE)) for start in range(0, 100, CHUNK_SIZE)]
print(jobs)
In [ ]:
# Block until every chunk job has finished. A plain loop is used because
# wait() is called only for its side effect. The loop variable is named so it
# does not overwrite the notebook's earlier `job`.
for pending_job in jobs:
    pending_job.wait(konn)
In [ ]:
# Gather each chunk's partial sum from its finished job, then combine them
# locally. The cell displays the combined total.
returns_array = [finished.get_results(konn) for finished in jobs]
print(returns_array)
sum(returns_array)
In [ ]:
@kottajob(konn, 'Test', 10)
def file_sum(inputs=None):
    """Sum the integers (one per line) in the first input file.

    Parameters
    ----------
    inputs : list of str
        Input file URLs. Only the first is read. It is opened by its basename
        in the job's working directory (presumably Kotta stages each input
        there -- confirm).

    Returns
    -------
    tuple of (int, int)
        The sum of the integers and how many integers were read.

    Raises
    ------
    ValueError
        If ``inputs`` is missing or empty.
    """
    # Imported inside the body, presumably so it resolves on the remote worker.
    import os
    # A None default avoids the shared-mutable-default pitfall of `inputs=[]`.
    if not inputs:
        raise ValueError("file_sum needs at least one input file")
    # List the working directory in the job's STDOUT, to confirm the input arrived.
    print(os.listdir('.'))
    with open(os.path.basename(inputs[0]), 'r') as handle:
        # Skip blank lines rather than crashing on int('').
        data = [int(line) for line in map(str.strip, handle) if line]
    total = sum(data)
    length = len(data)
    return total, length


returns = file_sum(inputs=['s3://klab-jobs/1m_shuffled.txt'])
print(returns)
In [ ]:
import numpy as np

# Local sanity check: sum the integers 0..9 with numpy (displays 45).
np.arange(10).sum()
In [ ]:
import numpy as np

@kottajob(konn, 'Test', 10, requirements='numpy')
def numpy_chunk_adder(data):
    """Return ``np.sum(data)``. Runs remotely, and each call blocks until done.

    ``requirements='numpy'`` presumably asks Kotta to make numpy available to
    the job -- confirm against the kottajob documentation.
    """
    # Imported inside the body, presumably so the name resolves on the remote worker.
    import numpy as np
    total = np.sum(data)
    return total


# One blocking remote call per 20-number chunk of 0..99. The calls run one
# after another, unlike the non-blocking fan-out above.
returns = [numpy_chunk_adder(np.arange(start, start + 20)) for start in range(0, 100, 20)]
print(returns)
In [ ]: