In [1]:
# Combining with structured transforms
import karps as ks
import karps.functions as f
from karps.display import show_phase
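
This notebook sketches how karps builds structured transforms: each call
below adds a node to a lazy computation graph instead of touching data,
and the graph is only compiled and executed once a session computes it
(In [10] onwards).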

In [2]:
employees = ks.dataframe([("a", 1), ("a", 2), ("b", 1)], schema=["my_key", "my_val"],
name="employees")
employees


Out[2]:
/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}
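
The repr above is a node in the computation graph rather than materialized
data: /employees names the node, org.spark.DistributedLiteral is the
operation that produced it, and {my_key:string, my_val:int} is its row
type. The @ separator appears to mark distributed datasets; compare the !
marker on the local observables below.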

In [3]:
def num_collisions(c):
    # Count the entries of column c whose value is at least 2.
    return f.count(c[c >= 2])
num_collisions(employees.my_val)


Out[3]:
/count_2!org.spark.StructuredReduce:int
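
num_collisions is an ordinary Python function over column expressions:
c[c >= 2] filters the column with a boolean predicate and f.count counts
the surviving entries. The result is an integer observable (note the !
marker instead of @), auto-named count_2. For intuition, the plain-Python
equivalent over the my_val column would be:

# Plain Python, for intuition only (not karps code):
vals = [1, 2, 1]                    # the my_val column
sum(1 for v in vals if v >= 2)      # == 1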

In [4]:
# Group the whole dataframe by its my_key column.
employees.groupby(employees.my_key)


Out[4]:
KeyedGroup(ref=/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}, key=string, value={my_key:string, my_val:int})
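
Grouping the whole dataframe by one of its columns yields a KeyedGroup
whose key side is the string column my_key and whose value side is the
full row struct, so aggregation functions receive complete rows. Grouping
a single column instead (In [7] below) narrows the value side to just
that column.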

In [5]:
# View the entire dataframe as a single struct-typed column.
employees.as_column()


Out[5]:
EXTR([]):{my_key:string, my_val:int}<-/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}
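
as_column() reinterprets the dataframe as a single struct-typed column;
the EXTR([]) prefix is apparently an extractor with an empty field path,
i.e. "take the whole row". This is presumably what lets column-level
operations compose with whole dataframes.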

In [6]:
# Aggregate the grouped my_val values with num_collisions.
employees.my_val.groupby(employees.my_key).agg({'num_collisions': num_collisions})


Out[6]:
/structured_transform_9@org.spark.StructuredTransform:{my_key:string, num_collisions:int}
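
agg maps output column names to aggregation functions and applies each
one per group, producing a new distributed dataframe
{my_key:string, num_collisions:int} through a single StructuredTransform
node. Presumably several aggregations can be requested at once by adding
dict entries; a hypothetical sketch, assuming f.count composes the same
way num_collisions does:

employees.my_val.groupby(employees.my_key).agg({
    'num_collisions': num_collisions,  # defined in In [3]
    'group_size': f.count,             # assumed: counts all values per group
})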

In [7]:
g = employees.my_val.groupby(employees.my_key)

In [8]:
# Peek at private internals: the field name of the group's key column.
g._key_col._field_name


Out[8]:
'my_key'
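
This is just a sanity check against private internals (the
underscore-prefixed attributes are not public API): the group remembers
that its key column came from the my_key field.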

In [9]:
# Group full rows by my_key; the aggregation lambda picks out my_val itself.
df1 = employees.groupby(employees.my_key).agg(
    {'num_collisions': lambda df: num_collisions(df.my_val)})
# Fold the distributed result into a single local observable.
o = f.collect(df1)
o


Out[9]:
/collect_list_17!org.spark.StructuredReduce:[{my_key:string, num_collisions:int}]
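
Since the value side of this group is the full row struct, the
aggregation is written as a lambda that first selects the my_val field.
f.collect then folds the distributed result into one local observable, a
list of structs (flagged with ! again). The same observable should be
reachable from the single-column grouping of In [6]; a sketch, assuming
the two groupings agree:

o2 = f.collect(employees.my_val.groupby(employees.my_key)
                 .agg({'num_collisions': num_collisions}))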

In [10]:
s = ks.session("demo2c")
# Submit the observable's computation graph to the session.
comp = s.compute(o)
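
ks.session opens (or connects to) a session named "demo2c", and s.compute
submits the observable's graph to it; the comp handle is what the
inspection calls below operate on.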

In [11]:
show_phase(comp, "initial")



In [12]:
show_phase(comp, "final")



In [13]:
# Fetch the computed type and value of the observable.
comp.values()


Out[13]:
([{my_key:string, num_collisions:int}], array_value {
  values {
    struct_value {
      values {
        string_value: "a"
      }
      values {
        int_value: 1
      }
    }
  }
}
)
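
comp.values() returns the observable's type together with its value in a
protobuf-style rendering: a list of {my_key, num_collisions} structs. The
struct shown is ("a", 1): key "a" carries the values 1 and 2, of which
only 2 passes the >= 2 filter.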

