This tutorial will take you through the steps of running simple workflows using Pegasus Workflow Management System. Pegasus allows scientists to:
Automate their scientific computational work as portable workflows. Pegasus enables scientists to construct workflows in abstract terms without worrying about the details of the underlying execution environment or the particulars of the low-level specifications required by the middleware (Condor, Globus, or Amazon EC2). It automatically locates the input data and computational resources required for workflow execution. It cleans up storage as the workflow is executed, so that data-intensive workflows have enough space to execute on storage-constrained resources.
Recover from failures at runtime. When errors occur, Pegasus tries to recover when possible by retrying tasks, and when all else fails, provides a rescue workflow containing a description of only the work that remains to be done. It also enables users to move computations from one resource to another. Pegasus keeps track of what has been done (provenance) including the locations of data used and produced, and which software was used with which parameters.
Debug failures in their computations using a set of system-provided debugging tools and an online workflow monitoring dashboard.
This tutorial is intended for new users who want to get a quick overview of Pegasus concepts and usage via Jupyter.
For more information about Pegasus, please visit the Pegasus website: http://pegasus.isi.edu
Scientific workflows allow users to easily express multi-step computational tasks, for example: retrieve data from an instrument or a database, reformat the data, and run an analysis. A scientific workflow describes the dependencies between the tasks, and in most cases the workflow is described as a directed acyclic graph (DAG), where the nodes are tasks and the edges denote the task dependencies. A defining property of a scientific workflow is that it manages data flow. The tasks in a scientific workflow can range from short serial tasks to very large parallel tasks (MPI, for example) surrounded by a large number of small, serial tasks used for pre- and post-processing.
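To make the DAG idea concrete, here is a minimal Python sketch (independent of Pegasus) that models the example above as a dependency graph and derives a valid execution order. The task names are hypothetical; real workflows are declared through the Pegasus API shown later in this tutorial.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# A toy workflow DAG: each task maps to the set of tasks it depends on.
deps = {
    "retrieve": set(),          # no dependencies
    "reformat": {"retrieve"},   # runs after the data is retrieved
    "analyze":  {"reformat"},   # runs after the data is reformatted
}

# A topological order respects every edge, so tasks only run once
# all of their inputs are available.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['retrieve', 'reformat', 'analyze']
```

A workflow engine such as Pegasus does essentially this at a much larger scale, while also handling data movement, retries, and resource selection.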
In [ ]:
from Pegasus.jupyter.instance import *
By default, the API automatically creates a folder in the user's $HOME
directory based on the workflow name. For the sake of completeness, in this tutorial we will pre-define a path where the workflow files should be created:
In [ ]:
workflow_dir = '/home/tutorial/wf-split-tutorial'
Pegasus reads workflow descriptions from DAX files. The term "DAX" is short for "Directed Acyclic Graph in XML". DAX is an XML file format that has syntax for expressing jobs, arguments, files, and dependencies. We will now create a split workflow using the Pegasus-provided DAX API.
The code for creating the DAX object has 3 main sections:
In [ ]:
# Create an abstract dag
dax = ADAG('split')
In [ ]:
webpage = File('pegasus.html')
dax.addFile(webpage)
# the split job that splits the webpage into smaller chunks
split = Job('split')
split.addArguments('-l', '100', '-a', '1', webpage, 'part.')
split.uses(webpage, link=Link.INPUT)
dax.addJob(split)
# we do a parameter sweep on the first 4 chunks created
for c in "abcd":
    part = File("part.%s" % c)
    split.uses(part, link=Link.OUTPUT, transfer=False, register=False)

    count = File("count.txt.%s" % c)
    wc = Job("wc")
    wc.addProfile(Profile("pegasus", "label", "p1"))
    wc.addArguments("-l", part)
    wc.setStdout(count)
    wc.uses(part, link=Link.INPUT)
    wc.uses(count, link=Link.OUTPUT, transfer=True, register=True)
    dax.addJob(wc)

    # add the dependency between the split job and each wc job
    dax.depends(wc, split)
A complete specification of the DAX API is available in the Pegasus documentation.
The Replica Catalog (RC) tells Pegasus where to find each of the input files for the workflow.
All files in a Pegasus workflow are referred to in the DAX using their Logical File Name (LFN). These LFNs are mapped to Physical File Names (PFNs) when Pegasus plans the workflow. This level of indirection enables Pegasus to map abstract DAXes to different execution sites and plan out the required file transfers automatically.
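The LFN-to-PFN indirection can be sketched in plain Python as a lookup table. This is only an illustration of the concept; the `replicas` dictionary and `pfns_for` helper below are hypothetical, and the actual mapping is done by Pegasus via the Replica Catalog created in the next cell.

```python
# Hypothetical replica-catalog lookup: each LFN maps to one or more
# (PFN, site) pairs describing where a physical copy lives.
replicas = {
    "pegasus.html": [("file:///home/tutorial/jupyter/pegasus.html", "local")],
}

def pfns_for(lfn, site=None):
    """Return the PFNs registered for a logical file name,
    optionally restricted to a single site."""
    entries = replicas.get(lfn, [])
    return [pfn for pfn, s in entries if site is None or s == site]

print(pfns_for("pegasus.html", site="local"))
# ['file:///home/tutorial/jupyter/pegasus.html']
```

Because jobs refer only to LFNs, the same abstract workflow can be planned against different sites simply by resolving the LFNs to different physical locations.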
The replica catalog for our example workflow contains only one entry, for the workflow's only input file. This entry has an LFN of pegasus.html with a PFN of file:///home/tutorial/jupyter/pegasus.html, and the file is stored on the local site, which implies that it will need to be transferred to the condorpool site when the workflow runs.
In [ ]:
rc = ReplicaCatalog(workflow_dir)
rc.add('pegasus.html', 'file:///home/tutorial/jupyter/pegasus.html', site='local')
The Transformation Catalog (TC) describes all of the executables (called "transformations") used by the workflow. This description includes the site(s) where they are located, the architecture and operating system they are compiled for, and any other information required to properly transfer them to the execution site and run them.
The TC should contain information about two transformations: split and wc. These transformations are referenced in the split DAX. The transformation catalog indicates that both transformations are installed on the condorpool site, and are compiled for x86_64 Linux.
In [ ]:
e_split = Executable('split', arch=Arch.X86_64, os=OSType.LINUX, installed=True)
e_split.addPFN(PFN('file:///usr/bin/split', 'condorpool'))
e_wc = Executable('wc', arch=Arch.X86_64, os=OSType.LINUX, installed=True)
e_wc.addPFN(PFN('file:///usr/bin/wc', 'condorpool'))
In [ ]:
tc = TransformationCatalog(workflow_dir)
tc.add(e_split)
tc.add(e_wc)
In [ ]:
sc = SitesCatalog(workflow_dir)
sc.add_site('condorpool', arch=Arch.X86_64, os=OSType.LINUX)
sc.add_site_profile('condorpool', namespace=Namespace.PEGASUS, key='style', value='condor')
sc.add_site_profile('condorpool', namespace=Namespace.CONDOR, key='universe', value='vanilla')
In [ ]:
instance = Instance(dax, replica_catalog=rc, transformation_catalog=tc, sites_catalog=sc, workflow_dir=workflow_dir)
In [ ]:
instance.run(site='condorpool', force=True)
At this point, the workflow has been planned and has started its execution in the above folder (named submit_dir).
The Pegasus mapper generates an executable workflow based on the abstract workflow. It finds the appropriate software, data, and computational resources required for workflow execution. It can also restructure the workflow to optimize performance and adds transformations for data management and provenance information generation.
After the workflow has been submitted, you can monitor it using the status() method. This method takes two arguments:

loop: whether the status command should be invoked once or continuously, until the workflow is completed or a failure is detected.

delay: the delay (in seconds) between status refreshes. The default value is 10s.
In [ ]:
instance.status(loop=True, delay=5)
Congratulations! You have completed the tutorial.
Refer to the other chapters in the Pegasus documentation for more information about creating, planning, and executing workflows with Pegasus.
Please contact the Pegasus Users Mailing list at pegasus-users@isi.edu if you need help.