In [68]:
import os
import re
import psycopg2
import getpass
from collections import OrderedDict

# database configuration
sqluser = getpass.getuser()
# leave sqlpass empty to use peer authentication
sqlpass = ''
# database name and schema search path
sqldb = 'mimic'
sqlschema = 'public,mimiciii'
query_schema = 'set search_path to {};'.format(sqlschema)
In [32]:
# Connect to the database.
# BUG FIX: the original condition `(not sqlpass) & (sqlpass != '')` is always
# False (the operands are mutually exclusive), so the password branch could
# never run. Use password authentication only when a password was supplied;
# otherwise fall back to peer authentication.
if sqlpass:
    con = psycopg2.connect(user=sqluser, password=sqlpass, database=sqldb)
else:
    con = psycopg2.connect(user=sqluser, database=sqldb)
print('Connected!')
In [24]:
# function to read a single script
def read_script(base_path, script_name):
    """Read an SQL script from disk and return it as a single query string.

    Inline ``--`` comments are removed and consecutive blank lines are
    collapsed.

    Parameters
    ----------
    base_path : str
        Directory containing the script.
    script_name : str
        Path of the script relative to ``base_path``.

    Returns
    -------
    str
        The script text with comments stripped.
    """
    query = ''
    with open(os.path.join(base_path, script_name)) as f:
        for line in f.readlines():
            line = line.lstrip(' ').rstrip(' ')
            if len(line) < 1:
                continue
            elif len(line) < 2:
                query += line
            else:
                # ignore comments
                # NOTE: caveat — a '--' inside a string literal would also be
                # treated as a comment start.
                if '--' in line:
                    # BUG FIX: re-append the newline that was cut off with the
                    # comment; otherwise this line fuses with the next one and
                    # can corrupt the SQL (e.g. 'select a--x' + 'from b').
                    line = line[0:line.index('--')] + '\n'
                query += line
    # replace double newlines with single newline
    query = query.replace('\n\n', '\n')
    return query
In [89]:
def extract_drop_line(query):
    """Separate a script's DROP statements from its CREATE statements.

    Parameters
    ----------
    query : str
        Full text of an SQL script.

    Returns
    -------
    query : list of str, or the original str when no DROP is found
        The create statement(s) with their DROP lines removed.
    query_drop : list of str, or '' when no DROP is found
        The matched ``DROP MATERIALIZED VIEW`` / ``DROP TABLE`` lines.
    """
    # hack out the drop materialized view/drop table statement
    query_drop = []
    # BUG FIX: the original class '[A-z0-9_ ]' also matched the punctuation
    # between 'Z' and 'a' ('[', '\', ']', '^', '`'); '[A-Za-z0-9_ ]' matches
    # only identifier characters and spaces.
    if 'drop materialized view ' in query.lower():
        query_drop.extend(re.findall('drop materialized view [A-Za-z0-9_ ]+;\n', query, re.I))
    if 'drop table ' in query.lower():
        query_drop.extend(re.findall('drop table [A-Za-z0-9_ ]+;\n', query, re.I))
    if not query_drop:
        # no drop statement found; keep legacy return shape (str, '')
        query_drop = ''
    elif len(query_drop) == 1:
        # single statement: remove the drop line, wrap the rest in a list
        query = query.replace(query_drop[0], '')
        query = [query]
    else:
        # multiple drop/create statements: split the script on each drop line
        query_parts = list()
        for i, q in enumerate(query_drop):
            # everything before this drop line belongs to the previous part
            query_split = query.split(q)
            query_parts.append(query_split[0])
            query = query_split[1]
        # now append the final table created in the full query
        query_parts.append(query)
        # drop the first element (text preceding the first drop statement)
        query_parts = query_parts[1:]
        query = query_parts
    return query, query_drop
In [96]:
# benchmark query
def benchmark_query(con, query, query_schema=None, query_drop=None, parallel_workers=None):
    """Run EXPLAIN ANALYZE on ``query`` and return its execution time.

    Parameters
    ----------
    con : psycopg2 connection
    query : str
        The query to benchmark (without the EXPLAIN prefix).
    query_schema : str, optional
        A ``set search_path ...;`` statement executed first, if given.
        BUG FIX: the original default ``query_schema=query_schema`` bound a
        notebook global at definition time (NameError on a fresh kernel).
    query_drop : str, optional
        A DROP statement executed before the benchmark, if given (same
        definition-time-global bug fixed as above).
    parallel_workers : int, optional
        Value for max_parallel_workers_per_gather; None resets to DEFAULT.
        BUG FIX: the original ``if parallel_workers:`` treated an explicit 0
        as "not given", so the single-core benchmark silently ran with the
        server's default parallelism.

    Returns
    -------
    (float, list of str)
        Execution time in milliseconds, and the plan lines.
    """
    cur = con.cursor()
    try:
        if query_schema:
            cur.execute(query_schema)
        if parallel_workers is not None:
            cur.execute('SET max_parallel_workers_per_gather TO {};'.format(parallel_workers))
        else:
            cur.execute('SET max_parallel_workers_per_gather TO DEFAULT;')
        if query_drop:
            cur.execute(query_drop)
        cur.execute('explain analyze ' + query)
        result = cur.fetchall()
        cur.execute('commit;')
    finally:
        # BUG FIX: close the cursor even when a statement fails
        cur.close()
    query_plan = [item[0] for item in result]
    # last plan line reports the time; PostgreSQL <10 prints 'Execution time:'
    # while >=10 prints 'Execution Time:' — compare case-insensitively
    time = float(query_plan[-1].lower().replace('execution time: ', '').replace(' ms', ''))
    return time, query_plan
