This notebook provides a TensorFrames Quick Start using Databricks Community Edition. You can run this from the pyspark shell like any other Spark package:
# The version we're using in this notebook
$SPARK_HOME/bin/pyspark --packages tjhunter:tensorframes:0.2.2-s_2.10
# The latest version
$SPARK_HOME/bin/pyspark --packages databricks:tensorframes:0.2.3-s_2.10
For more information, please refer to the databricks/tensorframes GitHub repo.
Please follow these configuration and setup steps in order:
1. Attach the tensorframes-0.2.2-s_2.10 library to your cluster.
2. Install TensorFlow on the cluster using one of the following pip commands (CPU or GPU build):
/databricks/python/bin/pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.9.0rc0-cp27-none-linux_x86_64.whl
/databricks/python/bin/pip install https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow-0.9.0rc0-cp27-none-linux_x86_64.whl
In [4]:
%sh
/databricks/python/bin/pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.9.0rc0-cp27-none-linux_x86_64.whl
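Once the wheel is installed, you can sanity-check the install from a Python cell. This is a minimal check; the exact version string assumes the 0.9.0rc0 wheel above was used:
import tensorflow as tf
# Expect a version string like '0.9.0rc0' if the install above succeeded
print tf.__version__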
This is a simple TensorFrames program where the op performs a simple addition. The original source code can be found in the databricks/tensorframes GitHub repo, in the TensorFrames README.md > How to Run in Python section.
The first thing we will do is import TensorFlow, TensorFrames, and pyspark.sql.Row, and then create a DataFrame based on an RDD of floats.
In [7]:
# Import TensorFlow, TensorFrames, and Row
import tensorflow as tf
import tensorframes as tfs
from pyspark.sql import Row
# Create RDD of floats and convert into DataFrame `df`
rdd = [Row(x=float(x)) for x in range(10)]
df = sqlContext.createDataFrame(rdd)
View the df DataFrame generated by the RDD of floats
In [9]:
df.show()
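If everything is wired up correctly, the output should look similar to the following (ten rows, x from 0.0 to 9.0):
+---+
|  x|
+---+
|0.0|
|1.0|
...
|9.0|
+---+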
As noted above, this TensorFlow graph adds 3 to the tensor created from the df DataFrame (generated by the RDD of floats):
- x uses tfs.block, where block builds a block placeholder based on the content of a column in the DataFrame.
- z is the output tensor of the TensorFlow add method (tf.add).
- df2 is the new DataFrame, which appends the extra column z to the df DataFrame, block by block.
In [11]:
# Run the TensorFlow program:
#   the op performs the addition (i.e. `x` + 3),
#   then the data is placed back into a DataFrame
with tf.Graph().as_default() as g:
    # The TensorFlow placeholder that corresponds to column 'x'.
    # The shape of the placeholder is automatically inferred from the DataFrame.
    x = tfs.block(df, "x")
    # The output tensor that adds 3 to x
    z = tf.add(x, 3, name='z')
    # The resulting DataFrame
    df2 = tfs.map_blocks(z, df)

# Note that `z` is the tensor output from the `tf.add` operation
print z
In [13]:
df2.show()
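The result should contain the original x values alongside z = x + 3, along these lines (exact column order may vary):
+---+----+
|  x|   z|
+---+----+
|0.0| 3.0|
|1.0| 4.0|
...
|9.0|12.0|
+---+----+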
In this next section, we will show how to work with block-wise reducing operations. Specifically, we will compute the sum and the minimum of a field of vectors, working with blocks of rows for more efficient processing.
First, we will create a one-column DataFrame of vectors.
In [15]:
# Build a DataFrame of vectors
data = [Row(y=[float(y), float(-y)]) for y in range(10)]
df = sqlContext.createDataFrame(data)
df.show()
In [17]:
# Print the information gathered by TensorFlow to check the content of the DataFrame
tfs.print_schema(df)
Notice the double[?,?], meaning that TensorFlow does not know the dimensions of the vectors.
In [19]:
# Because the dataframe contains vectors, we need to analyze it first to find the
# dimensions of the vectors.
df2 = tfs.analyze(df)
# The information gathered by TF can be printed to check the content:
tfs.print_schema(df2)
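For reference, the analyzed schema should now report the second dimension, along the lines of (exact formatting approximate):
root
 |-- y: array (nullable = true) double[?,2]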
After this analysis, the df2 DataFrame shows that TensorFrames has inferred that y contains vectors of size 2. For small tensors (scalars and vectors), TensorFrames usually infers the shapes of the tensors without requiring a preliminary analysis. If it cannot, an error message will indicate that you need to run the DataFrame through tfs.analyze() first.
Now, let's use the analyzed dataframe to compute the sum and the elementwise minimum of all the vectors using tf.reduce_sum and tf.reduce_min.
- tf.reduce_sum: computes the sum of elements across dimensions of a tensor, e.g. if x = [[3, 2, 1], [-1, 2, 1]] then tf.reduce_sum(x) ==> 8.
- tf.reduce_min: computes the minimum of elements across dimensions of a tensor, e.g. if x = [[3, 2, 1], [-1, 2, 1]] then tf.reduce_min(x) ==> -1.
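As a quick standalone check of these two operations (plain TensorFlow, no Spark involved), here is a minimal sketch using the example matrix above:
import tensorflow as tf

x = tf.constant([[3., 2., 1.], [-1., 2., 1.]])
total = tf.reduce_sum(x)    # sums all elements -> 8.0
minimum = tf.reduce_min(x)  # minimum over all elements -> -1.0
with tf.Session() as sess:
    print sess.run([total, minimum])  # [8.0, -1.0]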
In [21]:
# Note: First, let's make a copy of the 'y' column. This will be very cheap in Spark 2.0+
df3 = df2.select(df2.y, df2.y.alias("z"))
# Execute the Tensor Graph
with tf.Graph().as_default() as g:
    # The placeholders. Note the special names that end with '_input':
    y_input = tfs.block(df3, 'y', tf_name="y_input")
    z_input = tfs.block(df3, 'z', tf_name="z_input")
    # Perform the elementwise sum and minimum over the row dimension
    y = tf.reduce_sum(y_input, [0], name='y')
    z = tf.reduce_min(z_input, [0], name='z')
    # The resulting reductions (numpy arrays)
    (data_sum, data_min) = tfs.reduce_blocks([y, z], df3)
In [22]:
# The final results are numpy arrays:
print "Elementwise sum: %s and minimum: %s " % (data_sum, data_min)