In [1]:
%load_ext autoreload
%autoreload 2

import sys

# optimus: add the parent directory to the import path -- presumably so the
# local Optimus checkout is imported instead of an installed copy (TODO confirm).
sys.path.append("..")

# pypika: add a "pypika" directory two levels up to the import path --
# presumably a local pypika checkout (TODO confirm).
sys.path.append("../../pypika")

In [2]:
from pypika import Query, Table, Field, analytics as an
# NOTE: this rebinds the name `Query` imported on the previous line; from here on
# `Query` is pypika's MySQLQuery, not the generic Query class.
from pypika.dialects import MySQLQuery as Query

In [3]:
# MIN/MAX over 'RATING'. The argument is a plain string, and the SQL printed
# below shows it rendered as a quoted literal ('RATING'), not a column name.
agg = [
    an.Min('RATING'),
    an.Max('RATING'),
]
q = (
    Query
    .from_('main.music')
    .select(*agg)
)
print(q.get_sql())


SELECT MIN('RATING'),MAX('RATING') FROM `main.music`

In [4]:
# Hand-written SQL with unquoted column and table names -- presumably the form
# the pypika query above was meant to produce (TODO confirm).
"SELECT MIN(RATING) FROM main.music"


Out[4]:
'SELECT MIN(RATING) FROM main.music'

In [4]:
from pypika import functions as fn
from pypika import Tables, MySQLQuery, Interval

# Two table handles, joined on the consumer id; keep only fruits whose
# harvest_date plus one day is still earlier than NOW().
fruits, consumers = Tables('fruits', 'consumers')
q = (
    MySQLQuery.from_(fruits)
    .join(consumers)
    .on(fruits.consumer_id == consumers.id)
    .select(fruits.id, fruits.name)
    .where((fruits.harvest_date + Interval(days=1)) < fn.Now())
)
print(q.get_sql())


SELECT `fruits`.`id`,`fruits`.`name` FROM `fruits` JOIN `consumers` ON `fruits`.`consumer_id`=`consumers`.`id` WHERE `fruits`.`harvest_date`+INTERVAL 1 DAY<NOW()

In [3]:
from optimus import Optimus
# Create the Optimus entry point against a local Spark master.
op = Optimus(master="local", app_name="optimus", verbose=True)


C:\Users\argenisleon\Anaconda3\lib\site-packages\socks.py:58: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
  from collections import Callable

    You are using PySparkling of version 2.4.10, but your PySpark is of
    version 2.3.1. Please make sure Spark and PySparkling versions are compatible. 
