In [1]:
import karps as ks
import karps.functions as f
from karps.display import show_phase

In [2]:
# Build a karps dataframe named "my_input" from two literal doubles.
# The trailing bare `df` makes the notebook display its representation.
df = ks.dataframe([1.0, 2.0], name="my_input")
df


Out[2]:
/my_input@org.spark.DistributedLiteral:double

In [3]:
def my_mean(df):
    """Express the mean of ``df`` as the sum of its values divided by their count.

    Args:
        df: The karps dataframe (or column) to average.

    Returns:
        The result of dividing ``f.sum`` of the input by ``f.count`` of the
        input, the count having been converted with ``f.as_double``.
    """
    # Both aggregations read from the same autocache-wrapped input.
    cached_df = f.autocache(df)
    # Named `total` rather than `sum` so the Python builtin is not shadowed.
    total = f.sum(cached_df)
    # The count is converted with as_double before the division
    # (presumably so both operands are doubles -- confirm against karps).
    count = f.as_double(f.count(cached_df))
    return total / count

In [4]:
# Apply my_mean to the input dataframe. The displayed value is a graph node
# (a "divide" transform of type double), not a computed number.
the_mean = my_mean(df)
the_mean


Out[4]:
/divide_4!org.spark.LocalStructuredTransform:double

In [11]:



Out[11]:
(double, double_value: 1.5
)

In [ ]:


In [5]:
# Open a karps session named "test3" and submit `the_mean` to it.
# `comp` is the handle that the show_phase / values cells below inspect.
s = ks.session("test3")
comp = s.compute(the_mean)

In [6]:
# Display the computation as it stands in the "initial" phase
# (presumably the graph before any rewriting -- confirm against karps docs).
show_phase(comp, "initial")



In [7]:
# Display the computation in the "REMOVE_OBSERVABLE_BROADCASTS" phase.
show_phase(comp, "REMOVE_OBSERVABLE_BROADCASTS")



In [8]:
# Display the computation in the "MERGE_AGGREGATIONS" phase
# (presumably where the sum and count aggregations are combined -- TODO confirm).
show_phase(comp, "MERGE_AGGREGATIONS")



In [9]:
# Display the computation in the "final" phase.
show_phase(comp, "final")



In [10]:
# Fetch the result of the submitted computation; it displays as the double 1.5.
comp.values()


Out[10]:
(double, double_value: 1.5
)

In [12]:
# Run `the_mean` directly through the session; the displayed result (1.5)
# is the same value that comp.values() produced.
s.run(the_mean)


Out[12]:
(double, double_value: 1.5
)

In [ ]: