toil - Toil Documentation
Toil is an open-source pure-Python workflow engine that lets people write better pipelines.
Check out our website for a comprehensive list of Toil's features and read our paper to learn what Toil can do in the real world. Please subscribe to our low-volume announce mailing list and feel free to also join us on GitHub and Gitter.
If using Toil for your research, please cite
A Toil workflow can be run with just two steps:

1. Copy and paste the following code block into a new file called helloWorld.py:

from toil.common import Toil
from toil.job import Job


def helloWorld(message, memory="1G", cores=1, disk="1G"):
    return "Hello, world!, here's a message: %s" % message


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.clean = "always"

    with Toil(options) as toil:
        output = toil.start(Job.wrapFn(helloWorld, "You did it!"))
        print(output)

2. Specify the name of the job store and run the workflow:

python helloWorld.py file:my-job-store
Congratulations! You've run your first Toil workflow, using the default batch system, singleMachine, and the file job store.
Toil uses batch systems to manage the jobs it creates.
The singleMachine batch system is primarily used to prepare and debug workflows on a local machine. Once validated, try running them on a full-fledged batch system (see batchsysteminterface). Toil supports many different batch systems such as Apache Mesos and Grid Engine; its versatility makes it easy to run your workflow in all kinds of places.
Toil is totally customizable! Run python helloWorld.py --help to see a complete list of available options.
For something beyond a "Hello, world!" example, refer to A (more) real-world example.
The Common Workflow Language (CWL) is an emerging standard for writing workflows that are portable across multiple workflow engines and platforms. Running CWL workflows using Toil is easy.
Copy and paste the following code block into example.cwl:

cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
stdout: output.txt
inputs:
  message:
    type: string
    inputBinding:
      position: 1
outputs:
  output:
    type: stdout
and this code into example-job.yaml:
message: Hello world!
$ toil-cwl-runner example.cwl example-job.yaml
Your output will be in output.txt:
$ cat output.txt
Hello world!
To learn more about CWL, see the CWL User Guide (from where this example was shamelessly borrowed).
To run this workflow on an AWS cluster have a look at Running a CWL Workflow on AWS.
For information on using CWL with Toil, see the section cwl.
The Workflow Description Language (WDL) is another emerging language for writing workflows that are portable across multiple workflow engines and platforms. Running WDL workflows using Toil is still in alpha, and currently experimental. Toil currently supports basic workflow syntax (see wdl for more details and examples). Here we go over running a basic WDL helloworld workflow. Copy and paste the following code into wdl-helloworld.wdl:
workflow write_simple_file {
call write_file
}
task write_file {
String message
command { echo ${message} > wdl-helloworld-output.txt }
output { File test = "wdl-helloworld-output.txt" }
}

and this code into wdl-helloworld.json:
{
"write_simple_file.write_file.message": "Hello world!"
}
$ toil-wdl-runner wdl-helloworld.wdl wdl-helloworld.json
Your output will be in wdl-helloworld-output.txt:
$ cat wdl-helloworld-output.txt
Hello world!
To learn more about WDL, see the main WDL website.
For a more detailed example and explanation, we've developed a sample pipeline that merge-sorts a temporary file. This is not supposed to be an efficient sorting program, but rather a more fully worked example of what Toil is capable of.
$ python sort.py file:jobStore
The workflow created a file called sortedFile.txt in your current directory. Have a look at it and notice that it contains a whole lot of sorted lines!
This workflow does a smart merge sort on a file it generates, fileToSort.txt. The sort is smart because each step of the process---splitting the file into separate chunks, sorting these chunks, and merging them back together---is compartmentalized into a job. Each job can specify its own resource requirements and will only be run after the jobs it depends upon have run. Jobs without dependencies will be run in parallel.
NOTE:
$ python sort.py file:jobStore \
--numLines=5000 \
--lineLength=10 \
--overwriteOutput=True \
--workDir=/tmp/
Here we see that we can add our own options to a Toil script. As noted above, the first two options, --numLines and --lineLength, determine the number of lines and how many characters are in each line. --overwriteOutput causes the current contents of sortedFile.txt to be overwritten, if it already exists. The last option, --workDir, is an option built into Toil to specify where temporary files unique to a job are kept.
To understand the details of what's going on inside, let's start with the main() function. It looks like a lot of code, but don't worry---we'll break it down piece by piece.
def main(options=None):
if not options:
# deal with command line arguments
parser = ArgumentParser()
Job.Runner.addToilOptions(parser)
parser.add_argument('--numLines', default=defaultLines, help='Number of lines in file to sort.', type=int)
parser.add_argument('--lineLength', default=defaultLineLen, help='Length of lines in file to sort.', type=int)
parser.add_argument("--fileToSort", help="The file you wish to sort")
parser.add_argument("--outputFile", help="Where the sorted output will go")
parser.add_argument("--overwriteOutput", help="Write over the output file if it already exists.", default=True)
parser.add_argument("--N", dest="N",
help="The threshold below which a serial sort function is used to sort file. "
"All lines must of length less than or equal to N or program will fail",
default=10000)
parser.add_argument('--downCheckpoints', action='store_true',
help='If this option is set, the workflow will make checkpoints on its way through'
'the recursive "down" part of the sort')
parser.add_argument("--sortMemory", dest="sortMemory",
help="Memory for jobs that sort chunks of the file.",
default=None)
parser.add_argument("--mergeMemory", dest="mergeMemory",
help="Memory for jobs that collate results.",
default=None)
options = parser.parse_args()
if not hasattr(options, "sortMemory") or not options.sortMemory:
options.sortMemory = sortMemory
if not hasattr(options, "mergeMemory") or not options.mergeMemory:
options.mergeMemory = sortMemory
# do some input verification
sortedFileName = options.outputFile or "sortedFile.txt"
if not options.overwriteOutput and os.path.exists(sortedFileName):
print(f'Output file {sortedFileName} already exists. '
f'Delete it to run the sort example again or use --overwriteOutput=True')
exit()
fileName = options.fileToSort
if options.fileToSort is None:
# make the file ourselves
fileName = 'fileToSort.txt'
if os.path.exists(fileName):
print(f'Sorting existing file: {fileName}')
else:
print(f'No sort file specified. Generating one automatically called: {fileName}.')
makeFileToSort(fileName=fileName, lines=options.numLines, lineLen=options.lineLength)
else:
if not os.path.exists(options.fileToSort):
raise RuntimeError("File to sort does not exist: %s" % options.fileToSort)
if int(options.N) <= 0:
raise RuntimeError("Invalid value of N: %s" % options.N)
# Now we are ready to run
with Toil(options) as workflow:
sortedFileURL = 'file://' + os.path.abspath(sortedFileName)
if not workflow.options.restart:
sortFileURL = 'file://' + os.path.abspath(fileName)
sortFileID = workflow.importFile(sortFileURL)
sortedFileID = workflow.start(Job.wrapJobFn(setup,
sortFileID,
int(options.N),
options.downCheckpoints,
options=options,
memory=sortMemory))
else:
sortedFileID = workflow.restart()
workflow.exportFile(sortedFileID, sortedFileURL)
First we make a parser to process command line arguments using the argparse module. It's important that we add the call to Job.Runner.addToilOptions() to initialize our parser with all of Toil's default options. Then we add the command line arguments unique to this workflow, and parse the input. The help message listed with the arguments should give you a pretty good idea of what they can do.
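As a minimal sketch of that pattern, separate from the sort example, the boilerplate looks like the following; the --chunkSize option and countTo job function here are hypothetical placeholders, not part of sort.py:

from argparse import ArgumentParser

from toil.common import Toil
from toil.job import Job


def countTo(job, chunkSize):
    # A stand-in job function; a real workflow would do actual work with chunkSize.
    return chunkSize


if __name__ == "__main__":
    parser = ArgumentParser()
    Job.Runner.addToilOptions(parser)  # add all of Toil's own options first
    parser.add_argument("--chunkSize", type=int, default=1000,
                        help="Hypothetical workflow-specific option.")
    options = parser.parse_args()  # Toil's options and ours, parsed together

    with Toil(options) as toil:
        print(toil.start(Job.wrapJobFn(countTo, options.chunkSize)))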
Next we do a little bit of verification of the input arguments. The option --fileToSort allows you to specify a file that needs to be sorted. If this option isn't given, it's here that we make our own file with the call to makeFileToSort().
Finally we come to the context manager that initializes the workflow. We create a path to the input file prepended with 'file://' as per the documentation for toil.common.Toil() when staging a file that is stored locally. Notice that we have to check whether or not the workflow is restarting so that we don't import the file more than once. Finally we can kick off the workflow by calling toil.common.Toil.start() on the job setup. When the workflow ends we capture its output (the sorted file's fileID) and use that in toil.common.Toil.exportFile() to move the sorted file from the job store back into "userland".
Next let's look at the job that begins the actual workflow, setup.
def setup(job, inputFile, N, downCheckpoints, options):
"""
Sets up the sort.
Returns the FileID of the sorted file
"""
RealtimeLogger.info("Starting the merge sort")
return job.addChildJobFn(down,
inputFile, N, 'root',
downCheckpoints,
options = options,
preemptable=True,
memory=sortMemory).rv()
setup really only does two things. First it logs a message (here via RealtimeLogger.info()) and then calls addChildJobFn(). Child jobs run directly after the current job. This call turns the 'job function' down into an actual job and passes in the inputs, including an optional resource requirement, memory. The child job is not run immediately; the call to Job.rv() returns a promise for down's return value, and once down finishes, that value replaces the promise and is returned here.
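The same parent/child/promise pattern in isolation might look like the following sketch (the job functions and job store name here are illustrative, not part of the sort example):

from toil.common import Toil
from toil.job import Job


def child(job, x):
    return x * 2


def parent(job, x):
    # Schedule `child` to run after `parent`; rv() is a promise that is
    # replaced by child's actual return value once it has run.
    return job.addChildJobFn(child, x, memory="1G").rv()


if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./promiseJobStore")
    with Toil(options) as toil:
        print(toil.start(Job.wrapJobFn(parent, 21)))  # prints 42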
Now we can look at what down does.
def down(job, inputFileStoreID, N, path, downCheckpoints, options, memory=sortMemory):
"""
Input is a file, a subdivision size N, and a path in the hierarchy of jobs.
If the range is larger than a threshold N the range is divided recursively and
a follow on job is then created which merges back the results else
the file is sorted and placed in the output.
"""
RealtimeLogger.info("Down job starting: %s" % path)
# Read the file
inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False)
length = os.path.getsize(inputFile)
if length > N:
# We will subdivide the file
RealtimeLogger.critical("Splitting file: %s of size: %s"
% (inputFileStoreID, length))
# Split the file into two copies
midPoint = getMidPoint(inputFile, 0, length)
t1 = job.fileStore.getLocalTempFile()
with open(t1, 'w') as fH:
fH.write(copySubRangeOfFile(inputFile, 0, midPoint+1))
t2 = job.fileStore.getLocalTempFile()
with open(t2, 'w') as fH:
fH.write(copySubRangeOfFile(inputFile, midPoint+1, length))
# Call down recursively. By giving the rv() of the two jobs as inputs to the follow-on job, up,
# we communicate the dependency without hindering concurrency.
result = job.addFollowOnJobFn(up,
job.addChildJobFn(down, job.fileStore.writeGlobalFile(t1), N, path + '/0',
downCheckpoints, checkpoint=downCheckpoints, options=options,
preemptable=True, memory=options.sortMemory).rv(),
job.addChildJobFn(down, job.fileStore.writeGlobalFile(t2), N, path + '/1',
downCheckpoints, checkpoint=downCheckpoints, options=options,
preemptable=True, memory=options.mergeMemory).rv(),
path + '/up', preemptable=True, options=options, memory=options.sortMemory).rv()
else:
# We can sort this bit of the file
RealtimeLogger.critical("Sorting file: %s of size: %s"
% (inputFileStoreID, length))
# Sort the copy and write back to the fileStore
shutil.copyfile(inputFile, inputFile + '.sort')
sort(inputFile + '.sort')
result = job.fileStore.writeGlobalFile(inputFile + '.sort')
RealtimeLogger.info("Down job finished: %s" % path)
return result
down is the recursive part of the workflow. First we read the file into the local file store by calling job.fileStore.readGlobalFile(). This puts a copy of the file in the temp directory for this particular job. This storage will disappear once the job ends. For a detailed explanation of the file store, job store, and their interfaces, have a look at managingFiles.
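A stripped-down version of that read/modify/write cycle, independent of the sort example, could look like the sketch below (the shout job function, job store name, and output path are illustrative assumptions):

import os

from toil.common import Toil
from toil.job import Job


def shout(job, textFileID):
    # Copy the global file into this job's local temporary directory ...
    localPath = job.fileStore.readGlobalFile(textFileID)
    upperPath = job.fileStore.getLocalTempFile()
    with open(localPath) as fIn, open(upperPath, "w") as fOut:
        fOut.write(fIn.read().upper())
    # ... and write the result back into the job store as a new global file.
    return job.fileStore.writeGlobalFile(upperPath)


if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./fileStoreJobStore")
    with Toil(options) as toil:
        inputID = toil.importFile("file://" + os.path.abspath(__file__))  # any local file will do
        outputID = toil.start(Job.wrapJobFn(shout, inputID))
        toil.exportFile(outputID, "file://" + os.path.abspath("shouted.txt"))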
Next, down checks the base case of the recursion: is the length of the input file less than or equal to N (remember that N was an option we added to the workflow in main)? In the base case, we just sort the file and return the file ID of the new sorted file.
If the base case fails, then the file is split into two new temp files using job.fileStore.getLocalTempFile() and the helper function copySubRangeOfFile(). Finally, we add a follow-on job, up, with job.addFollowOnJobFn(). We've already seen child jobs. A follow-on job is a job that runs after the current job and all of its children (and their children and follow-ons) have completed. Using a follow-on makes sense because up is responsible for merging the files together, and we don't want to merge the files until we know they are sorted. Again, the return value of the follow-on job is requested using Job.rv().
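Here is a tiny sketch, not part of sort.py, of how children and a follow-on combine, with the follow-on receiving the children's promised return values:

from toil.common import Toil
from toil.job import Job


def square(job, x):
    return x * x


def add(job, a, b):
    # Runs only after both `square` children (and any of their descendants) finish.
    return a + b


def root(job):
    a = job.addChildJobFn(square, 3).rv()
    b = job.addChildJobFn(square, 4).rv()
    return job.addFollowOnJobFn(add, a, b).rv()


if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./followOnJobStore")
    with Toil(options) as toil:
        print(toil.start(Job.wrapJobFn(root)))  # prints 25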
Looking at up
def up(job, inputFileID1, inputFileID2, path, options, memory=sortMemory):
"""
Merges the two files and places them in the output.
"""
RealtimeLogger.info("Up job starting: %s" % path)
with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID):
fileHandle = codecs.getwriter('utf-8')(fileHandle)
with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
inputFileHandle1 = codecs.getreader('utf-8')(inputFileHandle1)
with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
inputFileHandle2 = codecs.getreader('utf-8')(inputFileHandle2)
RealtimeLogger.info("Merging %s and %s to %s"
% (inputFileID1, inputFileID2, outputFileStoreID))
merge(inputFileHandle1, inputFileHandle2, fileHandle)
# Clean up the input files - these deletes will occur after the completion is successful.
job.fileStore.deleteGlobalFile(inputFileID1)
job.fileStore.deleteGlobalFile(inputFileID2)
RealtimeLogger.info("Up job finished: %s" % path)
return outputFileStoreID
we see that the two input files are merged together and the output is written to a new file using job.fileStore.writeGlobalFileStream(). After a little cleanup, the output file is returned.
Once the final up finishes and all of the rv() promises are fulfilled, main receives the sorted file's ID which it uses in exportFile to send it to the user.
There are other things in this example that we didn't go over, such as checkpoints and many details of the API.
At the end of the script the lines
if __name__ == '__main__':
main()
are included to ensure that the main function is only run once in the '__main__' process invoked by you, the user. In Toil terms, by invoking the script you created the leader process in which the main() function is run. A worker process is a separate process whose sole purpose is to host the execution of one or more jobs defined in that script. In any Toil workflow there is always one leader process, and potentially many worker processes.
When using the single-machine batch system (the default), the worker processes will be running on the same machine as the leader process. With full-fledged batch systems like Mesos the worker processes will typically be started on separate machines. The boilerplate ensures that the pipeline is only started once---on the leader---but not when its job functions are imported and executed on the individual workers.
Typing python sort.py --help will show the complete list of arguments for the workflow which includes both Toil's and ones defined inside sort.py. A complete explanation of Toil's arguments can be found in commandRef.
By default, Toil logs a lot of information related to the current environment in addition to messages from the batch system and jobs. This can be configured with the --logLevel flag. For example, to only log CRITICAL level messages to the screen:
$ python sort.py file:jobStore \
--logLevel=critical \
--overwriteOutput=True
This hides most of the information we get from the Toil run. For more detail, we can run the pipeline with --logLevel=debug to see a comprehensive output. For more information, see workflowOptions.
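The same logging behaviour can also be selected inside the script rather than on the command line; a minimal sketch, reusing the getDefaultOptions() pattern shown later in this manual (the job store name here is a placeholder):

from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./loggingJobStore")
    options.logLevel = "CRITICAL"  # only log CRITICAL messages, as with --logLevel=critical
    options.clean = "always"       # discard the job store after the run

    with Toil(options) as toil:
        toil.start(Job())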
With Toil, you can recover gracefully from a bug in your pipeline without losing any progress from successfully completed jobs. To demonstrate this, let's add a bug to our example code to see how Toil handles a failure and how we can resume a pipeline after that happens. Add a bad assertion to the first line of the body of down():
def down(job, inputFileStoreID, N, path, downCheckpoints, options, memory=sortMemory):
...
assert 1 == 2, "Test error!"
When we run the pipeline, Toil will show a detailed failure log with a traceback:
$ python sort.py file:jobStore
...
---TOIL WORKER OUTPUT LOG---
...
m/j/jobonrSMP    Traceback (most recent call last):
m/j/jobonrSMP      File "toil/src/toil/worker.py", line 340, in main
m/j/jobonrSMP        job._runner(jobGraph=jobGraph, jobStore=jobStore, fileStore=fileStore)
m/j/jobonrSMP      File "toil/src/toil/job.py", line 1270, in _runner
m/j/jobonrSMP        returnValues = self._run(jobGraph, fileStore)
m/j/jobonrSMP      File "toil/src/toil/job.py", line 1217, in _run
m/j/jobonrSMP        return self.run(fileStore)
m/j/jobonrSMP      File "toil/src/toil/job.py", line 1383, in run
m/j/jobonrSMP        rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
m/j/jobonrSMP      File "toil/example.py", line 30, in down
m/j/jobonrSMP        assert 1 == 2, "Test error!"
m/j/jobonrSMP    AssertionError: Test error!
If we try and run the pipeline again, Toil will give us an error message saying that a job store of the same name already exists. By default, in the event of a failure, the job store is preserved so that the workflow can be restarted, starting from the previously failed jobs. We can restart the pipeline by running
$ python sort.py file:jobStore \
--restart \
--overwriteOutput=True
We can also change the number of times Toil will attempt to retry a failed job:
$ python sort.py file:jobStore \
--retryCount 2 \
--restart \
--overwriteOutput=True
You'll now see Toil attempt to rerun the failed job until it runs out of tries. --retryCount is useful for non-systemic errors, like downloading a file that may experience a sporadic interruption, or some other non-deterministic failure.
To successfully restart our pipeline, we can edit our script to comment out the assertion we added, or remove it, and then run
$ python sort.py file:jobStore \
--restart \
--overwriteOutput=True
The pipeline will run successfully, and the job store will be removed on the pipeline's completion.
Please see the cli_status section for more on gathering runtime and resource info on jobs.
After having installed the aws extra for Toil during the installation-ref and set up AWS (see prepareAWS), the user can run the basic helloWorld.py script (Running a basic workflow) on a VM in AWS just by modifying the run command.
Note that when running in AWS, users can either run the workflow on a single instance or run it on a cluster (which is running across multiple containers on multiple AWS instances). For more information on running Toil workflows on a cluster, see runningAWS.
Also! Remember to use the destroyCluster command when finished to destroy the cluster! Otherwise things may not be cleaned up properly.
$ toil launch-cluster <cluster-name> \
--keyPairName <AWS-key-pair-name> \
--leaderNodeType t2.medium \
--zone us-west-2a
The arguments keyPairName, leaderNodeType, and zone are required to launch a cluster.
$ toil rsync-cluster --zone us-west-2a <cluster-name> helloWorld.py :/tmp
Note that the command requires defining the file to copy as well as the target location on the cluster leader node.
$ toil ssh-cluster --zone us-west-2a <cluster-name>
Note that this command will log you in as the root user.
$ python /tmp/helloWorld.py aws:us-west-2:my-S3-bucket
In this particular case, we create an S3 bucket called my-S3-bucket in the us-west-2 region to store intermediate job results.
Along with some other INFO log messages, you should get the following output in your terminal window: Hello, world!, here's a message: You did it!.
$ exit
$ toil destroy-cluster --zone us-west-2a <cluster-name>
Note that this command will destroy the cluster leader node and any resources created to run the job, including the S3 bucket.
After having installed the aws and cwl extras for Toil during the installation-ref and set up AWS (see prepareAWS), the user can run a CWL workflow with Toil on AWS.
Also! Remember to use the destroyCluster command when finished to destroy the cluster! Otherwise things may not be cleaned up properly.
$ toil launch-cluster <cluster-name> \
--keyPairName <AWS-key-pair-name> \
--leaderNodeType t2.medium \
--zone us-west-2a
toil rsync-cluster --zone us-west-2a <cluster-name> example.cwl :/tmp
toil rsync-cluster --zone us-west-2a <cluster-name> example-job.yaml :/tmp
$ toil ssh-cluster --zone us-west-2a <cluster-name>
sudo apt-get update
sudo apt-get -y upgrade
sudo apt-get -y dist-upgrade
sudo apt-get -y install git
sudo pip install mesos.cli
virtualenv --system-site-packages venv
source venv/bin/activate
(venv) $ toil-cwl-runner \
--provisioner aws \
--jobStore aws:us-west-2:any-name \
/tmp/example.cwl /tmp/example-job.yaml
TIP:
$ toil destroy-cluster --zone us-west-2a <cluster-name>
Cactus is a reference-free, whole-genome multiple alignment program that can be run on any of the cloud platforms Toil supports.
NOTE:
This example provides a "cloud agnostic" view of running Cactus with Toil. Most options will not change between cloud providers. However, each provisioner has unique inputs for --leaderNodeType, --nodeType and --zone. We recommend the following:
Option | Used in | AWS | GCE |
--leaderNodeType | launch-cluster | t2.medium | n1-standard-1 |
--zone | launch-cluster | us-west-2a | us-west1-a |
--zone | cactus | us-west-2 | |
--nodeType | cactus | c3.4xlarge | n1-standard-8 |
When executing toil launch-cluster with gce specified for --provisioner, the option --boto must be specified and given a path to your .boto file. See runningGCE for more information about the --boto option.
Also! Remember to use the destroyCluster command when finished to destroy the cluster! Otherwise things may not be cleaned up properly.
(venv) $ toil launch-cluster <cluster-name> \
--provisioner <aws, gce> \
--keyPairName <key-pair-name> \
--leaderNodeType <type> \
--zone <zone>
NOTE:
When using AWS, setting the TOIL_AWS_ZONE environment variable eliminates having to specify the --zone option for each command. This will be supported for GCE in the future.
$ export TOIL_AWS_ZONE=us-west-2c
$ toil ssh-cluster --provisioner <aws, gce> <cluster-name>
$ mkdir /root/cact_ex
$ exit
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> pestis-short-aws-seqFile.txt :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> GCF_000169655.1_ASM16965v1_genomic.fna :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> GCF_000006645.1_ASM664v1_genomic.fna :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> GCF_000182485.1_ASM18248v1_genomic.fna :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> GCF_000013805.1_ASM1380v1_genomic.fna :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> setup_leaderNode.sh :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> blockTrim1.xml :/root/cact_ex
$ toil rsync-cluster --provisioner <aws, gce> <cluster-name> blockTrim3.xml :/root/cact_ex
$ toil ssh-cluster --provisioner <aws, gce> <cluster-name>
$ bash /root/cact_ex/setup_leaderNode.sh
$ source cact_venv/bin/activate
(cact_venv) $ cd cactus
(cact_venv) $ pip install --upgrade .
(cact_venv) $ TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.14.0 cactus \
--provisioner <aws, gce> \
--nodeType <type> \
--maxNodes 2 \
--minNodes 0 \
--retry 10 \
--batchSystem mesos \
--logDebug \
--logFile /logFile_pestis3 \
--configFile \
/root/cact_ex/blockTrim3.xml <aws, google>:<zone>:cactus-pestis \
/root/cact_ex/pestis-short-aws-seqFile.txt \
/root/cact_ex/pestis_output3.hal
NOTE:
TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.14.0 --- specifies the version of Toil being used, 3.14.0; to use the latest version, simply omit this variable.
--nodeType --- determines the instance type used for worker nodes. The instance type specified here must be on the same cloud provider as the one specified with --leaderNodeType.
--maxNodes 2 --- creates up to two instances of the type specified with --nodeType and launches Mesos worker containers inside them.
--logDebug --- equivalent to --logLevel DEBUG.
--logFile /logFile_pestis3 --- writes logs to a file named logFile_pestis3 under the / directory.
--configFile --- optional; use it to point Cactus at a specific configuration file for the alignment.
<aws, google>:<zone>:cactus-pestis --- creates a bucket, named cactus-pestis, with the specified cloud provider to store intermediate job files and metadata. NOTE: If you want to use a GCE-based jobstore, specify google here, not gce.
The result file, named pestis_output3.hal, is stored in the /root/cact_ex folder of the leader node.
Use cactus --help to see all the Cactus and Toil flags available.
(cact_venv) $ exit
(venv) $ toil rsync-cluster \
--provisioner <aws, gce> <cluster-name> \
:/root/cact_ex/pestis_output3.hal \
<path-of-folder-on-local-machine>
(venv) $ toil destroy-cluster --provisioner <aws, gce> <cluster-name>
Toil runs in various environments, including locally and in the cloud (Amazon Web Services and Google Compute Engine). Toil also supports two DSLs: CWL and WDL (experimental).
Toil is built in a modular way so that it can be used on lots of different systems, and with different configurations. The three configurable pieces are the job store, the batch system, and the provisioner.
The job store is a storage abstraction which contains all of the information used in a Toil run. This centralizes all of the files used by jobs in the workflow and also the details of the progress of the run. If a workflow crashes or fails, the job store contains all of the information necessary to resume with minimal repetition of work.
Several different job stores are supported, including the file job store and cloud job stores.
The file job store is for use locally, and keeps the workflow information in a directory on the machine where the workflow is launched. This is the simplest and most convenient job store for testing or for small runs.
For an example that uses the file job store, see quickstart.
Toil currently supports the following cloud storage systems as job stores:
These use cloud buckets to house all of the files. This is useful if there are several different worker machines all running jobs that need to access the job store.
A Toil batch system is either a local single-machine (one computer) or a currently supported HPC cluster of computers (lsf, parasol, mesos, slurm, torque, htcondor, or grid_engine). Mesos is a special case, and is launched for cloud environments. These environments manage individual worker nodes under a leader node to process the work required in a workflow. The leader and its workers all coordinate their tasks and files through a centralized job store location.
See batchSystemInterface for a more detailed description of different batch systems.
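For example, the batch system can usually be chosen per run without touching the workflow code, either with the --batchSystem flag or from inside the script. The sketch below assumes the option's attribute name matches its flag, as it does for the other options shown in this manual:

from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()  # e.g. myWorkflow.py file:my-job-store --batchSystem mesos
    # Or force a batch system from inside the script (attribute name assumed to
    # mirror the --batchSystem flag):
    # options.batchSystem = "mesos"

    with Toil(options) as toil:
        toil.start(Job())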
The Toil provisioner provides a tool set for running a Toil workflow on a particular cloud platform.
The clusterRef are command line tools used to provision nodes in your desired cloud platform. They allow you to launch nodes, ssh to the leader, and rsync files back and forth.
For detailed instructions for using the provisioner see runningAWS or runningGCE.
A quick way to see all of Toil's commandline options is by executing the following on a toil script:
$ toil example.py --help
For a basic toil workflow, Toil has one mandatory argument, the job store. All other arguments are optional.
Running Toil scripts requires a file path or URL to a central location for all of the files of the workflow. This is Toil's one required positional argument: the job store. To use the quickstart example, if you're on a node that has a large /scratch volume, you can specify that the job store be created there by executing python helloWorld.py /scratch/my-job-store, or more explicitly, python helloWorld.py file:/scratch/my-job-store.
Syntax for specifying different job stores:
AWS: aws:region-here:job-store-name
Google: google:projectID-here:job-store-name
Different types of job store options can be found below.
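A sketch showing how the same script can target each kind of job store just by changing the locator string (the bucket, region, and project names here are placeholders):

from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    # Pick exactly one locator; everything else about the workflow stays the same.
    locator = "file:/scratch/my-job-store"           # local file job store
    # locator = "aws:us-west-2:my-job-store"         # AWS (region:name)
    # locator = "google:my-project-id:my-job-store"  # Google (projectID:name)
    options = Job.Runner.getDefaultOptions(locator)

    with Toil(options) as toil:
        toil.start(Job())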
Core Toil Options
Logging Options
Toil hides stdout and stderr by default except in case of job failure. Log levels in toil are based on priority from the logging module:
Batch System Options
Autoscaling Options
Miscellaneous Options
In the event of failure, Toil can resume the pipeline by adding the argument --restart and rerunning the python script. Toil pipelines can even be edited and resumed which is useful for development or troubleshooting.
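Programmatically, a restartable script just branches on options.restart, as the sort example's main() does; a condensed sketch (the work job function is illustrative):

from toil.common import Toil
from toil.job import Job


def work(job):
    return "done"


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()  # pass --restart to resume a failed run
    with Toil(options) as toil:
        if options.restart:
            result = toil.restart()  # resume from the existing job store
        else:
            result = toil.start(Job.wrapJobFn(work))
        print(result)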
Toil supports jobs, or clusters of jobs, that run as services to other accessor jobs. Example services include server databases or Apache Spark clusters. As service jobs exist to provide services to accessor jobs, their runtime is dependent on the concurrent running of their accessor jobs. The dependencies between services and their accessor jobs can create potential deadlock scenarios, where the workflow hangs because only service jobs are running and their accessor jobs cannot be scheduled due to insufficient resources to run both simultaneously. To cope with this situation Toil attempts to schedule services and accessors intelligently; however, to avoid a deadlock with workflows running service jobs it is advisable to use the following parameters:
Specifying these parameters so that at a maximum cluster size there will be sufficient resources to run accessors in addition to services will ensure that such a deadlock can not occur.
If too low a limit is specified, then a deadlock can occur in which Toil cannot schedule sufficient service jobs concurrently to complete the workflow. Toil will detect this situation if it occurs and throw a toil.DeadlockException exception. Increasing the cluster size and these limits will resolve the issue.
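The outline below sketches how a service job is attached to the job graph and how an accessor receives the service's connection details. It is illustrative only; the exact Job.Service method signatures (whether start() and stop() receive the job or the file store) vary between Toil releases, so check the API reference for your version:

from toil.common import Toil
from toil.job import Job


class DemoService(Job.Service):
    """A toy service; a real one might start a database or a Spark master."""

    def start(self, job):
        # Bring the service up and return whatever accessors need to reach it
        # (e.g. a host:port string). Older releases pass a fileStore here instead.
        return "localhost:9999"

    def check(self):
        return True  # report that the service is still healthy

    def stop(self, job):
        pass  # tear the service down once all accessors have finished


def accessor(job, serviceAddress):
    return "talked to service at %s" % serviceAddress


if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./serviceJobStore")
    root = Job()
    address = root.addService(DemoService())  # promise for start()'s return value
    root.addChildJobFn(accessor, address)     # accessor runs while the service is up
    with Toil(options) as toil:
        toil.start(root)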
It's good to remember that command line options can be overridden in the Toil script itself. For example, toil.job.Job.Runner.getDefaultOptions() can be used to run Toil with all default options (ignoring any user-supplied command line args); in this example, it will always run with the "./toilWorkflow" directory specified as the jobstore:
options = Job.Runner.getDefaultOptions("./toilWorkflow") # Get the options object with Toil(options) as toil:
toil.start(Job()) # Run the script
However, each option can be explicitly set within the script by supplying arguments (in this example, we are setting logLevel = "DEBUG", which shows all log statements, and clean = "ALWAYS", which always deletes the jobstore), like so:
options = Job.Runner.getDefaultOptions("./toilWorkflow")  # Get the options object
options.logLevel = "DEBUG"  # Set the log level to the debug level.
options.clean = "ALWAYS"  # Always delete the jobStore after a run

with Toil(options) as toil:
    toil.start(Job())  # Run the script
However, the usual incantation is to accept commandline args from the user with the following:
parser = Job.Runner.getDefaultArgumentParser()  # Get the parser
options = parser.parse_args()  # Parse user args to create the options object

with Toil(options) as toil:
    toil.start(Job())  # Run the script
This can also, of course, be combined with script-supplied arguments as before (which will overwrite any user-supplied args):
parser = Job.Runner.getDefaultArgumentParser()  # Get the parser
options = parser.parse_args()  # Parse user args to create the options object
options.logLevel = "DEBUG"  # Set the log level to the debug level.
options.clean = "ALWAYS"  # Always delete the jobStore after a run

with Toil(options) as toil:
    toil.start(Job())  # Run the script
Toil has a number of tools to assist in debugging. Here we provide help in working through potential problems that a user might encounter in attempting to run a workflow.
Note: Currently these features are only implemented for use locally (single machine) with the fileJobStore.
To view what files currently reside in the jobstore, run the following command:
$ toil debug-file file:path-to-jobstore-directory \
--listFilesInJobStore
When run from the commandline, this should generate a file containing the contents of the job store (in addition to displaying a series of log messages to the terminal). This file is named "jobstore_files.txt" by default and will be generated in the current working directory.
If one wishes to copy any of these files to a local directory, one can run for example:
$ toil debug-file file:path-to-jobstore \
--fetch overview.txt *.bam *.fastq \
--localFilePath=/home/user/localpath
This fetches overview.txt and all .bam and .fastq files. It can be used to recover previously used input and output files for debugging or for reuse in other workflows, or in general debugging to ensure that certain outputs were imported into the jobStore.
See cli_status for more about gathering statistics about job success, runtime, and resource usage from workflows.
If you execute a workflow using the --debugWorker flag, Toil will not fork in order to run jobs, which means you can either use pdb, or an IDE that supports debugging Python as you would normally. Note that the --debugWorker flag will only work with the singleMachine batch system (the default), and not any of the custom job schedulers.
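For example, with --debugWorker you can drop into pdb from inside a job function just as you would in any single-process Python program; the breakpoint placement and script name below are illustrative:

from toil.common import Toil
from toil.job import Job


def buggyJob(job, x):
    # Only useful with --debugWorker, since the job then runs in the
    # leader process and shares its terminal.
    import pdb; pdb.set_trace()
    return x + 1


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()  # run with: python debugme.py file:debug-job-store --debugWorker
    with Toil(options) as toil:
        print(toil.start(Job.wrapJobFn(buggyJob, 41)))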
Toil supports Amazon Web Services (AWS) and Google Compute Engine (GCE) in the cloud and has autoscaling capabilities that can adapt to the size of your workflow, whether your workflow requires 10 instances or 20,000.
Toil does this by creating a virtual cluster with Apache Mesos. Apache Mesos requires a leader node to coordinate the workflow, and worker nodes to execute the various tasks within the workflow. As the workflow runs, Toil will "autoscale", creating and terminating workers as needed to meet the demands of the workflow.
Once a user is familiar with the basics of running toil locally (specifying a jobStore, and how to write a toil script), they can move on to the guides below to learn how to translate these workflows into cloud ready workflows.
Toil can use the provisioner to launch and manage a cluster of virtual machines and run a workflow distributed over several nodes. The provisioner also has the ability to automatically scale the size of the cluster up or down to handle dynamic changes in computational demand (autoscaling). Currently we have working provisioners for AWS and GCE (Azure support has been deprecated).
Toil uses Apache Mesos as the batchSystemOverview.
See here for instructions for runningAWS.
See here for instructions for runningGCE.
Toil can make use of cloud storage such as AWS or Google buckets to take care of storage needs.
This is useful when running Toil in single machine mode on any cloud platform since it allows you to make use of their integrated storage systems.
For an overview of the job store see jobStoreOverview.
For instructions configuring a particular job store see:
Kubernetes is a very popular container orchestration tool that has become a de facto cross-cloud-provider API for accessing cloud resources. Major cloud providers like Amazon, Microsoft, Kubernetes owner Google, and DigitalOcean have invested heavily in making Kubernetes work well on their platforms, by writing their own deployment documentation and developing provider-managed Kubernetes-based products. Using minikube, Kubernetes can even be run on a single machine.
Toil supports running Toil workflows against a Kubernetes cluster, either in the cloud or deployed on user-owned hardware.
To run Toil workflows on Kubernetes, you need to have a Kubernetes cluster set up. This will not be covered here, but there are many options available, and which one you choose will depend on which cloud ecosystem if any you use already, and on pricing. If you are just following along with the documentation, use minikube on your local machine.
Note that currently the only way to run a Toil workflow on Kubernetes is to use the AWS Job Store, so your Kubernetes workflow will currently have to store its data in Amazon's cloud regardless of where you run it. This can result in significant egress charges from Amazon if you run it outside of Amazon.
Kubernetes Cluster Providers:
There are two main ways to run Toil workflows on Kubernetes. You can either run the Toil leader on a machine outside the cluster, with jobs submitted to and run on the cluster, or you can submit the Toil leader itself as a job and have it run inside the cluster. Either way, you will need to configure your own machine to be able to submit jobs to the Kubernetes cluster. Generally, this involves creating and populating a file named .kube/config in your user's home directory, and specifying the cluster to connect to, the certificate and token information needed for mutual authentication, and the Kubernetes namespace within which to work. However, Kubernetes configuration can also be picked up from other files in the .kube directory, environment variables, and the enclosing host when running inside a Kubernetes-managed container.
You will have to do different things here depending on where you got your Kubernetes cluster:
Toil's internal Kubernetes configuration logic mirrors that of the kubectl command. Toil workflows will use the current kubectl context to launch their Kubernetes jobs.
If you are going to run your workflow's leader within the Kubernetes cluster (see Option 1: Running the Leader Inside Kubernetes), you will need a service account in your chosen Kubernetes namespace. Most namespaces should have a service account named default which should work fine. If your cluster requires you to use a different service account, you will need to obtain its name and use it when launching the Kubernetes job containing the Toil leader.
Your local Kubernetes context and/or the service account you are using to run the leader in the cluster will need to have certain permissions in order to run the workflow. Toil needs to be able to interact with jobs and pods in the cluster, and to retrieve pod logs. You as a user may need permission to set up an AWS credentials secret, if one is not already available. Additionally, it is very useful for you as a user to have permission to interact with nodes, and to shell into pods.
The appropriate permissions may already be available to you and your service account by default, especially in managed or ease-of-use-optimized setups such as EKS or minikube.
However, if the appropriate permissions are not already available, you or your cluster administrator will have to grant them manually. The following Role (toil-user) and ClusterRole (node-reader), to be applied with kubectl apply -f filename.yaml, should grant sufficient permissions to run Toil workflows when bound to your account and the service account used by Toil workflows. Be sure to replace YOUR_NAMESPACE_HERE with the namespace you are running your workflows in.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: YOUR_NAMESPACE_HERE
  name: toil-user
rules:
- apiGroups: ["*"]
  resources: ["*"]
  verbs: ["explain", "get", "watch", "list", "describe", "logs", "attach", "exec", "port-forward", "proxy", "cp", "auth"]
- apiGroups: ["batch"]
  resources: ["*"]
  verbs: ["get", "watch", "list", "create", "run", "set", "delete"]
- apiGroups: [""]
  resources: ["secrets", "pods", "pods/attach", "podtemplates", "configmaps", "events", "services"]
  verbs: ["patch", "get", "update", "watch", "list", "create", "run", "set", "delete", "exec"]
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods/exec"]
  verbs: ["create"]
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: node-reader
rules:
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list", "describe"]
- apiGroups: [""]
  resources: ["namespaces"]
  verbs: ["get", "list", "describe"]
- apiGroups: ["metrics.k8s.io"]
  resources: ["*"]
  verbs: ["*"]
To bind a user or service account to the Role or ClusterRole and actually grant the permissions, you will need a RoleBinding and a ClusterRoleBinding, respectively. Make sure to fill in the namespace, username, and service account name, and add more user stanzas if your cluster is to support multiple Toil users.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: toil-developer-member
  namespace: toil
subjects:
- kind: User
  name: YOUR_KUBERNETES_USERNAME_HERE
  apiGroup: rbac.authorization.k8s.io
- kind: ServiceAccount
  name: YOUR_SERVICE_ACCOUNT_NAME_HERE
  namespace: YOUR_NAMESPACE_HERE
roleRef:
  kind: Role
  name: toil-user
  apiGroup: rbac.authorization.k8s.io
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: read-nodes
subjects:
- kind: User
  name: YOUR_KUBERNETES_USERNAME_HERE
  apiGroup: rbac.authorization.k8s.io
- kind: ServiceAccount
  name: YOUR_SERVICE_ACCOUNT_NAME_HERE
  namespace: YOUR_NAMESPACE_HERE
roleRef:
  kind: ClusterRole
  name: node-reader
  apiGroup: rbac.authorization.k8s.io
Currently, the only job store (which is what Toil uses to exchange data between jobs) that works with jobs running on Kubernetes is the AWS job store. This requires that the Toil leader and the Kubernetes jobs be able to connect to and use Amazon S3 and Amazon SimpleDB. It also requires that you have an Amazon Web Services account.
In your AWS account, you need to create an AWS access key. First go to the IAM dashboard; for "us-west-1", the link would be:
https://console.aws.amazon.com/iam/home?region=us-west-1#/home
Then create an access key, and save the Access Key ID and the Secret Key. As documented in the AWS documentation:
Make sure that, if your AWS infrastructure requires your user to authenticate with a multi-factor authentication (MFA) token, you obtain a second secret key and access key that don't have this requirement. The secret key and access key used to populate the Kubernetes secret that allows the jobs to contact the job store need to be usable without human intervention.
This only really needs to happen if you run the leader on the local machine. But we need the files in place to fill in the secret in the next step. Run:
$ aws configure
Then when prompted, enter your secret key and access key. This should create a file ~/.aws/credentials that looks like this:
[default]
aws_access_key_id = BLAH
aws_secret_access_key = blahblahblah
$ cd ~/.aws
Then, create a Kubernetes secret that contains it. We'll call it aws-credentials:
$ kubectl create secret generic aws-credentials --from-file credentials
To configure your workflow to run on Kubernetes, you will have to configure several environment variables, in addition to passing the --batchSystem kubernetes option. Doing the research to figure out what values to give these variables may require talking to your cluster provider.
Note that Docker containers cannot be run inside of unprivileged Kubernetes pods (which are themselves containers). The Docker daemon does not (yet) support this. Other tools, such as Singularity in its user-namespace mode, are able to run containers from within containers. If using Singularity to run containerized tools, and you want downloaded container images to persist between Toil jobs, you will also want to set TOIL_KUBERNETES_HOST_PATH and make sure that Singularity is downloading its containers under the Toil work directory (/var/lib/toil by default) by setting SINGULARITY_CACHEDIR. However, you will need to make sure that no two jobs try to download the same container at the same time; Singularity has no synchronization or locking around its cache, and the cache is not safe for simultaneous access by multiple Singularity invocations. Some Toil workflows use their own custom workaround logic for this problem; this work is likely to be made part of Toil in a future release.
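If you prefer to keep this configuration alongside the workflow rather than in your shell profile, the same variables (they are ordinary environment variables, as shown in the local-leader example later in this section) can be set from Python before the workflow starts. A sketch, with placeholder values that must match your cluster and AWS secret setup:

import os

from toil.common import Toil
from toil.job import Job

# These environment variables are read by Toil's Kubernetes batch system;
# the values here are placeholders.
os.environ.setdefault("TOIL_KUBERNETES_OWNER", "demo-user")
os.environ.setdefault("TOIL_AWS_SECRET_NAME", "aws-credentials")
os.environ.setdefault("TOIL_KUBERNETES_HOST_PATH", "/data/scratch")

if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()  # e.g. aws:us-west-2:my-jobstore --batchSystem kubernetes
    with Toil(options) as toil:
        toil.start(Job())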
To run the workflow, you will need to run the Toil leader process somewhere. It can either be run inside Kubernetes as a Kubernetes job, or outside Kubernetes as a normal command.
Once you have determined a set of environment variable values for your workflow run, write a YAML file that defines a Kubernetes job to run your workflow with that configuration. Some configuration items (such as your username, and the name of your AWS credentials secret) need to be written into the YAML so that they can be used from the leader as well.
Note that the leader pod will need your workflow script, its other dependencies, and Toil all installed. An easy way to get Toil installed is to start with the Toil appliance image for the version of Toil you want to use. In this example, we use quay.io/ucsc_cgl/toil:4.1.0.
Here's an example YAML file to run a test workflow:
apiVersion: batch/v1
kind: Job
metadata:
  # It is good practice to include your username in your job name.
  # Also specify it in TOIL_KUBERNETES_OWNER
  name: demo-user-toil-test
# Do not try and rerun the leader job if it fails
spec:
  backoffLimit: 0
template:
spec:
# Do not restart the pod when the job fails, but keep it around so the
# log can be retrieved
restartPolicy: Never
volumes:
- name: aws-credentials-vol
secret:
# Make sure the AWS credentials are available as a volume.
# This should match TOIL_AWS_SECRET_NAME
secretName: aws-credentials
# You may need to replace this with a different service account name as
# appropriate for your cluster.
serviceAccountName: default
containers:
- name: main
image: quay.io/ucsc_cgl/toil:4.1.0
env:
# Specify your username for inclusion in job names
- name: TOIL_KUBERNETES_OWNER
value: demo-user
# Specify where to find the AWS credentials to access the job store with
- name: TOIL_AWS_SECRET_NAME
value: aws-credentials
# Specify where per-host caches should be stored, on the Kubernetes hosts.
# Needs to be set for Toil's caching to be efficient.
- name: TOIL_KUBERNETES_HOST_PATH
value: /data/scratch
volumeMounts:
# Mount the AWS credentials volume
- mountPath: /root/.aws
name: aws-credentials-vol
resources:
# Make sure to set these resource limits to values large enough
# to accommodate the work your workflow does in the leader
# process, but small enough to fit on your cluster.
#
# Since no request values are specified, the limits are also used
# for the requests.
limits:
cpu: 2
memory: "4Gi"
ephemeral-storage: "10Gi"
command:
- /bin/bash
- -c
- |
# This Bash script will set up Toil and the workflow to run, and run them.
set -e
# We make sure to create a work directory; Toil can't hot-deploy a
# script from the root of the filesystem, which is where we start.
mkdir /tmp/work
cd /tmp/work
# We make a virtual environment to allow workflow dependencies to be
# hot-deployed.
#
# We don't really make use of it in this example, but for workflows
# that depend on PyPI packages we will need this.
#
# We use --system-site-packages so that the Toil installed in the
# appliance image is still available.
virtualenv --python python3 --system-site-packages venv
. venv/bin/activate
# Now we install the workflow. Here we're using a demo workflow
# script from Toil itself.
wget https://raw.githubusercontent.com/DataBiosphere/toil/releases/4.1.0/src/toil/test/docs/scripts/tutorial_helloworld.py
# Now we run the workflow. We make sure to use the Kubernetes batch
# system and an AWS job store, and we set some generally useful
# logging options. We also make sure to enable caching.
python3 tutorial_helloworld.py \
aws:us-west-2:demouser-toil-test-jobstore \
--batchSystem kubernetes \
--realTimeLogging \
--logInfo \
--disableCaching false
You can save this YAML as leader.yaml, and then run it on your Kubernetes installation with:
$ kubectl apply -f leader.yaml
To monitor the progress of the leader job, you will want to read its logs. If you are using a Kubernetes dashboard such as k9s, you can simply find the pod created for the job in the dashboard, and view its logs there. If not, you will need to locate the pod by hand.
The following techniques are most useful for looking at the pod which holds the Toil leader, but they can also be applied to individual Toil jobs on Kubernetes, even when the leader is outside the cluster.
Kubernetes names pods for jobs by appending a short random string to the name of the job. You can find the name of the pod for your job by doing:
$ kubectl get pods | grep demo-user-toil-test
demo-user-toil-test-g5496                 1/1     Running     0          2m
Assuming you have set TOIL_KUBERNETES_OWNER correctly, you should be able to find all of your workflow's pods by searching for your username:
$ kubectl get pods | grep demo-user
If the status of a pod is anything other than Pending, you will be able to view its logs with:
$ kubectl logs demo-user-toil-test-g5496
This will dump the pod's logs from the beginning to now and terminate. To follow along with the logs from a running pod, add the -f option:
$ kubectl logs -f demo-user-toil-test-g5496
A status of ImagePullBackoff suggests that you have requested to use an image that is not available. Check the image section of your YAML if you are looking at a leader, or the value of TOIL_APPLIANCE_SELF if you are dealing with a worker job. You also might want to check your Kubernetes node's Internet connectivity and DNS function; in Kubernetes, DNS depends on system-level pods which can be terminated or evicted in cases of resource oversubscription, just like user workloads.
If your pod seems to be stuck in Pending or ContainerCreating, you can get information on what is wrong with it by using kubectl describe pod:
$ kubectl describe pod demo-user-toil-test-g5496
Pay particular attention to the Events: section at the end of the output. An indication that a job is too big for the available nodes on your cluster, or that your cluster is too busy for your jobs, is FailedScheduling events:
Type     Reason            Age                  From               Message
----     ------            ----                 ----               -------
Warning  FailedScheduling  13s (x79 over 100m)  default-scheduler  0/4 nodes are available: 1 Insufficient cpu, 1 Insufficient ephemeral-storage, 4 Insufficient memory.
If a pod is running but seems to be behaving erratically, or seems stuck, you can shell into it and look around:
$ kubectl exec -ti demo-user-toil-test-g5496 /bin/bash
One common cause of stuck pods is attempting to use more memory than allowed by Kubernetes (or by the Toil job's memory resource requirement), but in a way that does not trigger the Linux OOM killer to terminate the pod's processes. In these cases, the pod can remain stuck at nearly 100% memory usage more or less indefinitely, and attempting to shell into the pod (which needs to start a process within the pod, using some of its memory) will fail. In these cases, the recommended solution is to kill the offending pod and increase its (or its Toil job's) memory requirement, or reduce its memory needs by adapting user code.
The Toil Kubernetes batch system includes cleanup code to terminate worker jobs when the leader shuts down. However, if the leader pod is removed by Kubernetes, is forcibly killed or otherwise suffers a sudden existence failure, it can go away while its worker jobs live on. It is not recommended to restart a workflow in this state, as jobs from the previous invocation will remain running and will be trying to modify the job store concurrently with jobs from the new invocation.
To clean up dangling jobs, you can use the following snippet:
$ kubectl get jobs | grep demo-user | cut -f1 -d' ' | xargs -n10 kubectl delete job
This will delete all jobs with demo-user's username in their names, in batches of 10. You can also use the UUID that Toil assigns to a particular workflow invocation in the filter, to clean up only the jobs pertaining to that workflow invocation.
If you don't want to run your Toil leader inside Kubernetes, you can run it locally instead. This can be useful when developing a workflow; files can be hot-deployed from your local machine directly to Kubernetes. However, your local machine will have to have (ideally role-assumption- and MFA-free) access to AWS, and access to Kubernetes. Real time logging will not work unless your local machine is able to listen for incoming UDP packets on arbitrary ports on the address it uses to contact the IPv4 Internet; Toil does no NAT traversal or detection.
Note that if you set TOIL_WORKDIR when running your workflow like this, it will need to be a directory that exists both on the host and in the Toil appliance.
Here is an example of running our test workflow leader locally, outside of Kubernetes:
$ export TOIL_KUBERNETES_OWNER=demo-user  # This defaults to your local username if not set
$ export TOIL_AWS_SECRET_NAME=aws-credentials
$ export TOIL_KUBERNETES_HOST_PATH=/data/scratch
$ virtualenv --python python3 --system-site-packages venv
$ . venv/bin/activate
$ wget https://raw.githubusercontent.com/DataBiosphere/toil/releases/4.1.0/src/toil/test/docs/scripts/tutorial_helloworld.py
$ python3 tutorial_helloworld.py \
aws:us-west-2:demouser-toil-test-jobstore \
--batchSystem kubernetes \
--realTimeLogging \
--logInfo \
--disableCaching false
Toil jobs can be run on a variety of cloud platforms. Of these, Amazon Web Services (AWS) is currently the best-supported solution. Toil provides the clusterRef to conveniently create AWS clusters, connect to the leader of the cluster, and then launch a workflow. The leader handles distributing the jobs over the worker nodes and autoscaling to optimize costs.
The Running a Workflow with Autoscaling section details how to create a cluster and run a workflow that will dynamically scale depending on the workflow's needs.
The Static Provisioning section explains how a static cluster (one that won't automatically change in size) can be created and provisioned (grown, shrunk, destroyed, etc.).
To use Amazon Web Services (AWS) to run Toil or to just use S3 to host the files during the computation of a workflow, first set up and configure an account with AWS:
$ ssh-keygen -t rsa
~/.ssh/id_rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ eval `ssh-agent -s`
$ ssh-add
$ chmod 400 id_rsa
https://us-west-1.console.aws.amazon.com/ec2/v2/home?region=us-west-1#KeyPairs:sort=keyName
https://console.aws.amazon.com/iam/home?region=us-west-1#/home
$ pip install awscli --upgrade --user
$ aws configure
" AWS Access Key ID [****************Q65Q]: " " AWS Secret Access Key [****************G0ys]: " " Default region name [us-west-1]: " " Default output format [json]: "
This will create the files ~/.aws/config and ~/.aws/credentials.
$ virtualenv venv
$ source venv/bin/activate
$ pip install toil[all]==3.12.0
$ TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.12.0 \
toil launch-cluster clustername \
--leaderNodeType t2.medium \
--zone us-west-1a \
--keyPairName id_rsa
To further break down each of these commands:
toil launch-cluster --- Base command in toil to launch a cluster.
clustername --- Just choose a name for your cluster.
--leaderNodeType t2.medium --- Specify the leader node type. Here we use a t2.medium (2 CPUs; 4 GB RAM; $0.0464/hour). List of available AWS instances: https://aws.amazon.com/ec2/pricing/on-demand/
--zone us-west-1a --- Specify the AWS zone you want to launch the instance in. Must have the same prefix as the zone in your awscli credentials (which, in the example of this tutorial is: "us-west-1").
--keyPairName id_rsa --- The name of your key pair, which should be "id_rsa" if you've followed this tutorial.
Using the AWS job store is straightforward after you've finished Preparing your AWS environment; all you need to do is specify the prefix for the job store name.
To run the sort example with the AWS job store, you would type
$ python sort.py aws:us-west-2:my-aws-sort-jobstore
The Toil provisioner is included in Toil alongside the [aws] extra and allows us to spin up a cluster.
Getting started with the provisioner is simple:
The Toil provisioner is built around the Toil Appliance, a Docker image that bundles Toil and all its requirements (e.g. Mesos). This makes deployment simple across platforms, and you can even simulate a cluster locally (see appliance_dev for details).
When using the Toil provisioner, the appliance image will be automatically chosen based on the pip-installed version of Toil on your system. That choice can be overridden by setting the environment variables TOIL_DOCKER_REGISTRY and TOIL_DOCKER_NAME or TOIL_APPLIANCE_SELF. See envars for more information on these variables. If you are developing with autoscaling and want to test and build your own appliance have a look at appliance_dev.
For information on using the Toil Provisioner have a look at Running a Workflow with Autoscaling.
Using the provisioner to launch a Toil leader instance is simple using the launch-cluster command. For example, to launch a cluster named "my-cluster" with a t2.medium leader in the us-west-2a zone, run
(venv) $ toil launch-cluster my-cluster \
--leaderNodeType t2.medium \
--zone us-west-2a \
--keyPairName <your-AWS-key-pair-name>
The cluster name is used to uniquely identify your cluster and will be used to populate the instance's Name tag. Also, the Toil provisioner will automatically tag your cluster with an Owner tag that corresponds to your keypair name to facilitate cost tracking. In addition, the ToilNodeType tag can be used to filter "leader" vs. "worker" nodes in your cluster.
The leaderNodeType is an EC2 instance type. This only affects the leader node.
The --zone parameter specifies which EC2 availability zone to launch the cluster in. Alternatively, you can specify this option via the TOIL_AWS_ZONE environment variable. Note: the zone is different from an EC2 region. A region corresponds to a geographical area like us-west-2 (Oregon), and availability zones are partitions of this area like us-west-2a.
By default, Toil creates an IAM role for each cluster with sufficient permissions to perform cluster operations (e.g. full S3, EC2, and SDB access). If the default permissions are not sufficient for your use case (e.g. if you need access to ECR), you may create a custom IAM role with all necessary permissions and set the --awsEc2ProfileArn parameter when launching the cluster. Note that your custom role must at least have these permissions in order for the Toil cluster to function properly.
In addition, Toil creates a new security group with the same name as the cluster name with default rules (e.g. opens port 22 for SSH access). If you require additional security groups, you may use the --awsEc2ExtraSecurityGroupId parameter when launching the cluster. Note: Do not use the same name as the cluster name for the extra security groups as any security group matching the cluster name will be deleted once the cluster is destroyed.
For more information on options try:
(venv) $ toil launch-cluster --help
Toil can be used to manage a cluster in the cloud by using the cluster utilities (see clusterRef). The cluster utilities also make it easy to run a Toil workflow directly on this cluster. We call this static provisioning because the size of the cluster does not change. This is in contrast with Running a Workflow with Autoscaling.
To launch worker nodes alongside the leader we use the -w option:
(venv) $ toil launch-cluster my-cluster \
--leaderNodeType t2.small -z us-west-2a \
--keyPairName your-AWS-key-pair-name \
--nodeTypes m3.large,t2.micro -w 1,4
This will spin up a leader node of type t2.small with five additional workers --- one m3.large instance and four t2.micro instances.
Currently static provisioning is only possible during the cluster's creation. The ability to add new nodes and remove existing nodes via the native provisioner is in development. Of course the cluster can always be deleted with the destroyCluster utility.
Now that our cluster is launched, we use the rsyncCluster utility to copy the workflow to the leader. For a simple workflow in a single file this might look like
(venv) $ toil rsync-cluster -z us-west-2a my-cluster toil-workflow.py :/
NOTE:
Autoscaling is a feature of running Toil in a cloud whereby additional cloud instances are launched to run the workflow. Autoscaling leverages Mesos containers to provide an execution environment for these workflows.
NOTE:
(venv) $ toil launch-cluster <cluster-name> \
--keyPairName <AWS-key-pair-name> \
--leaderNodeType t2.medium \
--zone us-west-2a
(venv) $ toil rsync-cluster -z us-west-2a <cluster-name> sort.py :/root
(venv) $ toil ssh-cluster -z us-west-2a <cluster-name>
$ python /root/sort.py aws:us-west-2:<my-jobstore-name> \
--provisioner aws \
--nodeTypes c3.large \
--maxNodes 2 \
--batchSystem mesos
NOTE:
$ head fileToSort.txt
$ head sortedFile.txt
For more information on autoscaling (and other) options have a look at workflowOptions and/or run
$ python my-toil-script.py --help
IMPORTANT:
Toil can run on a heterogeneous cluster of both preemptable and non-preemptable nodes. A preemptable node may be shut down at any time, while jobs are running. These jobs can then be restarted later somewhere else.
A node type can be specified as preemptable by adding a spot bid to its entry in the list of node types provided with the --nodeTypes flag (for example, c3.large:0.20 requests c3.large spot instances with a maximum bid of $0.20 per hour). If spot instance prices rise above your bid, the preemptable node will be shut down.
While individual jobs can each explicitly specify whether or not they should be run on preemptable nodes via the boolean preemptable resource requirement, the --defaultPreemptable flag will allow jobs without a preemptable requirement to run on preemptable machines.
Ensure that your choices for --nodeTypes and --maxNodes make sense for your workflow and won't cause it to hang. You should make sure the provisioner is able to create nodes large enough to run the largest job in the workflow, and that non-preemptable node types are allowed if there are non-preemptable jobs in the workflow.
Finally, the --preemptableCompensation flag can be used to handle cases where preemptable nodes may not be available but are required for your workflow. With this flag enabled, the autoscaler will attempt to compensate for a shortage of preemptable nodes of a certain type by creating non-preemptable nodes of that type, if non-preemptable nodes of that type were specified in --nodeTypes.
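As a rough illustration, the sketch below marks a single job as safe to run on preemptable nodes; the job function name is made up for this example, and it assumes the preemptable flag is accepted as a resource requirement alongside memory, cores and disk, as described above.

from toil.common import Toil
from toil.job import Job

def spotTolerantFn(job):
    # This job can be killed and rerun elsewhere, so it is safe on a spot (preemptable) node.
    job.log("Running on a node that may be preempted")

if __name__ == "__main__":
    options = Job.Runner.getDefaultArgumentParser().parse_args()
    options.clean = "always"
    with Toil(options) as toil:
        # preemptable=True marks this job as eligible for preemptable nodes; jobs without
        # the requirement only land on them when --defaultPreemptable is set.
        toil.start(Job.wrapJobFn(spotTolerantFn, preemptable=True))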
Toil can be configured to access files stored in an S3-compatible object store such as MinIO. The following environment variables can be used to configure the S3 connection used:
Examples:
TOIL_S3_HOST=127.0.0.1 TOIL_S3_PORT=9010 TOIL_S3_USE_SSL=False
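If you prefer to set these from a launcher script rather than the shell, a minimal sketch might look like the following; it assumes the variables are read from the process environment when the job store is first opened, and the endpoint values are just the ones from the example above.

import os

# Point Toil's S3 client at a local MinIO server (values taken from the example above).
os.environ["TOIL_S3_HOST"] = "127.0.0.1"
os.environ["TOIL_S3_PORT"] = "9010"
os.environ["TOIL_S3_USE_SSL"] = "False"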
Toil provides a dashboard for viewing the RAM and CPU usage of each node, the number of issued jobs of each type, the number of failed jobs, and the size of the jobs queue. To launch this dashboard for a toil workflow, include the --metrics flag in the toil script command. The dashboard can then be viewed in your browser at localhost:3000 while connected to the leader node through toil ssh-cluster. On AWS, the dashboard keeps track of every node in the cluster to monitor CPU and RAM usage, but it can also be used while running a workflow on a single machine. The dashboard uses Grafana as the front end for displaying real-time plots, and Prometheus for tracking metrics exported by toil. In order to use the dashboard for a non-released toil version, you will have to build the containers locally with make docker, since the prometheus, grafana, and mtail containers used in the dashboard are tied to a specific toil version.
Toil supports a provisioner with Google, and a Google Job Store. To get started, follow instructions for Preparing your Google environment.
Toil supports using the Google Cloud Platform. Setting this up is easy!
$ ssh-keygen -t rsa -f ~/.ssh/id_rsa -C [USERNAME]
where [USERNAME] is something like jane@example.com. Make sure to leave your password blank.
WARNING:
Make sure only you can read the SSH keys:
$ chmod 400 ~/.ssh/id_rsa ~/.ssh/id_rsa.pub
Near the top of the screen click on 'SSH Keys', then edit, add item, and paste the key. Then save.
For more details look at Google's instructions for adding SSH keys.
To use the Google Job Store you will need to set the GOOGLE_APPLICATION_CREDENTIALS environment variable by following Google's instructions.
Then to run the sort example with the Google job store you would type
$ python sort.py google:my-project-id:my-google-sort-jobstore
WARNING:
The steps to run a GCE workflow are similar to those of AWS (Autoscaling), except you will need to explicitly specify the --provisioner gce option which otherwise defaults to aws.
(venv) $ toil launch-cluster <CLUSTER-NAME> \
--provisioner gce \
--leaderNodeType n1-standard-1 \
--keyPairName <SSH-KEYNAME> \
--zone us-west1-a
Where <SSH-KEYNAME> is the first part of [USERNAME] used when setting up your ssh key. For example if [USERNAME] was jane@example.com, <SSH-KEYNAME> should be jane.
The --keyPairName option is for an SSH key that was added to the Google account. If your ssh key [USERNAME] was jane@example.com, then your key pair name will be just jane.
(venv) $ toil rsync-cluster --provisioner gce <CLUSTER-NAME> sort.py :/root
(venv) $ toil ssh-cluster --provisioner gce <CLUSTER-NAME>
$ python /root/sort.py google:<PROJECT-ID>:<JOBSTORE-NAME> \
--provisioner gce \
--batchSystem mesos \
--nodeTypes n1-standard-2 \
--maxNodes 2
$ exit  # this exits the ssh from the leader node
(venv) $ toil destroy-cluster --provisioner gce <CLUSTER-NAME>
There are several utilities used for starting and managing a Toil cluster using the AWS provisioner. They are installed via the [aws] or [google] extra. For installation details see installProvisioner. The cluster utilities are used for runningAWS and are comprised of toil launch-cluster, toil rsync-cluster, toil ssh-cluster, and toil destroy-cluster entry points.
Cluster commands specific to toil are:
stats --- Inspects a job store to see which jobs have failed, run successfully, etc.
destroy-cluster --- For autoscaling. Terminates the specified cluster and associated resources.
launch-cluster --- For autoscaling. This is used to launch a toil leader instance with the specified provisioner.
rsync-cluster --- For autoscaling. Used to transfer files to a cluster launched with toil launch-cluster.
ssh-cluster --- SSHs into the toil appliance container running on the leader of the cluster.
clean --- Delete the job store used by a previous Toil workflow invocation.
kill --- Kills any running jobs in a rogue toil.
For information on a specific utility run:
toil launch-cluster --help
for a full list of its options and functionality.
The cluster utilities can be used for runningGCE and runningAWS.
TIP:
NOTE:
runningGCE contains instructions for running Toil workflows on Google Cloud Platform.
To use the stats command, a workflow must first be run using the --stats option. This option makes certain that Toil does not delete the job store, no matter what other options are specified (i.e. normally the option --clean=always would delete the job store, but --stats overrides this).
An example of this would be running the following:
python discoverfiles.py file:my-jobstore --stats
Where discoverfiles.py is the following:
import os
import subprocess

from toil.common import Toil
from toil.job import Job

class discoverFiles(Job):
    """Views files at a specified path using ls."""

    def __init__(self, path, *args, **kwargs):
        self.path = path
        super(discoverFiles, self).__init__(*args, **kwargs)

    def run(self, fileStore):
        if os.path.exists(self.path):
            subprocess.check_call(["ls", self.path])

def main():
    options = Job.Runner.getDefaultArgumentParser().parse_args()
    options.clean = "always"

    job1 = discoverFiles(path="/sys/", displayName='sysFiles')
    job2 = discoverFiles(path=os.path.expanduser("~"), displayName='userFiles')
    job3 = discoverFiles(path="/tmp/")

    job1.addChild(job2)
    job2.addChild(job3)

    with Toil(options) as toil:
        if not toil.options.restart:
            toil.start(job1)
        else:
            toil.restart()

if __name__ == '__main__':
    main()
Notice the displayName key, which can rename a job, giving it an alias when it is finally displayed in stats. Running this workflow file should record three job names: sysFiles (job1), userFiles (job2), and discoverFiles (job3). To see the runtime and resources used for each job when it was run, type
toil stats file:my-jobstore
This should output the following:
Batch System: singleMachine
Default Cores: 1  Default Memory: 2097152K
Max Cores: 9.22337e+18
Total Clock: 0.56  Total Runtime: 1.01
Worker
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1 | 0.14 0.14 0.14 0.14 0.14 | 0.13 0.13 0.13 0.13 0.13 | 0.01 0.01 0.01 0.01 0.01 | 76K 76K 76K 76K 76K
Job
Worker Jobs | min med ave max
| 3 3 3 3
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
3 | 0.01 0.06 0.05 0.07 0.14 | 0.00 0.06 0.04 0.07 0.12 | 0.00 0.01 0.00 0.01 0.01 | 76K 76K 76K 76K 229K
sysFiles
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1 | 0.01 0.01 0.01 0.01 0.01 | 0.00 0.00 0.00 0.00 0.00 | 0.01 0.01 0.01 0.01 0.01 | 76K 76K 76K 76K 76K
userFiles
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1 | 0.06 0.06 0.06 0.06 0.06 | 0.06 0.06 0.06 0.06 0.06 | 0.01 0.01 0.01 0.01 0.01 | 76K 76K 76K 76K 76K
discoverFiles
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1 | 0.07 0.07 0.07 0.07 0.07 | 0.07 0.07 0.07 0.07 0.07 | 0.00 0.00 0.00 0.00 0.00 | 76K 76K 76K 76K 76K
Once we're done, we can clean up the job store by running
toil clean file:my-jobstore
Continuing the example from the stats section above, if we ran our workflow with the command
python discoverfiles.py file:my-jobstore --stats
We could interrogate our jobstore with the status command, for example:
toil status file:my-jobstore
If the run was successful, this would not return much valuable information, something like
2018-01-11 19:31:29,739 - toil.lib.bioio - INFO - Root logger is at level 'INFO', 'toil' logger at level 'INFO'.
2018-01-11 19:31:29,740 - toil.utils.toilStatus - INFO - Parsed arguments
2018-01-11 19:31:29,740 - toil.utils.toilStatus - INFO - Checking if we have files for Toil
The root job of the job store is absent, the workflow completed successfully.
Otherwise, the status command should return the following:
If a Toil pipeline didn't finish successfully, or was run using --clean=always or --stats, the job store will exist until it is deleted. toil clean <jobStore> ensures that all artifacts associated with a job store are removed. This is particularly useful for deleting AWS job stores, which reserve an SDB domain as well as an S3 bucket.
The deletion of the job store can be modified by the --clean argument, and may be set to always, onError, never, or onSuccess (default).
Temporary directories where jobs are running can also be saved from deletion using the --cleanWorkDir option, which takes the same values as --clean. This option should only be used when debugging, as intermediate jobs will fill up disk space.
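For example, when debugging you might keep both the job store and the per-job work directories around. The sketch below assumes that, as with --clean and options.clean, the --cleanWorkDir flag is mirrored by an options.cleanWorkDir attribute; the noOp function is only illustrative.

from toil.common import Toil
from toil.job import Job

def noOp(message):
    return message

if __name__ == "__main__":
    options = Job.Runner.getDefaultArgumentParser().parse_args()
    options.clean = "never"         # keep the job store even after a successful run
    options.cleanWorkDir = "never"  # keep each job's temporary work directory (debugging only)
    with Toil(options) as toil:
        toil.start(Job.wrapFn(noOp, "debug run"))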
Running toil launch-cluster starts up a leader for a cluster. Workers can be added to the initial cluster by specifying the -w option. An example would be
$ toil launch-cluster my-cluster \
--leaderNodeType t2.small -z us-west-2a \
--keyPairName your-AWS-key-pair-name \
--nodeTypes m3.large,t2.micro -w 1,4
Options are listed below. These can also be displayed by running
$ toil launch-cluster --help
launch-cluster's main positional argument is the clusterName. This is simply the name of your cluster. If it does not exist yet, Toil will create it for you.
Launch-Cluster Options
Logging Options
Toil provides the ability to ssh into the leader of the cluster. This can be done as follows:
$ toil ssh-cluster CLUSTER-NAME-HERE
This will open a shell on the Toil leader and is used to start an Autoscaling run. Issues with docker prevent using screen and tmux when sshing into the cluster (the shell doesn't know that it is a TTY, which prevents it from allocating a new screen session). This can be worked around via:
$ script
$ screen
Simply running screen within script will get things working properly again.
Finally, you can execute remote commands with the following syntax:
$ toil ssh-cluster CLUSTER-NAME-HERE remoteCommand
It is not advised that you run your Toil workflow using remote execution like this unless a tool like nohup is used to ensure the process does not die if the SSH connection is interrupted.
For an example usage, see Autoscaling.
The most frequent use case for the rsync-cluster utility is deploying your Toil script to the Toil leader. Note that the syntax is the same as traditional rsync with the exception of the hostname before the colon. This is not needed in toil rsync-cluster since the hostname is automatically determined by Toil.
Here is an example of its usage:
$ toil rsync-cluster CLUSTER-NAME-HERE \
~/localFile :/remoteDestination
The destroy-cluster command is the advised way to get rid of any Toil cluster launched using the launch-cluster command. It ensures that all attached nodes, volumes, security groups, etc. are deleted. If a node or cluster is shut down using Amazon's online portal, residual resources may still be in use in the background. To delete a cluster run
$ toil destroy-cluster CLUSTER-NAME-HERE
To kill all currently running jobs for a given jobstore, use the command
toil kill file:my-jobstore
Toil is a flexible framework that can be leveraged in a variety of environments, including high-performance computing (HPC) environments. Toil provides support for a number of batch systems, including Grid Engine, Slurm, Torque and LSF, which are popular schedulers used in these environments. Toil also supports HTCondor, which is a popular scheduler for high-throughput computing (HTC). To use one of these batch systems specify the --batchSystem argument to the toil script.
Due to the cost and complexity of maintaining support for these schedulers we currently consider them to be "community supported", that is the core development team does not regularly test or develop support for these systems. However, there are members of the Toil community currently deploying Toil in HPC environments and we welcome external contributions.
Developing the support of a new or existing batch system involves extending the abstract batch system class toil.batchSystems.abstractBatchSystem.AbstractBatchSystem.
Standard output and error from batch system jobs (except for the Parasol and Mesos batch systems) are redirected to files in the toil-<workflowID> directory created within the temporary directory specified by the --workDir option; see optionsRef. Each file is named as follows: toil_job_<Toil job ID>_batch_<name of batch system>_<job ID from batch system>_<file description>.log, where <file description> is std_output for standard output, and std_error for standard error. HTCondor will also write job event log files with <file description> = job_events.
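For example, under Slurm the standard output of one job might be captured in a file named something like the following (the Toil job ID and Slurm job ID here are purely illustrative):
toil_job_12_batch_slurm_4421887_std_output.log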
If capturing standard output and error is desired, --workDir will generally need to be on a shared file system; otherwise if these are written to local temporary directories on each node (e.g. /tmp) Toil will not be able to retrieve them. Alternatively, the --noStdOutErr option forces Toil to discard all standard output and error from batch system jobs.
The Common Workflow Language (CWL) is an emerging standard for writing workflows that are portable across multiple workflow engines and platforms. Toil has full support for the CWL v1.0.1 specification.
The toil-cwl-runner command provides cwl-parsing functionality using cwltool, and leverages the job-scheduling and batch system support of Toil.
To run in local batch mode, provide the CWL file and the input object file:
$ toil-cwl-runner example.cwl example-job.yml
For a simple example of CWL with Toil see cwlquickstart.
When invoking CWL documents that make use of Docker containers, if you see errors that look like
docker: Error response from daemon: Mounts denied: The paths /var/...tmp are not shared from OS X and are not known to Docker.
you may need to add
export TMPDIR=/tmp/docker_tmp
either to your startup file (.bashrc) or manually in your shell before invoking toil.
Help information can be found by using this toil command:
$ toil-cwl-runner -h
A more detailed example shows how we can specify both Toil and cwltool arguments for our workflow:
$ toil-cwl-runner \
--singularity \
--jobStore my_jobStore \
--batchSystem lsf \
--workDir `pwd` \
--outdir `pwd` \
--logFile cwltoil.log \
--writeLogs `pwd` \
--logLevel DEBUG \
--retryCount 2 \
--maxLogFileSize 20000000000 \
--stats \
standard_bam_processing.cwl \
inputs.yaml
In this example, we set the following options, which are all passed to Toil:
--singularity: Specifies that all jobs with Docker format containers specified should be run using the Singularity container engine instead of the Docker container engine.
--jobStore: Path to a folder that already exists, which will contain the Toil jobstore and all related job-tracking information.
--batchSystem: Use the specified HPC or Cloud-based cluster platform.
--workDir: The directory where all temporary files will be created for the workflow. A subdirectory of this will be set as the $TMPDIR environment variable and this subdirectory can be referenced using the CWL parameter reference $(runtime.tmpdir) in CWL tools and workflows.
--outdir: Directory where final File and Directory outputs will be written. References to these and other output types will be in the JSON object printed to the stdout stream after workflow execution.
--logFile: Path to the main logfile with logs from all jobs.
--writeLogs: Directory where all job logs will be stored.
--retryCount: How many times to retry each Toil job.
--maxLogFileSize: Logs that get larger than this value will be truncated.
--stats: Save resource usage in JSON files that can be collected with the toil stats command after the workflow is done.
To run in cloud and HPC configurations, you may need to provide additional command line parameters to select and configure the batch system to use.
To run a CWL workflow in AWS with toil see awscwl.
A CWL workflow can be run indirectly in a native Toil script. However, this is not the standard way to run CWL workflows with Toil and doing so comes at the cost of job efficiency. For some use cases, such as running one process on multiple files, it may be useful. For example, if you want to run a CWL workflow with 3 YML files specifying different sample inputs, it could look something like:
import os
import subprocess

from toil.common import Toil
from toil.job import Job

def initialize_jobs(job):
    job.fileStore.logToMaster('initialize_jobs')

def runQC(job, cwl_file, cwl_filename, yml_file, yml_filename, outputs_dir, output_num):
    job.fileStore.logToMaster("runQC")
    tempDir = job.fileStore.getLocalTempDir()

    cwl = job.fileStore.readGlobalFile(cwl_file, userPath=os.path.join(tempDir, cwl_filename))
    yml = job.fileStore.readGlobalFile(yml_file, userPath=os.path.join(tempDir, yml_filename))

    subprocess.check_call(["toil-cwl-runner", cwl, yml])

    output_filename = "output.txt"
    output_file = job.fileStore.writeGlobalFile(output_filename)
    job.fileStore.readGlobalFile(output_file, userPath=os.path.join(outputs_dir, "sample_" + output_num + "_" + output_filename))
    return output_file

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        # specify the folder where the cwl and yml files live
        inputs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cwlExampleFiles")
        # specify where you wish the outputs to be written
        outputs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cwlExampleFiles")

        job0 = Job.wrapJobFn(initialize_jobs)

        cwl_filename = "hello.cwl"
        cwl_file = toil.importFile("file://" + os.path.abspath(os.path.join(inputs_dir, cwl_filename)))

        # add list of yml config inputs here or import and construct from file
        yml_files = ["hello1.yml", "hello2.yml", "hello3.yml"]
        i = 0
        for yml in yml_files:
            i = i + 1
            yml_file = toil.importFile("file://" + os.path.abspath(os.path.join(inputs_dir, yml)))
            yml_filename = yml
            job = Job.wrapJobFn(runQC, cwl_file, cwl_filename, yml_file, yml_filename, outputs_dir, output_num=str(i))
            job0.addChild(job)

        toil.start(job0)
See logs for just one job by using the full log file
This requires knowing the job's toil-generated ID, which can be found in the log files.
cat cwltoil.log | grep jobVM1fIs
Grep for full tool commands from toil logs
This gives you a more concise view of the commands being run (note that this information is only available from Toil when running with --logDebug).
pcregrep -M "\[job .*\.cwl.*$\n(.* .*$\n)*" cwltoil.log # ^allows for multiline matching
Find Bams that have been generated for specific step while pipeline is running:
find . | grep -P '^./out_tmpdir.*_MD\.bam$'
See what jobs have been run
cat log/cwltoil.log | grep -oP "\[job .*.cwl\]" | sort | uniq
or:
cat log/cwltoil.log | grep -i "issued job"
Get status of a workflow
$ toil status /home/johnsoni/TEST_RUNS_3/TEST_run/tmp/jobstore-09ae0acc-c800-11e8-9d09-70106fb1697e
<hostname> 2018-10-04 15:01:44,184 MainThread INFO toil.lib.bioio: Root logger is at level 'INFO', 'toil' logger at level 'INFO'.
<hostname> 2018-10-04 15:01:44,185 MainThread INFO toil.utils.toilStatus: Parsed arguments
<hostname> 2018-10-04 15:01:47,081 MainThread INFO toil.utils.toilStatus: Traversing the job graph gathering jobs. This may take a couple of minutes.

Of the 286 jobs considered, there are 179 jobs with children, 107 jobs ready to run, 0 zombie jobs, 0 jobs with services, 0 services, and 0 jobs with log files currently in file:/home/user/jobstore-09ae0acc-c800-11e8-9d09-70106fb1697e.
Toil Stats
You can get run statistics broken down by CWL file. This only works once the workflow is finished:
$ toil stats /path/to/jobstore
The output will contain CPU, memory, and walltime information for all CWL job types:
<hostname> 2018-10-15 12:06:19,003 MainThread INFO toil.lib.bioio: Root logger is at level 'INFO', 'toil' logger at level 'INFO'.
<hostname> 2018-10-15 12:06:19,004 MainThread INFO toil.utils.toilStats: Parsed arguments
<hostname> 2018-10-15 12:06:19,004 MainThread INFO toil.utils.toilStats: Checking if we have files for toil
<hostname> 2018-10-15 12:06:19,004 MainThread INFO toil.utils.toilStats: Checked arguments
Batch System: lsf
Default Cores: 1  Default Memory: 10485760K
Max Cores: 9.22337e+18
Total Clock: 106608.01  Total Runtime: 86634.11
Worker
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1659 | 0.00 0.80 264.87 12595.59 439424.40 | 0.00 0.46 449.05 42240.74 744968.80 | -35336.69 0.16 -184.17 4230.65 -305544.39 | 48K 223K 1020K 40235K 1692300K
Job
Worker Jobs | min med ave max
| 1077 1077 1077 1077
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
1077 | 0.04 1.18 407.06 12593.43 438404.73 | 0.01 0.28 691.17 42240.35 744394.14 | -35336.83 0.27 -284.11 4230.49 -305989.41 | 135K 268K 1633K 40235K 1759734K
ResolveIndirect
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
205 | 0.04 0.07 0.16 2.29 31.95 | 0.01 0.02 0.02 0.14 3.60 | 0.02 0.05 0.14 2.28 28.35 | 190K 266K 256K 314K 52487K
CWLGather
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
40 | 0.05 0.17 0.29 1.90 11.62 | 0.01 0.02 0.02 0.05 0.80 | 0.03 0.14 0.27 1.88 10.82 | 188K 265K 250K 316K 10039K
CWLWorkflow
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
205 | 0.09 0.40 0.98 13.70 200.82 | 0.04 0.15 0.16 1.08 31.78 | 0.04 0.26 0.82 12.62 169.04 | 190K 270K 257K 316K 52826K
file:///home/johnsoni/pipeline_0.0.39/ACCESS-Pipeline/cwl_tools/expression_tools/group_waltz_files.cwl
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
99 | 0.29 0.49 0.59 2.50 58.11 | 0.14 0.26 0.29 1.04 28.95 | 0.14 0.22 0.29 1.48 29.16 | 135K 135K 135K 136K 13459K
file:///home/johnsoni/pipeline_0.0.39/ACCESS-Pipeline/cwl_tools/expression_tools/make_sample_output_dirs.cwl
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
11 | 0.34 0.52 0.74 2.63 8.18 | 0.20 0.30 0.41 1.17 4.54 | 0.14 0.20 0.33 1.45 3.65 | 136K 136K 136K 136K 1496K
file:///home/johnsoni/pipeline_0.0.39/ACCESS-Pipeline/cwl_tools/expression_tools/consolidate_files.cwl
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
8 | 0.31 0.59 0.71 1.80 5.69 | 0.18 0.35 0.37 0.63 2.94 | 0.13 0.27 0.34 1.17 2.75 | 136K 136K 136K 136K 1091K
file:///home/johnsoni/pipeline_0.0.39/ACCESS-Pipeline/cwl_tools/bwa-mem/bwa-mem.cwl
Count | Time* | Clock | Wait | Memory
n | min med* ave max total | min med ave max total | min med ave max total | min med ave max total
22 | 895.76 3098.13 3587.34 12593.43 78921.51 | 2127.02 7910.31 8123.06 16959.13 178707.34 | -11049.84 -3827.96 -4535.72 19.49 -99785.83 | 5659K 5950K 5854K 6128K 128807K
Understanding toil log files
There is a worker_log.txt file for each job; this file is written to while the job is running and deleted after the job finishes. The contents are printed to the main log file and transferred to a log file in the --logDir folder once the job completes successfully.
The new log file will be named something like:
file:<path to cwl tool>.cwl_<job ID>.log
file:---home-johnsoni-pipeline_1.1.14-ACCESS--Pipeline-cwl_tools-marianas-ProcessLoopUMIFastq.cwl_I-O-jobfGsQQw000.log
This is the toil job command with spaces replaced by dashes.
Support is still in the alpha phase and should be able to handle basic wdl files. See the specification below for more details.
Recommended best practice when running wdl files is to first use the Broad's wdltool for syntax validation and generating the needed json input file. Full documentation can be found on the repository, and a precompiled jar binary can be downloaded here: wdltool (this requires java7).
That means two steps. First, make sure your wdl file is valid and devoid of syntax errors by running
java -jar wdltool.jar validate example_wdlfile.wdl
Second, generate a complementary json file if your wdl file needs one. This json will contain keys for every necessary input that your wdl file needs to run:
java -jar wdltool.jar inputs example_wdlfile.wdl
When this json template is generated, open the file, and fill in values as necessary by hand. WDL files all require json files to accompany them. If no variable inputs are needed, a json file containing only '{}' may be required.
Once a wdl file is validated and has an appropriate json file, workflows can be run in toil using:
toil-wdl-runner example_wdlfile.wdl example_jsonfile.json
See options below for more parameters.
To follow this example, you will need docker installed. The original workflow can be found here: https://github.com/ENCODE-DCC/pipeline-container
We've included the wdl file and data files in the toil repository needed to run this example. First, download the example code and unzip. The file needed is "testENCODE/encode_mapping_workflow.wdl".
Next, use wdltool (this requires java7) to validate this file:
java -jar wdltool.jar validate encode_mapping_workflow.wdl
Next, use wdltool to generate a json file for this wdl file:
java -jar wdltool.jar inputs encode_mapping_workflow.wdl
This json file once opened should look like this:
{ "encode_mapping_workflow.fastqs": "Array[File]", "encode_mapping_workflow.trimming_parameter": "String", "encode_mapping_workflow.reference": "File" }
The trimming_parameter should be set to 'native'. Download the example code and unzip. Inside are two data files required for the run
ENCODE_data/reference/GRCh38_chr21_bwa.tar.gz
ENCODE_data/ENCFF000VOL_chr21.fq.gz
Editing the json to include these as inputs, the json should now look something like this:
{ "encode_mapping_workflow.fastqs": ["/path/to/unzipped/ENCODE_data/ENCFF000VOL_chr21.fq.gz"], "encode_mapping_workflow.trimming_parameter": "native", "encode_mapping_workflow.reference": "/path/to/unzipped/ENCODE_data/reference/GRCh38_chr21_bwa.tar.gz" }
The wdl and json files can now be run using the command
toil-wdl-runner encode_mapping_workflow.wdl encode_mapping_workflow.json
This should deposit the output files in the user's current working directory (to change this, specify a new directory with the '-o' option).
Simple examples of WDL can be found on the Broad's website as tutorials: https://software.broadinstitute.org/wdl/documentation/topic?name=wdl-tutorials.
One can follow along with these tutorials, write wdl files following the directions, and run them using either cromwell or toil. For example, in tutorial 1, if you've followed along and named your wdl file 'helloHaplotypeCaller.wdl', then once you've validated it using wdltool (this requires java7) with
java -jar wdltool.jar validate helloHaplotypeCaller.wdl
and generated a json file (and subsequently typed in appropriate filepaths and variables) using
java -jar wdltool.jar inputs helloHaplotypeCaller.wdl
then the wdl script can be run using
toil-wdl-runner helloHaplotypeCaller.wdl helloHaplotypeCaller_inputs.json
'-o' or '--outdir': Specifies the output folder, and defaults to the current working directory if not specified by the user.
'--dev_mode': Creates "AST.out", which holds a printed AST of the wdl file and "mappings.out", which holds the printed task, workflow, csv, and tsv dictionaries generated by the parser. Also saves the compiled toil python workflow file for debugging.
Any number of arbitrary options may also be specified. These options will not be parsed immediately, but passed down as toil options once the wdl/json files are processed. For valid toil options, see the documentation: http://toil.readthedocs.io/en/latest/running/cliOptions.html
NOTE:
A WDL workflow can be run indirectly in a native Toil script. However, this is not the standard way to run WDL workflows with Toil and doing so comes at the cost of job efficiency. For some use cases, such as running one process on multiple files, it may be useful. For example, if you want to run a WDL workflow with 3 JSON files specifying different sample inputs, it could look something like:
import os
import subprocess

from toil.common import Toil
from toil.job import Job

def initialize_jobs(job):
    job.fileStore.logToMaster("initialize_jobs")

def runQC(job, wdl_file, wdl_filename, json_file, json_filename, outputs_dir, jar_loc, output_num):
    job.fileStore.logToMaster("runQC")
    tempDir = job.fileStore.getLocalTempDir()

    wdl = job.fileStore.readGlobalFile(wdl_file, userPath=os.path.join(tempDir, wdl_filename))
    json = job.fileStore.readGlobalFile(json_file, userPath=os.path.join(tempDir, json_filename))

    subprocess.check_call(["java", "-jar", jar_loc, "run", wdl, "--inputs", json])

    output_filename = "output.txt"
    output_file = job.fileStore.writeGlobalFile(outputs_dir + output_filename)
    job.fileStore.readGlobalFile(output_file, userPath=os.path.join(outputs_dir, "sample_" + output_num + "_" + output_filename))
    return output_file

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        # specify the folder where the wdl and json files live
        inputs_dir = "wdlExampleFiles/"
        # specify where you wish the outputs to be written
        outputs_dir = "wdlExampleFiles/"
        # specify the location of your cromwell jar
        jar_loc = os.path.abspath("wdlExampleFiles/cromwell-35.jar")

        job0 = Job.wrapJobFn(initialize_jobs)

        wdl_filename = "hello.wdl"
        wdl_file = toil.importFile("file://" + os.path.abspath(os.path.join(inputs_dir, wdl_filename)))

        # add list of json config inputs here or import and construct from file
        json_files = ["hello1.json", "hello2.json", "hello3.json"]
        i = 0
        for json in json_files:
            i = i + 1
            json_file = toil.importFile("file://" + os.path.join(inputs_dir, json))
            json_filename = json
            job = Job.wrapJobFn(runQC, wdl_file, wdl_filename, json_file, json_filename, outputs_dir, jar_loc, output_num=str(i))
            job0.addChild(job)

        toil.start(job0)
WDL language specifications can be found here: https://github.com/broadinstitute/wdl/blob/develop/SPEC.md
Implementing support for more features is currently underway.
This tutorial walks through the features of Toil necessary for developing a workflow using the Toil Python API.
NOTE:
To begin, consider this short toil script which illustrates defining a workflow:
from toil.common import Toil
from toil.job import Job

def helloWorld(message, memory="2G", cores=2, disk="3G"):
    return "Hello, world!, here's a message: %s" % message

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = Job.wrapFn(helloWorld, "Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))  # Prints Hello, world!, ...
The workflow consists of a single job. The resource requirements for that job are (optionally) specified by keyword arguments (memory, cores, disk). The workflow options are obtained using toil.job.Job.Runner.getDefaultOptions(). Below we explain the components of this code in detail.
The atomic unit of work in a Toil workflow is a Job. User scripts inherit from this base class to define units of work. For example, here is a more long-winded class-based version of the job in the quick start example:
from toil.job import Job

class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self, memory="2G", cores=2, disk="3G")
        self.message = message

    def run(self, fileStore):
        return "Hello, world!, here's a message: %s" % self.message
In the example a class, HelloWorld, is defined. The constructor requests 2 gigabytes of memory, 2 cores and 3 gigabytes of local disk to complete the work.
The toil.job.Job.run() method is the function the user overrides to get work done. Here it simply returns a message; a variant shown later instead logs the message using toil.job.Job.log(), which registers it in the log output of the leader process of the workflow.
We can add to the previous example to turn it into a complete workflow by adding the necessary function calls to create an instance of HelloWorld and to run this as a workflow containing a single job. This uses the toil.job.Job.Runner class, which is used to start and resume Toil workflows. For example:
from toil.common import Toil
from toil.job import Job

class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self, memory="2G", cores=2, disk="3G")
        self.message = message

    def run(self, fileStore):
        return "Hello, world!, here's a message: %s" % self.message

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = HelloWorld("Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))
NOTE:
Alternatively, the more powerful toil.common.Toil class can be used to run and resume workflows. It is used as a context manager and allows for preliminary setup, such as staging of files into the job store on the leader node. An instance of the class is initialized by specifying an options object. The actual workflow is then invoked by calling the toil.common.Toil.start() method, passing the root job of the workflow, or, if a workflow is being restarted, toil.common.Toil.restart() should be used. Note that the context manager should have explicit if else branches addressing restart and non restart cases. The boolean value for these if else blocks is toil.options.restart.
For example:
from toil.common import Toil
from toil.job import Job

class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self, memory="2G", cores=2, disk="3G")
        self.message = message

    def run(self, fileStore):
        self.log("Hello, world!, I have a message: {}".format(self.message))

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        if not toil.options.restart:
            job = HelloWorld("Woot!")
            toil.start(job)
        else:
            toil.restart()
The call to toil.job.Job.Runner.getDefaultOptions() creates a set of default options for the workflow. The only argument is a description of how to store the workflow's state in what we call a job-store. Here the job-store is contained in a directory within the current working directory called "toilWorkflowRun". Alternatively this string can encode other ways to store the necessary state, e.g. an S3 bucket object store location. By default the job-store is deleted if the workflow completes successfully.
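For example, the same script can be pointed at different job store backends simply by changing this string; the bucket, region and project names below are placeholders.

from toil.job import Job

options = Job.Runner.getDefaultOptions("./toilWorkflowRun")                  # a directory on the local file system
options = Job.Runner.getDefaultOptions("aws:us-west-2:my-aws-jobstore")      # an AWS job store in the us-west-2 region
options = Job.Runner.getDefaultOptions("google:my-project-id:my-jobstore")   # a Google job store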
The workflow is executed in the final line, which creates an instance of HelloWorld and runs it as a workflow. Note all Toil workflows start from a single starting job, referred to as the root job. The return value of the root job is returned as the result of the completed workflow (see promises below to see how this is a useful feature!).
To allow command line control of the options we can use the toil.job.Job.Runner.getDefaultArgumentParser() method to create an argparse.ArgumentParser object which can be used to parse command line options for a Toil script. For example:
from toil.common import Toil
from toil.job import Job

class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self, memory="2G", cores=2, disk="3G")
        self.message = message

    def run(self, fileStore):
        return "Hello, world!, here's a message: %s" % self.message

if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = HelloWorld("Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))
This creates a fully fledged script with all the options Toil exposes as command line arguments. Running this script with "--help" will print the full list of options.
Alternatively an existing argparse.ArgumentParser or optparse.OptionParser object can have Toil script command line options added to it with the toil.job.Job.Runner.addToilOptions() method.
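A minimal sketch of that pattern follows; the --message option is invented for this example, while Job.Runner.addToilOptions() is the method named above.

import argparse

from toil.common import Toil
from toil.job import Job

def helloWorld(message):
    return "Hello, world!, here's a message: %s" % message

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="My workflow")
    parser.add_argument("--message", default="Woot")  # a workflow-specific option (illustrative)
    Job.Runner.addToilOptions(parser)                  # adds Toil's own command line options
    options = parser.parse_args()

    with Toil(options) as toil:
        print(toil.start(Job.wrapFn(helloWorld, options.message)))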
In the event that a workflow fails, either because of programmatic error within the jobs being run, or because of node failure, the workflow can be resumed. A workflow can only fail to be reliably resumed if the job-store itself becomes corrupt.
Critical to resumption is that jobs can be rerun, even if they have apparently completed successfully. Put succinctly, a user defined job should not corrupt its input arguments. That way, regardless of node, network or leader failure the job can be restarted and the workflow resumed.
To resume a workflow specify the "restart" option in the options object passed to toil.common.Toil.start(). If node failures are expected it can also be useful to use the integer "retryCount" option, which will attempt to rerun a job retryCount number of times before marking it fully failed.
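For instance, to tolerate flaky nodes you can allow each job a few extra attempts before it is marked as failed; the sketch below assumes the --retryCount flag is mirrored by an options.retryCount attribute, and the value 3 is arbitrary.

from toil.job import Job

options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
options.retryCount = 3  # attempt to rerun a failing job up to 3 times before marking it fully failed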
In the common scenario that a small subset of jobs fail (including retry attempts) within a workflow Toil will continue to run other jobs until it can do no more, at which point toil.common.Toil.start() will raise a toil.leader.FailedJobsException exception. Typically at this point the user can decide to fix the script and resume the workflow or delete the job-store manually and rerun the complete workflow.
Defining jobs by creating class definitions generally involves the boilerplate of creating a constructor. To avoid this the classes toil.job.FunctionWrappingJob and toil.job.JobFunctionWrappingTarget allow functions to be directly converted to jobs. For example, the quick start example (repeated here):
from toil.common import Toil
from toil.job import Job

def helloWorld(message, memory="2G", cores=2, disk="3G"):
    return "Hello, world!, here's a message: %s" % message

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = Job.wrapFn(helloWorld, "Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))  # Prints Hello, world!, ...
Is equivalent to the previous example, but using a function to define the job.
The function call:
Job.wrapFn(helloWorld, "Woot")
Creates the instance of the toil.job.FunctionWrappingTarget that wraps the function.
The keyword arguments memory, cores and disk allow resource requirements to be specified as before. Even if they are not included as keyword arguments within a function header they can be passed as arguments when wrapping a function as a job and will be used to specify resource requirements.
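For example, the plain function below declares no resource keyword arguments at all, yet requirements can still be attached when it is wrapped; the function and file path are illustrative.

from toil.job import Job

def countLines(path):
    with open(path) as f:
        return sum(1 for _ in f)

# memory, cores and disk are consumed by the wrapper as resource requirements;
# they are not passed on to countLines itself.
job = Job.wrapFn(countLines, "/etc/hosts", memory="1G", cores=1, disk="1G")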
We can also use the function wrapping syntax to wrap a job function, a function whose first argument is a reference to the wrapping job. Just like a self argument in a class, this allows access to the methods of the wrapping job, see toil.job.JobFunctionWrappingTarget. For example:
from toil.common import Toil
from toil.job import Job

def helloWorld(job, message):
    job.log("Hello world, I have a message: {}".format(message))

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    hello_job = Job.wrapJobFn(helloWorld, "Woot!")

    with Toil(options) as toil:
        toil.start(hello_job)
Here helloWorld() is a job function. It uses toil.job.Job.log() to log a message that will be printed to the output console. Here the only subtle difference to note is the line:
hello_job = Job.wrapJobFn(helloWorld, "Woot!")
Which uses the function toil.job.Job.wrapJobFn() to wrap the job function instead of toil.job.Job.wrapFn() which wraps a vanilla function.
A parent job can have child jobs and follow-on jobs. These relationships are specified by methods of the job class, e.g. toil.job.Job.addChild() and toil.job.Job.addFollowOn().
Considering the jobs as the nodes of a job graph, and the child and follow-on relationships as its directed edges, we say that a job B that is on a directed path of child/follow-on edges from a job A in the job graph is a successor of A; similarly, A is a predecessor of B.
A parent job's child jobs are run directly after the parent job has completed, and in parallel. The follow-on jobs of a job are run after its child jobs and their successors have completed. They are also run in parallel. Follow-ons allow the easy specification of cleanup tasks that happen after a set of parallel child tasks. The following shows a simple example that uses the earlier helloWorld() job function:
from toil.common import Toil
from toil.job import Job

def helloWorld(job, message, memory="2G", cores=2, disk="3G"):
    job.log("Hello world, I have a message: {}".format(message))

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(helloWorld, "first")
    j2 = Job.wrapJobFn(helloWorld, "second or third")
    j3 = Job.wrapJobFn(helloWorld, "second or third")
    j4 = Job.wrapJobFn(helloWorld, "last")

    j1.addChild(j2)
    j1.addChild(j3)
    j1.addFollowOn(j4)

    with Toil(options) as toil:
        toil.start(j1)
In the example four jobs are created, first j1 is run, then j2 and j3 are run in parallel as children of j1, finally j4 is run as a follow-on of j1.
There are multiple short hand functions to achieve the same workflow, for example:
from toil.common import Toil
from toil.job import Job

def helloWorld(job, message, memory="2G", cores=2, disk="3G"):
    job.log("Hello world, I have a message: {}".format(message))

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(helloWorld, "first")
    j2 = j1.addChildJobFn(helloWorld, "second or third")
    j3 = j1.addChildJobFn(helloWorld, "second or third")
    j4 = j1.addFollowOnJobFn(helloWorld, "last")

    with Toil(options) as toil:
        toil.start(j1)
Equivalently defines the workflow, where the functions toil.job.Job.addChildJobFn() and toil.job.Job.addFollowOnJobFn() are used to create job functions as children or follow-ons of an earlier job.
Job graphs are not limited to trees, and can express arbitrary directed acyclic graphs. For a precise definition of legal graphs see toil.job.Job.checkJobGraphForDeadlocks(). The previous example could be specified as a DAG as follows:
from toil.common import Toil
from toil.job import Job

def helloWorld(job, message, memory="2G", cores=2, disk="3G"):
    job.log("Hello world, I have a message: {}".format(message))

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(helloWorld, "first")
    j2 = j1.addChildJobFn(helloWorld, "second or third")
    j3 = j1.addChildJobFn(helloWorld, "second or third")
    j4 = j2.addChildJobFn(helloWorld, "last")
    j3.addChild(j4)

    with Toil(options) as toil:
        toil.start(j1)
Note the use of an extra child edge to make j4 a child of both j2 and j3.
The previous examples show a workflow being defined outside of a job. However, Toil also allows jobs to be created dynamically within jobs. For example:
from toil.common import Toil
from toil.job import Job

def binaryStringFn(job, depth, message=""):
    if depth > 0:
        job.addChildJobFn(binaryStringFn, depth-1, message + "0")
        job.addChildJobFn(binaryStringFn, depth-1, message + "1")
    else:
        job.log("Binary string: {}".format(message))

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(binaryStringFn, depth=5))
The job function binaryStringFn logs all possible binary strings of length n (here n=5), creating a total of 2^(n+1) - 1 jobs dynamically and recursively. Static and dynamic creation of jobs can be mixed in a Toil workflow, with jobs defined within a job or job function being created at run time.
The previous example of dynamic job creation shows variables from a parent job being passed to a child job. Such forward variable passing is naturally specified by recursive invocation of successor jobs within parent jobs. This can also be achieved statically by passing around references to the return variables of jobs. In Toil this is achieved with promises, as illustrated in the following example:
from toil.common import Toil
from toil.job import Job

def fn(job, i):
    job.log("i is: %s" % i, level=100)
    return i + 1

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(fn, 1)
    j2 = j1.addChildJobFn(fn, j1.rv())
    j3 = j1.addFollowOnJobFn(fn, j2.rv())

    with Toil(options) as toil:
        toil.start(j1)
Running this workflow results in three log messages from the jobs: i is 1 from j1, i is 2 from j2 and i is 3 from j3.
The return value from the first job is promised to the second job by the call to toil.job.Job.rv() in the following line:
j2 = j1.addChildJobFn(fn, j1.rv())
The value of j1.rv() is a promise, rather than the actual return value of the function, because j1 for the given input has at that point not been evaluated. A promise (toil.job.Promise) is essentially a pointer to the return value that is replaced by the actual return value once it has been evaluated. Therefore, when j2 is run the promise becomes 2.
Promises also support indexing of return values:
def parent(job):
    indexable = Job.wrapJobFn(fn)
    job.addChild(indexable)
    job.addFollowOnFn(raiseWrap, indexable.rv(2))

def raiseWrap(arg):
    raise RuntimeError(arg)  # raises "2"

def fn(job):
    return (0, 1, 2, 3)
Promises can be quite useful. For example, we can combine dynamic job creation with promises to achieve a job creation process that mimics the functional patterns possible in many programming languages:
from toil.common import Toil
from toil.job import Job

def binaryStrings(job, depth, message=""):
    if depth > 0:
        s = [job.addChildJobFn(binaryStrings, depth-1, message + "0").rv(),
             job.addChildJobFn(binaryStrings, depth-1, message + "1").rv()]
        return job.addFollowOnFn(merge, s).rv()
    return [message]

def merge(strings):
    return strings[0] + strings[1]

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "OFF"
    options.clean = "always"

    with Toil(options) as toil:
        print(toil.start(Job.wrapJobFn(binaryStrings, depth=5)))
The return value of the workflow is a list of all binary strings of length 5, computed recursively. Although a toy example, it demonstrates how closely Toil workflows can mimic typical programming patterns.
Promised requirements are a special case of Promises that allow a job's return value to be used as another job's resource requirements.
This is useful when, for example, a job's storage requirement is determined by a file staged to the job store by an earlier job:
import os

from toil.common import Toil
from toil.job import Job, PromisedRequirement

def parentJob(job):
    downloadJob = Job.wrapJobFn(stageFn, "File://" + os.path.realpath(__file__), cores=0.1, memory='32M', disk='1M')
    job.addChild(downloadJob)

    analysis = Job.wrapJobFn(analysisJob, fileStoreID=downloadJob.rv(0),
                             disk=PromisedRequirement(downloadJob.rv(1)))
    job.addFollowOn(analysis)

def stageFn(job, url, cores=1):
    importedFile = job.fileStore.importFile(url)
    return importedFile, importedFile.size

def analysisJob(job, fileStoreID, cores=2):
    # now do some analysis on the file
    pass

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(parentJob))
Note that this also makes use of the size attribute of the FileID object. This promised requirements mechanism can also be used in combination with an aggregator for multiple jobs' output values:
def parentJob(job):
aggregator = []
for fileNum in range(0,10):
downloadJob = Job.wrapJobFn(stageFn, "File://"+os.path.realpath(__file__), cores=0.1, memory='32M', disk='1M')
job.addChild(downloadJob)
aggregator.append(downloadJob)
analysis = Job.wrapJobFn(analysisJob, fileStoreID=downloadJob.rv(0),
disk=PromisedRequirement(lambda xs: sum(xs), [j.rv(1) for j in aggregator]))
job.addFollowOn(analysis)
Just like regular promises, the return value must be determined prior to scheduling any job that depends on the return value. In our example above, notice how the dependent jobs were follow ons to the parent while promising jobs are children of the parent. This ordering ensures that all promises are properly fulfilled.
The toil.fileStore.FileID class is a small wrapper around Python's builtin string class. It is used to represent a file's ID in the file store, and has a size attribute that is the file's size in bytes. This object is returned by importFile and writeGlobalFile.
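For instance, inside a job the size can be read straight off the ID returned by writeGlobalFile; the job function below is made up for this example.

from toil.common import Toil
from toil.job import Job

def sizeReportingJob(job):
    scratchFile = job.fileStore.getLocalTempFile()
    with open(scratchFile, "w") as fH:
        fH.write("some data")
    fileID = job.fileStore.writeGlobalFile(scratchFile)  # returns a FileID
    job.log("Stored %s bytes under ID %s" % (fileID.size, fileID))
    return fileID

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.clean = "always"
    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(sizeReportingJob))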
It is frequently the case that a workflow will want to create files, both persistent and temporary, during its run. The toil.fileStores.abstractFileStore.AbstractFileStore class is used by jobs to manage these files in a manner that guarantees cleanup and resumption on failure.
The toil.job.Job.run() method has a file store instance as an argument. The following example shows how this can be used to create temporary files that persist for the length of the job, be placed in a specified local disk of the node and that will be cleaned up, regardless of failure, when the job finishes:
from toil.common import Toil
from toil.job import Job

class LocalFileStoreJob(Job):
    def run(self, fileStore):
        # self.tempDir will always contain the name of a directory within the allocated disk space reserved for the job
        scratchDir = self.tempDir
        # Similarly create a temporary file.
        scratchFile = fileStore.getLocalTempFile()

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    # Create an instance of LocalFileStoreJob which will have at least 2 gigabytes of storage space.
    j = LocalFileStoreJob(disk="2G")

    # Run the workflow
    with Toil(options) as toil:
        toil.start(j)
Job functions can also access the file store for the job. The equivalent of the LocalFileStoreJob class is
def localFileStoreJobFn(job):
scratchDir = job.tempDir
scratchFile = job.fileStore.getLocalTempFile()
Note that the fileStore attribute is accessed as an attribute of the job argument.
In addition to temporary files that exist for the duration of a job, the file store allows the creation of files in a global store, which persists during the workflow and are globally accessible (hence the name) between jobs. For example:
import os

from toil.common import Toil
from toil.job import Job

def globalFileStoreJobFn(job):
    job.log("The following example exercises all the methods provided"
            " by the toil.fileStores.abstractFileStore.AbstractFileStore class")

    # Create a local temporary file.
    scratchFile = job.fileStore.getLocalTempFile()

    # Write something in the scratch file.
    with open(scratchFile, 'w') as fH:
        fH.write("What a tangled web we weave")

    # Write a copy of the file into the file-store; fileID is the key that can be used to retrieve the file.
    # This write is asynchronous by default
    fileID = job.fileStore.writeGlobalFile(scratchFile)

    # Write another file using a stream; fileID2 is the
    # key for this second file.
    with job.fileStore.writeGlobalFileStream(cleanup=True) as (fH, fileID2):
        fH.write(b"Out brief candle")

    # Now read the first file; scratchFile2 is a local copy of the file that is read-only by default.
    scratchFile2 = job.fileStore.readGlobalFile(fileID)

    # Read the second file to a desired location: scratchFile3.
    scratchFile3 = os.path.join(job.tempDir, "foo.txt")
    job.fileStore.readGlobalFile(fileID2, userPath=scratchFile3)

    # Read the second file again using a stream.
    with job.fileStore.readGlobalFileStream(fileID2) as fH:
        print(fH.read())  # This prints "Out brief candle"

    # Delete the first file from the global file-store.
    job.fileStore.deleteGlobalFile(fileID)

    # It is unnecessary to delete the file keyed by fileID2 because we used the cleanup flag,
    # which removes the file after this job and all its successors have run (if the file still exists)

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(globalFileStoreJobFn))
The example demonstrates the global read, write and delete functionality of the file-store, using both local copies of the files and streams to read and write the files. It covers all the methods provided by the file store interface.
Note that the file store provides no functionality to update an existing "global" file, meaning that files are, barring deletion, immutable. Also worth noting is that there is no file system hierarchy for files in the global file store. These limitations make it fairly easy to support different object stores and to use caching to limit the amount of network file transfer between jobs.
External files can be imported into or exported out of the job store prior to running a workflow when the toil.common.Toil context manager is used on the leader. The context manager provides methods toil.common.Toil.importFile(), and toil.common.Toil.exportFile() for this purpose. The destination and source locations of such files are described with URLs passed to the two methods. A list of the currently supported URLs can be found at toil.jobStores.abstractJobStore.AbstractJobStore.importFile(). To import an external file into the job store as a shared file, pass the optional sharedFileName parameter to that method.
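For example, a minimal sketch of importing a file under a fixed shared name (the local path and shared file name here are illustrative):
from toil.common import Toil
from toil.job import Job

options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
with Toil(options) as toil:
    # Import under a well-known shared name instead of an opaque file ID.
    toil.importFile("file:///tmp/reference.fa", sharedFileName="reference.fa")
    # ... then start the workflow as usual ...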
If a workflow fails for any reason, an imported file acts like any other file in the job store. If the workflow was configured not to clean up the job store on a failed run, the file will persist there and need not be staged again when the workflow is resumed.
Example:
import os

from toil.common import Toil
from toil.job import Job

class HelloWorld(Job):
    def __init__(self, id):
        Job.__init__(self, memory="2G", cores=2, disk="3G")
        self.inputFileID = id

    def run(self, fileStore):
        with fileStore.readGlobalFileStream(self.inputFileID) as fi:
            with fileStore.writeGlobalFileStream() as (fo, outputFileID):
                fo.write(fi.read() + b'World!')
        return outputFileID

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    ioFileDirectory = os.path.join(os.path.dirname(os.path.abspath(__file__)), "stagingExampleFiles")

    with Toil(options) as toil:
        if not toil.options.restart:
            inputFileID = toil.importFile("file://" + os.path.abspath(os.path.join(ioFileDirectory, "in.txt")))
            outputFileID = toil.start(HelloWorld(inputFileID))
        else:
            outputFileID = toil.restart()

        toil.exportFile(outputFileID, "file://" + os.path.abspath(os.path.join(ioFileDirectory, "out.txt")))
Docker containers are commonly used with Toil. The combination of Toil and Docker allows for pipelines to be fully portable between any platform that has both Toil and Docker installed. Docker eliminates the need for the user to do any other tool installation or environment setup.
In order to use Docker containers with Toil, Docker must be installed on all workers of the cluster. Instructions for installing Docker can be found on the Docker website.
When using Toil-based autoscaling, Docker will be automatically set up on the cluster's worker nodes, so no additional installation steps are necessary. Further information on using Toil-based autoscaling can be found in the Autoscaling documentation.
To use Docker containers in a Toil workflow, the container can be built locally or downloaded in real time from an online Docker registry like Quay. If the container is not in a registry, the container's layers must be accessible on each node of the cluster.
When invoking Docker containers from within a Toil workflow, it is strongly recommended that you use dockerCall(), a Toil job function provided in toil.lib.docker. dockerCall leverages Docker's own Python API and provides container cleanup on job failure. When Docker containers are run without this feature, failed jobs can result in resource leaks. Docker's API can be found at docker-py.
In order to use dockerCall, your installation of Docker must be set up to run without sudo. Instructions for setting this up can be found here.
An example of a basic dockerCall is below:
dockerCall(job=job,
tool='quay.io/ucsc_cgl/bwa',
workDir=job.tempDir,
parameters=['index', '/data/reference.fa'])
Note the assumption that the reference.fa file is located in /data. This is Toil's standard convention as a mount location to reduce boilerplate when calling dockerCall. Users can choose their own mount locations by supplying a volumes kwarg to dockerCall, such as: volumes={working_dir: {'bind': '/data', 'mode': 'rw'}}, where working_dir is an absolute path on the user's filesystem.
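For example, a hedged sketch of overriding the mount location (the host path here is illustrative and must contain reference.fa):
working_dir = '/home/user/data'  # illustrative absolute path on the host
dockerCall(job=job,
           tool='quay.io/ucsc_cgl/bwa',
           workDir=working_dir,
           parameters=['index', '/data/reference.fa'],
           volumes={working_dir: {'bind': '/data', 'mode': 'rw'}})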
dockerCall can also be added to workflows like any other job function:
import os

from toil.common import Toil
from toil.job import Job
from toil.lib.docker import apiDockerCall

align = Job.wrapJobFn(apiDockerCall,
                      image='ubuntu',
                      working_dir=os.getcwd(),
                      parameters=['ls', '-lha'])

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(align)
cgl-docker-lib contains dockerCall-compatible Dockerized tools that are commonly used in bioinformatics analysis.
The documentation provides guidelines for developing your own Docker containers that can be used with Toil and dockerCall. In order for a container to be compatible with dockerCall, it must have an ENTRYPOINT set to a wrapper script, as described in cgl-docker-lib containerization standards. This can be set by passing in the optional keyword argument 'entrypoint', as shown in the sketch below.
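A minimal sketch of such a call (the wrapper script path here is hypothetical; use whatever ENTRYPOINT your container actually defines):
dockerCall(job=job,
           tool='quay.io/ucsc_cgl/bwa',
           workDir=job.tempDir,
           parameters=['index', '/data/reference.fa'],
           entrypoint=['/opt/wrapper.sh'])  # hypothetical wrapper script inside the container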
dockerCall currently supports the 75 keyword arguments found in the Python Docker API, under the 'run' command.
It is sometimes desirable to run services, such as a database or server, concurrently with a workflow. The toil.job.Job.Service class provides a simple mechanism for spawning such a service within a Toil workflow, allowing precise specification of the start and end time of the service, and providing start and end methods to use for initialization and cleanup. The following simple, conceptual example illustrates how services work:
from toil.common import Toil
from toil.job import Job

class DemoService(Job.Service):

    def start(self, fileStore):
        # Start up a database/service here
        # Return a value that enables another process to connect to the database
        return "loginCredentials"

    def check(self):
        # A function that if it returns False causes the service to quit
        # If it raises an exception the service is killed and an error is reported
        return True

    def stop(self, fileStore):
        # Cleanup the database here
        pass

j = Job()
s = DemoService()
loginCredentialsPromise = j.addService(s)

def dbFn(loginCredentials):
    # Use the login credentials returned from the service's start method to connect to the service
    pass

j.addChildFn(dbFn, loginCredentialsPromise)

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(j)
In this example the DemoService starts a database in the start method, returning an object from the start method indicating how a client job would access the database. The service's stop method cleans up the database, while the service's check method is polled periodically to check the service is alive.
A DemoService instance is added as a service of the root job j (resource requirements can be specified when constructing the service). The return value from toil.job.Job.addService() is a promise for the return value of the service's start method. When the promise is fulfilled it will represent how to connect to the database. The promise is passed to a child job of j, which uses it to make a database connection. The services of a job are started before any of its successors are run and stopped after all the successors of the job have completed successfully.
Multiple services can be created per job, all run in parallel. Additionally, services can define sub-services using toil.job.Job.Service.addChild(). This allows complex networks of services to be created, e.g. Apache Spark clusters, within a workflow.
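For instance, a hedged sketch of nesting services (MasterService and WorkerService stand for user-defined Job.Service subclasses along the lines of DemoService above; they are not part of Toil):
j = Job()
master = MasterService()
j.addService(master)

# Child services can depend on their parent service, e.g. Spark workers
# that connect to the Spark master started by MasterService.
for _ in range(3):
    master.addChild(WorkerService())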
Services complicate resuming a workflow after failure, because they can create complex dependencies between jobs. For example, consider a service that provides a database that multiple jobs update. If the database service fails and loses state, it is not clear that just restarting the service will allow the workflow to be resumed, because jobs that created that state may have already finished. To get around this problem Toil supports checkpoint jobs, specified as the boolean keyword argument checkpoint to a job or wrapped function, e.g.:
j = Job(checkpoint=True)
A checkpoint job is rerun if one or more of its successors fails its retry attempts, until it itself has exhausted its retry attempts. Upon restarting a checkpoint job all its existing successors are first deleted, and then the job is rerun to define new successors. By checkpointing a job that defines a service, upon failure of the service the database and the jobs that access the service can be redefined and rerun.
To make the implementation of checkpoint jobs simple, a job can only be a checkpoint if, when first defined, it has no successors, i.e. it can only define successors within its run method.
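A minimal sketch of that restriction (doWorkFn and cleanupFn are illustrative job functions, not part of Toil):
from toil.job import Job

class CheckpointedRoot(Job):
    def __init__(self):
        # Must be a leaf when first defined in order to be a checkpoint.
        Job.__init__(self, checkpoint=True)

    def run(self, fileStore):
        # Successors may only be defined here; if any of them exhausts its
        # retries, they are all deleted and this job is rerun to redefine them.
        self.addChildJobFn(doWorkFn)
        self.addFollowOnJobFn(cleanupFn)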
Let A be a root job potentially with children and follow-ons. Without an encapsulated job the simplest way to specify a job B which runs after A and all its successors is to create a parent of A, call it Ap, and then make B a follow-on of Ap. e.g.:
from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    # A is a job with children and follow-ons, for example:
    A = Job()
    A.addChild(Job())
    A.addFollowOn(Job())

    # B is a job which needs to run after A and its successors
    B = Job()

    # The way to do this without encapsulation is to make a parent of A, Ap, and make B a follow-on of Ap.
    Ap = Job()
    Ap.addChild(A)
    Ap.addFollowOn(B)

    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        print(toil.start(Ap))
An encapsulated job E(A) of A saves us from making Ap; instead we can write:
from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    # A
    A = Job()
    A.addChild(Job())
    A.addFollowOn(Job())

    # Encapsulate A
    A = A.encapsulate()

    # B is a job which needs to run after A and its successors
    B = Job()

    # With encapsulation A and its successor subgraph appear to be a single job, hence:
    A.addChild(B)

    options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        print(toil.start(A))
Note that the call to toil.job.Job.encapsulate() creates an instance of toil.job.EncapsulatedJob.
If you are packaging your workflow(s) as a pip-installable distribution on PyPI, you might be tempted to declare Toil as a dependency in your setup.py, via the install_requires keyword argument to setup(). Unfortunately, this does not work, for two reasons. For one, Toil uses Setuptools' extra mechanism to manage its own optional dependencies. If you explicitly declared a dependency on Toil, you would have to hard-code a particular combination of extras (or no extras at all), robbing the user of the choice of which Toil extras to install. Secondly, and more importantly, declaring a dependency on Toil would only lead to Toil being installed on the leader node of a cluster, but not the worker nodes. Auto-deployment does not work here because Toil cannot auto-deploy itself; this is the classic "Which came first, chicken or egg?" problem.
In other words, you shouldn't explicitly depend on Toil. Document the dependency instead (as in "This workflow needs Toil version X.Y.Z to be installed") and optionally add a version check to your setup.py. Refer to the check_version() function in the toil-lib project's setup.py for an example. Alternatively, you can also just depend on toil-lib and you'll get that check for free.
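A minimal sketch of such a check, assuming the workflow requires Toil 3.6.0 (this is not the toil-lib implementation, just an illustration of the idea):
# setup.py (excerpt)
from pkg_resources import DistributionNotFound, get_distribution

required_toil_version = '3.6.0'  # illustrative

try:
    installed_toil_version = get_distribution('toil').version
except DistributionNotFound:
    raise RuntimeError('Toil %s must be installed before this workflow can be '
                       'installed.' % required_toil_version)

if installed_toil_version != required_toil_version:
    raise RuntimeError('This workflow requires Toil %s but found %s.'
                       % (required_toil_version, installed_toil_version))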
If your workflow depends on a dependency of Toil, consider not making that dependency explicit either. If you do, you risk a version conflict between your project and Toil. The pip utility may silently ignore that conflict, breaking either Toil or your workflow. It is safest to simply assume that Toil installs that dependency for you. The only downside is that you are locked into the exact version of that dependency that Toil declares. But such is life with Python, which, unlike Java, has no means of isolating the dependencies of different software components within the same process, and whose favored software distribution utility is incapable of properly resolving overlapping dependencies and detecting conflicts.
The Computational Genomics Lab's Dockstore-based production system provides workflow authors a way to run Dockerized versions of their pipelines in an automated, scalable fashion. To be compatible with this system, a workflow should meet the following requirements. In addition to the Docker container, a Common Workflow Language (CWL) descriptor file is needed. For inputs:
For outputs:
The Toil class configures and starts a Toil run.
See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description
See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description
Overwriting the current contents of pid.log is a feature of this method, not a bug. Other methods rely on always having the most current PID available. So far there is no reason to store any old PIDs.
The job store interface is an abstraction layer that hides the specific details of file storage, for example standard file systems, S3, etc. The AbstractJobStore API is implemented to support a given file store, e.g. S3. Implement this API to support a new file store.
JobStores are responsible for storing toil.job.JobDescription (which relate jobs to each other) and files.
Actual toil.job.Job objects are stored in files, referenced by JobDescriptions. All the non-file CRUD methods the JobStore provides deal in JobDescriptions and not full, executable Jobs.
To actually get ahold of a toil.job.Job, use toil.job.Job.loadJob() with a JobStore and the relevant JobDescription.
Raises an exception if the root job hasn't fulfilled its promise yet.
Currently supported schemes are:
Refer to AbstractJobStore.importFile() documentation for currently supported URL schemes.
Note that the helper method _exportFile is used to read from the source and write to destination. To implement any optimizations that circumvent this, the _exportFile method should be overridden by subclasses of AbstractJobStore.
Files associated with the assigned ID will be accepted even if the JobDescription has never been created or updated.
Returns a publicly accessible URL to the given file in the job store. The returned URL starts with 'http:', 'https:' or 'file:'. The returned URL may expire as early as 1h after it has been returned. Throws an exception if the file does not exist.
May declare the job to have failed (see toil.job.JobDescription.setupJobAfterFailure()) if there is evidence of a failed update attempt.
This operation is idempotent, i.e. deleting a job twice or deleting a non-existent job will succeed silently.
FIXME: some implementations may not raise this
FIXME: some implementations may not raise this
The file at the given local path may not be modified after this method returns!
Note that job stores which encrypt files might return overestimates of file sizes, since the encrypted file may have been padded to the nearest block, augmented with an initialization vector, etc.
Functions to wrap jobs and return values (promises).
The subclass of Job for wrapping user functions.
The keywords memory, cores, disk, preemptable and checkpoint are reserved keyword arguments that, if specified, will be used to determine the resources required for the job, as in toil.job.Job.__init__(). If they are keyword arguments to the function they will be extracted from the function definition, but may be overridden by the user (as you would expect).
The subclass of FunctionWrappingJob for wrapping user job functions.
To enable the job function to get access to the toil.fileStores.abstractFileStore.AbstractFileStore instance (see toil.job.Job.run()), it is made a variable of the wrapping job called fileStore.
To specify a job's resource requirements the following default keyword arguments can be specified:
For example to wrap a function into a job we would call:
Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)
The subclass of Job for encapsulating a job, allowing a subgraph of jobs to be treated as a single job.
Let A be the root job of a job subgraph and B be another job we'd like to run after A and all its successors have completed, for this use encapsulate:
# Job A and subgraph, Job B
A, B = A(), B()
Aprime = A.encapsulate()
Aprime.addChild(B)
# B will run after A and all its successors have completed; A and its subgraph of
# successors in effect appear to be just one job.
If the job being encapsulated has predecessors (e.g. is not the root job), then the encapsulated job will inherit these predecessors. If predecessors are added to the job being encapsulated after the encapsulated job is created then the encapsulating job will NOT inherit these predecessors automatically. Care should be exercised to ensure the encapsulated job has the proper set of predecessors.
The return value of an encapsulated job (as accessed by the toil.job.Job.rv() function) is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to the same value after A or A.encapsulate() has been run.
The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service's toil.job.Job.Service.stop() method will be called once the successors of the job have been run.
Services allow things like databases and servers to be started and accessed by jobs in a workflow.
The class used to reference return values of jobs/services not yet run/started.
Let T be a job. Instances of Promise (termed a promise) are returned by T.rv(), which is used to reference the return value of T's run function. When the promise is passed to the constructor (or as an argument to a wrapped function) of a different, successor job, the promise will be replaced by the actual referenced return value. This mechanism allows the return value of one job's run method to be an input argument to another job before the former job's run function has been executed.
Use when resource requirements depend on the return value of a parent function. PromisedRequirements can be modified by passing a function that takes the Promise as input.
For example, let f, g, and h be functions. Then a Toil workflow can be defined as follows:
A = Job.wrapFn(f)
B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2*x, B.rv()))
Jobs are the units of work in Toil which are composed into workflows.
The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service's toil.job.Job.Service.stop() method will be called once the successors of the job have been run.
Services allow things like databases and servers to be started and accessed by jobs in a workflow.
See toil.job.Job.checkJobGraphConnected(), toil.job.Job.checkJobGraphAcyclic() and toil.job.Job.checkNewCheckpointsAreLeafVertices() for more info.
A root is a job with no predecessors.
Only deals with jobs created here, rather than loaded from the job store.
:rtype: set of Job objects with no predecessors (i.e. which are not children, follow-ons, or services)
As execution always starts from one root job, having multiple root jobs will cause a deadlock to occur.
Only deals with jobs created here, rather than loaded from the job store.
A follow-on edge (A, B) between two jobs A and B is equivalent to adding a child edge to B from (1) A, (2) each child of A, and (3) the successors of each child of A. We call each such edge an "implied" edge. The augmented job graph is a job graph including all the implied edges.
For a job graph G = (V, E) the algorithm is O(|V|^2). It is O(|V| + |E|) for a graph with no follow-ons. The former follow-on case could be improved!
Only deals with jobs created here, rather than loaded from the job store.
A job is a leaf if it has no successors.
A checkpoint job must be a leaf when initially added to the job graph. When its run method is invoked it can then create direct successors. This restriction is made to simplify implementation.
Only works on connected components of jobs not yet added to the JobStore.
Examples for deferred functions are ones that handle cleanup of resources external to Toil, like Docker containers, files outside the work directory, etc.
Only considers jobs in this job's subgraph that are newly added, not loaded from the job store.
Ignores service jobs.
The Job's JobDescription must have already had a real jobStoreID assigned to it.
Does not save the JobDescription.
The Runner contains the methods needed to configure and start a Toil run.
The AbstractFileStore is an abstraction of a Toil run's shared storage.
Also provides the interface to other Toil facilities used by user code, including:
Stores user files in the jobStore, but keeps them separate from actual jobs.
May implement caching.
Passed as argument to the toil.job.Job.run() method.
Access to files is only permitted inside the context manager provided by toil.fileStores.abstractFileStore.AbstractFileStore.open().
Also responsible for committing completed jobs back to the job store with an update operation, and allowing that commit operation to be waited for.
This is a destructive operation and it is important to ensure that there are no other running processes on the system that are modifying or using the file store for this workflow.
This is intended to be the last call to the file store in a Toil run, called by the batch system cleanup function upon batch system shutdown.
Implementations must only yield from within with super().open(job):.
If an executable file on the local filesystem is uploaded, its executability will be preserved when it is downloaded again.
Must be called by readGlobalFile() and readGlobalFileStream() implementations.
If a user path is specified, it is used as the destination. If a user path isn't specified, the file is stored in the local temp directory with an encoded name.
The destination file must not be deleted by the user; it can only be deleted through deleteLocalFile.
Implementations must call logAccess() to report the download.
Implementations must call logAccess() to report the download.
If a FileID or something else with a non-None 'size' field, gets that.
Otherwise, asks the job store to poll the file's size.
Note that the job store may overestimate the file's size, for example if it is encrypted and had to be augmented with an IV or other encryption framing.
Raises an OSError with an errno of errno.ENOENT if no such local copies exist. Thus, cannot be called multiple times in succession.
The files deleted are all those previously read from this file ID via readGlobalFile by the current job into the job's file-store-provided temp directory, plus the file that was written to create the given file ID, if it was written by the current job from the job's file-store-provided temp directory.
May start an asynchronous process. Call waitForCommit() to wait on that process.
Might be called when startCommit is never called on a particular instance, in which case it does not block.
This is intended to be called on batch system shutdown.
Calls into the file store can use bare strings; size will be queried from the job store if unavailable in the ID.
The batch system interface is used by Toil to abstract over different ways of running batches of jobs, for example Slurm, GridEngine, Mesos, Parasol and a single node. The toil.batchSystems.abstractBatchSystem.AbstractBatchSystem API is implemented to run jobs using a given job management system, e.g. Mesos.
Environment variables allow passing of scheduler-specific parameters.
For SLURM:
export TOIL_SLURM_ARGS="-t 1:00:00 -q fatq"
For TORQUE there are two environment variables: one for everything but the resource requirements, and another for the resource requirements (without the -l prefix):
export TOIL_TORQUE_ARGS="-q fatq"
export TOIL_TORQUE_REQS="walltime=1:00:00"
For GridEngine (SGE, UGE), there is an additional environment variable to define the parallel environment for running multicore jobs:
export TOIL_GRIDENGINE_PE='smp'
export TOIL_GRIDENGINE_ARGS='-q batch.q'
For HTCondor, additional parameters can be included in the submit file passed to condor_submit:
export TOIL_HTCONDOR_PARAMS='requirements = TARGET.has_sse4_2 == true; accounting_group = test'
The environment variable is parsed as a semicolon-separated string of parameter = value pairs.
Note to implementors: If your implementation returns True here, it should also override
:param jobDesc: a toil.job.JobDescription
Does not return info for jobs killed by killBatchJobs, although they may cause None to be returned earlier than maxWait.
If no useful message is available, return None.
This can be used to report what resource is the limiting factor when scheduling jobs, for example. If the leader thinks the workflow is stuck, the message can be displayed to the user to help them diagnose why it might be stuck.
If no value is provided it will be looked up from the current environment.
Can be used to ask the Toil worker to do things in-process (such as configuring environment variables, hot-deploying user scripts, or cleaning up a node) that would otherwise require a wrapping "executor" process.
The Service class allows databases and servers to be spawned within a Toil workflow.
Should be subclassed by the user to define services.
Is not executed as a job; runs within a ServiceHostJob.
Toil specific exceptions.
Test make targets, invoked as $ make <target>, subject to which environment variables are set (see Running Integration Tests).
TARGET | DESCRIPTION |
test | Invokes all tests. |
integration_test | Invokes only the integration tests. |
test_offline | Skips building the Docker appliance and only invokes tests that have no docker dependencies. |
integration_test_local | Makes integration tests easier to debug locally by running them serially and not redirecting output, so that output appears on the terminal as expected. |
Before running tests for the first time, initialize your virtual environment following the steps in buildFromSource.
Run all tests (including slow tests):
$ make test
Run only quick tests (as of Jul 25, 2018, this was ~ 20 minutes):
$ export TOIL_TEST_QUICK=True; make test
Run an individual test with:
$ make test tests=src/toil/test/sort/sortTest.py::SortTest::testSort
The default value for tests is "src" which includes all tests in the src/ subdirectory of the project root. Tests that require a particular feature will be skipped implicitly. If you want to explicitly skip tests that depend on a currently installed feature, use
$ make test tests="-m 'not aws' src"
This will run only the tests that don't depend on the aws extra, even if that extra is currently installed. Note the distinction between the terms feature and extra. Every extra is a feature but there are features that are not extras, such as the gridengine and parasol features. To skip tests involving both the parasol feature and the aws extra, use the following:
$ make test tests="-m 'not aws and not parasol' src"
Often it is simpler to use pytest directly, instead of calling the make wrapper. This usually works as expected, but some tests need some manual preparation. To run a specific test with pytest, use the following:
python -m pytest src/toil/test/sort/sortTest.py::SortTest::testSort
For more information, see the pytest documentation.
These tests are generally only run in our CI workflow due to their resource requirements and cost. However, they can be made available for local testing:
$ make push_docker
$ export TOIL_TEST_INTEGRATIVE=True
$ export TOIL_X_KEYNAME=[Your Keyname]
$ export TOIL_X_ZONE=[Desired Zone]
Where X is one of our currently supported cloud providers (GCE, AWS).
TOIL_TEST_TEMP | An absolute path to a directory where Toil tests will write their temporary files. Defaults to the system's standard temporary directory. |
TOIL_TEST_INTEGRATIVE | If True, this allows the integration tests to run. Only valid when running the tests from the source directory via make test or make test_parallel. |
TOIL_AWS_KEYNAME | An AWS keyname (see prepareAWS), which is required to run the AWS tests. |
TOIL_GOOGLE_PROJECTID | A Google Cloud account projectID (see runningGCE), which is required to run the Google Cloud tests. |
TOIL_TEST_QUICK | If True, long running tests are skipped. |
Some tests may fail with an ImportError if the required extras are not installed. Install Toil with all of the extras to prevent such errors.
Docker is needed for some of the tests. Follow the appropriate installation instructions for your system on their website to get started.
When running make test you might still get the following error:
$ make test Please set TOIL_DOCKER_REGISTRY, e.g. to quay.io/USER.
To solve, make an account with Quay and specify it like so:
$ TOIL_DOCKER_REGISTRY=quay.io/USER make test
where USER is your Quay username.
For convenience you may want to add this variable to your bashrc by running
$ echo 'export TOIL_DOCKER_REGISTRY=quay.io/USER' >> $HOME/.bashrc
If you're running Toil's Mesos tests, be sure to create the virtualenv with --system-site-packages to include the Mesos Python bindings. Verify this by activating the virtualenv and running pip list | grep mesos. On macOS, this may come up empty. To fix it, run the following:
for i in /usr/local/lib/python2.7/site-packages/*mesos*; do ln -snf $i venv/lib/python2.7/site-packages/; done
To develop on features reliant on the Toil Appliance (the docker image toil uses for AWS autoscaling), you should consider setting up a personal registry on Quay or Docker Hub. Because the Toil Appliance images are tagged with the Git commit they are based on and because only commits on our master branch trigger an appliance build on Quay, as soon as a developer makes a commit or dirties the working copy they will no longer be able to rely on Toil to automatically detect the proper Toil Appliance image. Instead, developers wishing to test any appliance changes in autoscaling should build and push their own appliance image to a personal Docker registry. This is described in the next section.
Note! Toil checks if the docker image specified by TOIL_APPLIANCE_SELF exists prior to launching by using the docker v2 schema. This should be valid for any major docker repository, but there is an option to override this if desired using the option: --forceDockerAppliance.
Here is a general workflow (similar instructions apply when using Docker Hub):
$ make docker
to automatically build a docker image that can now be uploaded to your personal Quay account. If you have not installed Toil source code yet see buildFromSource.
export TOIL_DOCKER_REGISTRY=quay.io/<MY_QUAY_USERNAME>
to your .bashrc or equivalent.
$ make push_docker
which will upload the docker image to your Quay account. Take note of the image's tag for the next step.
The Toil Appliance container can also be useful as a test environment since it can simulate a Toil cluster locally. An important caveat for this is autoscaling, since autoscaling will only work on an EC2 instance and cannot (at this time) be run on a local machine.
To spin up a local cluster, start by using the following Docker run command to launch a Toil leader container:
docker run \
--entrypoint=mesos-master \
--net=host \
-d \
--name=leader \
--volume=/home/jobStoreParentDir:/jobStoreParentDir \
quay.io/ucsc_cgl/toil:3.6.0 \
--registry=in_memory \
--ip=127.0.0.1 \
--port=5050 \
--allocation_interval=500ms
A couple of notes on this command: the -d flag tells Docker to run in daemon mode so the container will run in the background. To verify that the container is running you can run docker ps to see all containers. If you want to run your own container rather than the official UCSC container you can simply replace the quay.io/ucsc_cgl/toil:3.6.0 parameter with your own container name.
Also note that we are not mounting the job store directory itself, but rather the location where the job store will be written. Due to complications with running Docker on MacOS, we recommend only mounting directories within your home directory. The next command will launch the Toil worker container with similar parameters:
docker run \
--entrypoint=mesos-slave \
--net=host \
-d \
--name=worker \
--volume=/home/jobStoreParentDir:/jobStoreParentDir \
quay.io/ucsc_cgl/toil:3.6.0 \
--work_dir=/var/lib/mesos \
--master=127.0.0.1:5050 \
--ip=127.0.0.1 \
--attributes=preemptable:False \
--resources=cpus:2
Note here that we are specifying 2 CPUs and a non-preemptable worker. We can easily change either or both of these in a logical way. To change the number of cores we can change the 2 to whatever number you like, and to change the worker to be preemptable we change preemptable:False to preemptable:True. Also note that the same volume is mounted into the worker. This is needed since both the leader and worker write and read from the job store. Now that your cluster is running, you can run
docker exec -it leader bash
to get a shell in your leader 'node'. You can also replace the leader parameter with worker to get shell access in your worker.
If you want to run Docker inside this Docker cluster (Dockerized tools, perhaps), you should also mount in the Docker socket via -v /var/run/docker.sock:/var/run/docker.sock. This will give the Docker client inside the Toil Appliance access to the Docker engine on the host. Client/engine version mismatches have been known to cause issues, so we recommend using Docker version 1.12.3 on the host to be compatible with the Docker client installed in the Appliance. Finally, be careful where you write files inside the Toil Appliance - 'child' Docker containers launched in the Appliance will actually be siblings to the Appliance since the Docker engine is located on the host. This means that the 'child' container can only mount in files from the Appliance if the files are located in a directory that was originally mounted into the Appliance from the host - that way the files are accessible to the sibling container. Note: if Docker can't find the file/directory on the host it will silently fail and mount in an empty directory.
In general, as developers and maintainers of the code, we adhere to the following guidelines:
Say there is an issue numbered #123 titled Foo does not work. The branch name would be issues/123-fix-foo and the title of the commit would be Fix foo in case of bar (resolves #123).
./contrib/admin/test-pr theirusername their-branch issues/123-fix-description-here
This must be repeated every time the PR submitter updates their PR, after checking to see that the update is not malicious.
If there is no issue corresponding to the PR, after which the branch can be named, the reviewer of the PR should first create the issue.
Developers who have push access to the main Toil repository are encouraged to make their pull requests from within the repository, to avoid this step.
When squashing a PR from multiple authors, please add Co-authored-by to give credit to all contributing authors.
See Issue #2816 for more details.
These are the steps to take to publish a Toil release:
baseVersion = 'X.Y.0a1'
Make it look like this instead:
baseVersion = 'X.Y.Z'
Commit your change to the branch.
baseVersion = 'X.Y+1.0a1'
Make sure to replace X and Y+1 with actual numbers.
See toil.lib.retry .
retry() can be used to decorate any function based on the list of errors one wishes to retry on.
This list of errors can contain normal Exception objects, and/or RetryCondition objects wrapping Exceptions to include additional conditions.
For example, retrying on one exception (HTTPError):
from requests import get
from requests.exceptions import HTTPError

from toil.lib.retry import retry

@retry(errors=[HTTPError])
def update_my_wallpaper():
    return get('https://www.deviantart.com/')
Or:
from requests import get
from requests.exceptions import HTTPError

from toil.lib.retry import retry

@retry(errors=[HTTPError, ValueError])
def update_my_wallpaper():
    return get('https://www.deviantart.com/')
The examples above will retry for the default interval on any of the errors specified in the errors= argument list.
To retry on specifically 500/502/503/504 errors, you could specify an ErrorCondition object instead, for example:
from requests import get
from requests.exceptions import HTTPError

from toil.lib.retry import retry, ErrorCondition

@retry(errors=[
    ErrorCondition(
        error=HTTPError,
        error_codes=[500, 502, 503, 504]
    )])
def update_my_wallpaper():
    return get('https://www.deviantart.com/')
To retry on specifically errors containing the phrase "NotFound":
from requests import get
from requests.exceptions import HTTPError

from toil.lib.retry import retry, ErrorCondition

@retry(errors=[
    ErrorCondition(
        error=HTTPError,
        error_message_must_include="NotFound"
    )])
def update_my_wallpaper():
    return get('https://www.deviantart.com/')
To retry on all HTTPError errors EXCEPT an HTTPError containing the phrase "NotFound":
from requests import get
from requests.exceptions import HTTPError

from toil.lib.retry import retry, ErrorCondition

@retry(errors=[
    HTTPError,
    ErrorCondition(
        error=HTTPError,
        error_message_must_include="NotFound",
        retry_on_this_condition=False
    )])
def update_my_wallpaper():
    return get('https://www.deviantart.com/')
To retry on boto3-specific status errors, an implementation might look like:
import boto3
from botocore.exceptions import ClientError

from toil.lib.retry import retry, ErrorCondition

@retry(errors=[
    ErrorCondition(
        error=ClientError,
        boto_error_codes=["BucketNotFound"]
    )])
def boto_bucket(bucket_name):
    boto_session = boto3.session.Session()
    s3_resource = boto_session.resource('s3')
    return s3_resource.Bucket(bucket_name)
Any combination of these will also work, provided the codes are matched to the correct exceptions. A ValueError will not return a 404, for example.
The retry function as a decorator should make retrying functions easier and clearer. It also encourages smaller independent functions, as opposed to lumping many different things that may need to be retried on different conditions in the same function.
The ErrorCondition object tries to take some of the heavy lifting of writing specific retry conditions and boil it down to an API that covers all common use-cases without the user having to write any new bespoke functions.
Use-cases covered currently:
If new functionality is needed, it's currently best practice in Toil to add functionality to the ErrorCondition itself rather than making a new custom retry method.
The following diagram lays out the software architecture of Toil.
Toil implements lots of optimizations designed for scalability. Here we detail some of the key optimizations.
The leader process is currently implemented as a single thread. Most of the leader's tasks revolve around processing the state of jobs, each stored as a file within the job-store. To minimise the load on this thread, each worker does as much work as possible to manage the state of the job it is running. As a result, with a couple of minor exceptions, the leader process never needs to write or update the state of a job within the job-store. For example, when a job is complete and has no further successors the responsible worker deletes the job from the job-store, marking it complete. The leader then only has to check for the existence of the file when it receives a signal from the batch-system to know that the job is complete. This off-loading of state management is orthogonal to future parallelization of the leader.
The scheduling of successor jobs is partially managed by the worker, reducing the number of individual jobs the leader needs to process. Currently this is very simple: if there is a single next successor job to run and its resources fit within, and closely match, the resources of the current job, then it is run immediately on the worker without returning to the leader. Further extensions of this strategy are possible, but for many workflows which define a series of serial successors (e.g. map sequencing reads, post-process mapped reads, etc.) this pattern is very effective at reducing leader workload.
Critical to running at large scale is dealing with intermittent node failures. Toil is therefore designed to always be resumable, provided the job store does not become corrupt. This robustness allows Toil to run on preemptable nodes, which are only available when others are not willing to pay more to use them. Designing workflows that divide into many short individual jobs that can use preemptable nodes allows for workflows to be efficiently scheduled and executed.
Running bioinformatic pipelines often requires the passing of large datasets between jobs. Toil caches the results from jobs such that child jobs running on the same node can directly use the same file objects, thereby eliminating the need for an intermediary transfer to the job store. Caching also reduces the burden on the local disks, because multiple jobs can share a single file. The resulting drop in I/O allows pipelines to run faster, and, by the sharing of files, allows users to run more jobs in parallel by reducing overall disk requirements.
To demonstrate the efficiency of caching, we ran an experimental internal pipeline on 3 samples from the TCGA Lung Squamous Carcinoma (LUSC) dataset. The pipeline takes the tumor and normal exome fastqs and the tumor RNA fastq as input, and predicts MHC-presented neoepitopes in the patient that are potential targets for T-cell based immunotherapies. The pipeline was run individually on the samples on c3.8xlarge machines on AWS (60GB RAM, 600GB SSD storage, 32 cores). The pipeline aligns the data to hg19-based references, predicts MHC haplotypes using PHLAT, calls mutations using 2 callers (MuTect and RADIA) and annotates them using SnpEff, then predicts MHC:peptide binding using the IEDB suite of tools before running an in-house rank boosting algorithm on the final calls.
To optimize run time, the pipeline is written such that mutations are called on a per-chromosome basis from the whole-exome bams and are then merged into a complete vcf. Running MuTect in parallel on whole-exome bams requires each MuTect job to download the complete tumor and normal bams to its working directory, an operation that quickly fills the disk and limits the parallelizability of jobs. The script was run in Toil, with and without caching, and Figure 2 shows that the workflow finishes faster in the cached case while using less disk on average than the uncached run. We believe that the benefits of caching arising from file transfers will be much higher on magnetic disk-based storage systems than on the SSD systems we tested on.
The CWL document and input document are loaded using the 'cwltool.load_tool' module. This performs normalization and URI expansion (for example, relative file references are turned into absolute file URIs), validates the document against the CWL schema, initializes Python objects corresponding to major document elements (command line tools, workflows, workflow steps), and performs static type checking that sources and sinks have compatible types.
Input files referenced by the CWL document and input document are imported into the Toil file store. CWL documents may use any URI scheme supported by Toil file store, including local files and object storage.
The 'location' field of File references is updated to reflect the import token returned by the Toil file store.
For directory inputs, the directory listing is stored in a Directory object. Each individual file is imported into the Toil file store.
An initial workflow Job is created from the toplevel CWL document. Then, control passes to the Toil engine which schedules the initial workflow job to run.
When the toplevel workflow job runs, it traverses the CWL workflow and creates a toil job for each step. The dependency graph is expressed by making downstream jobs children of upstream jobs, and initializing the child jobs with an input object containing the promises of output from upstream jobs.
Because Toil jobs have a single output, but CWL permits steps to have multiple output parameters that may feed into multiple other steps, the input to a CWLJob is expressed with an "indirect dictionary". This is a dictionary of input parameters, where each entry value is a tuple of a promise and a promise key. When the job runs, the indirect dictionary is turned into a concrete input object by resolving each promise into its actual value (which is always a dict), and then looking up the promise key to get the actual value for the input parameter.
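A hedged sketch of the idea (resolve_indirect is an illustrative name, not necessarily the function used by the CWL runner):
def resolve_indirect(indirect_dict):
    # Each entry maps an input parameter name to (promise, promise_key).
    # By the time the job runs, each promise has been replaced by the concrete
    # output object (a dict) of the upstream step, so we just index into it.
    return {name: upstream_output[key]
            for name, (upstream_output, key) in indirect_dict.items()}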
If a workflow step specifies a scatter, then a scatter job is created and connected into the workflow graph as described above. When the scatter step runs, it creates child jobs for each parameterization of the scatter. A gather job is added as a follow-on to gather the outputs into arrays.
When running a command line tool, it first creates output and temporary directories under the Toil local temp dir. It runs the command line tool using the single_job_executor from CWLTool, providing a Toil-specific constructor for filesystem access, and overriding the default PathMapper to use ToilPathMapper.
The ToilPathMapper keeps track of a file's symbolic identifier (the Toil FileID), its local path on the host (the value returned by readGlobalFile) and the location of the file inside the Docker container.
After executing single_job_executor from CWLTool, it gets back the output object and status. If the underlying job failed, an exception is raised. Files from the output object are added to the file store using writeGlobalFile and the 'location' field of File references is updated to reflect the token returned by the Toil file store.
When the workflow completes, it returns an indirect dictionary linking to the outputs of the job steps that contribute to the final output. This is the value returned by toil.start() or toil.restart(). It is resolved to get the final output object. The files in this object are exported from the file store to 'outdir' on the host file system, and the 'location' field of File references is updated to reflect the final exported location of the output files.
Toil requires at least the following permissions in an IAM role to operate on a cluster. These are added by default when launching a cluster. However, ensure that they are present if creating a custom IAM role when launching a cluster with the --awsEc2ProfileArn parameter.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:*",
"s3:*",
"sdb:*",
"iam:PassRole"
],
"Resource": "*"
}
    ]
}
If you want to run your workflow in a distributed environment, on multiple worker machines, either in the cloud or on a bare-metal cluster, your script needs to be made available to those other machines. If your script imports other modules, those modules also need to be made available on the workers. Toil can automatically do that for you, with a little help on your part. We call this feature auto-deployment of a workflow.
Let's first examine various scenarios of auto-deploying a workflow, and then look at deploying Toil itself, which, as we'll see shortly, cannot be auto-deployed. Lastly, we'll deal with the issue of declaring Toil as a dependency of a workflow that is packaged as a setuptools distribution.
Toil can be easily deployed to a remote host. First, assuming you've followed our prepareAWS section to install Toil and use it to create a remote leader node on (in this example) AWS, you can now log into this node using sshCluster and, once on the remote host, create and activate a virtualenv (being sure to use the --system-site-packages option!):
$ virtualenv --system-site-packages venv
$ . venv/bin/activate
Note the --system-site-packages option, which ensures that globally-installed packages are accessible inside the virtualenv. Do not (re)install Toil after this! The --system-site-packages option has already transferred Toil and the dependencies from your local installation of Toil for you.
From here, you can install a project and its dependencies:
$ tree
.
├── util
│   ├── __init__.py
│   └── sort
│       ├── __init__.py
│       └── quick.py
└── workflow
    ├── __init__.py
    └── main.py

3 directories, 5 files
$ pip install matplotlib
$ cp -R workflow util venv/lib/python2.7/site-packages
Ideally, your project would have a setup.py file (see setuptools) which streamlines the installation process:
$ tree
.
├── util
│   ├── __init__.py
│   └── sort
│       ├── __init__.py
│       └── quick.py
├── workflow
│   ├── __init__.py
│   └── main.py
└── setup.py

3 directories, 6 files
$ pip install .
Or, if your project has been published to PyPI:
$ pip install my-project
In each case, we have created a virtualenv with the --system-site-packages flag in the venv subdirectory then installed the matplotlib distribution from PyPI along with the two packages that our project consists of. (Again, both Python and Toil are assumed to be present on the leader and all worker nodes.)
We can now run our workflow:
$ python main.py --batchSystem=mesos …
IMPORTANT:
WARNING:
Also note that using the --single-version-externally-managed flag with setup.py will prevent the installation of your package as an .egg. It will also disable the automatic installation of your project's dependencies.
This scenario applies if the user script imports modules that are its siblings:
$ cd my_project
$ ls
userScript.py utilities.py
$ ./userScript.py --batchSystem=mesos …
Here userScript.py imports additional functionality from utilities.py. Toil detects that userScript.py has sibling modules and copies them to the workers, alongside the user script. Note that sibling modules will be auto-deployed regardless of whether they are actually imported by the user script: all .py files residing in the same directory as the user script will automatically be auto-deployed.
Sibling modules are a suitable method of organizing the source code of reasonably complicated workflows.
Recall that in Python, a package is a directory containing one or more .py files—one of which must be called __init__.py—and optionally other packages. For more involved workflows that contain a significant amount of code, this is the recommended way of organizing the source code. Because we use a package hierarchy, we can't really refer to the user script as such; we call it the user module instead. It is merely one of the modules in the package hierarchy. We need to inform Toil that we want to use a package hierarchy by invoking Python's -m option. That enables Toil to identify the entire set of modules belonging to the workflow and copy all of them to each worker. Note that while using the -m option is optional in the scenarios above, it is mandatory in this one.
The following shell session illustrates this:
$ cd my_project
$ tree
.
├── util
│   ├── __init__.py
│   └── sort
│       ├── __init__.py
│       └── quick.py
└── workflow
    ├── __init__.py
    └── main.py

3 directories, 5 files
$ python -m workflow.main --batchSystem=mesos …
Here the user module main.py does not reside in the current directory, but is part of a package called workflow, in a subdirectory of the current directory. Additional functionality is in a separate module called util.sort.quick which corresponds to util/sort/quick.py. Because we invoke the user module via python -m workflow.main, Toil can determine the root directory of the hierarchy (my_project in this case) and copy all Python modules underneath it to each worker. The -m option is documented here
When -m is passed, Python adds the current working directory to sys.path, the list of root directories to be considered when resolving a module name like workflow.main. Without that added convenience we'd have to run the workflow as PYTHONPATH="$PWD" python -m workflow.main. This also means that Toil can detect the root directory of the user module's package hierarchy even if it isn't the current working directory. In other words we could do this:
$ cd my_project $ export PYTHONPATH="$PWD" $ cd /some/other/dir $ python -m workflow.main --batchSystem=mesos …
Also note that the root directory itself must not be a package, i.e. it must not contain an __init__.py.
Bare-metal clusters typically mount a shared file system like NFS on each node. If every node has that file system mounted at the same path, you can place your project on that shared filesystem and run your user script from there. Additionally, you can clone the Toil source tree into a directory on that shared file system and you won't even need to install Toil on every worker. Be sure to add both your project directory and the Toil clone to PYTHONPATH. Toil replicates PYTHONPATH from the leader to every worker.
Toil currently only supports a tempdir set to a local, non-shared directory.
The term Toil Appliance refers to the Mesos Docker image that Toil uses to simulate the machines in the virtual mesos cluster. It's easily deployed, only needs Docker, and allows for workflows to be run in single-machine mode and for clusters of VMs to be provisioned. To specify a different image, see the Toil envars section. For more information on the Toil Appliance, see the runningAWS section.
There are several environment variables that affect the way Toil runs.
TOIL_CHECK_ENV | A flag that determines whether Toil will try to refer back to a Python virtual environment in which it is installed when composing commands that may be run on other hosts. If set to True, if Toil is installed in the current virtual environment, it will use absolute paths to its own executables (and the virtual environment must thus be available at the same path on all nodes). Otherwise, Toil internal commands such as _toil_worker will be resolved according to the PATH on the node where they are executed. This setting can be useful in a shared HPC environment, where users may have their own Toil installations in virtual environments. |
TOIL_WORKDIR | An absolute path to a directory where Toil will write its temporary files. This directory must exist on each worker node and may be set to a different value on each worker. The --workDir command line option overrides this. When using the Toil docker container, such as on Kubernetes, this defaults to /var/lib/toil. When using Toil autoscaling with Mesos, this is somewhere inside the Mesos sandbox. In all other cases, the system's standard temporary directory is used. |
TOIL_KUBERNETES_HOST_PATH | A path on Kubernetes hosts that will be mounted as /tmp in the workers, to allow for shared caching. |
TOIL_KUBERNETES_OWNER | A name prefix for easy identification of Kubernetes jobs. If not set, Toil will use the current user name. |
KUBE_WATCH_ENABLED | A boolean variable that allows users to utilize the Kubernetes watch stream feature instead of polling for running jobs. Default value is False. |
TOIL_APPLIANCE_SELF | The fully qualified reference for the Toil Appliance you wish to use, in the form REPO/IMAGE:TAG. quay.io/ucsc_cgl/toil:3.6.0 and cket/toil:3.5.0 are both examples of valid options. Note that since Docker defaults to Dockerhub repos, only quay.io repos need to specify their registry. |
TOIL_DOCKER_REGISTRY | The URL of the registry of the Toil Appliance image you wish to use. Docker will use Dockerhub by default, but the quay.io registry is also very popular and easily specifiable by setting this option to quay.io. |
TOIL_DOCKER_NAME | The name of the Toil Appliance image you wish to use. Generally this is simply toil but this option is provided to override this, since the image can be built with arbitrary names. |
TOIL_AWS_SECRET_NAME | For the Kubernetes batch system, the name of a Kubernetes secret which contains a credentials file granting access to AWS resources. Will be mounted as ~/.aws inside Kubernetes-managed Toil containers. Enables the AWSJobStore to be used with the Kubernetes batch system, if the credentials allow access to S3 and SimpleDB. |
TOIL_AWS_ZONE | The EC2 zone to provision nodes in if using Toil's provisioner. |
TOIL_AWS_AMI | ID of the AMI to use in node provisioning. If in doubt, don't set this variable. |
TOIL_AWS_NODE_DEBUG | Determines whether to preserve nodes that have failed health checks. If set to True, nodes that fail EC2 health checks won't immediately be terminated so they can be examined and the cause of failure determined. If any EC2 nodes are left behind in this manner, the security group will also be left behind by necessity as it cannot be deleted until all associated nodes have been terminated. |
TOIL_SLURM_ARGS | Arguments for sbatch for the slurm batch system. Do not pass CPU or memory specifications here. Instead, define resource requirements for the job. There is no default value for this variable. |
TOIL_GRIDENGINE_ARGS | Arguments for qsub for the gridengine batch system. Do not pass CPU or memory specifications here. Instead, define resource requirements for the job. There is no default value for this variable. |
TOIL_GRIDENGINE_PE | Parallel environment arguments for qsub for the gridengine batch system. There is no default value for this variable. |
TOIL_TORQUE_ARGS | Arguments for qsub for the Torque batch system. Do not pass CPU or memory specifications here. Instead, define extra parameters for the job such as the queue. Example: -q medium. Use TOIL_TORQUE_REQS to pass extra values for the -l resource requirements parameter. There is no default value for this variable. |
TOIL_TORQUE_REQS | Arguments for the resource requirements for the Torque batch system. Do not pass CPU or memory specifications here. Instead, define extra resource requirements as a string that goes after the -l argument to qsub. Example: walltime=2:00:00,file=50gb. There is no default value for this variable. |
TOIL_LSF_ARGS | Additional arguments for LSF's bsub command. Do not pass CPU or memory specifications here. Instead, define extra parameters for the job such as the queue. Example: -q medium. There is no default value for this variable. |
TOIL_HTCONDOR_PARAMS | Additional parameters to include in the HTCondor submit file passed to condor_submit. Do not pass CPU or memory specifications here. Instead, define extra parameters which may be required by HTCondor. This variable is parsed as a semicolon-separated string of parameter = value pairs. Example: requirements = TARGET.has_sse4_2 == true; accounting_group = test. There is no default value for this variable. |
TOIL_CUSTOM_DOCKER_INIT_COMMAND | Any custom bash command to run in the Toil docker container prior to running the Toil services. Can be used for any custom initialization in the worker and/or primary nodes, such as private Docker registry authentication. Example for AWS ECR: pip install awscli && eval $(aws ecr get-login --no-include-email --region us-east-1). |
TOIL_CUSTOM_INIT_COMMAND | Any custom bash command to run prior to starting the Toil appliance. Can be used for any custom initialization in the worker and/or primary nodes, such as private Docker registry authentication for the Toil appliance itself (i.e. from TOIL_APPLIANCE_SELF). |
TOIL_S3_HOST | The IP address or hostname to use for connecting to S3. Example: TOIL_S3_HOST=127.0.0.1 |
TOIL_S3_PORT | A port number to use for connecting to S3. Example: TOIL_S3_PORT=9001 |
TOIL_S3_USE_SSL | Enable or disable the usage of SSL for connecting to S3 (True by default). Example: TOIL_S3_USE_SSL=False |
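As a short illustration of the table above, the snippet below shows one way a few of these variables might be exported before running a workflow on a shared Slurm cluster; the work directory, partition name, script name, and job store path are all placeholders, not defaults.

# Placeholder values for a shared Slurm cluster; adapt to your site.
$ export TOIL_CHECK_ENV=True                   # reuse this virtualenv's executables on the workers
$ export TOIL_WORKDIR=/scratch/$USER/toil      # must already exist on every node
$ export TOIL_SLURM_ARGS="--partition=short"   # extra sbatch arguments; no CPU or memory flags here
$ python my_workflow.py file:/shared/my-job-store --batchSystem=slurm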
UCSC Computational Genomics Lab
2021 UCSC Computational Genomics Lab
February 10, 2021 | 5.2.0 |