ML training often expects flat data, like a line in a CSV file. TensorFlow was originally built to learn from these flat tensors. But the data you care about, and want to make predictions about, usually starts out structured.
Over and over, you have to write transform code that turns your structured data into tensors. This repetitive transform code must be rewritten for every ML pipeline, for both training and serving, and it lets bugs slip into your pipelines.
struct2tensor lets you take advantage of structured data within your ML pipelines.
Suppose we have this structured data we want to train on. The source example data format is a protocol buffer. struct2tensor was built internally at Google and works on protocol buffers now. It can be extended to Parquet, JSON, etc. in the future.
# e.g. a web session
message Session {
  message SessionInfo {
    string session_feature = 1;
    double session_duration_sec = 2;
  }
  SessionInfo session_info = 1;
  message Event {
    string query = 1;
    message Action {
      int64 number_of_views = 1;
    }
    repeated Action action = 2;
  }
  repeated Event event = 2;
}
In 3 steps we'll extract the fields we want with struct2tensor. We'll end up with batch-aligned SparseTensors, with one example per event (submessage Session::Event). The fields of interest are:
- session_info.session_feature
- event.query
- event.action.number_of_views (the actual label could be sum(action.number_of_views for action in event))
Then we can build a struct2tensor query that turns the parsed protos into those SparseTensors. Don't worry about some of these terms yet. We'll show you an example, and then explain the terms later.
In [1]:
# install struct2tensor
!pip install struct2tensor
# graphviz for pretty output
!pip install graphviz
In [2]:
import base64
import numpy as np
import pprint
import os
import tensorflow
from graphviz import Source
# We are in the world of TF 2.0 \o/
import tensorflow.compat.v2 as tf
tf.enable_v2_behavior()
# TODO(b/73292591): Remove once type annotations work in AutoGraph.
tf.autograph.set_verbosity(0, False)
from IPython.display import Image
from IPython.lib import pretty
# we use the fully namespaced calls in this example notebook so you can
# recognize all the struct2tensor uses inline.
import struct2tensor
from struct2tensor.prensor import Prensor
from struct2tensor.path import Path
from struct2tensor.expression_impl import parquet
from struct2tensor.expression_impl import project
from struct2tensor.expression_impl.proto import create_expression_from_proto
from struct2tensor.calculate import calculate_prensors
from struct2tensor.calculate_options import get_default_options
from struct2tensor.prensor_util import get_sparse_tensors
from struct2tensor.test import test_pb2
from google.protobuf import text_format
def _display(graph):
  """Renders a graphviz digraph."""
  s = Source(graph)
  s.format = 'svg'
  return s


def _create_query_from_text_sessions(text_sessions):
  """Creates a struct2tensor query from a list of pbtxt of struct2tensor.test.Session."""
  sessions = tf.constant([
      text_format.Merge(
          text_session,
          struct2tensor.test.test_pb2.Session()
      ).SerializeToString()
      for text_session in text_sessions
  ])
  return struct2tensor.expression_impl.proto.create_expression_from_proto(
      sessions, struct2tensor.test.test_pb2.Session.DESCRIPTOR)


def _unpack_sparse_tensors(dict_of_sp):
  """Unpacks SparseTensors in dict_of_sp into tuples of components."""
  return {
      k: (v.dense_shape, v.indices, v.values) for k, v in dict_of_sp.items()
  }


def _pack_sparse_tensors(dict_of_tuple):
  """Packs the tuples back to SparseTensors."""
  return {
      k: tf.SparseTensor(dense_shape=v[0], indices=v[1], values=v[2])
      for k, v in dict_of_tuple.items()
  }


def _prensor_pretty_printer(prensor, p, cycle):
  """Pretty printing function for struct2tensor.prensor.Prensor."""
  pretty.pprint(struct2tensor.prensor_util.get_sparse_tensors(
      prensor, options=struct2tensor.calculate_options.get_default_options()))


def _sp_pretty_printer(sp, p, cycle):
  """Pretty printing function for SparseTensor."""
  del cycle
  p.begin_group(4, "SparseTensor(")
  p.text("values={}, ".format(sp.values.numpy().tolist()))
  p.text("dense_shape={}, ".format(sp.dense_shape.numpy().tolist()))
  p.break_()
  p.text("indices={}".format(sp.indices.numpy().tolist()))
  p.end_group(4, ")")


pretty.for_type(tf.SparseTensor, _sp_pretty_printer)
pretty.for_type(struct2tensor.prensor.Prensor, _prensor_pretty_printer)
_pretty_print = pretty.pprint
print("type-specific pretty printing ready to go")
In [3]:
@tf.function(input_signature=[tf.TensorSpec(shape=(None,), dtype=tf.string)])
def parse_session(serialized_sessions):
  """A TF function parsing a batch of serialized Session protos into tensors.

  It is a TF graph that takes one 1-D string tensor as input, and outputs a
  Dict[str, tf.SparseTensor] (due to a bug in TF 2.0, it actually cannot
  output tf.SparseTensor, hence _{un,}pack_sparse_tensors).
  """
  query = struct2tensor.expression_impl.proto.create_expression_from_proto(
      serialized_sessions, struct2tensor.test.test_pb2.Session.DESCRIPTOR)
  # Move all the fields of interest under "event".
  query = query.promote_and_broadcast(
      {"session_feature": "session_info.session_feature",
       "action_number_of_views": "event.action.number_of_views"},
      "event")
  # Make each "event" an example.
  query = query.reroot("event")
  # Extract all the fields of interest.
  projection = query.project(
      ["session_feature", "query", "action_number_of_views"])
  prensors = struct2tensor.calculate.calculate_prensors([projection])
  output_sparse_tensors = {}
  for prensor in prensors:
    path_to_tensor = struct2tensor.prensor_util.get_sparse_tensors(
        prensor, options=struct2tensor.calculate_options.get_default_options())
    output_sparse_tensors.update({str(k): v for k, v in path_to_tensor.items()})
  # TF 2.0 bug: tf.function cannot return tf.SparseTensor.
  return _unpack_sparse_tensors(output_sparse_tensors)

print("Defined the workhorse func: (structured data at rest) -> (tensors)")
In [4]:
serialized_sessions = tf.constant([
    text_format.Merge(
        """
        session_info {
          session_duration_sec: 1.0
          session_feature: "foo"
        }
        event {
          query: "Hello"
          action {
            number_of_views: 1
          }
          action {
          }
        }
        event {
          query: "world"
          action {
            number_of_views: 2
          }
          action {
            number_of_views: 3
          }
        }
        """,
        struct2tensor.test.test_pb2.Session()
    ).SerializeToString()
])

_pretty_print(
    # TF 2.0 bug: tf.function cannot return tf.SparseTensor.
    _pack_sparse_tensors(
        parse_session(serialized_sessions)))
See how we went from our pre-pipeline data (the protocol buffer) all the way to structured data packed into SparseTensors?
Interested and want to learn more? Read on...
Let's define several terms we mentioned before:
A Prensor (protocol buffer + tensor) is a data structure storing the data we work on. We use protocol buffers a lot at Google, but struct2tensor can support other structured formats, too.
For example, throughout this colab we will be using proto
struct2tensor.test.Session. A schematic visualization
of a selected part of the prensor from that proto looks like:
In [5]:
#@title { display-mode: "form" }
_display("""
digraph {
root -> session [label="*"];
session -> event [label="*"];
session -> session_id [label="?"];
event -> action [label="*"];
event -> query_token [label="*"]
action -> number_of_views [label="?"];
}
""")
Out[5]:
We will be using visualizations like this to demonstrate struct2tensor queries later.
Note: in these diagrams, an edge labeled "*" leads to a repeated field, and an edge labeled "?" leads to an optional field.
A struct2tensor query transforms a Prensor into another Prensor.
For example, broadcast is a query that replicates a node as a child of one of its siblings.
Applying

broadcast(
    source_path="session.session_id",
    sibling_field="event",
    new_field_name="session_session_id")
on the previous tree gives:
In [6]:
#@title { display-mode: "form" }
_display("""
digraph {
session_session_id [color="red"];
root -> session [label="*"];
session -> event [label="*"];
session -> session_id [label="?"];
event -> action [label="*"];
event -> session_session_id [label="?"];
event -> query_token [label="*"];
action -> number_of_views [label="?"];
}
""")
Out[6]:
We will talk about common struct2tensor queries in later sections.
A projection of paths in a Prensor produces another Prensor with just the selected paths.
The structure of the projected path can be represented losslessly as nested lists. For example, consider the projection of event.action.number_of_views from the prensor formed by the following two instances of struct2tensor.test.Session:
{
  event { action { number_of_views: 1 } action { number_of_views: 2 } action {} }
  event {}
}, {
  event { action { number_of_views: 3 } }
}
is:
[  # the outer list has two elements b/c there are two Session protos.
  [  # the first proto has two events
    [[1], [2], []],  # 3 actions, the last one does not have a number_of_views.
    [],              # the second event does not have an action
  ],
  [  # the second proto has one event
    [[3]],
  ],
]
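This mapping from nested lists to SparseTensor components can be sketched in plain Python. The helper below is a hedged illustration of the idea, not the library's implementation; `nested_to_sparse_components` is a hypothetical name.

```python
def nested_to_sparse_components(nested, depth):
  """Flatten nested lists of a fixed depth into SparseTensor components.

  dense_shape[d] is the maximum list length observed at nesting level d;
  each value's index records its position at every level on the way down.
  """
  dense_shape = [0] * depth
  indices, values = [], []

  def walk(node, prefix):
    level = len(prefix)
    dense_shape[level] = max(dense_shape[level], len(node))
    for i, child in enumerate(node):
      if level == depth - 1:  # children at the deepest level are scalars
        indices.append(prefix + [i])
        values.append(child)
      else:
        walk(child, prefix + [i])

  walk(nested, [])
  return dense_shape, indices, values

# The event.action.number_of_views projection from the two Sessions above:
nested = [
    [[[1], [2], []], []],  # first Session: 2 events, with 3 and 0 actions
    [[[3]]],               # second Session: 1 event with 1 action
]
print(nested_to_sparse_components(nested, depth=4))
# -> ([2, 2, 3, 1], [[0, 0, 0, 0], [0, 0, 1, 0], [1, 0, 0, 0]], [1, 2, 3])
```

Note how this reproduces the dense_shape, indices, and values shown in the SparseTensor below.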
tf.SparseTensor
struct2tensor uses tf.SparseTensor to represent the above nested lists in the projection results. Note that tf.SparseTensor essentially forces lists nested at the same level to have the same length (because each dimension has a single size), therefore this representation is lossy. The above nested lists, when written as a SparseTensor, look like:
tf.SparseTensor(
    dense_shape=[2, 2, 3, 1],  # each is the maximum length of lists at that nesting level.
    values=[1, 2, 3],
    indices=[[0, 0, 0, 0], [0, 0, 1, 0], [1, 0, 0, 0]]
)
Note that the last dimension is useless: the index in that dimension will always be 0 for any present value, because number_of_views is an optional field. So the struct2tensor library will actually "squeeze" out all the optional dimensions.
The actual result would be:
In [7]:
query = _create_query_from_text_sessions(['''
    event { action { number_of_views: 1 } action { number_of_views: 2 } action {} }
    event {}
    ''', '''
    event { action { number_of_views: 3 } }
    ''']
).project(["event.action.number_of_views"])

prensor = struct2tensor.calculate.calculate_prensors([query])
pretty.pprint(prensor)
struct2tensor's internal data model is closer to the above "nested lists" abstraction, and it is sometimes easier to reason about "nested lists" than about SparseTensors.
Recently, tf.RaggedTensor was introduced to represent nested lists exactly. We are working on adding support for projecting into ragged tensors.
promote
Promotes a node to be a child of its grandparent. The values of the repeated field are concatenated per grandparent.
In [8]:
#@title { display-mode: "form" }
_display('''
digraph {
root -> session [label="*"];
session -> event [label="*"];
event -> query_token [label="*"];
}
''')
Out[8]:
promote(source_path="event.query_token", new_field_name="event_query_token")
In [9]:
#@title { display-mode: "form" }
_display('''
digraph {
event_query_token [color="red"];
root -> session [label="*"];
session -> event [label="*"];
session -> event_query_token [label="*"];
event -> query_token [label="*"];
}
''')
Out[9]:
In [10]:
query = (_create_query_from_text_sessions([
    """
    event {
      query_token: "abc"
      query_token: "def"
    }
    event {
      query_token: "ghi"
    }
    """])
    .promote(source_path="event.query_token",
             new_field_name="event_query_token")
    .project(["event_query_token"]))

prensor = struct2tensor.calculate.calculate_prensors([query])
_pretty_print(prensor)
The projected structure is like:
{
  # this is under Session.
  event_query_token: "abc"
  event_query_token: "def"
  event_query_token: "ghi"
}
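In terms of nested lists, promote concatenates each event's query_token lists at the session level. A minimal pure-Python sketch of that idea (the dict-of-lists session representation here is just for illustration, not a struct2tensor API):

```python
def promote(sessions, parent, field, new_field_name):
  """Sketch of promote: gather a grandchild field's values up one level."""
  for session in sessions:
    session[new_field_name] = [
        value
        for child in session.get(parent, [])
        for value in child.get(field, [])
    ]
  return sessions

sessions = [{"event": [{"query_token": ["abc", "def"]},
                       {"query_token": ["ghi"]}]}]
promote(sessions, "event", "query_token", "event_query_token")
print(sessions[0]["event_query_token"])  # -> ['abc', 'def', 'ghi']
```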
broadcast
Broadcasts the value of a node to one of its siblings. The value is replicated if the sibling is repeated. This is similar to TensorFlow's and NumPy's broadcasting semantics.
In [11]:
#@title { display-mode: "form" }
_display('''
digraph {
root -> session [label="*"];
session -> session_id [label="?"];
session -> event [label="*"];
}
''')
Out[11]:
broadcast(source_path="session_id", sibling_field="event", new_field_name="session_session_id")
In [12]:
#@title { display-mode: "form" }
_display('''
digraph {
session_session_id [color="red"];
root -> session [label="*"];
session -> session_id [label="?"];
session -> event [label="*"];
event -> session_session_id [label="?"];
}
''')
Out[12]:
In [13]:
query = (_create_query_from_text_sessions([
    """
    session_id: 8
    event { }
    event { }
    """])
    .broadcast(source_path="session_id",
               sibling_field="event",
               new_field_name="session_session_id")
    .project(["event.session_session_id"]))

prensor = struct2tensor.calculate.calculate_prensors([query])
_pretty_print(prensor)
The projected structure is like:
{
  event {
    session_session_id: 8
  }
  event {
    session_session_id: 8
  }
}
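On nested lists, broadcast replicates the session-level value into each event. A minimal pure-Python sketch of the idea (again using an illustrative dict-of-lists representation, not a struct2tensor API):

```python
def broadcast(sessions, source, sibling, new_field_name):
  """Sketch of broadcast: replicate a field's values into each sibling."""
  for session in sessions:
    for child in session.get(sibling, []):
      child[new_field_name] = list(session.get(source, []))
  return sessions

sessions = [{"session_id": [8], "event": [{}, {}]}]
broadcast(sessions, "session_id", "event", "session_session_id")
print(sessions[0]["event"])
# -> [{'session_session_id': [8]}, {'session_session_id': [8]}]
```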
promote_and_broadcast
This query accepts multiple source fields and a destination field. For each source field, it first promotes the field to the least common ancestor with the destination field (if necessary), then broadcasts it to the destination field (if necessary).
For machine learning purposes, this usually gives a reasonable flattened representation of nested structures.
promote_and_broadcast(
    path_dictionary={
        'session_info_duration_sec': 'session_info.session_duration_sec'},
    dest_path_parent='event.action')

is equivalent to:

promote(source_path='session_info.session_duration_sec',
        new_field_name='anonymous_field1')

broadcast(source_path='anonymous_field1',
          sibling_field='event.action',
          new_field_name='session_info_duration_sec')
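The combined effect can be sketched on nested lists in plain Python. This is a hedged illustration with a dict-of-lists representation; the function name is hypothetical and the source/destination paths from the example above are hard-coded for clarity:

```python
def promote_and_broadcast_sketch(sessions, new_field_name):
  """Sketch: promote session_info.session_duration_sec to the session
  level, then broadcast it into every event.action element."""
  for session in sessions:
    # promote: gather the source values up to the least common ancestor
    promoted = [v
                for info in session.get("session_info", [])
                for v in info.get("session_duration_sec", [])]
    # broadcast: replicate into every destination element
    for event in session.get("event", []):
      for action in event.get("action", []):
        action[new_field_name] = list(promoted)
  return sessions

sessions = [{"session_info": [{"session_duration_sec": [1.0]}],
             "event": [{"action": [{}, {}]}]}]
promote_and_broadcast_sketch(sessions, "session_info_duration_sec")
print(sessions[0]["event"][0]["action"])
# -> [{'session_info_duration_sec': [1.0]}, {'session_info_duration_sec': [1.0]}]
```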
map_field_values
Creates a new node as a sibling of a leaf node. The values of the new node are the results of applying the given function to the values of the source node.
Note that the given function takes a 1-D tensor containing all the values of the source node, must output a 1-D tensor of the same size, and must build TF ops.
In [14]:
query = (_create_query_from_text_sessions([
    """
    session_id: 8
    """,
    """
    session_id: 9
    """])
    .map_field_values("session_id", lambda x: tf.add(x, 1),
                      dtype=tf.int64,
                      new_field_name="session_id_plus_one")
    .project(["session_id_plus_one"]))

prensor = struct2tensor.calculate.calculate_prensors([query])
_pretty_print(prensor)
reroot
Makes the given node the new root of the prensor. This has two effects:
- Only the descendants of the new root remain accessible to subsequent queries.
- The new root becomes the batch dimension; for example, after reroot("event"), each event (instead of each session) is an example.
In [15]:
#@title { display-mode: "form" }
_display('''
digraph {
root -> session [label="*"];
session -> session_id [label="?"];
session -> event [label="*"];
event -> event_id [label="?"];
}
''')
Out[15]:
reroot("event")
In [16]:
#@title { display-mode: "form" }
_display('''
digraph {
root -> event [label="*"];
event -> event_id [label="?"];
}
''')
Out[16]:
In [17]:
#@title { display-mode: "form" }
text_protos = ["""
    session_id: 1
    event {
      event_id: "a"
    }
    event {
      event_id: "b"
    }
    """,
    """
    session_id: 2
    """,
    """
    session_id: 3
    event {
      event_id: "c"
    }
    """
]

print("""Assume the following Sessions: """)
print([text_format.Merge(p, struct2tensor.test.test_pb2.Session())
       for p in text_protos])
print("\n")

reroot_example_query = _create_query_from_text_sessions(text_protos)

print("""project(["event.event_id"]) before reroot() (the batch dimension is the index to sessions):""")
_pretty_print(struct2tensor.calculate.calculate_prensors(
    [reroot_example_query.project(["event.event_id"])]))
print("\n")

print("""project(["event_id"]) after reroot() (the batch dimension becomes the index to events):""")
_pretty_print(struct2tensor.calculate.calculate_prensors(
    [reroot_example_query.reroot("event").project(["event_id"])]))
struct2tensor offers an Apache Parquet tf.data.Dataset that reads from a Parquet file and applies queries to manipulate the structure of the data.
Because struct2tensor knows which fields a query requires, the dataset reads only the required Parquet columns. This reduces I/O cost when only a select few columns are needed.
Please run the code cell at Some Pretty Printing and Imports to ensure that all required modules are imported, and that pretty print works properly.
In [0]:
# Download our sample data file from the struct2tensor repository. The description of the data is below.
!curl -o dremel_example.parquet 'https://raw.githubusercontent.com/google/struct2tensor/master/struct2tensor/testdata/parquet_testdata/dremel_example.parquet'
We will use a sample Parquet data file (dremel_example.parquet), which contains data based on the example used in this paper: https://storage.googleapis.com/pub-tools-public-publication-data/pdf/36632.pdf
The file dremel_example.parquet has the following schema:
message Document {
  required int64 DocId;
  optional group Links {
    repeated int64 Backward;
    repeated int64 Forward; }
  repeated group Name {
    repeated group Language {
      required string Code;
      optional string Country; }
    optional string Url; }}
and contains the following data:
Document
  DocId: 10
  Links
    Forward: 20
    Forward: 40
    Forward: 60
  Name
    Language
      Code: 'en-us'
      Country: 'us'
    Language
      Code: 'en'
    Url: 'http://A'
  Name
    Url: 'http://B'
  Name
    Language
      Code: 'en-gb'
      Country: 'gb'

Document
  DocId: 20
  Links
    Backward: 10
    Backward: 30
    Forward: 80
  Name
    Url: 'http://C'
In this example, we will promote and broadcast the field Links.Forward and project it.
batch_size is the number of records (Documents) per prensor. Batching works with optional and repeated fields, and each record is batched in its entirety.
Feel free to try batch_size = 2 in the code below. (Note that this Parquet file only has 2 records (Documents) in total.)
In [0]:
filenames = ["dremel_example.parquet"]
batch_size = 1

exp = parquet.create_expression_from_parquet_file(filenames)
new_exp = exp.promote_and_broadcast({"new_field": "Links.Forward"},
                                    Path(["Name"]))
proj_exp = project.project(new_exp, [Path(["Name", "new_field"])])
proj_exp_needed = project.project(exp, [Path(["Name", "Url"])])

# Please note that currently, proj_exp_needed needs to be passed into calculate.
# This is due to the way data is stored in parquet (values and repetition &
# definition levels). To construct the node for "Name", we need to read the
# values of a column containing "Name".
pqds = parquet.calculate_parquet_values([proj_exp, proj_exp_needed], exp,
                                        filenames, batch_size)

for prensors in pqds:
  new_field_prensor = prensors[0]
  print("============================")
  print("Schema of new_field prensor: ")
  print(new_field_prensor)
  print("\nSparse tensor representation: ")
  pretty.pprint(new_field_prensor)
  print("============================")