In [1]:
import csv
import numpy
import datetime
import pandas as pd
import subprocess
import psycopg2 as pg
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# Name of the target PostgreSQL database used by all cells below.
db = 'cwbdb'
def create_db(dbname, hostname='localhost', pg_admin_db='postgres'):
    """Create a new PostgreSQL database with the PostGIS extensions enabled.

    Connects to the admin database to check whether ``dbname`` already
    exists; if not, creates it and installs the ``postgis`` and
    ``postgis_topology`` extensions.  If it exists, prints a notice and
    does nothing.

    Parameters
    ----------
    dbname : str
        Name of the database to create (trusted source only — used as an
        SQL identifier below).
    hostname : str
        Database server host, default 'localhost'.
    pg_admin_db : str
        Maintenance database used for the existence check and CREATE DATABASE.
    """
    conn = pg.connect(host=hostname, database=pg_admin_db)
    try:
        cur = conn.cursor()
        # Parameterized query: psycopg2 handles quoting/escaping of dbname.
        cur.execute('select count(datname) from pg_database where datname = %s;',
                    (dbname,))
        already_exists = cur.fetchone()[0] > 0
        if not already_exists:
            # CREATE DATABASE cannot run inside a transaction block, so switch
            # to autocommit first:
            # http://initd.org/psycopg/docs/extensions.html?highlight=isolation_level_autocommit
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            # Identifiers cannot be passed as query parameters; dbname is
            # interpolated directly and must therefore be trusted.
            cur.execute('CREATE DATABASE %s;' % dbname)
        cur.close()
    finally:
        conn.close()
    if already_exists:
        print("The database %s already exists!" % dbname)
        return
    # Connect to the freshly created database and enable PostGIS.
    conn = pg.connect(host=hostname, database=dbname)
    try:
        cur = conn.cursor()
        cur.execute('CREATE EXTENSION postgis;')
        cur.execute('CREATE EXTENSION postgis_topology;')
        conn.commit()
        cur.close()
    finally:
        conn.close()
def query_db(dbname, sql, hostname='localhost'):
    """Execute ``sql`` against ``dbname`` and return all rows (fetchall).

    The connection is always closed, even when the query raises.

    Parameters
    ----------
    dbname : str
        Database to connect to.
    sql : str
        Query text to execute (must be trusted; no parameterization here).
    hostname : str
        Database server host, default 'localhost'.

    Returns
    -------
    list of tuple
        All result rows.
    """
    conn = pg.connect(host=hostname, database=dbname)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        result = cur.fetchall()
        cur.close()
        return result
    finally:
        conn.close()
# create new table
def create_table(db, sql, hostname='localhost'):
    """Execute DDL ``sql`` (e.g. CREATE TABLE) on database ``db``.

    Runs in autocommit mode so the DDL takes effect immediately.
    Re-raises any error after printing a short notice; the connection is
    always closed (the original leaked it on failure).
    """
    conn = pg.connect(host=hostname, database=db)
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        cur.execute(sql)
        cur.close()
    except Exception:
        print("Create table error!")
        raise
    finally:
        conn.close()
def insert_table(db, insert_sql, hostname='localhost'):
    """Execute ``insert_sql`` on database ``db`` and commit.

    Re-raises any error after printing a short notice; the connection is
    always closed (the original leaked it on failure).
    """
    conn = pg.connect(host=hostname, database=db)
    try:
        cur = conn.cursor()
        cur.execute(insert_sql)
        conn.commit()
        cur.close()
    except Exception:
        print("Insert data into table error!")
        raise
    finally:
        conn.close()
def split_rdate(db, new_table, orig_tab, hostname='localhost'):
    """Create ``new_table`` from ``orig_tab`` with ``rdate`` split into parts.

    ``rdate`` is treated as a digit string of the form YYYYMMDDHH: the first
    four digits become ``yr``, then two for ``mm``, two for ``dd``, and the
    last two minus one become ``hr`` (presumably converting a 1-24 hour
    convention to 0-23 — confirm against the source data).  Runs in
    autocommit mode; the connection is always closed.

    NOTE(review): table names are interpolated directly into the SQL, so
    they must come from trusted code, never user input.
    """
    split_sql = '''
CREATE TABLE %s AS (
SELECT
stn_code,
substring(cast(rdate as text) from 1 for 4)::integer yr,
substring(cast(rdate as text) from 5 for 2)::integer mm,
substring(cast(rdate as text) from 7 for 2)::integer dd,
substring(cast(rdate as text) from 9 for 2)::integer-1 hr,
ps01, tx01, rh01, wd01, wd02, pp01, ss01
FROM %s); ''' % (new_table, orig_tab)
    conn = pg.connect(host=hostname, database=db)
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()
        cur.execute(split_sql)
        cur.close()
    finally:
        conn.close()
