In [39]:
import os
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import threading
In [40]:
#Get sample data
df = pd.DataFrame(np.random.randint(0,100,size=(100000, 4)), columns=list('ABCD'))
df.drop_duplicates(['A', 'B'], keep='last', inplace=True)
df.head(1)
Out[40]:
In [41]:
#Set up the connection to our test database for upserts (this is PostgreSQL)
DB_TYPE = 'postgresql'
DB_DRIVER = 'psycopg2'
DB_USER = 'admin'
DB_PASS = 'password'
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'pandas_upsert'
POOL_SIZE = 50
### Config update complete ###
SQLALCHEMY_DATABASE_URI = '%s+%s://%s:%s@%s:%s/%s' % (DB_TYPE, DB_DRIVER, DB_USER,
                                                      DB_PASS, DB_HOST, DB_PORT, DB_NAME)
#Create the engine with a larger connection pool so the threaded inserts below don't exhaust it
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_size=POOL_SIZE, max_overflow=0)
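A quick sanity check at this point can catch bad credentials early. This is a minimal sketch; it assumes the PostgreSQL instance configured above is actually running and reachable with these settings:
#Run a trivial query through the pooled engine to confirm connectivity
pd.read_sql_query('SELECT 1 AS ok', engine)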
In [43]:
def to_sql_newrows(df, pool_size, *args, **kwargs):
    """
    Extend the pandas to_sql() method to thread database insertion.
    Required:
    df : pandas dataframe of new rows to insert into a database table
    pool_size : your SQLAlchemy max connection pool size. Set it below your db connection limit.
        Example where this matters: your cloud DB has a connection limit.
    *args / **kwargs:
        Pandas to_sql() arguments.
        Required arguments are:
            tablename : database table name to write results to
            engine : SQLAlchemy engine
        Optional arguments are:
            'if_exists' : 'append' or 'replace'. If the table already exists, use 'append'.
            'index' : True or False. True if you want to write index values to the db.
    Credits for the initial threading code:
    http://techyoubaji.blogspot.com/2015/10/speed-up-pandas-tosql-with.html
    """
    CHUNKSIZE = 1000
    INITIAL_CHUNK = 100
    if len(df) <= CHUNKSIZE:
        #if df is smaller than the chunksize, just write it to the db now
        df.to_sql(*args, **kwargs)
        return
    #write the initial chunk first so the table exists before the worker threads start appending
    df.iloc[:INITIAL_CHUNK, :].to_sql(*args, **kwargs)
    workers = []
    n_chunks = (df.shape[0] - INITIAL_CHUNK) // CHUNKSIZE
    for i in range(n_chunks):
        #bind i as a default argument so each thread writes its own chunk
        #(a bare lambda would capture the loop variable by reference)
        t = threading.Thread(target=lambda i=i: df.iloc[INITIAL_CHUNK + i*CHUNKSIZE:INITIAL_CHUNK + (i+1)*CHUNKSIZE, :].to_sql(*args, **kwargs))
        t.start()
        workers.append(t)
    #write whatever is left over after the last full chunk
    df.iloc[INITIAL_CHUNK + n_chunks*CHUNKSIZE:, :].to_sql(*args, **kwargs)
    for t in workers:
        t.join()
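A minimal usage sketch for the helper above ('some_table' is a hypothetical name; the real call appears further down once df_unique is built):
#Threaded append of a dataframe into an existing table
to_sql_newrows(df, POOL_SIZE, 'some_table', engine, if_exists='append', index=False)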
In [44]:
#Create a table with a composite primary key on the A and B columns (this also enforces uniqueness).
engine.execute("""DROP TABLE IF EXISTS "test_upsert" """)
engine.execute("""CREATE TABLE "test_upsert" (
"A" INTEGER,
"B" INTEGER,
"C" INTEGER,
"D" INTEGER,
CONSTRAINT pk_A_B PRIMARY KEY ("A","B"))
""")
"""
#Add unique constraint to table
try:
args = ' ALTER TABLE test_upsert ADD CONSTRAINT uk_a_b UNIQUE ("A", "B") '
results = engine.execute(args)
print 'success'
except:
print 'unique constraint already exists'
"""
#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)
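If you want to confirm the primary key constraint is actually in place, a sketch using PostgreSQL's pg_constraint catalog looks like this (contype 'p' marks the primary key):
#List constraints defined on test_upsert
pd.read_sql_query("SELECT conname, contype FROM pg_constraint WHERE conrelid = 'test_upsert'::regclass", engine)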
In [45]:
#Check that the table exists and there is data in it
df_in_db = pd.read_sql_query('SELECT "A", "B" FROM test_upsert', engine)
df_in_db.head(1)
Out[45]:
In [46]:
#Now let's bring in some new data to insert, along with the same old data
df_new = pd.DataFrame(np.random.randint(0, 1000, size=(100000, 4)), columns=list('ABCD'))
df2 = pd.concat([df, df_new])
df2.head(1)
Out[46]:
In [47]:
#First let's get the length of both dataframes
len_df2 = df2.shape[0]
len_df_in_db = df_in_db.shape[0]
print('new df is %s rows, and data in db is %s rows' % (len_df2, len_df_in_db))
In [48]:
#Now let's find out which rows are already in the database using a left outer join, keeping only the rows whose merge type is 'left_only'
#The _merge column added via pandas' indicator option will help us greatly here
#First, drop duplicates on the key columns so our sample data doesn't collide with itself
df2.drop_duplicates(['A', 'B'], keep='last', inplace=True)
df_all = pd.merge(df2, df_in_db, how='left', on=['A', 'B'],
                  copy=False, indicator=True, suffixes=['', '_in_db'])
df_all.reset_index(inplace=True, drop=True)
df_all = df_all[df_all['_merge'] == 'left_only']
print('left joined df is %s rows' % df_all.shape[0])
df_all.head(1)
Out[48]:
In [49]:
#Now let's drop the helper columns from the merge: anything suffixed "_in_db", plus any ID, index, or _merge columns
cols_to_drop = [col for col in df_all.columns
                if '_in_db' in col
                or 'ID' in col
                or 'index' in col
                or '_merge' in col]
print(cols_to_drop)
df_unique = df_all.drop(cols_to_drop, axis=1)
df_unique.head(1)
Out[49]:
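As an optional check (a sketch, not part of the original flow), you can confirm the filtered rows really are absent from the database:
#An inner join on the key columns should come back empty if the anti-join above worked
overlap = pd.merge(df_unique, df_in_db, how='inner', on=['A', 'B'])
print('rows already in db: %s' % overlap.shape[0])  #expect 0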
In [50]:
#Try the multithreaded insert of the new rows into the database
to_sql_newrows(df_unique, POOL_SIZE, 'test_upsert', engine, if_exists='append', index=False)
In [11]:
#Alternatively, put the new records in df_unique back into the database with a plain to_sql (use this or the threaded call above, not both)
df_unique.to_sql('test_upsert', engine, if_exists='append', index=False)
In [13]:
df_in_db_2 = pd.read_sql_query('SELECT count("A") FROM test_upsert', engine)
df_in_db_2.head(1)
Out[13]:
In [15]:
#Awesome, that seems to work. We only inserted new rows! Let's check to make sure it's unique
df_dupscount = pd.read_sql_query("""
    SELECT * FROM (
        SELECT "A",
               ROW_NUMBER() OVER (PARTITION BY "A", "B" ORDER BY "A" ASC) AS row_num
        FROM test_upsert
    ) dups
    WHERE dups.row_num > 1
    """, engine)
df_dupscount.head(5)
Out[15]: