In [1]:
# Make sure that Python starts in the Workflow folder, or else the module imports will break!
import sys, os, getpass
from datetime import datetime
from py4j import protocol

# Add the parent directory to the path so the project modules can be imported.
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
print(module_path)

# Pick the parquet data directory based on the current user.
user = getpass.getuser()
if user == "sidsel":
    parquet_path = "/home/sidsel/workspace/sparkdata/parquet"
elif user == "svanhmic":
    parquet_path = "/home/svanhmic/workspace/data/DABAI/sparkdata/parquet"

# Start the logger.
import logging
logger_tester = logging.getLogger(__name__)
logger_tester.setLevel(logging.INFO)
logger_file_handler_param = logging.FileHandler('/tmp/' + datetime.now().strftime('workflow_test_%d_%m_%Y.log'))
logger_formatter_param = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(message)s')
logger_file_handler_param.setFormatter(logger_formatter_param)
logger_tester.addHandler(logger_file_handler_param)
In [2]:
from shared.Extension_to_timeit import pretty_time_result
In [3]:
# print(params.output_parameters(parameters))
# Parameters for the clustering pipeline under test.
test_params_1 = {'tol': 0.001, 'k': 8, 'maxIter': 10, 'algorithm': 'GaussianMixture', 'seed': 1080866016001745000}
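For orientation, the keys in test_params_1 line up with the standard pyspark.ml.clustering.GaussianMixture estimator. The sketch below only illustrates that mapping; it is an assumption about how ExecuteWorkflow consumes the dictionary, not its actual implementation.

# Minimal sketch (assumption: ExecuteWorkflow builds a GaussianMixture roughly like this).
from pyspark.ml.clustering import GaussianMixture

gm = GaussianMixture(
    featuresCol='features',           # assumed name of the assembled feature vector column
    k=test_params_1['k'],             # number of mixture components
    tol=test_params_1['tol'],         # convergence tolerance
    maxIter=test_params_1['maxIter'],
    seed=test_params_1['seed'],
)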
In [4]:
# Inspect the default parallelism and the full Spark configuration.
print(sc.defaultMinPartitions)
print(sc.defaultParallelism)
conf = sc.getConf()
conf.getAll()
Out[4]:
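The benchmark below controls the input partition count via repartition, but shuffles triggered inside the pipeline follow Spark SQL's own setting. A quick hedged check, assuming the SparkSession is available as spark (as it is in the later cells):

# Sketch: show the shuffle partition count Spark SQL will use for aggregations
# inside the pipeline (assumption: `spark` is the active SparkSession).
print(spark.conf.get('spark.sql.shuffle.partitions'))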
In [21]:
from cleaning.ExecuteCleaningWorkflow import ExecuteWorkflow

# Build the workflow that will be benchmarked below.
execution_model = ExecuteWorkflow(dict_params=test_params_1,
                                  cols_features=['a', 'b'],
                                  cols_labels=['id', 'k', 'dimension'])

# Data set sizes and partition counts to benchmark.
n_samples = [1000, ]  # 10000, 1000000, 10000000
n_partitions = [80, 200, 400, 600, 800, 1000]
collection_of_data = [parquet_path + '/normal_cluster_n_' + str(i) + '.parquet' for i in n_samples]
collection_of_model = []
#counts = [i.rdd.getNumPartitions() for i in collection_of_data]
#counts
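The commented-out counts lines above would fail as written, because collection_of_data holds parquet paths rather than DataFrames. A hedged version of the same check, reading each path first:

# Sketch: report how many partitions each parquet file loads into by default
# (assumption: `spark` is the active SparkSession, as in the cells below).
counts = [spark.read.parquet(path).rdd.getNumPartitions() for path in collection_of_data]
print(counts)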
In [22]:
# Run this bad boy!
for jdx, partition in enumerate(n_partitions):
    try:
        for idx, data_path in enumerate(collection_of_data):
            # Log message templates.
            str_1 = 'Iteration: {} - Number of partitions: {}'
            str_2 = 'Iteration: {} - Training model time: {!s}'
            str_3 = 'Iteration: {} - Transforming model time {!s}'

            # Load the data set and repartition it to the partition count under test.
            df_data = (spark
                       .read
                       .parquet(data_path)
                       .repartition(partition)
                       )
            iteration = idx + len(collection_of_data) * jdx
            logger_tester.info(
                str_1.format(iteration, df_data.rdd.getNumPartitions()))

            # Time the pipeline fit and the model transform; %timeit -o returns a TimeitResult.
            model_timer = %timeit -r7 -o collection_of_model.append(execution_model.execute_pipeline(df_data))
            transformer_timer = %timeit -o execution_model.apply_model(collection_of_model[iteration], df_data)

            # %timeit appends one model per timing run; keep only the first model per iteration.
            collection_of_model = collection_of_model[:iteration + 1]
            logger_tester.info(
                str_2.format(iteration, pretty_time_result(model_timer)))
            logger_tester.info(
                str_3.format(iteration, pretty_time_result(transformer_timer)))
    except protocol.Py4JError as error:
        ex_type, ex, tb = sys.exc_info()
        logger_tester.warning('Failed with traceback: ' + str(error.with_traceback(tb)))
        continue
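The exact formatting done by pretty_time_result is not shown in this notebook, but the objects returned by %timeit -o are IPython TimeitResult instances, so the raw numbers can also be pulled out directly. A minimal sketch with a hypothetical helper name (not the project's):

# Sketch: summarise a TimeitResult without pretty_time_result
# (assumption: `model_timer` is the object returned by `%timeit -o` above).
def summarise_timeit(result):
    # TimeitResult exposes the best and worst run times in seconds,
    # plus the repeat count and the number of loops per run.
    return 'best {:.3f}s, worst {:.3f}s over {} runs of {} loops'.format(
        result.best, result.worst, result.repeat, result.loops)

print(summarise_timeit(model_timer))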
In [18]:
# Sanity check: print each parquet path and its row count.
for i in collection_of_data:
    print(i)
    df_data = (spark
               .read
               .parquet(i)
               .repartition(partition)
               )
    print(df_data.count())
#for data in collection_of_data[0]:
#    print(data)
In [ ]:
# Generate the synthetic Gaussian cluster data sets used by the benchmark above.
# Assumes `samples`, `dim`, `k` and the `create_dummy_data` module are defined elsewhere.
for i in samples:
    means = create_dummy_data.create_means(dim, k, 10)  # e.g. [[0, 0, 0], [3, 3, 3], [-3, 3, -3], [5, -5, 5]]
    stds = create_dummy_data.create_stds(dim, k)  # e.g. [[1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1]]
    n_samples = create_dummy_data.create_partition_samples(i, k)  # e.g. [1000, 10000, 4000, 50]
    print(n_samples)
    df = create_dummy_data.create_normal_cluster_data_spark(dim, n_samples, means, stds)
    #df.show(100)
    df.write.parquet('/user/micsas/data/parquet/normal_cluster_n_' + str(i) + '.parquet', mode='overwrite')
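create_dummy_data is a project module whose internals are not shown here. Purely as a stand-in illustration of the kind of data it produces (Gaussian clusters with columns matching the feature names 'a' and 'b' and the label 'k' used earlier), a hedged numpy/Spark sketch:

# Sketch only: generate one Gaussian cluster data set with plain numpy and Spark
# (assumptions: 2-dimensional features named 'a' and 'b', `spark` is the SparkSession;
# this is NOT the create_dummy_data implementation).
import numpy as np
import pandas as pd

rng = np.random.RandomState(42)
rows = []
for cluster_id, (mean, std, n) in enumerate(zip([[0, 0], [3, 3]], [[1, 1], [1, 1]], [500, 500])):
    points = rng.normal(loc=mean, scale=std, size=(n, 2))
    for a, b in points:
        rows.append((cluster_id, float(a), float(b)))

pdf = pd.DataFrame(rows, columns=['k', 'a', 'b'])
df_dummy = spark.createDataFrame(pdf)
df_dummy.write.parquet('/tmp/normal_cluster_sketch.parquet', mode='overwrite')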