INFO:optimus:Operative System:Windows
INFO:optimus:Just check that Spark and all necessary environments vars are present...
INFO:optimus:-----
INFO:optimus:SPARK_HOME=C:\opt\spark\spark-2.3.1-bin-hadoop2.7
INFO:optimus:HADOOP_HOME=C:\opt\hadoop-2.7.7
INFO:optimus:PYSPARK_PYTHON=C:\Users\argenisleon\Anaconda3\python.exe
INFO:optimus:PYSPARK_DRIVER_PYTHON=jupyter
INFO:optimus:PYSPARK_SUBMIT_ARGS=--jars "file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --driver-class-path "C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --conf "spark.sql.catalogImplementation=hive" pyspark-shell
INFO:optimus:JAVA_HOME=C:\java
INFO:optimus:Pyarrow Installed
INFO:optimus:-----
INFO:optimus:Starting or getting SparkSession and SparkContext...
INFO:optimus:Spark Version:2.3.1
INFO:optimus:
                             ____        __  _                     
                            / __ \____  / /_(_)___ ___  __  _______
                           / / / / __ \/ __/ / __ `__ \/ / / / ___/
                          / /_/ / /_/ / /_/ / / / / / / /_/ (__  ) 
                          \____/ .___/\__/_/_/ /_/ /_/\__,_/____/  
                              /_/                                  
                              
INFO:optimus:Transform and Roll out...
INFO:optimus:Optimus successfully imported. Have fun :).
INFO:optimus:Config.ini not found

In [6]:
# Load the meteorite landings CSV, then apply Optimus' h_repartition()
# (presumably a repartitioning step -- TODO confirm its exact semantics).
df = (
    op.load
    .csv("data/Meteorite_Landings.csv")
    .h_repartition()
)

In [11]:
# Preview the loaded DataFrame (the recorded output shows the top 20 rows).
df.show()


+--------------------+-----+--------+--------+--------+-----+--------------------+----------+----------+--------------------+
|                name|   id|nametype|recclass|mass (g)| fall|                year|    reclat|   reclong|         GeoLocation|
+--------------------+-----+--------+--------+--------+-----+--------------------+----------+----------+--------------------+
|Dominion Range 08357|52132|   Valid|      L6|     8.9|Found|01/01/2008 12:00:...|       0.0|       0.0|(0.000000, 0.000000)|
|       Yamato 792863|28212|   Valid|      H5|  132.25|Found|01/01/1979 12:00:...|     -71.5|  35.66667|(-71.500000, 35.6...|
|           Acfer 232|  240|   Valid|      H5|   725.0|Found|01/01/1991 12:00:...|  27.73944|   4.32833|(27.739440, 4.328...|
|Jiddat al Harasis...|56470|   Valid|      L5|    17.5|Found|                null|  19.83528|  56.46139|(19.835280, 56.46...|
|               Imlay|52855|   Valid|      L5|   770.0|Found|01/01/2009 12:00:...|  40.74018|-118.17285|(40.740180, -118....|
|           Shişr 029|23564|   Valid|      H5|  204.71|Found|01/01/1998 12:00:...|   18.1501|  53.80047|(18.150100, 53.80...|
|Northwest Africa ...|31272|   Valid| Eucrite|   138.0|Found|01/01/2004 12:00:...|      null|      null|                null|
|         Dhofar 1462|55271|   Valid|     L~4|   400.8|Found|01/01/2008 12:00:...|  18.34955|  54.25013|(18.349550, 54.25...|
|          Dhofar 085| 6784|   Valid|      H4|    78.0|Found|01/01/1999 12:00:...|  19.21233|   54.8475|(19.212330, 54.84...|
|Grove Mountains 0...|50563|   Valid|      L6|     1.8|Found|01/01/2006 12:00:...| -72.78194|  75.30056|(-72.781940, 75.3...|
|Grove Mountains 0...|46973|   Valid|      H4|    1.55|Found|01/01/2006 12:00:...| -72.77833|  75.32167|(-72.778330, 75.3...|
|Queen Alexandra R...|20352|   Valid|      H6|     5.9|Found|01/01/1994 12:00:...|     -84.0|     168.0|(-84.000000, 168....|
|Grove Mountains 0...|48024|   Valid|      H5|    1.01|Found|01/01/2003 12:00:...|-72.773333| 75.326944|(-72.773333, 75.3...|
|LaPaz Icefield 03...|34826|   Valid|      L5|    28.2|Found|01/01/2003 12:00:...|      null|      null|                null|
|Northwest Africa ...|31200|   Valid|    L5/6|  1200.0|Found|01/01/2002 12:00:...|      null|      null|                null|
|Elephant Moraine ...| 9762|   Valid|      H5|    11.2|Found|01/01/1996 12:00:...| -76.18333| 157.16667|(-76.183330, 157....|
| Miller Range 090027|53340|   Valid|     LL5|   981.2|Found|01/01/2009 12:00:...|       0.0|       0.0|(0.000000, 0.000000)|
| Miller Range 090895|54418|   Valid|      H5|     5.3|Found|01/01/2009 12:00:...|       0.0|       0.0|(0.000000, 0.000000)|
|        San Juan 043|52381|   Valid|      H5|    26.4|Found|01/01/2009 12:00:...|-25.443167|  -69.8825|(-25.443167, -69....|
|  Miller Range 05057|44456|   Valid|     LL6|   472.2|Found|01/01/2005 12:00:...|      null|      null|                null|
+--------------------+-----+--------+--------+--------+-----+--------------------+----------+----------+--------------------+
only showing top 20 rows


In [40]:
from pypika import Tables, MySQLQuery, Interval

# Name the DataFrame "df" -- presumably so SQL run through df.query can refer
# to it by that name (TODO confirm).
df.set_name("df")

# MIN(id) aliased to "a", plus an un-aliased MAX(id).
agg = [
    an.Min('id').as_('a'),
    an.Max('id'),
]

q = MySQLQuery.from_("df").select(*agg)


def clean(c):
    """
    Render a query as SQL with single quotes turned into backticks.

    pypika renders plain-string arguments such as ``an.Min('id')`` as
    single-quoted literals; swapping the quotes for backticks makes them
    identifier references instead.

    :param c: query object exposing ``get_sql()`` (e.g. a pypika query).
    :return: the SQL string with every ``'`` replaced by a backtick.
    """
    # Use the argument rather than the notebook-global ``q``: the parameter used
    # to be ignored, so clean(other_query) silently returned the SQL of ``q``.
    # NOTE: the replacement is blunt -- genuine string literals in the SQL are
    # rewritten as well.
    return c.get_sql().replace("'", "`")

