polipoly/polipoly.py

277 lines
9.7 KiB
Python
Raw Normal View History

"""Python library for working with political boundaries
Political boundaries are defined by one or more polygons and obtained
from census.gov shapefiles. Census boundary shapefiles are available at
http://www.census.gov/geo/www/cob/bdy_files.html.
At the moment this library has only been used with State and Congressional
District boundaries.
"""
__author__ = "James Turk (james.p.turk@gmail.com)"
__version__ = "0.1.0"
__copyright__ = "Copyright (c) 2007 Sunlight Labs"
__license__ = "BSD"
import urllib
from shapelib import ShapeFile
from dbflib import DBFFile
### General Utilities ###
# internally used helper function
def left_of_edge(point, ep1, ep2):
"""Determine if point is left of infinite line touching ep1 and ep2"""
return ((ep1[0]-point[0])*(ep2[1]-point[1]) -
(ep2[0]-point[0])*(ep1[1]-point[1])) < 0
class Polygon(object):
''' Simple polygon class used for point containment testing and conversion
Allows for testing if a polygon contains a point as well as conversion
to various portable representations '''
def __init__(self, vertices):
self.vertices = vertices
def contains(self, point):
''' Determine if point lies within the polygon. '''
# initially winds is 0
winds = 0
# iterate over edges
for i in xrange(len(self.vertices)-1):
# add wind if edge crosses point going up and point is to left
if (self.vertices[i][1] < point[1] < self.vertices[i+1][1] and
left_of_edge(point, self.vertices[i], self.vertices[i+1])):
winds += 1
# end wind if edge crosses point going down and point is to right
elif (self.vertices[i][1] > point[1] > self.vertices[i+1][1] and
not left_of_edge(point, self.vertices[i], self.vertices[i+1])):
winds -= 1
# point is contained if net winds is not zero
return winds != 0
def to_kml(self):
''' get KML polygon representation '''
coordstr = ' '.join("%.15f,%.15f" % v for v in self.vertices)
return '''<Polygon><outerBoundaryIs><LinearRing>
<coordinates>%s</coordinates>
</LinearRing></outerBoundaryIs></Polygon>''' % coordstr
### Exceptions ###
class GeocodingError(Exception):
"""Custom exception which maps possible google geocoder error codes to
human readable strings.
See http://www.google.com/apis/maps/documentation/reference.html#GGeoStatusCode
"""
STATUS_CODES = {500: 'Unknown Geocoding Server Error',
601: 'Empty Address',
602: 'Unknown Address',
603: 'Prohibited Address',
610: 'Bad API Key',
620: 'Too Many Requests'}
def __init__(self, code):
Exception.__init__(self)
self.code = int(code)
def __str__(self):
return 'GeocodingError: %d - %s' % (self.code,
self.STATUS_CODES[self.code])
class ShapefileError(Exception):
""" Exception for problems with census shapefiles."""
def __init__(self, message):
Exception.__init__(self, message)
def __str__(self):
return 'ShapefileError: %s' % (self.message)
### Census Shapefiles ###
# Mapping of FIPS codes to Postal Abbreviations
# obtained from http://www.itl.nist.gov/fipspubs/fip5-2.htm
FIPS_TO_STATE = {
'01':'AL', '02':'AK', '04':'AZ', '05':'AR', '06':'CA', '08':'CO', '09':'CT',
'10':'DE', '11':'DC', '12':'FL', '13':'GA', '15':'HI', '16':'ID', '17':'IL',
'18':'IN', '19':'IA', '20':'KS', '21':'KY', '22':'LA', '23':'ME', '24':'MD',
'25':'MA', '26':'MI', '27':'MN', '28':'MS', '29':'MO', '30':'MT', '31':'NE',
'32':'NV', '33':'NH', '34':'NJ', '35':'NM', '36':'NY', '37':'NC', '38':'ND',
'39':'OH', '40':'OK', '41':'OR', '42':'PA', '44':'RI', '45':'SC', '46':'SD',
'47':'TN', '48':'TX', '49':'UT', '50':'VT', '51':'VA', '53':'WA', '54':'WV',
'55':'WI', '56':'WY', '72':'PR' }
class Entity(object):
""" A named list of polygons associated with a political boundary.
eg. a state, congressional district, or school district"""
def __init__(self, name, entity, vertices, extents):
self.name = name
self.entity = entity
self.polygons = [Polygon(vlist) for vlist in vertices]
self.extents = extents
@staticmethod
def from_shapefile(obj, rec):
""" Factory function that creates a Entity (or derived class) from
a census.gov shapefile. """
# by using the LSAD determine if a subclass is defined for this entity
lsad_mapping = {
('01'): State,
('C1', 'C2', 'C3', 'C4'): CongressDistrict
}
for lsads, cls in lsad_mapping.iteritems():
if rec['LSAD'] in lsads:
return cls.from_shapefile(obj, rec)
# if there is no mapping for the LSAD just construct a Entity
return Entity('', rec['LSAD_TRANS'], obj.vertices(), obj.extents())
def in_rect(self, point):
""" Check if a point lies within the bounding extents of the entity """
return self.extents[0][0] < point[0] < self.extents[1][0] and \
self.extents[0][1] < point[1] < self.extents[1][1]
def contains(self, point):
""" Check if a point lies within any of the entities polygons """
if self.in_rect(point):
for poly in self.polygons:
if poly.contains(point):
return True
return False
def to_kml(self):
""" Return a KML Placemark representing the entity """
return """<Placemark><name>%s</name>
<MultiGeometry>%s</MultiGeometry>
</Placemark>""" % (self.name, ''.join(poly.to_kml() for poly in self.polygons))
class CongressDistrict(Entity):
""" Entity with state and district properties. """
def __init__(self, entity, vertices, extents, state, district):
Entity.__init__(self, '%s-%s' % (state, district),
entity, vertices, extents)
self.state = state
self.district = district
@staticmethod
def from_shapefile(obj, rec):
""" Construct a CongressDistrict from a census.gov shapefile """
return CongressDistrict(rec['LSAD_TRANS'], obj.vertices(),
obj.extents(), FIPS_TO_STATE[rec['STATE']],
rec['CD'])
class State(Entity):
""" Entity for states, adds a state property """
def __init__(self, vertices, extents, state):
Entity.__init__(self, 'State', state, vertices, extents)
self.state = state
@staticmethod
def from_shapefile(obj, rec):
""" Construct a State from a census.gov shapefile """
return State(obj.vertices(), obj.extents(), rec['NAME'])
def read_census_shapefile(filename):
"""Read census shapefile and return list of entity-derived objects.
Given the base name of a census .shp/.dbf file returns a list of all
Entity-derived objects described by the the file.
"""
try:
shp = ShapeFile(filename)
except IOError:
raise ShapefileError('Could not open %s.shp' % filename)
try:
dbf = DBFFile(filename)
except IOError:
raise ShapefileError('Could not open %s.dbf' % filename)
shape_count = shp.info()[0]
# shape_count should always equal dbf.record_count()
if shape_count != dbf.record_count():
raise ShapefileError('SHP/DBF record count mismatch (SHP=%d, DBF=%d)' %
(shape_count, dbf.record_count()))
# generator version
#for i in xrange(shp.info()[0]):
# yield Entity.fromShapefile(shp.read_object(i), dbf.read_record(i))
# shp.info()[0] is the number of objects
return [Entity.from_shapefile(shp.read_object(i), dbf.read_record(i))
for i in xrange(shape_count)]
### Geocoding ###
class AddressToDistrictService(object):
"""Reusable service which maps addresses to districts using the census
data and the google geocoder.
Usage:
service = AddressToDistrictService('google-maps-apikey','path-to-cd99')
lat,lng,district = service.address_to_district('address')
"""
def __init__(self, apikey, census_file):
"""AddressToDistrictService constructor
Initialize given a google maps API key and a path to a census.gov
all congressional districts (cd99) dataset.
Google maps API keys are available from:
http://www.google.com/apis/maps/signup.html
The cd99_110 dataset is available from:
http://www.census.gov/geo/www/cob/cd110.html
"""
self.apikey = apikey
self.boundaries = read_census_shapefile(census_file)
def geocode(self, address):
"""Convert an address into a latitude/longitude via google maps"""
url = 'http://maps.google.com/maps/geo?output=csv&q=%s&key=%s' % \
(urllib.quote(address), self.apikey)
# returns status,level-of-detail,lat,long
status, lat, lat, lng = urllib.urlopen(url).read().split(',')
# 200 - OK
if status == '200':
return lat, lng
else:
raise GeocodingError(status)
def address_to_district(self, address):
"""Given an address returns the congressional district it lies within.
This function works by geocoding the address and then finding the point
that the returned lat/long returned lie within.
"""
lat, lng = self.geocode(address)
flat, flng = float(lat), float(lng)
return lat, lng, [(cb.state, cb.district) for cb in self.boundaries
if cb.contains((flng,flat))]