We use part of the Instacart dataset, which you can find here: https://www.instacart.com/datasets/grocery-shopping-2017
Specifically, order_products__prior.csv, a 4-column, 32.4-million-row CSV file.
Before Optimus 2.2.10 it took 355.58 seconds to process the whole dataset on Windows 10.
After 2.2.10 it takes 78 seconds with infer=False.
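For reference, a minimal sketch of the kind of call being timed, using the standard time module. It assumes op and df are created exactly as in the cells below, and is an illustration rather than the original benchmark script:

import time

start = time.time()
# Fast path measured above: profile a single column with type inference disabled
op.profiler.to_json(df, "order_id", infer=False, relative_error=1)
print("profiled in {:.1f} s".format(time.time() - start))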
In [1]:
%load_ext autoreload
%autoreload 2
In [2]:
import sys
# Make the local Optimus checkout importable from the notebook folder
sys.path.append("..")
In [3]:
# Create optimus
from optimus import Optimus
op = Optimus(master="local[*]", app_name="optimus", verbose=True, checkpoint=True)
C:\Users\argenisleon\Anaconda3\lib\site-packages\socks.py:58: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import Callable
You are using PySparkling of version 2.4.10, but your PySpark is of
version 2.3.1. Please make sure Spark and PySparkling versions are compatible.
INFO:optimus:Operative System:Windows
INFO:optimus:Just check that Spark and all necessary environments vars are present...
INFO:optimus:-----
INFO:optimus:SPARK_HOME=C:\opt\spark\spark-2.3.1-bin-hadoop2.7
INFO:optimus:HADOOP_HOME=C:\opt\hadoop-2.7.7
INFO:optimus:PYSPARK_PYTHON=C:\Users\argenisleon\Anaconda3\python.exe
INFO:optimus:PYSPARK_DRIVER_PYTHON=jupyter
INFO:optimus:PYSPARK_SUBMIT_ARGS=--jars "file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --driver-class-path "C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --conf "spark.sql.catalogImplementation=hive" pyspark-shell
INFO:optimus:JAVA_HOME=C:\java
INFO:optimus:Pyarrow Installed
INFO:optimus:-----
INFO:optimus:Starting or getting SparkSession and SparkContext...
INFO:optimus:Spark Version:2.3.1
INFO:optimus:Setting checkpoint folder local. If you are in a cluster initialize Optimus with master='your_ip' as param
INFO:optimus:Deleting previous folder if exists...
INFO:optimus:Creating the checkpoint directory...
INFO:optimus:
   ____        __  _
  / __ \____  / /_(_)___ ___  __  _______
 / / / / __ \/ __/ / __ `__ \/ / / / ___/
/ /_/ / /_/ / /_/ / / / / / / /_/ (__  )
\____/ .___/\__/_/_/ /_/ /_/\__,_/____/
     /_/
INFO:optimus:Transform and Roll out...
INFO:optimus:Optimus successfully imported. Have fun :).
INFO:optimus:Config.ini not found
In [4]:
df = op.load.csv("C:\\Users\\argenisleon\\Desktop\\order_products__prior.csv")
In [5]:
df.table()
Viewing 10 of 32.4 million rows / 4 columns (all columns: int, nullable)
8 partition(s)
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       2|     33120|                1|        1|
|       2|     28985|                2|        1|
|       2|      9327|                3|        0|
|       2|     45918|                4|        1|
|       2|     30035|                5|        0|
|       2|     17794|                6|        1|
|       2|     40141|                7|        1|
|       2|      1819|                8|        1|
|       2|     43668|                9|        0|
|       3|     33754|                1|        1|
+--------+----------+-----------------+---------+
In [7]:
%%time
df.groupBy("order_id").count().sort("count", ascending=False).show()
+--------+-----+
|order_id|count|
+--------+-----+
| 1564244| 145|
| 790903| 137|
| 61355| 127|
| 2970392| 121|
| 2069920| 116|
| 3308010| 115|
| 2753324| 114|
| 2499774| 112|
| 2621625| 109|
| 77151| 109|
| 2136777| 108|
| 1657096| 108|
| 2648316| 105|
| 1892299| 104|
| 171934| 104|
| 653887| 102|
| 1384519| 102|
| 1867980| 102|
| 3282039| 101|
| 3052353| 101|
+--------+-----+
only showing top 20 rows
Wall time: 15.9 s
In [3]:
%%time
df.cols.frequency("order_id")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<timed eval> in <module>
NameError: name 'df' is not defined
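The NameError above comes from a kernel restart (note the cell counter resetting to In [3]); df has to be recreated before frequency() can be retried. A minimal sketch that simply repeats the load call from In [4]:

# Recreate the dataframe after the kernel restart, then retry the frequency call
df = op.load.csv("C:\\Users\\argenisleon\\Desktop\\order_products__prior.csv")
df.cols.frequency("order_id")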
In [7]:
%%time
op.profiler.to_json(df, "order_id", infer=False, relative_error=1)
INFO:optimus:Procesing General Stats...
INFO:optimus:general_stats() executed in 37.12 sec
INFO:optimus:Using 'column_exp' to process column 'order_id' with function _cast_to
INFO:optimus:cast_columns() executed in 0.01 sec
INFO:optimus:Processing Frequency ...
INFO:optimus:dataset_info() executed in 0.07 sec
Wall time: 2min 43s
Out[7]:
'{"columns": {"order_id": {"stats": {"count_uniques_agg": 3025302, "min": 2, "max": 3421083, "stddev": 987300.69645, "kurtosis": -1.19913, "mean": 1710748.51894, "skewness": -0.00049, "sum": 55487254019416, "variance": 974762665216.534, "zeros": 0, "count_na": 0, "hist": [{"count": 3239832.0, "lower": 2.0, "upper": 342110.1}, {"count": 3243756.0, "lower": 342110.1, "upper": 684218.2}, {"count": 3237942.0, "lower": 684218.2, "upper": 1026326.3}, {"count": 3243055.0, "lower": 1026326.3, "upper": 1368434.4}, {"count": 3247818.0, "lower": 1368434.4, "upper": 1710542.5}, {"count": 3249098.0, "lower": 1710542.5, "upper": 2052650.6}, {"count": 3243017.0, "lower": 2052650.6, "upper": 2394758.7}, {"count": 3248231.0, "lower": 2394758.7, "upper": 2736866.8}, {"count": 3241887.0, "lower": 2736866.8, "upper": 3078974.9}, {"count": 3239843.0, "lower": 3078974.9, "upper": 3421083.0}], "percentile": {"0.75": 2, "0.95": 2, "0.05": 2, "0.25": 2, "0.5": 2}, "missing_count": 0, "p_missing": 0.0, "range": 3421081, "median": 2, "interquartile_range": 0, "coef_variation": 0.57712, "mad": 0}, "frequency": [{"value": 1564244, "count": 145}, {"value": 790903, "count": 137}, {"value": 61355, "count": 127}, {"value": 2970392, "count": 121}, {"value": 2069920, "count": 116}, {"value": 3308010, "count": 115}, {"value": 2753324, "count": 114}, {"value": 2499774, "count": 112}, {"value": 77151, "count": 109}, {"value": 2621625, "count": 109}], "name": "order_id", "column_dtype": "int", "dtypes_stats": {"string": 0, "bool": 0, "int": 32434489, "float": 0, "double": 0, "date": 0, "array": 0, "null": 0, "missing": 0}, "column_type": "numeric"}}, "name": null, "rows_count": "32.4 million", "count_types": {"numeric": 1, "categorical": 0, "null": 0, "array": 0, "date": 0, "bool": 0}, "size": "262.3 MB", "summary": {"cols_count": 4, "rows_count": 32434489, "missing_count": "0.0%", "size": "262.1 MB"}, "sample": {"columns": [{"title": "order_id"}, {"title": "product_id"}, {"title": "add_to_cart_order"}, {"title": "reordered"}], "value": [[934811], [1149543], [1156809], [1320736], [1343186], [1565712], [1968204], [2450020], [2508067], [2830472]]}}'
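to_json returns the profile as a plain JSON string, so it can be parsed back into a dict for programmatic use. A minimal sketch, assuming the string above has been captured in a variable named profile_json (a hypothetical name; e.g. profile_json = Out[7] in this session, or the return value of the call itself):

import json

# Parse the profiler output and pull a few of the computed statistics
profile = json.loads(profile_json)
stats = profile["columns"]["order_id"]["stats"]
print(stats["count_uniques_agg"], stats["min"], stats["max"])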
In [19]:
a = df.limit(10)
In [13]:
%%time
df.cols.frequency("order_id")
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<timed eval> in <module>
~\Documents\Optimus\optimus\helpers\decorators.py in wrapper(*args, **kwargs)
47 def wrapper(*args, **kwargs):
48 start_time = timeit.default_timer()
---> 49 f = func(*args, **kwargs)
50 _time = round(timeit.default_timer() - start_time, 2)
51 if log_time:
~\Documents\Optimus\optimus\dataframe\columns.py in frequency(columns, n)
1546 result = {}
1547 for f in freq.collect():
-> 1548 result[f[0]] = [{"value": kv[0], "count": kv[1]} for kv in f[1]]
1549
1550 return result
TypeError: 'int' object is not iterable
In [24]:
from optimus import Profiler
p = Profiler()
p.run(a,"order_id")
INFO:optimus:Config.ini not found
INFO:optimus:general_stats() executed in 23.99 sec
INFO:optimus:Processing column 'order_id'...
INFO:optimus:Using 'column_exp' to process column 'order_id' with function _cast_to
INFO:optimus:cast_columns() executed in 0.02 sec
INFO:optimus:------------------------------
INFO:optimus:Processing column 'order_id'...
INFO:optimus:frequency() executed in 1.39 sec
INFO:optimus:dataset_info() executed in 0.08 sec
---------------------------------------------------------------------------
TemplateNotFound Traceback (most recent call last)
<ipython-input-24-c1ed1a1b51c2> in <module>
1 from optimus import Profiler
2 p = Profiler()
----> 3 p.run(a,"order_id")
~\Documents\Optimus\optimus\helpers\decorators.py in timed(*args, **kw)
26 def timed(*args, **kw):
27 start_time = timeit.default_timer()
---> 28 f = method(*args, **kw)
29 _time = round(timeit.default_timer() - start_time, 2)
30 logger.print("{name}() executed in {time} sec".format(name=method.__name__, time=_time))
~\Documents\Optimus\optimus\profiler\profiler.py in run(self, df, columns, buckets, infer, relative_error, approx_count)
199 template_env = jinja2.Environment(loader=template_loader, autoescape=True)
200
--> 201
202 # Render template
203 # Create the profiler info header
~\Anaconda3\lib\site-packages\jinja2\environment.py in get_template(self, name, parent, globals)
828 if parent is not None:
829 name = self.join_path(name, parent)
--> 830 return self._load_template(name, self.make_globals(globals))
831
832 @internalcode
~\Anaconda3\lib\site-packages\jinja2\environment.py in _load_template(self, name, globals)
802 template.is_up_to_date):
803 return template
--> 804 template = self.loader.load(self, name, globals)
805 if self.cache is not None:
806 self.cache[cache_key] = template
~\Anaconda3\lib\site-packages\jinja2\loaders.py in load(self, environment, name, globals)
111 # first we try to get the source for this template together
112 # with the filename and the uptodate function.
--> 113 source, filename, uptodate = self.get_source(environment, name)
114
115 # try to load the code from the bytecode cache if there is a
~\Anaconda3\lib\site-packages\jinja2\loaders.py in get_source(self, environment, template)
185 return False
186 return contents, filename, uptodate
--> 187 raise TemplateNotFound(template)
188
189 def list_templates(self):
TemplateNotFound: /out/general_info.html
In [22]:
op.profiler.run(a, "order_id", infer=True, relative_error=1)
INFO:optimus:general_stats() executed in 23.64 sec
INFO:optimus:Processing column 'order_id'...
INFO:optimus:Using 'column_exp' to process column 'order_id' with function _cast_to
INFO:optimus:cast_columns() executed in 0.02 sec
INFO:optimus:------------------------------
INFO:optimus:Processing column 'order_id'...
INFO:optimus:frequency() executed in 1.27 sec
INFO:optimus:dataset_info() executed in 0.09 sec
---------------------------------------------------------------------------
TemplateNotFound Traceback (most recent call last)
<ipython-input-22-4f64cac065ea> in <module>
----> 1 op.profiler.run(a, "order_id", infer=True, relative_error=1)
~\Documents\Optimus\optimus\helpers\decorators.py in timed(*args, **kw)
26 def timed(*args, **kw):
27 start_time = timeit.default_timer()
---> 28 f = method(*args, **kw)
29 _time = round(timeit.default_timer() - start_time, 2)
30 logger.print("{name}() executed in {time} sec".format(name=method.__name__, time=_time))
~\Documents\Optimus\optimus\profiler\profiler.py in run(self, df, columns, buckets, infer, relative_error, approx_count)
199 # Render template
200 # Create the profiler info header
--> 201 html = ""
202 general_template = template_env.get_template("general_info.html")
203 html = html + general_template.render(data=output)
~\Anaconda3\lib\site-packages\jinja2\environment.py in get_template(self, name, parent, globals)
828 if parent is not None:
829 name = self.join_path(name, parent)
--> 830 return self._load_template(name, self.make_globals(globals))
831
832 @internalcode
~\Anaconda3\lib\site-packages\jinja2\environment.py in _load_template(self, name, globals)
802 template.is_up_to_date):
803 return template
--> 804 template = self.loader.load(self, name, globals)
805 if self.cache is not None:
806 self.cache[cache_key] = template
~\Anaconda3\lib\site-packages\jinja2\loaders.py in load(self, environment, name, globals)
111 # first we try to get the source for this template together
112 # with the filename and the uptodate function.
--> 113 source, filename, uptodate = self.get_source(environment, name)
114
115 # try to load the code from the bytecode cache if there is a
~\Anaconda3\lib\site-packages\jinja2\loaders.py in get_source(self, environment, template)
185 return False
186 return contents, filename, uptodate
--> 187 raise TemplateNotFound(template)
188
189 def list_templates(self):
TemplateNotFound: /out/general_info.html
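Both run() calls above fail for the same reason: jinja2 cannot locate the profiler's /out/general_info.html template in this environment, so the HTML report cannot be rendered. As a workaround (an assumption about this particular setup, not an official fix), the JSON path used in In [7] avoids template rendering altogether:

# Produce the raw profile instead of the HTML report; no template files are needed
op.profiler.to_json(a, "order_id", infer=True, relative_error=1)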
In [1]:
df.groupBy("order_id").count().sort("count", ascending=False)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<ipython-input-1-86d4d0fdcf73> in <module>
----> 1 df.groupBy("order_id").count().sort("count",ascending=False)
NameError: name 'df' is not defined