# Run the backtick-normalised SQL through df.query and print the resulting table.
df.query(clean(q)).show()


+---+-------+
|  a|max(id)|
+---+-------+
|  1|  57458|
+---+-------+


In [45]:
# NOTE(review): per the recorded output below, this prints the generated SQL and
# its result, then fails with "AssertionError: all exprs should be Column" --
# the traceback shows exec_agg passing the expressions on to DataFrame.agg.
df.cols.min_sql("id")


MIN('id')
SELECT MIN(`id`) `Min_id` FROM `df`
+------+
|Min_id|
+------+
|     1|
+------+

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-45-7de2afff22f7> in <module>
----> 1 df.cols.min_sql("id")

~\Documents\Optimus\optimus\helpers\decorators.py in wrapper(*args, **kwargs)
     47         def wrapper(*args, **kwargs):
     48             start_time = timeit.default_timer()
---> 49             f = func(*args, **kwargs)
     50             _time = round(timeit.default_timer() - start_time, 2)
     51             if log_time:

~\Documents\Optimus\optimus\dataframe\columns.py in min_sql(columns)
    578         :return:
    579         """
--> 580         return agg_exprs(columns, an.Min)
    581 
    582 

~\Documents\Optimus\optimus\dataframe\columns.py in agg_exprs(columns, funcs, tidy, *args)
    533         :return:
    534         """
--> 535         return exec_agg(create_exprs(columns, funcs, *args), tidy)
    536 
    537     @add_attr(cols)

~\Documents\Optimus\optimus\dataframe\columns.py in exec_agg(exprs, tidy)
    553         self.query(exprs).show()
    554 
--> 555         df = self.agg(*exprs)
    556 
    557         result = parse_col_names_funcs_to_keys(df.to_json())

~\Anaconda3\lib\site-packages\pyspark\sql\dataframe.py in agg(self, *exprs)
   1325         [Row(min(age)=2)]
   1326         """
-> 1327         return self.groupBy().agg(*exprs)
   1328 
   1329     @since(2.0)

~\Anaconda3\lib\site-packages\pyspark\sql\group.py in agg(self, *exprs)
     89         else:
     90             # Columns
---> 91             assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
     92             jdf = self._jgd.agg(exprs[0]._jc,
     93                                 _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))

AssertionError: all exprs should be Column

In [93]:
# Maximum of the `id` column; the recorded result is a plain value, not a DataFrame.
df.cols.max("id")


Out[93]:
57458

In [95]:
# Column name -> data type mapping for df (returned as a dict, see output).
df.cols.dtypes()


Out[95]:
{'name': 'string',
 'id': 'int',
 'nametype': 'string',
 'recclass': 'string',
 'mass (g)': 'double',
 'fall': 'string',
 'year': 'string',
 'reclat': 'double',
 'reclong': 'double',
 'GeoLocation': 'string'}

In [96]:
# Per-column count of zero values for the listed columns (dict keyed by column name).
df.cols.count_zeros(["id","mass (g)"])


Out[96]:
{'id': 0, 'mass (g)': 19}

In [111]:
# Per-column count of unique values for the listed columns
# (whether this is exact or an estimate depends on Optimus -- TODO confirm).
df.cols.count_uniques(["id","mass (g)"])


Out[111]:
{'id': 42365, 'mass (g)': 12497}

In [116]:
# Row count per recclass, most frequent class first.
(
    df
    .query("SELECT COUNT(*), recclass from df GROUP BY recclass ORDER BY COUNT(*) DESC")
    .show()
)


+--------+-----------+
|count(1)|   recclass|
+--------+-----------+
|    8285|         L6|
|    7142|         H5|
|    4796|         L5|
|    4528|         H6|
|    4211|         H4|
|    2766|        LL5|
|    2043|        LL6|
|    1253|         L4|
|     428|       H4/5|
|     416|        CM2|
|     386|         H3|
|     365|         L3|
|     335|        CO3|
|     300|   Ureilite|
|     285|Iron, IIIAB|
|     268|        LL4|
|     256|        CV3|
|     241|  Diogenite|
|     240|  Howardite|
|     225|         LL|
+--------+-----------+
only showing top 20 rows


In [ ]: