This is not meant to be a full tutorial, but rather a brief presentation of how to cache datasets with Krapsh. A full Spark tutorial on caching can be found here: http://spark.apache.org/docs/latest/quick-start.html#caching
In [1]:
:load KrapshDisplays
import KrapshDisplays(showGraph)
import KrapshDagDisplay(computeGraphToDisplayGraph)
In [2]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Functions
import Spark.Core.Column
import Spark.Core.ColumnFunctions
import qualified Data.Text as T
conf = defaultConf {
confEndPoint = T.pack "http://localhost",
confRequestedSessionName = T.pack "session03_introduction" }
createSparkSessionDef conf
In [3]:
-- Some internal utilities to show what is happening behind the scenes.
import Spark.Core.Internal.ContextStructures
import Spark.Core.Internal.ContextInternal
import Spark.Core.Internal.Utilities(forceRight)
Caching is a fundamental step when working repeatedly with large datasets. It is hard to get right, however, and it is a frequent source of performance issues for budding Spark users. Krapsh embeds some tools to help you diagnose caching issues.
Consider the following trivial program that creates a dataset, and runs a few computations on it (it computes the number of elements as well as their sums, and computes the mean out of it). Of course, Spark includes a mean operator so this would not be recommended in practice, but it is instructive to study caching issues.
In [4]:
ds = dataset ([1,2,3] :: [Int]) @@ "data"
-- Turns the dataset into a column and computes the sum of all the elements in this column
s1 = sumCol (asCol ds) @@ "sum"
-- Counts the element in the dataset
s2 = count ds @@ "count"
x = (s1 + s2) @@ "result"
Let us see what the computation graph looks like:
In [5]:
showGraph x
For large datasets, this would be inefficient, because it would cause the original dataset data to be recomputed twice: once for the sum operation and once for the count operation.
We can instruct Spark to cache the dataset (using the cache operator). But this will consume some precious memory, so we need to tell Spark to uncache it once we are sure we are done. The operator to do that, unsurprisingly, is called uncache. Here is a first attempt that is not going to work:
In [6]:
-- cache
cachedData = cache ds @@ "cache"
s1 = sumCol (asCol cachedData) @@ "sum"
s2 = count cachedData @@ "count"
uncachedData = uncache cachedData @@ "uncache"
x = (s1 + s2) @@ "result"
Let us look at the graph of computations: the uncaching operation is missing!
In [7]:
showGraph x
Because of laziness, the uncaching operation is only included if the result depends on it; otherwise it is skipped when calculating the operations that need to be done.
Here is a second attempt, in which we explicitly add a logical dependency between the uncaching and the result: the uncaching will happen before we compute the result. Adding a logical dependency is expressed with the depends function.
In [8]:
:t depends
In [9]:
x = ((s1 + s2) `depends` [untyped uncachedData]) @@ "result"
Now we see the uncaching operation in the graph of operations:
In [10]:
showGraph x
However, when we attempt to run it, it fails!
In [11]:
exec1Def x
The message gives us a hint, telling us that it found some operations that are illegal.
When you look at the graph above, nothing indicates to Spark whether it should first uncache the data, or first compute the sum or count operation. This ambiguity is in fact visible in the graph, because the three nodes sum, count and uncache are represented on the same line. Which one should be done first?
Because of the lazy nature of Spark, it is a common mistake to uncache data while it is still going to be used later. Krapsh analyzes the graph of computations and will refuse to run operations if they could be misinterpreted by Spark. In this case, Krapsh can see that the sum or count operation could happen after we uncache the data, so it asks us to resolve this ambiguity.
How do we fix it? Simply by ensuring that sum and count are run before uncaching the data, using logical dependencies again:
In [12]:
uncachedData = ((uncache cachedData) `depends` [untyped s1, untyped s2]) @@ "uncache"
x = ((s1 + s2) `depends` [untyped uncachedData]) @@ "result"
showGraph x
When we run this program now, Spark can properly launch the computations.
In [13]:
exec1Def x
This business of caching and uncaching is clearly tedious and at the same time fairly mechanical. Can we automate all that? Yes! Krapsh includes an autocache operator that acts as a hint: it marks a specific dataset as a candidate for caching. If Krapsh decides it is useful to cache this dataset, it will automatically insert the caching and uncaching instructions.
Here is our original program, but this time we automate the caching process: there is no uncaching, and caching is requested with the autocache operator.
In [14]:
cachedData = autocache ds @@ "autocache"
s1 = sumCol (asCol cachedData) @@ "sum"
s2 = count cachedData @@ "count"
x = (s1 + s2) @@ "result"
This is all that is required. For the sake of demonstration, let us look under the hood at the transforms that have been performed, by inspecting the graph of computations. This step is not required for casual users; it only serves to understand the computations that are going to take place:
In [15]:
cg = forceRight (buildComputationGraph x)
cg' = forceRight (performGraphTransforms cg)
computeGraphToDisplayGraph cg'
This is very close to the transforms that we have done manually! Krapsh has added extra logical dependencies to ensure that operations are done in the correct order.
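For completeness, running the autocached program uses the same call as before. A sketch of that final cell (not executed here):
In [ ]:
exec1Def x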
To conclude:
- use the autocache operator when possible
- you can also manage caching manually with the cache and uncache operators. Krapsh will prevent you from making mistakes, however.
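As a final recap, here is a sketch that contrasts the two approaches, reusing only the operators introduced above; the node names are arbitrary and purely illustrative:
In [ ]:
-- Recap (sketch): automatic versus manual caching of the same dataset.
ds = dataset ([1,2,3] :: [Int]) @@ "data"

-- Automatic: mark the dataset as a caching candidate and let Krapsh
-- insert the cache/uncache instructions where appropriate.
autoData = autocache ds @@ "autocache"
autoSum = sumCol (asCol autoData) @@ "sum"
autoCount = count autoData @@ "count"
autoResult = (autoSum + autoCount) @@ "result"

-- Manual: cache explicitly and spell out the ordering with logical
-- dependencies, so that uncaching happens after sum and count.
manualData = cache ds @@ "cache"
manualSum = sumCol (asCol manualData) @@ "sum"
manualCount = count manualData @@ "count"
uncachedData = ((uncache manualData) `depends` [untyped manualSum, untyped manualCount]) @@ "uncache"
manualResult = ((manualSum + manualCount) `depends` [untyped uncachedData]) @@ "result"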