Predicting Breast Cancer Proliferation Scores with Apache Spark and Apache SystemML

Preprocessing


Setup


In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import math

import matplotlib.pyplot as plt
import numpy as np
import openslide
from openslide.deepzoom import DeepZoomGenerator
import pandas as pd
from pyspark.mllib.linalg import Vectors
from scipy.ndimage.morphology import binary_fill_holes
from skimage.color import rgb2gray
from skimage.feature import canny
from skimage.morphology import binary_closing, binary_dilation, disk

plt.rcParams['figure.figsize'] = (10, 6)

Open Whole-Slide Image


In [ ]:
def open_slide(slide_num, folder, training):
  """
  Open a whole-slide image, given an image number.
  
  Args:
    slide_num: Slide image number as an integer.
    folder: Directory in which the slides folder is stored, as a string.
      This should contain either a `training_image_data` folder with
      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
      folder with images in the format `TUPAC-TE-###.svs`.
    training: Boolean for training or testing datasets.
  
  Returns:
    An OpenSlide object representing a whole-slide image.
  """
  if training:
    filename = os.path.join(folder, "training_image_data", "TUPAC-TR-{}.svs".format(str(slide_num).zfill(3)))
  else:
    # Testing images
    filename = os.path.join(folder, "testing_image_data", "TUPAC-TE-{}.svs".format(str(slide_num).zfill(3)))
  slide = openslide.open_slide(filename)
  return slide

Create Tile Generator


In [ ]:
def create_tile_generator(slide, tile_size=1024, overlap=0):
  """
  Create a tile generator for the given slide.
  
  This generator is able to extract tiles from the overall
  whole-slide image.
  
  Args:
    slide: An OpenSlide object representing a whole-slide image.
    tile_size: The width and height of a square tile to be generated.
    overlap: Number of pixels by which to overlap the tiles.
  
  Returns:
    A DeepZoomGenerator object representing the tile generator. Each
    extracted tile is an Image with shape (tile_size, tile_size, channels).
    Note: This generator is not a true "Python generator function", but
    rather is an object that is capable of extracting individual tiles.
  """
  generator = DeepZoomGenerator(slide, tile_size=tile_size, overlap=overlap, limit_bounds=True)
  return generator

Determine 20x Magnification Zoom Level


In [ ]:
def get_20x_zoom_level(slide, generator):
  """
  Return the zoom level that corresponds to a 20x magnification.
  
  The generator can extract tiles from multiple zoom levels, downsampling
  by a factor of 2 per level from highest to lowest resolution.
  
  Args:
    slide: An OpenSlide object representing a whole-slide image.
    generator: A DeepZoomGenerator object representing a tile generator.
      Note: This generator is not a true "Python generator function", but
      rather is an object that is capable of extracting individual tiles.
  
  Returns:
    Zoom level corresponding to a 20x magnification, or as close as possible.
  """
  highest_zoom_level = generator.level_count - 1  # 0-based indexing
  try:
    mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])
    # `mag / 20` gives the downsampling factor between the slide's
    # magnification and the desired 20x magnification.
    # `(mag / 20) / 2` gives the zoom level offset from the highest
    # resolution level, based on a 2x downsampling factor in the
    # generator.
    offset = math.floor((mag / 20) / 2)
    level = highest_zoom_level - offset
  except ValueError:
    # In case the slide magnification level is unknown, just
    # use the highest resolution.
    level = highest_zoom_level
  return level

Generate Tile Indices For Whole-Slide Image.


