DKRZ data ingest workflow information update

(Disclaimer: This demo notebook is for data managers only!)

Updating information related to the data ingest workflow (e.g. adding quality assurance information or data publication information) should be done in a well-structured way, based on well-defined steps.

Each step updates a consistent set of information related to a specific workflow action (e.g. data publication).

To support these activities, the submission_forms package provides a collection of components.

A consistent update step normally consists of:

  • an update on who did what and when (e.g. data manager A quality-checked data B at time C ..)
  • additional information on the activity (e.g. adding the quality assurance record)
  • an update of the status of the individual workflow step (open, paused, action-required, closed etc.)

The following generic status states are defined:

  • ACTIVITY_STATUS = "0:open, 1:in-progress ,2:action-required, 3:paused,4:closed"
  • ERROR_STATUS = "0:open,1:ok,2:error"
  • ENTITY_STATUS = "0:open,1:stored,2:submitted,3:re-opened,4:closed"
  • CHECK_STATUS = "0:open,1:warning, 2:error,3:ok"
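The status definitions above are plain strings of comma-separated `code:label` pairs with irregular spacing. A minimal sketch, assuming exactly this string format, of turning such a definition into a lookup table; the helper name `parse_status` is hypothetical and not part of dkrz_forms:

```python
def parse_status(definition: str) -> dict[int, str]:
    """Turn a string like "0:open, 1:ok,2:error" into {0: 'open', 1: 'ok', 2: 'error'}."""
    table = {}
    for item in definition.split(","):
        # strip() removes the irregular spaces around each "code:label" pair
        code, label = item.strip().split(":", 1)
        table[int(code)] = label.strip()
    return table


ACTIVITY_STATUS = "0:open, 1:in-progress ,2:action-required, 3:paused,4:closed"
CHECK_STATUS = "0:open,1:warning, 2:error,3:ok"

activity_codes = parse_status(ACTIVITY_STATUS)
check_codes = parse_status(CHECK_STATUS)
print(activity_codes[4])  # closed
print(check_codes)
```

Such a table makes it easy to validate a status value like "1:in-progress" before writing it into a form.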

In [ ]:
# import necessary packages
from dkrz_forms import form_handler, utils, wflow_handler, checks
from datetime import datetime
from pprint import pprint

Demo examples: step by step

The following examples can be adapted to the data managers' needs, e.g. by creating targeted Jupyter notebooks or Python scripts. Data managers have two separate application scenarios for data ingest information management:

  • Alternative A)

    • check out the git repo https://gitlab.dkrz.de/DKRZ-CMIP-Pool/data_forms_repo
    • this repo contains all completed submission forms
    • all data manager related changes are also committed there
    • subdirectories in this repo relate to the individual projects (e.g. CMIP6, CORDEX, ESGF_replication, ..)
    • each entry there contains the last name of the data submission originator
  • Alternative B) (not yet documented, only prototype)

    • use the search interface and API of the search index on all submission forms
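For Alternative A, the matching form file has to be found in the checked-out repository. A minimal sketch, assuming one JSON file per submission form directly inside each project subdirectory (e.g. `CORDEX/`) and file names that contain the originator's last name; this layout, the `.json` extension and the helper `find_forms` are assumptions, not a documented interface:

```python
from pathlib import Path


def find_forms(repo_dir: str, project: str, last_name: str = "") -> list[Path]:
    """Return form files of one project whose file name contains last_name.

    Assumes forms are stored as <repo_dir>/<project>/*.json (hypothetical layout).
    """
    project_dir = Path(repo_dir) / project
    if not project_dir.is_dir():
        return []
    return sorted(
        path
        for path in project_dir.glob("*.json")
        if last_name.lower() in path.name.lower()  # case-insensitive name match
    )


# usage, e.g. after: git clone https://gitlab.dkrz.de/DKRZ-CMIP-Pool/data_forms_repo
for form_file in find_forms("data_forms_repo", "CORDEX", "kindermann"):
    print(form_file)
```

A path found this way can then be used as `info_file` in the next cell.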

Step 1: load the workflow form and show its workflow steps


In [ ]:
# load workflow form object
info_file = "path_to_file.json"
my_form = utils.load_workflow_form(info_file)
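The loaded form gives attribute access to its workflow steps and their parts (e.g. `my_form.rev.entity_out.report` further down). The stand-alone sketch below only illustrates that idea, assuming the form is stored as nested JSON; it uses `types.SimpleNamespace` and is not how dkrz_forms implements `load_workflow_form`. The JSON content is a made-up, heavily shortened example:

```python
import json
from types import SimpleNamespace

# made-up, heavily shortened form content (real forms contain many more keys)
form_json = """
{
  "workflow": [["data_submission", "sub"], ["data_submission_review", "rev"]],
  "sub": {"activity": {"status": "4:closed", "ticket_id": "25389"}},
  "rev": {"activity": {"status": "0:open"}, "entity_out": {"report": {}}}
}
"""

# object_hook turns every JSON object into a SimpleNamespace,
# so nested keys become attributes: form.rev.activity.status
form = json.loads(form_json, object_hook=lambda d: SimpleNamespace(**d))

print(form.sub.activity.status)  # 4:closed
print(form.rev.entity_out.report)
```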

In [ ]:
# show the workflow steps for this form (long-name, short-name) 
# to select a specific action, you can use the long name, e.g. 'data_ingest' or the related short name e.g. 'ing'
wflow_dict = wflow_handler.get_wflow_description(my_form)
pprint(wflow_dict)
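Since an action can be selected by its long or its short name, a script may want to normalise user input first. A minimal sketch with an illustrative list of (long-name, short-name) pairs: the short names and the long names 'data_ingest' and 'data_submission_review' appear in this notebook, the other long names are assumptions; the authoritative list is what `get_wflow_description` prints. `resolve_step` is a hypothetical helper:

```python
# illustrative (long-name, short-name) pairs; the authoritative list is what
# wflow_handler.get_wflow_description(my_form) prints for a given form
WORKFLOW = [
    ("data_submission", "sub"),
    ("data_submission_review", "rev"),
    ("data_ingest", "ing"),
    ("data_quality_assurance", "qua"),
    ("data_publication", "pub"),
]


def resolve_step(name: str) -> str:
    """Return the short name for either a long or a short workflow step name."""
    for long_name, short_name in WORKFLOW:
        if name in (long_name, short_name):
            return short_name
    raise ValueError(f"unknown workflow step: {name!r}")


print(resolve_step("data_ingest"))  # ing
print(resolve_step("qua"))          # qua
```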

Step 2: indicate who is working on which workflow step


In [ ]:
# 'start_action' updates the form with information on who is currently working on the form 
# internal information on this (timestamp, status information) is automatically set ..
# the resulting 'working version' of the form is commited to the work repository

wflow_handler.start_action('data_submission_review',my_form,"stephan kindermann")
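Conceptually, `start_action` records who works on the chosen step, when the work started, and that the activity is now in progress. A plain-Python sketch of that bookkeeping, assuming the attribute names `agent.responsible_person`, `activity.start_time` and `activity.status` used in the appendix and the "1:in-progress" code from ACTIVITY_STATUS; `sketch_start_action` is hypothetical, while the real function also commits the form to the work repository and may set further fields:

```python
from datetime import datetime
from types import SimpleNamespace


def sketch_start_action(step: SimpleNamespace, person: str) -> None:
    """Hypothetical stand-in for wflow_handler.start_action (no git commit)."""
    step.agent.responsible_person = person
    step.activity.start_time = str(datetime.now())
    step.activity.status = "1:in-progress"


review = SimpleNamespace(agent=SimpleNamespace(), activity=SimpleNamespace())
sketch_start_action(review, "stephan kindermann")
print(review.activity.status, review.agent.responsible_person)
```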

Step 3: indicate the update and closure of a specific workflow step


In [ ]:
review_report = {}
review_report['comment'] = 'needed to change and correct submission form'
review_report['additional_info'] = "mail exchange with a@b with respect to question ..."

my_form = wflow_handler.finish_action('data_submission_review',my_form,"stephan kindermann",review_report)
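Correspondingly, `finish_action` closes the step and attaches the review report. A hedged sketch, assuming the report ends up in `entity_out.report` (where the next cell looks for it) and that the step is marked "4:closed" from ACTIVITY_STATUS; `sketch_finish_action` is hypothetical and skips the repository commit:

```python
from datetime import datetime
from types import SimpleNamespace


def sketch_finish_action(step: SimpleNamespace, person: str, report: dict) -> None:
    """Hypothetical stand-in for wflow_handler.finish_action (no git commit)."""
    step.agent.responsible_person = person
    step.activity.end_time = str(datetime.now())
    step.activity.status = "4:closed"
    step.entity_out.report = dict(report)  # store a copy of the report


review = SimpleNamespace(agent=SimpleNamespace(), activity=SimpleNamespace(),
                         entity_out=SimpleNamespace())
review_report = {"comment": "needed to change and correct submission form"}
sketch_finish_action(review, "stephan kindermann", review_report)
print(review.entity_out.report["comment"])
```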

Interactive help: use ?form.part and tab completion to explore the form, e.g.:


In [ ]:
my_form.rev.entity_out.report

Display status of report


In [ ]:
report = checks.check_report(my_form,"sub")
checks.display_report(report)

In [ ]:
my_form.rev.entity_in.check_status

Display status of form


In [ ]:
my_form.sub.activity.ticket_url

In [ ]:
part = checks.check_step_form(my_form,"sub")
checks.display_check(part,"sub")

In [ ]:
# global check of the complete form
res = checks.check_generic_form(my_form)
checks.display_checks(my_form,res)
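A global check essentially runs one check per workflow step and collects the results. The sketch below is not the logic of the `checks` module; it assumes a simple rule (a step is "3:ok" when all required fields are filled, "2:error" otherwise) and a hypothetical list of required fields, only to illustrate how per-step CHECK_STATUS values can be derived. The form content is made up:

```python
from types import SimpleNamespace

# hypothetical required fields per part of a workflow step
REQUIRED = {"activity": ["status", "start_time"], "agent": ["responsible_person"]}


def check_step(step: SimpleNamespace) -> str:
    """Return a CHECK_STATUS string ("3:ok" or "2:error") for one workflow step."""
    for part_name, fields in REQUIRED.items():
        part = getattr(step, part_name, None)
        for field in fields:
            if not getattr(part, field, ""):  # missing or empty field
                return "2:error"
    return "3:ok"


def check_form(form: SimpleNamespace, steps: list[str]) -> dict[str, str]:
    """Check every listed step (short names) of the form."""
    return {name: check_step(getattr(form, name)) for name in steps}


form = SimpleNamespace(
    sub=SimpleNamespace(
        activity=SimpleNamespace(status="4:closed", start_time="2017-01-01"),
        agent=SimpleNamespace(responsible_person="sk"),
    ),
    rev=SimpleNamespace(activity=SimpleNamespace(status="0:open"),
                        agent=SimpleNamespace()),
)
print(check_form(form, ["sub", "rev"]))  # {'sub': '3:ok', 'rev': '2:error'}
```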

In [ ]:
print(my_form.sub.entity_out.status)
print(my_form.rev.entity_in.form_json)
print(my_form.sub.activity.ticket_id)

In [ ]:
pprint(my_form.workflow)

Appendix

Sometimes it is necessary to modify specific information directly instead of relying on the generic steps described above. Here are some examples.

Attention: this section still has to be refined, and the status information flags have to be revised and adapted to the actual needs.


In [ ]:
workflow_form = utils.load_workflow_form(info_file)
   
review = workflow_form.rev

# any additional information keys can be added,
# yet they are invisible to generic information management tools ..
workflow_form.status = "review"

review.activity.status = "1:in-review"
review.activity.start_time = str(datetime.now())
review.activity.review_comment = "data volume check to be done"
review.agent.responsible_person = "sk"

sf = form_handler.save_form(workflow_form, "sk: review started")

review.activity.status = "3:accepted"
review.activity.ticket_id = "25389"
review.activity.end_time = str(datetime.now())

review.entity_out.comment = "This submission is related to submission abc_cde"
review.entity_out.tag = "sub:abc_cde"  # tags are used to relate different forms to each other
review.entity_out.report = {'x':'y'}   # result of validation in a dict (self defined properties)

# ToDo: test and document save_form for data managers (config setting for repo)   
sf = form_handler.save_form(workflow_form, "kindermann: form_review()")
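The cell above sets status, timestamps and the responsible person by hand, and the ingest, quality assurance and publication cells repeat that pattern. A small helper can keep such manual updates uniform. It is a hypothetical convenience function written for this sketch, works on any object with `agent` and `activity` attributes, and does not save the form (call `form_handler.save_form` afterwards as above):

```python
from datetime import datetime
from types import SimpleNamespace
from typing import Optional


def mark_activity(step, status: str, person: Optional[str] = None,
                  finished: bool = False, **extra) -> None:
    """Set status, a timestamp and optional extra activity fields of a workflow step."""
    step.activity.status = status
    if person is not None:
        step.agent.responsible_person = person
    stamp = str(datetime.now())
    if finished:
        step.activity.end_time = stamp
    else:
        step.activity.start_time = stamp
    for key, value in extra.items():  # free additional keys, e.g. review_comment
        setattr(step.activity, key, value)


review = SimpleNamespace(agent=SimpleNamespace(), activity=SimpleNamespace())
mark_activity(review, "1:in-review", person="sk",
              review_comment="data volume check to be done")
mark_activity(review, "3:accepted", finished=True, ticket_id="25389")
print(vars(review.activity))
```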

Comment: alternatively, tools can assign workflow-step-related information directly via dictionaries. This is only recommended for data managers who make sure the structure is consistent with the preconfigured one given in config/project_config.py.

  • example: validation.activity.__dict__ = data_manager_generated_dict
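Before assigning a whole dictionary, it helps to check it against the expected structure. A minimal sketch, assuming the allowed keys are available as a template dict (the template below is made up; the real structure is defined in config/project_config.py); `assign_checked` is a hypothetical helper that rejects unknown keys:

```python
from types import SimpleNamespace


def assign_checked(part, new_values: dict, template: dict) -> None:
    """Update part's attributes from new_values, rejecting keys not in template."""
    unknown = set(new_values) - set(template)
    if unknown:
        raise ValueError(f"keys not in configured structure: {sorted(unknown)}")
    vars(part).update(new_values)  # update instead of replacing __dict__


# made-up template for an activity part
activity_template = {"status": "", "start_time": "", "end_time": "", "comment": ""}

activity = SimpleNamespace(status="0:open", start_time="")
assign_checked(activity, {"status": "1:in-progress", "comment": "started"},
               activity_template)
print(activity.status)  # 1:in-progress
```

Updating via `vars()` instead of replacing `__dict__` keeps existing fields that the new dictionary does not mention.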

In [ ]:
workflow_form = utils.load_workflow_form(info_file)
   
ingest = workflow_form.ing

In [ ]:
?ingest.entity_out

In [ ]:
# workflow status and agent related info
workflow_form.status = "ingest"
ingest.agent.responsible_person = "hdh"

# activity related info
ingest.activity.status = "started"
ingest.activity.start_time = str(datetime.now())
ingest.activity.comment = "data pull: credentials needed for remote site"

sf = form_handler.save_form(workflow_form, "hdh: ingest started")

In [ ]:
ingest.activity.status = "completed"
ingest.activity.end_time = str(datetime.now())

# report of the ingest process (entity_out of ingest workflow step)
ingest_report = ingest.entity_out
ingest_report.tag = "a:b:c"  # tag structure to be defined
ingest_report.status = "completed"
# free entries for detailed report information
ingest_report.report.remote_server = "gridftp.awi.de://export/data/CMIP6/test"
ingest_report.report.server_credentials = "in server_cred.krb keypass"
ingest_report.report.target_path = ".."
sf = form_handler.save_form(workflow_form, "hdh: ingest completed")

In [ ]:
# type 'ingest_report.report.' and press <TAB> to list the available entries
ingest_report.report

workflow step: data quality assurance


In [ ]:
from datetime import datetime
workflow_form = utils.load_workflow_form(info_file)
   
qua = workflow_form.qua

In [ ]:
workflow_form.status = "quality assurance"
qua.agent.responsible_person = "hdh"

qua.activity.status = "starting" 
qua.activity.start_time = str(datetime.now())

sf = form_handler.save_form(workflow_form, "hdh: qa start")

In [ ]:
qua.entity_out.status = "completed"
qua.entity_out.report = {
    "QA_conclusion": "PASS",
    "project": "CORDEX",
    "institute": "CLMcom",
    "model": "CLMcom-CCLM4-8-17-CLM3-5",
    "domain": "AUS-44",
    "driving_experiment":  [ "ICHEC-EC-EARTH"],
    "experiment": [ "history", "rcp45", "rcp85"],
    "ensemble_member": [ "r12i1p1" ],
    "frequency": [ "day", "mon", "sem" ],
    "annotation":
    [
        {
            "scope": ["mon", "sem"],
            "variable": [ "tasmax", "tasmin", "sfcWindmax" ],
            "caption": "attribute <variable>:cell_methods for climatologies requires <time>:climatology instead of time_bnds",
            "comment": "due to the format of the data, climatology is equivalent to time_bnds",
            "severity": "note"
        }
    ]
}
sf = form_handler.save_form(workflow_form, "hdh: qua complete")
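The QA report above is a plain dictionary, so it can be evaluated directly, e.g. with `summarize_qa(qua.entity_out.report)`. A minimal sketch, assuming the key names of the example report (`QA_conclusion`, `annotation`, `severity`, `variable`, `caption`); it lists the conclusion and all annotations whose severity is not merely a note. The function name and the second annotation are made up:

```python
def summarize_qa(report: dict, ignore: tuple[str, ...] = ("note",)) -> list[str]:
    """Return one summary line per annotation whose severity is not ignored."""
    lines = [f"QA conclusion: {report.get('QA_conclusion', 'unknown')}"]
    for note in report.get("annotation", []):
        if note.get("severity") in ignore:
            continue
        variables = ", ".join(note.get("variable", []))
        lines.append(f"[{note.get('severity')}] {variables}: {note.get('caption')}")
    return lines


qa_report = {
    "QA_conclusion": "PASS",
    "annotation": [
        {"variable": ["tasmax", "tasmin"], "caption": "example caption",
         "severity": "note"},
        {"variable": ["pr"], "caption": "made-up example of a finding",
         "severity": "error"},
    ],
}
for line in summarize_qa(qa_report):
    print(line)
```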

workflow step: data publication


In [ ]:
workflow_form = utils.load_workflow_form(info_file)

workflow_form.status = "publishing"

pub = workflow_form.pub
pub.agent.responsible_person = "katharina"
pub.activity.status = "starting"
pub.activity.start_time = str(datetime.now())

sf = form_handler.save_form(workflow_form, "kb: publishing")

In [ ]:
pub.activity.status = "completed"
pub.activity.comment = "..."
pub.activity.end_time = str(datetime.now())
pub.activity.report = {'model':"MPI-M"}   # activity related report information

pub.entity_out.report = {'model':"MPI-M"} # the report of the publication action - all info characterizing the publication
sf = form_handler.save_form(workflow_form, "kb: published")

In [ ]:
sf = form_handler.save_form(workflow_form, "kindermann: form demo run 1")

In [ ]:
sf.sub.activity.commit_hash