In [1]:
# Make sure that Python starts in the Workflows folder, otherwise the module imports will break!
import sys, os, getpass
from datetime import datetime
from py4j import protocol

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
print(module_path)

user = getpass.getuser()

if user == "sidsel":
    parquet_path = "/home/sidsel/workspace/sparkdata/parquet"
elif user == "svanhmic":
    parquet_path = "/home/svanhmic/workspace/data/DABAI/sparkdata/parquet"
    
# Set up logging to a per-day file in /tmp.
import logging
logger_tester = logging.getLogger(__name__)
logger_tester.setLevel(logging.INFO)
logger_file_handler_param = logging.FileHandler('/tmp/'+datetime.now().strftime('workflow_test_%d_%m_%Y.log'))
logger_formatter_param = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s:%(message)s')

logger_tester.addHandler(logger_file_handler_param)
logger_file_handler_param.setFormatter(logger_formatter_param)


/home/svanhmic/workspace/DABAI/Workflows

In [2]:
from shared.Extension_to_timeit import pretty_time_result
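
For context, `pretty_time_result` is assumed to format the `TimeitResult` object returned by `%timeit -o`. A minimal sketch of such a formatter (hypothetical; the actual helper in shared.Extension_to_timeit may differ) could look like this:

def pretty_time_result_sketch(timeit_result):
    # Report the mean, standard deviation and best time of a %timeit -o result.
    return '{:.3f} s ± {:.3f} s per loop (best: {:.3f} s over {} runs)'.format(
        timeit_result.average, timeit_result.stdev,
        timeit_result.best, timeit_result.repeat)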

In [3]:
#print(params.output_parameters(parameters))
test_params_1 = {'tol': 0.001, 'k': 8, 'maxIter': 10, 'algorithm': 'GaussianMixture', 'seed': 1080866016001745000}
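
For reference, these parameters are assumed to map onto pyspark.ml.clustering.GaussianMixture roughly as sketched below; how ExecuteWorkflow actually constructs the estimator may differ:

from pyspark.ml.clustering import GaussianMixture

gmm_sketch = GaussianMixture(featuresCol='features',
                             k=test_params_1['k'],
                             tol=test_params_1['tol'],
                             maxIter=test_params_1['maxIter'],
                             seed=test_params_1['seed'])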

In [4]:
print(sc.defaultMinPartitions)
print(sc.defaultParallelism)
conf = sc.getConf()
conf.getAll()


2
4
Out[4]:
[('spark.submit.pyFiles',
  '/home/svanhmic/.ivy2/jars/graphframes_graphframes-0.5.0-spark2.0-s_2.11.jar,/home/svanhmic/.ivy2/jars/com.typesafe.scala-logging_scala-logging-api_2.11-2.1.2.jar,/home/svanhmic/.ivy2/jars/com.typesafe.scala-logging_scala-logging-slf4j_2.11-2.1.2.jar,/home/svanhmic/.ivy2/jars/org.scala-lang_scala-reflect-2.11.0.jar,/home/svanhmic/.ivy2/jars/org.slf4j_slf4j-api-1.7.7.jar'),
 ('spark.driver.memory', '12g'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1504776524676'),
 ('spark.driver.host', '10.52.1.5'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '39407'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.jars',
  'file:/usr/local/share/rand_jars/graphframes-0.5.0-spark2.0-s_2.11.jar,file:/home/svanhmic/.ivy2/jars/graphframes_graphframes-0.5.0-spark2.0-s_2.11.jar,file:/home/svanhmic/.ivy2/jars/com.typesafe.scala-logging_scala-logging-api_2.11-2.1.2.jar,file:/home/svanhmic/.ivy2/jars/com.typesafe.scala-logging_scala-logging-slf4j_2.11-2.1.2.jar,file:/home/svanhmic/.ivy2/jars/org.scala-lang_scala-reflect-2.11.0.jar,file:/home/svanhmic/.ivy2/jars/org.slf4j_slf4j-api-1.7.7.jar'),
 ('spark.files',
  'file:/home/svanhmic/.ivy2/jars/graphframes_graphframes-0.5.0-spark2.0-s_2.11.jar,file:/home/svanhmic/.ivy2/jars/com.typesafe.scala-logging_scala-logging-api_2.11-2.1.2.jar,file:/home/svanhmic/.ivy2/jars/com.typesafe.scala-logging_scala-logging-slf4j_2.11-2.1.2.jar,file:/home/svanhmic/.ivy2/jars/org.scala-lang_scala-reflect-2.11.0.jar,file:/home/svanhmic/.ivy2/jars/org.slf4j_slf4j-api-1.7.7.jar')]

In [21]:
from cleaning.ExecuteCleaningWorkflow import ExecuteWorkflow

execution_model = ExecuteWorkflow(dict_params=test_params_1,
                                  cols_features=['a', 'b'],
                                  cols_labels=['id', 'k', 'dimension'])

n_samples = [1000,]# 10000, 1000000, 10000000]
n_partitions = [80, 200, 400, 600, 800, 1000]
collection_of_data = [parquet_path+'/normal_cluster_n_'+str(i)+'.parquet' for i in n_samples]
collection_of_model = []
#counts = [i.rdd.getNumPartitions() for i in collection_of_data]
#counts
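
A rough sketch of what execute_pipeline is assumed to do in the timing runs below (hypothetical; see cleaning.ExecuteCleaningWorkflow for the actual implementation): assemble the feature columns into a vector and fit the configured clustering algorithm as a Spark ML Pipeline.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import GaussianMixture

def execute_pipeline_sketch(df, feature_cols, params):
    # Assemble the feature columns into a single vector and fit the clustering model.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    gmm = GaussianMixture(featuresCol='features', k=params['k'], tol=params['tol'],
                          maxIter=params['maxIter'], seed=params['seed'])
    return Pipeline(stages=[assembler, gmm]).fit(df)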

In [22]:
# Run this bad boy: benchmark training and transform times for every partition count and dataset.
for jdx, partition in enumerate(n_partitions):
    try:
        for idx, data_path in enumerate(collection_of_data):

            # Log message templates
            str_1 = 'Iteration: {} - Number of partitions: {}'
            str_2 = 'Iteration: {} - Training model time: {!s}'
            str_3 = 'Iteration: {} - Transforming model time: {!s}'
            
            df_data = (spark.
                       read.
                       parquet(data_path).
                       repartition(partition)
                       )

            iteration = idx+len(collection_of_data)*jdx
            logger_tester.info(
                str_1.format(iteration, df_data.rdd.getNumPartitions()))
            
            # %timeit runs the statement several times, so one model is appended per run;
            # keep only the first model from this iteration afterwards.
            model_timer = %timeit -r7 -o collection_of_model.append(execution_model.execute_pipeline(df_data))
            transformer_timer = %timeit -o execution_model.apply_model(collection_of_model[iteration], df_data)
            collection_of_model = collection_of_model[:iteration+1]
            
            logger_tester.info(
                str_2.format(iteration,pretty_time_result(model_timer)))
            logger_tester.info(
                str_3.format(iteration,pretty_time_result(transformer_timer)))
    except protocol.Py4JError as error:
        # Log the failure with its traceback and continue with the next partition count.
        logger_tester.warning('Py4J error during benchmarking: %s', error, exc_info=True)
        continue


3.21 s ± 61.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
78.7 ms ± 5.16 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
7.43 s ± 328 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
85.7 ms ± 7.98 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
14.9 s ± 239 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
79.6 ms ± 6.54 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
29.5 s ± 5.93 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
98.1 ms ± 6.88 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
32.5 s ± 773 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
87.1 ms ± 4.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
46.2 s ± 5.68 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
93.2 ms ± 8.35 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
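
The timings above are also written to the log file configured in the first cell; a small sketch like the following (assuming the message templates used in the loop) can pull the per-iteration entries back out:

import glob
import re

for log_path in glob.glob('/tmp/workflow_test_*.log'):
    with open(log_path) as fh:
        for line in fh:
            # Match the 'Iteration: N - ...' messages logged during the benchmark.
            match = re.search(r'Iteration: (\d+) - (.+)', line)
            if match:
                print(match.group(1), match.group(2))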

In [18]:
# Sanity check: print each dataset path and its row count
# (reuses the last `partition` value from the benchmark loop above).
for i in collection_of_data:
    print(i)
    df_data = (spark
               .read
               .parquet(i)
               .repartition(partition)
              )
    print(df_data.count())
#for data in collection_of_data[0]:
#    print(data)


/home/svanhmic/workspace/data/DABAI/sparkdata/parquet/normal_cluster_n_1000.parquet
1000
/home/svanhmic/workspace/data/DABAI/sparkdata/parquet/normal_cluster_n_10000.parquet
10000
/home/svanhmic/workspace/data/DABAI/sparkdata/parquet/normal_cluster_n_1000000.parquet
1000000

In [ ]:
# Data generation (run separately): assumes `samples`, `dim`, `k` and the
# create_dummy_data module are already defined/imported in the session.
for i in samples:
    means = create_dummy_data.create_means(dim, k, 10)  # [[0, 0, 0], [3, 3, 3], [-3, 3, -3], [5, -5, 5]]
    stds = create_dummy_data.create_stds(dim, k)  # [[1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1]]
    n_samples = create_dummy_data.create_partition_samples(i, k)  # [1000, 10000, 4000, 50]
    print(n_samples)
    df = create_dummy_data.create_normal_cluster_data_spark(dim, n_samples, means, stds)
    #df.show(100)
    df.write.parquet('/user/micsas/data/parquet/normal_cluster_n_'+str(i)+'.parquet', mode='overwrite')
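
For context, a minimal sketch of how such normal-cluster data could be generated with numpy (hypothetical; create_dummy_data may implement this differently, e.g. by distributing the sampling across executors):

import numpy as np

def sample_cluster_points(dim, samples_per_cluster, means, stds):
    # Draw n points from a normal distribution around each cluster mean.
    rows = []
    for label, (n, mu, sigma) in enumerate(zip(samples_per_cluster, means, stds)):
        for point in np.random.normal(loc=mu, scale=sigma, size=(n, dim)):
            rows.append((label,) + tuple(float(x) for x in point))
    return rows

The resulting rows could then be turned into a DataFrame with spark.createDataFrame before writing to parquet.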