This tutorial will take you through the steps of running simple workflows using Pegasus Workflow Management System. Pegasus allows scientists to:
Automate their scientific computational work as portable workflows. Pegasus enables scientists to construct workflows in abstract terms without worrying about the details of the underlying execution environment or the particulars of the low-level specifications required by the middleware (Condor, Globus, or Amazon EC2). It automatically locates the input data and computational resources necessary for workflow execution. It cleans up storage as the workflow is executed so that data-intensive workflows have enough space to execute on storage-constrained resources.
Recover from failures at runtime. When errors occur, Pegasus tries to recover when possible by retrying tasks, and when all else fails, it provides a rescue workflow containing a description of only the work that remains to be done. It also enables users to move computations from one resource to another. Pegasus keeps track of what has been done (provenance), including the locations of data used and produced, and which software was used with which parameters.
Debug failures in their computations using a set of system-provided debugging tools and an online workflow monitoring dashboard.
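The retry-then-rescue behavior described above can be illustrated with a small, self-contained Python sketch. It is not how Pegasus implements recovery: the task names and the run_with_retries helper are hypothetical, and unlike a real workflow engine this sketch stops at the first task that exhausts its retries instead of continuing with independent branches. What it does show is the key idea of a rescue workflow: only the tasks that have not completed are left to run.

```python
from graphlib import TopologicalSorter


def run_with_retries(deps, run_task, max_retries=2):
    """Run tasks in dependency order, retrying failures.

    deps maps each task to the set of tasks it depends on; run_task(name)
    returns True on success. Returns (completed, rescue), where rescue is
    the set of tasks that still need to run.
    """
    all_tasks = set(deps).union(*deps.values())
    completed = []
    for task in TopologicalSorter(deps).static_order():
        # one initial attempt plus up to max_retries retries
        for _ in range(1 + max_retries):
            if run_task(task):
                completed.append(task)
                break
        else:
            # every attempt failed: stop and report only the remaining work
            return completed, all_tasks - set(completed)
    return completed, set()


# Hypothetical tasks: "reformat" fails once, "analyze" always fails.
attempts = {}


def flaky(task):
    attempts[task] = attempts.get(task, 0) + 1
    if task == "reformat":
        return attempts[task] >= 2
    return task != "analyze"


deps = {"reformat": {"retrieve"}, "analyze": {"reformat"}, "publish": {"analyze"}}
completed, rescue = run_with_retries(deps, flaky)
print(completed)       # ['retrieve', 'reformat']
print(sorted(rescue))  # ['analyze', 'publish']
```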
This tutorial is intended for new users who want to get a quick overview of Pegasus concepts and usage via Jupyter.
For more information about Pegasus, please visit the Pegasus website: http://pegasus.isi.edu
Scientific workflows allow users to easily express multi-step computational tasks, for example, retrieving data from an instrument or a database, reformatting the data, and running an analysis. A scientific workflow describes the dependencies between the tasks, and in most cases the workflow is described as a directed acyclic graph (DAG), where the nodes are tasks and the edges denote the task dependencies. A defining property of a scientific workflow is that it manages data flow. The tasks in a scientific workflow can range from short serial tasks to very large parallel tasks (MPI, for example) surrounded by a large number of small serial tasks used for pre- and post-processing. For this tutorial, we will execute the simple workflow depicted below.
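To make the DAG idea concrete, here is a minimal sketch using Python's standard graphlib module (Python 3.9+). The workflow and its task names are hypothetical, modeled on the pattern just described: serial pre-processing tasks feeding one large parallel task, followed by serial post-processing. Each key lists the tasks it depends on, and the waves() helper groups tasks whose dependencies have all finished, i.e. tasks that could run concurrently.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each key maps a task to the tasks it depends on
# (the incoming edges of the DAG).
deps = {
    "simulate": {"retrieve_a", "retrieve_b"},  # large parallel (e.g. MPI) task
    "plot": {"simulate"},                      # serial post-processing
    "summarize": {"simulate"},                 # serial post-processing
}


def waves(deps):
    """Group tasks into waves; every task in a wave can run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        result.append(ready)
        sorter.done(*ready)
    return result


for i, wave in enumerate(waves(deps)):
    print(i, wave)
```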
In this diagram, the ovals represent tasks and the rectangles represent input and output files. The workflow takes pegasus.html as input. split is called on that file, and chunks of the original file are output. For each chunk (four in this example), wc is invoked and outputs a single file.
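Before handing these tasks to Pegasus, it can help to see what they compute. The sketch below mimics in plain Python what the tutorial's split job (invoked as split -l 100 -a 1 pegasus.html part.) and wc job (wc -l) do: cut the input into 100-line chunks named with one-letter suffixes, then count the lines in each chunk. The helper names and the 350-line stand-in document are made up for illustration; the actual number of chunks depends on the length of pegasus.html.

```python
from string import ascii_lowercase


def split_lines(lines, lines_per_chunk=100, prefix="part."):
    """Mimic `split -l 100 -a 1 FILE part.`: chunks named part.a, part.b, ..."""
    chunks = {}
    for i in range(0, len(lines), lines_per_chunk):
        # one-letter suffix (-a 1); more than 26 chunks raises IndexError,
        # much as real split fails when its suffixes are exhausted
        suffix = ascii_lowercase[i // lines_per_chunk]
        chunks[prefix + suffix] = lines[i:i + lines_per_chunk]
    return chunks


def wc_l(lines):
    """Mimic `wc -l`: count newline characters."""
    return sum(line.count("\n") for line in lines)


# A made-up 350-line document stands in for pegasus.html.
doc = [f"line {n}\n" for n in range(350)]
chunks = split_lines(doc)
counts = {name: wc_l(chunk) for name, chunk in chunks.items()}
print(counts)  # {'part.a': 100, 'part.b': 100, 'part.c': 100, 'part.d': 50}
```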
In [1]:
from Pegasus.api import *
The next step is to specify a "work" directory where workflow files will be created.
In [7]:
from pathlib import Path
# create the work directory under the current directory (no error if it already exists)
WORK_DIR = Path.cwd() / "pegasus-tutorial"
WORK_DIR.mkdir(exist_ok=True)
Next we need to create a Pegasus Workflow object. Using the diagram above as a reference, we add files and jobs to the workflow accordingly.
In [8]:
# create the workflow object with name `split`
wf = Workflow("split", infer_dependencies=True)
# define input/output files used by the split job
input_file = File("pegasus.html")
split_output_files = [File("part.{}".format(c)) for c in "abcd"]
# add the split job
wf.add_jobs(
    Job("split")
    .add_args("-l", "100", "-a", "1", input_file, "part.")
    .add_inputs(input_file)
    .add_outputs(*split_output_files, stage_out=False)
)
# add the wc jobs
for c, file in zip("abcd", split_output_files):
    wf.add_jobs(
        Job("wc")
        .add_args("-l", file)
        .add_inputs(file)
        .set_stdout("count.txt.{}".format(c), stage_out=True)
    )
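Dependency inference of the kind enabled by infer_dependencies=True rests on a simple producer/consumer rule: a job that reads a file depends on the job that writes it. The following sketch applies that rule to the same split and wc jobs, described as plain dictionaries. It illustrates the idea only and is not Pegasus's internal code; the job IDs (split, wc_a, ...) and the infer_dependencies() helper are hypothetical.

```python
def infer_dependencies(jobs):
    """Derive (parent, child) edges from file usage.

    jobs maps a job id to a dict with "inputs" and "outputs" (sets of LFNs).
    """
    # remember which job writes each file
    producer = {}
    for job_id, spec in jobs.items():
        for lfn in spec["outputs"]:
            producer[lfn] = job_id
    # a reader of a produced file becomes a child of its producer;
    # files with no producer (like pegasus.html) are raw workflow inputs
    edges = set()
    for job_id, spec in jobs.items():
        for lfn in spec["inputs"]:
            if lfn in producer and producer[lfn] != job_id:
                edges.add((producer[lfn], job_id))
    return edges


jobs = {"split": {"inputs": {"pegasus.html"},
                  "outputs": {f"part.{c}" for c in "abcd"}}}
for c in "abcd":
    jobs[f"wc_{c}"] = {"inputs": {f"part.{c}"}, "outputs": {f"count.txt.{c}"}}

print(sorted(infer_dependencies(jobs)))
```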
The stdout files of the wc jobs are added with stage_out=True, as these files will be transferred to a designated output directory; the intermediate part.* files produced by split use stage_out=False and are not. Because infer_dependencies=True is set in the Workflow constructor, all dependencies will be inferred based on input and output files.
The workflow description that you specify to Pegasus is portable and usually does not contain any locations of physical input files, executables, or cluster end points where jobs are executed. Pegasus uses three information catalogs, the site catalog, the transformation catalog, and the replica catalog, during the planning process. These catalogs decouple the portable aspects of the workflow from the non-portable aspects (physical file names, executable install locations, etc.). In the following sections, we define these catalogs and briefly describe their usage.
In [9]:
SHARED_SCRATCH_DIR = str(WORK_DIR)
SHARED_STORAGE_DIR = str(WORK_DIR / "outputs")
SC_FILE_PATH = str(WORK_DIR / "SiteCatalog.yml")
sc = SiteCatalog().add_sites(
    # the submit host, which also provides the scratch and output directories
    Site("local", arch=Arch.X86_64, os_type=OS.LINUX)
    .add_directories(
        Directory(Directory.SHAREDSCRATCH, SHARED_SCRATCH_DIR)
        .add_file_servers(FileServer("file://" + SHARED_SCRATCH_DIR, Operation.ALL)),
        Directory(Directory.SHAREDSTORAGE, SHARED_STORAGE_DIR)
        .add_file_servers(FileServer("file://" + SHARED_STORAGE_DIR, Operation.ALL))
    ),
    # the execution site: jobs run in the HTCondor vanilla universe
    Site("condorpool", arch=Arch.X86_64, os_type=OS.LINUX)
    .add_pegasus(style="condor")
    .add_condor(universe="vanilla")
)
# write the catalog separately so that sc keeps referring to the catalog object
sc.write(SC_FILE_PATH)
All files in a Pegasus workflow are referred to in the workflow object using their Logical File Name (LFN). These LFNs are mapped to Physical File Names (PFNs) when Pegasus plans the workflow. This level of indirection enables Pegasus to map abstract workflows to different execution sites and plan out the required file transfers automatically.
The replica catalog (RC) for our example workflow contains only one entry, for the workflow's only input file. This entry has an LFN of pegasus.html with a PFN of file:///home/tutorial/jupyter/pegasus.html, and the file is stored on the local site, which implies that it will need to be transferred to the condorpool site when the workflow runs.
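The LFN-to-PFN lookup and the transfer decision it implies can be sketched in a few lines of Python. The in-memory dictionary stands in for the replica catalog, and resolve() is a hypothetical helper; Pegasus's own replica selection is configurable and more involved.

```python
# Hypothetical in-memory replica catalog: LFN -> list of (site, PFN) replicas.
REPLICAS = {
    "pegasus.html": [("local", "file:///home/tutorial/jupyter/pegasus.html")],
}


def resolve(lfn, exec_site):
    """Pick a replica for lfn and report whether it must be transferred."""
    replicas = REPLICAS.get(lfn)
    if not replicas:
        raise LookupError(f"no replica registered for {lfn!r}")
    # prefer a copy that already lives on the execution site
    for site, pfn in replicas:
        if site == exec_site:
            return pfn, False
    # otherwise take the first replica; it has to be staged in
    _site, pfn = replicas[0]
    return pfn, True


pfn, needs_transfer = resolve("pegasus.html", "condorpool")
print(pfn, needs_transfer)  # file:///home/tutorial/jupyter/pegasus.html True
```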
In [13]:
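RC_FILE_PATH = str(WORK_DIR / "ReplicaCatalog.yml")
# pegasus.html is assumed to sit in the notebook's current directory (the freshly
# created WORK_DIR is empty); as_uri() builds a well-formed file:/// URL
rc = ReplicaCatalog().add_replica(input_file, (Path.cwd() / "pegasus.html").as_uri(), "local")
rc.write(RC_FILE_PATH)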
The transformation catalog (TC) describes all of the executables (called "transformations") used by the workflow. This description includes the site(s) where they are located, the architecture and operating system they are compiled for, and any other information required to properly transfer them to the execution site and run them.
The TC should contain information about the two transformations used in the workflow above: split and wc. The TC indicates that both transformations are installed on the condorpool site and are compiled for x86_64 Linux.
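Conceptually, a transformation catalog lookup answers one question: where is this executable installed for a given site, architecture, and operating system? The sketch below models that with a hypothetical in-memory table mirroring the two entries just described; the Entry class and executable_for() helper are illustrative names, not part of the Pegasus API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    site: str
    pfn: str      # install path on that site
    arch: str
    os_type: str


# Hypothetical in-memory TC: transformation name -> entries per site/platform.
TC = {
    "split": [Entry("condorpool", "/usr/bin/split", "x86_64", "linux")],
    "wc": [Entry("condorpool", "/usr/bin/wc", "x86_64", "linux")],
}


def executable_for(name, site, arch, os_type):
    """Return the install path of `name` usable on a site with this platform."""
    for e in TC.get(name, []):
        if (e.site, e.arch, e.os_type) == (site, arch, os_type):
            return e.pfn
    raise LookupError(f"{name} is not installed for {site} ({arch}/{os_type})")


print(executable_for("wc", "condorpool", "x86_64", "linux"))  # /usr/bin/wc
```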
In [12]:
TC_FILE_PATH = str(WORK_DIR / "TransformationCatalog.yml")
# both executables are already installed on condorpool (False: not stageable)
tc = TransformationCatalog().add_transformations(
    Transformation("split")
    .add_site(TransformationSite("condorpool", "file:///usr/bin/split", False, arch=Arch.X86_64, os_type=OS.LINUX)),
    Transformation("wc")
    .add_site(TransformationSite("condorpool", "file:///usr/bin/wc", False, arch=Arch.X86_64, os_type=OS.LINUX))
)
tc.write(TC_FILE_PATH)
In [ ]:
CONF_FILE_PATH = str(WORK_DIR / "pegasus.conf")
conf = Properties()
conf["pegasus.catalog.site.file"] = SC_FILE_PATH
conf["pegasus.catalog.site"] = "YAML"
conf["pegasus.catalog.replica.file"] = RC_FILE_PATH
conf["pegasus.catalog.replica"] = "YAML"
conf["pegasus.catalog.transformation.file"] = TC_FILE_PATH
conf["pegasus.catalog.transformation"] = "YAML"
with open(CONF_FILE_PATH, "w") as f:
    conf.write(f)
Up until this point, we have defined the workflow, the site, replica, and transformation catalogs, and finally a configuration file. By invoking plan on the workflow object, the abstract workflow will be planned by Pegasus and converted into an executable workflow that will be executed on the condorpool site.
In [ ]:
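wf.plan(
    dir=WORK_DIR,            # where the planned (executable) workflow is written
    conf=CONF_FILE_PATH,     # properties file pointing at the three catalogs
    sites="condorpool",      # execution site
    output_site="local",     # where stage_out=True files are delivered
    submit=True              # submit the workflow right after planning
)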
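Conceptually, planning maps each abstract job to the execution site and wraps it with the data movement the catalogs imply. The following sketch shows only that core idea for this tutorial's workflow: stage pegasus.html in from the local site, run the jobs, and stage out the files marked stage_out=True. It assumes the jobs are already listed in dependency order, and sketch_plan() is a hypothetical helper; Pegasus's planner does considerably more, such as adding directory-creation and cleanup jobs.

```python
def sketch_plan(jobs, replica_sites, exec_site, output_site):
    """Turn abstract jobs into a flat list of executable steps.

    jobs: list of (name, inputs, outputs) in dependency order, where outputs
    maps each LFN to its stage_out flag. replica_sites: LFN -> site holding it.
    """
    produced = {lfn for _, _, outputs in jobs for lfn in outputs}
    raw_inputs = sorted({lfn for _, inputs, _ in jobs for lfn in inputs} - produced)
    steps = []
    # stage in raw inputs that are not already on the execution site
    for lfn in raw_inputs:
        site = replica_sites[lfn]  # KeyError here = missing replica catalog entry
        if site != exec_site:
            steps.append(("stage_in", lfn, site, exec_site))
    # run every job on the execution site
    steps += [("run", name, exec_site) for name, _, _ in jobs]
    # ship outputs flagged stage_out=True to the output site
    steps += [("stage_out", lfn, exec_site, output_site)
              for _, _, outputs in jobs
              for lfn, stage_out in outputs.items() if stage_out]
    return steps


jobs = [("split", {"pegasus.html"}, {f"part.{c}": False for c in "abcd"})]
jobs += [(f"wc_{c}", {f"part.{c}"}, {f"count.txt.{c}": True}) for c in "abcd"]

steps = sketch_plan(jobs, {"pegasus.html": "local"}, "condorpool", "local")
for step in steps:
    print(step)
```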