Distributed TensorFlow


In [167]:
import tensorflow as tf

In [168]:
%load_ext version_information
%version_information numpy, scipy, matplotlib, pandas, tensorflow, sklearn, skflow


The version_information extension is already loaded. To reload it, use:
  %reload_ext version_information
Out[168]:
Software     Version
Python       2.7.11 64bit [GCC 4.2.1 (Apple Inc. build 5577)]
IPython      4.2.0
OS           Darwin 15.5.0 x86_64 i386 64bit
numpy        1.11.0
scipy        0.13.2
matplotlib   1.3.1
pandas       0.13.0
tensorflow   0.8.0
sklearn      0.14.1
skflow       The 'skflow' distribution was not found and is required by the application
Sat Jun 04 11:46:56 2016 PDT

Overview of Components

Cluster

To define a distributed computation in TensorFlow we need to specify two kinds of jobs:

  • worker jobs
  • parameter server (ps) jobs

Each job is defined by one or more tasks. Each task is usually identified by a simple numerical index, i.e. 0, 1, 2, 3, ...


In [202]:
CLUSTER_SPEC= """
{
    'ps' : ['tensorflow0.pipeline.io:8888', 'tensorflow1.pipeline.io:8888'],
    'worker' : [ 'tensorflow2.pipeline.io:8888','tensorflow3.pipeline.io:8888'],
}
"""

In [203]:
import ast

cluster_def = ast.literal_eval(CLUSTER_SPEC)

In [204]:
cluster_def


Out[204]:
{'ps': ['tensorflow0.pipeline.io:8888', 'tensorflow1.pipeline.io:8888'],
 'worker': ['tensorflow2.pipeline.io:8888', 'tensorflow3.pipeline.io:8888']}

In [205]:
spec = tf.train.ClusterSpec(cluster_def)

In [206]:
spec.jobs


Out[206]:
['ps', 'worker']

In [207]:
for job in spec.jobs:
    print(job, spec.job_tasks(job))


('ps', ['tensorflow0.pipeline.io:8888', 'tensorflow1.pipeline.io:8888'])
('worker', ['tensorflow2.pipeline.io:8888', 'tensorflow3.pipeline.io:8888'])

In [208]:
workers = ['/job:worker/task:{}'.format(i) for i in range(len(cluster_def['worker']))]
param_servers = ['/job:ps/task:{}'.format(i) for i in range(len(cluster_def['ps']))]

In [209]:
workers


Out[209]:
['/job:worker/task:0', '/job:worker/task:1']

In [210]:
param_servers


Out[210]:
['/job:ps/task:0', '/job:ps/task:1']

Pinning of Variables

Each Variable is assigned to a specific device.


In [211]:
# no device was specified, so the variable's device string is empty
l = tf.Variable("local_cpu")
l.device


Out[211]:
u''

We can control the device assignment explicitly using the tf.device context manager.


In [212]:
for ps in param_servers:
    with tf.device(ps):
        v = tf.Variable("my_var")
# v is the variable created in the last iteration, pinned to the last parameter server
v.device


Out[212]:
u'/job:ps/task:1'

Tensorflow Server

The server is responsible for handling the actual communication. On each of the cluster's nodes we spawn a simple gRPC server.


In [213]:
def launch_worker(job_name, task_id, cluster_def):
    # create the gRPC server for this task, using the shared cluster definition
    server = tf.train.Server(
        cluster_def,
        job_name=job_name,
        task_index=task_id
    )
    # block forever, serving requests from the other tasks
    server.join()

Connecting to a Server

To connect to any server you specify the session's target, i.e. the grpc:// address (ip:port) of the server, when creating a Session object.

Note that the server is generic and can assume either the role of parameter server or that of worker. The cluster configuration decides the role.

The best practice is to build a single image that launches the TensorFlow server.

Environment variables then specify the exact role (job name and task index) for each node at run time.
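
As a minimal sketch of that pattern (the JOB_NAME and TASK_ID environment variable names are assumptions for illustration, not a fixed convention), every node runs the same entrypoint, starts its server via the launch_worker function defined above, and a client then connects by passing the server's grpc:// address as the session target:

import os

# hypothetical entrypoint baked into the single image; JOB_NAME / TASK_ID
# are assumed to be set by the deployment (e.g. 'ps' / '0')
job_name = os.environ.get('JOB_NAME', 'worker')
task_id = int(os.environ.get('TASK_ID', '0'))

launch_worker(job_name, task_id, cluster_def)   # builds the tf.train.Server and joins

# a client connects to any running server by passing its target to tf.Session
sess = tf.Session("grpc://tensorflow2.pipeline.io:8888")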

gRPC

gRPC is a Remote Procedure Call framework based on Protocol Buffers.

Each object in TensorFlow that has to be sent over the wire has a protocol buffer definition.

  1. The client figures out which tensors need to be serialized.
  2. The client makes the gRPC remote call to the server and sends the serialized values.
  3. If the server accepts the call, the serialized tensors are deserialized.
  4. The server runs the requested operation on the graph together with all its dependencies.
  5. The server serializes the result and sends it back to the client on the same connection.
  6. The client receives the result and deserializes it.

Example of the protocol buffer declaration for a Variable:

syntax = "proto3";

package tensorflow;

// Protocol buffer representing a Variable.
message VariableDef {
  // Name of the variable tensor.
  string variable_name = 1;

  // Name of the initializer op.
  string initializer_name = 2;

  // Name of the snapshot tensor.
  string snapshot_name = 3;

}

Each variable can then be serialized using the to_proto method:


In [214]:
v.to_proto()


Out[214]:
variable_name: "Variable_24:0"
initializer_name: "Variable_24/Assign"
snapshot_name: "Variable_24/read:0"
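
The returned VariableDef is an ordinary protocol buffer message, so the standard protobuf API applies; a small sketch:

proto = v.to_proto()
print(proto.variable_name)              # "Variable_24:0"
wire_bytes = proto.SerializeToString()  # binary form that can travel over gRPC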

Simple reduce_sum Example


In [236]:
batch_size = 1000

graph = tf.Graph()
with graph.as_default():

    # pin the input placeholder and a result variable to the first parameter server
    with tf.device('/job:ps/task:0'):
        input_array = tf.placeholder(tf.int32, shape=[None])
        final_result = tf.Variable(0)

    # divide the input across the cluster: each worker reduces its own portion
    all_reduce = []
    splitted = tf.split(0, len(workers), input_array)
    for idx, (portion, worker) in enumerate(zip(splitted, workers)):
        with tf.device(worker):
            print(worker)
            local_reduce = tf.reduce_sum(portion)
            # tf.Print returns its first argument, so local_reduce becomes the
            # portion itself, with the partial sum printed as a side effect
            local_reduce = tf.Print(portion, [local_reduce], message="portion is")
            all_reduce.append(local_reduce)

    # note: this rebinds final_result to the aggregating op, shadowing the Variable above
    final_result = tf.reduce_sum(tf.pack(all_reduce))


/job:worker/task:0
/job:worker/task:1

In [237]:
sess_config = tf.ConfigProto(
    allow_soft_placement=True,   # fall back to another device if the requested one is unavailable
    log_device_placement=True)   # log the device chosen for each op

We can now run the graph:


In [240]:
import numpy as np
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)

with tf.Session("grpc://tensorflow3.pipeline.io:8888", graph=graph, config=sess_config) as session:
    result = session.run(final_result, feed_dict={ input_array: np.ones([1000]) }, options=run_options)
    print(result)


1000

We can also inspect the placement of any node and fetch intermediate results remotely:


In [243]:
final_result.device


Out[243]:
u''

In [242]:
with tf.Session("grpc://tensorflow3.pipeline.io:8888", graph=graph, config=sess_config) as session:
    result = session.run(local_reduce, feed_dict={ input_array: np.ones([1000]) }, options=run_options)
    print(result)


[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
