An HTCondor pool provides a way for you (as a user) to submit units of work, called jobs, to be executed on a distributed network of computing resources. HTCondor also provides tools to monitor your jobs as they run and to make certain kinds of changes to them after submission, which we call "managing" jobs.
In this tutorial, we will learn how to submit and manage jobs from Python.
We will see how to submit jobs with various toy executables, how to ask HTCondor for information about them, and how to tell HTCondor to do things with them.
All of these things are possible from the command line as well, using tools like condor_submit, condor_qedit, and condor_hold.
However, working from Python instead of the command line gives us access to the full power of Python to do things like generate jobs programmatically based on user input, pass information consistently from submission to management, or even expose an HTCondor pool to a web application.
We start by importing the HTCondor Python bindings modules, which provide the functions we will need to talk to HTCondor.
In [1]:
import htcondor # for submitting jobs, querying HTCondor daemons, etc.
import classad # for interacting with ClassAds, HTCondor's internal data format
To submit a job, we must first describe it.
A submit description is held in a Submit object.
Submit objects consist of key-value pairs, and generally behave like Python dictionaries.
If you're familiar with HTCondor's submit file syntax, you should think of each line in the submit file as a single key-value pair in the Submit object.
Let's start by writing a Submit object that describes a job that executes the hostname command on an execute node, which prints out the "name" of the node.
Since hostname prints its results to standard output (stdout), we will capture stdout and bring it back to the submit machine so we can see the name.
In [2]:
hostname_job = htcondor.Submit({
    "executable": "/bin/hostname",  # the program to run on the execute node
    "output": "hostname.out",       # anything the job prints to standard output will end up in this file
    "error": "hostname.err",        # anything the job prints to standard error will end up in this file
    "log": "hostname.log",          # this file will contain a record of what happened to the job
    "request_cpus": "1",            # how many CPU cores we want
    "request_memory": "128MB",      # how much memory we want
    "request_disk": "128MB",        # how much disk space we want
})

print(hostname_job)
The available descriptors are documented in the condor_submit manual.
The keys of the Python dictionary you pass to htcondor.Submit should be the same as for the submit descriptors, and the values should be strings containing exactly what would go on the right-hand side.
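As a rough illustration of the "values should be strings" rule, the sketch below converts a plain Python dictionary with mixed value types into the all-string form described above, and renders the equivalent submit file lines. The helper names to_submit_strings and as_submit_file are hypothetical and are not part of the HTCondor bindings; the converted dictionary is what you might then pass to htcondor.Submit.

```python
def to_submit_strings(description):
    """Return a copy of `description` with every key and value as a string.

    Hypothetical helper (not part of htcondor): each value should end up as
    exactly what would appear on the right-hand side of a submit file line.
    """
    return {str(key): str(value) for key, value in description.items()}


def as_submit_file(description):
    """Render the description as 'key = value' lines, one per descriptor."""
    return "\n".join(f"{key} = {value}" for key, value in description.items())


raw = {
    "executable": "/bin/hostname",
    "request_cpus": 1,           # an int here; converted to the string "1" below
    "request_memory": "128MB",
}

submit_ready = to_submit_strings(raw)
print(submit_ready)
print(as_submit_file(submit_ready))
```

Keeping the conversion in one place means the rest of a program can build descriptions with ordinary Python values and only stringify them at the last moment.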
Note that we gave the Submit object several relative filepaths.
These paths are relative to the directory containing this Jupyter notebook (or, more generally, the current working directory).
When we run the job, you should see those files appear in the file browser on the left as HTCondor creates them.
Now that we have a description, let's submit a job.
To do so, we must ask the HTCondor scheduler to open a transaction.
Once we have the transaction, we can "queue" (i.e., submit) a job via the Submit object.
In [3]:
schedd = htcondor.Schedd()                # get the Python representation of the scheduler
with schedd.transaction() as txn:         # open a transaction, represented by `txn`
    cluster_id = hostname_job.queue(txn)  # queues one job in the current transaction; returns job's ClusterId

print(cluster_id)
The integer returned by the queue method is the ClusterId for the submission.
It uniquely identifies this submission.
Later in this module, we will use it to ask the HTCondor scheduler for information about our jobs.
It isn't important to understand the transaction mechanics for now; think of it as boilerplate. (There are advanced use cases where it might be useful.)
By now, our job will hopefully have finished running. You should be able to see the files in the file browser on the left. Try opening one of them to see what's inside.
We can also look at the output from inside Python:
In [4]:
import os
import time

output_path = "hostname.out"

# this is a crude way to wait for the job to finish
# see the Advanced tutorial "Scalable Job Tracking" for better methods!
while not os.path.exists(output_path):
    print("Output file doesn't exist yet; sleeping for one second")
    time.sleep(1)

with open(output_path, mode = "r") as f:
    print(f.read())
If you got some text, it worked!
If the file never shows up, it means your job didn't run.
You might try looking at the log or error files specified in the submit description to see if there is any useful information in them about why the job failed.
By default, each queue call will submit a single job.
A more common use case is to submit many jobs at once, often sharing some base submit description.
Let's write a new submit description which runs sleep.
When we have multiple jobs in a single cluster, each job will be identified not just by its ClusterId but also by a ProcId.
We can use the ProcId to separate the output and error files for each individual job.
Anything that looks like $(...) in a submit description is a macro, a placeholder which will be "expanded" later by HTCondor into a real value for that particular job.
The $(ProcId) macro expands to a series of incrementing integers, starting at 0.
So the first job in a cluster will have ProcId 0, the next will have ProcId 1, etc.
In [5]:
sleep_job = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "10s",               # sleep for 10 seconds
    "output": "sleep-$(ProcId).out",  # output and error for each job, using the $(ProcId) macro
    "error": "sleep-$(ProcId).err",
    "log": "sleep.log",               # we still send all of the HTCondor logs for every job to the same file (not split up!)
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

print(sleep_job)
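To get a feel for what the $(ProcId) macro in the sleep description will turn into, here is a minimal pure-Python sketch of the substitution. It is an illustration only: HTCondor performs the real expansion itself, its macro language has more features than this handles (for example, real macro names are not case-sensitive, while this sketch is), and the name expand_macros is hypothetical.

```python
import re

# Matches $(Name) placeholders such as $(ProcId).
MACRO_PATTERN = re.compile(r"\$\((\w+)\)")


def expand_macros(template, values):
    """Replace each $(Name) in `template` with str(values[Name]).

    Illustration only, not HTCondor's implementation. Placeholders
    without a known value are left untouched.
    """
    def substitute(match):
        name = match.group(1)
        return str(values.get(name, match.group(0)))

    return MACRO_PATTERN.sub(substitute, template)


# Preview the output file names for the first three jobs in a cluster.
names = [expand_macros("sleep-$(ProcId).out", {"ProcId": proc_id}) for proc_id in range(3)]
print(names)  # ['sleep-0.out', 'sleep-1.out', 'sleep-2.out']
```

Because every job gets a different ProcId, each job writes to its own output and error files, while the shared log file name contains no macro and so stays the same for all of them.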
We will submit 10 of these jobs.
All we need to change from our previous queue call is to add the count keyword argument.
In [6]:
schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = sleep_job.queue(txn, count=10)  # submit 10 jobs

print(cluster_id)
Now that we have a bunch of jobs in flight, we might want to check how they're doing.
We can ask the HTCondor scheduler about jobs by using its query method.
We give it a constraint, which tells it which jobs to look for, and a projection, which tells it what information to return.
In [7]:
schedd.query(
    constraint=f"ClusterId == {cluster_id}",
    projection=["ClusterId", "ProcId", "Out"],
)
Out[7]:
There are a few things to notice here:
- Jobs that have left the queue no longer show up in the results of query. To see those jobs, use the history method instead, which behaves like query, but only looks at jobs that have left the queue.
- The objects returned by the query are instances of ClassAd. ClassAds are the common data exchange format used by HTCondor. In Python, they mostly behave like dictionaries.

By varying some part of the submit description using the ProcId, we can change how each individual job behaves. Perhaps it will use a different input file, or a different argument. However, we often want more flexibility than that. Perhaps our input files are named after different cities, or by timestamp, or some other naming scheme that already exists.
To use such information in the submit description, we need to use itemdata. Itemdata lets us pass arbitrary extra information when we queue, which we can reference with macros inside the submit description. This lets us use the full power of Python to generate the submit descriptions for our jobs.
Let's mock this situation out by generating some files with randomly-chosen names.
We'll also switch to using pathlib.Path, Python's more modern file path manipulation library.
In [8]:
from pathlib import Path
import random
import string
import shutil


def random_string(length):
    """Produce a random lowercase ASCII string with the given length."""
    return "".join(random.choices(string.ascii_lowercase, k = length))


# make a directory to hold the input files, clearing away any existing directory
input_dir = Path.cwd() / "inputs"
shutil.rmtree(input_dir, ignore_errors = True)
input_dir.mkdir()

# make 5 input files
for idx in range(5):
    rs = random_string(5)
    input_file = input_dir / "{}.txt".format(rs)
    input_file.write_text("Hello from job {}".format(rs))
Now we'll get a list of all the files we just created in the input directory. This is precisely the kind of situation where Python affords us a great deal of flexibility over a submit file: we can use Python instead of the HTCondor submit language to generate and inspect the information we're going to put into the submit description.
In [9]:
input_files = list(input_dir.glob("*.txt"))

for path in input_files:
    print(path)
Now we'll make our submit description.
Our goal is just to print out the text held in each file, which we can do using cat.
We will tell HTCondor to transfer the input file to the execute location by including it in transfer_input_files.
We also need to call cat on the right file via arguments.
Keep in mind that HTCondor will move the files in transfer_input_files directly to the scratch directory on the execute machine, so instead of the full path, we just need the file's "name", the last component of its path.
pathlib will make it easy to extract this information.
In [10]:
cat_job = htcondor.Submit({
    "executable": "/bin/cat",
    "arguments": "$(input_file_name)",        # we will pass in the value for this macro via itemdata
    "transfer_input_files": "$(input_file)",  # we also need HTCondor to move the file to the execute node
    "should_transfer_files": "yes",           # force HTCondor to transfer files even though we're running entirely inside a container (and it normally wouldn't need to)
    "output": "cat-$(ProcId).out",
    "error": "cat-$(ProcId).err",
    "log": "cat.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

print(cat_job)
The itemdata should be passed as a list of dictionaries, where the keys are the macro names to replace in the submit description.
In our case, the keys are input_file and input_file_name, so we should have a list of 5 dictionaries (one per input file), each with two entries.
HTCondor expects the input file list to be a comma-separated list of POSIX-style paths, so we explicitly convert our Path to a POSIX string.
In [11]:
itemdata = [{"input_file": path.as_posix(), "input_file_name": path.name} for path in input_files]

for item in itemdata:
    print(item)
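Since the itemdata keys must match the macro names used in the submit description, it can help to check them before submitting. The following is a minimal sketch of such a check in plain Python. The helper missing_itemdata_keys is hypothetical, and the set of macros treated as supplied by HTCondor is an assumption limited to ProcId and ClusterId.

```python
import re

# Matches $(Name) placeholders in submit description values.
MACRO_PATTERN = re.compile(r"\$\((\w+)\)")

# Macros that HTCondor fills in itself. Assumption for this sketch:
# only the ones relevant to this tutorial are listed.
BUILTIN_MACROS = {"procid", "clusterid"}


def missing_itemdata_keys(description, itemdata):
    """Return the macro names used in `description` that at least one item lacks."""
    used = set()
    for value in description.values():
        used.update(
            name for name in MACRO_PATTERN.findall(value)
            if name.lower() not in BUILTIN_MACROS
        )

    missing = set()
    for item in itemdata:
        missing.update(used - set(item))
    return missing


description = {
    "arguments": "$(input_file_name)",
    "transfer_input_files": "$(input_file)",
    "output": "cat-$(ProcId).out",
}
good = [{"input_file": "inputs/abcde.txt", "input_file_name": "abcde.txt"}]
bad = [{"input_file": "inputs/abcde.txt"}]

print(missing_itemdata_keys(description, good))  # set()
print(missing_itemdata_keys(description, bad))   # {'input_file_name'}
```

The check works on an ordinary dictionary of strings, so it can be run on the same dictionary you would pass to htcondor.Submit.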
Now we'll submit the jobs, using queue_with_itemdata instead of queue:
In [12]:
schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    submit_result = cat_job.queue_with_itemdata(txn, itemdata = iter(itemdata))  # submit one job for each item in the itemdata

print(submit_result.cluster())
Note that queue_with_itemdata returns a "submit result", not just the ClusterId.
The ClusterId can be retrieved from the submit result with its cluster() method.
Let's do a query to make sure we got the itemdata right (these jobs run fast, so you might need to re-run the jobs if your first run has already left the queue):
In [13]:
schedd.query(
    constraint=f"ClusterId == {submit_result.cluster()}",
    projection=["ClusterId", "ProcId", "Out", "Args", "TransferInput"],
)
Out[13]:
And let's take a look at all the output:
In [14]:
# again, this is very crude - see the advanced tutorials!
while len(list(Path.cwd().glob("cat-*.out"))) != len(itemdata):
    print("Not all output files exist yet; sleeping for one second")
    time.sleep(1)

for output_file in Path.cwd().glob("cat-*.out"):
    print(output_file, "->", output_file.read_text())
Once a job is in the queue, the scheduler will try its best to execute it to completion. There are several cases where you may want to interrupt the normal flow of jobs. Perhaps the results are no longer needed; perhaps the job needs to be edited to correct a submission error. These actions fall under the purview of job management.
There are two Schedd methods dedicated to job management:
- edit(): Change an attribute for a set of jobs.
- act(): Change the state of a job (remove it from the queue, hold it, suspend it, etc.).

The act method takes an argument from the JobAction enum.
Commonly-used values include:
- Hold: Put a job on hold, vacating a running job if necessary. A job will stay in the hold state until told otherwise.
- Release: Release a job from the hold state, returning it to Idle.
- Remove: Remove a job from the queue. If it is running, it will stop running. This requires the execute node to acknowledge it has successfully vacated the job, so Remove may not be instantaneous.
- Vacate: Cause a running job to be killed on the remote resource and return to the Idle state. With Vacate, jobs may be given significant time to cleanly shut down.

To play with this, let's bring back our sleep submit description, but increase the sleep time significantly so that we have time to interact with the jobs.
In [15]:
long_sleep_job = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "10m",  # sleep for 10 minutes
    "output": "sleep-$(ProcId).out",
    "error": "sleep-$(ProcId).err",
    "log": "sleep.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

print(long_sleep_job)
In [16]:
schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = long_sleep_job.queue(txn, 5)
As an experiment, let's set an arbitrary attribute on the jobs and check that it worked.
When we're really working, we could do things like change the amount of memory a job has requested by editing its RequestMemory attribute.
The job attributes that are built in to HTCondor are described in the HTCondor manual, but your site may specify additional, custom attributes as well.
In [17]:
# sets attribute foo to the string "bar" for all of our jobs
# note the nested quotes around bar! The outer "" make it a Python string; the inner "" make it a ClassAd string.
schedd.edit(f"ClusterId == {cluster_id}", "foo", "\"bar\"")

# do a query to check the value of attribute foo
schedd.query(
    constraint=f"ClusterId == {cluster_id}",
    projection=["ClusterId", "ProcId", "JobStatus", "foo"],
)
Out[17]:
Although the job status appears to be an attribute, we cannot edit it directly.
As mentioned above, we must instead act on the job.
Let's hold the first two jobs so that they stop running, but leave the others going.
In [18]:
# hold the first two jobs
schedd.act(htcondor.JobAction.Hold, f"ClusterId == {cluster_id} && ProcID <= 1")

# check the status of the jobs
ads = schedd.query(
    constraint=f"ClusterId == {cluster_id}",
    projection=["ClusterId", "ProcId", "JobStatus"],
)

for ad in ads:
    # the ClassAd objects returned by the query act like dictionaries, so we can extract individual values out of them using []
    print(f"ProcID = {ad['ProcID']} has JobStatus = {ad['JobStatus']}")
The various job statuses are represented by numbers. 1 means Idle, 2 means Running, and 5 means Held. If you see JobStatus = 5 above for ProcID = 0 and ProcID = 1, then we succeeded!
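Numeric status codes are easy to misread, so a small lookup table can make query results friendlier. The sketch below uses plain dictionaries as stand-ins for the ClassAds returned by a query, so it runs without a pool. The codes 1, 2, and 5 are the ones discussed in this tutorial; the remaining entries follow the JobStatus attribute as documented in the HTCondor manual and are worth double-checking against your version. The names JOB_STATUS_NAMES and summarize_statuses are hypothetical.

```python
from collections import Counter

# JobStatus codes mapped to human-readable names.
JOB_STATUS_NAMES = {
    1: "Idle",
    2: "Running",
    3: "Removed",
    4: "Completed",
    5: "Held",
    6: "Transferring Output",
    7: "Suspended",
}


def summarize_statuses(ads):
    """Count how many jobs are in each status, keyed by status name."""
    counts = Counter(
        JOB_STATUS_NAMES.get(int(ad["JobStatus"]), "Unknown") for ad in ads
    )
    return dict(counts)


# Stand-in data shaped like the query results: two held jobs, three others.
example_ads = [
    {"ProcId": 0, "JobStatus": 5},
    {"ProcId": 1, "JobStatus": 5},
    {"ProcId": 2, "JobStatus": 2},
    {"ProcId": 3, "JobStatus": 2},
    {"ProcId": 4, "JobStatus": 1},
]

print(summarize_statuses(example_ads))
```

Since ClassAds mostly behave like dictionaries, the same function should work on the list returned by a real query that includes JobStatus in its projection.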
The opposite of JobAction.Hold is JobAction.Release.
Let's release those jobs and let them go back to Idle.
In [19]:
schedd.act(htcondor.JobAction.Release, f"ClusterId == {cluster_id}")

ads = schedd.query(
    constraint=f"ClusterId == {cluster_id}",
    projection=["ClusterId", "ProcId", "JobStatus"],
)

for ad in ads:
    # the ClassAd objects returned by the query act like dictionaries, so we can extract individual values out of them using []
    print(f"ProcID = {ad['ProcID']} has JobStatus = {ad['JobStatus']}")
Note that we simply released all the jobs in the cluster. Releasing a job that is not held doesn't do anything, so we don't have to be extremely careful.
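The constraint strings passed to query and act in the cells above are ordinary ClassAd expressions built with f-strings. When the selection gets more complicated than a whole cluster, a small helper can keep the string building in one place. This is a minimal sketch; cluster_constraint is a hypothetical name, not part of the HTCondor bindings.

```python
def cluster_constraint(cluster_id, proc_ids=None):
    """Build a ClassAd constraint string selecting jobs in one cluster.

    Hypothetical helper: with proc_ids=None every job in the cluster
    matches; otherwise only jobs with the listed ProcIds match.
    """
    constraint = f"ClusterId == {int(cluster_id)}"
    if proc_ids is not None:
        procs = " || ".join(f"ProcId == {int(proc_id)}" for proc_id in proc_ids)
        # An empty selection should match nothing rather than everything.
        constraint += f" && ({procs})" if procs else " && false"
    return constraint


print(cluster_constraint(42))
# ClusterId == 42
print(cluster_constraint(42, [0, 1]))
# ClusterId == 42 && (ProcId == 0 || ProcId == 1)
print(cluster_constraint(42, []))
# ClusterId == 42 && false
```

The resulting string could be passed wherever the tutorial passes a constraint, for example as the second argument to schedd.act. Converting every ID with int() also guards against accidentally splicing arbitrary text into the expression.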
Finally, let's clean up after ourselves:
In [20]:
schedd.act(htcondor.JobAction.Remove, f"ClusterId == {cluster_id}")
Out[20]:
Now let's practice what we've learned.
In [21]:
# MODIFY OR ADD TO THIS BLOCK...

incrementing_sleep = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "1",
    "output": "ex1-$(ProcId).out",
    "error": "ex1-$(ProcId).err",
    "log": "ex1.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = incrementing_sleep.queue(txn, 5)
In [22]:
# ... TO MAKE THIS TEST PASS
expected = [str(i) for i in range(5, 10)]
print("Expected ", expected)
ads = schedd.query(f"ClusterId == {cluster_id}", projection = ["Args"])
arguments = sorted(ad["Args"] for ad in ads)
print("Got ", arguments)
assert arguments == expected, "Arguments were not what we expected!"
print("The test passed. Good job!")
In [23]:
# MODIFY OR ADD TO THIS BLOCK...

echo = htcondor.Submit({
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = echo.queue(txn, 1)
In [24]:
# ... TO MAKE THIS TEST PASS
does_file_exist = os.path.exists("ex3.txt")
assert does_file_exist, "ex3.txt does not exist!"
expected = "Echo to Target"
print("Expected ", expected)
contents = open("ex3.txt", mode = "r").read().strip()
print("Got ", contents)
assert expected in contents, "Contents were not what we expected!"
print("The test passed. Good job!")
In [25]:
# MODIFY OR ADD TO THIS BLOCK...

long_sleep = htcondor.Submit({
    "executable": "/bin/sleep",
    "arguments": "10m",
    "output": "ex2-$(ProcId).out",
    "error": "ex2-$(ProcId).err",
    "log": "ex2.log",
    "request_cpus": "1",
    "request_memory": "128MB",
    "request_disk": "128MB",
})

schedd = htcondor.Schedd()
with schedd.transaction() as txn:
    cluster_id = long_sleep.queue(txn, 100)
In [26]:
# ... TO MAKE THIS TEST PASS

import getpass

try:
    ads = schedd.query(f"ClusterId == {cluster_id}", projection = ["ProcID", "JobStatus"])
    proc_to_status = {int(ad["ProcID"]): ad["JobStatus"] for ad in sorted(ads, key = lambda ad: ad["ProcID"])}

    for proc, status in proc_to_status.items():
        print("Proc {} has status {}".format(proc, status))

    assert len(proc_to_status) == 100, "Wrong number of jobs (perhaps you need to resubmit them?)."
    assert all(status == 5 for proc, status in proc_to_status.items() if proc % 2 != 0), "Not all odd jobs were held."
    assert all(status != 5 for proc, status in proc_to_status.items() if proc % 2 == 0), "An even job was held."

    print("The test passed. Good job!")
finally:
    schedd.act(htcondor.JobAction.Remove, f'Owner=="{getpass.getuser()}"')