In [ ]:
def process_slide(slide_num, folder, training, tile_size=1024, overlap=0):
  """
  Generate all possible tile indices for a whole-slide image.
  
  Given a slide number, tile size, and overlap, generate
  all possible (slide_num, tile_size, overlap, zoom_level, col, row)
  indices.
  
  Args:
    slide_num: Slide image number as an integer.
    folder: Directory in which the slides folder is stored, as a string.
      This should contain either a `training_image_data` folder with
      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
      folder with images in the format `TUPAC-TE-###.svs`.
    training: Boolean for training or testing datasets.
    tile_size: The width and height of a square tile to be generated.
    overlap: Number of pixels by which to overlap the tiles.
  
  Returns:
    A list of (slide_num, tile_size, overlap, zoom_level, col, row)
    integer index tuples representing possible tiles to extract.
  """
  # Open slide.
  slide = open_slide(slide_num, folder, training)
  # Create tile generator.
  generator = create_tile_generator(slide, tile_size, overlap)
  # Get 20x zoom level.
  zoom_level = get_20x_zoom_level(slide, generator)
  # Generate all possible (zoom_level, col, row) tile index tuples.
  cols, rows = generator.level_tiles[zoom_level]
  tile_indices = [(slide_num, tile_size, overlap, zoom_level, col, row)
                  for col in range(cols) for row in range(rows)]
  return tile_indices

Generate Tile From Tile Index


In [ ]:
def process_tile_index(tile_index, folder, training):
  """
  Generate a tile from a tile index.
  
  Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile
  index, generate a (slide_num, tile) tuple.
  
  Args:
    tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)
      integer index tuple representing a tile to extract.
    folder: Directory in which the slides folder is stored, as a string.
      This should contain either a `training_image_data` folder with
      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
      folder with images in the format `TUPAC-TE-###.svs`.
    training: Boolean for training or testing datasets.
  
  Returns:
    A (slide_num, tile) tuple, where slide_num is an integer, and tile
    is a 3D NumPy array of shape (tile_size, tile_size, channels) in
    RGB format.
  """
  slide_num, tile_size, overlap, zoom_level, col, row = tile_index
  # Open slide.
  slide = open_slide(slide_num, folder, training)
  # Create tile generator.
  generator = create_tile_generator(slide, tile_size, overlap)
  # Generate tile
  tile = np.array(generator.get_tile(zoom_level, (col, row)))
  return (slide_num, tile)

Filter Tile For Dimensions & Tissue Threshold


In [ ]:
def keep_tile(tile_tuple, tile_size=1024, tissue_threshold=0.9):
  """
  Determine if a tile should be kept.
  
  This filters out tiles based on size and a tissue percentage
  threshold, using a custom algorithm. If a tile has height &
  width equal to (tile_size, tile_size), and contains greater
  than or equal to the given percentage, then it will be kept;
  otherwise it will be filtered out.
  
  Args:
    tile_tuple: A (slide_num, tile) tuple, where slide_num is an
      integer, and tile is a 3D NumPy array of shape 
      (tile_size, tile_size, channels) in RGB format.
    tile_size: The width and height of a square tile to be generated.
    tissue_threshold: Tissue percentage threshold.
  
  Returns:
    A Boolean indicating whether or not a tile should be kept
    for future usage.
  """
  slide_num, tile = tile_tuple
  if tile.shape[0:2] == (tile_size, tile_size):
    # Convert 3D RGB image to 2D grayscale image, from
    # 0 (dense tissue) to 1 (plain background).
    tile = rgb2gray(tile)
    # 8-bit depth complement, from 1 (dense tissue)
    # to 0 (plain background).
    tile = 1 - tile
    # Canny edge detection with hysteresis thresholding.
    # This returns a binary map of edges, with 1 equal to
    # an edge. The idea is that tissue would be full of
    # edges, while background would not.
    tile = canny(tile)
    # Binary closing, which is a dilation followed by
    # an erosion. This removes small dark spots, which
    # helps remove noise in the background.
    tile = binary_closing(tile, disk(10))
    # Binary dilation, which enlarges bright areas,
    # and shrinks dark areas. This helps fill in holes
    # within regions of tissue.
    tile = binary_dilation(tile, disk(10))
    # Fill remaining holes within regions of tissue.
    tile = binary_fill_holes(tile)
    # Calculate percentage of tissue coverage.
    percentage = tile.mean()
    return percentage >= tissue_threshold
  else:
    return False

Generate Flattened Samples From Tile


