In [1]:
import karps as ks
import karps.functions as f
from karps.display import show_phase

In [2]:
employees = ks.dataframe([
    ("ACME", "John", "12/01", 12.0),
    ("ACME", "Kate", "09/04", 11.4),
], schema=["company_name", "employee_name", "dob", "shoe_size"],
name="employees")
employees


Out[2]:
/employees@org.spark.DistributedLiteral:{company_name:string, employee_name:string, dob:string, shoe_size:double}

In [3]:
# Group employees by date of birth, and count how many employees per calendar date.
#df2 = employees.groupby(employees.dob).agg({"count", f.count})
# Count how many dates have more than one employee.
#num_collisions = f.count(df2[df2.count >= 2], name="num_collisions")

#print("number of days with more than one b-day", s.run(num_collisions))

In [4]:
# Group employees by date of birth, and count how many employees per calendar date.
#df2 = df.groupby(df.employees_dob).agg({"count", f.count})
# Count how many dates have more than one employee.
#num_collisions = f.count(f.filter(df2.count >= 2, df2.employees_dob), name="num_collisions")

#print("number of days with more than one b-day", s.run(num_collisions))

In [7]:
# Put that into a function:
def num_collisions(dob_col):
    by_dob = dob_col.groupby(dob_col)
    count_dob = by_dob.agg({"count": f.count}, name="count_by_dob")
    num_collisions = f.count(
        count_dob[count_dob.count >= 2],
        name="num_collisions")
    return num_collisions

In [8]:
# Let's check that it works against a small amount of data
sample_dobs = ks.dataframe(["12/1", "1/4", "12/1"])
ks.display(num_collisions(sample_dobs))


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-ed3dffe00216> in <module>()
      1 # Let's check that it works against a small amount of data
      2 sample_dobs = ks.dataframe(["12/1", "1/4", "12/1"])
----> 3 ks.display(num_collisions(sample_dobs))

TypeError: 'module' object is not callable

In [ ]:
# We can also use this function as an aggregation function!
collisions_by_company =
    (employees.employees_dob
     .groupby(employees.company)
     .agg({"num_collisions": num_collisions}))

In [ ]:
# We can also use this function as an aggregation function!
collisions_by_company =
    (employees
     .groupby(employees.company)
     .agg({
         "num_collisions": num_collisions, # Functional
         "shoe_size": f.mean(employee.shoe_size) # Direct call to a column of the data being grouped.
         # lambda col: f.mean(col.shoe_size)
     }))

In [ ]:
s = ks.session("test3")
comp = s.compute(the_mean)

In [ ]:
show_phase(comp, "initial")

In [ ]:
show_phase(comp, "REMOVE_OBSERVABLE_BROADCASTS")

In [ ]:
show_phase(comp, "MERGE_AGGREGATIONS")

In [ ]:
show_phase(comp, "final")

In [ ]:
comp.values()

In [ ]:
s.run(the_mean)

In [ ]: