Advanced Topics: Top-K and Self Joins

Setup


In [ ]:
import ibis
import os
hdfs_port = os.environ.get('IBIS_WEBHDFS_PORT', 50070)
hdfs = ibis.hdfs_connect(host='impala', port=hdfs_port)
con = ibis.impala.connect(host='impala', database='ibis_testing',
                          hdfs_client=hdfs)
ibis.options.interactive = True

"Top-K" Filtering

A common analytical pattern involves subsetting based on some method of ranking. For example, "the 5 most frequently occurring widgets in a dataset". By choosing the right metric, you can obtain the most important or least important items from some dimension, for some definition of important.

Carrying out this pattern by hand involves the following steps:

  • Choose a ranking metric
  • Aggregate, computing the ranking metric, by the target dimension
  • Order by the ranking metric and take the highest K values
  • Use those values as a set filter (either with semi_join or isin) in your next query
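
These steps can be sketched outside Ibis on a toy pandas frame (the widget data and the value of k here are invented purely for illustration):

```python
import pandas as pd

# Hypothetical data: one row per widget occurrence
df = pd.DataFrame({'widget': ['a', 'b', 'a', 'c', 'a', 'b', 'd']})

# Steps 1-3: aggregate by the dimension, rank by the count metric,
# and take the highest K values
k = 2
top_widgets = (df.groupby('widget')
                 .size()
                 .sort_values(ascending=False)
                 .head(k)
                 .index)

# Step 4: use those values as a set filter in the next query
subset = df[df.widget.isin(top_widgets)]
```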

For example, let's look at the TPC-H tables and find the 5 customers who placed the most orders over their lifetimes:


In [ ]:
orders = con.table('tpch_orders')

top_orders = (orders
              .group_by('o_custkey')
              .size()
              .sort_by(('count', False))
              .limit(5))
top_orders

Now, we could use these customer keys as a filter in some other analysis:


In [ ]:
# Among the top 5 most frequent customers, what's the histogram of their order statuses?
analysis = (orders[orders.o_custkey.isin(top_orders.o_custkey)]
            .group_by('o_orderstatus')
            .size())
analysis

This is such a common pattern that Ibis supports a high-level topk operation, which can be used directly as a filter:


In [ ]:
top_orders = orders.o_custkey.topk(5)
orders[top_orders].group_by('o_orderstatus').size()

We can take this a step further. Suppose we now want to rank customers by their total spending rather than their order count, perhaps a more meaningful metric:


In [ ]:
total_spend = orders.o_totalprice.sum().name('total')
top_spenders = (orders
                .group_by('o_custkey')
                .aggregate(total_spend)
                .sort_by(('total', False))
                .limit(5))
top_spenders

To use a different ranking metric, pass it as the by argument to topk:


In [ ]:
top_spenders = orders.o_custkey.topk(5, by=total_spend)
orders[top_spenders].group_by('o_orderstatus').size()

Self joins

If you're a relational data guru, you may have wondered how it's possible to join a table with itself, since join clauses contain column references back to the original tables.

Consider the SQL:

SELECT t1.key, sum(t1.value - t2.value) AS metric
    FROM my_table t1
      JOIN my_table t2
        ON t1.key = t2.subkey
    GROUP BY 1

Here, we have an unambiguous way to refer to each of the tables through aliasing.
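
The same aliasing idea can be sketched in pandas, where merge suffixes play the role of the t1/t2 aliases (the table contents below are invented for illustration):

```python
import pandas as pd

# A toy my_table with key, subkey, and value columns (hypothetical data)
my_table = pd.DataFrame({
    'key':    [1, 1, 2],
    'subkey': [2, 2, 1],
    'value':  [10.0, 20.0, 5.0],
})

# Self-join on t1.key = t2.subkey; the suffixes disambiguate the two
# sides just like the SQL aliases t1 and t2
joined = my_table.merge(my_table, left_on='key', right_on='subkey',
                        suffixes=('_t1', '_t2'))

# GROUP BY t1.key, summing the per-row difference of the two values
metric = (joined.assign(diff=joined.value_t1 - joined.value_t2)
                .groupby('key_t1')['diff']
                .sum())
```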

Let's consider the TPC-H database, and suppose we want to compute the year-over-year change in total order amounts by region using joins.


In [ ]:
region = con.table('tpch_region')
nation = con.table('tpch_nation')
customer = con.table('tpch_customer')
orders = con.table('tpch_orders')

orders.limit(5)

First, let's join all the things and select the fields we care about:


In [ ]:
fields_of_interest = [region.r_name.name('region'),
                      nation.n_name.name('nation'),
                      orders.o_totalprice.name('amount'),
                      # o_orderdate is stored as a string
                      orders.o_orderdate.cast('timestamp').name('odate')]

joined_all = (region.join(nation, region.r_regionkey == nation.n_regionkey)
              .join(customer, customer.c_nationkey == nation.n_nationkey)
              .join(orders, orders.o_custkey == customer.c_custkey)
              [fields_of_interest])

Okay, great, let's have a look:


In [ ]:
joined_all.limit(5)

Sweet, now let's aggregate by year and region:


In [ ]:
year = joined_all.odate.year().name('year')

total = joined_all.amount.sum().cast('double').name('total')

annual_amounts = (joined_all
                  .group_by(['region', year])
                  .aggregate(total))
annual_amounts

Looking good so far. Now we need to join this table with itself, matching each (region, year) row against the same region's total from the previous year.

We do this by creating a "joinable" view of the table, which Ibis treats as a distinct object. To do this, use the view method:


In [ ]:
current = annual_amounts
prior = annual_amounts.view()

yoy_change = (current.total - prior.total).name('yoy_change')

results = (current.join(prior, ((current.region == prior.region) &
                                (current.year == (prior.year + 1))))
           [current.region, current.year, yoy_change])
df = results.execute()

In [ ]:
df['yoy_pretty'] = df.yoy_change.map(lambda x: '$%.2fmm' % (x / 1000000.))
df

If you're being fastidious and want the first year appearing in the dataset for each region to count as having a prior-year total of 0, you will instead need an outer join, treating nulls on the prior side of the join as zero:


In [ ]:
yoy_change = (current.total - prior.total.zeroifnull()).name('yoy_change')
results = (current.outer_join(prior, ((current.region == prior.region) &
                                      (current.year == (prior.year + 1))))
           [current.region, current.year, current.total,
            prior.total.zeroifnull().name('prior_total'),
            yoy_change])

results.limit(10)
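
The zeroifnull treatment in the outer join mirrors a left merge followed by fillna(0) in pandas; here is a toy cross-check of that logic for a single region (the annual totals are invented for illustration):

```python
import pandas as pd

# Hypothetical annual totals for one region
annual = pd.DataFrame({'year': [1995, 1996, 1997],
                       'total': [100.0, 150.0, 130.0]})

# Build the "prior" side by shifting each year forward by one,
# so that a row for year Y lines up with the total for year Y - 1
prior = annual.rename(columns={'total': 'prior_total'})
prior['year'] = prior['year'] + 1

merged = annual.merge(prior, on='year', how='left')

# The first year has no prior row; treat its null as zero, like zeroifnull
merged['prior_total'] = merged['prior_total'].fillna(0)
merged['yoy_change'] = merged['total'] - merged['prior_total']
```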