In [83]:
import os
import xml.etree.cElementTree as ET
import pprint
import re
import codecs
import json
import collections
In [84]:
# Location of the OpenStreetMap XML extract analysed by this notebook.
# The path is relative, so it is resolved against the current working directory.
DATADIR = "data"
DATAFILE = "honolulu_hawaii.osm"
datafile = os.path.join(DATADIR, DATAFILE)
In [85]:
def count_tags(filename):
    """Tally how often each element tag occurs in an XML file.

    The file is streamed with ``ET.iterparse`` on "start" events, so each
    opening tag adds one to the count stored under its tag name.

    Args:
        filename: Source handed straight to ``ET.iterparse`` (here, the path
            of the OSM extract).

    Returns:
        A ``collections.defaultdict(int)`` mapping tag name -> occurrences.
    """
    tally = collections.defaultdict(int)
    for _event, elem in ET.iterparse(filename, events=("start",)):
        tally[elem.tag] += 1
    return tally
# Count every element tag in the extract and pretty-print the totals.
tags = count_tags(datafile)
pprint.pprint(tags)
In [86]:
# Patterns used by key_type to classify the 'k' attribute of <tag> elements.
# lower: the whole key is lowercase ASCII letters and/or underscores
# (the '*' quantifier means an empty key matches as well).
lower = re.compile(r'^([a-z]|_)*$')
# lower_colon: two such runs separated by exactly one colon, e.g. 'addr:street'.
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
# problemchars: the key contains at least one of the listed punctuation or
# whitespace characters anywhere in it.
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
def key_type(element, keys):
    """Classify the key of a <tag> element and bump the matching counter.

    Elements whose tag is not "tag" are ignored. For a <tag> element the
    'k' attribute is tested against the module-level patterns ``lower``,
    ``lower_colon`` and ``problemchars`` in that order; the first pattern
    that matches decides the category, and keys matching none of them are
    counted as 'other'.

    Args:
        element: An ElementTree element.
        keys: Dict with the counters 'lower', 'lower_colon', 'problemchars'
            and 'other'. It is updated in place.

    Returns:
        The same ``keys`` dict that was passed in.
    """
    if element.tag != "tag":
        return keys

    k_value = element.attrib['k']
    # Order matters: it mirrors the precedence of the classification.
    ordered_patterns = (
        ('lower', lower),
        ('lower_colon', lower_colon),
        ("problemchars", problemchars),
    )
    category = 'other'
    for label, pattern in ordered_patterns:
        if pattern.search(k_value) is not None:
            category = label
            break
    keys[category] += 1
    return keys
def process_map(filename):
    """Count the <tag> keys of an OSM file by category.

    Every element produced by ``ET.iterparse`` is passed through ``key_type``,
    which updates the running totals.

    Note: a later cell of this notebook defines another function with the
    same name (the JSON export); once that cell has run, it replaces this one.

    Args:
        filename: Source handed to ``ET.iterparse`` (here, the OSM file path).

    Returns:
        Dict with the counters 'lower', 'lower_colon', 'problemchars', 'other'.
    """
    totals = dict.fromkeys(("lower", "lower_colon", "problemchars", "other"), 0)
    for _event, elem in ET.iterparse(filename):
        totals = key_type(elem, totals)
    return totals
