In [ ]:
# Path of the CSV file to load.
file_name = 'my_file.csv'

# Connection settings for the target SQL server.
db_property = {
    'host': 'my_host',
    'user': 'user',
    'password': 'password',
    'db': 'my_db',
}

# Name of the table the data is written to.
db_table = 'my_table'
In [ ]:
from __future__ import print_function
from __future__ import division
import numpy as np
import scipy as sp
import pandas as pd
import warnings
# Suppress every warning category for the rest of the session.
warnings.filterwarnings('ignore')
In [ ]:
def get_sql_engine(db_property):
    """Create a SQLAlchemy engine for a MySQL database.

    The user name and password are percent-encoded before being placed in
    the connection URL, so credentials containing characters such as
    ``@``, ``:`` or ``/`` do not corrupt the URL. Pass the raw (not
    pre-encoded) credentials.

    Args:
      db_property: A dict.
        db_property['user']: DB server user.
        db_property['password']: DB server password.
        db_property['host']: DB server host.
        db_property['db']: DB server database.

    Returns:
      The object returned by ``sqlalchemy.create_engine`` for the URL
      ``mysql://<user>:<password>@<host>/<db>?charset=utf8``.

    Raises:
      KeyError: If one of the four keys is missing from db_property.
    """
    import sqlalchemy
    try:
        from urllib.parse import quote_plus
    except ImportError:  # Python 2 keeps quote_plus directly in urllib.
        from urllib import quote_plus

    connection_str = 'mysql://{0}:{1}@{2}/{3}?charset=utf8'.format(
        quote_plus(str(db_property['user'])),
        quote_plus(str(db_property['password'])),
        db_property['host'],
        db_property['db'])
    return sqlalchemy.create_engine(connection_str)
In [ ]:
def truncate_sql_results(db_property, db_table):
    """Delete all rows left in a MySQL table by a previous run.

    This is best effort: if the DELETE fails with a database error, the
    error is reported on stdout and the function returns normally instead
    of raising.

    Args:
      db_property: A dict.
        db_property['user']: DB server user.
        db_property['password']: DB server password.
        db_property['host']: DB server host.
        db_property['db']: DB server database.
      db_table: A string. Table name. It is interpolated directly into the
        SQL statement (table names cannot be bound as parameters), so it
        must come from a trusted source.
    """
    import sqlalchemy
    from sqlalchemy.exc import SQLAlchemyError

    engine = get_sql_engine(db_property)
    try:
        # engine.begin() commits when the block succeeds and rolls back on
        # error; unlike Engine.execute() it is also available in
        # SQLAlchemy 2.x.
        with engine.begin() as connection:
            connection.execute(sqlalchemy.text('DELETE FROM %s' % db_table))
    except SQLAlchemyError as error:
        # Only database errors are tolerated; anything else (including
        # KeyboardInterrupt) propagates to the caller.
        print('Could not clear table %s: %s' % (db_table, error))
In [ ]:
def write_pydf_2sql(pydf, db_property, db_table, DB_WRITE_CHUNKSIZE=500):
    """Write Pandas DataFrame to MySQL table.

    Existing rows of the table are deleted first, then the DataFrame rows
    are appended without the DataFrame index.

    Args:
      pydf: Pandas DataFrame.
      db_property: A dict.
        db_property['user']: DB server user.
        db_property['password']: DB server password.
        db_property['host']: DB server host.
        db_property['db']: DB server database.
      db_table: A string. Table name.
      DB_WRITE_CHUNKSIZE: Passed to ``DataFrame.to_sql`` as ``chunksize``.
    """
    # Clear rows from earlier runs so the append below does not add to them.
    truncate_sql_results(db_property, db_table)
    engine = get_sql_engine(db_property)
    pydf.to_sql(db_table, engine, index=False, if_exists='append',
                chunksize=DB_WRITE_CHUNKSIZE)
In [ ]:
# Load the input CSV into a DataFrame.
data_pydf = pd.read_csv(file_name)
In [ ]:
# write_pydf_2sql takes (pydf, db_property, db_table); the connection
# settings must be passed between the DataFrame and the table name.
write_pydf_2sql(data_pydf, db_property, db_table)