In [ ]:
def process_tile(tile_tuple, sample_size=256, grayscale=False):
  """
  Process a tile into a group of smaller samples.
  
  Cut up a tile into smaller blocks of sample_size x sample_size pixels,
  change the shape of each sample from (H, W, channels) to 
  (channels, H, W), then flatten each into a vector of length
  channels*H*W.
  
  Args:
    tile_tuple: A (slide_num, tile) tuple, where slide_num is an
    integer, and tile is a 3D NumPy array of shape 
    (tile_size, tile_size, channels).
    sample_size: The new width and height of the square samples to be
      generated.
    grayscale: Whether or not to generate grayscale samples, rather
      than RGB.
  
  Returns:
    A list of (slide_num, sample) tuples representing cut up tiles,
    where each sample has been transposed from
    (sample_size_x, sample_size_y, channels) to (channels, sample_size_x, sample_size_y),
    and flattened to a vector of length (channels*sample_size_x*sample_size_y).
  """
  slide_num, tile = tile_tuple
  if grayscale:
    tile = rgb2gray(tile)[:, :, np.newaxis]  # Grayscale
    # Save disk space and future IO time by converting from [0,1] to [0,255],
    # at the expense of some minor loss of information.
    tile = np.round(tile * 255).astype("uint8")
  x, y, ch = tile.shape
  # 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where
  # num_x and num_y are the number of chopped tiles on the x and y axes, respectively.
  # 2. Swap sample_size_x and num_y axes to create (num_x, num_y, sample_size_x, sample_size_y, ch).
  # 3. Combine num_x and num_y into single axis, returning
  # (num_samples, sample_size_x, sample_size_y, ch).
  # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to
  # (num_samples, ch, sample_size_x, sample_size_y).
  # 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).
  samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))
                 .swapaxes(1,2)
                 .reshape((-1, sample_size, sample_size, ch))
                 .transpose(0,3,1,2))
  samples = samples.reshape(samples.shape[0], -1)
  samples = [(slide_num, sample) for sample in list(samples)]
  return samples

Visualize Tile


In [ ]:
def visualize_tile(tile):
  """
  Plot a tissue tile.
  
  Args:
    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
  
  Returns:
    None
  """
  plt.imshow(tile)
  plt.show()

Visualize Sample


In [ ]:
def visualize_sample(sample, size=256):
  """
  Plot a tissue sample.
  
  Args:
    sample: A square sample flattened to a vector of size
      (channels*size_x*size_y).
    size: The width and height of the square samples.
  
  Returns:
    None
  """
  # Change type, reshape, transpose to (size_x, size_y, channels).
  length = sample.shape[0]
  channels = int(length / (size * size))
  if channels > 1:
    sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)
    plt.imshow(sample)
  else:
    vmax = 255 if sample.max() > 1 else 1
    sample = sample.reshape((size, size))
    plt.imshow(sample, cmap="gray", vmin=0, vmax=vmax)
  plt.show()

Get Ground Truth Labels


In [ ]:
def create_ground_truth_maps(folder):
  """
  Create lookup maps for ground truth labels.
  
  Args:
    folder: Directory in which the slides folder is stored, as a string.
      This should contain a `training_ground_truth.csv` file.
  """
  filename = os.path.join(folder, "training_ground_truth.csv")
  labels = pd.read_csv(filename, names=["tumor_score","molecular_score"], header=None)
  labels["slide_num"] = range(1, 501)

  # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries
  tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}
  molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}
  return tumor_score_dict, molecular_score_dict

Process All Slides Into A Saved Spark DataFrame