In [86]:
# example on a single concept
base_path = '/home/alistairewj/git/mimic-code/concepts'
script_name = 'demographics/icustay_detail.sql'
print(script_name, end='...')

# read the script's query, then split out the drop statement(s)
query = read_script(base_path, script_name)
query, query_drop = extract_drop_line(query)

if len(query) == 1:
    # most of the time each script only creates a single view/table
    query = query[0]
    query_drop = query_drop[0]
    time, query_plan = benchmark_query(con, query, query_schema=query_schema,
                                       query_drop=query_drop)
    print('{:6.1f}s'.format(time/1000))
else:
    # multi-part script: benchmark each create statement on its own
    print('')
    for i, query_part in enumerate(query):
        time, query_plan = benchmark_query(con, query_part, query_schema=query_schema,
                                           query_drop=query_drop[i])
        print(' part {} - {:6.1f}s'.format(i, time/1000))
In [100]:
query_plans = OrderedDict()
query_times = OrderedDict()
base_path = '/home/alistairewj/git/mimic-code/concepts'
# benchmark every script referenced by make-concepts.sql
with open(os.path.join(base_path, 'make-concepts.sql')) as fp:
    for line in fp.readlines():
        if len(line) < 2:
            continue
        elif line[0:2] != '\\i':
            continue
        elif 'ccs_diagnosis_table.sql' in line:
            # skipped: presumably loads data rather than creating a view — TODO confirm
            continue
        # get the name of the script, e.g. '\i demographics/age.sql'
        script_name = line[3:].rstrip('\n')
        print('{:40s}'.format(script_name), end='... ')
        # read the script's query
        query = read_script(base_path, script_name)
        query, query_drop = extract_drop_line(query)
        if len(query) == 1:
            # most of the time each script only creates a single view/table
            q = query[0]
            qd = query_drop[0]
            time, query_plan = benchmark_query(con, q, query_schema=query_schema, query_drop=qd)
            print('{:6.1f}s'.format(time/1000))
            # BUG FIX: record single-statement scripts too (mirroring the
            # single-core cell); otherwise the parallel-plan analysis only
            # ever sees multi-part scripts
            query_plans[script_name] = query_plan
            query_times[script_name] = time
        else:
            query_plans[script_name] = list()
            query_times[script_name] = list()
            for i in range(len(query)):
                time, query_plan = benchmark_query(con, query[i], query_schema=query_schema,
                                                   query_drop=query_drop[i])
                print('')
                print(' part {}...{:18s}{:6.1f}s'.format(i, '', time/1000))
                query_plans[script_name].append(query_plan)
                query_times[script_name].append(time)
In [99]:
# same thing, but test parallel
query_plans_single_core = OrderedDict()
query_times_single_core = OrderedDict()
parallel_workers = 0
base_path = '/home/alistairewj/git/mimic-code/concepts'

# benchmark every script referenced by make-concepts.sql with parallelism off
with open(os.path.join(base_path, 'make-concepts.sql')) as fp:
    for line in fp:
        # keep only the '\i <script>' lines, skipping the data-load script
        if len(line) < 2:
            continue
        if line[0:2] != '\\i':
            continue
        if 'ccs_diagnosis_table.sql' in line:
            continue
        # get the name of the script
        script_name = line[3:].rstrip('\n')
        print('{:40s}'.format(script_name), end='... ')
        # read the script's query and split out the drop statement(s)
        query = read_script(base_path, script_name)
        query, query_drop = extract_drop_line(query)
        if len(query) == 1:
            # most of the time each script only creates a single view/table
            time, query_plan = benchmark_query(con, query[0],
                                               query_schema=query_schema,
                                               query_drop=query_drop[0],
                                               parallel_workers=0)
            print('{:6.1f}s'.format(time/1000))
            query_plans_single_core[script_name] = query_plan
            query_times_single_core[script_name] = time
        else:
            query_plans_single_core[script_name] = list()
            query_times_single_core[script_name] = list()
            print('')
            for i in range(len(query)):
                time, query_plan = benchmark_query(con, query[i],
                                                   query_schema=query_schema,
                                                   query_drop=query_drop[i],
                                                   parallel_workers=0)
                print(' part {}...{:18s}{:6.1f}s'.format(i, '', time/1000))
                query_plans_single_core[script_name].append(query_plan)
                query_times_single_core[script_name].append(time)
In [105]:
# first print all queries which used a parallel plan
# BUG FIX: for multi-part scripts query_plans[q] is a list of lists, so the
# original `'Parallel' in l` tested list *membership* (a plan line equal to
# exactly 'Parallel'), which never matches. Flatten each entry to text and
# do a substring search instead.
for script_name, plans in query_plans.items():
    if plans and isinstance(plans[0], list):
        # multi-part script: list of plan-line lists
        plan_text = '\n'.join('\n'.join(p) for p in plans)
    else:
        # single statement: flat list of plan lines
        plan_text = '\n'.join(plans)
    if 'Parallel' in plan_text:
        print(script_name)
If the above prints nothing, no queries are using a parallel plan! :(