Here we will look briefly at how to time and profile your code, and then at an approach to making your code run faster. There is a sequence of mini-goals that is applicable to nearly every programming problem:
Note that the list does not start with "Make it fast". Testing, debugging and optimization are a set of strategies and practices to achieve those goals. Only optimization will be covered in these notes - pointers to resources for testing and debugging are provided but not covered.
In [1]:
%%file distance.py
import numpy as np
def euclidean_dist(u, v):
    """Returns Euclidean distance between numpy vectors u and v."""
    diff = u - v
    return np.sqrt((diff * diff).sum())
In [2]:
%%file test_distance.py
import numpy as np
from numpy.testing import assert_almost_equal
from distance import euclidean_dist
def test_non_negativity():
    """Distance is >= 0 for random vectors (first metric axiom)."""
    for _ in range(10):
        # BUG FIX: np.random.normal(3) draws a single scalar from N(3, 1);
        # size=3 draws a 3-vector, matching the other tests in this file.
        u = np.random.normal(size=3)
        v = np.random.normal(size=3)
        assert euclidean_dist(u, v) >= 0
def test_coincidence_when_zero():
    """d(u, v) == 0 when both arguments are the zero vector."""
    zero_a, zero_b = np.zeros(3), np.zeros(3)
    assert euclidean_dist(zero_a, zero_b) == 0
def test_coincidence_when_not_zero():
    """d(u, v) != 0 for a random nonzero u against the zero vector."""
    for _ in range(10):
        assert euclidean_dist(np.random.random(3), np.zeros(3)) != 0
def test_symmetry():
    """d(u, v) == d(v, u) for random vector pairs."""
    for _ in range(10):
        a = np.random.random(3)
        b = np.random.random(3)
        assert euclidean_dist(a, b) == euclidean_dist(b, a)
def test_triangle():
    """Triangle inequality: d(u, w) <= d(u, v) + d(v, w)."""
    u, v, w = (np.random.random(3) for _ in range(3))
    assert euclidean_dist(u, w) <= euclidean_dist(u, v) + euclidean_dist(v, w)
def test_known1():
    """1-D case: distance from 0 to 3 is 3."""
    assert_almost_equal(euclidean_dist(np.array([0]), np.array([3])), 3)
def test_known2():
    """2-D case: the classic 3-4-5 right triangle."""
    assert_almost_equal(euclidean_dist(np.array([0, 0]), np.array([3, 4])), 5)
def test_known3():
    """2-D case with negative coordinates: distance is still 5."""
    assert_almost_equal(euclidean_dist(np.array([0, 0]), np.array([-3, -4])), 5)
In [3]:
! py.test
Tools within Jupyter from the official tutorial
After an exception occurs, you can call %debug to jump into the Python debugger (pdb) and examine the problem. Alternatively, if you call %pdb, IPython will automatically start the debugger on any uncaught exception. You can print variables, see code, execute statements and even walk up and down the call stack to track down the true source of the problem. This can be an efficient way to develop and debug code, in many cases eliminating the need for print statements or external debugging tools.
You can also step through a program from the beginning by calling %run -d theprogram.py.
In [4]:
import time
import timeit
def f(nsec=1.0):
    """Function sleeps for nsec seconds."""
    # Sleeping stands in for "real work" with a predictable duration,
    # so the timing harness below has a known expected result.
    time.sleep(nsec)
# Manual timing: timeit.default_timer is the highest-resolution wall clock.
start = timeit.default_timer()
f()  # default argument, so this sleeps 1.0 s
elapsed = timeit.default_timer() - start
elapsed
Out[4]:
In [5]:
def process_time(f, *args, **kwargs):
    """Decorator that prints the wall-clock duration of each call to f.

    Improvements over the naive version: the wrapper now returns f's
    result instead of discarding it, and functools.wraps preserves f's
    __name__ and __doc__ so the decorated function stays introspectable.
    """
    import functools
    import timeit

    @functools.wraps(f)
    def func(*args, **kwargs):
        start = timeit.default_timer()
        result = f(*args, **kwargs)
        print(timeit.default_timer() - start)
        return result
    return func
In [6]:
@process_time
def f1(nsec=1.0):
    """Function sleeps for nsec seconds."""
    # Wrapped by process_time, so every call prints its elapsed wall time.
    time.sleep(nsec)
In [7]:
f1()
In [8]:
%timeit f(0.01)
In [9]:
%timeit -n10 f(0.01)
In [10]:
%timeit -r10 f(0.01)
In [11]:
%timeit -n10 -r3 f(0.01)
This can be done in a notebook with %prun, with the following readouts as column headers:
In [12]:
def foo1(n):
    """Sum of squares of 0..n-1, computed with the ** operator."""
    total = 0
    for k in range(n):
        total += k ** 2
    return total
def foo2(n):
    """Sum of squares of 0..n-1, computed with plain multiplication."""
    total = 0
    for k in range(n):
        total += k * k
    return total
