In [ ]:
# IPython/notebook setup: auto-reload edited modules and render plots inline.
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import shutil
import matplotlib.pyplot as plt
import numpy as np
from breastcancer.preprocessing import preprocess, save, train_val_split
# Ship a fresh copy of the `breastcancer` package to the Spark workers.
# Note: The zip must include the `breastcancer` directory itself,
# as well as all files within it for `addPyFile` to work correctly.
# This is equivalent to `zip -r breastcancer.zip breastcancer`.
dirname = "breastcancer"
zipname = dirname + ".zip"
# Archive from the package's parent directory so the zip's root entry is the
# `breastcancer/` directory itself, not just its contents.
shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
# NOTE(review): `spark` is assumed to be the SparkSession/SparkContext holder
# injected by the notebook environment — it is never defined in this file.
spark.sparkContext.addPyFile(zipname)
# Default figure size for all matplotlib plots in this notebook.
plt.rcParams['figure.figsize'] = (10, 6)
In [ ]:
# TODO: Filtering tiles and then cutting into samples could result
# in samples with less tissue than desired, despite that being the
# procedure of the paper. Look into simply selecting tiles of the
# desired size to begin with.
In [ ]:
# Build the list of usable slide numbers: 1-500, minus the known-broken slides.
broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}
slide_nums = [num for num in range(1, 501) if num not in broken]

# Settings
training = True
tile_size = 256
sample_size = 256
grayscale = False
num_partitions = 20000
add_row_indices = True
train_frac = 0.8
split_seed = 24
folder = "data"  # Linux-filesystem directory to read raw data
save_folder = "data"  # Hadoop-supported directory in which to save DataFrames

# Output paths for the full samples DataFrame and the train/val splits.
# File names encode the sample size and whether grayscale conversion was used.
label = "labels" if training else "testing"
suffix = "_grayscale" if grayscale else ""
df_path = os.path.join(
    save_folder, "samples_{}_{}{}.parquet".format(label, sample_size, suffix))
train_df_path = os.path.join(
    save_folder, "train_{}{}.parquet".format(sample_size, suffix))
val_df_path = os.path.join(
    save_folder, "val_{}{}.parquet".format(sample_size, suffix))
df_path, train_df_path, val_df_path
In [ ]:
# Process all slides into one DataFrame of samples via the project's
# `preprocess` helper, parallelized over `num_partitions` Spark partitions.
# NOTE(review): per the TODO above, this appears to filter tiles and then cut
# them into `sample_size` samples — confirm in breastcancer.preprocessing.
df = preprocess(spark, slide_nums, tile_size=tile_size, sample_size=sample_size,
grayscale=grayscale, training=training, num_partitions=num_partitions,
folder=folder)
In [ ]:
# Save DataFrame of samples (delegates format/layout to the project's `save`).
save(df, df_path, sample_size, grayscale)
In [ ]:
# Load full DataFrame from disk.  Reloading presumably replaces the expensive
# preprocessing lineage with a plain file scan for downstream cells — the
# ".parquet" path suggests `spark.read.load`'s default Parquet format; confirm
# that `save` writes Parquet.
df = spark.read.load(df_path)
In [ ]:
# Split into train and validation DataFrames based on slide number
# (presumably so that no slide contributes samples to both splits — confirm
# in `train_val_split`).  `train_frac` of slides go to train; split is
# deterministic given `split_seed`.
train, val = train_val_split(spark, df, slide_nums, folder, train_frac, add_row_indices,
seed=split_seed)
In [ ]:
# Save train and validation DataFrames.
save(train, train_df_path, sample_size, grayscale)
save(val, val_df_path, sample_size, grayscale)
In [ ]:
# Load train and validation DataFrames from disk (cuts the split lineage).
train = spark.read.load(train_df_path)
val = spark.read.load(val_df_path)
In [ ]:
# Take a stratified sample: keep the same fraction `p` of rows from each
# tumor_score class (1, 2, 3), dropping the "__INDEX" column first.
p = 0.01
fractions = {score: p for score in (1, 2, 3)}
train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions=fractions, seed=42)
val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions=fractions, seed=42)
train_sample, val_sample
In [ ]:
# Reassign row indices so each sampled split is numbered contiguously from 1.
# (Resolves the TODO: the train and val paths were copy-pasted duplicates;
# they are now a single shared function.)
def _add_row_indices(df):
    """Return `df` with a fresh, contiguous 1-based "__INDEX" column prepended.

    Args:
      df: DataFrame with columns (slide_num, tumor_score, molecular_score,
        sample) in that order and no "__INDEX" column.

    Returns:
      DataFrame with columns (__INDEX, slide_num, tumor_score,
      molecular_score, sample), where __INDEX, slide_num, and tumor_score
      are cast to int.
    """
    # zipWithIndex assigns 0-based indices; shift to 1-based and prepend to
    # each row tuple before converting back to a DataFrame.
    indexed = (
        df.rdd
        .zipWithIndex()
        .map(lambda r: (r[1] + 1, *r[0]))
        .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
    return indexed.select(indexed["__INDEX"].astype("int"),
                          indexed.slide_num.astype("int"),
                          indexed.tumor_score.astype("int"),
                          indexed.molecular_score,
                          indexed["sample"])

train_sample = _add_row_indices(train_sample)
val_sample = _add_row_indices(val_sample)
train_sample, val_sample
In [ ]:
# Save the sampled train and validation DataFrames.  File names encode the
# sampling fraction `p`, the sample size, and the grayscale setting.
suffix = "_grayscale" if grayscale else ""
tr_sample_filename = "train_{}_sample_{}{}.parquet".format(p, sample_size, suffix)
val_sample_filename = "val_{}_sample_{}{}.parquet".format(p, sample_size, suffix)
train_sample_path = os.path.join(save_folder, tr_sample_filename)
val_sample_path = os.path.join(save_folder, val_sample_filename)
save(train_sample, train_sample_path, sample_size, grayscale)
save(val_sample, val_sample_path, sample_size, grayscale)