Metadata


This notebook contains everything you need to turn a directory of netcdf files into a neat list of metadata dictionaries. In this case we make one metadata dictionary for each day in a five-year span. A dictionary is only created when data are available for the given day, and up to 8 datafiles are represented on each day. Each file contains data from various sensors, reported in a whole slew of variables. Each variable has attributes associated with it in the netcdf file. These attributes are carried over into the dict, and other attributes are added, such as a flag variable that can be raised for various problematic data situations (missing data, unreasonable values, ...).

Overview of Data Dict Structure

We want the output to be a document for each of the following: Variable, File, Metadata. The process_netcdf function extracts each file's dataset and summary statistics, the generate_* functions defined below build each of these from that output, and the resulting dicts are fed into the classes and saved to the database. A sketch of the nesting is shown below.
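As a rough sketch of that nesting (the values and variable names here are purely illustrative), each saved document looks like:

metadata_doc = {'year': 2015, 'month': 4, 'doy': 100, 'date': '2015-04-10',
                'files': [{'filename': 'raw_MpalaTower_2015_100.nc', 'datafile': 'Table1',
                           'frequency': 60.0, 'frequency_flag': 'found frequency',
                           'variables': [{'var': 'T_air', 'units': 'C', 'count': 1440,
                                          'flags': []}, ...]},
                          ...]}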

Setup


In [3]:
from __future__ import print_function
import pandas as pd
import datetime as dt
import numpy as np
import os
import xray
from posixpath import join

In [34]:
from flask.ext.mongoengine import MongoEngine

db = MongoEngine()

In [4]:
ROOTDIR = 'C:/Users/Julia/Documents/GitHub/MpalaTower/raw_netcdf_output/'
data = 'Table1'

datas = ['upper', 'Table1', 'lws', 'licor6262', 'WVIA',
         'Manifold', 'flux', 'ts_data', 'Table1Rain']
non_static_attrs = ['instrument', 'source', 'program', 'logger']
static_attrs = ['station_name', 'lat', 'lon', 'elevation',
                'Year', 'Month', 'DOM', 'Minute', 'Hour',
                'Day_of_Year', 'Second', 'uSecond', 'WeekDay']

# Setting expected ranges by units. It is OK to include multiple ways of
# writing the same unit; just put all the spellings in the list.
flag_by_units = {}

temp_min = 0
temp_max = 40
temp = ['Deg C', 'C']
for unit in temp:
    flag_by_units.update({unit : {'min' : temp_min, 'max' : temp_max}})
    
percent_min = 0
percent_max = 100
percent = ['percent', '%']
for unit in percent:
    flag_by_units.update({unit : {'min' : percent_min, 'max' : percent_max}})

# no expected range for these units, so they are not added to flag_by_units
shf_min = ''
shf_max = ''
shf = ['W/m^2']

shf_cal_min = ''
shf_cal_max = ''
shf_cal = ['W/(m^2 mV)']

batt_min = 11
batt_max = 240
batt = ['Volts', 'V']
for unit in batt:
    flag_by_units.update({unit : {'min' : batt_min, 'max' : batt_max}})

PA_min = 15
PA_max = 25
PA = ['uSec']
for unit in PA:
    flag_by_units.update({unit : {'min' : PA_min, 'max' : PA_max}})
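As a quick sanity check of the lookup table we just built:

In [ ]:
flag_by_units['C']
# -> {'min': 0, 'max': 40}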

In [66]:
def process_netcdf(input_dir, data, f, static_attrs):
    ds = xray.open_dataset(join(input_dir, data, f),
                           decode_cf=True, decode_times=True)
    df = ds.to_dataframe()

    # drop from df, columns that don't change with time
    exclude = [var for var in static_attrs if var in df.columns]
    df_var = df.drop(exclude, axis=1)  # dropping vars like lat, lon

    # get some descriptive statistics on each of the variables
    df_summ = df_var.describe()
    return ds, df_summ
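A quick smoke test, assuming the Table1 file for day 100 of 2015 (one of the files processed at the end of this notebook) is present under ROOTDIR:

In [ ]:
ds, df_summ = process_netcdf(ROOTDIR, 'Table1',
                             'raw_MpalaTower_2015_100.nc', static_attrs)
df_summ  # count, mean, std, min, 25%, 50%, 75%, max for each variable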

Variable


In [82]:
class Variable(db.EmbeddedDocument):
    timestep_count = db.IntField()
    flags = db.ListField(db.StringField())
    name = db.StringField(db_field='var')
    units = db.StringField()
    count = db.IntField()
    avg_val = db.FloatField(db_field='mean')  # Avoid function names
    std_val = db.FloatField(db_field='std')
    min_val = db.FloatField(db_field='min')
    max_val = db.FloatField(db_field='max')
    p25th = db.FloatField(db_field='25%')
    p75th = db.FloatField(db_field='75%')
    content_type = db.StringField(db_field='content_coverage_type')
    coordinates = db.StringField()
    comment = db.StringField()

In [109]:
def generate_flags(var, flag_by_units):
    # check status of data and raise flags
    flags = []

    # compare the number of records to the number of timesteps in the file
    if var.timestep_count*11/12 < var.count < var.timestep_count:
        flags.append('missing a little data')
    elif var.timestep_count/2 < var.count <= var.timestep_count*11/12:
        flags.append('missing some data')
    elif 0 < var.count <= var.timestep_count/2:
        flags.append('missing lots of data')
    elif var.count == 0:
        flags.append('no data')

    try:
        if var.name.startswith('del'):  # 'del' variables are exempt from range checks
            pass
        elif var.comment == 'Std':  # don't range-check standard deviations
            pass
        else:
            if var.max_val > flag_by_units[var.units]['max']:
                flags.append('contains high values')
            if var.min_val < flag_by_units[var.units]['min']:
                flags.append('contains low values')
    except (KeyError, TypeError):  # units without an expected range are not checked
        pass
    return flags
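For example, a hypothetical temperature variable (units 'C', so the expected range is 0 to 40) with records for only 1000 of 1440 timesteps and a minimum below freezing comes back with two flags:

In [ ]:
v = Variable(name='T_air', units='C', timestep_count=1440, count=1000,
             min_val=-2.0, max_val=35.0)
generate_flags(v, flag_by_units)
# -> ['missing some data', 'contains low values']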

In [110]:
def generate_variable(ds, df_summ, var, flag_by_units):
    if df_summ[var]['count'] != 0:
        this_variable = Variable(
            name=var,
            timestep_count=len(ds['time']),
            count=df_summ[var]['count'],
            avg_val=df_summ[var]['mean'],
            std_val=df_summ[var]['std'],
            min_val=df_summ[var]['min'],
            p25th=df_summ[var]['25%'],
            p75th=df_summ[var]['75%'],
            max_val=df_summ[var]['max'],
            units=ds[var].attrs['units'],
            comment=ds[var].attrs['comment'],
            coordinates=ds[var].attrs['coordinates'],
            content_type=ds[var].attrs['content_coverage_type'],
        )
    else:
        this_variable = Variable(
            name=var,
            timestep_count=len(ds['time']),
            count=df_summ[var]['count']
        )
    this_variable.flags = generate_flags(this_variable, flag_by_units)

    return this_variable
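Continuing the smoke test from above, we can build a Variable document for the first column of df_summ (this assumes each variable in the netcdf file carries the units, comment, coordinates, and content_coverage_type attributes that generate_variable expects):

In [ ]:
var = df_summ.columns[0]
v = generate_variable(ds, df_summ, var, flag_by_units)
print(v.name, v.count, v.flags)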

Datafile attribute dict


In [245]:
class File(db.EmbeddedDocument):

    source = db.StringField()
    instrument = db.StringField()
    datafile = db.StringField()
    filename = db.StringField()
    frequency = db.FloatField()
    frequency_flag = db.StringField()

    # The File object contains a list of Variables:
    variables = db.EmbeddedDocumentListField(Variable)

In [189]:
def convert_to_sec(num, units):
    if units.startswith(('Min', 'min')):
        out = int(num)*60
    elif units.startswith(('ms', 'mS')):
        out = float(num)/1000
    elif units.startswith(('s', 'S')):
        out = int(num)
    else:
        print("couldn't parse units: %s" % units)
        out = float('nan')
    return out
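A few example conversions:

In [ ]:
print(convert_to_sec(1, 'Min'))    # 60
print(convert_to_sec(100, 'mSec')) # 0.1
print(convert_to_sec(30, 'Sec'))   # 30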

In [239]:
def programmed_frequency(f, this_file):
    data = f['data']
    program = this_file.source.split('CPU:')[1].split(',')[0]
    try:
        prog = open(join(f['dir'], 'programs', program))
    except IOError:
        freq_flag = 'program: %s not found' % program
        return freq_flag, float('nan')
    lines = prog.readlines()
    prog.close()
    k = 0
    interval = None
    units = None
    DT = 'DataTable'
    DI = 'DataInterval'
    CT = 'CallTable'
    # first look for a DataInterval declared within two lines of our DataTable
    for i in range(len(lines)):
        if lines[i].split()[0:2] == [DT, data]:
            k = i
        if lines[i].split()[0:1] == [DI] and i <= (k + 2):
            interval = lines[i].split(',')[1].strip()
            units = lines[i].split(',')[2].strip()
    # otherwise fall back to the Scan interval, if our table is called
    # within a few lines of its declaration
    if interval is None:
        scan_interval = None
        scan_units = None
        for i in range(len(lines)):
            if lines[i].split()[0:1] == ['Scan']:
                scan_interval = lines[i].split('(')[1].split(',')[0].strip()
                scan_units = lines[i].split(',')[1].strip()
            if lines[i].split()[0:2] == [CT, data] and i <= (k + 7):
                interval = scan_interval
                units = scan_units
    if interval is None:
        freq_flag = 'could not find interval in %s' % program
        return freq_flag, float('nan')
    try:
        num = int(interval)
    except ValueError:
        # the interval is a named constant; look up its value
        num = None
        for line in lines:
            if line.startswith('Const ' + interval):
                num = int(line.split('=')[1].split()[0])
        if num is None:
            freq_flag = 'could not resolve interval %s in %s' % (interval, program)
            return freq_flag, float('nan')
    freq = convert_to_sec(num, units)
    freq_flag = 'found frequency'

    return freq_flag, freq
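For reference, here is a hypothetical fragment of a logger program laid out the way this parser expects it; only the DataTable, DataInterval, Scan, and CallTable lines matter to the code above. With this fragment, programmed_frequency finds an interval of 1 Min, i.e. a frequency of 60 seconds.

DataTable Table1
  DataInterval (0,1,Min,10)
  ...
EndTable

Scan (1,Sec,0,0)
  ...
  CallTable Table1
NextScan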

In [240]:
def generate_file(f, ds, df_summ, flag_by_units):
    this_file = File(
        source=ds.attrs['source'],
        instrument=ds.attrs['instrument'],
        datafile=f['data'],
        filename=f['filename']
    )
    freq_flag, freq = programmed_frequency(f, this_file)
    this_file.frequency = float(freq)
    this_file.frequency_flag = freq_flag
    variables = []
    for var in df_summ:
        variables.append(generate_variable(ds, df_summ, var, flag_by_units))
    this_file.variables = variables

    return this_file
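Again assuming the 2015 day-100 Table1 file, we can build a full File document. If the logger program named in the file's source attribute is not under f['dir']/programs, frequency will be nan and frequency_flag will say so:

In [ ]:
f = {'dir': ROOTDIR, 'data': 'Table1', 'filename': 'raw_MpalaTower_2015_100.nc'}
ds, df_summ = process_netcdf(f['dir'], f['data'], f['filename'], static_attrs)
this_file = generate_file(f, ds, df_summ, flag_by_units)
print(this_file.frequency, this_file.frequency_flag)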

Metadata


In [216]:
class Metadata(db.Document):

    license = db.StringField()
    title = db.StringField()
    creator = db.StringField(db_field='creator_name', default='Kelly Caylor')
    creator_email = db.EmailField()
    institution = db.StringField()
    acknowledgements = db.StringField()
    feature_type = db.StringField(db_field='featureType')
    year = db.IntField(required=True)
    month = db.IntField(required=True)
    doy = db.IntField(required=True)
    date = db.DateTimeField(required=True)
    summary = db.StringField()
    conventions = db.StringField()
    naming_authority = db.StringField()  # or URLField?

    # The Metadata object contains a list of Files:
    files = db.EmbeddedDocumentListField(File)

    meta = {
        'collection': 'metadata',
        'ordering': ['-date'],
        'index_background': True,
        'indexes': [
            'year',
            'month',
            'doy',
        ]
    }

In [135]:
def generate_metadata(data_dict, ds):
    this_metadata = Metadata(
        year=data_dict['year'],
        month=data_dict['month'],
        doy=data_dict['doy'],
        date=data_dict['date'],
        license=ds.attrs['license'],
        title=ds.attrs['title'],
        creator=ds.attrs['creator_name'],
        creator_email=ds.attrs['creator_email'],
        institution=ds.attrs['institution'],
        acknowledgements=ds.attrs['acknowledgement'],
        feature_type=ds.attrs['featureType'],
        summary=ds.attrs['summary'],
        conventions=ds.attrs['Conventions'],
        naming_authority=ds.attrs['naming_authority'],
    )
    return this_metadata

Process data into list of daily data dicts

data_dict = {'year': 2010, 'month': 1, 'doy': 1, 'date': '2010-01-01',
             'files': [{'dir': ROOTDIR, 'data': data, 'filename': f},
                       {'dir': ROOTDIR, 'data': data, 'filename': f},
                       {'dir': ROOTDIR, 'data': data, 'filename': f}]}

In [136]:
def find_dates(input_dir, datas):
    data_list = []
    start = '2010-01-01'
    end = dt.datetime.utcnow()
    rng = pd.date_range(start, end, freq='D')
    for date in rng:
        y = date.year
        m = date.month
        d = date.dayofyear
        f = 'raw_MpalaTower_%i_%03d.nc' % (y, d)
        # only create a dict for days that have at least one datafile
        if any(f in os.listdir(join(input_dir, data)) for data in datas):
            data_list.append({'year': y, 'month': m, 'doy': d,
                              'date': date, 'files': []})
    return data_list

In [ ]:
def find_files(data_dict, input_dir, datas):
    f = 'raw_MpalaTower_%i_%03d.nc' % (data_dict['year'], data_dict['doy'])
    for data in datas:
        if f in os.listdir(join(input_dir, data)):
            data_dict['files'].append({'dir': input_dir, 'data': data,
                                       'filename': f})
    return data_dict
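Tying the two together, the full list of daily data dicts is built like this (the final cell below does exactly this before generating and saving the documents):

In [ ]:
data_list = find_dates(ROOTDIR, datas)
data_list = [find_files(d, ROOTDIR, datas) for d in data_list]
print(len(data_list), data_list[-1]['files'])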

Send to internet


In [243]:
from flask.ext.mongoengine import MongoEngine

db = MongoEngine()
db.connect(host='mongodb://joey:joejoe@dogen.mongohq.com:10097/mpala_tower_metadata')


Out[243]:
MongoClient('dogen.mongohq.com', 10097)

In [ ]:


In [244]:
data_list = find_dates(ROOTDIR, datas)
for data_dict in data_list:
    data_dict = find_files(data_dict, ROOTDIR, datas)
    ds, df_summ = process_netcdf(data_dict['files'][0]['dir'],
                                 data_dict['files'][0]['data'],
                                 data_dict['files'][0]['filename'],
                                 static_attrs)
    this_metadata = generate_metadata(data_dict, ds)
    for f in data_dict['files']:
        print(f['filename'], f['data'])
        ds, df_summ = process_netcdf(f['dir'], f['data'], f['filename'], static_attrs)
        this_file = generate_file(f, ds, df_summ, flag_by_units)
        this_metadata.files.append(this_file)
    this_metadata.save()


raw_MpalaTower_2015_097.nc upper
raw_MpalaTower_2015_098.nc upper
raw_MpalaTower_2015_098.nc Table1
raw_MpalaTower_2015_098.nc flux
raw_MpalaTower_2015_099.nc upper
raw_MpalaTower_2015_099.nc Table1
raw_MpalaTower_2015_099.nc lws
raw_MpalaTower_2015_099.nc Manifold
raw_MpalaTower_2015_099.nc flux
raw_MpalaTower_2015_100.nc upper
raw_MpalaTower_2015_100.nc Table1
raw_MpalaTower_2015_100.nc lws
raw_MpalaTower_2015_100.nc WVIA
raw_MpalaTower_2015_100.nc Manifold
raw_MpalaTower_2015_100.nc flux
---------------------------------------------------------------------------
AutoReconnect                             Traceback (most recent call last)
<ipython-input-244-c19f146926cb> in <module>()
     12         this_file = generate_file(f, ds, df_summ, flag_by_units)
     13         this_metadata.files.append(this_file)
---> 14     this_metadata.save()

C:\Anaconda\lib\site-packages\mongoengine\document.pyc in save(self, force_insert, validate, clean, write_concern, cascade, cascade_kwargs, _refs, save_condition, **kwargs)
    305                     object_id = collection.insert(doc, **write_concern)
    306                 else:
--> 307                     object_id = collection.save(doc, **write_concern)
    308             else:
    309                 object_id = doc['_id']

C:\Anaconda\lib\site-packages\pymongo\collection.pyc in save(self, to_save, manipulate, safe, check_keys, **kwargs)
    283 
    284         if "_id" not in to_save:
--> 285             return self.insert(to_save, manipulate, safe, check_keys, **kwargs)
    286         else:
    287             self.update({"_id": to_save["_id"]}, to_save, True,

C:\Anaconda\lib\site-packages\pymongo\collection.pyc in insert(self, doc_or_docs, manipulate, safe, check_keys, continue_on_error, **kwargs)
    413             message._do_batched_insert(self.__full_name, gen(), check_keys,
    414                                        safe, options, continue_on_error,
--> 415                                        self.uuid_subtype, client)
    416 
    417         if return_one:

C:\Anaconda\lib\site-packages\pymongo\mongo_client.pyc in _send_message(self, message, with_last_error, command)
   1136             except (ConnectionFailure, socket.error), e:
   1137                 self.disconnect()
-> 1138                 raise AutoReconnect(str(e))
   1139             except:
   1140                 sock_info.close()

AutoReconnect: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond

In [ ]: