Pipeline Optimization


This wiki page is designed to help users make their Arvados pipelines cost- and compute-efficient for production-level data. It shows you how to get the most out of your pipeline, saving money or compute time on your clusters.

Pipeline design

Choosing the right number of jobs

Currently, your pipeline may have one script that runs everything from alignment to variant calling. If something unexpected happens and your pipeline fails, you don't want to start from the beginning again; you want to resume from where you left off!

This is where the notion of choosing which jobs to create comes into play. The right number of jobs depends on how versatile you want your pipeline to be. Specifically, how many steps do you want your pipeline to have?

Relevant questions include:

Do you want to output all your intermediate files to keep?
How many checkpoints do you want your pipeline to have?
Do you want to do alignment and variant calling in one step, or should you separate them? (Usually, you should separate them.)

Each job must output a collection of files. This is how you can checkpoint your pipeline. The outputs of jobs are always available, so you can reuse them when the next job fails.

If you don't want a lot of jobs or a lot of outputs, combine multiple commands in one job. For example, you can do all your bam realignment/recalibration in one step without uploading every intermediate bam file to keep. You always choose what to upload to keep, so if you won't need a file later on, it's best to leave it on the compute node. One way to do this is to store all your intermediate files in a temporary directory (tmpdir) and then upload only what you want to keep.
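
A minimal sketch of the temporary-directory approach, assuming a Python job script. The function name run_with_scratch, the file names, and the output_dir argument are hypothetical, and the string operations stand in for real tool invocations.

```python
import os
import shutil
import tempfile


def run_with_scratch(input_text, output_dir):
    """Write intermediates to a scratch dir and keep only the final file."""
    os.makedirs(output_dir, exist_ok=True)
    # The scratch directory and everything in it is deleted on exit.
    with tempfile.TemporaryDirectory() as scratch:
        # Step 1: an intermediate file we do not want to keep.
        intermediate = os.path.join(scratch, "step1.txt")
        with open(intermediate, "w") as f:
            f.write(input_text.upper())

        # Step 2: the final result, still written to scratch first.
        final = os.path.join(scratch, "final.txt")
        with open(intermediate) as src, open(final, "w") as dst:
            dst.write(src.read()[::-1])

        # Only the final file is copied to the directory that gets uploaded.
        return shutil.copy(final, output_dir)
```

After the call, output_dir contains only final.txt; step1.txt disappears with the scratch directory.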

If you choose to do multiple computations in a job, try piping them together. Piping between tools is generally faster than writing to and reading from disk. You can pipe your tools together using, for example, subprocess.PIPE in the Python subprocess module. Piping can be slower when both tools are fighting for resources: for instance, when the node doesn't have enough memory to support both processes, or when the arv-mount cache is too small to support multiple processes reading from different files at the same time.
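
A sketch of connecting two tools with subprocess.PIPE. The helper name pipe_two is hypothetical, and the two commands are small Python one-liners standing in for real tools such as an aligner and samtools.

```python
import subprocess
import sys


def pipe_two(first_cmd, second_cmd):
    """Run first_cmd | second_cmd and return the second command's stdout."""
    first = subprocess.Popen(first_cmd, stdout=subprocess.PIPE)
    second = subprocess.Popen(second_cmd, stdin=first.stdout,
                              stdout=subprocess.PIPE)
    # Close our copy so first_cmd gets SIGPIPE if second_cmd exits early.
    first.stdout.close()
    output, _ = second.communicate()
    first.wait()
    if first.returncode != 0 or second.returncode != 0:
        raise RuntimeError("pipeline failed: %s, %s"
                           % (first.returncode, second.returncode))
    return output


# Stand-in commands: the producer prints three lines, the consumer counts them.
producer = [sys.executable, "-c", "print('chr1\\nchr2\\nchr3')"]
consumer = [sys.executable, "-c",
            "import sys; sys.stdout.write(str(len(sys.stdin.readlines())))"]
line_count = pipe_two(producer, consumer)
```

Checking both return codes matters here: a failure in the first tool would otherwise go unnoticed, because only the second tool's output is read.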

If you want a lot of checkpoints, you should have a job for each command. You'll be able to resume or restart work easily if any unexpected interruption happens.

If you have separate jobs, you can choose a different node type for each command. For example, BWA-mem has native multithreading and varscan does not. BWA-mem scales with cores, so a 16 core node would be beneficial, whereas a 16 core node for varscan would be overkill. In general, the right choice depends on the tools: if all your tools have the same threading, running them all on the same node type is a good way to get efficient resource usage. Furthermore, if your tools don't use a lot of memory, they can run freely on more cores.

One more thing to keep in mind is that jobs don't necessarily depend on each other (unless you set output_of in a parameter). If you have multiple jobs depending on one input, then they will all run at the same time. For example, if you have multiple variant callers, they will all run in parallel against one bam file.
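
To see which jobs can run side by side, the output_of references can be followed mechanically. This sketch assumes the components are available as a Python dict in which a script parameter of the form {"output_of": "<component>"} marks a dependency; the function name execution_waves and the component names are hypothetical.

```python
def execution_waves(components):
    """Group components into waves; jobs in one wave can run in parallel."""
    # Map each component to the set of components it reads output from.
    deps = {}
    for name, spec in components.items():
        params = spec.get("script_parameters", {}).values()
        deps[name] = {p["output_of"] for p in params
                      if isinstance(p, dict) and "output_of" in p}

    waves, done = [], set()
    while len(done) < len(deps):
        ready = sorted(n for n in deps if n not in done and deps[n] <= done)
        if not ready:
            raise ValueError("circular or unknown output_of reference")
        waves.append(ready)
        done.update(ready)
    return waves


# Hypothetical components: one aligner feeding two variant callers.
components = {
    "bwa": {"script_parameters": {"input": {"dataclass": "Collection"}}},
    "varscan": {"script_parameters": {"bam": {"output_of": "bwa"}}},
    "freebayes": {"script_parameters": {"bam": {"output_of": "bwa"}}},
}
waves = execution_waves(components)
```

Here the first wave holds only bwa, and the second holds both variant callers, which do not depend on each other.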

Writing to keep

Once you've chosen how to create jobs, you'll need a way to write your outputs to keep.

There are two ways to write your output collection to keep: writing straight to keep, or staging files in a temporary directory and then uploading them to keep. Methods for both are discussed in Writing_a_Script_calling_a_Third_Party_Tool.

In general, writing straight to keep will reap more benefits. TaskOutputDir acts like a pipe and operates on a fuse mount, so you never have to spend node time uploading data. One problem arises if your job depends on using its output directory as temporary space for files: the job will then be computing over the network and could become very slow. The alternative is to write to a temporary directory and then spend compute time uploading to keep. You can do this as long as your node has enough space, but keep in mind that TaskOutputDir writes to keep, which has significantly more space than the scratch space on your node.

Choosing the right number of tasks

Once you have your job architecture set up, you need to look at the core level of the compute node. Specifically, how much compute are you using, and how can you take full advantage of a machine?

If you know how much RAM you want your job to have, you can set min_ram_mb_per_node in runtime_constraints to ensure that you get a compute node big enough for your computation. Most tools scale up with more RAM, so feel free to test your job on different node types to see which one works best.

If you have a job that uses non-sequential access, which typically means either one process doing a lot of seeking or (more likely) one or more processes reading multiple files at once, it may be worth changing keep_cache_mb_per_task in runtime_constraints. The default is 256 MiB, which is equal to 4 keep blocks. Try 512 MiB or 1024 MiB to see if you get better performance.
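
The cache sizes above are whole numbers of 64 MiB keep blocks (256 MiB is 4 blocks). The helper below is a hypothetical starting point for experiments, not a rule: it assumes that a couple of cached blocks per concurrently read file is a sensible first guess, and the name keep_cache_mb_for and the blocks_per_file default are illustrative only.

```python
KEEP_BLOCK_MIB = 64  # implied above: 256 MiB of cache is 4 keep blocks


def keep_cache_mb_for(files_read_at_once, blocks_per_file=2):
    """Suggest a cache size that holds a few blocks for each open file."""
    # Never go below the 4-block (256 MiB) default.
    blocks = max(4, files_read_at_once * blocks_per_file)
    return blocks * KEEP_BLOCK_MIB
```

With these assumptions, 4 files read at once suggests 512 and 8 files suggests 1024, the two values recommended for testing above. Measure before settling on a value.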

If you have a job that seeks a lot on its input (more than increasing the arv-mount cache can handle) or reads/writes a lot of data from keep, you can try copying the file to a temporary directory on the node and reading it from there, so you don't have to battle the network.
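
Copying an input to node-local scratch can be as simple as the sketch below. The function name localize is hypothetical, and it assumes the input is visible as an ordinary file path (for example, through the keep mount) and that the node has enough local disk to hold it.

```python
import os
import shutil
import tempfile


def localize(input_path, scratch_dir=None):
    """Copy input_path to node-local scratch and return the local path."""
    scratch_dir = scratch_dir or tempfile.mkdtemp(prefix="localized-")
    local_path = os.path.join(scratch_dir, os.path.basename(input_path))
    # One sequential copy; later seeks hit local disk instead of the network.
    shutil.copyfile(input_path, local_path)
    return local_path
```

The tool is then pointed at the returned path instead of the original one.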

If you have a job that is natively multithreaded, you should set min_cores_per_node in runtime_constraints to the number of threads you want. Most clouds offer machines with up to 16 threads, so it is worth testing on multiple node types to see what works best for you.
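
For orientation, here is how the constraints discussed so far might sit together in one component of a pipeline template, built as a Python dict and printed as JSON. The core constraint is written as min_cores_per_node, the name that crunchstat-summary's own recommendations use. The script name run-bwa.py is hypothetical, and the values are placeholders to tune per job.

```python
import json

# Values are placeholders to tune per job, not recommendations.
runtime_constraints = {
    "min_ram_mb_per_node": 7782,
    "min_cores_per_node": 8,
    "keep_cache_mb_per_task": 512,
}

# Hypothetical component entry; only runtime_constraints matters here.
component = {"script": "run-bwa.py", "runtime_constraints": runtime_constraints}
fragment = json.dumps(component, indent=2, sort_keys=True)
print(fragment)
```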

If you want multiple computations (tasks) on a node, set max_tasks_per_node in runtime_constraints to choose how many tasks run on a machine at once. For example, if you have a lot of small tasks that each use 1 core and 1 GB of RAM, you can put several of them on a bigger machine, such as 8 tasks on an 8 core machine. To utilize machines better for cost savings, use crunchstat-summary to find the maximum memory/CPU usage of one task, and see if you can fit more than one of those on a machine. One warning, however: if you run out of RAM (some compute nodes can't swap), your process will die.
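
The packing arithmetic can be sketched as follows. The function name tasks_that_fit is hypothetical, and the 10% RAM headroom is an assumption added here as a safety margin against the out-of-memory case, not an Arvados setting.

```python
def tasks_that_fit(node_cores, node_ram_mb, task_cores, task_ram_mb,
                   ram_headroom=0.9):
    """Return how many identical tasks fit on one node."""
    usable_ram_mb = node_ram_mb * ram_headroom  # leave room for the OS
    by_cores = node_cores // task_cores
    by_ram = int(usable_ram_mb // task_ram_mb)
    # The scarcer resource decides.
    return max(0, min(by_cores, by_ram))
```

For example, tasks_that_fit(8, 16000, 1, 1000) is limited by cores and gives 8, while tasks_that_fit(8, 8000, 1, 1481) is limited by RAM and gives 4. The result is a candidate value for max_tasks_per_node.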

How to optimize the number of tasks when you don't have native multithreading

Tools like GATK have native multithreading: you pass a flag indicating the number of cores the tool should use. You usually want to take advantage of this and choose a min_cores_per_node that equals your threading parameter. You can use any max_tasks_per_node, as long as tool threads * max_tasks_per_node <= min_cores_per_node and your node has enough RAM for all the tasks.
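
Read the other way around, the core budget fixes the largest thread count each task can be given. A small sketch, with a hypothetical function name and with the per-node task limit written as max_tasks_per_node, the constraint introduced in the previous section:

```python
def threads_per_task(min_cores_per_node, max_tasks_per_node):
    """Largest thread count with threads * tasks <= cores."""
    if max_tasks_per_node < 1:
        raise ValueError("max_tasks_per_node must be at least 1")
    threads = min_cores_per_node // max_tasks_per_node
    if threads < 1:
        raise ValueError("more tasks than cores")
    return threads
```

For instance, 16 cores shared by 4 tasks allows 4 threads per task, and 8 cores shared by 3 tasks allows 2, leaving 2 cores idle. RAM still has to be checked separately.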

Tools like varscan/freebayes don't have native multithreading, so you need to find a workaround. Generally, these tools have a -L/--intervals option for passing in certain loci to work on. If you have a bed file you can split reads on, you can create a new task per interval, then have a job merge the outputs together.
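
One way to prepare the per-task intervals is sketched below. It assumes a plain BED file with chromosome, start, and end in the first three columns; the function name split_bed is hypothetical. Creating the tasks and merging their outputs are left out, since they depend on your job script.

```python
def split_bed(bed_lines, num_tasks):
    """Distribute BED intervals over num_tasks groups, balancing total bases."""
    intervals = []
    for line in bed_lines:
        # Skip blank lines and BED header/comment lines.
        if not line.strip() or line.startswith(("#", "track", "browser")):
            continue
        chrom, start, end = line.split()[:3]
        intervals.append((chrom, int(start), int(end)))

    groups = [[] for _ in range(num_tasks)]
    sizes = [0] * num_tasks
    # Largest intervals first, each into the currently smallest group.
    for chrom, start, end in sorted(intervals, key=lambda i: i[1] - i[2]):
        smallest = sizes.index(min(sizes))
        groups[smallest].append((chrom, start, end))
        sizes[smallest] += end - start
    return [g for g in groups if g]
```

Balancing by total bases rather than by interval count keeps one task from ending up with all the large regions.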

Crunchstat Summary

Crunchstat-summary is an Arvados tool that helps you choose optimal configurations for Arvados jobs and pipeline instances. It helps you choose the runtime_constraints specified under each job in the pipeline template by graphing job statistics such as CPU usage, RAM, and Keep network traffic over time.

How to install crunchstat-summary

$ git clone https://github.com/curoverse/arvados.git
$ cd arvados/tools/crunchstat-summary/
$ python setup.py build
$ python setup.py install --user

How to use crunchstat-summary

$ ./bin/crunchstat-summary --help
usage: crunchstat-summary [-h]
                          [--job UUID | --pipeline-instance UUID | --log-file LOG_FILE]
                          [--skip-child-jobs] [--format {html,text}]

Summarize resource usage of an Arvados Crunch job

optional arguments:
  -h, --help            show this help message and exit
  --job UUID            Look up the specified job and read its log data from
                        Keep (or from the Arvados event log, if the job is
                        still running)
  --pipeline-instance UUID
                        Summarize each component of the given pipeline
  --log-file LOG_FILE   Read log data from a regular file
  --skip-child-jobs     Do not include stats from child jobs
  --format {html,text}  Report format
  --verbose, -v         Log more information (once for progress, twice for

There are two ways of using crunchstat-summary: a text report, which gives an overall view of a job, or an html page, which graphs usage over time.

Case study 1: A job that does bwa-aln mapping and converts to bam using samtools.

$ ~/arvados/tools/crunchstat-summary/bin/crunchstat-summary --format text --job qr1hi-8i9sb-bzn6hzttfu9cetv
category        metric  task_max        task_max_rate   job_total
blkio:202:0     read    310334464       -       913853440
blkio:202:0     write   2567127040      -       7693406208
blkio:202:16    read    8036201472      155118884.01    4538585088
blkio:202:16    write   55502038016     0       0
blkio:202:32    read    2756608 100760.59       6717440
blkio:202:32    write   53570560        0       99514368
cpu     cpus    8       -       -
cpu     sys     1592.34 1.17    805.32
cpu     user    11061.28        7.98    4620.17
cpu     user+sys        12653.62        8.00    5425.49
mem     cache   7454289920      -       -
mem     pgmajfault      1859    -       830
mem     rss     7965265920      -       -
mem     swap    5537792 -       -
net:docker0     rx      2023609029      -       2093089079
net:docker0     tx      21404100070     -       49909181906
net:docker0     tx+rx   23427709099     -       52002270985
net:eth0        rx      44750669842     67466325.07     14233805360
net:eth0        tx      2126085781      20171074.09     3670464917
net:eth0        tx+rx   46876755623     67673532.73     17904270277
time    elapsed 949     -       1899
# Number of tasks: 3
# Max CPU time spent by a single task: 12653.62s
# Max CPU usage in a single interval: 799.88%
# Overall CPU usage: 285.70%
# Max memory used by a single task: 7.97GB
# Max network traffic in a single task: 46.88GB
# Max network speed in a single interval: 67.67MB/s
# Keep cache miss rate 0.00%
# Keep cache utilization 0.00%
#!! qr1hi-8i9sb-bzn6hzttfu9cetv max CPU usage was 800% -- try runtime_constraints "min_cores_per_node":8
#!! qr1hi-8i9sb-bzn6hzttfu9cetv max RSS was 7597 MiB -- try runtime_constraints "min_ram_mb_per_node":7782
$ ~/arvados/tools/crunchstat-summary/bin/crunchstat-summary --format html --job qr1hi-8i9sb-bzn6hzttfu9cetv > qr1hi-8i9sb-bzn6hzttfu9cetv.html

In the html graphs, you can see the distinct computation steps for bwa-aln and samtools. Since there is a noticeable plateau in CPU usage for both computations, it would be worth trying the job on a bigger node, for example a 16 core node, to see whether the computation can scale beyond 8 cores.

Also note the runtime_constraints recommendations at the end of the text report (the lines starting with #!!). These suggest ways to introduce or reduce runtime constraints in order to use cheaper nodes when running similar jobs, without making them slow down or run out of memory.
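
If you want to carry these recommendations into a template, they can be pulled out of the text report with a short script. This sketch assumes the recommendation lines look exactly like the two in the report above; other versions of crunchstat-summary may format them differently. The function name recommended_constraints is hypothetical.

```python
import re

# Matches lines such as:
#   #!! <job uuid> max CPU usage was 800% -- try runtime_constraints "name":8
RECOMMENDATION = re.compile(
    r'^#!! \S+ .* -- try runtime_constraints "(\w+)":(\d+)$')


def recommended_constraints(report_text):
    """Collect the runtime_constraints suggested in a text report."""
    constraints = {}
    for line in report_text.splitlines():
        match = RECOMMENDATION.match(line.strip())
        if match:
            constraints[match.group(1)] = int(match.group(2))
    return constraints


report = '''# Keep cache utilization 0.00%
#!! qr1hi-8i9sb-bzn6hzttfu9cetv max CPU usage was 800% -- try runtime_constraints "min_cores_per_node":8
#!! qr1hi-8i9sb-bzn6hzttfu9cetv max RSS was 7597 MiB -- try runtime_constraints "min_ram_mb_per_node":7782
'''
constraints = recommended_constraints(report)
```

For this report, constraints holds min_cores_per_node 8 and min_ram_mb_per_node 7782.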

Case study 2: FastQC

$ ~/arvados/tools/crunchstat-summary/bin/crunchstat-summary --format text --job qr1hi-8i9sb-nxqqxravvapt10h
category    metric    task_max    task_max_rate    job_total
blkio:0:0    read    174349211138    65352499.20    174349211138
blkio:0:0    write    0    0    0
cpu    cpus    8    -    -
cpu    sys    364.95    0.17    364.95
cpu    user    17589.59    6.59    17589.59
cpu    user+sys    17954.54    6.72    17954.54
fuseops    read    1330241    498.40    1330241
fuseops    write    0    0    0
keepcache    hit    2655806    1038.00    2655806
keepcache    miss    2633    1.60    2633
keepcalls    get    2658439    1039.00    2658439
keepcalls    put    0    0    0
mem    cache    19836608512    -    -
mem    pgmajfault    19    -    19
mem    rss    1481367552    -    -
net:eth0    rx    178321    17798.40    178321
net:eth0    tx    7156    685.00    7156
net:eth0    tx+rx    185477    18483.40    185477
net:keep0    rx    175959092914    107337311.20    175959092914
net:keep0    tx    0    0    0
net:keep0    tx+rx    175959092914    107337311.20    175959092914
time    elapsed    3301    -    3301
# Number of tasks: 1
# Max CPU time spent by a single task: 17954.54s
# Max CPU usage in a single interval: 672.01%
# Overall CPU usage: 543.91%
# Max memory used by a single task: 1.48GB
# Max network traffic in a single task: 175.96GB
# Max network speed in a single interval: 107.36MB/s
# Keep cache miss rate 0.10%
# Keep cache utilization 99.09%
#!! qr1hi-8i9sb-nxqqxravvapt10h max CPU usage was 673% -- try runtime_constraints "min_cores_per_node":7
#!! qr1hi-8i9sb-nxqqxravvapt10h max RSS was 1413 MiB -- try runtime_constraints "min_ram_mb_per_node":1945
$ ~/arvados/tools/crunchstat-summary/bin/crunchstat-summary --format html --job qr1hi-8i9sb-nxqqxravvapt10h > qr1hi-8i9sb-nxqqxravvapt10h.html

One thing to point out here is keep cache utilization, which can be changed using keep_cache_mb_per_task. Keep cache utilization is at 99.09%, which means it's at a good point. Crunchstat-summary suggests increasing the cache when utilization is below 80% (i.e., for every 100 bytes arv-mount reads from the network, your program is getting fewer than 80 bytes of input), because this usually means chunks are being read from the network but then ejected from the cache before your program gets a chance to read them.
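
The two cache figures can be reproduced from the table. The formulas below are inferred from the numbers in this report (they match the printed percentages) and are not taken from crunchstat-summary's documentation: miss rate as misses over all cache lookups, and utilization as bytes the program read (blkio:0:0 read) over bytes fetched from keep (net:keep0 rx).

```python
def keep_cache_stats(hits, misses, bytes_read_by_program, bytes_from_keep):
    """Return (miss rate %, utilization %) for the keep cache."""
    miss_rate = 100.0 * misses / (hits + misses)
    utilization = 100.0 * bytes_read_by_program / bytes_from_keep
    return miss_rate, utilization


# Figures from the FastQC report: keepcache hit/miss, blkio:0:0 read,
# and net:keep0 rx.
miss_rate, utilization = keep_cache_stats(
    hits=2655806, misses=2633,
    bytes_read_by_program=174349211138, bytes_from_keep=175959092914)
print("miss rate %.2f%%, utilization %.2f%%" % (miss_rate, utilization))
# prints "miss rate 0.10%, utilization 99.09%"
```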

Also look at the CPU usage and keep transfer rate graphs. Look for periods where they mirror each other (one high while the other is low), which is a sign of a CPU bound or an I/O bound job. For example, if keep transfer is low but CPU usage is high, then your job is highly dependent on CPU, which means you should upgrade to a node with more cores. If CPU usage is low and keep transfer is high, then you may want to increase keep_cache_mb_per_task in order to be able to compute on more data.
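
As a numeric companion to the graphs, the overall CPU figure in the report can be recomputed from the table and compared with the number of cores on the node. The function name cpu_usage is hypothetical, and no threshold for "CPU bound" is implied; the fraction is just one more data point to read alongside the keep transfer rate.

```python
def cpu_usage(cpu_seconds, elapsed_seconds, cores):
    """Return (overall CPU %, fraction of available cores kept busy)."""
    overall_percent = 100.0 * cpu_seconds / elapsed_seconds
    return overall_percent, overall_percent / (100.0 * cores)


# FastQC job: cpu user+sys job_total, time elapsed, cpu cpus.
overall, busy_fraction = cpu_usage(17954.54, 3301, 8)
print("overall CPU %.2f%%, %.0f%% of 8 cores" % (overall, 100 * busy_fraction))
# prints "overall CPU 543.91%, 68% of 8 cores"
```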