In [2]:
"""
    Create tests for various FACS file formats:
        1. Windows vs. Mac format testing
        2. Tests for various outputs from different flow cytometry machines (ACCURI, CANTO, FACSCAN)
"""
import os
import re
import csv
import pandas as pd
import numpy as np
import unittest

ACCURI_PATTERN = re.compile(r'(Well)_([A-Z]\d{2})')
CANTO_PATTERN = re.compile(r'[\w\d_-]+_[A-Z]\d+_(?P<well>[A-Z]\d{2})')
FACSCAN_PATTERN = re.compile(r'(?P<plate>\d{2})(?P<well>[A-Z]\d{2})')
ANTIGEN_PATTERN = re.compile(r'(Ag\d+)')
ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

def well_name_position(well_name, columns=12):
    '''Convert a well name (A01-K12) or (1-96) to its position (1-96)
    '''
    match = re.match( r"^(?P<row>[A-W])?(?P<column>\d{1,2})$", well_name.upper())
    if match:
        # If there is an alphanumeric row position
        if match.group("row"):
            row = ALPHABET.index(match.group("row"))
            column = int(match.group("column"))
            return (row * columns) + column
        # Otherwise the only position is an absolute one
        else:
            return int(match.group("column"))
    else:
        raise ValueError("Invalid well_name: %s" % str(well_name))


def match_facs_pattern_pos(line):
    """Identify line pattern (ACCURI, CANTO, FACSCAN).

        Args: 
            line: a string in the "Sample" columns
        Returns:
            Numerical representation of the well position (i.e. A01 -> 1; H12 -> 96)
    """
    match_accuri = ACCURI_PATTERN.search(line)
    match_canto = CANTO_PATTERN.search(line)
    match_facscan = FACSCAN_PATTERN.search(line)
    # convert well pos -> integer
    if match_accuri:
        return well_name_position(match_accuri.group(2))
    elif match_canto:
        return well_name_position(match_canto.group(1))
    elif match_facscan:
        return well_name_position(match_facscan.group(2))
    else:
        raise ValueError("There is no matching pattern for this file type.")

        
def facs_parse_file(data_file, cols=12):
    """ Parsing FLOWJO FACS files.
    Args:
        data_file: file to Parse
    Returns:
        plates: {'plate - position': [values,]}
    """
    # get the file name and remove all white spaces
    file_name = os.path.basename(data_file.name).split('.')[0].replace(' ', '')
    # handle different delimiters and file formats (csv, tsv) on Windows and Mac 
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(data_file.read().replace('\r', '\n'))
    data_file.seek(0) # go to the beginning of the file
    # data frame manipulations
    df = pd.read_csv(data_file, sep=dialect.delimiter)
    # keep all of the rows expect Mean and StdDev
    df = df.loc[~df['Sample'].isin(['Mean', 'StdDev'])]
    # replace a "*" with -1
    df = df.replace(['*'], [-1])
    # get antigens from columns
    antigens = df.columns[1:]
    # round all values to 4 significant figures
    current_indexes = [match_facs_pattern_pos(line) for line in df['Sample'] if line not in ('Mean', 'StdDev')]
    full_index = range(1, 97)
    # cannot reindex if 1-96 indexes are present
    if current_indexes != full_index:
        df.index += 1 # index from 1 (not 0)
        df.index = current_indexes # reset index as representated by Sample column
        df = df.reindex(full_index, fill_value=1) # reindex to add missing values
    plates = {"{} - {}".format(file_name, antigen): list(df[antigen]) for antigen in antigens}
    return plates

In [6]:
CWD = os.getcwd()
expected_output_all_accuri = {'accuri - Population 2': [6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 12.0], 
                              'accuri - Population 3': [7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 14.0], 
                              'accuri - Population 1': [4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 10.0]}

expected_output_all_canto = {'canto - Population 2': [6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 12.0], 
                             'canto - Population 3': [7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 14.0], 
                             'canto - Population 1': [4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 10.0]}

expected_output_all_facscan = {'facscan - Population 2': [6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 6.25, 12.0], 
                               'facscan - Population 3': [7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 7.25, 14.0], 
                               'facscan - Population 1': [4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 4.25, 10.0]}


class TestFacs(unittest.TestCase):
    """
        Testing parsing outputs from different flow cytomentry machines (ACCURI, CANTO, FACSCAN)
    """
    
    def test_accuri_96(self):
        file_name = os.path.join(CWD, 'DATA', 'facs_files_04_2016', 'machine_formats', 'accuri.txt')
        with open(file_name, 'r') as data_file:
            output = facs_parse_file(data_file)
            self.assertEqual(output, expected_output_all_accuri)
            
            
    def test_canto_96(self):
        file_name = os.path.join(CWD, 'DATA', 'facs_files_04_2016', 'machine_formats', 'canto.txt')
        with open(file_name, 'r') as data_file:
            output = facs_parse_file(data_file)
            self.assertEqual(output, expected_output_all_canto)
            
            
    def test_facscan_96(self):
        file_name = os.path.join(CWD, 'DATA', 'facs_files_04_2016', 'machine_formats', 'facscan.txt')
        with open(file_name, 'r') as data_file:
            output = facs_parse_file(data_file)
            self.assertEqual(output, expected_output_all_facscan)

suite = unittest.TestSuite()
suite.addTest(TestFacs("test_accuri_96"))
suite.addTest(TestFacs("test_canto_96"))
suite.addTest(TestFacs("test_facscan_96"))
runner = unittest.TextTestRunner()
runner.run(suite)


...
----------------------------------------------------------------------
Ran 3 tests in 0.048s

OK
Out[6]:
<unittest.runner.TextTestResult run=3 errors=0 failures=0>

In [ ]: