mCerebrum is not the only way to collect and load data into Cerebral Cortex; it is also possible to import your own structured datasets into the platform. This example demonstrates how to load existing data and then read it back from Cerebral Cortex through the same mechanisms you have been using. Additionally, it shows how to write a custom data transformation function that manipulates data to produce a smoothed result, which can then be visualized.
In [ ]:
%reload_ext autoreload
from util.dependencies import *
from settings import USER_ID
Cerebral Cortex provides a set of predefined data import routines that fit typical use cases. The most common is the CSV data parser, csv_data_parser. These parsers are easy to write and can be extended to support most types of data; a minimal sketch of what such a parser might look like appears after the parameter list below. Additionally, the directory importer, import_dir, needs to be brought into this notebook so that we can start the data import process.
The import_dir method requires several parameters, which are discussed below.
- cc_config: The path to the configuration files for Cerebral Cortex; this is the same folder that you use for the Kernel initialization
- input_data_dir: The path where the data to be imported is located; in this example, sample_data is available in the file/folder browser on the left and you should explore the files inside it
- user_id: The universally unique identifier (UUID) of the user who owns the data to be imported into the system
- data_file_extension: The type of files to be considered for import
- data_parser: The parsing method, csv_data_parser or another, that defines how to interpret the data samples on a per-line basis
- gen_report: A simple True/False value that controls whether a report is printed to the screen when the import completes
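Because a parser is just a Python callable applied to each line of the input files, writing your own is straightforward. The sketch below is only an illustration of the idea; my_csv_line_parser is a hypothetical name, and the exact row structure that import_dir expects from a parser should be checked against the csv_data_parser source in cerebralcortex.data_importer.data_parsers.
In [ ]:
from datetime import datetime

def my_csv_line_parser(line):
    # Hypothetical example: split a "timestamp,value1,value2,..." line into
    # a timestamp plus numeric values. The timestamp is assumed to be epoch
    # seconds here; adapt the conversion to your own file format.
    fields = line.strip().split(",")
    timestamp = datetime.fromtimestamp(float(fields[0]))
    values = [float(v) for v in fields[1:]]
    return [timestamp] + values

# Quick check on a single line of data
my_csv_line_parser("1554076800.0,3.14,2.71")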
In [ ]:
from cerebralcortex.data_importer.data_parsers import csv_data_parser
from cerebralcortex.data_importer import import_dir
import_dir(
    cc_config="/home/md2k/cc_conf/",
    input_data_dir="sample_data/",
    user_id=USER_ID,
    data_file_extension=[".csv"],
    data_parser=csv_data_parser,
    gen_report=True
)
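With the data imported, initialize a Cerebral Cortex Kernel against the same configuration folder so that the new stream can be read back like any other datastream.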
In [ ]:
CC = Kernel("/home/md2k/cc_conf/")
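get_stream retrieves the imported data by name as a standard datastream; show displays the raw rows, and accessing metadata returns the stream's associated metadata.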
In [ ]:
iot_stream = CC.get_stream("iot-data-stream")
# Data
iot_stream.show(truncate=False)
# Metadata
iot_stream.metadata
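The next cells import the Spark SQL types and functions needed to define a return schema and a user-defined function for smoothing the data.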
In [ ]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, IntegerType
from pyspark.sql.functions import minute, second, mean, window
from pyspark.sql import functions as F
import numpy as np
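First, a schema is defined that describes the structure of the result returned by the smoothing function: a single float column named smooth_vals.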
In [ ]:
schema = StructType([
    StructField("smooth_vals", FloatType())
])
The user-defined function (UDF) is one of two mechanisms available for distributed data processing within the Apache Spark framework.
The F.udf Python decorator assigns the schema defined above as the return type of the decorated method. The method, smooth_algo, accepts a list of values, vals, and any Python-based operations can be run over this data window to produce the result described by the schema. In this case, we compute a simple windowed average.
In [ ]:
@F.udf(schema)
def smooth_algo(vals):
    return [sum(vals)/len(vals)]
The smoothing algorithm is applied to the datastream by calling the run_algorithm method, passing the UDF as a parameter along with the columns, some_vals, that should be sent to it. Finally, the windowDuration parameter specifies the size of the time window into which the data is segmented before the algorithm is applied. Notice that when the next cell is run, the operation completes nearly instantaneously. This is due to the lazy evaluation behavior of the Spark framework. When you run the subsequent cell to show the data, the algorithm is applied to the whole dataset before the results are displayed on the screen.
In [ ]:
smooth_stream = iot_stream.run_algorithm(smooth_algo, columnNames=["some_vals"], windowDuration=10)
In [ ]:
smooth_stream.show(truncate=False)
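Finally, plot both the original and the smoothed stream to compare the raw data with the smoothed result.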
In [ ]:
iot_stream.plot()
In [ ]:
smooth_stream.plot()