In [ ]:
def preprocess(slide_nums, folder="data", training=True, tile_size=1024, overlap=0,
               tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):
  """
  Preprocess a set of whole-slide images.
  
  Preprocess a set of whole-slide images as follows:
    1. Tile the slides into tiles of size (tile_size, tile_size, 3).
    2. Filter the tiles to remove unnecessary tissue.
    3. Cut the remaining tiles into samples of size (sample_size, sample_size, ch),
       where `ch` is 1 if `grayscale` is true, or 3 otherwise.
  
  Args:
    slide_nums: List of whole-slide numbers to process.
    folder: Directory in which the slides folder is stored, as a string.
      This should contain either a `training_image_data` folder with
      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
      folder with images in the format `TUPAC-TE-###.svs`.
    training: Boolean for training or testing datasets.
    tile_size: The width and height of a square tile to be generated.
    overlap: Number of pixels by which to overlap the tiles.
    tissue_threshold: Tissue percentage threshold for filtering.
    sample_size: The new width and height of the square samples to be
      generated.
    grayscale: Whether or not to generate grayscale samples, rather
      than RGB.
    num_partitions: Number of partitions to use during processing.
  
  Returns:
    A DataFrame in which each row contains the slide number, tumor score,
    molecular score, and the sample stretched out into a Vector.
  """
  slides = sc.parallelize(slide_nums)
  # Force even partitioning by collecting and parallelizing -- for memory issues
  ## HACK Note: This was a PySpark bug with a fix in the master branch now.
  tile_indices = slides.flatMap(lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()
  tile_indices = sc.parallelize(tile_indices, num_partitions)
  ## END HACK -- update later
  tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
  if training:
    tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)
    samples_with_labels = (samples.map(lambda tup: 
                                       (tup[0], tumor_score_dict[tup[0]], molecular_score_dict[tup[0]],
                                        Vectors.dense(tup[1]))))
    df = samples_with_labels.toDF(["slide_num", "tumor_score", "molecular_score", "sample"])
    df = df.select(df.slide_num.astype("int"), df.tumor_score.astype("int"), df.molecular_score, df["sample"])
  else:  # testing data -- no labels
    df = samples.toDF(["slide_num", "sample"])
    df = df.select(df.slide_num.astype("int"), df["sample"])
  df = df.repartition(num_partitions)  # Even out the partitions
  return df

def save(df, training=True, sample_size=256, grayscale=False, mode="error"):
  """
  Save a preprocessed DataFrame of samples in Parquet format.
  
  The filename will be formatted as follows:
    `samples_{labels|testing}_SAMPLE-SIZE[_grayscale].parquet`
  
  Args:
    df: A DataFrame in which each row contains the slide number, tumor score,
      molecular score, and the sample stretched out into a Vector.
    training: Boolean for training or testing datasets.
    sample_size: The width and height of the square samples.
    grayscale: Whether or not to the samples are in grayscale format, rather
      than RGB.
    mode: Specifies the behavior of `df.write.mode` when the data already exists.
      Options include:
        * `append`: Append contents of this :class:`DataFrame` to existing data.
        * `overwrite`: Overwrite existing data.
        * `error`: Throw an exception if data already exists.
        * `ignore`: Silently ignore this operation if data already exists.
  """
  filename = "samples_{}_{}{}.parquet".format("labels" if training else "testing",
                                              sample_size,
                                              "_grayscale" if grayscale else "")
  filepath = os.path.join("data", filename)
  df.write.mode(mode).save(filepath, format="parquet")

Execute Preprocessing & Save


In [ ]:
# TODO: Filtering tiles and then cutting into samples could result
# in samples with less tissue than desired, despite that being the
# procedure of the paper.  Look into simply selecting tiles of the
# desired size to begin with.

In [ ]:
# Get list of image numbers, minus the broken ones
broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}
slide_nums = sorted(set(range(1,501)) - broken)

# Settings
training = True
sample_size=64
grayscale = True
num_partitions = 20000
folder = "/home/MDM/breast_cancer/data"

In [ ]:
# Process all slides
df = preprocess(slide_nums, sample_size=sample_size, grayscale=grayscale,
                training=training, num_partitions=num_partitions, folder=folder)
df

In [ ]:
# Save DataFrame of samples
save(df, sample_size=sample_size, grayscale=grayscale, training=training)

Split Into Separate Train & Validation DataFrames Based On Slide Number

TODO: Wrap this in a function with appropriate default arguments


In [ ]:
filename = "samples_{}_{}{}.parquet".format("labels" if training else "testing",
                                            sample_size,
                                            "_grayscale" if grayscale else "")