# Classify every tag key in the extract and show the per-category totals.
all_keys = process_map(datafile)
print(all_keys)
In [87]:
# Matches the trailing run of non-whitespace characters of a street name
# (starting at a word boundary, optionally ending in a period).
# audit_street_type treats that match as the street type.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
# Street types regarded as already well-formed: a name whose trailing token
# is in this list is not reported by audit_street_type.
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road",
"Trail", "Parkway", "Commons"]
# Lower-cased abbreviation -> replacement word. update_name looks up every
# space-separated part of a street name in this dict after stripping and
# lower-casing it.
mapping = { "pl": "Place",
"st": "Street",
"ave": "Avenue",
"rd": "Road",
"w": "West",
"n": "North",
"s": "South",
"e": "East",
"blvd":"Boulevard",
# NOTE(review): "sr" -> "Drive" looks like a slip ("dr" is mapped to "Drive"
# further down) -- confirm the intended expansion for "sr".
"sr": "Drive",
"ct": "Court",
"ne": "Northeast",
"se": "Southeast",
"nw": "Northwest",
"sw": "Southwest",
"dr": "Drive",
"sq": "Square",
"ln": "Lane",
"trl": "Trail",
"pkwy": "Parkway",
"ste": "Suite",
"lp": "Loop",
"hwy": "Highway"}
def audit_street_type(street_types, street_name):
    """Record a street name whose trailing token is not an expected type.

    The trailing token is extracted with the module-level ``street_type_re``.
    If it is not listed in ``expected``, the full street name is added to the
    set stored under that token.

    Args:
        street_types: Mapping of street type -> set of street names; must
            create the set on first access (e.g. ``defaultdict(set)``).
            Updated in place.
        street_name: The street name to inspect.
    """
    found = street_type_re.search(street_name)
    if found is None:
        return
    suffix = found.group()
    if suffix in expected:
        return
    street_types[suffix].add(street_name)
def is_street_name(elem):
    """Return True when the element's 'k' attribute is exactly "addr:street".

    Raises:
        KeyError: If the element has no 'k' attribute.
    """
    key = elem.attrib['k']
    return key == "addr:street"
def audit(osmfile):
    """Collect street names whose street type is not in ``expected``.

    The OSM file is streamed and, for every <node> and <way>, each child
    <tag> whose key is "addr:street" is passed to ``audit_street_type``.

    The elements are inspected on "end" events: ElementTree only guarantees
    that an element's children are present once its closing tag has been
    parsed, so reading child <tag> elements on "start" events can silently
    miss some of them.

    Args:
        osmfile: Path of the OSM XML file to audit.

    Returns:
        A ``collections.defaultdict(set)`` mapping each unexpected street
        type to the set of street names that end with it.
    """
    street_types = collections.defaultdict(set)
    # Binary mode lets the XML parser handle the encoding itself; the
    # ``with`` block makes sure the handle is closed when parsing finishes.
    with open(osmfile, "rb") as osm_file:
        for _event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag == "node" or elem.tag == "way":
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_street_type(street_types, tag.attrib['v'])
    return street_types
def update_name(name, mapping):
    """Expand abbreviations in a street name and normalise its capitalisation.

    The name is split on single spaces. Each part has leading/trailing
    commas, periods and backslashes stripped and is lower-cased; if the
    result is a key of ``mapping`` it is replaced by the mapped value.
    Every part is then capitalised and the parts are re-joined with spaces.

    ``str.capitalize`` is used rather than ``str.title`` so that ordinals
    such as "1st" stay "1st" instead of becoming "1St". Note that it also
    lower-cases everything after the first character of each part.

    Args:
        name: The street name to clean.
        mapping: Dict of lower-cased abbreviation -> replacement word.

    Returns:
        The cleaned street name as a single string.
    """
    cleaned = []
    # Replacements may occur anywhere in the name, so test every part.
    for part in name.split(" "):
        # Normalise the part so it can be checked against the lower-case
        # keys of the mapping. The raw string holds the same three
        # characters as before (comma, backslash, period) without relying
        # on an invalid escape sequence.
        token = part.strip(r",\.").lower()
        # Swap in the expansion when one exists; otherwise keep the token.
        token = mapping.get(token, token)
        cleaned.append(token.capitalize())
    return " ".join(cleaned)
# Run the street-type audit over the extract; the result maps each
# unexpected street type to the set of street names that end with it.
st_types = audit(datafile)
In [88]:
# Preview what update_name does to every street name flagged by the audit.
for st_type, ways in st_types.items():
    for name in ways:
        better_name = update_name(name, mapping)
        print("%s => %s" % (name, better_name))
