In [ ]:
import ibis
import os

# Environment variables are strings, so coerce the port to an int
hdfs_port = int(os.environ.get('IBIS_WEBHDFS_PORT', 50070))
hdfs = ibis.hdfs_connect(host='impala', port=hdfs_port)
con = ibis.impala.connect(host='impala', database='ibis_testing',
                          hdfs_client=hdfs)
ibis.options.interactive = True
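Before going further, it can be useful to confirm the connection works. A minimal sanity check, assuming the ibis_testing database contains the TPC-H tables used below (the wildcard pattern is passed through to Impala's SHOW TABLES):
In [ ]:
# Optional sanity check: list the TPC-H tables in the connected database
con.list_tables(like='tpch*')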
A common analytical pattern involves subsetting based on some method of ranking. For example, "the 5 most frequently occurring widgets in a dataset". By choosing the right metric, you can obtain the most important or least important items from some dimension, for some definition of important.
To carry out the pattern by hand involves the following steps:

- Aggregate, computing the ranking metric, by the target dimension
- Order by the ranking metric and take the highest K values
- Use those values as a set filter (either with semi_join or isin) in your next query

For example, let's look at the TPC-H tables and find the 5 customers who placed the most orders over their lifetime:
In [ ]:
orders = con.table('tpch_orders')

top_orders = (orders
              .group_by('o_custkey')
              .size()
              .sort_by(('count', False))
              .limit(5))
top_orders
Now, we could use these customer keys as a filter in some other analysis:
In [ ]:
# Among the top 5 most frequent customers, what's the histogram
# of their order statuses?
analysis = (orders[orders.o_custkey.isin(top_orders.o_custkey)]
            .group_by('o_orderstatus')
            .size())
analysis
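The last step of the recipe also mentioned semi_join as an alternative to isin. As a sketch (the exact signature may vary across Ibis versions), the same filter can be phrased as a semi join; since a semi join keeps only the left table's columns, the result can be aggregated directly:
In [ ]:
# Equivalent filter expressed as a semi join rather than isin
filtered = orders.semi_join(top_orders,
                            orders.o_custkey == top_orders.o_custkey)
filtered.group_by('o_orderstatus').size()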
This is such a common pattern that Ibis supports a high-level topk primitive, which can be used immediately as a filter:
In [ ]:
top_orders = orders.o_custkey.topk(5)
orders[top_orders].group_by('o_orderstatus').size()
topk goes a little further than that, though. Suppose we now want to rank customers by their total spending instead of their number of orders, perhaps a more meaningful metric:
In [ ]:
total_spend = orders.o_totalprice.sum().name('total')

top_spenders = (orders
                .group_by('o_custkey')
                .aggregate(total_spend)
                .sort_by(('total', False))
                .limit(5))
top_spenders
To use another metric, just pass it to the by argument in topk:
In [ ]:
top_spenders = orders.o_custkey.topk(5, by=total_spend)
orders[top_spenders].group_by('o_orderstatus').size()
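If you're curious what SQL a topk filter turns into, you can compile the expression without executing it. A sketch, assuming your Ibis version exposes a compile function on the Impala backend (the entry point has moved around between releases):
In [ ]:
# Inspect the generated SQL rather than executing the query
expr = orders[top_spenders].group_by('o_orderstatus').size()
print(ibis.impala.compile(expr))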
If you're a relational data guru, you may have wondered how it's possible to join tables with themselves, because join clauses involve column references back to the original table.
Consider this SQL:
SELECT t1.key, sum(t1.value - t2.value) AS metric
FROM my_table t1
JOIN my_table t2
ON t1.key = t2.subkey
GROUP BY 1
Here, we have an unambiguous way to refer to each of the tables through aliasing.
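In Ibis, the same disambiguation is handled by the view method, which we'll use for the TPC-H exercise below. As a preview, here is a sketch of the query above against a hypothetical unbound table; my_table, key, subkey, and value come from the SQL, not from our database:
In [ ]:
# Hypothetical unbound table matching the SQL above
my_table = ibis.table([('key', 'string'),
                       ('subkey', 'string'),
                       ('value', 'double')], 'my_table')

t1 = my_table
t2 = my_table.view()  # a second, distinct reference to the same table

# Project the join, then aggregate, mirroring the SQL
diffs = (t1.join(t2, t1.key == t2.subkey)
         [t1.key, (t1.value - t2.value).name('value_diff')])
expr = (diffs
        .group_by('key')
        .aggregate(diffs.value_diff.sum().name('metric')))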
Let's consider the TPC-H database, and suppose we want to compute the year-over-year change in total order amounts by region using joins.
In [ ]:
region = con.table('tpch_region')
nation = con.table('tpch_nation')
customer = con.table('tpch_customer')
orders = con.table('tpch_orders')
orders.limit(5)
First, let's join all the things and select the fields we care about:
In [ ]:
fields_of_interest = [region.r_name.name('region'),
                      nation.n_name.name('nation'),
                      orders.o_totalprice.name('amount'),
                      # o_orderdate values are strings, so cast to timestamp
                      orders.o_orderdate.cast('timestamp').name('odate')]

joined_all = (region.join(nation, region.r_regionkey == nation.n_regionkey)
              .join(customer, customer.c_nationkey == nation.n_nationkey)
              .join(orders, orders.o_custkey == customer.c_custkey)
              [fields_of_interest])
Okay, great, let's have a look:
In [ ]:
joined_all.limit(5)
Sweet, now let's aggregate by year and region:
In [ ]:
year = joined_all.odate.year().name('year')
total = joined_all.amount.sum().cast('double').name('total')

annual_amounts = (joined_all
                  .group_by(['region', year])
                  .aggregate(total))
annual_amounts
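Aggregation output order isn't guaranteed, so if you want to eyeball the results region by region, you can sort before inspecting (an optional step):
In [ ]:
# Optional: sort for readability before inspecting
annual_amounts.sort_by(['region', 'year']).limit(10)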
Looking good so far. Now, we need to join this table on itself, by subtracting 1 from one of the year columns.
We do this by creating a "joinable" view of the table, one that Ibis treats as a distinct object. To do this, use the view method:
In [ ]:
current = annual_amounts
prior = annual_amounts.view()

yoy_change = (current.total - prior.total).name('yoy_change')

results = (current.join(prior, ((current.region == prior.region) &
                                (current.year == (prior.year - 1))))
           [current.region, current.year, yoy_change])
df = results.execute()
In [ ]:
df['yoy_pretty'] = df.yoy_change.map(lambda x: '$%.2fmm' % (x / 1000000.))
df
If you're being fastidious and want to treat the first year occurring in the dataset for each region as having a prior-year total of 0, you will instead need to do an outer join and treat nulls on the prior side of the join as zero:
In [ ]:
yoy_change = (current.total - prior.total.zeroifnull()).name('yoy_change')

results = (current.outer_join(prior, ((current.region == prior.region) &
                                      (current.year == (prior.year - 1))))
           [current.region, current.year, current.total,
            prior.total.zeroifnull().name('prior_total'),
            yoy_change])
results.limit(10)