def foo3(n):
    """Calls foo1 ten times and foo2 once; exists to give the profiler work."""
    for _ in range(10):
        foo1(n)
    foo2(n)
def bar(n):
    """Sum of cubes of 0..n-1."""
    total = 0
    for k in range(n):
        total += k ** 3
    return total
def work(n):
    """Driver: run every helper once so the profiler sees them all."""
    for job in (foo1, foo2, foo3, bar):
        job(n)
In [13]:
%prun -q -D work.prof work(int(1e6))
In [14]:
import pstats
# Load the profile dump written by `%prun -D work.prof` and print the report.
p = pstats.Stats('work.prof')
p.print_stats()
# Final non-expression statement so the notebook doesn't echo the
# Stats object returned by print_stats().
pass
In [15]:
# Sort by internal then cumulative time; show only entries matching 'foo'.
p.sort_stats('time', 'cumulative').print_stats('foo')
pass
In [16]:
# Sort by number of calls and show the top 5 entries.
p.sort_stats('ncalls').print_stats(5)
pass
In [17]:
%load_ext memory_profiler
In [18]:
%%file foo.py
def foo(n):
    """Build the same long string three ways, then free all three.

    Written to a file so %mprun can report line-by-line memory use of
    multiplication, join, and repeated += concatenation.
    """
    phrase = 'repeat me'
    by_multiply = phrase * n
    by_join = ''.join([phrase for _ in range(n)])
    by_concat = ''
    for _ in range(n):
        by_concat += phrase
    del by_multiply, by_join, by_concat
In [19]:
import sys
# Make the current directory importable so `from foo import foo` finds foo.py.
sys.path.append('.')
In [20]:
# mprun requires the code be in a file
# functions declared interactively in python will not work
from foo import foo
%mprun -f foo foo(100000)
In [21]:
# However, memit does work with interactive functions
# Unlike mprun which gives a line by line analysis
# memit gives the total amount of memory used
def gobble(n):
    """Allocate (and immediately discard) a list of the first n squares."""
    squares = [k * k for k in range(n)]
%memit -r 3 gobble(1000000)
There are many ways to speed up slow code. However, the first thing that should come to mind (after profiling to identify the bottlenecks) is whether there is a more appropriate data structure or algorithm that can be used. The reason is that this is the only approach that makes a difference to the big O complexity, and this makes all the difference for scalability. A few examples are shown here; a large collection of classic data structures and algorithms in Python with detailed explanations is available at Problem Solving with Algorithms and Data Structures
You are highly encouraged to take an algorithms class, where you will discover strategies such as:
In [22]:
import numpy as np
xs = np.random.randint(0, 1000, 100)
ys = np.random.randint(0, 1000, 100)
Using lists
In [23]:
def common1(xs, ys):
    """Using lists: pairwise comparison of every x against every y.

    Deliberately quadratic — the next cell contrasts it with set
    intersection.
    """
    return {x for x in xs for y in ys if x == y}
In [24]:
%timeit -n3 -r3 common1(xs, ys)
Using sets
In [25]:
%timeit -n3 -r3 set(xs) & set(ys)
Using lists
In [26]:
alist = list(np.random.randint(1000, 100000, 1000))
blist = alist[:]
entries = np.random.randint(1, 10000, 10000)
In [27]:
def f1(alist, entries):
    """Using repeated sorts: keep alist sorted descending and pop the
    smallest element after each insertion. Mutates alist in place."""
    smallest = []
    for item in entries:
        alist.append(item)
        alist.sort(reverse=True)
        smallest.append(alist.pop())
    return smallest
In [28]:
%timeit f1(alist, entries)
Using a heap (priority queue)
In [29]:
from heapq import heappushpop, heapify
In [30]:
def f2(alist, entries):
    """Using a priority queue: heapify once, then each push-pop is O(log n).

    Mutates alist in place (it becomes a heap).
    """
    heapify(alist)
    return [heappushpop(alist, item) for item in entries]
In [31]:
%timeit f2(blist, entries)
In [32]:
def concat1(alist):
    """Using string concatenation: repeated += (potentially quadratic).

    Raises IndexError on an empty list, matching the original behavior.
    """
    result = alist[0]
    for word in alist[1:]:
        result = result + " " + word
    return result
def concat2(alist):
    """Using join: single pass, linear in total output size."""
    separator = " "
    return separator.join(alist)
alist = ['abcde'] * 1000000
%timeit -r3 -n3 concat1(alist)
%timeit -r3 -n3 concat2(alist)
In [33]:
"""Avoiding loops."""
import math
def loop1(n):
    """Using for loop with function call (attribute lookup every iteration)."""
    values = []
    for k in range(n):
        values.append(math.sin(k))
    return values
def loop2(n):
    """Using local version of function (sin bound to a local name once)."""
    local_sin = math.sin
    values = []
    for k in range(n):
        values.append(local_sin(k))
    return values
def loop3(n):
    """Using list comprehension (with sin pre-bound to a local)."""
    local_sin = math.sin
    return [local_sin(k) for k in range(n)]