In [105]:
# These three patterns repeat the definitions from the key-audit cell, so
# this cell can be run on its own. Only problemchars is used by
# shape_element below.
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
# Element attributes that are grouped under the 'created' key of a shaped document.
CREATED = [ "version", "changeset", "timestamp", "user", "uid"]
def is_address(elem):
    """Return True when the element's 'k' attribute starts with "addr:".

    Always returns a real bool (False, not None, for non-address keys).

    Raises:
        KeyError: If the element has no 'k' attribute.
    """
    return elem.attrib['k'].startswith("addr:")
def shape_element(element):
    """Convert an OSM <node> or <way> element into a JSON-ready dict.

    The resulting dict contains:
      * "type": the element tag ("node" or "way")
      * "id": the 'id' attribute
      * "visible": the 'visible' attribute, when present
      * "pos": [lat, lon] as floats, when a 'lat' attribute is present
      * "created": the attributes listed in ``CREATED`` that the element
        actually carries (absent ones are skipped rather than raising)
      * "address": dict of 'addr:<name>' tags keyed by <name>, when any exist
      * "node_refs": list of the 'ref' values of child <nd> elements, when
        any exist
      * one top-level entry per remaining child <tag> (key 'k', value 'v')

    Child <tag> elements are skipped when their key matches ``problemchars``
    or when it is an 'addr:' key with a further ':' after the prefix.

    Args:
        element: An ElementTree element, fully parsed (children present).

    Returns:
        The dict described above, or None when the element is neither a
        <node> nor a <way>.
    """
    if element.tag != "node" and element.tag != "way":
        return None

    attrib = element.attrib
    node = {"type": element.tag, "id": attrib["id"]}

    if "visible" in attrib:
        node["visible"] = attrib["visible"]
    if "lat" in attrib:
        node["pos"] = [float(attrib["lat"]), float(attrib["lon"])]

    # Group the bookkeeping attributes under 'created'.
    node["created"] = dict(
        (field, attrib[field]) for field in CREATED if field in attrib
    )

    prefix_len = len("addr:")
    address_info = {}
    for tag in element.iter("tag"):
        key = tag.attrib['k']
        if problemchars.search(key):
            # Keys with problematic characters are dropped.
            continue
        if is_address(tag):
            sub_key = key[prefix_len:]
            if ":" in sub_key:
                # e.g. 'addr:street:name' -- nested address keys are skipped.
                continue
            # 'addr:state' is stored as address_info['state'].
            address_info[sub_key] = tag.attrib['v']
        else:
            # Every other key (with or without a ':') becomes a top-level
            # entry.
            # NOTE(review): a tag whose key equals one of the fields set
            # above (e.g. 'type' or 'id') overwrites that field -- confirm
            # this is acceptable for downstream queries.
            node[key] = tag.attrib['v']

    if address_info:
        node['address'] = address_info

    nd_info = [nd.attrib['ref'] for nd in element.iter("nd")]
    if nd_info:
        node['node_refs'] = nd_info

    return node
def process_map(file_in, pretty = False):
    """Shape every <node>/<way> of an OSM file and write them out as JSON.

    Each element produced by ``ET.iterparse`` is passed to ``shape_element``;
    every non-empty result is appended to the returned list and written to
    "<file_in>.json" as one JSON document followed by a newline.

    Note: this definition replaces the key-audit ``process_map`` defined in
    an earlier cell of this notebook.

    Args:
        file_in: Path of the OSM XML file; also used to build the output name.
        pretty: When true, each document is written with ``indent=2``.

    Returns:
        List of all shaped documents, in the order they were written.
    """
    file_out = "{0}.json".format(file_in)
    # indent=None is json.dumps' default, i.e. the compact single-line form.
    indent = 2 if pretty else None
    data = []
    with codecs.open(file_out, "w") as fo:
        for _event, element in ET.iterparse(file_in):
            shaped = shape_element(element)
            if not shaped:
                continue
            data.append(shaped)
            fo.write(json.dumps(shaped, indent=indent) + "\n")
    return data
# Export the extract configured at the top of the notebook (DATADIR/DATAFILE)
# instead of repeating its path as a literal; writes "<datafile>.json" and
# keeps the shaped documents in memory.
data = process_map(datafile, False)
In [104]:
In [ ]: