TensorFrames: Quick Start

This notebook provides a TensorFrames Quick Start using Databricks Community Edition. You can run this from the pyspark shell like any other Spark package:

# The version we're using in this notebook
$SPARK_HOME/bin/pyspark --packages tjhunter:tensorframes:0.2.2-s_2.10  

# The latest version 
$SPARK_HOME/bin/pyspark --packages databricks:tensorframes:0.2.3-s_2.10

For more information, please refer to the databricks/tensorframes GitHub repo.

Configuration and Setup

Please follow the configuration and setup steps below in the following order:

  1. Launch a Spark cluster using Spark 1.6 (Hadoop 1) and Scala 2.10
  2. Attach TensorFrames 0.2.2 (tensorframes-0.2.2-s_2.10) to this cluster
  3. In a notebook, run one of the following commands to install TensorFlow. This has been tested with the TensorFlow 0.9 CPU edition (which installs a bit faster)
    • TensorFlow 0.9, Ubuntu/Linux 64-bit, CPU only, Python 2.7: /databricks/python/bin/pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.9.0rc0-cp27-none-linux_x86_64.whl
    • TensorFlow 0.9, Ubuntu/Linux 64-bit, GPU enabled, Python 2.7: /databricks/python/bin/pip install https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow-0.9.0rc0-cp27-none-linux_x86_64.whl
  4. Detach and reattach the notebook you just ran this command from
  5. Your cluster is now configured. You can run pure TensorFlow programs on the driver, or TensorFrames examples on the whole cluster.

Install TensorFlow

  • Remember to first install TensorFrames 0.2.2: tensorframes-0.2.2-s_2.10
  • This is the pip install command that will install TensorFlow onto the Apache Spark driver

In [4]:
%sh
/databricks/python/bin/pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.9.0rc0-cp27-none-linux_x86_64.whl
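
To confirm that the wheel was installed, you can optionally run pip show from another %sh cell (a quick check; not part of the original setup steps):

%sh
/databricks/python/bin/pip show tensorflow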

Detach and re-attach this notebook from the cluster

  • Once you have done this, you will be able to run TensorFlow programs on the driver and TensorFrames programs on the cluster.
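
As a sanity check (a minimal sketch, not part of the original notebook), you can run a small pure-TensorFlow program on the driver once the notebook has been re-attached:

# Minimal TensorFlow sanity check on the driver (sketch; not part of the original notebook)
import tensorflow as tf

with tf.Graph().as_default():
    a = tf.constant(1.0)
    b = tf.constant(2.0)
    c = tf.add(a, b)
    with tf.Session() as sess:
        print sess.run(c)  # expected output: 3.0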

Quick Start

This is a simple TensorFrames program where the op performs a simple addition. The original source code can be found in the databricks/tensorframes GitHub repo; it corresponds to the "How to Run in Python" section of the TensorFrames README.md.

Use TensorFlow to add 3 to an existing column

The first thing we will do is import TensorFlow, TensorFrames, and pyspark.sql.Row, and create a DataFrame based on an RDD of floats.


In [7]:
# Import TensorFlow, TensorFrames, and Row
import tensorflow as tf
import tensorframes as tfs
from pyspark.sql import Row

# Create RDD of floats and convert into DataFrame `df`
rdd = [Row(x=float(x)) for x in range(10)]
df = sqlContext.createDataFrame(rdd)

View the df DataFrame generated by the RDD of floats


In [9]:
df.show()

Execute the Tensor Graph

As noted above, this TensorFlow graph adds 3 to the tensor built from the df DataFrame generated by the RDD of floats.

  • x utilizes tfs.block, where block builds a block placeholder based on the content of a column in a dataframe.
  • z is the output tensor of the TensorFlow add operation (tf.add).
  • df2 is the new DataFrame created by tfs.map_blocks, which appends the z tensor to the df DataFrame as an extra column, block by block.

In [11]:
# Run the TensorFlow program:
#   the `op` performs the addition (i.e. `x` + `3`)
#   the result is placed back into a DataFrame
with tf.Graph().as_default() as g:
    # The TensorFlow placeholder that corresponds to column 'x'.
    # The shape of the placeholder is automatically inferred from the DataFrame.
    x = tfs.block(df, "x")
    
    # The output tensor that adds 3 to x
    z = tf.add(x, 3, name='z')
    
    # The resulting dataframe
    df2 = tfs.map_blocks(z, df)

# Note that `z` is the tensor output from the `tf.add` operation
print z

Review the output dataframe

With the tensor added as column z to the df DataFrame, you now have the df2 DataFrame, which lets you continue working with your data as a Spark DataFrame.


In [13]:
df2.show()
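
Because df2 is a regular Spark DataFrame, the usual DataFrame operations apply to it. For example (a small sketch, not part of the original notebook):

# Example only (not in the original notebook): df2 supports the normal DataFrame API
df2.filter(df2.z >= 10.0).show()   # rows where x + 3 >= 10
df2.agg({"z": "sum"}).show()       # sum of the z column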

Block-wise reducing operations example

In this next section, we will show how to work with block-wise reducing operations. Specifically, we will compute the sum and the minimum of a field of vectors, working with blocks of rows for more efficient processing.

Building a DataFrame of vectors

First, we will create a one-column DataFrame of vectors.


In [15]:
# Build a DataFrame of vectors
data = [Row(y=[float(y), float(-y)]) for y in range(10)]
df = sqlContext.createDataFrame(data)
df.show()

Analyze the DataFrame

We need to analyze the DataFrame to determine its shape (i.e., the dimensions of the vectors). For example, below we use the tfs.print_schema command on the df DataFrame.


In [17]:
# Print the information gathered by TensorFlow to check the content of the DataFrame
tfs.print_schema(df)

Notice the double[?,?] meaning that TensorFlow does not know the dimensions of the vectors.


In [19]:
# Because the dataframe contains vectors, we need to analyze it first to find the
# dimensions of the vectors.
df2 = tfs.analyze(df)

# The information gathered by TF can be printed to check the content:
tfs.print_schema(df2)

Analyze This

After the analysis (the df2 DataFrame above), TensorFrames has inferred that y contains vectors of size 2. For small tensors (scalars and vectors), TensorFrames usually infers the shapes of the tensors without requiring a preliminary analysis. If it cannot do it, an error message will indicate that you need to run the DataFrame through tfs.analyze() first.

Compute Elementwise Sum and Min of all vectors

Now, let's use the analyzed DataFrame to compute the elementwise sum and the elementwise minimum of all the vectors, using tf.reduce_sum and tf.reduce_min.

  • tf.reduce_sum: computes the sum of elements across dimensions of a tensor, e.g. if x = [[3, 2, 1], [-1, 2, 1]] then tf.reduce_sum(x) ==> 8.
  • tf.reduce_min: computes the minimum of elements across dimensions of a tensor, e.g. if x = [[3, 2, 1], [-1, 2, 1]] then tf.reduce_min(x) ==> -1. (A standalone check of both ops is sketched after this list.)
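
Both ops can be checked standalone with plain TensorFlow before applying them to the DataFrame (a sketch, not part of the original notebook):

# Standalone check of the reduce examples above (sketch; not part of the original notebook)
import tensorflow as tf

with tf.Graph().as_default():
    x = tf.constant([[3.0, 2.0, 1.0], [-1.0, 2.0, 1.0]])
    with tf.Session() as sess:
        print sess.run(tf.reduce_sum(x))  # 8.0
        print sess.run(tf.reduce_min(x))  # -1.0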

In [21]:
# Note: First, let's make a copy of the 'y' column. This will be very cheap in Spark 2.0+
df3 = df2.select(df2.y, df2.y.alias("z"))

# Execute the Tensor Graph
with tf.Graph().as_default() as g:
    # The placeholders. Note the special names that end with '_input':
    y_input = tfs.block(df3, 'y', tf_name="y_input")
    z_input = tfs.block(df3, 'z', tf_name="z_input")
    
    # Perform elementwise sum and minimum 
    y = tf.reduce_sum(y_input, [0], name='y')
    z = tf.reduce_min(z_input, [0], name='z')
    
    # The resulting dataframe
    (data_sum, data_min) = tfs.reduce_blocks([y, z], df3)

In [22]:
# The final results are numpy arrays:
print "Elementwise sum: %s and minimum: %s " % (data_sum, data_min)

Notes:

  • The scoping of the graphs above is important because TensorFrames decides which DataFrame columns to feed to the graph based on its placeholders.
  • It is good practice to keep small graphs when sending them to Spark.
