A big data system is rather useless if you cannot read data. Karps lets you read from all the sources supported by Spark, with some additional benefits.
One main difference compared to Spark is that the schema of the data is strictly checked. How does this work when the schema is not known in advance, for example when reading a JSON file? Karps provides some facilities for that.
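Before diving in, here is a small analogy in plain Haskell that shows what strict checking at read time means. It uses the aeson library and a made-up Employee record; this is only an illustration of the idea, not the Karps API.

{-# LANGUAGE DeriveGeneric, DeriveAnyClass #-}
import GHC.Generics (Generic)
import Data.Aeson (FromJSON, eitherDecode)
import qualified Data.ByteString.Lazy.Char8 as BL

-- The expected schema, written down as a Haskell type.
data Employee = Employee { name :: String, salary :: Int }
  deriving (Show, Generic, FromJSON)

-- A record matching the schema parses...
good = eitherDecode (BL.pack "{\"name\":\"Ada\",\"salary\":100}") :: Either String Employee
-- ...while a record with a missing field is rejected at read time.
bad = eitherDecode (BL.pack "{\"name\":\"Ada\"}") :: Either String Employee

Karps takes the same stance for distributed datasets: the type of the data is part of the computation rather than being discovered on the fly.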
In [1]:
:load KarpsDisplays KarpsDagDisplay
:extension OverloadedStrings
import KarpsDisplays(showGraph)
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Functions
import Spark.Core.Column
import Spark.Core.Types
import Spark.Core.Row
import Spark.Core.ColumnFunctions
-- The IO inputs
import Spark.IO.Inputs
-- Some internal utilities to show what is happening under the hood.
import Spark.Core.Internal.ContextStructures
import Spark.Core.Internal.ContextInternal
import Spark.Core.Internal.ContextIOInternal
import KarpsDagDisplay(computeGraphToDisplayGraph)
import Spark.Core.Internal.Utilities(forceRight)
import Control.Monad.State(get, put)
In [2]:
conf = defaultConf {
    confEndPoint = "http://10.0.2.2",
    confRequestedSessionName = "session04_reading",
    confUseNodePrunning = True}
createSparkSessionDef conf
When the type is known, reading data is not complicated.
TODO
Since all operations in Karps assume that the type of the data is available, we must run a preliminary step to infer the type of the data in our JSON file. Because the output is a dataframe (which is properly typed), we invoke the execStateDef command instead of one of the exec1Def commands.
The following uses Spark's builtin capabilities to infer the schema of the JSON file:
In [3]:
df <- execStateDef $ jsonInfer "/Users/tjhunter/Downloads/employees.json"
In [4]:
df
In [5]:
let node = collect' (asCol' df)
x <- exec1Def' node
x
Now rerun the computation above to get the result. You will notice that the result comes back faster. In fact, if you were to use a larger file, you would find that the running time stays constant! This is an example of Karps's aggressive caching at play.
Before running a computation, Karps inspects the sources for their last modification time. If it finds that they have not changed since a previous computation, the corresponding nodes are replaced by the older results. In the case of our simple computation, because the file has not changed, the same result is returned.
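In pseudocode form, the decision looks roughly like the following sketch. The types and names here (SourceStatus, planFor, and so on) are illustrative assumptions, not the Karps internals.

import Data.Time (UTCTime)

-- A simplified model of a source's status, as reported by Spark.
data SourceStatus = SourceStatus
  { sourcePath   :: FilePath
  , lastModified :: UTCTime }

-- What to do with a node that reads this source.
data Plan a = UseCached a | Recompute

-- If the source has not changed since the stamp recorded alongside a
-- cached result, reuse that result; otherwise recompute from scratch.
planFor :: SourceStatus -> Maybe (UTCTime, a) -> Plan a
planFor status cached = case cached of
  Just (stamp, result) | lastModified status <= stamp -> UseCached result
  _ -> Recompute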
Here is a more detailed explanation. We want to run the following computation graph:
In [6]:
showGraph (forceRight node)
The first operation done by Karps is to look for external data sources (in this case a GenericDatasource) and to call Spark for the status of the file (is it available, and how fresh is it?).
In [7]:
cg = forceRight $ buildComputationGraph (forceRight node)
cgWithSourceT <- execStateDef $ updateSourceInfo cg
cgWithSource = forceRight cgWithSourceT
computeGraphToDisplayGraph cgWithSource
Based on this information, Karps prunes the computation graph, looking for all the observables that have already been computed. In this case, we see that everything collapses onto a pointer to a previous computation: there is nothing left to do!
In [8]:
-- These are the transforms done internally by Karps
cgTrans <- execStateDef $ do
  session <- get
  return $ forceRight $ performGraphTransforms session cgWithSource
computeGraphToDisplayGraph cgTrans
Now, change the file and see what happens: this time, the computation will actually be run.
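If you do not want to edit the file by hand, bumping its modification time is enough, since the staleness check described above is based on the last modification time. This sketch reuses the path from the cells above; setModificationTime comes from the directory package, not from Karps.

import Data.Time.Clock (getCurrentTime)
import System.Directory (setModificationTime)

-- Bump the file's modification time without touching its contents;
-- on the next run, the source no longer matches the recorded stamp
-- and the computation is executed again.
do t <- getCurrentTime
   setModificationTime "/Users/tjhunter/Downloads/employees.json" t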
Thanks to determinism, Karps is able to prune large chunks of computation that have already been performed before.
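One simple way to picture this: give each node a stable signature computed from its operation and from its parents' signatures, so that identical deterministic subgraphs always produce identical signatures. The sketch below illustrates the idea with the hashable package; Node and signature are made-up names, not the actual Karps structures.

import Data.Hashable (hash)

-- A toy computation node: an operation name and its parent nodes.
data Node = Node { op :: String, parents :: [Node] }

-- A node's signature depends only on its operation and its parents'
-- signatures: identical deterministic subgraphs hash identically.
signature :: Node -> Int
signature (Node o ps) = hash (o, map signature ps)

When such a signature matches a previously completed computation, the whole subgraph can be collapsed onto a pointer to the stored result, which is exactly what the pruning step above showed.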