We use an XML export of the various tables in the FileMaker Inkind database.
The XML is read, field definitions are extracted from it, and then the data itself is read. We do the following:
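To make this concrete, here is a minimal sketch of the FMPXMLRESULT layout the parsing code below expects. The inline sample is hypothetical, but the element names (METADATA, FIELD, RESULTSET, ROW, COL, DATA) and the namespace match the ones used in the parsing functions further down.

from lxml import etree

FMNS_SKETCH = '{http://www.filemaker.com/fmpxmlresult}'
sample = b'''<FMPXMLRESULT xmlns="http://www.filemaker.com/fmpxmlresult">
  <METADATA>
    <FIELD NAME="costs_total" TYPE="NUMBER" MAXREPEAT="1"/>
  </METADATA>
  <RESULTSET FOUND="1">
    <ROW MODID="0" RECORDID="1">
      <COL><DATA>1000</DATA></COL>
    </ROW>
  </RESULTSET>
</FMPXMLRESULT>'''
sample_root = etree.fromstring(sample)
for f in sample_root.iter(FMNS_SKETCH + 'FIELD'):    # field definitions: name, type, max repetitions
    print(f.get('NAME'), f.get('TYPE'), f.get('MAXREPEAT'))
for r in sample_root.iter(FMNS_SKETCH + 'ROW'):      # data rows: one COL per field, one DATA per repetition
    print([d.text for c in r.iter(FMNS_SKETCH + 'COL') for d in c.iter(FMNS_SKETCH + 'DATA')])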
In [27]:
import os,sys,re,collections,json
from os.path import splitext, basename
from copy import deepcopy
from functools import reduce
from glob import glob
from lxml import etree
from datetime import datetime
from pymongo import MongoClient
In [28]:
country_extra = dict(
AT=('Austria', 47.7, 15.11),
BE=('Belgium', 51.3, 3.1),
HR=('Croatia', 44.7, 15.6),
CY=('Cyprus', 35.0, 32.8),
CZ=('Czech Republic', 49.8, 15.2),
DK=('Denmark', 55.6, 11.0),
EE=('Estonia', 59.0, 25.0),
FR=('France', 46.5, 1.9),
DE=('Germany', 51.0, 10.4),
GR=('Greece', 38.0, 23.8),
HU=('Hungary', 46.9, 19.8),
IE=('Ireland', 53.1, -8.4),
IT=('Italy', 41.6, 13.0),
LV=('Latvia', 56.9, 26.8),
LT=('Lithuania', 55.2, 24.9),
LU=('Luxembourg', 49.6, 6.1),
MT=('Malta', 35.9, 14.4),
NL=('Netherlands', 52.8, 5.8),
PL=('Poland', 52.3, 19.8),
PT=('Portugal', 38.7, -9.0),
RS=('Serbia', 44.0, 20.8),
SK=('Slovakia', 48.8, 19.9),
SI=('Slovenia', 46.2, 14.4),
CH=('Switzerland', 46.9, 8.3),
GB=('United Kingdom', 52.9, -1.8),
AL=('Albania', 41.1, 19.9),
AD=('Andorra', 42.5, 1.6),
BY=('Belarus', 53.8, 29.2),
BA=('Bosnia and Herzegovina', 44.2, 18.2),
BG=('Bulgaria', 42.9, 26.5),
FI=('Finland', 63.3, 27.6),
GE=('Georgia', 41.66, 43.68),
IS=('Iceland', 65.0, -18.8),
SM=('San Marino', 43.8, 12.3),
KS=('Kosovo', 43.2, 21.9),
LI=('Liechtenstein', 47.2, 9.4),
MK=('Macedonia', 41.6, 21.8),
MD=('Moldova', 47.3, 28.7),
MC=('Monaco', 43.7, 7.4),
ME=('Montenegro', 42.3, 19.2),
NO=('Norway', 62.0, 7.1),
RO=('Romania', 45.8, 24.8),
RU=('Russian Federation', 55.6, 37.7),
ES=('Spain', 39.8, -3.4),
SE=('Sweden', 59.5, 16.1),
TR=('Turkey', 40.0, 32.8),
UA=('Ukraine', 49.3, 32.6),
)
In [29]:
# the name of the SQL database that will hold all exported data
DB_NAME = 'dariah_data'
# merge source fields into one target field:
#
# table => target_field = (source_field1, source_field2, ...)
MERGE_FIELDS = dict(
contrib=dict(
academic_entity_url=('academic_entity_url_2',),
contribution_url=('contribution_url_2',),
contact_person_mail=('contact_person_mail_2',),
),
)
# discard fields
#
# table => {discard_field1, discard_field2, ...}
SKIP_FIELDS = dict(
contrib={
'teller',
'whois',
'help_text',
'help_description',
'total_costs_total',
'goldpassword',
'gnewpassword',
'gnewpassword2',
},
country={
'countryname',
},
)
# change the type of fields
#
# table => {field1: newtype1, field2: newtype2, ...}
FIELD_TYPE_OVERRIDE = dict(
contrib=dict(
costs_total='valuta',
creation_date_time='datetime',
modification_date_time='datetime',
dateandtime_approval='datetime',
dateandtime_cioapproval='datetime',
dateandtime_ciozero='datetime',
contact_person_mail='email',
contact_person_mail_2='email',
),
country=dict(
member_dariah='bool',
),
)
# move fields from one table to a new table
#
# source_table => target_table => {source_field1, source_field2, ...}
MOVE_FIELDS = dict(
contrib=dict(
assessment={
'submit',
'approved',
'vcchead_approval',
'vcchead_disapproval',
'dateandtime_approval',
'dateandtime_cioapproval',
'dateandtime_ciozero',
'vcc_head_decision',
'vcc_head_decision_vcc11',
'vcc_head_decision_vcc12',
'vcc_head_decision_vcc21',
'vcc_head_decision_vcc22',
'vcc_head_decision_vcc31',
'vcc_head_decision_vcc32',
'vcc_head_decision_vcc41',
'vcc_head_decision_vcc42',
'vcc11_name',
'vcc12_name',
'vcc21_name',
'vcc22_name',
'vcc31_name',
'vcc32_name',
'vcc41_name',
'vcc42_name',
},
),
)
# split field values into multiple values
# specify a regular expression to split the value on; this can be set per field
# refer to the splitting re-s by name; they are defined later on
#
# table => {field1: splitter1, field2: splitter2, ...}
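# For example (illustration, not actual data): a raw disciplines_associated value of
# 'History\nLaw' is split by the 'generic' splitter into the two values 'History' and 'Law'.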
SPLIT_FIELDS = dict(
contrib=dict(
disciplines_associated='generic',
other_keywords='generic_comma',
tadirah_research_activities='generic',
tadirah_research_objects='generic',
tadirah_research_techniques='generic',
type_of_inkind='generic',
vcc='generic',
year='generic',
creator='generic',
last_modifier='generic',
contact_person_name='generic',
),
)
# Sometimes values occur in several spellings; here we normalize them.
# Per field we can give a list of canonical values.
# For every value of that field that we encounter in the data, we apply a normalizer method
# and compare the result with the normalized canonical values. If there is a match, we replace the
# encountered value with the canonical value.
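# For example, with the 'generic' normalizer (lowercase, spaces removed) an encountered
# value such as 'vcc 1' normalizes to 'vcc1' and is replaced by the canonical value 'VCC1'.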
NORMALIZE_FIELDS = dict(
contrib=dict(
tadirah_research_activities=dict(
normalizer='generic',
values=set('''
1. Capture
2. Creation
3. Enrichment
4. Analysis
5. Interpretation
6. Storage
7. Dissemination
8. Meta-Activities
'''.strip().split('\n'))
),
tadirah_research_objects=dict(
normalizer='generic',
values=set('''
Artifacts
Bibliographic Listings
Code
Computers
Curricula
Digital Humanities
Data
File
Images
Images (3D)
Infrastructure
Interaction
Language
Link
Literature
Manuscript
Map
Metadata
Methods
Multimedia
Multimodal
Named Entities
Persons
Projects
Research
Research Process
Research Results
Sheet Music
Software
Sound
Standards
Text
Text Bearing Objects
Tools
Video
VREs
'''.strip().split('\n'))
),
tadirah_research_techniques=dict(
normalizer='generic',
values=set('''
Bit Stream Preservation > Storage-Preservation
Brainstorming
Browsing
Cluster Analysis > Analysis-Stylistic Analysis
Collocation Analysis > Analysis- Structural Analysis
Concordancing > Analysis-Structural Analysis
Debugging
Distance Measures > Analysis-Stylistic Analysis
Durable Persistent Media > Storage-Preservation
Emulation > Storage-Preservation
Encoding
Gamification > Dissemination-Crowdsourcing
Georeferencing > Enrichment-Annotation
Information Retrieval > Analysis-Content Analysis
Linked open data > Enrichment-Annotation; Dissemination-Publishing
Machine Learning > Analysis-Structural Analysis; Analysis-Stylistic Analysis; Analysis-Content Analysis
Mapping
Migration > Storage-Preservation
Named Entity Recognition > Enrichment-Annotation; Analysis-Content Analysis
Open Archival Information Systems > Storage-Preservation
Pattern Recognition > Analysis-Relational Analysis
Photography
POS-Tagging > Analysis-Structural Analysis
Preservation Metadata > Storage-Preservation
Principal Component Analysis > Analysis-Stylistic Analysis
Replication > Storage-Preservation
Scanning
Searching
Sentiment Analysis > Analysis-Content Analysis
Sequence Alignment > Analysis-Relational Analysis
Technology Preservation > Storage-Preservation
Topic Modeling > Analysis-Content Analysis
Versioning > Storage-Preservation
Web Crawling > Capture-Gathering
'''.strip().split('\n'))
),
vcc=dict(
normalizer='generic',
values=set('''
VCC1
VCC2
VCC3
VCC4
Coordination
'''.strip().split('\n'))
),
type_of_inkind=dict(
normalizer='generic',
values=set('''
Access
Expertise
Interoperability
Content Hosting
Tools and Software
Event
Training
Summer School
Cooperation
Educational Resources
Data
DARIAH Coordination
'''.strip().split('\n'))
),
disciplines=dict(
normalizer='generic',
values=set('''
Archaeology and Prehistory
Architecture, space management
Art and art history
Biological anthropology
Classical studies
Communication sciences
Cultural heritage and museology
Demography
Economies and finances
Education
Environmental studies
Gender studies
Geography
History
History, Philosophy and Sociology of Sciences
Law
Linguistics
Literature
Management
Media studies
Methods and statistics
Musicology and performing arts
Philosophy
Political science
Psychology
Religions
Social Anthropology and ethnology
Sociology
'''.strip().split('\n'))
),
),
)
# generate a separate table for the values of some fields
# N.B.: for fields with multiple values, this will be done automatically
# and for such fields a cross table will be generated as well
#
# table => {field1, field2, ...}
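# For example, the single-valued field other_type_of_inkind gets its own value table,
# while a multi-valued field such as vcc gets a value table plus a cross table contrib_vcc
# linking contribution ids to vcc ids (see print_relxtables below).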
VALUE_FIELDS = dict(
contrib={
'country',
'creator',
'last_modifier',
'other_type_of_inkind',
},
)
# sometimes the generated table of the values in a field clash with an existing table
# with the same name which already contains those values.
# This can be fixed by linking the values in that field to the existing table instead.
# The fix involves mapping the identifiers in the generated table to the identifiers in the
# existing table. This mapping is guided by a link field, i.e. the field in the existing table
# that contains the values as found in the main table.
#
# table => {value_field1: link_field1, value_field2: link_field2, ...}
FIX_FIELDS = dict(
contrib=dict(
country='countrycode',
),
)
# sometimes we want to add fields to a table based on external information.
# For example, we want to add certain country information that we need to
# visualize countries on a map.
ADD_FIELDS = dict(
country=dict(
link_field='countrycode',
fields = (
('name', 'text'),
('latitude', 'decimal'),
('longitude', 'decimal'),
),
data = country_extra,
),
)
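# default values: if a field ends up empty (or only NULL) after transformation,
# substitute the default given here (see transformrows below)
#
# table => {field1: default1, field2: default2, ...}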
DEFAULT_FIELDS = dict(
contrib=dict(
creation_date_time="'2000-01-01T00:00:00'",
creator="'admin'",
type_of_inkind="'General'",
)
)
# for SQL writing: when writing insert queries, limit the number of records per statement
LIMIT_ROWS = 50 # maximum number of rows to be written in one sql insert statement
In [30]:
# Source field types, including types assigned by type overriding (see FIELD_TYPE_OVERRIDE above).
# These will be translated into appropriate SQL field types
TYPES = {'bool', 'number', 'decimal', 'text', 'valuta', 'email', 'date', 'datetime'}
# We inspect string values in fields, and assign varchar(size) types, where size is
# the least power of 2 that is big enough. But there is a minimum and a maximum.
# If the maximum is exceeded, a TEXT field (unlimited size) will be generated.
MIN_M = 5 # minimum varchar size = 2**MIN_M
MAX_M = 13 # maximum varchar size = 2**MAX_M
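# For example, if the longest value of a text field is 70 characters, it gets varchar(128),
# the least power of 2 that is at least 70; values longer than 2**13 = 8192 force a TEXT field.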
# Dates are already in ISO format (DATE2_PATTERN).
# If we encounter other date formats, we could use DATE_PATTERN instead.
# Datetimes are not in ISO format; they will be transformed to ISO.
DECIMAL_PATTERN = re.compile(
r'^-?[0-9]+\.?[0-9]*'
)
DATE_PATTERN = re.compile(
r'^\s*([0-9]{2})/([0-9]{2})/([0-9]{4})$'
)
DATE2_PATTERN = re.compile(
r'^\s*([0-9]{4})-([0-9]{2})-([0-9]{2})$'
)
DATETIME_PATTERN = re.compile(
r'^\s*([0-9]{2})/([0-9]{2})/([0-9]{4})\s+([0-9]{2}):([0-9]{2})(?::([0-9]{2}))?$'
)
# meaningless values will be translated into NULLs
NULL_VALUES = {
'http://',
'https://',
'@',
}
BOOL_VALUES = {
True: {'Yes', 'YES', 'yes', 1, '1', True},
False: {'No', 'NO', 'no', 0, '0', 'NULL', False},
}
# when there is a clash between generated value tables and existing tables, the generated table's name will be
# appended with a suffix.
# If we have been told to fix this situation (see FIX_FIELDS above), the generated table will be discarded.
# So, if there are table names in the result ending with this suffix, something still has to be fixed.
TBF = '_tobefixed'
# Here are the splitting regular expressions to split field values into multiple values
splitters = dict(
generic=re.compile(r'[ \t]*\n+[ \t\n]*'), # split on newlines (with surrounding white space)
generic_comma=re.compile(r'[ \t]*[\n,]+[ \t\n]*'), # split on newlines or commas (with surrounding white space)
)
normalizers = dict(
generic=lambda x: x.lower().replace(' ',''),
)
In [31]:
# Locations
HOME_DIR = os.path.expanduser('~').replace('\\', '/')
BASE_DIR = '{}/Documents/DANS/projects/has/dacs'.format(HOME_DIR)
FM_DIR = '{}/fm'.format(BASE_DIR)
TEMP_DIR = '{}/tmp'.format(BASE_DIR)
RESULT_DIR = '{}/sql'.format(BASE_DIR)
FMNS = '{http://www.filemaker.com/fmpxmlresult}'
ROW_RAW_FILE = '{}/row_raw_file'.format(TEMP_DIR)
ROW_FILE = '{}/row_file'.format(TEMP_DIR)
ROW_EXT = 'txt'
In [32]:
nwarnings = 0
def resetw():
global nwarnings
nwarnings = 0
def info(msg):
sys.stdout.write('{}\n'.format(msg))
sys.stdout.flush()
def note(msg):
sys.stdout.write('NB: {}\n'.format(msg))
sys.stdout.flush()
def warning(msg, count=True):
global nwarnings
sys.stderr.write('{} {}: {}\n'.format('!'*5, 'WARNING', msg))
sys.stderr.flush()
if count: nwarnings += 1
def finalw():
if nwarnings == 0:
info('OK, no warnings')
else:
warning('There were {} warnings'.format(nwarnings), count=False)
def check_config():
good = True
for x in [1]:
good = False
if not os.path.exists(BASE_DIR):
warning('BASE_DIR does not exist: {}'.format(BASE_DIR))
break
this_good = True
for cdir in (TEMP_DIR, RESULT_DIR):
this_good = False
if not os.path.exists(cdir):
try:
os.makedirs(cdir)
except os.error as e:
warning('{} could not be created.'.format(cdir))
break
this_good = True
if not this_good:
break
good = True
if not good:
warning('There were configuration errors', count=False)
else:
info('Configuration OK')
This is the code to validate and transform field values. They will be transformed into valid SQL values: string values will be surrounded by single quotes, single quotes inside them will be escaped, and newlines and tabs will be replaced by \n and \t.
Currency values are problematic: there is no consistency in the use of currency symbols, thousands separators, and decimal points. We assume all amounts are in euros and strip the currency symbol. We assume all occurrences of . and , are thousands separators, unless they are followed by only 1 or 2 digits; in that case we treat them as decimal points. The resulting values contain no thousands separators and use . as the decimal point.
All values with a . or , inside will be shown to the user, together with the resulting value, for checking.
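To illustrate the rule, here is a minimal sketch with made-up example amounts; it is a simplified re-implementation of the idea, not the actual money() function defined in the next cell.

def normalize_amount(raw):
    # strip currency markers, then treat a . or , followed by 1 or 2 digits as the decimal point
    v = raw.strip().lower().replace(' ', '').replace('€', '').replace('euro', '')
    for p in (2, 3):
        if len(v) >= p and v[-p] in '.,':
            v = v[:-p] + 'D' + v[-p+1:]
    v = v.replace('.', '').replace(',', '')   # remaining . and , are thousands separators
    return v.replace('D', '.')

for example in ('€ 1.000', '1.000,50', '2,5 euro'):
    print(example, '->', normalize_amount(example))
# € 1.000 -> 1000
# 1.000,50 -> 1000.50
# 2,5 euro -> 2.5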
In [33]:
def date_repl(match):
[d,m,y] = list(match.groups())
return '{}-{}-{}'.format(y,m,d)
def date2_repl(match):
[y,m,d] = list(match.groups())
return '{}-{}-{}'.format(y,m,d)
def datetime_repl(match):
[d,m,y,hr,mn,sc] = list(match.groups())
return '{}-{}-{}T{}:{}:{}'.format(y,m,d,hr,mn,sc or '00')
def sq(v_raw):
return "'{}'".format(
v_raw.strip().replace("'","''").replace('\t', '\\t').replace('\n', '\\n')
)
def bools(v_raw, i, t, fname):
if v_raw in BOOL_VALUES[True]: return 1
if v_raw in BOOL_VALUES[False]: return 0
warning(
'table `{}` field `{}` record {}: not a boolean value: "{}"'.format(
t, fname, i, v_raw
))
return v_raw
def num(v_raw, i, t, fname):
if type(v_raw) is int: return v_raw
if v_raw.isdigit(): return int(v_raw)
warning(
'table `{}` field `{}` record {}: not an integer: "{}"'.format(
t, fname, i, v_raw
))
return v_raw
def decimal(v_raw, i, t, fname):
if type(v_raw) is float: return v_raw
if v_raw.isdigit(): return float(v_raw)
if DECIMAL_PATTERN.match(v_raw): return float(v_raw)
warning(
'table `{}` field `{}` record {}: not a decimal number: "{}"'.format(
t, fname, i, v_raw
))
return v_raw
def email(v_raw, i, t, fname):
return v_raw.replace('mailto:', '', 1) if v_raw.startswith('mailto:') else v_raw
money_warnings = {}
money_notes = {}
def money(v_raw, i, t, fname):
note = ',' in v_raw or '.' in v_raw
v = v_raw.strip().lower().replace(' ','').replace('€', '').replace('euro', '').replace('\u00a0', '')
for p in range(2,4): # interpret . or , as decimal point if less than 3 digits follow it
if len(v) >= p and v[-p] in '.,':
v_i = v[::-1]
if v_i[p-1] == ',': v_i = v_i.replace(',', 'D', 1)
elif v_i[p-1] == '.': v_i = v_i.replace('.', 'D', 1)
v = v_i[::-1]
v = v.replace('.','').replace(',','')
v = v.replace('D', '.')
if not v.replace('.','').isdigit():
if len(set(v) & set('0123456789')):
warning(
'table `{}` field `{}` record {}: not a decimal number: "{}" <= "{}"'.format(
t, fname, i, v, v_raw,
))
money_warnings.setdefault('{}:{}'.format(t, fname), {}).setdefault(v, set()).add(v_raw)
else:
v = 'NULL'
money_notes.setdefault('{}:{}'.format(t, fname), {}).setdefault('NULL', set()).add(v_raw)
elif note:
money_notes.setdefault('{}:{}'.format(t, fname), {}).setdefault(v, set()).add(v_raw)
return v
def dt(v_raw, i, t, fname):
if not DATE2_PATTERN.match(v_raw):
warning(
'table `{}` field `{}` record {}: not a valid date: "{}"'.format(
t, fname, i, v_raw
))
return v_raw
return("'{}'".format(DATE2_PATTERN.sub(date2_repl, v_raw)))
def dtm(v_raw, i, t, fname):
if not DATETIME_PATTERN.match(v_raw):
warning(
'table `{}` field `{}` record {}: not a valid date time: "{}"'.format(
t, fname, i, v_raw
))
return v_raw
return("'{}'".format(DATETIME_PATTERN.sub(datetime_repl, v_raw)))
In [34]:
def read_fm():
main_tables_raw = []
parser = etree.XMLParser(remove_blank_text=True, ns_clean=True)
root = {}
for infile in glob('{}/*.xml'.format(FM_DIR)):
tname = basename(splitext(infile)[0])
print('Parsing {}'.format(tname))
root[tname] = etree.parse(infile, parser).getroot()
main_tables_raw.append(tname)
return (root, main_tables_raw)
In [35]:
def check_merge():
merge_errors = 0
merge_fields = {}
for t in main_tables_raw:
for (mfhead, mftail) in MERGE_FIELDS.get(t, {}).items():
for f in mftail:
if f in merge_fields.get(t, {}):
warning(
'table `{}` field `{}` already merged into `{}` now to be merged into `{}`'.format(
t, f, merge_fields[t][f], mfhead,
))
merge_errors += 1
merge_fields.setdefault(t, {})[f] = mfhead
if merge_errors:
warning('There were {} merge errors'.format(merge_errors), count=False)
else:
info('Merge definitions OK')
return merge_fields
def getfielddefs():
field_defs_raw = {}
fd_errors = 0
tfields = {}
for t in main_tables_raw:
fieldroots = [x for x in root[t].iter(FMNS+'METADATA')]
fieldroot = fieldroots[0]
tfields[t] = []
for x in fieldroot.iter(FMNS+'FIELD'):
fname = x.get('NAME').lower().replace(' ','_').replace(':', '_')
ftype = FIELD_TYPE_OVERRIDE.\
get(t, {}).\
get(fname, None) or x.get('TYPE').lower()
fmult = int(x.get('MAXREPEAT'))
if fname in SPLIT_FIELDS.get(t, {}): fmult += 1
tfields[t].append(fname)
field_defs_raw.setdefault(t, {})[fname] = [ftype, fmult]
if ftype not in TYPES:
warning('table `{}` field `{}` has unknown type "{}"'.format(
t, fname, ftype,
))
fd_errors += 1
if t in ADD_FIELDS:
added_fields = ADD_FIELDS[t]['fields']
for (fname, ftype) in added_fields:
if ftype not in TYPES:
warning('table `{}` field `{}` has unknown type "{}"'.format(
t, fname, ftype,
))
fd_errors += 1
tfields[t].append(fname)
field_defs_raw.setdefault(t, {})[fname] = [ftype, 1]
info('Table {:<20}: {:>2} fields'.format(t, len(tfields[t])))
if fd_errors:
warning('There were {} field definition errors'.format(fd_errors), count=False)
else:
info('Field definitions OK')
return (tfields, field_defs_raw)
def check_normalize():
for t in NORMALIZE_FIELDS:
for f in NORMALIZE_FIELDS[t]:
info = NORMALIZE_FIELDS[t][f]
normalizer = info['normalizer']
values = info['values']
if normalizer not in normalizers:
warning('Unknown normalizer in NORMALIZE_FIELDS: {}'.format(normalizer))
norm_function = lambda x: x
else:
norm_function = normalizers[normalizer]
info['value_lookup'] = dict((norm_function(val), val) for val in values)
info['norm_function'] = norm_function
def check_merge_more():
merge_errors = 0
for t in main_tables_raw:
for f in merge_fields.get(t, {}):
if f not in field_defs_raw[t]:
warning(
'table `{}`: cannot merge unknown field `{}`'.format(
t, f,
))
merge_errors += 1
continue
ftarget = merge_fields[t][f]
(ftype, fmult) = field_defs_raw[t][f]
if ftarget not in field_defs_raw[t]:
field_defs_raw[t][ftarget] = [ftype, 0]
(ttype, tmult) = field_defs_raw[t][ftarget]
if ttype != ftype:
warning(
'table `{}` field `{}` of type "{}" is merged into field `{}` of other type "{}"'.format(
t, f, ftype, ftarget, ttype,
))
merge_errors += 1
field_defs_raw[t][ftarget][1] += fmult
del field_defs_raw[t][f]
if merge_errors:
warning('There were {} merge errors'.format(merge_errors), count=False)
else:
info('Merge OK')
def do_skips():
fields_raw = {}
s_errors = 0
for t in main_tables_raw:
for f in SKIP_FIELDS.get(t, set()):
if f not in field_defs_raw[t]:
warning('table `{}`: unknown skip field `{}`'.format(t,f))
s_errors += 1
else:
del field_defs_raw[t][f]
fields_raw[t] = sorted(
set(field_defs_raw[t].keys()) | set(merge_fields.get(t, {}).values())
)
if s_errors:
warning('There were {} field skip errors'.format(s_errors), count=False)
else:
info('Field skips OK')
return fields_raw
In [36]:
def getdata():
rows_raw = {}
errors = {}
for t in main_tables_raw:
if t in ADD_FIELDS:
xinfo = ADD_FIELDS[t]
link_field = xinfo['link_field']
extra_fields = xinfo['fields']
extra_field_data = xinfo['data']
extra_field_set = {f[0] for f in extra_fields}
dataroots = [x for x in root[t].iter(FMNS+'RESULTSET')]
dataroot = dataroots[0]
rows_raw[t] = []
seen = set()
for (i, r) in enumerate(dataroot.iter(FMNS+'ROW')):
row = []
for c in r.iter(FMNS+'COL'):
data = [x.text for x in c.iter(FMNS+'DATA')]
row.append(data)
if t in ADD_FIELDS:
extra_id = None
for (fname, values) in zip(tfields[t], row):
if fname == link_field:
extra_id = ''.join(values)
seen.add(extra_id)
for xdata in extra_field_data[extra_id]:
row.append((xdata,))
if len(row) != len(tfields[t]):
errors.setdefault(t, {}).setdefault('Number of fields', []).append(i)
rows_raw[t].append(row)
if t in ADD_FIELDS:
for i in extra_field_data:
if i in seen: continue
row = []
j = 0
for f in tfields[t]:
if f == link_field:
row.append((i,))
elif f not in extra_field_set:
row.append(('NULL',))
for xdata in extra_field_data[i]:
row.append((xdata,))
rows_raw[t].append(row)
rf = open('{}_{}.{}'.format(ROW_RAW_FILE, t, ROW_EXT), 'w')
for row in rows_raw[t]:
for (fname, values) in zip(tfields[t], row):
rf.write('@{:>30} = {}\n'.format(
fname,
' | '.join('{}'.format(v) for v in values),
))
rf.write('{}\n'.format('='*100))
rf.close()
info('Table {:<20}: {:>4} rows read'.format(t, len(rows_raw[t])))
if errors:
for t in sorted(errors):
for k in sorted(errors[t]):
warning('table {:<20}: {:<20}: {}'.format(t, k, ','.join(str(i) for i in errors[t][k])))
else:
info('Data import OK')
return rows_raw
In [37]:
def transformrows():
rows = {}
money_warnings.clear()
warnings = {}
for t in main_tables_raw:
for (i, row_raw) in enumerate(rows_raw.get(t, [])):
values = {}
for (fname, values_raw) in zip(tfields[t], row_raw):
if fname in SKIP_FIELDS.get(t, set()): continue
sep = SPLIT_FIELDS.get(t, {}).get(fname, None)
if sep != None:
values_raw = sorted(reduce(
set.union,
[set(splitters[sep].split(v)) for v in values_raw if v != None],
set(),
))
if '' in values_raw: values_raw.remove('')
norm_info = NORMALIZE_FIELDS.get(t, {}).get(fname, None)
if norm_info != None:
vlookup = norm_info['value_lookup']
values_norm = []
for v in values_raw:
vn = norm_info['norm_function'](v)
if vn in vlookup:
    values_norm.append(vlookup[vn])
else:
    warnings.setdefault(t, {}).setdefault(fname, {}).setdefault(v, set()).add(i)
    values_norm.append(v)  # keep the encountered value if it has no canonical counterpart
values_raw = values_norm
ftarget = merge_fields.get(t, {}).get(fname, fname)
(ftype, fmult) = field_defs_raw[t][ftarget]
valset = set()
for v_raw in values_raw:
if v_raw == None or v_raw in NULL_VALUES:
v = 'NULL'
elif ftype == 'text': v = sq(v_raw)
elif ftype == 'bool': v = bools(v_raw, i, t, fname)
elif ftype == 'number': v = num(v_raw, i, t, fname)
elif ftype == 'decimal': v = decimal(v_raw, i, t, fname)
elif ftype == 'email': v = email(v_raw, i, t, fname)
elif ftype == 'valuta': v = money(v_raw, i, t, fname)
elif ftype == 'date': v = dt(v_raw, i, t, fname)
elif ftype == 'datetime': v = dtm(v_raw, i, t, fname)
else: v = v_raw
valset.add(v)
if fmult > 1: valset.discard('NULL')
if len(valset) == 0 or valset == {'NULL'}:
defaultVal = DEFAULT_FIELDS.get(t, {}).get(fname, None)
if defaultVal != None:
valset = {defaultVal}
info('SUBSTITUTED DEFAULT for {}.{}: {}'.format(t, fname, defaultVal))
these_values = values.setdefault(ftarget, set())
these_values |= valset
rows.setdefault(t, []).append(values)
info('Table `{}`: {:>5} rows checked'.format(t, len(rows[t])))
rf = open('{}_{}.{}'.format(ROW_FILE, t, ROW_EXT), 'w')
for row in rows[t]:
for (fname, values) in sorted(row.items()):
rf.write('@{:>30} = {}\n'.format(
fname,
' | '.join('{}'.format(v) for v in sorted(values)),
))
rf.write('{}\n'.format('='*100))
rf.close()
if warnings:
for t in warnings:
for f in warnings[t]:
for v in warnings[t][f]:
warning('table `{}`, field `{}`: no normal value for "{}" in {}'.format(
t, f, v, ','.join(str(i) for i in warnings[t][f][v]),
))
if money_notes:
for tf in sorted(money_notes):
for v in sorted(money_notes[tf]):
note('{} "{}" <= {}'.format(
tf, v,
' | '.join(money_notes[tf][v]),
))
if money_warnings:
for tf in sorted(money_warnings):
for v in sorted(money_warnings[tf]):
warning('{} "{}" <= {}'.format(
tf, v,
' | '.join(money_warnings[tf][v]),
))
else:
info('Money OK')
return rows
In [38]:
def pivot():
field_data_raw = {}
for t in main_tables_raw:
for row in rows[t]:
for (fname, values) in sorted(row.items()):
field_data_raw.setdefault(t, {}).setdefault(fname, []).append(values)
info('Table `{}`: {:<5} records and {:<2} fields pivoted'.format(
t, len(rows[t]), len(field_data_raw[t]),
))
# check
good = True
for t in field_data_raw:
for f in field_data_raw[t]:
if len(field_data_raw[t][f]) != len(rows[t]):
warning(
'table `{}`, field `{}`: wrong number of records: {} instead of {}'.format(
t, f, len(field_data_raw[t][f]), len(rows[t]),
))
good = False
if good:
info('Pivot OK')
else:
warning('There were errors', count=False)
return field_data_raw
In [39]:
def move_fields():
errors = 0
main_tables = deepcopy(main_tables_raw)
fields = deepcopy(fields_raw)
field_defs = deepcopy(field_defs_raw)
field_data = deepcopy(field_data_raw)
for t in MOVE_FIELDS:
if t not in field_data:
warning('move fields from table `{}`: this table does not exist'.format(
t,
))
errors += 1
continue
for t_new in MOVE_FIELDS[t]:
main_tables.append(t_new)
nid = '{}_id'.format(t)
field_data.setdefault(t_new, {})[nid] = [{i} for i in range(len(rows[t]))]
field_defs.setdefault(t_new, {})[nid] = ((t, 'id'), 1)
move_fields = set(MOVE_FIELDS[t][t_new])
for f in sorted(move_fields):
if f not in field_data[t]:
warning(
'table `{}`: move field `{}` to `{}`: this field does not exist'.format(
t, f, t_new,
))
errors += 1
move_fields.remove(f)
continue
field_data.setdefault(t_new, {})[f] = field_data[t][f]
del field_data[t][f]
field_defs.setdefault(t_new, {})[f] = field_defs[t][f]
del field_defs[t][f]
fields[t] = sorted(set(fields[t]) - move_fields)
fields[t_new] = [nid]+sorted(move_fields)
info('moved fields\n\t`{}`\nfrom `{}` to `{}`'.format(
'`\n\t`'.join(sorted(move_fields)), t, t_new,
))
if errors:
warning('There were {} errors'.format(errors), count=False)
else:
info('Move fields OK')
return (main_tables, fields, field_defs, field_data)
In [40]:
def extract(t, fname, maindata, relvalues, relindex, reltables, relxtables, relfieldindex):
fname_alt = fname
if fname in main_tables:
link_field = FIX_FIELDS.get(t, {}).get(fname, None)
if link_field:
note('table `{}`: value field `{}` will be linked to `{}:{}`'.format(
t, fname, fname, link_field,
))
else:
warning('table `{}`: value field `{}` is already a table!'.format(t, fname))
fname_alt = '{}{}'.format(fname, TBF)
is_single = field_defs[t][fname][1] == 1 # single value or multiple values
error = False
if fname in relfieldindex:
warning(
'related table `{}` extracted from `{}` and earlier from `{}`'.format(
fname, t, relfieldindex[fname],
))
error = True
relfieldindex[fname] = t
for (i, values) in enumerate(field_data[t][fname]):
for value in values:
vid = relvalues.setdefault(fname, {}).get(value, None)
if vid == None:
relindex[fname] += 1
vid = relindex[fname]
reltables.setdefault(fname_alt, []).append((vid, value))
relvalues[fname][value] = vid
if is_single:
maindata[t][fname][i] = {vid}
else:
relxtables.setdefault(fname_alt, []).append((i, vid))
if not is_single: del maindata[t][fname]
return error
def transform_data():
maindata = deepcopy(field_data)
relvalues = {} # dicts
relindex = collections.Counter()
reltables = {} # lists
relxtables = {} # lists
relfieldindex = {}
errors = 0
for t in main_tables:
field_list =\
VALUE_FIELDS.get(t, set()) |\
{f for f in fields[t] if field_defs[t][f][1] > 1}
for fname in field_list:
if fname not in field_defs[t]:
warning('table `{}`: wrong field {}'.format(t, fname))
errors += 1
continue
error = extract(
t, fname, maindata,
relvalues, relindex, reltables, relxtables, relfieldindex,
)
if error: errors +=1
if errors:
warning('There were {} extraction errors'.format(errors), count=False)
else:
info('Extraction OK')
return (maindata, reltables, relxtables, relvalues, relfieldindex)
Processing step: fix the generated value tables that clash with existing tables (see FIX_FIELDS above).
This is done by computing a mapping between the ids in the two tables, based on corresponding values in a certain field of the existing table. These values are the ones that occur in the field of the main table for which a value table has been generated.
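A tiny toy illustration (hypothetical values, not the real tables) of how such a mapping is computed; it follows the same steps as getmapping below.

generated = [(1, "'NL'"), (2, "'DE'")]   # value table generated from the contrib country field: (id, value)
existing  = [{"'DE'"}, {"'NL'"}]         # country.countrycode column: one singleton value set per row
value_to_row = dict((list(vals)[0], i) for (i, vals) in enumerate(existing))
mapping = dict((gid, value_to_row[val]) for (gid, val) in generated)
print(mapping)   # {1: 1, 2: 0}: generated id 1 ('NL') corresponds to row 1 of the country table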
In [41]:
def getmapping(main_t, main_f):
rel_t = '{}{}'.format(main_t, TBF)
rel_f = main_t
main_codes = maindata[main_t][main_f]
rel_codes = reltables[rel_t]
main_index = dict((list(c)[0],i) for (i,c) in enumerate(main_codes))
rel_index = dict((i,c) for (i,c) in rel_codes)
return dict((i, main_index[rel_index[i]]) for i in rel_index)
def fix(t, main_t, main_f):
mapping = getmapping(main_t, main_f)
rel_t = '{}{}'.format(main_t, TBF)
if rel_t in reltables: del reltables[rel_t]
new_maindata = [{mapping[list(x)[0]]} for x in maindata[t][main_t]]
maindata[t][main_t] = new_maindata
main_tables.remove(main_t)
main_tables.insert(0, main_t)
def fix_them():
for t in FIX_FIELDS:
for main_t in FIX_FIELDS[t]:
link_field = FIX_FIELDS[t][main_t]
note('linking `{}:{}` to table `{}` on `{}`'.format(
t, main_t, main_t, link_field,
))
fix(t, main_t, link_field)
info('Fixing OK')
Processing step: write the SQL files.
Two files are written: create.sql with the database and table definitions, and data.sql with the insert statements.
The create script drops the database if it exists; together the two scripts (re)create all data.
Care has to be taken that tables are created before other tables that depend on them.
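For an impression of the result (a hypothetical fragment with a made-up field selection, sizes, and rows, not actual output), the two files have roughly this shape:

example_create = '''
create table country (
    id int(4) primary key,
    countrycode varchar(32) character set utf8,
    member_dariah tinyint(1),
    latitude decimal(10,2),
    longitude decimal(10,2)
);
'''
example_data = '''
use dariah_data;
select "table country row 0" as " ";
insert into country (id,countrycode,member_dariah,latitude,longitude) values
    (0,'AT',1,47.7,15.11),
    (1,'BE',1,51.3,3.1);
'''
print(example_create, example_data)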
In [42]:
def getsize(source, fname):
values = set()
for vals in source: values |= set(vals)
maxlen = max({len(x) for x in values if x != 'NULL'}, default=0)
result = 0
for m in range(MIN_M, MAX_M+1):
if maxlen <= 2**m:
result = m
break
if maxlen > 2**MAX_M:
note(
'Field `{}`: value with length {} exceeds {}; it gets type TEXT'.format(
fname, maxlen, 2**MAX_M,
))
return False
return 2**m
def getdef(source, t, fname, newfname, warn_mult=True):
(ft, fmult) = field_defs[t][fname]
if warn_mult and fmult > 1:
warning(
'skipping field `{}` because it contains multiple values'.format(
fname,
))
return None
if type(ft) is tuple:
(ftable, ffield) = ft
ftype = 'int'
fsize = '(4)'
fext = ',\n\tforeign key ({}) references {}({})'.format(fname, ftable, ffield)
elif ft == 'bool':
ftype = 'tinyint'
fsize = '(1)'
fext = ''
elif ft == 'number':
ftype = 'int'
fsize = '(4)'
fext = ''
elif ft == 'decimal':
ftype = 'decimal'
fsize = '(10,2)'
fext = ''
elif ft == 'text' or ft == 'email':
ftype = 'varchar'
fsize_raw = getsize(source, fname)
if not fsize_raw:
ftype = 'text'
fsize = ''
else:
fsize = '({})'.format(fsize_raw)
fext = 'character set utf8'
elif ft == 'valuta':
ftype = 'decimal'
fsize = '(10,2)'
fext = ''
elif ft == 'date':
ftype = 'datetime'
fsize = ''
fext = ''
elif ft == 'datetime':
ftype = 'datetime'
fsize = ''
fext = ''
else:
warning('skipping field `{}` because it has unknown type `{}`'.format(
fname, ft,
))
return None
return '{} {}{} {}'.format(newfname, ftype, fsize, fext)
def getrdef(fname):
return '''{fn}_id int(4),
foreign key ({fn}_id) references {fn}(id)'''.format(fn=fname)
def sql_data(df, tname, flist, rows):
head = 'insert into {} ({}) values'.format(tname, ','.join(flist))
for (i, row) in enumerate(rows):
if i % LIMIT_ROWS == 0:
if i > 0: df.write(';')
df.write('\n')
df.write('select "table {} row {}" as " ";\n'.format(tname, i))
df.write(head)
sep = ''
df.write('\n{}\t'.format(sep))
sep = ','
df.write('({})'.format(','.join(str(x) for x in row)))
df.write(';\n')
def print_maintables(maindata, reltables, cf, df):
errors = 0
for t in main_tables:
fdefs = ['id int(4) primary key']
flist = sorted(maindata[t])
fnewlist = []
for fname in flist:
if fname in reltables or fname in FIX_FIELDS.get(t, {}):
fdef = getrdef(fname)
fnewname = '{}_id'.format(fname)
else:
fdef = getdef(field_data[t][fname], t, fname, fname)
if fdef == None:
errors += 1
continue
fnewname = fname
fdefs.append(fdef)
fnewlist.append(fnewname)
cf.write('''
create table {} (
{}
);
'''.format(t, ',\n\t'.join(fdefs)))
maintable_raw = zip(*(maindata[t][f] for f in flist))
maintable = [
[i]+[sorted(vals)[0] for vals in row] for (i, row) in enumerate(maintable_raw)
]
sql_data(df, t, ['id'] + fnewlist, maintable)
return errors
def print_reltables(reltables, relvalues, cf, df):
errors = 0
for tname_alt in sorted(reltables):
tname = tname_alt
pos = tname_alt.rfind(TBF)
if pos > 0: tname = tname_alt[0:pos]
fdefs = ['id int(4) primary key']
fdef = getdef(
[relvalues[tname].keys()],
relfieldindex[tname],
tname, 'val', warn_mult=False,
)
if fdef == None:
errors += 1
continue
fdefs.append(fdef)
cf.write('''
create table {} (
{}
);
'''.format(tname_alt, ',\n\t'.join(fdefs)))
sql_data(df, tname_alt, ['id', 'val'], reltables[tname_alt])
return errors
def print_relxtables(relxtables, cf, df):
errors = 0
for tname_alt in sorted(relxtables):
tname = tname_alt
pos = tname_alt.rfind(TBF)
if pos > 0: tname = tname_alt[0:pos]
t = relfieldindex[tname]
tname_rep = '{}_{}'.format(t, tname_alt)
main_id = '{}_id'.format(t)
val_id = '{}_id'.format(tname)
fdefs = '''
{mi} int(4),
{vi} int(4),
foreign key ({mi}) references {mt}(id),
foreign key ({vi}) references {tn}(id)
'''.format(mt=t, mi=main_id, tn=tname, vi=val_id)
cf.write('''
create table {} ({});
'''.format(tname_rep, fdefs))
sql_data(df, tname_rep, [main_id, val_id], relxtables[tname_alt])
return errors
def sql_export():
errors = 0
cf = open('{}/create.sql'.format(RESULT_DIR), 'w')
df = open('{}/data.sql'.format(RESULT_DIR), 'w')
df.write('''
select "FILL TABLES OF DATABASE {db}" as " ";
use {db};
'''.format(db=DB_NAME))
cf.write('''
select "CREATE DATABASE {db} AND TABLES" as " ";
drop database if exists {db};
create database {db} character set utf8;
use {db};
'''.format(db=DB_NAME))
cf.write('/* value tables */\n')
df.write('\n/* value tables */\n')
errors += print_reltables(reltables, relvalues, cf, df)
cf.write('/* main tables */\n')
df.write('\n/* main tables */\n')
errors += print_maintables(maindata, reltables, cf, df)
cf.write('/* cross tables */\n')
df.write('\n/* cross tables */\n')
errors += print_relxtables(relxtables, cf, df)
cf.close()
df.close()
if errors:
warning('There were {} errors'.format(errors), count=False)
else:
info('SQL OK')
We have defined a fully normalized model to get our data into SQL. However, we also want to insert our data into MongoDB. To that end, we will denormalize some data and model the data as documents (JSON structures) with a mix of embedding and references.
Even if we choose embedding for certain related information, there are still choices. For example, the TADIRAH categories could be embedded in contribution documents as plain arrays of string values. But those categories may get multilingual labels later, and at the client side we want to filter on these categories, which is much more cleanly programmed if the categories have ids and contributions refer to those ids. On the other hand, every contribution has just a handful of categories at most, so we do not make an independent collection of categories. Instead, we embed each category plus its id into the contrib document. This reasoning follows the rules of thumb of MongoDB modeling part 1.
We first build the database as a Python dict of collections of documents, and then talk to MongoDB directly through its Python client. In this way we import all the data into MongoDB.
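For instance, a contribution document then looks roughly like this (hypothetical values, field names as they are before the later renaming step), with each category embedded as a small {_id, value} subdocument:

from datetime import datetime

example_contrib = {
    '_id': '42',
    'creation_date_time': datetime(2015, 3, 1, 12, 0, 0),
    'type_of_inkind': [
        {'_id': '3', 'value': 'Training'},
        {'_id': '7', 'value': 'Expertise'},
    ],
    'costs_total': 1500.0,
}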
In [43]:
mconv = collections.Counter()
def mj(t, f, rid, val, skipref):
if f.endswith('_id') or (skipref and relfieldindex.get(f) == t):
ftype = 'ref'
else:
ftype = field_defs[t][f][0]
mconv[ftype] += 1
if val == 'NULL': v = None
elif ftype == 'text': v = val[1:-1].\
replace('\\t', '\t').replace('\\n', '\n').\
replace("''", "'")#.\
# replace('"', '\\"').\
elif ftype == 'bool': v = True if val == 1 else False
elif ftype == 'number': v = int(val)
elif ftype == 'decimal': v = float(val)
elif ftype == 'valuta': v = float(val)
elif ftype == 'date': v = datetime(*map(int, re.split('[:T-]', val[1:-1])))
elif ftype == 'datetime': v = datetime(*map(int, re.split('[:T-]', val[1:-1])))
else: v = val
if rid != None: v = {'_id': '{}'.format(rid), 'value': v}
return v
def mongo_transform():
mongo_db = {}
errors = 0
embed = {}
for r in reltables.keys():
t = relfieldindex[r]
embed.setdefault(t, set()).add(r) # all value fields will be embedded again,
# but as subdocs with ids!
for t in main_tables:
lookup = {}
fdefs = ['_id']
flist = sorted(maindata[t])
fnewlist = []
embed_info = embed.get(t, set())
embed_fields = set()
ref_fields = set()
for fname in flist:
fncomps = fname.rsplit('_', 1)
fnamex = fncomps[0]
if fname in reltables or fname in FIX_FIELDS.get(t, {}):
fdef = getrdef(fname)
if fname in embed_info:
fnewname = fname
embed_fields.add(fname)
else:
ref_fields.add(fname)
fnewname = '{}_id'.format(fname)
elif fname.endswith('_id') and fnamex in main_tables:
ref_fields.add(fnamex)
fnewname = fname
else:
fdef = getdef(field_data[t][fname], t, fname, fname)
if fdef == None:
errors += 1
continue
fnewname = fname
fdefs.append(fdef)
fnewlist.append(fnewname)
for ef in embed.get(t, set()):
lookup[ef] = dict(row for row in reltables[ef])
maintable_raw = zip(*(maindata[t][f] for f in flist))
mongo_db[t] = []
for (i, row) in enumerate(maintable_raw):
doc = dict((fname, mj(t, fname, None, sorted(fvals)[0], True)) for (fname, fvals) in zip(fnewlist, row))
doc['_id'] = '{}'.format(i)
for ef in embed_fields:
rid = doc[ef]
rval = lookup[ef][rid]
doc[ef] = mj(t, ef, rid, rval, False)
for rf in ref_fields:
idfname = '{}_id'.format(rf)
rid = doc[idfname]
doc[idfname] = '{}'.format(rid)
mongo_db[t].append(doc)
for r in relxtables:
if relfieldindex[r] == t:
if r in embed.get(t, set()):
rfname = r
byref = False
else:
rfname = '{}_id'.format(r)
byref = True
for (main_id, rid) in relxtables[r]:
rval = '{}'.format(rid) if byref else mj(t, r, rid, lookup[r][rid], False)
mongo_db[t][main_id].setdefault(rfname, []).append(rval)
if errors:
warning('There were {} errors'.format(errors), count=False)
else:
info('MONGO_TRANSFORM OK')
return mongo_db
We want to embed a few country fields in the main contrib documents; we just hack this in.
We also replace the original country id with the country code. The contrib table is the only one that links to countries.
We also want to rename a few fields (from under_score convention to camelCase convention, for more harmony with Meteor).
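Schematically, for a single contrib document this amounts to the following (a hypothetical before/after with made-up ids):

before = {'_id': '7', 'country_id': '12', 'costs_total': 1500.0}
after  = {'_id': '7',
          'country': [{'_id': 'NL', 'value': 'Netherlands'}],  # country code plus name embedded
          'costTotal': 1500.0}                                 # field renamed to camelCase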
In [44]:
MONGO_FIELDS = dict(
vcchead=dict(
vcc_head_name='vccHeadName',
),
remark=dict(
remark_date_time='dateRemark',
),
workinggroup=dict(
working_group_name='name',
main_vcc='vccMain',
liaison_vcc='vccLiaison',
potential_participants='participantsPotential',
general_contribution_to_dariah_infrastructure='infraContribution',
relation_to_dariah_objectives='dariahObjectives',
in_kind_provisions='contributionsInkind',
additional_funding='fundingAdditional',
future_location_of_the_service='locationServiceFuture',
date_proposal='dateProposed',
date_approval_jrc='dateApprovalJrc',
date_approval_smt='dateApprovalSMT',
extra_info='extraInfo',
theme__access='themeAccess',
theme__sustainability='themeSustainability',
theme__impact='themeImpact',
theme__interoperability='themeInteroperability',
theme__training='themeTraining',
),
contrib=dict(
creation_date_time='dateCreated',
modification_date_time='dateModified',
last_modifier='modifiedBy',
costs_total='costTotal',
total_costs_total='costTotalTotal',
costs_description='costDescription',
description_of_contribution='description',
ikid_base='ikidBase',
academic_entity_url='urlAcademic',
contribution_url='urlContribution',
contact_person_mail='contactPersonEmail',
contact_person_name='contactPersonName',
type_of_inkind='typeContribution',
other_type_of_inkind='typeContributionOther',
disciplines_associated='disciplines',
tadirah_research_activities='tadirahActivities',
tadirah_research_objects='tadirahObjects',
tadirah_research_techniques='tadirahTechniques',
other_keywords='keywords',
find_type='findType',
find_country_id='findCountry',
message_allert='alert',
),
assessment=dict(
dateandtime_approval='dateApproval',
dateandtime_cioapproval='dateApprovalCIO',
dateandtime_ciozero='dateZeroCIO',
vcchead_approval='vccHeadApproval',
vcchead_disapproval='vccHeadDisapproval',
vcchead_decision='vccHeadDecision',
vcc_head_decision_vcc11='vccHeadDecision11',
vcc_head_decision_vcc12='vccHeadDecision12',
vcc_head_decision_vcc21='vccHeadDecision21',
vcc_head_decision_vcc22='vccHeadDecision22',
vcc_head_decision_vcc31='vccHeadDecision31',
vcc_head_decision_vcc32='vccHeadDecision32',
vcc_head_decision_vcc41='vccHeadDecision41',
vcc_head_decision_vcc42='vccHeadDecision42',
vcc11_name='vccName11',
vcc12_name='vccName12',
vcc21_name='vccName21',
vcc22_name='vccName22',
vcc31_name='vccName31',
vcc32_name='vccName32',
vcc41_name='vccName41',
vcc42_name='vccName42',
),
country=dict(
member_dariah='inDARIAH',
),
help=dict(
help_description='description',
help_text='text',
),
)
In [45]:
def mongo_rename():
stats = collections.defaultdict(collections.Counter)
for c in MONGO_FIELDS:
fields = MONGO_FIELDS[c]
for d in mongo_db[c]:
for (of, nf) in fields.items():
if of in d:
stats[c][of] += 1
d[nf] = d[of]
del d[of]
print('Done')
if not stats:
print('No replacements')
else:
for c in stats:
print('\t{}:'.format(c))
for of in stats[c]:
print('\t\t{:>4}x {:<25} => {}'.format(
stats[c][of], of, MONGO_FIELDS[c][of]
))
In [46]:
def mongo_dedup():
countries = {}
for c in mongo_db['country']:
countries[c['_id']] = c
c['_id'] = c['countrycode']
del c['countrycode']
for c in mongo_db['contrib']:
cn_id = c['country_id']
cn_dt = countries[cn_id]
c['country'] = [{'_id': cn_dt['_id'], 'value': cn_dt['name']}]
del c['country_id']
In [93]:
def mongo_singlify():
duplicates = dict()
fields = (
'year',
'contactPersonName',
'country',
'creator',
)
knownExceptions = dict(
contactPersonName={
frozenset(('Steffen Pielström', 'Steffen Pielsträ')),
}
)
for c in mongo_db['contrib']:
for f in fields:
if f in c:
if len(c[f]) > 1:
mvals = frozenset(v['value'] for v in c[f])
if not mvals in knownExceptions.get(f, set()):
duplicates.setdefault(f, set()).add(mvals)
c[f] = c[f][0]
f = 'typeContribution'
fo = 'typeContributionOther'
if fo not in c or c[fo]['value'] == None: c[fo] = []
else: c[fo] = [c[fo]]
if f not in c and len(c[fo]) >= 1:
c[f] = [c[fo][0]]
c[fo] = c[fo][1:]
if f not in c or len(c[f]) == 0:
c[f] = []
warning('Contribution with no typeContribution: {}'.format(c['_id']))
elif len(c[f]) > 1:
c[fo] = c[f][1:] + c[fo]
c[f] = [c[f][0]]
else:
c[f] = [c[f][0]]
for f in c:
if f == '_id': continue
if type(c[f]) is dict:
c[f] = [c[f]]
elif type(c[f]) != list:
c[f] = [dict(_id=1, value=c[f])]
for f in duplicates:
warning('Duplicates in field {}'.format(f))
for s in duplicates[f]:
print('\t{}'.format(set(s)))
In [101]:
# legacy users and test users
def mongo_users():
existingUsers = []
testUsers = [
dict(eppn='suzan', email='suzan1@test.eu', mayLogin=True, authority='local',
firstName='Suzan', lastName='Karelse'),
dict(eppn='marie', email='suzan2@test.eu', mayLogin=True, authority='local',
firstName='Marie', lastName='Pieterse'),
dict(eppn='gertjan', email='gertjan@test.eu', mayLogin=False, authority='local',
firstName='Gert Jan', lastName='Klein-Holgerink'),
dict(eppn='lisa', email='lisa@test.eu', mayLogin=True, authority='local',
firstName='Lisa', lastName='de Leeuw'),
dict(eppn='dirk', email='dirk@test.eu', mayLogin=True, authority='local',
firstName='Dirk', lastName='Roorda'),
]
users = collections.defaultdict(set)
eppnSet = set()
for c in mongo_db['contrib']:
crs = c.get('creator', []) + c.get('modifiedBy', [])
for cr in crs:
eppnSet.add(cr['value'])
eppnIndex = dict(((eppn, i+1) for (i, eppn) in enumerate(sorted(eppnSet))))
for c in mongo_db['contrib']:
c['creator'] = [dict(_id=eppnIndex[cr['value']]) for cr in c['creator']]
if 'modifiedBy' not in c:
c['modifiedBy'] = []
else:
c['modifiedBy'] = [dict(_id=eppnIndex[cr['value']]) for cr in c['modifiedBy']]
users = dict(((i, eppn) for (eppn, i) in eppnIndex.items()))
for (i, eppn) in sorted(users.items()):
existingUsers.append(dict(_id=i, eppn=eppn, mayLogin=False, authority='legacy'))
info('\tlegacy user {} added'.format(eppn))
info('{} legacy users added'.format(len(users)))
maxUid = max(int(u) for u in users)
for u in testUsers:
maxUid += 1
u['_id'] = maxUid
existingUsers.append(u)
inGroups = [
dict(eppn='DirkRoorda@dariah.eu', authority='DARIAH', group='sys'),
]
main_tables.extend(['users', 'groups'])
mongo_db['users'] = existingUsers
mongo_db['groups'] = inGroups
info('{} test users added'.format(len(testUsers)))
In [102]:
def import_mongo():
client = MongoClient()
client.drop_database('dariah')
db = client.dariah
for t in main_tables:
info(t)
db[t].insert_many(mongo_db[t])
To import the BSON dump into another MongoDB installation, dump the dariah database here from the command line:
mongodump -d dariah -o dariah
and restore it elsewhere with:
mongorestore --drop -d dariah dariah
In [103]:
# example of a warning message
warning('table `{}` field `{}`: strange value "{}"'.format('assess', 'date_approved', '2017-00-03'))
Here we go!
In [104]:
info('{:=^80}'.format('BEGIN PROCESSING'))
resetw()
info('{:=^80}'.format('CHECK CONFIG'))
check_config()
info('{:=^80}'.format('READ FM'))
(root, main_tables_raw) = read_fm()
info('{:=^80}'.format('MERGE pre CHECK'))
merge_fields = check_merge()
info('{:=^80}'.format('FIELD DEFINITIONS'))
(tfields, field_defs_raw) = getfielddefs()
info('{:=^80}'.format('FIELD VALUE NORMALIZATION'))
check_normalize()
info('{:=^80}'.format('MERGE post CHECK'))
check_merge_more()
info('{:=^80}'.format('SKIP FIELDS'))
fields_raw = do_skips()
info('{:=^80}'.format('READ DATA'))
rows_raw = getdata()
info('{:=^80}'.format('TRANSFORM ROWS'))
rows = transformrows()
info('{:=^80}'.format('PIVOT DATA'))
field_data_raw = pivot()
info('{:=^80}'.format('MOVE FIELDS'))
(main_tables, fields, field_defs, field_data) = move_fields()
info('{:=^80}'.format('REMODEL DATA'))
(maindata, reltables, relxtables, relvalues, relfieldindex) = transform_data()
info('{:=^80}'.format('FIX LINKED DATA'))
fix_them()
info('{:=^80}'.format('WRITE SQL'))
sql_export()
info('{:=^80}'.format('PREPARE MONGO DOCS'))
mongo_db = mongo_transform()
info('{:=^80}'.format('DEDUP MONGO DOCS'))
mongo_dedup()
info('{:=^80}'.format('RENAME MONGO FIELDS'))
mongo_rename()
info('{:=^80}'.format('SINGLIFY MONGO DOCS'))
mongo_singlify()
info('{:=^80}'.format('ADD LEGACY USERS TO MONGO'))
mongo_users()
info('{:=^80}'.format('END PROCESSING'))
finalw()
The Mongo documents have been created; now import them into MongoDB.
In [105]:
import_mongo()
In [53]:
def pprintf(tname, fname):
values_raw = field_data[tname][fname]
values = sorted(v for v in reduce(set.union, values_raw, set()) if v != 'NULL')
print('\n'.join('{}'.format(v) for v in values))
In [54]:
pprintf('contrib', 'country')
In [55]:
fdefs = getfielddefs()[0]
for tb in fdefs:
print('{}\t{}'.format(tb, ''))
for fld in fdefs[tb]:
print('{}\t{}'.format('', fld))
In [56]:
client = MongoClient()
dbm = client.dariah
for d in dbm.contrib.find({}).limit(2):
print('=' * 50)
for f in sorted(d):
print('{}={}'.format(f, d[f]))
Here is a query to get all typeContribution (formerly type_of_inkind) values for contributions.
In [57]:
for c in dbm.contrib.distinct('typeContribution', {}):
print(c)
Here are the users:
In [58]:
for c in dbm.users.find({}):
print(c)
Here are the countries:
In [59]:
for c in dbm.country.find({'inDARIAH': True}):
print(c)
In [60]:
for c in dbm.contrib.distinct('country', {}):
print(c)
Let us get related data: the type_of_inkind of all contributions. For each contribution we need only the ids of the related type_of_inkind values.
In [61]:
for d in dbm.contrib.find({}, {'type_of_inkind._id': True}).limit(10):
print(d)
In [62]:
for d in dbm.contrib.find({}, {'country._id': True}).limit(10):
print(d)
In [ ]: