In [1]:
import karps as ks
import karps.functions as f
from karps.display import show_phase
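
karps provides the core dataframe and session API, karps.functions the column and aggregation functions used below (collect, max), and show_phase displays a chosen phase of a computation.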

In [2]:
df = ks.dataframe([(1,)], schema="ee")
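
ks.dataframe builds a dataframe node out of local data: one row (1,) and a schema string naming its single integer column, ee. Nothing runs yet; the node merely describes the data, as the reprs below show (the node is a DistributedLiteral). A hypothetical variant with different data, reusing the same call shape (column type inference is assumed, not taken from this session):

df3 = ks.dataframe([(1,), (2,), (3,)], schema="xx")   # three rows, one int column named xx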

In [3]:
df.ee


Out[3]:
ee:int<-/distributed_literal_0@org.spark.DistributedLiteral:{ee:int}
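
Reading the repr: ee:int is the column name and type; <- points at the node the column is extracted from; /distributed_literal_0 is that node's path; the operation name follows the @ (org.spark.DistributedLiteral); and {ee:int} is that node's schema.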

In [4]:
c = f.collect(df.ee + df.ee)
c


Out[4]:
/collect_list_2!org.spark.StructuredReduce:[int]
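
f.collect turns the column expression into an observable: an aggregate over the whole dataframe rather than a new column. Note the ! in the repr (observables) versus the @ used for dataframe nodes, and the result type [int]. It can be evaluated with the same session pattern used in cells 14-16 below; with the single row (1,), the collected list should be [2] (expected value inferred from the data, not rerun here):

s = ks.session("test")    # illustrative session name
s.compute(c).values()     # expected: a list containing 2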

In [5]:
c.op_extra


Out[5]:
agg_op {
  op {
    function_name: "collect_list"
    inputs {
    }
  }
}
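
op_extra exposes the protobuf payload attached to the node: here an aggregation op whose function is collect_list, with a single input extraction whose path is empty (the whole row). Assuming op_extra is a live protobuf message with the standard generated accessors, the fields can be read directly:

c.op_extra.agg_op.op.function_name   # 'collect_list' -- accessor path assumed from the text format above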

In [6]:
m = f.max(df.ee)

In [7]:
m2 = m + m
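
Observables compose under ordinary arithmetic as well: m + m is a new observable built from two broadcasts of m; its wire form is inspected in cell 12 below.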

In [8]:
df.ee + m


Out[8]:
plus(ee,OBS(/max_4)):int<-/distributed_literal_0@org.spark.DistributedLiteral:{ee:int}
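
Columns and observables also mix directly: the observable is broadcast against every row, which the repr marks as OBS(/max_4) inside the plus. The result is still a column rooted at the original distributed literal (note the unchanged <-/distributed_literal_0).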

In [9]:
df2 = (df.ee + m).as_dataframe()

In [10]:
df2.op_extra


Out[10]:
col_op {
  function {
    function_name: "plus"
    inputs {
      extraction {
        path: "ee"
      }
      field_name: "ee"
    }
    inputs {
      broadcast {
        observable_index: 1
      }
    }
  }
}

In [11]:
df2.parents


Out[11]:
[/distributed_literal_0@org.spark.DistributedLiteral:{ee:int},
 /max_4!org.spark.StructuredReduce:int]
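
parents lists the nodes this dataframe depends on, and the order appears to line up with observable_index in the proto above: index 0 is the distributed literal supplying ee, index 1 the /max_4 observable being broadcast. So the broadcast input can be resolved by plain list indexing (behavior inferred from the two outputs above):

df2.parents[1]   # the /max_4 observable targeted by observable_index: 1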

In [12]:
m2.op_extra


Out[12]:
col_op {
  function {
    function_name: "plus"
    inputs {
      broadcast {
      }
      field_name: "_0"
    }
    inputs {
      broadcast {
        observable_index: 1
      }
      field_name: "_1"
    }
  }
}
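
For m + m both inputs are broadcasts. The first carries observable_index 0, which the proto3 text format omits because 0 is the default value; the second shows observable_index: 1. The inputs also get synthetic field names _0 and _1, since an observable has no column name to reuse. The two indices suggest m appears twice in the parents list (not verified here):

m2.parents   # presumably /max_4 twice, one entry per broadcast input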

In [13]:
col1 = df.ee + df.ee
c = f.max(col1)
c


Out[13]:
/max_8!org.spark.StructuredReduce:int
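
Aggregations also accept derived columns: max over df.ee + df.ee yields a fresh observable, /max_8, of type int. The remaining cells evaluate it end to end.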

In [14]:
s = ks.session("test2")
comp = s.compute(c)
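
s.compute hands the observable to the session named test2 and returns a computation object, which can be inspected with show_phase and queried with values(). The "final" argument in the next cell selects a compilation phase by name; presumably the last one, though only this phase name appears in the session.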

In [15]:
show_phase(comp, "final")


[show_phase renders the "final" phase of the computation graph as an inline figure, not reproduced in this transcript.]

In [16]:
comp.values()


Out[16]:
(int, int_value: 2
)
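
values returns the result as a (type, value) pair, with the value in protobuf form. The number checks out by hand: the only row is (1,), so df.ee + df.ee yields 2, and its max is 2. The same arithmetic in plain Python:

max(x + x for (x,) in [(1,)])   # 2, matching int_value: 2 above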
