Pegasus Workflow Management System

A Python Jupyter Tutorial

This tutorial will take you through the steps of running simple workflows using the Pegasus Workflow Management System. Pegasus allows scientists to:

  1. Automate their scientific computational work as portable workflows. Pegasus enables scientists to construct workflows in abstract terms without worrying about the details of the underlying execution environment or the particulars of the low-level specifications required by the middleware (Condor, Globus, or Amazon EC2). It automatically locates the input data and computational resources necessary for workflow execution, and it cleans up storage as the workflow executes so that data-intensive workflows have enough space to run on storage-constrained resources.

  2. Recover from failures at runtime. When errors occur, Pegasus tries to recover when possible by retrying tasks, and when all else fails, provides a rescue workflow containing a description of only the work that remains to be done. It also enables users to move computations from one resource to another. Pegasus keeps track of what has been done (provenance), including the locations of data used and produced, and which software was used with which parameters.

  3. Debug failures in their computations using a set of system-provided debugging tools and an online workflow monitoring dashboard.

This tutorial is intended for new users who want to get a quick overview of Pegasus concepts and usage via Jupyter.

For more information about Pegasus, please visit the Pegasus website: http://pegasus.isi.edu

Scientific Workflows

Scientific workflows allow users to easily express multi-step computational tasks, for example retrieving data from an instrument or a database, reformatting the data, and running an analysis. A scientific workflow describes the dependencies between the tasks; in most cases the workflow is described as a directed acyclic graph (DAG), where the nodes are tasks and the edges denote the task dependencies. A defining property of a scientific workflow is that it manages data flow. The tasks in a scientific workflow can be anything from short serial tasks to very large parallel tasks (MPI, for example) surrounded by a large number of small, serial tasks used for pre- and post-processing.
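
As a minimal illustration (plain Python, not part of the Pegasus API; the task names are hypothetical), the three-step example above can be pictured as a tiny DAG in which each task records the tasks it depends on:


In [ ]:
# A Pegasus-independent sketch of a DAG: each task maps to the set of
# tasks it must wait for (its parents).
dag = {
    'retrieve_data': set(),               # no dependencies
    'reformat_data': {'retrieve_data'},   # runs after the data is retrieved
    'run_analysis':  {'reformat_data'},   # runs after the data is reformatted
}

# A task is ready to run once all of its parents have completed.
completed = {'retrieve_data'}
ready = [task for task, parents in dag.items()
         if task not in completed and parents <= completed]
print(ready)   # ['reformat_data']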

The Pegasus DAX3 and Jupyter Python API

The first step to enable Jupyter to use the Pegasus API is to import the Python Pegasus Jupyter API. The instance module automatically loads the Pegasus DAX3 API and the catalog APIs.


In [ ]:
from Pegasus.jupyter.instance import *

By default, the API automatically creates a folder in the user's $HOME directory based on the workflow name. For the sake of completeness, in this tutorial we will pre-define a path where the workflow files should be created:


In [ ]:
workflow_dir = '/home/tutorial/wf-split-tutorial'

Creating the Workflow

Pegasus reads workflow descriptions from DAX files. The term "DAX" is short for "Directed Acyclic Graph in XML". DAX is an XML file format that has syntax for expressing jobs, arguments, files, and dependencies. We will now create a split workflow using the Pegasus-provided DAX API.

In the split workflow diagram, the ovals represent computational jobs, the dog-eared squares represent files, and the arrows represent dependencies.

The code for creating the DAX object has 3 main sections:

  • A new ADAG object is created. This is the main object to which jobs and dependencies are added.

In [ ]:
# Create an abstract dag
dax = ADAG('split')

  • Jobs and files are added. The 5 jobs shown in the diagram are added and 9 files are referenced. Arguments are defined using strings and File objects. The input and output files are defined for each job. This is an important step, as it allows Pegasus to track the files and stage the data if necessary. Workflow outputs are tagged with transfer=True.

In [ ]:
# the input file for the workflow
webpage = File('pegasus.html')
dax.addFile(webpage)

# the split job that splits the webpage into smaller chunks
split = Job('split')
split.addArguments('-l', '100', '-a', '1', webpage, 'part.')
split.uses(webpage, link=Link.INPUT)
dax.addJob(split)

# we do a parameter sweep on the first 4 chunks created
for c in "abcd":
    part = File("part.%s" % c)
    split.uses(part, link=Link.OUTPUT, transfer=False, register=False)

    count = File("count.txt.%s" % c)

    wc = Job("wc")
    wc.addProfile(Profile("pegasus", "label", "p1"))
    wc.addArguments("-l",part)
    wc.setStdout(count)
    wc.uses(part, link=Link.INPUT)
    wc.uses(count, link=Link.OUTPUT, transfer=True, register=True)
    dax.addJob(wc)

    # add the dependency: each wc job is a child of the split job
    dax.depends(wc, split)

  • Dependencies are added. These are shown as arrows in the diagram above. They define the parent/child relationships between the jobs. When the workflow is executing, the order in which the jobs run is determined by the dependencies between them. In this workflow, the dependency between each wc job (child) and the split job (parent) was already added inside the loop above with dax.depends(wc, split).

A complete specification of the DAX API is available in the Pegasus documentation.
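
Although the Instance created later in this tutorial manages the DAX for you, you can also write the abstract workflow to a DAX XML file if you want to inspect or plan it outside of Jupyter. The cell below is an optional sketch: it assumes the DAX3 writeXML() method and writes a file named split.dax (an illustrative name) into the current working directory.


In [ ]:
# Optional: serialize the abstract workflow to a DAX XML file so it can be
# inspected or planned outside of Jupyter. This step is not required; the
# Instance created later in this tutorial handles the DAX automatically.
with open('split.dax', 'w') as f:
    dax.writeXML(f)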

Information Catalogs

The workflow description (DAX) that you specify to Pegasus is portable, and usually does not contain any locations of physical input files, executables, or the cluster endpoints where jobs are executed. Pegasus uses three information catalogs during the planning process.

Creating the Replica Catalog

The Replica Catalog (RC) tells Pegasus where to find each of the input files for the workflow.

All files in a Pegasus workflow are referred to in the DAX using their Logical File Name (LFN). These LFNs are mapped to Physical File Names (PFNs) when Pegasus plans the workflow. This level of indirection enables Pegasus to map abstract DAXes to different execution sites and plan out the required file transfers automatically.

The replica catalog for our example workflow contains only one entry, for the workflow's only input file. This entry has an LFN of pegasus.html with a PFN of file:///home/tutorial/jupyter/pegasus.html. The file is stored on the local site, which means it will need to be transferred to the condorpool site when the workflow runs.


In [ ]:
# map the logical file name (LFN) to its physical location (PFN) on the local site
rc = ReplicaCatalog(workflow_dir)
rc.add('pegasus.html', 'file:///home/tutorial/jupyter/pegasus.html', site='local')

Creating the Transformation Catalog

The Transformation Catalog (TC) describes all of the executables (called "transformations") used by the workflow. This description includes the site(s) where they are located, the architecture and operating system they are compiled for, and any other information required to properly transfer them to the execution site and run them.

The TC should contain information about two transformations: split and wc. These transformations are referenced in the split DAX. The transformation catalog indicates that both transformations are installed on the condorpool site and are compiled for x86_64 Linux.


In [ ]:
# both executables are already installed on the condorpool site
e_split = Executable('split', arch=Arch.X86_64, os=OSType.LINUX, installed=True)
e_split.addPFN(PFN('file:///usr/bin/split', 'condorpool'))

e_wc = Executable('wc', arch=Arch.X86_64, os=OSType.LINUX, installed=True)
e_wc.addPFN(PFN('file:///usr/bin/wc', 'condorpool'))

In [ ]:
# register both executables in the transformation catalog
tc = TransformationCatalog(workflow_dir)
tc.add(e_split)
tc.add(e_wc)

Creating the Sites Catalog

The site catalog describes the sites where the workflow jobs are to be executed. In this tutorial, we assume that you have a Personal Condor pool running on the same host where Jupyter is installed.


In [ ]:
# define the condorpool execution site; jobs are submitted to the local
# Condor pool using the vanilla universe
sc = SitesCatalog(workflow_dir)
sc.add_site('condorpool', arch=Arch.X86_64, os=OSType.LINUX)
sc.add_site_profile('condorpool', namespace=Namespace.PEGASUS, key='style', value='condor')
sc.add_site_profile('condorpool', namespace=Namespace.CONDOR, key='universe', value='vanilla')

Creating an Instance for Running the Workflow
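
The Instance object ties together the abstract workflow (DAX) and the three catalogs defined above, so that Pegasus can plan and run the workflow from within Jupyter.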


In [ ]:
instance = Instance(dax, replica_catalog=rc, transformation_catalog=tc, sites_catalog=sc, workflow_dir=workflow_dir)

Running the Workflow


In [ ]:
# plan the workflow and submit it for execution on the condorpool site
instance.run(site='condorpool', force=True)

At this point, the workflow has been planned and has started its execution in the folder defined above (named submit_dir).

The Pegasus mapper generates an executable workflow based on the abstract workflow. It finds the appropriate software, data, and computational resources required for workflow execution. It can also restructure the workflow to optimize performance, and it adds transformations for data management and provenance information generation.

Monitoring Workflow Executions

After the workflow has been submitted, you can monitor it using the status() method. This method takes two arguments:

  • loop: whether the status command should be invoked once or continuously until the workflow is completed or a failure is detected.
  • delay: the interval (in seconds) at which the status will be refreshed. The default value is 10 seconds.

In [ ]:
# check the status every 5 seconds until the workflow completes or fails
instance.status(loop=True, delay=5)

Congratulations! You have completed the tutorial.

Refer to the other chapters in the Pegasus documentation for more information about creating, planning, and executing workflows with Pegasus.

Please contact the Pegasus Users Mailing list at pegasus-users@isi.edu if you need help.