In [ ]:
# create postgresql database with postgis extension
# (prints a notice and skips creation if it already exists)
create_db(dbname=db)
In [2]:
# DDL for the format-definition table.  Each row records, for one raw-file
# format (e.g. Format_CWB_H_1) and its validity period, the fixed-width
# start/end character positions of one observation item.
create_format_sql = '''
DROP TABLE IF EXISTS format_cwb;
DROP SEQUENCE IF EXISTS format_cwb_seq;
CREATE SEQUENCE format_cwb_seq;
CREATE TABLE format_cwb (
id int DEFAULT nextval('format_cwb_seq') PRIMARY KEY,
format character varying, -- format type, ex: Format_CWB_H_1
pd_start date, -- start period of format, ex: 1951-01-01
pd_end date, -- end period of format, ex:2008-12-31
item_code character varying, -- item (observation) code
item_detail character varying, -- detail information of item
pos_start int, -- start position
pos_end int -- end position
); '''
# DDL for the station-metadata table: one row per meteorological station.
# (The SQL column comments are in Chinese: stn_code = station code,
# station = station name, elevation = elevation in meters,
# x/y = TWD97 longitude/latitude.)
create_meteo_sql = '''
DROP TABLE IF EXISTS station_cwb;
CREATE TABLE station_cwb (
stn_code character(6) PRIMARY KEY, -- 測站代碼
station character(24), -- 測站名稱
elevation float, -- 海拔高度(m)
x float, -- TWD97 經度
y float -- TWD97 緯度
);'''
In [3]:
# (Re)create the format_cwb table; uncomment the first call to also
# rebuild the station_cwb table.
#create_table(db, create_meteo_sql)
create_table(db,create_format_sql)
In [4]:
# Bulk-load the fixed-width format specification from CSV into format_cwb.
# NOTE(review): each CSV line is assumed to already be an SQL-ready value
# list (string fields pre-quoted), because it is spliced directly into the
# INSERT statement — the file must therefore be trusted.
sql_insert = ('INSERT INTO format_cwb (format, pd_start, pd_end, item_code, '
              'item_detail, pos_start, pos_end) VALUES (')
conn = pg.connect(host='localhost', database=db)
try:
    cur = conn.cursor()
    # with-statement guarantees the file is closed (the original leaked it).
    with open('info/format_cwb.csv', 'r') as format_file:
        for row in format_file:
            line = row.strip()
            if not line:
                continue  # a blank line would produce invalid SQL
            cur.execute(sql_insert + line + ');')
    conn.commit()
    cur.close()
finally:
    conn.close()
In [ ]:
# insert station information into station_cwb table
# Parameterized query lets psycopg2 handle quoting/escaping instead of the
# hand-built string concatenation (which broke on names containing quotes).
insert_sql = ('INSERT INTO station_cwb (stn_code, station, elevation, x, y) '
              'VALUES (%s, %s, %s, %s, %s);')
conn = pg.connect(host='localhost', database=db)
try:
    cur = conn.cursor()
    # with-statement guarantees the file is closed (the original leaked it).
    with open('info/stn_info.csv', 'r') as rfile:
        for row in csv.reader(rfile):
            # columns: 0 = station code, 1 = name, 2 = elevation,
            # 4 = x, 6 = y (intermediate columns unused, as in the original)
            cur.execute(insert_sql,
                        (row[0], row[1], float(row[2]), float(row[4]), float(row[6])))
    conn.commit()
    cur.close()
finally:
    conn.close()
In [ ]:
# Fetch codes of stations beginning with 'C' from station_cwb_2.
sql = 'select stn_code from station_cwb_2 where stn_code like \'C%\''
stn_res = query_db(db, sql)
In [ ]:
# concatenate hourly data into single file
# Build two shell scripts of `cat` commands per station:
#   A1 = 2000-01 through 2009-10,  A2 = 2009-11 onward.
autostn_a1 = []
autostn_a2 = []
for i in stn_res:
    # type A1
    cp1 = '''cat CWB_A_%s_200[0-8]*.txt CWB_A_%s_20090[1-9].txt CWB_A_%s_200910.txt > ../../../data/type_a/CWB_A1_%s.txt''' % (i[0], i[0], i[0], i[0])
    cp2 = '''cat CWB_A_%s_20091[1-2].txt CWB_A_%s_201*.txt > ../../../data/type_a/CWB_A2_%s.txt''' % (i[0], i[0], i[0])
    autostn_a1.append(cp1)
    autostn_a2.append(cp2)