def loop4(n):
    """Using map: the loop runs at C speed inside map()."""
    return list(map(math.sin, range(n)))
def loop5(n):
    """Using numpy: one vectorized call, converted back to a Python list."""
    angles = np.arange(n)
    return np.sin(angles).tolist()
n = 1000000
%timeit -r1 -n1 loop1(n)
%timeit -r1 -n1 loop2(n)
%timeit -r1 -n1 loop3(n)
%timeit -r1 -n1 loop4(n)
%timeit -r1 -n1 loop5(n)
assert(np.all(loop1(n) == loop2(n)))
assert(np.all(loop1(n) == loop3(n)))
assert(np.all(loop1(n) == loop4(n)))
assert(np.all(loop1(n) == loop5(n)))
In [34]:
a = np.arange(1e6)
%timeit global a; a = a * 0
%timeit global a; a *= 0
In [35]:
def idx1(xs):
    """Using loops: sum of elements strictly between 10 and 20."""
    total = 0
    for value in xs:
        if 10 < value < 20:
            total += value
    return total
def idx2(xs):
    """Using logical indexing: boolean mask selects 10 < x < 20 at C speed."""
    mask = (xs > 10) & (xs < 20)
    return np.sum(xs[mask])
n = 1000000
xs = np.random.randint(0, 100, n)
%timeit -r3 -n3 idx1(xs)
%timeit -r3 -n3 idx2(xs)
In [36]:
def average1(xs):
    """Using loops: 3x3 neighborhood average at every cell.

    Out-of-bounds neighbors are simply omitted, but the divisor is
    always 9 — equivalent to zero-padding the border (this matches
    average2 exactly).
    """
    out = xs.copy()
    rows, cols = xs.shape
    for r in range(rows):
        for c in range(cols):
            total = 0.0
            # Clamp the window to the array instead of skipping with `continue`.
            for rr in range(max(r - 1, 0), min(r + 2, rows)):
                for cc in range(max(c - 1, 0), min(c + 2, cols)):
                    total += xs[rr, cc]
            out[r, c] = total / 9.0
    return out
def average2(xs):
    """Using shifted array views and a zero border to avoid bounds checks.

    Pads the array with a ring of zeros, then sums the nine shifted
    (rows x cols) windows of the padded array and divides by 9.
    """
    rows, cols = xs.shape
    padded = np.zeros((rows + 2, cols + 2))
    padded[1:-1, 1:-1] = xs[:]
    acc = np.zeros((rows, cols))
    # Column shift in the outer loop, row shift inner: same addition
    # order as the original nine-term expression.
    for dc in range(3):
        for dr in range(3):
            acc += padded[dr:dr + rows, dc:dc + cols]
    return acc / 9.0
n = 25
xs = np.random.uniform(0,10,(n, n))
%timeit -r3 -n3 average1(xs)
%timeit -r3 -n3 average2(xs)
In [37]:
xs = np.random.random((1000, 10))
xs
Out[37]:
In [38]:
ys = np.random.random((1000, 10))
ys
Out[38]:
In [39]:
from numpy.core.umath_tests import inner1d
%timeit -n3 -r3 np.array([x @ y for x, y in zip(xs, ys)])
%timeit -n3 -r3 inner1d(xs, ys)
In [40]:
from numpy.core.umath_tests import matrix_multiply
In [41]:
xs = np.random.randint(0, 10, (500, 2, 2))
ys = np.random.randint(0, 10, (500, 2, 2))
In [42]:
%timeit -n3 -r3 np.array([x @ y for x, y in zip(xs, ys)])
%timeit -r3 -n3 matrix_multiply(xs, ys)
In [43]:
from functools import lru_cache
In [44]:
def fib(n):
    """Naive exponential-time Fibonacci (1-indexed: fib(1) == fib(2) == 1).

    Kept deliberately recursive — it is the slow baseline for the
    memoization comparison below.
    """
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)
# A simple example of memoization - in practice, use `lru_cache` from functools
def memoize(f):
    """Simple single-argument memoizing decorator.

    Improvement: functools.wraps preserves f's __name__/__doc__ on the
    wrapper. Lookup uses EAFP (try/except) so the hit path costs one
    dict access. The argument must be hashable.
    """
    from functools import wraps
    store = {}

    @wraps(f)
    def func(n):
        try:
            return store[n]
        except KeyError:
            result = f(n)
            store[n] = result
            return result
    return func
@memoize
def mfib(n):
    # Only the top-level call is cached: a cache miss still runs the
    # full exponential fib recursion underneath.
    return fib(n)
@lru_cache()
def lfib(n):
    # Same idea with the stdlib decorator; again only the outer call is
    # cached, not fib's internal recursion.
    return fib(n)
assert(fib(10) == mfib(10))
assert(fib(10) == lfib(10))
%timeit -r1 -n10 fib(30)
%timeit -r1 -n10 mfib(30)
%timeit -r1 -n10 lfib(30)