filepath = os.path.join("data", filename)
df = sqlContext.read.load(filepath)

In [ ]:
labels = pd.read_csv("data/training_ground_truth.csv", names=["tumor_score","molecular_score"], header=None)
labels["slide_num"] = range(1, 501)

# Create slide_num -> tumor_score and slide_num -> molecular_score dictionaries
tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}
molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}

In [ ]:
print(labels["tumor_score"].value_counts() / labels.tumor_score.count())
print(labels[labels.slide_num > 400]["tumor_score"].value_counts() / labels[labels.slide_num > 400].tumor_score.count())

In [ ]:
train = (df.where(df.slide_num <= 400)
           .rdd
           .zipWithIndex()
           .map(lambda r: (r[1] + 1, *r[0]))
           .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"), train.tumor_score.astype("int"),
                     train.molecular_score, train["sample"])

val = (df.where(df.slide_num > 400)
         .rdd
         .zipWithIndex()
         .map(lambda r: (r[1] + 1, *r[0]))
         .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
val = val.select(val["__INDEX"].astype("int"), val.slide_num.astype("int"), val.tumor_score.astype("int"),
                 val.molecular_score, val["sample"])

train, val

In [ ]:
# Write
# TODO: Wrap this in a function with appropriate default arguments
mode = "error"
tr_filename = os.path.join("data", "train_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
val_filename = os.path.join("data", "val_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
train.write.mode(mode).save(tr_filename, format="parquet")
val.write.mode(mode).save(val_filename, format="parquet")

Sample Data

TODO: Wrap this in a function with appropriate default arguments


In [ ]:
tr_filename = os.path.join("data", "train_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
val_filename = os.path.join("data", "val_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
train = sqlContext.read.load(tr_filename)
val = sqlContext.read.load(val_filename)

In [ ]:
# Take a stratified sample
p=0.01
train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
train_sample, val_sample

In [ ]:
# TODO: turn this into a function
# Repartition to get ~128MB partitions

# TODO: Update logic to use the following to automatically
# select the number of partitions:
# ex_mb = SIZE*SIZE*CHANNELS * 8 / 1024 / 1024  # size of one example in MB
# ideal_part_size_mb = 128  # 128 MB partitions sizes are empirically ideal
# ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)
# tr_parts = round(tc / ideal_exs_per_part)
# val_parts = round(vc / ideal_exs_per_part)

if grayscale:
  train_sample = train_sample.repartition(150)  #300) #3000)
  val_sample = val_sample.repartition(40)  #80) #800)
else:  # 3x
  train_sample = train_sample.repartition(450)  #900) #9000)
  val_sample = val_sample.repartition(120)  #240) #2400)

# Reassign row indices
train_sample = (
  train_sample.rdd
              .zipWithIndex()
              .map(lambda r: (r[1] + 1, *r[0]))
              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
train_sample = train_sample.select(train_sample["__INDEX"].astype("int"), train_sample.slide_num.astype("int"), 
                                   train_sample.tumor_score.astype("int"), train_sample.molecular_score, train_sample["sample"])

val_sample = (
  val_sample.rdd
            .zipWithIndex()
            .map(lambda r: (r[1] + 1, *r[0]))
            .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
val_sample = val_sample.select(val_sample["__INDEX"].astype("int"), val_sample.slide_num.astype("int"), 
                               val_sample.tumor_score.astype("int"), val_sample.molecular_score, val_sample["sample"])

train_sample, val_sample

# Write
# TODO: Wrap this in a function with appropriate default arguments
mode = "error"
tr_sample_filename = os.path.join("data", "train_{}_sample_{}{}.parquet".format(p, sample_size, "_grayscale" if grayscale else ""))
val_sample_filename = os.path.join("data", "val_{}_sample_{}{}.parquet".format(p, sample_size, "_grayscale" if grayscale else ""))
train_sample.write.mode(mode).save(tr_sample_filename, format="parquet")
val_sample.write.mode(mode).save(val_sample_filename, format="parquet")