In [1]:
# Create a library using the Maven coordinate:
# enter "databricks:tensorframes:0.2.8-s_2.10" and search for this library.
# Attach it to your current cluster.
# Reference: https://github.com/databricks/tensorframes/wiki/TensorFrames-user-guide
import tensorframes as tfs
import tensorflow as tf
from pyspark.sql import Row
from pyspark.sql.functions import *
In [2]:
data = [Row(x=float(x)) for x in range(10)]
df = sqlContext.createDataFrame(data)
tfs.print_schema(df)
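
# Expected printout (a sketch based on the TensorFrames user guide; the
# trailing '[?]' marks a leading dimension of unknown size):
# root
#  |-- x: double (nullable = true) double[?]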
In [3]:
data
In [4]:
with tf.Graph().as_default() as g:
    x = tfs.block(df, "x")
    # The output that adds 3 to x
    z = tf.add(x, 3, name="z")
    # The resulting dataframe
    df2 = tfs.map_blocks(z, df)

df2.collect()
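
# A row-by-row variant (a sketch: tfs.row and tfs.map_rows are the per-row
# analogues of tfs.block and tfs.map_blocks in the TensorFrames user guide;
# df2b is just an illustrative name):
with tf.Graph().as_default() as g:
    x = tfs.row(df, "x")
    z = tf.add(x, 3, name="z")
    df2b = tfs.map_rows(z, df)

df2b.collect()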
In [5]:
# Build a DataFrame of vectors
data_2 = [Row(y=[float(y), float(-y)]) for y in range(10)]
df = sqlContext.createDataFrame(data_2)
# Because the dataframe contains vectors, we need to analyze it first to find the
# dimensions of the vectors.
df2 = tfs.analyze(df)
# The information gathered by TensorFrames can be printed to check the content:
tfs.print_schema(df2)
# TensorFrames has inferred that y contains vectors of size 2:
# root
#  |-- y: array (nullable = false) DoubleType[?,2]
# Let's use the analyzed dataframe to compute the sum and the elementwise minimum
# of all the vectors:
# First, let's make a copy of the 'y' column. This will be very cheap in Spark 2.0+
df3 = df2.select(df2.y, df2.y.alias("z"))
with tf.Graph().as_default() as g:
    # The placeholders. Note the special names that end with '_input':
    y_input = tfs.block(df3, 'y', tf_name="y_input")
    z_input = tfs.block(df3, 'z', tf_name="z_input")
    y = tf.reduce_sum(y_input, [0], name='y')
    z = tf.reduce_min(z_input, [0], name='z')
    # The resulting values (reduce_blocks returns plain values, not a dataframe)
    (data_sum, data_min) = tfs.reduce_blocks([y, z], df3)

# The final results are numpy arrays:
print data_sum
# [45.0, -45.0]
print data_min
# [0.0, -9.0]
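
# Quick cross-check of both reductions in plain NumPy, over the same ten
# [y, -y] vectors defined above:
import numpy as np
arr = np.array([[float(y), float(-y)] for y in range(10)])
print arr.sum(axis=0)
# [ 45. -45.]
print arr.min(axis=0)
# [ 0. -9.]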
In [6]:
data_3 = [Row(x=[float(x), float(2 * x)],
              key=str(x % 2),
              z=float(x + 1)) for x in range(1, 6)]
df4 = sqlContext.createDataFrame(data_3)
tfs.print_schema(df4)
In [7]:
# The analyze() method can be used to gather more information about the shapes of the columns. It is relatively expensive to run.
df5 = tfs.analyze(df4)
tfs.print_schema(df5)
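
# Roughly expected printout after analyze() (a sketch; the exact field order
# and nullability flags may differ):
# root
#  |-- key: string (nullable = true)
#  |-- x: array (nullable = true) DoubleType[?,2]
#  |-- z: double (nullable = true) double[?]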
In [8]:
data_4 = [Row(x=float(x)) for x in range(5)]
df6 = sqlContext.createDataFrame(data_4)
with tf.Graph().as_default() as g:
    # The placeholder that corresponds to column 'x'.
    # Its name must match the column name.
    x = tf.placeholder(tf.double, shape=[None], name="x")
    # The output that adds 3 to x
    z = tf.add(x, 3, name='z')
    # The resulting dataframe
    df7 = tfs.map_blocks(z, df6)

df7.show()
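
# Expected result: with x = 0.0 .. 4.0 in df6, the new 'z' column of df7
# should read 3.0, 4.0, 5.0, 6.0, 7.0.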
In [9]:
# The following code computes the sum of a column by repeatedly summing pairs of rows together
data_5 = [Row(x=float(x)) for x in range(5)]
df8 = sqlContext.createDataFrame(data_5)
with tf.Graph().as_default() as g:
    # The placeholders that correspond to column 'x'.
    # Note the convention of suffixing them with '_1' and '_2':
    x_1 = tf.placeholder(tf.double, shape=[], name="x_1")
    x_2 = tf.placeholder(tf.double, shape=[], name="x_2")
    # We sum the two inputs.
    x = tf.add(x_1, x_2, name='x')
    # The resulting number
    res = tfs.reduce_rows(x, df8)

print res
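
# Expected value: 0 + 1 + 2 + 3 + 4 = 10.0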
In [10]:
# Computes the harmonic mean of some values, aggregated by a key
data = [Row(x=[float(x), float(2 * x)], key=str(x % 2)) for x in range(1, 6)]
# The rest of the example works without modification if we replace the data with scalars:
# data = [Row(x=float(x), key=str(x % 2)) for x in range(1, 6)]
# The analysis is not required if x is a scalar
df = tfs.analyze(sqlContext.createDataFrame(data))
col_name = "x"
col_key = "key"
with tf.Graph().as_default() as g:
    x = tfs.block(df, col_name)
    invs = tf.divide(tf.to_double(1.0), tf.to_double(x), name="invs")
    df2 = tfs.map_blocks([invs, tf.ones_like(invs, name="count")], df)
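
# df2 now carries two extra columns: 'invs' (the elementwise inverses 1/x)
# and 'count' (a matching block of ones). The aggregation below sums both
# per key, which is exactly what the harmonic mean needs.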
In [11]:
gb = df2.select(col_key, "invs", "count").groupBy(col_key)
with tf.Graph().as_default() as g:
    # See the documentation of tfs.aggregate for the naming conventions of the
    # placeholders ('<column>_input') and of the output ops ('<column>'):
    x_input = tfs.block(df2, "invs", tf_name="invs_input")
    count_input = tfs.block(df2, "count", tf_name="count_input")
    x = tf.reduce_sum(x_input, [0], name='invs')
    count = tf.reduce_sum(count_input, [0], name='count')
    df3 = tfs.aggregate([x, count], gb)

with tf.Graph().as_default() as g:
    # The placeholders are built from the aggregated dataframe df3:
    invs = tfs.block(df3, "invs")
    count = tfs.block(df3, "count")
    harmonic_mean = tf.div(tf.to_double(count), invs, name="harmonic_mean")
    df4 = tfs.map_blocks(harmonic_mean, df3).select("key", "harmonic_mean")

df4.show()
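
# Sanity check: recompute the per-key harmonic means in plain Python,
# independently of TensorFrames, on the same rows as above:
from collections import defaultdict
groups = defaultdict(list)
for x in range(1, 6):
    groups[str(x % 2)].append([float(x), float(2 * x)])
for key in sorted(groups):
    vecs = groups[key]
    n = float(len(vecs))
    print key, [n / sum(1.0 / v[i] for v in vecs) for i in range(2)]
# 0 [2.666..., 5.333...]
# 1 [1.956..., 3.913...]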