In [1]:
# Combining with structured transforms
import karps as ks
import karps.functions as f
from karps.display import show_phase

In [2]:
# Create the karps session used by every run()/compute() call below.
s = ks.session("demo2e")

In [3]:
# Build a small distributed literal dataframe of employees.
# NOTE(review): three of the four rows are commented out — presumably
# disabled while debugging the RPC failure in cell 5; confirm whether
# they should be restored before drawing conclusions from the results.
employees = ks.dataframe([
    ("ACME", "John", "12/01"),
#    ("ACME", "Kate", "09/04"),
#    ("ACME", "Albert", "09/04"),
#    ("Databricks", "Ali", "09/04"),
], schema=["company_name", "employee_name", "dob"],
   name="employees")
employees


Out[3]:
/employees@org.spark.DistributedLiteral:{company_name:string, employee_name:string, dob:string}

In [4]:
# Group the dob column by company and aggregate.
# NOTE(review): "agg1" and "agg2" both apply f.count — the same
# aggregation under two names, presumably just to demonstrate
# multiple named aggregates; confirm this duplication is intentional.
res = (employees.dob
       .groupby(employees.company_name)
       .agg({
           "agg1": f.count,
           "agg2": f.count
       }))
# Collect the distributed result into a single local observable.
o = f.collect(res)
o


Out[4]:
/collect_list8!org.spark.StructuredReduce:[{company_name:string, agg1:int, agg2:int}]

In [5]:
s.run(o)


---------------------------------------------------------------------------
_Rendezvous                               Traceback (most recent call last)
<ipython-input-5-a70619a70110> in <module>()
----> 1 s.run(o)

~/work/karps-python/karps/session.py in run(self, fetches, return_mode)
     49     """
     50     computation = self.compute(fetches, return_mode)
---> 51     return computation.values()
     52 
     53   def compute(self, fetches, return_mode="proto"):

~/work/karps-python/karps/computation.py in values(self)
     46     """
     47     while not self._values():
---> 48       self._progress()
     49     return self._values()
     50 

~/work/karps-python/karps/computation.py in _progress(self)
     86     logger.debug("Calling _progress")
     87     # Read one more value from the channel.
---> 88     csr = next(self._channel)
     89     #logger.debug("channel: got value %s: %s", type(csr), str(csr))
     90     if csr.HasField("start_graph"):

~/.local/share/virtualenvs/karps-python-NPcrF1gX/lib/python3.5/site-packages/grpc/_channel.py in __next__(self)
    346 
    347     def __next__(self):
--> 348         return self._next()
    349 
    350     def next(self):

~/.local/share/virtualenvs/karps-python-NPcrF1gX/lib/python3.5/site-packages/grpc/_channel.py in _next(self)
    340                         raise StopIteration()
    341                     elif self._state.code is not None:
--> 342                         raise self
    343 
    344     def __iter__(self):

_Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNKNOWN, )>

In [ ]:
# The number of people who share a birthday date with someone else.
# Takes a column of data containing birthdates.
def paradoxal_count(c):
    """Count the people whose birthdate is shared with at least one other person.

    c: a karps column of birthdate values.
    Returns a karps observable named "paradoxical_employees": the sum of
    the sizes of all birthdate groups that contain 2 or more people.
    """
    with ks.scope("p_count"):  # Group the ops under one scope so the graph displays nicely.
        # Size of each birthdate group.
        group_sizes = c.groupby(c).agg({'num_employees': f.count}, name="agg_count")
        # Sum only the groups with 2+ members. Deliberately NOT named `s`
        # here: the original local `s` shadowed the global session `s`
        # (cell 2), a confusing hidden-state hazard in a notebook.
        shared_total = f.sum(group_sizes.num_employees[group_sizes.num_employees >= 2],
                             name="paradoxical_employees")
        return shared_total

In [ ]:


In [ ]:
# Minimal sanity check: "1/1" appears twice, "3/1" once, so the
# expected result is presumably 2 (the two people sharing "1/1") —
# verify against the actual output.
df = ks.dataframe(["1/1", "3/1", "1/1"])
s.run(paradoxal_count(df))

In [ ]:


In [ ]:
# Now use the user-defined aggregator to group by companies:
# paradoxal_count is applied per company group, exactly like the
# builtin f.count was in cell 4.
res = (employees.dob
       .groupby(employees.company_name)
       .agg({
           "paradoxical_employees": paradoxal_count
       }))
o = f.collect(res)
o

In [ ]:
comp = s.compute(o)

In [ ]:
show_phase(comp, "initial")

In [ ]:
show_phase(comp, "final")

In [ ]:
comp.values()

In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]: