Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML

Preprocessing


Setup


In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import os
import shutil

import matplotlib.pyplot as plt
import numpy as np

from breastcancer.preprocessing import preprocess, save, train_val_split

# Ship a fresh copy of the `breastcancer` package to the Spark workers.
# Note: The zip must include the `breastcancer` directory itself,
# as well as all files within it for `addPyFile` to work correctly.
# This is equivalent to `zip -r breastcancer.zip breastcancer`.
dirname = "breastcancer"
zipname = dirname + ".zip"
shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
spark.sparkContext.addPyFile(zipname)

plt.rcParams['figure.figsize'] = (10, 6)

Execute Preprocessing & Save


In [ ]:
# TODO: Filtering tiles and then cutting into samples could result
# in samples with less tissue than desired, despite that being the
# procedure of the paper.  Look into simply selecting tiles of the
# desired size to begin with.

In [ ]:
# Get list of image numbers, minus the broken ones.
broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}
slide_nums = sorted(set(range(1,501)) - broken)

# Settings
training = True
tile_size = 256
sample_size = 256
grayscale = False
num_partitions = 20000
add_row_indices = True
train_frac = 0.8
split_seed = 24
folder = "data"  # Linux-filesystem directory to read raw data
save_folder = "data"  # Hadoop-supported directory in which to save DataFrames
df_path = os.path.join(save_folder, "samples_{}_{}{}.parquet".format(
    "labels" if training else "testing", sample_size, "_grayscale" if grayscale else ""))
train_df_path = os.path.join(save_folder, "train_{}{}.parquet".format(sample_size,
    "_grayscale" if grayscale else ""))
val_df_path = os.path.join(save_folder, "val_{}{}.parquet".format(sample_size,
    "_grayscale" if grayscale else ""))

df_path, train_df_path, val_df_path

In [ ]:
# Process all slides.
df = preprocess(spark, slide_nums, tile_size=tile_size, sample_size=sample_size,
                grayscale=grayscale, training=training, num_partitions=num_partitions,
                folder=folder)

In [ ]:
# Save DataFrame of samples.
save(df, df_path, sample_size, grayscale)

In [ ]:
# Load full DataFrame from disk.
df = spark.read.load(df_path)

In [ ]:
# Split into train and validation DataFrames based On slide number
train, val = train_val_split(spark, df, slide_nums, folder, train_frac, add_row_indices,
                             seed=split_seed)

In [ ]:
# Save train and validation DataFrames.
save(train, train_df_path, sample_size, grayscale)
save(val, val_df_path, sample_size, grayscale)

Sample Data

TODO: Wrap this in a function with appropriate default arguments


In [ ]:
# Load train and validation DataFrames from disk.
train = spark.read.load(train_df_path)
val = spark.read.load(val_df_path)

In [ ]:
# Take a stratified sample.
p=0.01
train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)

train_sample, val_sample

In [ ]:
# Reassign row indices.
# TODO: Wrap this in a function with appropriate default arguments.
train_sample = (
  train_sample.rdd
              .zipWithIndex()
              .map(lambda r: (r[1] + 1, *r[0]))
              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
train_sample = train_sample.select(train_sample["__INDEX"].astype("int"),
                                   train_sample.slide_num.astype("int"),
                                   train_sample.tumor_score.astype("int"),
                                   train_sample.molecular_score,
                                   train_sample["sample"])

val_sample = (
  val_sample.rdd
            .zipWithIndex()
            .map(lambda r: (r[1] + 1, *r[0]))
            .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
val_sample = val_sample.select(val_sample["__INDEX"].astype("int"),
                               val_sample.slide_num.astype("int"),
                               val_sample.tumor_score.astype("int"),
                               val_sample.molecular_score,
                               val_sample["sample"])

train_sample, val_sample

In [ ]:
# Save train and validation DataFrames.
tr_sample_filename = "train_{}_sample_{}{}.parquet".format(p, sample_size,
    "_grayscale" if grayscale else "")
val_sample_filename = "val_{}_sample_{}{}.parquet".format(p, sample_size,
    "_grayscale" if grayscale else "")
train_sample_path = os.path.join(save_folder, tr_sample_filename)
val_sample_path = os.path.join(save_folder, val_sample_filename)
save(train_sample, train_sample_path, sample_size, grayscale)
save(val_sample, val_sample_path, sample_size, grayscale)