Here we will go through the process of writing a parser for data in an arbitrary format. In this case, the format is the one provided in the file new_parser_data.xlsx,
which consists of one sheet with the data grouped by time point and another sheet with the identifiers.
Generally, it makes sense to start with one of the existing parsers as a guide. In this case, the spectromax_OD
parser would be most relevant. The sample data is available in tests/test_data/sample_parser_docs_data.xlsx
In [1]:
from impact.parsers import Parser, parse_raw_identifier, parse_time_point_list
from impact import TimePoint
def new_parser(experiment, data, id_type='traverse'):
    """Parse OD600 plate data grouped by time point and add it to ``experiment``.

    ``data`` is indexed by sheet name and must provide two sheets, each a
    sequence of rows:

    * ``data['identifiers']`` -- a grid of raw identifiers, one cell per well.
      Cells equal to ``''``, ``0``, ``'0'`` or ``None`` mark unused wells.
    * ``data['data']`` -- consecutive plate blocks, one per time point. Each
      block is 8 rows tall and is followed by 1 spacer row. The time is read
      from the first cell of the block and the readings from columns 2-13 of
      the block's rows. Parsing stops at the first block whose first cell is
      ``'~End'``.

    Every reading that has both an identifier and a value becomes a
    ``TimePoint``; the time points are grouped with ``parse_time_point_list``
    and each resulting replicate trial is added to ``experiment``.

    :param experiment: object receiving the parsed replicate trials through
        ``add_replicate_trial``.
    :param data: the sheets described above, indexed by sheet name.
    :param id_type: identifier format forwarded to ``parse_raw_identifier``.
    :return: ``None``; the results are stored on ``experiment``.
    """
    # Define the layout of our data (all positions are zero-based)
    first_row_index = 0       # row of the first plate block in the data sheet
    plate_rows = 8            # rows per plate block
    plate_cols = 12           # columns per plate block
    spacing = 1               # spacer rows between two consecutive blocks
    time_row_col = (0, 0)     # (row, col) of the time cell, relative to the block start
    data_row_col = (0, 2)     # (row, col) of the first reading, relative to the block start

    # Define the type of data this parser accepts
    analyte_name = 'OD600'
    analyte_type = 'biomass'

    # In this case, we can first prepare the data by extracting the relevant information from each sheet
    unparsed_identifiers = data['identifiers']
    raw_data = data['data']

    # We first parse the identifiers, as these can be recycled (the only thing that is changing is the time).
    # Here we can implement logic to exclude any data which is not present, for example when a plate is not full.
    # In this case, any cell which is empty, 0, or None is stored as None and excluded later on.
    empty_identifiers = ('', 0, '0', None)
    identifiers = [
        [
            parse_raw_identifier(raw_id, id_type) if raw_id not in empty_identifiers else None
            for raw_id in id_row
        ]
        for id_row in unparsed_identifiers
    ]

    timepoint_list = []
    block_stride = plate_rows + spacing
    for start_row_index in range(first_row_index, len(raw_data), block_stride):
        # '~End' in the first cell of a block marks the end of the data
        if raw_data[start_row_index][0] == '~End':
            break

        time_value = int(raw_data[start_row_index + time_row_col[0]][time_row_col[1]])

        # Define the data for a single plate, single timepoint
        first_data_row = start_row_index + data_row_col[0]
        first_data_col = data_row_col[1]
        plate_data = [
            plate_row[first_data_col:first_data_col + plate_cols]
            for plate_row in raw_data[first_data_row:first_data_row + plate_rows]
        ]

        # Load the data point by point
        for i, plate_row in enumerate(plate_data):
            # The identifier grid may be smaller than the plate when the plate is not full;
            # wells outside of the grid are treated as wells without an identifier.
            id_row = identifiers[i] if i < len(identifiers) else []
            for j, value in enumerate(plate_row):
                ti = id_row[j] if j < len(id_row) else None
                # Skip wells where no identifier is listed or no data present
                if ti is None or value in (None, ''):
                    continue
                # The same identifier object is reused for every time point of a well
                ti.analyte_type, ti.analyte_name = analyte_type, analyte_name
                timepoint_list.append(TimePoint(ti, time_value, float(value)))

    # Finally we parse all of the time points (into their logical structure based on identifiers)
    # And add them to the experiment
    replicate_trial_list = parse_time_point_list(timepoint_list)
    for rep in replicate_trial_list:
        experiment.add_replicate_trial(rep)
With the new parser defined, we can register it with the Parser class and directly parse our data. Parser.parse_raw_data will return an Experiment
instance containing all the data.
In [2]:
# Register the new parser under the format name 'my_new_format',
# then parse the sample workbook with that format
Parser.register_parser('my_new_format', new_parser)
expt = Parser.parse_raw_data(
    'my_new_format',
    file_name='../tests/test_data/sample_parser_docs_data.xlsx',
)
In [3]:
# Show the result of parsing the sample workbook
print(expt)