# with-statements guarantee the scripts are flushed and closed
# (the original never closed them, so buffered lines could be lost).
with open('script/autostn_list_cat.sh', 'w') as autostn_cat_f1:
    for item in autostn_a1:
        autostn_cat_f1.write("%s\n" % item)
with open('script/autostn_list_cat2.sh', 'w') as autostn_cat_f2:
    for item in autostn_a2:
        autostn_cat_f2.write("%s\n" % item)
In [ ]:
# Fetch codes of all automatic stations (station codes not starting with '4').
query_stn_code = '''
select stn_code from station_cwb where stn_code not like '4%';'''
stn = query_db(db, query_stn_code)
# create automatic recording meteorological station
# NOTE(review): the CREATE TABLE body inside the SQL string is commented out
# with '--', so as written each iteration only DROPs stn_auto_<code>.
# Re-enable those lines if the tables should actually be (re)created here.
for i in range(0, len(stn)):
    create_auto_stn_sql = '''
DROP TABLE IF EXISTS stn_auto_%s;
-- CREATE TABLE stn_auto_%s(
-- stn_code character varying,
-- rdate numeric,
-- ps01 float,
-- tx01 float,
-- wd01 float,
-- wd02 float,
-- pp01 float,
-- ss01 float
-- );
''' % (stn[i][0], stn[i][0])
    create_table(db, create_auto_stn_sql)
In [ ]:
# List the station codes to be loaded; the bare last expression
# displays the result in the notebook.
list_inputf_sql = '''SELECT stn_code from info_a_1_list;'''
input_stn = query_db(db, list_inputf_sql)
input_stn
In [ ]:
# For each station: parse its fixed-width raw file with pandas.read_fwf,
# dump it to a temporary CSV, and bulk-load it into the matching
# stn_auto_* table with COPY.
list_inputf_sql = '''SELECT stn_code from info_a_1_list;'''
input_stn = query_db(db, list_inputf_sql)
# Fixed-width column spec for Format_CWB_A_1: station code, 10-digit date
# stamp, then six observation fields.  This is loop-invariant, so it is
# computed once here (the original re-queried format_cwb on every
# iteration and then discarded the result in favor of this hard-coded
# list anyway).
ncolspec = [(0, 6), (7, 17), (18, 24), (25, 31), (32, 38), (39, 45), (46, 52), (53, 59)]
conn = pg.connect(host='localhost', database=db)
try:
    cur = conn.cursor()
    for rec in input_stn:
        stn_name = rec[0].lower()
        out_stn_file = '/tmp/stn_auto_' + stn_name + '.csv'
        # open raw data and use read_fwf to read and parse it; the
        # with-statement closes the file (the original leaked one handle
        # per station).
        with open('concat_a/CWB_A_%s_1.txt' % stn_name.upper(), 'r') as stn_data_file:
            df = pd.read_fwf(stn_data_file, colspecs=ncolspec, header=None, na_values='-9999')
        # The original passed convert_datetime64=False, which is not a
        # valid to_csv argument and raises TypeError on current pandas.
        df.to_csv(out_stn_file, index=False, header=False)
        copy_sql = '''COPY %s FROM '%s' WITH CSV DELIMITER ',';
''' % ('stn_auto_' + stn_name, out_stn_file)
        cur.execute(copy_sql)
        conn.commit()
        print(stn_name)
    cur.close()
finally:
    conn.close()
In [ ]:
# Parse the concatenated A1 raw file in one pass and write a CSV copy.
# Same fixed-width spec as the per-station loader above.
ncolspec = [(0, 6), (7, 17), (18, 24), (25, 31), (32, 38), (39, 45), (46, 52), (53, 59)]
# with-statement guarantees the input file is closed (original leaked it).
with open('data/CWB_A1_all.txt', 'r') as stn_a1_data:
    a1_df = pd.read_fwf(stn_a1_data, colspecs=ncolspec, header=None, na_values='-9999')
a1_df.to_csv('data/CWB_A1_all_parsed.csv', index=False, header=False)
In [ ]:
In [ ]:
In [ ]:
In [5]:
# Split stn_auto's numeric rdate into yr/mm/dd/hr columns in stn_auto_r.
# NOTE(review): database name is hardcoded here; consider passing the
# module-level `db` constant instead.
split_rdate('cwbdb', 'stn_auto_r', 'stn_auto')
In [4]:
In [ ]: