Table Joins Tutorial

This tutorial walks through some ways to join Hail tables. We'll use a simple movie dataset to illustrate. The movie dataset comes in multiple parts. Here are a few questions we might naturally ask about the dataset:

  • What is the mean rating per genre?
  • What is the favorite movie for each occupation?
  • What genres are most preferred by women vs men?

We'll use joins to combine datasets in order to answer these questions.

Let's initialize Hail, fetch the tutorial data, and load three tables: users, movies, and ratings.


In [ ]:
import hail as hl

hl.utils.get_movie_lens('data/')

users = hl.read_table('data/users.ht')
movies = hl.read_table('data/movies.ht')
ratings = hl.read_table('data/ratings.ht')

The Key to Understanding Joins

To understand joins in Hail, we need to revisit one of the crucial properties of tables: the key.

A table has an ordered list of fields known as the key. Our users table has one key field, id. We can see all the fields of a table, as well as its key, by calling describe().


In [ ]:
users.describe()

key is a struct expression of all of the key fields, so we can refer to the key of a table without explicitly specifying the names of the key fields.


In [ ]:
users.key.describe()

Keys need not be unique or non-missing, although in many applications they will be both.

When tables are joined in Hail, they are joined based on their keys. In order to join two tables, they must have the same number of key fields, with the same key types (e.g. string vs. integer), in the same order.
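
If a table isn't keyed the way a join requires, it can be re-keyed with key_by. As a quick illustration (not a step we need later), we could re-key the users table by its occupation field:


In [ ]:
# key_by replaces the current key. Here users becomes keyed by the string
# field 'occupation' instead of 'id'; the rest of this tutorial keeps
# users keyed by 'id'.
users_by_occupation = users.key_by('occupation')
users_by_occupation.describe()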

Let's look at a simple example of a join. We'll use the Table.parallelize() method to create two small tables, t1 and t2.


In [ ]:
t1 = hl.Table.parallelize([
    {'a': 'foo', 'b': 1},
    {'a': 'bar', 'b': 2},
    {'a': 'bar', 'b': 2}],
    hl.tstruct(a=hl.tstr, b=hl.tint32),
    key='a')
t2 = hl.Table.parallelize([
    {'t': 'foo', 'x': 3.14},
    {'t': 'bar', 'x': 2.78},
    {'t': 'bar', 'x': -1.0},
    {'t': 'quam', 'x': 0.0}],
    hl.tstruct(t=hl.tstr, x=hl.tfloat64),
    key='t')

In [ ]:
t1.show()

In [ ]:
t2.show()

Now, we can join the tables.


In [ ]:
j = t1.annotate(t2_x = t2[t1.a].x)
j.show()

Let's break this syntax down.

t2[t1.a] is an expression referring to the row of table t2 whose key equals the value of t1.a. This expression creates a mapping between the keys of t1 and the rows of t2. You can view this mapping directly:


In [ ]:
t2[t1.a].show()

Since we only want the field x from t2, we can select it with t2[t1.a].x. Then we add this field to t1 with the annotate() method. The new joined table j has a field t2_x that comes from the rows of t2. The tables could be joined because they share the same number of key fields (1) and the same key type (string). The keys do not need to share the same name. Notice that rows with keys present in t2 but not in t1 do not show up in the final result; this join syntax performs a left join. Tables also have a SQL-style inner/left/right/outer join() method.
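
As a quick aside, here's a minimal sketch of that SQL-style join() method applied to t1 and t2. The key field names differ ('a' vs. 't'), but as noted above only the number, order, and types of the key fields matter:


In [ ]:
# An outer join keeps keys present in either table; rows present on only
# one side get missing values for the other side's fields.
t1.join(t2, how='outer').show()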

The magic of keys is that they can be used to create a mapping, like a Python dictionary, between the keys of one table and the row values of another table: table[expr] will refer to the row of table that has a key value of expr. If the row is not unique, one such row is chosen arbitrarily.

Here's a subtle bit: if expr is an expression indexed by a row of table2, then table[expr] is also an expression indexed by a row of table2.
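
For example, because t2[t1.a] is indexed by the rows of t1, it can be combined with t1's own fields like any other row expression. Here's a small sketch (not a step we need later) that multiplies t1.b by the x value looked up from t2:


In [ ]:
# t2[t1.a].x behaves like a field of t1, so it can participate in
# expressions alongside t1's other fields.
t1.annotate(product = t1.b * t2[t1.a].x).show()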

Also note that while they look similar, table['field'] and table1[table2.key] are doing very different things!

table['field'] selects a field from the table, while table1[table2.key] creates a mapping between the keys of table2 and the rows of table1.


In [ ]:
t1['a'].describe()

In [ ]:
t2[t1.a].describe()

Joining Tables

Now that we understand the basics of how joins work, let's use a join to compute the average movie rating per genre.

We have a table ratings, which contains user_id, movie_id, and rating fields. First, we'll group by movie_id and aggregate to get the mean rating of each movie.


In [ ]:
t = (ratings.group_by(ratings.movie_id) 
     .aggregate(rating = hl.agg.mean(ratings.rating)))
t.describe()

To get the mean rating by genre, we need to join in the genre field from the movies table.


In [ ]:
t = t.annotate(genres = movies[t.movie_id].genres)
t.describe()

In [ ]:
t.show()

We want to group the ratings by genre, but they're packed up in an array. To unpack the genres, we can use explode.

explode creates a new row for each element in the value of the field, which must be a collection (array or set).


In [ ]:
t = t.explode(t.genres)
t.show()

Finally, we can group by genre and aggregate to get the mean rating per genre.


In [ ]:
t = (t.group_by(t.genres)
     .aggregate(rating = hl.agg.mean(t.rating)))
t.show(n=100)

Let's do another example. This time, we'll see if we can determine the highest rated movies, on average, for each occupation. We start by joining ratings with the movies and users tables to pull in the movie title and the rater's occupation.


In [ ]:
movie_data = ratings.annotate(
    movie = movies[ratings.movie_id].title,
    occupation = users[ratings.user_id].occupation)

movie_data.show()

Next, we'll use group_by along with the aggregator hl.agg.mean to determine the average rating of each movie by occupation. Remember that the group_by operation is always associated with an aggregation.


In [ ]:
ratings_by_job = movie_data.group_by(
    movie_data.occupation, movie_data.movie).aggregate(
    mean = hl.agg.mean(movie_data.rating))

ratings_by_job.show()

Now we can use another group_by to determine the highest rated movie, on average, for each occupation.

The syntax here needs some explaining. The second step in the cell below just cleans up the table created by the first step. If you examine the intermediate result (for example, by giving a new name to the output of the first step), you will see that there are two fields referring to the occupation: the key field occupation and val.occupation. This is an artifact of the aggregator syntax and the fact that we are retaining the entire row from ratings_by_job. So in the second step, we use select to keep the fields we want and rename them to drop the val. prefix. Since occupation is a key of this table, we don't need to select it.


In [ ]:
highest_rated = ratings_by_job.group_by(
    ratings_by_job.occupation).aggregate(
    val = hl.agg.take(ratings_by_job.row, 1, ordering = -ratings_by_job.mean)[0]
)

highest_rated = highest_rated.select(movie = highest_rated.val.movie,
                                     mean = highest_rated.val.mean)

highest_rated.show()

Let's try to get a deeper understanding of this result. Notice that every movie displayed has an average rating of 5, which means that every person gave these movies the highest rating. Is that unlikely? We can determine how many people rated each of these movies by working backwards and filtering our original movie_data table by fields in highest_rated.

Note that in the second line below, we are taking advantage of the fact that Hail tables are keyed.


In [ ]:
highest_rated = highest_rated.key_by(
    highest_rated.occupation, highest_rated.movie)

counts_temp = movie_data.filter(
    hl.is_defined(highest_rated[movie_data.occupation, movie_data.movie]))

counts = counts_temp.group_by(counts_temp.occupation, counts_temp.movie).aggregate(
    counts = hl.agg.count())

counts.show()

So it looks like the highest rated movies, when computed naively, mostly have a single viewer rating them. To get a better understanding of the data, we can recompute this list but include only movies that have more than one viewer (left as an exercise).

Exercises

  • What is the favorite movie for each occupation, conditional on there being more than one viewer?
  • What genres are rated most differently by men and women?