Spark does not let you define arbitrary functions and reuse them at will. In this example, we show how to decompose a problem into a set of simple primitive functions that nevertheless perform arbitrary operations which would not normally be allowed in Spark.
We are going to build a function that illustrates the birthday paradox: given a set of birth dates, it returns the number of people who happen to share a birth date with someone else. This is easy to express using joins and groups. The function takes a dataset or a column as input (the birth dates) and returns a single number (the number of people who share a birth date with someone else). In other words, it is an aggregation function! The natural urge is then to reuse it in a different setting, for example inside a group. As we will see, Karps lets us write code that works for both Pandas and Spark, and that lets us plug in any aggregation function in a very natural way.
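To make the computation concrete, here is what the same aggregation looks like in plain Pandas, independently of Karps (a minimal sketch for illustration; the variable names are ours):

import pandas as pd

# Count how many people share their birth date with someone else,
# using plain pandas only (no Karps, no Spark).
birthdates = pd.Series(["1/1", "3/5", "1/1"])
counts = birthdates.value_counts()    # number of people per birth date
shared = counts[counts >= 2].sum()    # keep only shared dates, sum the people
print(shared)                         # 2: two people share the 1/1 birth date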
In [1]:
import pandas as pd
import karps as ks
import karps.functions as f
from karps.display import show_phase
In [2]:
# Make a session at the top, although it is not required immediately.
s = ks.session("demo2")
This is an extremely small dataset:
In [3]:
employees = ks.dataframe([
    ("ACME", "John", "12/01"),
    ("ACME", "Kate", "09/04"),
    ("ACME", "Albert", "09/04"),
    ("Databricks", "Ali", "09/04"),
], schema=["company_name", "employee_name", "dob"],
   name="employees")
employees
Out[3]:
Now, here is the definition of the birthday paradox function. It is pretty simple code:
In [4]:
# The number of people who share a birth date with someone else.
# Takes a column of data containing birth dates.
def paradoxal_count(c):
    with ks.scope("p_count"):  # Make it pretty:
        g = c.groupby(c).agg({'num_employees': f.count}, name="agg_count")
        s = f.sum(g.num_employees[g.num_employees >= 2], name="paradoxical_employees")
        return s
This is a simple function. If we wanted to try it out or write tests for it, we would rather not have to launch a Spark instance, which comes with some overhead. Let's write a simple test case using Pandas to gain confidence that it works as expected, and then use it in Spark.
On the test data below, it correctly finds that two people share the same January 1st birth date.
In [5]:
# A series of birth dates.
test_df = pd.Series(["1/1", "3/5", "1/1"])
paradoxal_count(test_df)
Out[5]:
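If we want to go a step further, the same check can be wrapped in a small test function, here in a pytest style (a sketch only; it assumes that paradoxal_count returns a plain number when applied to a pandas Series):

import pandas as pd

def test_paradoxal_count():
    # Two of the three people share the 1/1 birth date, so we expect 2.
    birthdates = pd.Series(["1/1", "3/5", "1/1"])
    assert paradoxal_count(birthdates) == 2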
Now that we have this nice function, let's use it against each of the companies in our dataset, this time with Spark.
Notice that you can plug the function in directly; there is no need to translate it or register anything. This is not possible in plain Spark for a complex function like this one.
At the end we get a dataframe with the name of each company and the number of its employees who share a birth date:
In [6]:
# Now use this to group by companies:
res = (employees.dob
       .groupby(employees.company_name)
       .agg({
           "paradoxical_employees": paradoxal_count
       }))
res
Out[6]:
This is still a dataframe. Now is the time to collect and see the content:
In [7]:
o = f.collect(res)
o
Out[7]:
We run it using the session we opened earlier, and we use compute to inspect how Karps and Spark evaluate the computation.
In [8]:
comp = s.compute(o)
comp
Out[8]:
Let's look under the hood to see how this gets translated.
The transformation is defined using two nested first-order functions, which get collected using the FunctionalShuffle operation called shuffle9.
In [9]:
show_phase(comp, "initial")
In [10]:
show_phase(comp, "final")
After optimization and flattening, the graph turns out to be linear: a first shuffle, a filter, a second shuffle, and then a final aggregate. You can click around to see how the computations are being done.
In [11]:
show_phase(comp, "final")
And finally the value:
In [12]:
comp.values()
Out[12]:
In conclusion, with Karps you can take any reasonable function and reuse it in arbitrary ways, in a functional and type-safe manner. Karps writes for you the complex SQL queries that you would otherwise have to write by hand, and all errors are detected well before the actual runtime, which greatly simplifies debugging.
Laziness and structured transforms bring to Spark some fundamental properties such as modularity, reusability, better testing, and fail-fast comprehensive error checking, on top of automatic performance optimizations.
The remaining cells show a few other phases of the compilation (parsed, physical, rdd) and dump a profiling trace to a file.
In [13]:
show_phase(comp, "parsed")
In [14]:
show_phase(comp, "physical")
In [15]:
show_phase(comp, "rdd")
In [16]:
comp.dump_profile("karps-trace-2.json")