Pegasus Workflow Management System

A Python Jupyter Tutorial

This tutorial will take you through the steps of running simple workflows using the Pegasus Workflow Management System. Pegasus allows scientists to:

  1. Automate their scientific computational work as portable workflows. Pegasus enables scientists to construct workflows in abstract terms without worrying about the details of the underlying execution environment or the particulars of the low-level specifications required by the middleware (Condor, Globus, or Amazon EC2). It automatically locates the input data and computational resources necessary for workflow execution, and it cleans up storage as the workflow executes so that data-intensive workflows have enough space to run on storage-constrained resources.

  2. Recover from failures at runtime. When errors occur, Pegasus tries to recover when possible by retrying tasks, and when all else fails, it provides a rescue workflow containing a description of only the work that remains to be done. It also enables users to move computations from one resource to another. Pegasus keeps track of what has been done (provenance), including the locations of data used and produced, and which software was used with which parameters.

  3. Debug failures in their computations using a set of system-provided debugging tools and an online workflow monitoring dashboard.

This tutorial is intended for new users who want to get a quick overview of Pegasus concepts and usage via Jupyter.

For more information about Pegasus, please visit the Pegasus website: http://pegasus.isi.edu

Scientific Workflows

Scientific workflows allow users to easily express multi-step computational tasks, for example retrieving data from an instrument or a database, reformatting the data, and running an analysis. A scientific workflow describes the dependencies between tasks; in most cases the workflow is described as a directed acyclic graph (DAG), where the nodes are tasks and the edges denote the dependencies between them. A defining property of a scientific workflow is that it manages data flow. The tasks in a scientific workflow can range from short serial tasks to very large parallel tasks (MPI, for example) surrounded by a large number of small serial tasks used for pre- and post-processing. For this tutorial we will be executing the simple workflow depicted below.

In this diagram, the ovals represent tasks and the rectangles represent input and output files. The workflow takes pegasus.html as its input. split is called on that file and outputs chunks of the original file. For each chunk (four in this example), wc is invoked and outputs a single file.
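
To make the data flow concrete, the snippet below is a minimal hand-run equivalent of the workflow using Python's subprocess module. It assumes pegasus.html is in the current directory and that the standard split and wc utilities are available; in the rest of this tutorial, Pegasus will orchestrate these same steps for us on an execution site.


In [ ]:
import subprocess

# split pegasus.html into 100-line chunks named part.a, part.b, ...
subprocess.run(["split", "-l", "100", "-a", "1", "pegasus.html", "part."], check=True)

# count the lines of each chunk, writing the result to count.txt.a, count.txt.b, ...
for c in "abcd":
    with open("count.txt.{}".format(c), "w") as out:
        subprocess.run(["wc", "-l", "part.{}".format(c)], stdout=out, check=True)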

The Pegasus Python API

The first step is to import the Pegasus API package and its contents. This includes everything necessary to start running workflows.


In [1]:
from Pegasus.api import *

The next step is to specify a "work" directory where workflow files will be created.


In [7]:
from pathlib import Path

# create a work directory to hold the workflow files (catalogs, configuration, submit files)
WORK_DIR = Path.cwd() / "pegasus-tutorial"
WORK_DIR.mkdir(exist_ok=True)

Creating the Workflow

Next we need to create a Pegasus Workflow object. Using the diagram above as a reference, we add files and jobs to the workflow accordingly.


In [8]:
# create the workflow object with name `split`
wf = Workflow("split", infer_dependencies=True)

# define input/output files used by the split job
input_file = File("pegasus.html")
split_output_files = [File("part.{}".format(c)) for c in "abcd"]

# add the split job
wf.add_jobs(
    Job("split")
    .add_args("-l", "100", "-a", "1", input_file, "part.")
    .add_inputs(input_file)
    .add_outputs(*split_output_files, stage_out=False)
)

# add the wc jobs
for c, file in zip("abcd", split_output_files):
    wf.add_jobs(
        Job("wc")
        .add_args("-l", file)
        .add_inputs(file)
        .set_stdout("count.txt.{}".format(c), stage_out=True)
    )
  • Workflow outputs are tagged with stage_out=True, as these files will be transferred to a designated output directory.
  • You can manually add dependencies between jobs (as sketched below); however, by setting infer_dependencies=True in the Workflow constructor, all dependencies are inferred from the input and output files of each job.
  • Most methods can be chained together. A complete specification of the API is available here.
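
For illustration, here is a minimal sketch of declaring a dependency manually instead of relying on inference. It assumes an add_dependency method that takes a job and its parents; the exact signature may differ across Pegasus versions, so consult the API reference before using it.


In [ ]:
# a sketch only -- assumes add_dependency(job, parents=[...]) is available;
# check the API reference for the exact form in your Pegasus version
wf2 = Workflow("split-manual-deps", infer_dependencies=False)

split_job = (
    Job("split")
    .add_args("-l", "100", "-a", "1", input_file, "part.")
    .add_inputs(input_file)
    .add_outputs(*split_output_files, stage_out=False)
)

wc_job = (
    Job("wc")
    .add_args("-l", split_output_files[0])
    .add_inputs(split_output_files[0])
    .set_stdout("count.txt.a", stage_out=True)
)

wf2.add_jobs(split_job, wc_job)

# explicitly declare that wc_job runs after split_job
wf2.add_dependency(wc_job, parents=[split_job])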

Information Catalogs

The workflow description that you specify to Pegasus is portable and usually does not contain any locations of physical input files, executables, or cluster endpoints where jobs are executed. Pegasus uses three information catalogs during the planning process: the site catalog, the transformation catalog, and the replica catalog. These catalogs decouple the portable aspects of the workflow from the non-portable ones (physical file names, executable install locations, etc.). In the following sections, we define these catalogs and briefly describe their usage.

Creating the Site Catalog

The site catalog (SC) describes the sites where the workflow jobs are to be executed. In this tutorial we assume that you have a personal Condor pool, which we will refer to as condorpool, running on the same host where Jupyter is installed.


In [9]:
SHARED_SCRATCH_DIR = str(WORK_DIR)
SHARED_STORAGE_DIR = str(WORK_DIR / "outputs")
SC_FILE_PATH = str(WORK_DIR / "SiteCatalog.yml")
sc = (
    SiteCatalog()
    .add_sites(
        # the local site provides shared scratch space and the output storage directory
        Site("local", arch=Arch.X86_64, os_type=OS.LINUX)
            .add_directories(
                Directory(Directory.SHAREDSCRATCH, SHARED_SCRATCH_DIR)
                    .add_file_servers(FileServer("file://" + SHARED_SCRATCH_DIR, Operation.ALL)),

                Directory(Directory.SHAREDSTORAGE, SHARED_STORAGE_DIR)
                    .add_file_servers(FileServer("file://" + SHARED_STORAGE_DIR, Operation.ALL))
            ),

        # the condorpool site is where the jobs run, as vanilla universe Condor jobs
        Site("condorpool", arch=Arch.X86_64, os_type=OS.LINUX)
            .add_pegasus(style="condor")
            .add_condor(universe="vanilla")
    ).write(SC_FILE_PATH)
)

Creating the Replica Catalog

All files in a Pegasus workflow are referred to in the workflow object using their Logical File Name (LFN). These LFNs are mapped to Physical File Names (PFNs) when Pegasus plans the workflow. This level of indirection enables Pegasus to map abstract workflows to different execution sites and plan out the required file transfers automatically.

The replica catalog (RC) for our example workflow contains only one entry, for the workflow's only input file. This entry has an LFN of pegasus.html with a PFN pointing to the file's location on disk (here, inside our work directory), and it is registered on the local site, which implies that the file will need to be transferred to the condorpool site when the workflow runs.


In [13]:
RC_FILE_PATH = str(WORK_DIR / "ReplicaCatalog.yml")
rc = ReplicaCatalog()\
        .add_replica(input_file, "file://" + str(WORK_DIR) + "/pegasus.html", "local")\
        .write(RC_FILE_PATH)

Creating the Transformation Catalog

The transformation catalog (TC) describes all of the executables (called "transformations") used by the workflow. This description includes the site(s) where they are located, the architecture and operating system they are compiled for, and any other information required to properly transfer them to the execution site and run them.

The TC should contain information about the two transformations used in the workflow above: split and wc. The TC indicates that both transformations are installed on the condorpool site and are compiled for x86_64 Linux.


In [12]:
TC_FILE_PATH = str(WORK_DIR / "TransformationCatalog.yml")
tc = TransformationCatalog()\
        .add_transformations(
            # the third positional argument (False) marks each executable as
            # installed on the site rather than stageable
            Transformation("split")
                .add_site(TransformationSite("condorpool", "file:///usr/bin/split", False, arch=Arch.X86_64, os_type=OS.LINUX)),

            Transformation("wc")
                .add_site(TransformationSite("condorpool", "file:///usr/bin/wc", False, arch=Arch.X86_64, os_type=OS.LINUX))
        ).write(TC_FILE_PATH)

Configuration

Before running our split workflow, a few configuration properties must be set so that Pegasus knows where to find the catalogs we just created and what format they are in.


In [ ]:
CONF_FILE_PATH = str(WORK_DIR / "pegasus.conf")
conf = Properties()
conf["pegasus.catalog.site.file"] = SC_FILE_PATH
conf["pegasus.catalog.site"] = "YAML"
conf["pegasus.catalog.replica.file"] = RC_FILE_PATH
conf["pegasus.catalog.replica"] = "YAML"
conf["pegasus.catalog.transformation.file"] = TC_FILE_PATH
conf["pegasus.catalog.transformation"] = "YAML"
with open(CONF_FILE_PATH, "w") as f:
    conf.write(f)

Running the Workflow

Up to this point we have defined the workflow; the site, replica, and transformation catalogs; and a configuration file. By invoking plan on the workflow object, the abstract workflow is planned by Pegasus and converted into an executable workflow that will be executed on the condorpool site.


In [ ]:
wf.plan(
    dir=WORK_DIR,
    conf=CONF_FILE_PATH, 
    sites="condorpool",
    output_site="local",
    submit=True
)
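
Because submit=True was passed, plan also submits the executable workflow for execution. As an optional follow-up, and assuming your Pegasus version exposes the status and wait helpers on the Workflow object (otherwise use the pegasus-status and pegasus-analyzer command-line tools), you can monitor the run directly from the notebook:


In [ ]:
# optional monitoring helpers; assumed to exist on the Workflow object in
# recent Pegasus releases -- fall back to pegasus-status / pegasus-analyzer
# on the command line if they are not available
wf.status()   # print a snapshot of the workflow's job states
wf.wait()     # block until the workflow finishes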