This notebook shows how to use pandas and SQLAlchemy to upload data into the switch-db2 server and how to create geospatial coordinate (geometry) columns and indexes on the uploaded tables.

Install pandas, sqlalchemy and pg8000 using pip or any other package manager. The latter is the driver used to connect to the database.
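
For example, with pip:

pip install pandas sqlalchemy pg8000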


In [5]:
import pandas as pd
from sqlalchemy import create_engine

After importing the required packages, first create the engine to connect to the DB. The approach I generally use is to build the connection string from the username and password. The code below is wrapped in a function; you just need to fill in the username, password and database name.

This way you can create different engines to connect to several databases.


In [6]:
def connection(user, passwd, dbname, echo_i=False):
    # note: the parameter is passwd; the original version referenced an
    # undefined name (passw) inside the function body
    str1 = ('postgresql+pg8000://' + user + ':' + passwd + '@switch-db2.erg.berkeley.edu:5432/'
            + dbname + '?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory')
    engine = create_engine(str1, echo=echo_i, isolation_level='AUTOCOMMIT')
    return engine

In [7]:
user = 'jdlara'
passw = 'changeme'  # replace with your password
dbname = 'apl_cec' 
engine_db= connection(user,passw,dbname)

Afterwards, use pandas to import the data from Excel files or any other text file format. Make sure that the data is in good shape before trying to push it into the server. In this example I use prior knowledge of the structure of the tabs in the Excel file to iteratively upload each tab, matching the name of each table to its tab name.

If you are using CSV files, just change the command to pd.read_csv(); the pandas documentation covers the details.

Before doing this I already checked that the data is properly organized; create new cells to explore the data beforehand if needed.
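
A minimal sketch of the CSV variant plus a quick inspection, assuming a hypothetical file substations_table.csv with the same columns:

pd_data = pd.read_csv('substations_table.csv')  # hypothetical file name
pd_data.head()    # peek at the first rows
pd_data.dtypes    # check that lat/lon parsed as numeric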


In [8]:
excel_file = 'substations_table.xlsx'
tab_names = ['sheet1']  # list of tabs; one table is created per tab, named after it
schema_for_upload = 'geographic_data'
for name in tab_names:
    pd_data = pd.read_excel(excel_file, sheetname=name)  # sheet_name= in newer pandas
    pd_data.to_sql(name, engine_db, schema=schema_for_upload, if_exists='replace', chunksize=100)

In [4]:
# An earlier attempt, kept here as a warning: tab_name is a plain string,
# so `for name in tab_name` iterates over its characters ('s', 'h', 'e', ...)
# and tries to create one single-letter table per character; note also that
# read_excel here never selects the tab. The run fails with the index-name
# collision shown below, on the single-letter table 'e'.
excel_file = 'substations_table.xlsx'
tab_name = 'sheet1'
schema_for_upload = 'geographic_data'
for name in tab_name:
    pd_data = pd.read_excel(excel_file, encoding='UTF-8')
    pd_data.to_sql(name, engine_db, schema=schema_for_upload, if_exists='replace',chunksize=100)

---------------------------------------------------------------------------
ProgrammingError                          Traceback (most recent call last)
... (full traceback through pandas, sqlalchemy and pg8000 omitted; the relevant failure is the CREATE INDEX statement below) ...
ProgrammingError: (pg8000.core.ProgrammingError) (u'ERROR', u'42P07', u'relation "ix_geographic_data_e_index" already exists', u'index.c', u'773', u'index_create', u'', u'') [SQL: u'CREATE INDEX ix_geographic_data_e_index ON geographic_data.e (index)']

Once the data is uploaded, it is possible to run SQL commands to properly create geom columns in the tables. The objective is to run an SQL query like this:

set search_path = SCHEMA, public;
alter table vTABLE drop column if exists geom;
SELECT AddGeometryColumn ('SCHEMA','vTABLE','geom',4326,'POINT',2);
UPDATE vTABLE set geom = ST_SetSRID(st_makepoint(vTABLE.lon, vTABLE.lat), 4326)::geometry;

where SCHEMA and vTABLE are the variable portions. Also note that this query assumes that the columns with latitude and longitude are named lat and lon respectively; moreover, it assumes that the coordinates are in the EPSG:4326 projection (WGS 84).

The following function runs this query for you, again assuming that the data is clean and well organized.


In [9]:
def create_geom(table, schema, engine, projection=4326):
    k = engine.connect()
    query = ('set search_path = "' + schema + '", public;')
    print(query)
    k.execute(query)
    query = ('alter table ' + table + ' drop column if exists geom;')
    print(query)
    k.execute(query)
    # use the projection argument here instead of hard-coding 4326
    query = ('SELECT AddGeometryColumn (\'' + schema + '\',\'' + table + '\',\'geom\','
             + str(projection) + ',\'POINT\',2);')
    print(query)
    k.execute(query)
    query = ('UPDATE ' + table + ' set geom = ST_SetSRID(st_makepoint(' + table + '.lon, '
             + table + '.lat),' + str(projection) + ')::geometry;')
    print(query)
    k.execute(query)
    k.close()
    return 'geom column added with SRID ' + str(projection)

In [10]:
table = 'substation_table'
schema = 'geographic_data'
create_geom(table,schema,engine_db)


set search_path = "geographic_data", public;
alter table substation_table drop column if exists geom;
SELECT AddGeometryColumn ('geographic_data','substation_table','geom',4326,'POINT',2);
UPDATE substation_table set geom = ST_SetSRID(st_makepoint(substation_table.lon, substation_table.lat),4326)::geometry;
Out[10]:
'geom column added with SRID 4326'

The function created the geom column; the next step is to define a function to create the primary key in the database. Remember that the data frame's index is uploaded as an index column in the database; sometimes that index is not really needed and might have to be dropped.
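
If the index is not needed, it can also be skipped at upload time, since to_sql accepts an index flag; a minimal sketch reusing the variables from the upload cell:

pd_data.to_sql(name, engine_db, schema=schema_for_upload,
               if_exists='replace', index=False, chunksize=100)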


In [ ]:
def create_pk(table, schema, column, engine):
    k = engine.connect()
    query = ('set search_path = "' + schema + '", public;')
    print(query)
    k.execute(query)
    query = ('alter table ' + table + ' ADD CONSTRAINT ' + table + '_pk PRIMARY KEY (' + column + ');')
    print(query)
    k.execute(query)
    k.close()
    return 'Primary key created with column ' + column

In [ ]:
col = ''
create_pk(table,schema,col,engine_db)

The reason we use PostGIS is to speed up geospatial queries and provide a better data structure for geospatial operations. Many of the ST_ functions perform much better when a geospatial index is available. The process implemented here comes from the PostGIS workshop materials; it is re-created with Python functions so that it can easily be replicated for many tables.

The query to create a geospatial index is as follows:

set search_path = SCHEMA, public;
CREATE INDEX vTABLE_gix ON vTABLE USING GIST (geom);

This assumes that the geometry column is named geom. If you followed the previous steps, this is already the case.

The following step is to run a VACUUM. Creating an index is not enough to allow PostgreSQL to use it effectively; a VACUUM must be performed whenever a new index is created or after a large number of UPDATEs, INSERTs or DELETEs are issued against a table.

VACUUM ANALYZE vTABLE;

The final step is CLUSTERING, a process that re-orders the table according to the geospatial index we created. This ensures that records with similar attributes have a high likelihood of being found in the same page, reducing the number of pages that must be read into memory for some types of queries. When running nearest-neighbor queries or searches within a certain area, geometries that are near each other in space are then also near each other on disk. The query to perform this clustering is as follows:

CLUSTER vTABLE USING vTABLE_gix;
ANALYZE vTABLE;

In [11]:
def create_gidx(table, schema, engine, column='geom'):
    k = engine.connect()
    query = ('set search_path = "' + schema + '", public;')
    print(query)
    k.execute(query)
    query = ('CREATE INDEX ' + table + '_gix ON ' + table + ' USING GIST (' + column + ');')
    print(query)
    k.execute(query)
    # VACUUM cannot run inside a transaction block; this works here because
    # the engine was created with isolation_level='AUTOCOMMIT'
    query = ('VACUUM ' + table + ';')
    print(query)
    k.execute(query)
    query = ('CLUSTER ' + table + ' USING ' + table + '_gix;')
    print(query)
    k.execute(query)
    query = ('ANALYZE ' + table + ';')
    print(query)
    k.execute(query)
    k.close()

In [12]:
create_gidx(table,schema,engine_db)


set search_path = "geographic_data", public;
CREATE INDEX substation_table_gix ON substation_table USING GIST (geom);
VACUUM substation_table;
CLUSTER substation_table USING substation_table_gix;
ANALYZE substation_table;
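
With the GiST index in place, proximity searches can take advantage of it. A minimal sketch of a nearest-neighbor query, where the name column and the sample coordinates are hypothetical and <-> is the PostGIS KNN distance operator:

k = engine_db.connect()
k.execute('set search_path = geographic_data, public;')
# five substations closest to a (hypothetical) point near Berkeley
result = k.execute(
    "SELECT name FROM substation_table "
    "ORDER BY geom <-> ST_SetSRID(ST_MakePoint(-122.27, 37.87), 4326) "
    "LIMIT 5;")
for row in result:
    print(row)
k.close()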
