In [1]:
import re
import csv
In [2]:
# A data line begins with exactly six digits followed by a single space.
dataline_pat = re.compile(r"^\d{6} .*$")


def txt_to_datalines(txt):
    """Return the lines of ``txt`` that match ``dataline_pat``.

    The text is split on "\\n" only; every line that starts with six digits
    and a space is kept, in its original order, and all other lines are
    dropped.

    Args:
        txt: The full catalog text as a single string.

    Returns:
        A list of the matching lines (without the "\\n" separators).
    """
    return [row for row in txt.split("\n") if dataline_pat.match(row)]
In [3]:
def get_country(line):
    """Return the two-character code found at columns 16-17 of ``line``.

    Slicing is used, so a line shorter than 18 characters yields a shorter
    (possibly empty) string instead of raising.
    """
    country_columns = slice(16, 18)
    return line[country_columns]
def has_valid_country(line):
    """Tell whether ``line`` carries one of the recognised country codes.

    Returns:
        True when the code returned by ``get_country`` is one of the
        recognised two-letter codes and columns 16-23 do not spell
        "FRIJOLES"; False otherwise.
    """
    recognised_codes = ("US", "GB", "CP", "FR", "IN", "PC", "IS", "PK", "NK")
    code = get_country(line)
    if code not in recognised_codes:
        return False
    # "FRIJOLES" begins with "FR", so it passes the code check above and has
    # to be excluded explicitly.
    return line[16:24] != "FRIJOLES"
In [4]:
def txt_to_data(txt):
    """Parse the catalog text into one dictionary per accepted data line.

    Lines are selected with ``txt_to_datalines`` and then filtered with
    ``has_valid_country``. Each surviving line is read as a fixed-width
    record: every field is taken from a fixed column range and stripped of
    surrounding whitespace, and latitude/longitude are also upper-cased.

    Args:
        txt: The full catalog text as a single string.

    Returns:
        A list of dicts, in source order, each with the keys "date", "time",
        "testing_party", "site", "subsite", "type", "body_wave_magnitude",
        "surface_wave_magnitude", "explosive_yield", "latitude", "longitude",
        "purpose", "device_type", "rock_type", "device_position", "name" and
        "references". All values are strings.
    """
    records = []
    for raw in txt_to_datalines(txt):
        if not has_valid_country(raw):
            continue
        # Pad to at least 80 columns so the single-character lookups
        # (columns 21 and 67) cannot raise IndexError on short lines.
        row = raw.ljust(80)
        records.append({
            "date": row[:6].strip(),
            "time": row[7:15].strip(),
            "testing_party": row[16:18].strip(),
            "site": row[18:21].strip(),
            "subsite": row[21].strip(),
            "type": row[23:27].strip(),
            "body_wave_magnitude": row[28:31].strip(),
            "surface_wave_magnitude": row[32:35].strip(),
            "explosive_yield": row[36:41].strip(),
            "latitude": row[42:49].strip().upper(),
            "longitude": row[50:58].strip().upper(),
            "purpose": row[59:61].strip(),
            "device_type": row[61:63].strip(),
            "rock_type": row[63:67].strip(),
            "device_position": row[67].strip(),
            "name": row[68:76].strip(),
            "references": row[76:80].strip(),
        })
    return records
In [5]:
def data_to_csv(data, outfile):
    """Write ``data`` to ``outfile`` as CSV with a header row.

    Args:
        data: An iterable of dicts keyed by the column names listed below.
            Keys missing from a dict are written as empty cells (the
            ``csv.DictWriter`` default); unknown keys raise ``ValueError``.
        outfile: A writable text file object that receives the CSV.

    Returns:
        None.
    """
    # Column order of the output file.
    fieldnames = [
        "date",
        "time",
        "testing_party",
        "site",
        "subsite",
        "type",
        "body_wave_magnitude",
        "surface_wave_magnitude",
        "explosive_yield",
        "latitude",
        "longitude",
        "purpose",
        "device_type",
        "rock_type",
        "device_position",
        "name",
        "references",
    ]
    csv_out = csv.DictWriter(outfile, fieldnames=fieldnames)
    csv_out.writeheader()
    for record in data:
        csv_out.writerow(record)
In [6]:
# Read the whole source catalog into memory. The context manager closes the
# file handle as soon as the text has been read instead of leaving it open
# for the lifetime of the kernel.
with open("../documents/oklahoma-catalog-original.txt") as catalog_file:
    txt = catalog_file.read()

# Parse the raw text into a list of per-line field dictionaries.
data = txt_to_data(txt)
In [7]:
# Export the parsed records as CSV. newline="" is required by the csv module:
# the writer emits its own "\r\n" row terminator, and without it platforms
# that translate "\n" on write would produce "\r\r\n" line endings.
with open("../data/oklahoma-catalog-explosions.csv", "w", newline="") as f:
    data_to_csv(data, f)