Distributed Hyper-parameter Search


In [1]:
%matplotlib inline

# Hide info messages from paramiko
import logging
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.WARN)

import time
import random
import threading
import pandas as pd
import numpy as np
import plotly.plotly as py
import plotly.graph_objs as go
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (2, 2)

from distributed import progress, Client
from pprint import pprint

from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score
from sklearn.datasets import load_digits

from cclyde.cluster import Cluster

Create and launch AWS instances.


In [2]:
cluster = Cluster(key_name='default_windows', n_nodes=16, cluster_name='default', instance_type='t2.medium')


You can now run .configure()

In [3]:
cluster.configure()


Connecting to Boto3 and EC2 resources...Done.
Checking keypair exists using key_name: "default_windows"...
	Found pem_key: "default_windows"...Done.
Checking for Cluster Clyde's Virtual Private Cloud (cclyde_vpc) on AWS...
	Found existing VPC...Done.
Checking for Cluster Clyde's Subnet in the VPC...
	Found existing subnet...Done.
Validating security group...Found existing cclyde security group, connecting to it...Done.
Configuring security group...
	Working on permission: tcp from port: 22 to port: 22...already exists! Passing.

	Working on permission: tcp from port: 80 to port: 8786...already exists! Passing.

	Working on permission: tcp from port: 80 to port: 8787...already exists! Passing.

	Working on permission: tcp from port: 0 to port: 65535...already exists! Passing.
Done configuring security group.
Checking for Cluster Clyde's internet gateway...found existing cclyde gateway...Done.
Attaching internet gateway to VPC if needed...gateway already associated with VPC...Done.
Confirming proper VPC route table configuration...Found existing route table, confirming proper config...Done.
Everything is configured, you can now run >>> cluster.launch_instances() OR cluster.reconnect_to_cluster()

In [4]:
cluster.launch_instances_nonblocking()


WARNING:Cluster-Clyde:	Once instances are running, you may be accumulating charges from AWS; be sure to run cluster.stop_cluster() *AND* confirm instances are stopped/terminated via AWS console!
All instances in running state, waiting for all to be reachable...
All 16 instances ready!
Setting node names...Done.

Hand-written digits dataset

Grayscale hand-written digits (scikit-learn's 8×8 digits, a small MNIST-like dataset)


In [5]:
X, y = load_digits(return_X_y=True)
X = np.asarray([x.flatten() for x in X])

for i in range(3):
    plt.imshow(X[i].reshape((8, 8)), cmap='Greys_r')
    plt.title('Digit: {}'.format(y[i]))
    plt.show()


Train a neural network to classify the digits (about as simple as it gets)

This also demonstrates how time-consuming hand-tuning hyper-parameters can be



In [6]:
pca = PCA(n_components=30)

print 'Features before: ', X.shape[1]

X = pca.fit_transform(X)

print 'Features after: ', X.shape[1]
print '{}% Explained Variance'.format(round(sum(pca.explained_variance_ratio_) * 100, 1))


Features before:  64
Features after:  30
95.9% Explained Variance
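
As an aside, a quick way to sanity-check the choice of n_components is to look at the cumulative explained variance (a minimal sketch; the 95% threshold is just an illustrative cutoff):

# Re-load the raw 64-feature digits and fit a full PCA.
X_raw, _ = load_digits(return_X_y=True)
pca_full = PCA().fit(X_raw)
cumulative = np.cumsum(pca_full.explained_variance_ratio_)
# First index where cumulative variance crosses 95%, converted to a count.
print('Components needed for 95% variance: {}'.format(int(np.argmax(cumulative >= 0.95)) + 1))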

Train with some given parameters...


In [7]:
lr = MLPClassifier(hidden_layer_sizes=(10, 5), batch_size=10,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)

start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X, 
                         y=y,
                         cv=5)

print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std(), 3) * 2))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))


Accuracy: 74.5% (+/- 0.104)
Finished in 1.2sec


Alright, how about something else...


In [8]:
lr = MLPClassifier(hidden_layer_sizes=(10, 10,), batch_size=100,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)

start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X, 
                         y=y,
                         cv=5)

print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std(), 3) * 2))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))


Accuracy: 87.33% (+/- 0.072)
Finished in 0.47sec

and now something different...


In [9]:
lr = MLPClassifier(hidden_layer_sizes=(10, 10, 10,), batch_size=100,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)

start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X, 
                         y=y,
                         cv=5)

print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std(), 3) * 2))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))


Accuracy: 85.42% (+/- 0.05)
Finished in 0.55sec

The issue: which hyper-parameters are best?

Do we train on all (or most of) the combinations?


In [10]:
# Define hyper parameter ranges
batch_sizes = np.linspace(start=5, stop=750, num=50, dtype=np.int64)
n_layers = range(1, 8, 1)

# Make a list of all combinations
params = []
for batch_size in batch_sizes:
    for n_layer in n_layers:

        n_neuron = np.random.randint(low=5, high=200)
        params.append({'batch_size': batch_size,
                       'hidden_layer_sizes': tuple(n_neuron for _ in range(n_layer)),
                       'solver': 'sgd',
                       'learning_rate_init': 0.01,
                       'early_stopping': True
                      })

print '{} different combinations.'.format(len(params))
pprint(params[:2])


350 different combinations.
[{'batch_size': 5,
  'early_stopping': True,
  'hidden_layer_sizes': (78,),
  'learning_rate_init': 0.01,
  'solver': 'sgd'},
 {'batch_size': 5,
  'early_stopping': True,
  'hidden_layer_sizes': (82, 82),
  'learning_rate_init': 0.01,
  'solver': 'sgd'}]

Each combination also draws its hidden-layer width at random, so this is part grid search, part random search. Even using all the cores on a local machine, the full sweep will take a while; let's distribute the workload
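
For reference, this is the single-machine version of the loop we're about to distribute (a minimal sketch over just the first few entries of the params list defined above):

# Evaluate a handful of parameter sets sequentially on this machine.
local_results = []
for kwargs in params[:5]:  # only a few; the full sweep is what becomes impractical
    model = MLPClassifier(**kwargs)
    scores = cross_val_score(estimator=model, X=X, y=y, cv=5)
    local_results.append((kwargs, scores.mean()))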


Before executing the next few blocks, make sure the instances are ready to connect. Since we launched with the non-blocking call, we can check whether it has finished with .is_alive()


In [11]:
print 'Launching thread is alive: ', cluster.instance_launching_thread.is_alive()


Launching thread is alive:  False
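
If the thread were still alive, we could simply block until the launch finishes:

# Block until the non-blocking launch thread completes.
cluster.instance_launching_thread.join()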

In [12]:
cluster.install_anaconda()


Installing Anaconda on cluster...


-------------------

Host: 54.158.253.218	Exit Code: 0

... (identical blocks for the remaining 14 hosts, all Exit Code: 0) ...

-------------------

Host: 54.165.78.58	Exit Code: 0
Done.

In [13]:
cluster.install_python_packages(['scikit-learn', 'numpy', 'pandas', 'dask', 'futures'], method='conda')


Installing package: scikit-learn
-------------------

Host: 54.158.253.218	Exit Code: 0

... (identical blocks for the remaining 15 hosts, all Exit Code: 0) ...

Installed scikit-learn

Installing package: numpy
... (same per-host output, all Exit Code: 0) ...
Installed numpy

Installing package: pandas
... (same per-host output, all Exit Code: 0) ...
Installed pandas

Installing package: dask
... (same per-host output, all Exit Code: 0) ...
Installed dask

Installing package: futures
... (same per-host output, all Exit Code: 0) ...
Installed futures
Out[13]:
True

In [14]:
scheduler_address = cluster.launch_dask()


Installing dask.distributed on cluster

-------------------

Host: 54.158.253.218	Exit Code: 0

... (identical blocks for the remaining 15 hosts, all Exit Code: 0) ...

Launching scheduler on master node...
-------------------

Host: 54.164.10.104	Exit Code: 0
	STDOUT for 54.164.10.104:
		()Done.

Launching workers...
-------------------

Host: 54.158.253.218	Exit Code: 0
	STDOUT for 54.158.253.218:
		()

... (identical blocks for the remaining 13 workers, all Exit Code: 0) ...

-------------------

Host: 54.165.78.58	Exit Code: 0
	STDOUT for 54.165.78.58:
		()Done.

Scheduler should be available here: 54.164.10.104:8786
Web Dashboard should be available here: 54.164.10.104:8787

Connect to the resulting scheduler (the web dashboard on port 8787 gives a live view of the workers and tasks)


In [15]:
c = Client(address=scheduler_address)
c


Out[15]:
<Client: scheduler="54.164.10.104:8786" processes=15 cores=30>

Define the functions which will be distributed to the workers. Note that each function does its imports inside its own body: the function is serialized and shipped to the workers, so everything it needs must be available there.


In [16]:
def get_data(kwargs):
    """
    Function which gets data and performs PCA on it.
    """
    from sklearn.datasets import load_digits
    from sklearn.decomposition import PCA
    import numpy as np
    
    X, y = load_digits(return_X_y=True)
    X = np.asarray([x.flatten() for x in X])
    pca = PCA(n_components=30)
    X = pca.fit_transform(X)
    
    return (kwargs, X, y)


def model_tester(package):
    """
    Function which is mapped to cluster. Passes kwargs to model to be trained.
    Returns score based on those kwargs.
    """
    
    kwargs, X, y = package
    
    import time
    import numpy as np
    from sklearn.neural_network import MLPClassifier
    from sklearn.model_selection import cross_val_score
    
    # Initialize model with given kwargs
    lr = MLPClassifier(**kwargs)
    scores = cross_val_score(estimator=lr,
                             X=X, 
                             y=y,
                             cv=5)
    return (kwargs, scores.mean(), scores.std())


def score_combiner(package):
    """
    Not needed, but more functions == more pretty colors
    """
    import time
    import random
    time.sleep(random.random())
    
    kwargs, score_m, score_std = package
    kwargs.update({'score': score_m, 'std': score_std})
    
    return kwargs


def double(n):
    '''
    Useless worker function # 1
    '''
    import time
    import random
    import sklearn  # unused below; presumably here to verify sklearn imports on the workers
    time.sleep(random.random())
    return n * 2, 2


def add_two(package):
    """
    Useless worker function # 2
    """
    n, n2 = package
    import time
    import random
    time.sleep(random.random())
    return n + n2

Run the test functions. When futures are passed to c.map, each function receives the resolved result of the previous stage, so stages chain naturally.


In [17]:
futures = c.map(double, range(250))
futures = c.map(add_two, futures)
progress(futures)

Distribute the actual work


In [18]:
futures = c.map(get_data, params)
futures = c.map(model_tester, futures)
futures = c.map(score_combiner, futures)
progress(futures)

In [19]:
results = c.gather(futures)
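
As an aside, instead of a blocking gather we could consume results as they finish (a sketch; assumes as_completed is importable from this version of distributed):

from distributed import as_completed

# Handle each parameter set as soon as its score is ready.
for future in as_completed(futures):
    result = future.result()  # a kwargs dict with 'score' and 'std' added by score_combiner
    # ... record/inspect the result here ...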

In [20]:
df = pd.DataFrame(results)
df['n_layers'] = df.hidden_layer_sizes.map(lambda _tuple: len(_tuple)) 
df['n_neurons'] = df.hidden_layer_sizes.map(lambda _tuple: _tuple[0])
df.head()


Out[20]:
   batch_size early_stopping         hidden_layer_sizes  learning_rate_init     score solver       std  n_layers  n_neurons
0           5           True                      (78,)                0.01  0.782888    sgd  0.016110         1         78
1           5           True                   (82, 82)                0.01  0.768820    sgd  0.080876         2         82
2           5           True            (188, 188, 188)                0.01  0.812680    sgd  0.098912         3        188
3           5           True           (75, 75, 75, 75)                0.01  0.840106    sgd  0.079771         4         75
4           5           True  (141, 141, 141, 141, 141)                0.01  0.904282    sgd  0.039371         5        141
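
A quick way to summarise the sweep is the best cross-validation score per layer count (a small sketch):

# Best mean CV score achieved for each number of hidden layers.
print(df.groupby('n_layers')['score'].max())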

In [21]:
df.n_layers.unique()


Out[21]:
array([1, 2, 3, 4, 5, 6, 7], dtype=int64)

In [21]:
data = []
for n_layers in df.n_layers.unique():
    
    temp = df[df.n_layers == n_layers]
    
    trace = go.Scatter(
        x = temp.n_neurons,
        y = temp.n_layers,
        mode='markers',
        text=['{}%<br>Layers: {}'.format(round(v * 100, 2), l) 
              for v, l in zip(temp.score.values, temp.n_layers.values)],
        name='{} layers'.format(n_layers),
        marker=dict(
            size=temp.batch_size / 20.0,
            color = temp.score, #set color equal to a variable
            colorscale='Viridis',
            showscale=False
        )
    )
    data.append(trace)

layout = dict(title = 'Best performing models.<br>(size = batch size)',
              xaxis = dict(zeroline = False, title='Neuron Count'),
              yaxis = dict(zeroline = False, title='Layer Count'),
             )

fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-scatter')


Out[21]:
(interactive Plotly scatter: neuron count vs. layer count, marker size = batch size, color = score)
In [22]:
df.loc[df.score.idxmax(), :]


Out[22]:
batch_size                              20
early_stopping                        True
hidden_layer_sizes    (165, 165, 165, 165)
learning_rate_init                    0.01
score                              0.94437
solver                                 sgd
std                               0.017427
n_layers                                 4
n_neurons                              165
Name: 10, dtype: object
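
With the winning configuration in hand, a natural follow-up is to refit it on the full dataset (a minimal sketch reusing the columns above; final_model is a hypothetical name):

# Rebuild the kwargs for the best-scoring row and refit locally.
best = df.loc[df.score.idxmax()]
best_kwargs = {k: best[k] for k in ('batch_size', 'hidden_layer_sizes',
                                    'learning_rate_init', 'solver', 'early_stopping')}
final_model = MLPClassifier(**best_kwargs).fit(X, y)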

Also create a distributed queue system...


In [23]:
from Queue import Queue  # Python 2 stdlib queue

local_q = Queue()
# Scattering a local queue returns a matching remote queue
# (queue support existed in this version of distributed).
remote_q = c.scatter(local_q)

def long_calc1(n):
    
    import time
    import random
    
    time.sleep(random.random())
    
    return n + 2

def long_calc2(n):
    
    import time
    import random
    
    time.sleep(random.random())
    
    return n * 2

def long_calc3(n):
    
    import time
    import random
    
    time.sleep(random.random())
    
    return n - 2


long_calc1_q = c.map(long_calc1, remote_q)
long_calc2_q = c.map(long_calc2, long_calc1_q)
long_calc3_q = c.map(long_calc3, long_calc2_q)
result_q = c.gather(long_calc3_q)
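
Each c.map over a queue returns a new queue of futures, so the three stages form a streaming pipeline: anything put on local_q flows through long_calc1, long_calc2 and long_calc3 and lands on result_q. For a job n the result is ((n + 2) * 2) - 2 = 2n + 2, which is why only even numbers appear below.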

The result queue is currently empty...


In [24]:
result_q.qsize()


Out[24]:
0

Start submitting jobs to the queue with a thread


In [25]:
def start_jobs():

    jobs = range(500)

    for job in jobs:
        time.sleep(random.random())
        local_q.put(job)
        
    return 

thread = threading.Thread(target=start_jobs)
thread.start()

and begin receiving the results...


In [26]:
def get_jobs():
    while True:
        print result_q.get()
        
    return

finish_thread = threading.Thread(target=get_jobs)
finish_thread.start()


2
4
6
8
10
... (even results continue streaming in) ...
134
136
138
140

In [27]:
cluster.terminate_cluster()


142
144
146
ERROR:distributed.utils:Stream is closed
Traceback (most recent call last):
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1015, in run
    value = future.result()
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\client.py", line 907, in _scatter
    broadcast=broadcast)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1015, in run
    value = future.result()
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\core.py", line 427, in send_recv_from_rpc
    deserialize=self.deserialize, **kwargs)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1015, in run
    value = future.result()
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\core.py", line 308, in send_recv
    response = yield read(stream, deserialize=deserialize)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1015, in run
    value = future.result()
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\core.py", line 203, in read
    n_frames = yield stream.read_bytes(8)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1015, in run
    value = future.result()
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
StreamClosedError: Stream is closed
ERROR:distributed.utils:long_calc3-f1fd63862cfbb37e140547b8aafd616e
Traceback (most recent call last):
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1015, in run
    value = future.result()
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\tornado\gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "C:\Users\Miles\Anaconda3\envs\clusterclyde\lib\site-packages\distributed\client.py", line 806, in _gather
    None)
  File "<string>", line 2, in reraise
CancelledError: long_calc3-f1fd63862cfbb37e140547b8aafd616e
Exception in thread Thread-18:
  (same StreamClosedError traceback as above, raised from the client's scatter thread)

Exception in thread Thread-22:
  (same CancelledError traceback as above, raised from the client's gather thread)

WARNING:Cluster-Clyde:Terminated instances. Please check your AWS console to ensure termination of nodes!

In [ ]: