Reading data

A big data system is rather useless if you cannot read data. Karps lets you read from all the sources supported by Spark, with some additional benefits.

One main difference compared to Spark is that the schema of the data is strictly checked. How does this work when the schema is not known in advance, for example when reading a JSON file? Karps provides some facilities for that.


In [1]:
:load KarpsDisplays KarpsDagDisplay
:extension OverloadedStrings

import KarpsDisplays(showGraph)

import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Functions
import Spark.Core.Column
import Spark.Core.Types
import Spark.Core.Row
import Spark.Core.ColumnFunctions
-- The IO inputs
import Spark.IO.Inputs

-- Some internal utilities to show what is happening behind the scenes.
import Spark.Core.Internal.ContextStructures
import Spark.Core.Internal.ContextInternal
import Spark.Core.Internal.ContextIOInternal
import KarpsDagDisplay(computeGraphToDisplayGraph)
import Spark.Core.Internal.Utilities(forceRight)
import Control.Monad.State(get, put)

In [2]:
conf = defaultConf {
        confEndPoint = "http://10.0.2.2",
        confRequestedSessionName = "session04_reading",
        confUseNodePrunning = True}

createSparkSessionDef conf


[Debug] Creating spark session at url: http://10.0.2.2:8081/sessions/session04_reading @(<unknown>:<unknown> <unknown>:0:0)

Reading typed data

When the type is known, reading data is not complicated.

TODO

Reading untyped data

Since all operations in Karps assume that the type of the data is available, we must first run a preliminary step to infer the type of the data in our JSON file. Because the output of this step is a dataframe (now properly typed), we invoke the execStateDef command instead of one of the exec1Def commands.

The following uses Spark's builtin capabilities to infer the schema of the JSON file:


In [3]:
df <- execStateDef $ jsonInfer "/Users/tjhunter/Downloads/employees.json"


[Debug] executeCommand1': computing observable /inferschema_70b0c1@org.spark.InferSchema!{rows:[{fieldPath:[string] isNullable:bool typeId:int fieldIndex:int}]} @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: http://10.0.2.2:8081/computations/session04_reading/0/createwith nodes: [/inferschema_70b0c1@org.spark.InferSchema!{rows:[{fieldPath:[string] isNullable:bool typeId:int fieldIndex:int}]}] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /inferschema_70b0c1 finished @(<unknown>:<unknown> <unknown>:0:0)

In [4]:
df


Right /genericdatasource_bcdac3@org.spark.GenericDatasource:{name:string? salary:int?}

In [5]:
let node = collect' (asCol' df)
x <- exec1Def' node
x


[Debug] executeCommand1': computing observable /collect_e4bd12@org.spark.Collect![{name:string? salary:int?}] @(<unknown>:<unknown> <unknown>:0:0)
[Debug] updateSourceInfo: found sources [HdfsPath "/Users/tjhunter/Downloads/employees.json"] @(<unknown>:<unknown> <unknown>:0:0)
[Debug] updateSourceInfo: retrieved stamps [(HdfsPath "/Users/tjhunter/Downloads/employees.json",Right (DataInputStamp "1491777474000"))] @(<unknown>:<unknown> <unknown>:0:0)
[Info] Sending computations at url: http://10.0.2.2:8081/computations/session04_reading/1/createwith nodes: [/genericdatasource_bcdac3@org.spark.GenericDatasource:{name:string? salary:int?},/collect_e4bd12@org.spark.Collect![{name:string? salary:int?}]] @(<unknown>:<unknown> <unknown>:0:0)
[Info] _computationMultiStatus: /collect_e4bd12 finished @(<unknown>:<unknown> <unknown>:0:0)
RowArray (fromList [RowArray (fromList [StringElement "Andy",IntElement 4500]),RowArray (fromList [StringElement "Berta",IntElement 4000]),RowArray (fromList [StringElement "Justin",IntElement 3500]),RowArray (fromList [StringElement "Michael",IntElement 3000])])

Computation reuse and resource checks

Now, rerun the computation above to get the result. You will notice that the result comes back faster. In fact, if you were to use a much larger file, you would find that the running time stays constant! This is an example of Karps's aggressive caching at play.
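As a minimal sketch of this step (simply reusing the node defined in cell In [5], which has already been computed), re-executing the same collect should come back almost immediately, because the source has not changed since the previous run:

-- Re-running the exact same collect. The source stamp is unchanged,
-- so Karps can return the result of the previous computation instead
-- of sending new work to Spark.
x' <- exec1Def' node
x'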

Before running a computation, Karps inspects the sources for their last modification time. If it finds that they have not changed since a previous computation, the corresponding nodes are replaced by the older results. In the case of our simple computation, because the file has not changed, the same result is returned.

Here is a more detailed explanation. We want to run the following computation graph:


In [6]:
showGraph (forceRight node)


The first operation done by Karps is to look for external data sources (in this case a GenericDatasource) and to ask Spark for the status of the file (is it available, and how fresh is it?).


In [7]:
cg = forceRight $ buildComputationGraph (forceRight node)
cgWithSourceT <- execStateDef $ updateSourceInfo cg
cgWithSource = forceRight cgWithSourceT
computeGraphToDisplayGraph cgWithSource


[Debug] updateSourceInfo: found sources [HdfsPath "/Users/tjhunter/Downloads/employees.json"] @(<unknown>:<unknown> <unknown>:0:0)
[Debug] updateSourceInfo: retrieved stamps [(HdfsPath "/Users/tjhunter/Downloads/employees.json",Right (DataInputStamp "1491777474000"))] @(<unknown>:<unknown> <unknown>:0:0)

Based on this information, Karps prunes the computation graph, looking for all the observables that have already been computed. In this case, we see that everything collapses onto a pointer to a previous computation: there is nothing left to do!


In [8]:
-- These are the transforms done internally by Karps
cgTrans <- execStateDef $ do
        session <- get
        return $ forceRight $ performGraphTransforms session cgWithSource
computeGraphToDisplayGraph cgTrans


Now, change the file and see what happens. The computation will be run this time.
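As a sketch of that step (the file modification itself happens outside of Karps, for example by appending a record to employees.json), re-running the same node after the change forces a fresh computation, because the data stamp of the source no longer matches the cached one:

-- After employees.json has been modified on disk, its data stamp changes,
-- so the pruning step can no longer substitute the cached result and the
-- collect is recomputed on Spark.
y <- exec1Def' node
y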

Thanks to determinism, Karps is able to skip large chunks of computations that have already been performed.