In [1]:
# Combining with structured transforms
import karps as ks
import karps.functions as f
from karps.display import show_phase

In [2]:
# Small in-memory table with a string key column and an int value column
# (see the schema in the output below).
employee_rows = [("a", 1), ("a", 2), ("b", 1)]
employees = ks.dataframe(
    employee_rows,
    schema=["my_key", "my_val"],
    name="employees",
)
employees


Out[2]:
/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}

In [3]:
# Group the rows by the string column `my_key`; the output below shows the
# resulting KeyedGroup (key=string, value=the whole row).
grouping_key = employees.my_key
g = employees.groupby(grouping_key)
g


Out[3]:
KeyedGroup(ref=/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}, key=string, value={my_key:string, my_val:int})

In [4]:
def max_my_val(x):
    """Per-group aggregation: the maximum of the group's `my_val` column."""
    return f.max(x.my_val)


# Map each output column name to the aggregation that produces it.
# NOTE(review): the `_col_op_proto` / `_as_nodes` / `_agg_ks` lines printed by
# this cell appear to be debug logging inside karps, not output of this code;
# consider silencing them before sharing the notebook.
df2 = g.agg({"the_max": max_my_val})
df2


_col_op_proto: extraction=['my_key'] c=key:string<-/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}
_col_op_proto: extraction=[] c=value:{my_key:string, my_val:int}<-/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}
_as_nodes: x=<class 'karps.column.DataFrame'>:/employees@org.spark.DistributedLiteral:{my_key:string, my_val:int}
_col_op_proto: extraction=['my_val'] c=my_val:int<-/placeholder_1@org.spark.Placeholder:{my_key:string, my_val:int}
_as_nodes: x=<class 'karps.column.DataFrame'>:/placeholder_1@org.spark.Placeholder:{my_key:string, my_val:int}
_as_nodes: x=<class 'karps.column.DataFrame'>:/structured_transform_2@org.spark.StructuredTransform:int
_agg_ks: out=[('the_max', /max_3!org.spark.StructuredReduce:int)]
_as_nodes: x=<class 'karps.column.DataFrame'>:/structured_transform_0@org.spark.StructuredTransform:{key:string, value:{my_key:string, my_val:int}}
_as_nodes: x=<class 'karps.column.DataFrame'>:/placeholder_1@org.spark.Placeholder:{my_key:string, my_val:int}
_as_nodes: x=<class 'karps.column.Observable'>:/max_3!org.spark.StructuredReduce:int
Out[4]:
/shuffle_4@org.spark.FunctionalShuffle:{key:string, the_max:int}

In [5]:
# Collect every row of the aggregated DataFrame into a single list-valued
# result (the output below shows a collect_list node of type
# [{key:string, the_max:int}]). `o` is what the session computes next.
o = f.collect(df2)
o


_as_nodes: x=<class 'karps.column.DataFrame'>:/shuffle_4@org.spark.FunctionalShuffle:{key:string, the_max:int}
Out[5]:
/collect_list_5!org.spark.StructuredReduce:[{key:string, the_max:int}]

In [6]:
# Open a named session and submit `o` for computation; the handle `comp` is
# inspected phase by phase in the cells below.
session = ks.session("demo2b")
comp = session.compute(o)

In [7]:
# Show the computation graph at the "initial" phase — presumably the graph as
# submitted, before any rewriting passes (TODO confirm against karps docs).
show_phase(comp, "initial")



In [8]:
# Show the graph after the "FUNCTIONAL_FLATTENING" phase — presumably where the
# groupby/agg functions are flattened into structured operations (TODO confirm).
show_phase(comp, "FUNCTIONAL_FLATTENING")



In [9]:
# Show the graph at the "final" phase — presumably the form that is actually
# executed (TODO confirm).
show_phase(comp, "final")



In [10]:
# Fetch the computed result: the output below is a (type, value) pair holding
# one struct per key (b -> 1, a -> 2, the per-key maxima of my_val).
# NOTE(review): the returned type names the second field `value`, while df2's
# schema in Out[4]/Out[5] calls it `the_max` — check whether karps drops the
# aggregated column name somewhere in the pipeline.
comp.values()


Out[10]:
([{key:string, value:int}], array_value {
  values {
    struct_value {
      values {
        string_value: "b"
      }
      values {
        int_value: 1
      }
    }
  }
  values {
    struct_value {
      values {
        string_value: "a"
      }
      values {
        int_value: 2
      }
    }
  }
}
)

In [ ]: