In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import math
import matplotlib.pyplot as plt
import numpy as np
import openslide
from openslide.deepzoom import DeepZoomGenerator
import os
import pandas as pd
from pyspark.mllib.linalg import Vectors
from scipy.ndimage import binary_fill_holes
from skimage.color import rgb2gray
from skimage.feature import canny
from skimage.morphology import binary_closing, binary_dilation, disk
plt.rcParams['figure.figsize'] = (10, 6)
In [ ]:
def open_slide(slide_num, folder, training):
"""
Open a whole-slide image, given an image number.
Args:
slide_num: Slide image number as an integer.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
Returns:
An OpenSlide object representing a whole-slide image.
"""
if training:
filename = os.path.join(folder, "training_image_data", "TUPAC-TR-{}.svs".format(str(slide_num).zfill(3)))
else:
# Testing images
filename = os.path.join(folder, "testing_image_data", "TUPAC-TE-{}.svs".format(str(slide_num).zfill(3)))
slide = openslide.open_slide(filename)
return slide
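In [ ]:
# Quick sanity check for `open_slide`. A sketch only: it assumes the TUPAC
# training slides are already on disk under a local `data/` folder and that
# slide 1 exists.
slide = open_slide(1, "data", training=True)
print(slide.dimensions)  # (width, height) at the highest-resolution level
print(slide.properties.get(openslide.PROPERTY_NAME_OBJECTIVE_POWER))  # e.g. '40'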
In [ ]:
def create_tile_generator(slide, tile_size=1024, overlap=0):
"""
Create a tile generator for the given slide.
This generator is able to extract tiles from the overall
whole-slide image.
Args:
slide: An OpenSlide object representing a whole-slide image.
tile_size: The width and height of a square tile to be generated.
overlap: Number of pixels by which to overlap the tiles.
Returns:
A DeepZoomGenerator object representing the tile generator. Each
extracted tile is an Image with shape (tile_size, tile_size, channels).
Note: This generator is not a true "Python generator function", but
rather is an object that is capable of extracting individual tiles.
"""
generator = DeepZoomGenerator(slide, tile_size=tile_size, overlap=overlap, limit_bounds=True)
return generator
In [ ]:
def get_20x_zoom_level(slide, generator):
"""
Return the zoom level that corresponds to a 20x magnification.
The generator can extract tiles from multiple zoom levels, downsampling
by a factor of 2 per level from highest to lowest resolution.
Args:
slide: An OpenSlide object representing a whole-slide image.
generator: A DeepZoomGenerator object representing a tile generator.
Note: This generator is not a true "Python generator function", but
rather is an object that is capable of extracting individual tiles.
Returns:
Zoom level corresponding to a 20x magnification, or as close as possible.
"""
highest_zoom_level = generator.level_count - 1 # 0-based indexing
try:
mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])
# `mag / 20` gives the downsampling factor between the slide's
# native magnification and the desired 20x magnification. Each
# generator level downsamples by a factor of 2, so the offset from
# the highest-resolution level is log2(mag / 20);
# `floor((mag / 20) / 2)` agrees with this for the common 20x and
# 40x scans.
offset = math.floor((mag / 20) / 2)
level = highest_zoom_level - offset
except ValueError:
# In case the slide magnification level is unknown, just
# use the highest resolution.
level = highest_zoom_level
return level
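In [ ]:
# Worked example of the zoom-level arithmetic above; no slide needed.
# For a 40x scan, reaching 20x requires stepping down one level from the
# highest-resolution level: offset = floor((40 / 20) / 2) = 1. For a
# native 20x scan, offset = floor((20 / 20) / 2) = 0, i.e. the
# highest-resolution level is used directly.
for mag in [20, 40, 80]:
    offset = math.floor((mag / 20) / 2)
    print("native magnification: {}x -> level offset: {}".format(mag, offset))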
In [ ]:
def process_slide(slide_num, folder, training, tile_size=1024, overlap=0):
"""
Generate all possible tile indices for a whole-slide image.
Given a slide number, tile size, and overlap, generate
all possible (slide_num, tile_size, overlap, zoom_level, col, row)
indices.
Args:
slide_num: Slide image number as an integer.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
tile_size: The width and height of a square tile to be generated.
overlap: Number of pixels by which to overlap the tiles.
Returns:
A list of (slide_num, tile_size, overlap, zoom_level, col, row)
integer index tuples representing possible tiles to extract.
"""
# Open slide.
slide = open_slide(slide_num, folder, training)
# Create tile generator.
generator = create_tile_generator(slide, tile_size, overlap)
# Get 20x zoom level.
zoom_level = get_20x_zoom_level(slide, generator)
# Generate all possible (zoom_level, col, row) tile index tuples.
cols, rows = generator.level_tiles[zoom_level]
tile_indices = [(slide_num, tile_size, overlap, zoom_level, col, row)
for col in range(cols) for row in range(rows)]
return tile_indices
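In [ ]:
# Example: enumerate the candidate tile indices for a single slide.
# A sketch that assumes slide 1 is available under a local `data/` folder.
tile_indices = process_slide(1, "data", training=True)
print(len(tile_indices))  # number of candidate tiles
print(tile_indices[0])    # (slide_num, tile_size, overlap, zoom_level, col, row)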
In [ ]:
def process_tile_index(tile_index, folder, training):
"""
Generate a tile from a tile index.
Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile
index, generate a (slide_num, tile) tuple.
Args:
tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)
integer index tuple representing a tile to extract.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
Returns:
A (slide_num, tile) tuple, where slide_num is an integer, and tile
is a 3D NumPy array of shape (tile_size, tile_size, channels) in
RGB format.
"""
slide_num, tile_size, overlap, zoom_level, col, row = tile_index
# Open slide.
slide = open_slide(slide_num, folder, training)
# Create tile generator.
generator = create_tile_generator(slide, tile_size, overlap)
# Generate tile
tile = np.array(generator.get_tile(zoom_level, (col, row)))
return (slide_num, tile)
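In [ ]:
# Example: extract one tile using the `tile_indices` from the previous
# sketch. An interior tile (away from the slide border) should come back
# as a full (tile_size, tile_size, 3) array.
slide_num, tile = process_tile_index(tile_indices[len(tile_indices) // 2],
                                     "data", training=True)
print(tile.shape, tile.dtype)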
In [ ]:
def keep_tile(tile_tuple, tile_size=1024, tissue_threshold=0.9):
"""
Determine if a tile should be kept.
This filters out tiles based on size and a tissue percentage
threshold, using a custom algorithm. If a tile's height and width
equal (tile_size, tile_size) and its tissue coverage is greater
than or equal to the given threshold, the tile is kept; otherwise
it is filtered out.
Args:
tile_tuple: A (slide_num, tile) tuple, where slide_num is an
integer, and tile is a 3D NumPy array of shape
(tile_size, tile_size, channels) in RGB format.
tile_size: The width and height of a square tile to be generated.
tissue_threshold: Tissue percentage threshold.
Returns:
A Boolean indicating whether or not a tile should be kept
for future usage.
"""
slide_num, tile = tile_tuple
if tile.shape[0:2] == (tile_size, tile_size):
# Convert 3D RGB image to 2D grayscale image, from
# 0 (dense tissue) to 1 (plain background).
tile = rgb2gray(tile)
# Complement the grayscale image so that values run from
# 1 (dense tissue) to 0 (plain background).
tile = 1 - tile
# Canny edge detection with hysteresis thresholding.
# This returns a binary map of edges, with 1 equal to
# an edge. The idea is that tissue would be full of
# edges, while background would not.
tile = canny(tile)
# Binary closing, which is a dilation followed by
# an erosion. This removes small dark spots, which
# helps remove noise in the background.
tile = binary_closing(tile, disk(10))
# Binary dilation, which enlarges bright areas,
# and shrinks dark areas. This helps fill in holes
# within regions of tissue.
tile = binary_dilation(tile, disk(10))
# Fill remaining holes within regions of tissue.
tile = binary_fill_holes(tile)
# Calculate percentage of tissue coverage.
percentage = tile.mean()
return percentage >= tissue_threshold
else:
return False
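In [ ]:
# `keep_tile` can be exercised without any slide data: a uniform white
# tile has no edges, so its tissue percentage is ~0 and it is rejected;
# a wrongly sized tile is rejected outright.
blank = np.full((1024, 1024, 3), 255, dtype=np.uint8)  # plain background
print(keep_tile((1, blank), tile_size=1024, tissue_threshold=0.9))  # False: no tissue
print(keep_tile((1, blank[:512, :512]), tile_size=1024))            # False: wrong size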
In [ ]:
def process_tile(tile_tuple, sample_size=256, grayscale=False):
"""
Process a tile into a group of smaller samples.
Cut up a tile into smaller blocks of sample_size x sample_size pixels,
change the shape of each sample from (H, W, channels) to
(channels, H, W), then flatten each into a vector of length
channels*H*W.
Args:
tile_tuple: A (slide_num, tile) tuple, where slide_num is an
integer, and tile is a 3D NumPy array of shape
(tile_size, tile_size, channels).
sample_size: The new width and height of the square samples to be
generated.
grayscale: Whether or not to generate grayscale samples, rather
than RGB.
Returns:
A list of (slide_num, sample) tuples representing cut up tiles,
where each sample has been transposed from
(sample_size_x, sample_size_y, channels) to (channels, sample_size_x, sample_size_y),
and flattened to a vector of length (channels*sample_size_x*sample_size_y).
"""
slide_num, tile = tile_tuple
if grayscale:
tile = rgb2gray(tile)[:, :, np.newaxis] # Grayscale
# Save disk space and future IO time by converting from [0,1] to [0,255],
# at the expense of some minor loss of information.
tile = np.round(tile * 255).astype("uint8")
x, y, ch = tile.shape
# 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where
# num_x and num_y are the number of chopped tiles on the x and y axes, respectively.
# 2. Swap sample_size_x and num_y axes to create (num_x, num_y, sample_size_x, sample_size_y, ch).
# 3. Combine num_x and num_y into single axis, returning
# (num_samples, sample_size_x, sample_size_y, ch).
# 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to
# (num_samples, ch, sample_size_x, sample_size_y).
# 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).
samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))
.swapaxes(1,2)
.reshape((-1, sample_size, sample_size, ch))
.transpose(0,3,1,2))
samples = samples.reshape(samples.shape[0], -1)
samples = [(slide_num, sample) for sample in list(samples)]
return samples
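In [ ]:
# The reshape/transpose pipeline in `process_tile` is easiest to verify
# on a tiny synthetic tile: a 4x4 RGB tile cut into 2x2 samples should
# yield 4 samples, each flattened to 3*2*2 = 12 values.
toy = np.arange(4 * 4 * 3, dtype=np.uint8).reshape(4, 4, 3)
samples = process_tile((1, toy), sample_size=2, grayscale=False)
print(len(samples))         # 4 samples
print(samples[0][1].shape)  # (12,) == channels * 2 * 2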
In [ ]:
def visualize_tile(tile):
"""
Plot a tissue tile.
Args:
tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
Returns:
None
"""
plt.imshow(tile)
plt.show()
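In [ ]:
# Visualize the tile extracted in the earlier sketch (assumes it ran).
visualize_tile(tile)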
In [ ]:
def visualize_sample(sample, size=256):
"""
Plot a tissue sample.
Args:
sample: A square sample flattened to a vector of size
(channels*size_x*size_y).
size: The width and height of the square samples.
Returns:
None
"""
# Change type, reshape, transpose to (size_x, size_y, channels).
length = sample.shape[0]
channels = int(length / (size * size))
if channels > 1:
sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)
plt.imshow(sample)
else:
vmax = 255 if sample.max() > 1 else 1
sample = sample.reshape((size, size))
plt.imshow(sample, cmap="gray", vmin=0, vmax=vmax)
plt.show()
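In [ ]:
# Round trip: cut a synthetic tile into grayscale samples and plot one.
# A sketch using random data, so the image is just noise.
toy = np.random.randint(0, 256, (256, 256, 3), dtype=np.uint8)
toy_samples = process_tile((1, toy), sample_size=64, grayscale=True)
visualize_sample(toy_samples[0][1], size=64)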
In [ ]:
def create_ground_truth_maps(folder):
"""
Create lookup maps for ground truth labels.
Args:
folder: Directory in which the slides folder is stored, as a string.
This should contain a `training_ground_truth.csv` file.
"""
filename = os.path.join(folder, "training_ground_truth.csv")
labels = pd.read_csv(filename, names=["tumor_score","molecular_score"], header=None)
labels["slide_num"] = range(1, 501)
# Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries
tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}
molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}
return tumor_score_dict, molecular_score_dict
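In [ ]:
# Sketch: build the label lookup maps (assumes `training_ground_truth.csv`
# is present under a local `data/` folder).
tumor_score_dict, molecular_score_dict = create_ground_truth_maps("data")
print(tumor_score_dict[1], molecular_score_dict[1])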
In [ ]:
def preprocess(slide_nums, folder="data", training=True, tile_size=1024, overlap=0,
tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):
"""
Preprocess a set of whole-slide images.
Preprocess a set of whole-slide images as follows:
1. Tile the slides into tiles of size (tile_size, tile_size, 3).
2. Filter the tiles to remove unnecessary tissue.
3. Cut the remaining tiles into samples of size (sample_size, sample_size, ch),
where `ch` is 1 if `grayscale` is true, or 3 otherwise.
Args:
slide_nums: List of whole-slide numbers to process.
folder: Directory in which the slides folder is stored, as a string.
This should contain either a `training_image_data` folder with
images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
folder with images in the format `TUPAC-TE-###.svs`.
training: Boolean for training or testing datasets.
tile_size: The width and height of a square tile to be generated.
overlap: Number of pixels by which to overlap the tiles.
tissue_threshold: Tissue percentage threshold for filtering.
sample_size: The new width and height of the square samples to be
generated.
grayscale: Whether or not to generate grayscale samples, rather
than RGB.
num_partitions: Number of partitions to use during processing.
Returns:
A DataFrame in which each row contains the slide number, tumor score,
molecular score, and the sample stretched out into a Vector.
"""
slides = sc.parallelize(slide_nums)
# Force even partitioning by collecting and re-parallelizing, which
# works around memory issues caused by uneven partitions.
## HACK: this is a workaround for a PySpark partitioning bug that has
## since been fixed upstream; remove once the fix is released.
tile_indices = slides.flatMap(lambda slide: process_slide(slide, folder, training, tile_size, overlap)).collect()
tile_indices = sc.parallelize(tile_indices, num_partitions)
## END HACK -- update later
tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
if training:
tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)
samples_with_labels = (samples.map(lambda tup:
(tup[0], tumor_score_dict[tup[0]], molecular_score_dict[tup[0]],
Vectors.dense(tup[1]))))
df = samples_with_labels.toDF(["slide_num", "tumor_score", "molecular_score", "sample"])
df = df.select(df.slide_num.astype("int"), df.tumor_score.astype("int"), df.molecular_score, df["sample"])
else: # testing data -- no labels
df = samples.toDF(["slide_num", "sample"])
df = df.select(df.slide_num.astype("int"), df["sample"])
df = df.repartition(num_partitions) # Even out the partitions
return df
def save(df, training=True, sample_size=256, grayscale=False, mode="error"):
"""
Save a preprocessed DataFrame of samples in Parquet format.
The filename will be formatted as follows:
`samples_{labels|testing}_SAMPLE-SIZE[_grayscale].parquet`
Args:
df: A DataFrame in which each row contains the slide number, tumor score,
molecular score, and the sample stretched out into a Vector.
training: Boolean for training or testing datasets.
sample_size: The width and height of the square samples.
grayscale: Whether or not the samples are in grayscale format,
rather than RGB.
mode: Specifies the behavior of `df.write.mode` when the data already exists.
Options include:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
"""
filename = "samples_{}_{}{}.parquet".format("labels" if training else "testing",
sample_size,
"_grayscale" if grayscale else "")
filepath = os.path.join("data", filename)
df.write.mode(mode).save(filepath, format="parquet")
In [ ]:
# TODO: Filtering tiles and then cutting them into samples can yield
# samples with less tissue than desired, even though this matches the
# procedure of the paper. Look into selecting tiles of the desired
# sample size to begin with.
In [ ]:
# Get list of image numbers, minus the broken ones
broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}
slide_nums = sorted(set(range(1,501)) - broken)
# Settings
training = True
sample_size = 64
grayscale = True
num_partitions = 20000
folder = "/home/MDM/breast_cancer/data"
In [ ]:
# Process all slides
df = preprocess(slide_nums, sample_size=sample_size, grayscale=grayscale,
training=training, num_partitions=num_partitions, folder=folder)
df
In [ ]:
# Save DataFrame of samples
save(df, sample_size=sample_size, grayscale=grayscale, training=training)
In [ ]:
filename = "samples_{}_{}{}.parquet".format("labels" if training else "testing",
sample_size,
"_grayscale" if grayscale else "")
filepath = os.path.join("data", filename)
df = sqlContext.read.load(filepath)
In [ ]:
labels = pd.read_csv("data/training_ground_truth.csv", names=["tumor_score","molecular_score"], header=None)
labels["slide_num"] = range(1, 501)
# Create slide_num -> tumor_score and slide_num -> molecular_score dictionaries
tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}
molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}
In [ ]:
print(labels["tumor_score"].value_counts() / labels.tumor_score.count())
print(labels[labels.slide_num > 400]["tumor_score"].value_counts() / labels[labels.slide_num > 400].tumor_score.count())
In [ ]:
train = (df.where(df.slide_num <= 400)
.rdd
.zipWithIndex()
.map(lambda r: (r[1] + 1, *r[0]))
.toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"), train.tumor_score.astype("int"),
train.molecular_score, train["sample"])
val = (df.where(df.slide_num > 400)
.rdd
.zipWithIndex()
.map(lambda r: (r[1] + 1, *r[0]))
.toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
val = val.select(val["__INDEX"].astype("int"), val.slide_num.astype("int"), val.tumor_score.astype("int"),
val.molecular_score, val["sample"])
train, val
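In [ ]:
# Sanity check on the split (a sketch; this triggers Spark jobs): the
# train and validation slide sets should be disjoint, with slides 1-400
# in train and 401-500 in validation.
print(train.select("slide_num").distinct().count(),
      val.select("slide_num").distinct().count())
assert val.where(val.slide_num <= 400).count() == 0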
In [ ]:
# Write
# TODO: Wrap this in a function with appropriate default arguments
mode = "error"
tr_filename = os.path.join("data", "train_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
val_filename = os.path.join("data", "val_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
train.write.mode(mode).save(tr_filename, format="parquet")
val.write.mode(mode).save(val_filename, format="parquet")
In [ ]:
tr_filename = os.path.join("data", "train_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
val_filename = os.path.join("data", "val_{}{}.parquet".format(sample_size, "_grayscale" if grayscale else ""))
train = sqlContext.read.load(tr_filename)
val = sqlContext.read.load(val_filename)
In [ ]:
# Take a stratified sample
p = 0.01
train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
train_sample, val_sample
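In [ ]:
# Check that `sampleBy` roughly preserved the tumor score distribution.
# Note that `sampleBy` draws a Bernoulli sample per stratum, so the
# resulting counts are approximate, not exact fractions.
train_sample.groupBy("tumor_score").count().show()
val_sample.groupBy("tumor_score").count().show()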
In [ ]:
# TODO: turn this into a function
# Repartition to get ~128MB partitions
# TODO: Update logic to use the following to automatically
# select the number of partitions:
# ex_mb = SIZE*SIZE*CHANNELS * 8 / 1024 / 1024 # size of one example in MB
# ideal_part_size_mb = 128 # 128 MB partitions sizes are empirically ideal
# ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)
# tr_parts = round(tc / ideal_exs_per_part)
# val_parts = round(vc / ideal_exs_per_part)
if grayscale:
train_sample = train_sample.repartition(150)  # previously 300, 3000
val_sample = val_sample.repartition(40)  # previously 80, 800
else:  # 3 channels (RGB)
train_sample = train_sample.repartition(450)  # previously 900, 9000
val_sample = val_sample.repartition(120)  # previously 240, 2400
# Reassign row indices
train_sample = (
train_sample.rdd
.zipWithIndex()
.map(lambda r: (r[1] + 1, *r[0]))
.toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
train_sample = train_sample.select(train_sample["__INDEX"].astype("int"), train_sample.slide_num.astype("int"),
train_sample.tumor_score.astype("int"), train_sample.molecular_score, train_sample["sample"])
val_sample = (
val_sample.rdd
.zipWithIndex()
.map(lambda r: (r[1] + 1, *r[0]))
.toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
val_sample = val_sample.select(val_sample["__INDEX"].astype("int"), val_sample.slide_num.astype("int"),
val_sample.tumor_score.astype("int"), val_sample.molecular_score, val_sample["sample"])
train_sample, val_sample
# Write
# TODO: Wrap this in a function with appropriate default arguments
mode = "error"
tr_sample_filename = os.path.join("data", "train_{}_sample_{}{}.parquet".format(p, sample_size, "_grayscale" if grayscale else ""))
val_sample_filename = os.path.join("data", "val_{}_sample_{}{}.parquet".format(p, sample_size, "_grayscale" if grayscale else ""))
train_sample.write.mode(mode).save(tr_sample_filename, format="parquet")
val_sample.write.mode(mode).save(val_sample_filename, format="parquet")
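In [ ]:
# A sketch of the partition-count logic from the TODO above, under its
# stated assumptions (float64 examples, ~128 MB target partitions). The
# function name and arguments (`num_rows`, `size`, `channels`) are
# hypothetical, not part of the original code.
def num_partitions_for(num_rows, size, channels, ideal_part_size_mb=128):
    """Estimate a partition count yielding roughly ideal_part_size_mb partitions."""
    ex_mb = size * size * channels * 8 / 1024 / 1024  # one float64 example, in MB
    ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)
    return max(1, round(num_rows / ideal_exs_per_part))

# Example: 64x64 grayscale samples, hypothetical row count.
print(num_partitions_for(num_rows=500000, size=64, channels=1))  # ~122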