In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys
sys.path.append("../..")

In [3]:
from optimus import Optimus
from optimus.helpers.test import Test


C:\Users\argenisleon\Anaconda3\lib\site-packages\socks.py:58: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
  from collections import Callable

You are using PySparkling of version 2.4.10, but your PySpark is of version 2.3.1. Please make sure Spark and PySparkling versions are compatible.
`formatargspec` is deprecated since Python 3.5. Use `signature` and the `Signature` object directly

In [4]:
op = Optimus(master='local', verbose=True)


INFO:optimus:Operative System:Windows
INFO:optimus:Just check that Spark and all necessary environments vars are present...
INFO:optimus:-----
INFO:optimus:SPARK_HOME=C:\opt\spark\spark-2.3.1-bin-hadoop2.7
INFO:optimus:HADOOP_HOME=C:\opt\hadoop-2.7.7
INFO:optimus:PYSPARK_PYTHON=C:\Users\argenisleon\Anaconda3\python.exe
INFO:optimus:PYSPARK_DRIVER_PYTHON=jupyter
INFO:optimus:PYSPARK_SUBMIT_ARGS=--jars "file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --driver-class-path "C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --conf "spark.sql.catalogImplementation=hive" pyspark-shell
INFO:optimus:JAVA_HOME=C:\java
INFO:optimus:Pyarrow Installed
INFO:optimus:-----
INFO:optimus:Starting or getting SparkSession and SparkContext...
INFO:optimus:Spark Version:2.3.1
INFO:optimus:
                             ____        __  _                     
                            / __ \____  / /_(_)___ ___  __  _______
                           / / / / __ \/ __/ / __ `__ \/ / / / ___/
                          / /_/ / /_/ / /_/ / / / / / / /_/ (__  ) 
                          \____/ .___/\__/_/_/ /_/ /_/\__,_/____/  
                              /_/                                  
                              
INFO:optimus:Transform and Roll out...
INFO:optimus:Optimus successfully imported. Have fun :).
INFO:optimus:Config.ini not found

In [5]:
# Import Vectors
from pyspark.ml.linalg import Vectors

data = [
    (0, 1, 2, Vectors.dense([1.0, 0.5, -1.0])),
    (1, 2, 3, Vectors.dense([2.0, 1.0, 1.0])),
    (2, 3, 4, Vectors.dense([4.0, 10.0, 2.0]))
]

source_df = op.spark.createDataFrame(data, ["id", "x", "y", "features"])

In [6]:
t = Test(op, source_df, "df_ml_2",
         imports=["from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector, SparseVector",
                  "import numpy as np",
                  "nan = np.nan",
                  "import datetime",
                  "from pyspark.sql import functions as F",
                  "from optimus.ml import feature as fe"],
         path="df_ml_2", final_path="..")

Normalizer


In [11]:
from optimus.ml import feature as fe
t.create(fe, "normalizer", None, 'df', None, source_df, input_cols=["features"], p=2.0)

# Delete the generated test: floating-point precision differences make its expected output flaky
t.delete("normalizer")


Creating test_normalizer() test function...
INFO:optimus:test_normalizer()
Viewing 3 of 3 rows / 5 columns
1 partition(s)

id (bigint)  x (bigint)  y (bigint)  features (vector)  features***NORMALIZER (vector)
0            1           2           [1.0,0.5,-1.0]     [0.6666666666666666,0.3333333333333333,-0.6666666666666666]
1            2           3           [2.0,1.0,1.0]      [0.8164965809277261,0.4082482904638631,0.4082482904638631]
2            3           4           [4.0,10.0,2.0]     [0.3651483716701107,0.9128709291752769,0.18257418583505536]
(all columns nullable)

Deleting file df_ml_2//normalizer.test...
INFO:optimus:test_normalizer()
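
Optimus's fe.normalizer appears to wrap PySpark's Normalizer, which scales each row vector to unit p-norm. For reference, a plain-PySpark equivalent looks roughly like this (a minimal sketch; the features_norm output column name is an assumption):

from pyspark.ml.feature import Normalizer

# Scale each row of the "features" column to unit L2 norm (p=2.0)
normalizer = Normalizer(p=2.0, inputCol="features", outputCol="features_norm")
normalizer.transform(source_df).show(truncate=False)

For example, [1.0, 0.5, -1.0] has L2 norm 1.5, so it normalizes to [0.667, 0.333, -0.667], matching the first row above.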

In [12]:
t.run()


Creating file ../test_df_ml_2.py
Done

Vector Assembler


In [34]:
t.create(fe, "vector_assembler", None, 'df', None, source_df, input_cols=["id", "x", "y"])


Creating test_vector_assembler() test function...
INFO:optimus:test_vector_assembler()
Viewing 3 of 3 rows / 5 columns
1 partition(s)

id (bigint)  x (bigint)  y (bigint)  features (vector)  id_x_y******VECTOR_ASSEMBLER (vector)
0            1           2           [1.0,0.5,-1.0]     [0.0,1.0,2.0]
1            2           3           [2.0,1.0,1.0]      [1.0,2.0,3.0]
2            3           4           [4.0,10.0,2.0]     [2.0,3.0,4.0]
(all columns nullable)
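
fe.vector_assembler presumably delegates to PySpark's VectorAssembler, which concatenates the given numeric columns into a single vector column, as in the id_x_y column above. A plain-PySpark sketch of the same step (the output column name is an assumption):

from pyspark.ml.feature import VectorAssembler

# Combine the id, x and y columns into one vector column
assembler = VectorAssembler(inputCols=["id", "x", "y"], outputCol="id_x_y")
assembler.transform(source_df).show(truncate=False)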

One-hot encoder


In [35]:
# Create a DataFrame with a string categorical column
# (note: the one_hot_encoder test below still runs on source_df, so this df is unused there)
data = [
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
]
df = op.spark.createDataFrame(data, ["id", "category"])

In [36]:
t.create(fe, "one_hot_encoder", None, 'df', None, source_df, input_cols=["id"])


Creating test_one_hot_encoder() test function...
INFO:optimus:test_one_hot_encoder()
Viewing 3 of 3 rows / 5 columns
1 partition(s)

id (bigint)  x (bigint)  y (bigint)  features (vector)  id***ONE_HOT_ENCODER (vector)
0            1           2           [1.0,0.5,-1.0]     (2,[0],[1.0])
1            2           3           [2.0,1.0,1.0]      (2,[1],[1.0])
2            3           4           [4.0,10.0,2.0]     (2,[],[])
(all columns nullable)
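
The size-2 sparse vectors above follow from OneHotEncoder's dropLast=True default: with three distinct id values (0, 1, 2), the last category is dropped, so id=2 encodes as the empty vector (2,[],[]). To encode the string categories in the df created above, plain PySpark needs a StringIndexer first; a minimal sketch (output column names are assumptions; in Spark 2.3 OneHotEncoder is a transformer, while Spark 3 replaced it with an estimator that must be fit):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map string categories to numeric indices, then one-hot encode the indices
indexer = StringIndexer(inputCol="category", outputCol="category_index")
indexed = indexer.fit(df).transform(df)

encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
encoder.transform(indexed).show(truncate=False)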

In [37]:
t.run()


Creating file ../test_df_ml_2.py
Done
