DataVec

The DataVec library from DL4J is easy to add to the BeakerX kernel, including displaying its tables with the BeakerX interactive table widget. DataVec is an ETL library for machine learning, covering data pipelines, data munging, and data wrangling.


In [ ]:
%%classpath add mvn
org.datavec datavec-api 0.9.1
org.datavec datavec-local 0.9.1
org.datavec datavec-dataframe 0.9.1
org.deeplearning4j deeplearning4j-core 0.9.1
org.nd4j nd4j-native-platform 0.9.1

In [ ]:
%import org.nd4j.linalg.api.ndarray.INDArray
%import org.datavec.api.split.FileSplit
%import org.deeplearning4j.datasets.datavec.RecordReaderDataSetIterator
%import java.nio.file.Paths
%import org.nd4j.linalg.factory.Nd4j
%import org.datavec.api.transform.TransformProcess
%import org.datavec.api.records.reader.impl.csv.CSVRecordReader

In [ ]:
import org.datavec.api.transform.schema.Schema

inputDataSchema = new Schema.Builder()
            //We can for convenience define multiple columns of the same type
            .addColumnsString("DateString", "TimeString")
            //We can define different column types for different types of data:
            .addColumnCategorical("State", Arrays.asList("GA","VA","IL","MO","IN","KY","MS","LA","AL","TN","OH","NC","MD","CA","AZ","FL","IA","MN","KS","TX","OK","AR","NE","WA","WY","CO","ID","SD","PA","MT","NV","NY","DE","NM","ME","ND","SC","WV","MI","WI","NH","CT","MA"))
            .addColumnsInteger("State No", "Scale", "Injuries", "Fatalities")
            //And likewise, we can define multiple numeric columns of the same type at once:
            .addColumnsDouble("Start Lat", "Start Lon", "Length", "Width")
            .build();
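
As a quick sanity check, the schema can be inspected before wiring it into a pipeline; a minimal sketch using Schema's accessors (not part of the original notebook):

In [ ]:
//Illustrative: print the column count and names of the schema we just built
println "Columns: " + inputDataSchema.numColumns()
println inputDataSchema.getColumnNames()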

In [ ]:
import org.datavec.api.transform.condition.ConditionOp
import org.datavec.api.transform.condition.column.CategoricalColumnCondition
import org.datavec.api.transform.filter.ConditionFilter

transformProcess = new TransformProcess.Builder(inputDataSchema)
  //Let's remove some columns we don't need
  .removeColumns("DateString", "TimeString", "State No")
  //Now, suppose we only want to analyze tornadoes in NY and WA. Let's filter out
  // everything except for those states.
  //Here, we are applying a conditional filter: we remove every example that matches the condition,
  // i.e. every record whose "State" is not one of {"NY", "WA"}
  .filter(new ConditionFilter(
                new CategoricalColumnCondition("State", ConditionOp.NotInSet, new HashSet<>(Arrays.asList("NY", "WA")))))
  .build();
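
To see what the column removal and filter leave behind, the transformed schema can be printed; a small illustrative check, assuming nothing beyond getFinalSchema() (which is also used below):

In [ ]:
//Illustrative: the final schema no longer contains the removed columns
println transformProcess.getFinalSchema()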

In [ ]:
import org.datavec.local.transforms.TableRecords
import jupyter.Displayer;
import jupyter.Displayers;

//JVM repr: display the table with the BeakerX table widget instead of a raw string
Displayers.register(org.datavec.dataframe.api.Table.class, new Displayer<org.datavec.dataframe.api.Table>() {
      @Override
      public Map<String, String> display(org.datavec.dataframe.api.Table table) {
        //Copy the table cells into a list of rows for the TableDisplay widget
        List<List<String>> values = new ArrayList<>();
        for (int row = 0; row < table.rowCount(); row++) {
          List<String> rowValues = new ArrayList<>();
          for (int column = 0; column < table.columnCount(); column++) {
            rowValues.add(table.get(column, row));
          }
          values.add(rowValues);
        }
        new TableDisplay(values, table.columnNames(), new ArrayList()).display();
        //Return the HIDDEN MIME type to suppress the default string rendering
        Map<String, String> result = new HashMap<>();
        result.put(MIMEContainer.MIME.HIDDEN, "");
        return result;
      }
    });

outputSchema = transformProcess.getFinalSchema()
table = TableRecords.tableFromSchema(outputSchema)
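
As an illustrative check, we can confirm that the empty table picked up its column names from the filtered schema:

In [ ]:
println table.columnNames()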

For the purposes of our example, we load data from a CSV file and transform it using DataVec.


In [ ]:
import org.datavec.api.records.reader.impl.transform.TransformProcessRecordReader

writable = []

TransformProcessRecordReader tprr = new TransformProcessRecordReader(new CSVRecordReader(0,","), transformProcess)
tprr.initialize(new FileSplit(Paths.get("../resources/data/tornadoes_2014.csv").toFile()))

// Extract the transformed data; records removed by the filter come back as null, so we skip them
while (tprr.hasNext()) {
    elem = tprr.next();
    if (elem) {
      writable.add(elem)
    }
}
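
Before filling the table, it may help to confirm how many records survived the filter; an illustrative check, not part of the original notebook:

In [ ]:
println "Kept " + writable.size() + " records after filtering"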

In [ ]:
// Fill the table with the extracted records (each record is a list of Writables)
for (int row=0; row<writable.size; row++) {
    for (int col=0; col<outputSchema.numColumns(); col++) {
        column = table.column(col);
        column.addCell("" + writable[row][col])
    }
}

table

An example of a network that classifies two groups of data:


In [ ]:
import org.datavec.api.records.reader.RecordReader
import org.datavec.api.util.ClassPathResource
import org.deeplearning4j.eval.Evaluation
import org.deeplearning4j.nn.api.OptimizationAlgorithm
import org.deeplearning4j.nn.conf.MultiLayerConfiguration
import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.conf.Updater
import org.deeplearning4j.nn.conf.layers.DenseLayer
import org.deeplearning4j.nn.conf.layers.OutputLayer
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork
import org.deeplearning4j.nn.weights.WeightInit
import org.deeplearning4j.optimize.listeners.ScoreIterationListener
import org.nd4j.linalg.activations.Activation
import org.nd4j.linalg.dataset.DataSet
import org.nd4j.linalg.lossfunctions.LossFunctions.LossFunction
import org.nd4j.linalg.dataset.api.iterator.DataSetIterator

def seed = 123;
def learningRate = 0.01;
def batchSize = 50;
def nEpochs = 30;

def numInputs = 2;
def numOutputs = 2;
def numHiddenNodes = 20;

def filenameTrain = Paths.get("../resources/data/linear_data_train.csv").toFile();
def filenameTest = Paths.get("../resources/data/linear_data_eval.csv").toFile();

//Load the training data:
rr = new CSVRecordReader();
rr.initialize(new FileSplit(filenameTrain));
trainIter = new RecordReaderDataSetIterator(rr,batchSize,0,2);

//Load the test/evaluation data:
rrTest = new CSVRecordReader();
rrTest.initialize(new FileSplit(filenameTest));
testIter = new RecordReaderDataSetIterator(rrTest,batchSize,0,2);

conf = new NeuralNetConfiguration.Builder()
  .seed(seed)
  .iterations(1)
  .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
  .learningRate(learningRate)
  .updater(Updater.NESTEROVS)     //To configure: .updater(new Nesterovs(0.9))
  .list()
  .layer(0, new DenseLayer.Builder().nIn(numInputs).nOut(numHiddenNodes)
    .weightInit(WeightInit.XAVIER)
    .activation(Activation.RELU)
    .build())
  .layer(1, new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD)
    .weightInit(WeightInit.XAVIER)
    .activation(Activation.SOFTMAX)
    .nIn(numHiddenNodes).nOut(numOutputs).build())
  .pretrain(false)
  .backprop(true)
  .build();

model = new MultiLayerNetwork(conf);
model.init();
model.setListeners(new ScoreIterationListener(10));  //Print score every 10 parameter updates

for (int n = 0; n < nEpochs; n++) {
    model.fit(trainIter);
}

print "Evaluate model...."
eval = new Evaluation(numOutputs);

while (testIter.hasNext()) {
  currentElement = testIter.next();
  INDArray features = currentElement.getFeatureMatrix();
  INDArray labels = currentElement.getLabels();
  INDArray predicted = model.output(features, false);

  eval.eval(labels, predicted);
}

//Print the evaluation statistics
println eval.stats()
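
Once trained, the network can be persisted for later reuse; a minimal sketch using DL4J's ModelSerializer, where the output file name is hypothetical:

In [ ]:
import org.deeplearning4j.util.ModelSerializer

//Save the trained network; 'true' also saves the updater state, allowing further training later
ModelSerializer.writeModel(model, new File("linear_classifier.zip"), true)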

In [ ]:
import org.nd4j.linalg.api.ops.impl.indexaccum.IMax

def filenameTrain = Paths.get("../resources/data/linear_data_train.csv").toFile();
def filenameTest = Paths.get("../resources/data/linear_data_eval.csv").toFile();
def extractDataFromND(features, labels) {
    //Map each point [x, y] to its class index, i.e. the argmax of its one-hot label row
    def classesOfPoints = [:]
    INDArray argMax = Nd4j.getExecutioner().exec(new IMax(labels), 1);

    for (int i = 0; i < features.rows(); i++) {
        int classIdx = (int) argMax.getDouble(i);
        classesOfPoints << [[features.getDouble(i, 0), features.getDouble(i, 1)]: classIdx]
    }

    return classesOfPoints
}
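
//Illustrative sanity check (not in the original notebook): on a toy 2x2 feature
//matrix with one-hot labels, each point should map to the index of its 1.0 entry
def toyFeatures = Nd4j.create([[0.1d, 0.2d], [0.8d, 0.9d]] as double[][])
def toyLabels = Nd4j.create([[1.0d, 0.0d], [0.0d, 1.0d]] as double[][])
println extractDataFromND(toyFeatures, toyLabels)  //expect {[0.1, 0.2]=0, [0.8, 0.9]=1}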

rr.initialize(new FileSplit(filenameTrain))
rr.reset()
trainIter = new RecordReaderDataSetIterator(rr, 1000, 0, 2)
ds = trainIter.next();
rawTrainData = extractDataFromND(ds.getFeatures(), ds.getLabels())

rrTest.initialize(new FileSplit(filenameTest))
rrTest.reset();
testIter = new RecordReaderDataSetIterator(rrTest,500,0,2);
ds = testIter.next();
INDArray testPredicted = model.output(ds.getFeatures());

rawPredictedData = extractDataFromND(ds.getFeatures(), testPredicted)

In [ ]:
def plot = new Plot(title: "Training data Plot")
plot.setXBound([0.0, 1.0])
plot.setYBound([-0.2, 0.8])

rawTrainData.each{k, v -> 
    if (v==0) {
       plot << new Points(x: [k[0]], y: [k[1]], color: Color.orange)
    } else {
       plot << new Points(x: [k[0]], y: [k[1]], color: Color.red)
    }
}

plot

In [ ]:
def plot = new Plot(title: "Predicted data Plot")
plot.setXBound([0.0, 1.0])
plot.setYBound([-0.2, 0.8])

rawPredictedData.each{k, v -> 
    if (v==0) {
       plot << new Points(x: [k[0]], y: [k[1]], color: Color.orange)
    } else {
       plot << new Points(x: [k[0]], y: [k[1]], color: Color.red)
    }
}

plot
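
As a rough comparison (illustrative, using Groovy's countBy), we can tally the class balance in the training data against the model's predictions:

In [ ]:
//Illustrative: count how many points fall into each class
println "train:     " + rawTrainData.values().countBy { it }
println "predicted: " + rawPredictedData.values().countBy { it }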
