Sparta - Occupancy Model

This example will distruibute multiple Sparta model fits across a cluster using Spark/SparkR technology.

By using the "R (SparkR)" jupyter kernel the SparkR libary is loaded and the Spark context (sc) generated automatically. These will need to be set by the user if not using the SparkR jupyter kernel (see commands below). Where the container environmental varaibles SPARK_HOME and MASTER give the path to local Spark installation and address of the Spark cluster, respectively.

 library(SparkR, lib.loc=file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'))
 sc <- sparkR.session(master=Sys.getenv('MASTER'))

In [1]:
ls() # Spark context (sc) already generated


'sc'

The devtools library is install by default to allow for installing new libraries. At present if the container resets the new libray will be lost. Libraries can be installed globally on request, which will persist following a restart of the container.


In [2]:
library(devtools)
install_github('BiologicalRecordsCentre/sparta@0.1.30')


Skipping install of 'sparta' from a github remote, the SHA1 (f7ea39de) has not changed since last install.
  Use `force = TRUE` to force installation

User files must be written and read from the /data/ path as this is availble to availible to all the containers (notebooks and Spark nodes).


In [3]:
library(sparta)
load('/data/biodiversity/taxa/input/Test_Data.rdata')
visitData <- formatOccData(taxa = taxa_data$CONCEPT, site = taxa_data$TO_GRIDREF, time_period = taxa_data$TO_STARTDATE)
save(visitData, file = '/data/biodiversity/taxa/input/visit_data.rdata')


Loading required package: lme4
Loading required package: Matrix

Libraries used within functions to be distributed to a Spark cluster must be install on the Spark worker and then import them inside the function. At present, libraries can only be installed by request. For this example, the Sparta libary (frozen at version 0.1.30) R2jags have been installed on the Spark workers.

Only a single argument can be passed to the distributed functions using spark.lapply command. More complex structures can passed using lists, but are limited in Jupyter IO flow. For this example, save visitData from current notebook and re-load it within the worker function.


In [4]:
modelFitFunct <-  function(taxaName) {
    start.time <- Sys.time()
    loadNamespace("sparta") # this will throw an error if the library is missing
    loadNamespace("R2jags")
    load('/data/biodiversity/taxa/input/visit_data.rdata') # Load required data *contains variable visitData*
    iterations <- 10
    model.output <- sparta::occDetFunc(taxa_name = taxaName,
                                       occDetdata = visitData$occDetdata,
                                       spp_vis = visitData$spp_vis,
                                       write_results = FALSE,
                                       n_chains = 1,
                                       n_iterations = iterations,
                                       burnin = iterations/2,
                                       thinning = 3,
                                       nyr = 2)
    # Save output to the /data/ path on each loop, in case the browser tab is closed
    save(model.output, file = file.path('/data/biodiversity/taxa/output', paste0(taxaName, '.rdata')))
    end.time <- Sys.time()
    return(end.time - start.time)
}

Unlike Zeppelin, Jupyter Notebooks can not keep a process running once the browser tab is closed. The Future library, will turn a process into an asynchronous task and will allow the cell to continue to run in the backgound. Variables can then be checked using the value command.


In [5]:
library("future")

In [ ]:
runList <- tail(colnames(visitData$spp_vis), -1)[1:2] # limit to 2 species rather than the full 50
out <- future({spark.lapply(runList, modelFitFunct)})

In [ ]:
value(out)

In [ ]: