In [1]:
# some utility imports
from __future__ import print_function
from pprint import pprint
from matplotlib import pyplot as plt
# main imports
import numpy
import distarray
# reduce precision on printed array values
numpy.set_printoptions(precision=2)
# display figures inline
%matplotlib inline
In [2]:
# a 4-row 5-column NumPy array with random contents
nparr = numpy.random.random((4, 5))
nparr
Out[2]:
In [3]:
# NumPy array attributes
print("type:", type(nparr))
print("dtype:", nparr.dtype)
print("ndim:", nparr.ndim)
print("shape:", nparr.shape)
print("itemsize:", nparr.itemsize)
print("nbytes:", nparr.nbytes)
In [4]:
# First we need a `Context` object. More on this later.
# For now, think of this object like the `NumPy` module.
# `Context`s manage the worker engines for us.
from distarray.globalapi import Context
context = Context()
In [5]:
# Make a DistArray from a NumPy array.
# This will push sections of the original NumPy array out
# to the engines.
darr = context.fromarray(nparr)
darr
Out[5]:
In [6]:
# Print the array section stored on each engine
for i, a in enumerate(darr.get_localarrays()):
    print(i, a)
In [7]:
# DistArrays have many of the same attributes as NumPy arrays,
print("type:", type(darr))
print("dtype:", darr.dtype)
print("ndim:", darr.ndim)
print("shape:", darr.shape)
print("itemsize:", darr.itemsize)
print("nbytes:", darr.nbytes)
In [8]:
# and some additional attributes.
print("targets:", darr.targets)
print("context:", darr.context)
print("distribution:", darr.distribution)
In [9]:
# NumPy provides `ufuncs`, or Universal Functions, that operate
# elementwise over NumPy arrays.
numpy.sin(nparr)
Out[9]:
In [10]:
# DistArray provides ufuncs as well, for `DistArray`s.
import distarray.globalapi as da
da.sin(darr)
Out[10]:
In [11]:
# `toarray` makes a NumPy array out of a DistArray, pulling all of the
# pieces back to the client. We do this to display the contents of the
# DistArray.
da.sin(darr).toarray()
Out[11]:
In [12]:
# A NumPy binary ufunc.
nparr + nparr
Out[12]:
In [13]:
# The equivalent DistArray ufunc.
# Notice that a new DistArray is created without
# pulling data back to the client.
darr + darr
Out[13]:
In [14]:
# Contents of the resulting DistArray.
(darr + darr).toarray()
Out[14]:
In [15]:
# NumPy sum
print("sum:", nparr.sum())
print("sum over an axis:", nparr.sum(axis=1))
In [16]:
# DistArray sum
print("sum:", darr.sum(), darr.sum().toarray())
print("sum over an axis:", darr.sum(axis=1), darr.sum(axis=1).toarray())
In [17]:
# Our example array, as a reminder:
darr.toarray()
Out[17]:
In [18]:
# The shapes of the local sections of our DistArray
darr.localshapes()
Out[18]:
In [19]:
# Return the value of a single element
darr[0, 2]
Out[19]:
In [20]:
# Take a column slice
darr_view = darr[:, 3]  # all rows, column 3
print(darr_view)
print(darr_view.toarray())
In [21]:
# Slices return a new DistArray that is a view on the
# original, just like in NumPy.
# Changes in the view change the original array.
darr_view[3] = -0.99
print("view:")
print(darr_view.toarray())
print("original:")
print(darr.toarray())
In [22]:
# A more complex slice, with negative indices and a step.
print(darr[:, 2::2])
print(darr[:-1, 2::2].toarray())
In [23]:
# Incomplete indexing
# Grab the first row
darr[0]
Out[23]:
In [24]:
# Let's look at the `Distribution` object that was created for us
# automatically by `fromarray`.
distribution = darr.distribution
In [25]:
# This is a 2D distribution: its 0th dimension is Block-distributed,
# and its 1st dimension isn't distributed.
pprint(distribution.maps)
In [26]:
# Plot this Distribution, color-coding which process each global index
# belongs to.
from distarray.plotting import plot_array_distribution
process_coords = [(0, 0), (1, 0), (2, 0), (3, 0)]
plot_array_distribution(darr, process_coords, cell_label=False, legend=True)
Out[26]:
In [27]:
# Check out which sections of this array's 0th dimension are on
# each process.
distribution.maps[0].bounds
Out[27]:
The Distribution above was created for us by fromarray,
but DistArray lets us specify more complex distributions.
Here, we specify that the 0th dimension has a Block distribution ('b') and the 1st dimension has a Cyclic distribution ('c').
DistArray supports Block, Cyclic, Block-Cyclic, Unstructured, and No-distribution dimensions. See the ScaLAPACK Documentation for more information about Distribution types.
In [28]:
from distarray.globalapi import Distribution
distribution = Distribution(context, shape=(64, 64), dist=('b', 'c'))
a = context.zeros(distribution, dtype='int32')
plot_array_distribution(a, process_coords, cell_label=False, legend=True)
Out[28]:
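For comparison, here is a minimal sketch (not part of the original notebook) that reuses the same Distribution, context.zeros, and plotting calls to build a layout whose 0th dimension is Cyclic ('c') and whose 1st dimension is undistributed; the 'n' flag is an assumption based on the undistributed dimension described above.
# Hedged sketch: Cyclic rows, undistributed columns, plotted the same way
# as the ('b', 'c') example above. The 'n' (no-distribution) flag is assumed.
cyclic_dist = Distribution(context, shape=(16, 16), dist=('c', 'n'))
cyclic_arr = context.zeros(cyclic_dist, dtype='int32')
plot_array_distribution(cyclic_arr, process_coords, cell_label=False, legend=True)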
In [29]:
darr
Out[29]:
In [30]:
darr.toarray()
Out[30]:
In [31]:
# simple reshaping
reshaped = darr.distribute_as((10, 2))
reshaped
Out[31]:
In [32]:
reshaped.toarray()
Out[32]:
In [33]:
# A more complex redistribution,
# changing shape, dist, and targets
dist = Distribution(context, shape=(5, 4),
                    dist=('b', 'b'), targets=(1, 3))
darr.distribute_as(dist)
Out[33]:
In [34]:
print("targets:", context.targets)
print("comm:", context.comm)
In [35]:
context.zeros((5, 3))
Out[35]:
In [36]:
context.ones((20, 20))
Out[36]:
In [37]:
# load .npy files in parallel
numpy.save("/tmp/outfile.npy", nparr)
distribution = Distribution(context, nparr.shape)
new_darr = context.load_npy("/tmp/outfile.npy", distribution)
new_darr
Out[37]:
In [38]:
# save to .dnpy (a built-in flat-file format based on .npy)
context.save_dnpy("/tmp/outfile", darr)
In [39]:
# load from .dnpy
context.load_dnpy("/tmp/outfile")
Out[39]:
In [40]:
# save DistArrays to .hdf5 files in parallel
context.save_hdf5("/tmp/outfile.hdf5", darr, mode='w')
In [41]:
# load DistArrays from .hdf5 files in parallel (using h5py)
context.load_hdf5("/tmp/outfile.hdf5", distribution)
Out[41]:
Global view, local control. The apply method on a Context allows you to write functions that are applied locally (that is, on the engines) to each section of a DistArray. This allows you to push your computation close to your data, avoiding communication round-trips and possibly speeding up your computations.
In [42]:
def get_local_random():
    """Function to be applied locally."""
    import numpy
    return numpy.random.randint(10)
context.apply(get_local_random)
Out[42]:
In [43]:
def get_local_var(darr):
    """Another local computation."""
    return darr.ndarray.var()
context.apply(get_local_var, args=(darr.key,))
Out[43]:
In [44]:
def local_demean(la):
    """Return the local array with the mean removed."""
    return la.ndarray - la.ndarray.mean()
context.register(local_demean)
In [45]:
context.local_demean(darr)
Out[45]:
Instead of using an IPython client (which uses ZeroMQ to communicate to the engines), you can run your DistArray code in MPI-only mode (using an extra MPI process for the client). This can be more performant.
In [46]:
# an example script to run in MPI-only mode
%cd julia_set
!python benchmark_julia.py -h
In [47]:
# Compile kernel.pyx
!python setup.py build_ext --inplace
# Run the benchmarking script with 5 MPI processes:
# 4 worker processes and 1 client process
!mpiexec -np 5 python benchmark_julia.py --kernel=cython -r1 1024
Already have a library with its own distributed arrays? Use the Distributed Array Protocol to work with DistArray.
The Distributed Array Protocol (DAP) is a process-local protocol that allows two subscribers, called the "producer" and the "consumer" or the "exporter" and the "importer", to communicate the essential data and metadata necessary to share a distributed-memory array between them. This allows two independently developed components to access, modify, and update a distributed array without copying. The protocol formalizes the metadata and buffers involved in the transfer, allowing several distributed array projects to collaborate, facilitating interoperability. By not copying the underlying array data, the protocol allows for efficient sharing of array data.
http://distributed-array-protocol.readthedocs.org/en/rel-0.9.0/
In [48]:
def return_protocol_structure(la):
    return la.__distarray__()
context.apply(return_protocol_structure, (darr.key,))
Out[48]:
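As a rough sketch of the consumer side of the protocol, an importer could wrap the local buffer in a NumPy array without copying and read the per-dimension metadata. The specific keys used here ('buffer', 'dim_data', 'size') are assumptions based on the Distributed Array Protocol documentation linked above, not output from this notebook.
def consume_protocol_structure(la):
    """Hedged DAP-consumer sketch; the dict keys are assumed from the DAP docs."""
    import numpy
    d = la.__distarray__()
    local_view = numpy.asarray(d['buffer'])  # zero-copy view of the local block
    global_shape = tuple(dim['size'] for dim in d['dim_data'])  # per-dimension metadata
    return local_view.shape, global_shape
context.apply(consume_protocol_structure, (darr.key,))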
This material is based upon work supported by the Department of Energy under Award Number DE-SC0007699.
This report was prepared as an account of work sponsored by an agency of the United States Government. Neither the United States Government nor any agency thereof, nor any of their employees, makes any warranty, express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness of any information, apparatus, product, or process disclosed, or represents that its use would not infringe privately owned rights. Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof.