Distributed workflows

Problem description

A user wants to run a meta-analysis on data located on several different clusters. For either efficiency or legal reasons, the data should be analyzed in place and the results aggregated and returned to a central location. The user should be able to express the multi-cluster computation as a single CWL workflow, and no manual intervention (data transfer, etc.) should be required to complete the workflow.

Simplifying assumptions

The user explicitly indicates in the workflow on which cluster a given computation (data+code) happens.

Data transfer only occurs between the primary cluster and the secondary clusters, not between secondary clusters.
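The transfer restriction above can be stated as a small check. This is only an illustrative sketch: the function name and the five-character cluster IDs in the usage note are hypothetical and not part of any existing Arvados API.

```python
def transfer_allowed(source: str, destination: str, primary: str) -> bool:
    """Return True if a transfer fits the simplifying assumption.

    A transfer is allowed only when the primary cluster is one of the two
    endpoints, so secondary-to-secondary transfers are rejected.
    """
    if source == destination:
        # Nothing to transfer within a single cluster.
        return False
    return primary in (source, destination)
```

For example, with a hypothetical primary cluster "aaaaa", transfer_allowed("aaaaa", "bbbbb", "aaaaa") is true, while transfer_allowed("bbbbb", "ccccc", "aaaaa") is false.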

Proposed solution

Run subworkflow on cluster

A workflow step can be given a CWL hint "RunOnCluster". This indicates that the tool or subworkflow run by the workflow step should run on a specific Arvados cluster, instead of being submitted to the cluster that the workflow runner is currently running on. The implementation would be similar to the "RunInSingleContainer" feature: construct a container request to run the workflow runner on the remote cluster, then wait for the results.
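As a rough sketch of how a runner might decide where a step goes, the function below inspects a step's hints and falls back to the current cluster. It assumes the step has already been parsed into a Python dict with hints in list form; the "clusterID" field name is a hypothetical placeholder, since the proposal names the hint but does not define its fields.

```python
from typing import Any, Dict

HINT_CLASS = "RunOnCluster"  # hint name taken from the proposal


def target_cluster(step: Dict[str, Any], current_cluster: str) -> str:
    """Pick the cluster a workflow step should be submitted to.

    `step` is a parsed CWL workflow step. The "clusterID" key is an
    assumed field name used only for illustration.
    """
    for hint in step.get("hints") or []:
        if hint.get("class") == HINT_CLASS:
            # Fall back to the current cluster if the hint names none.
            return hint.get("clusterID") or current_cluster
    # No RunOnCluster hint: run where the workflow runner is running.
    return current_cluster
```

A step carrying {"class": "RunOnCluster", "clusterID": "bbbbb"} in its hints would be routed to "bbbbb"; a step without the hint stays on the runner's own cluster.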

Data transfer

In order for the workflow to run successfully on the remote cluster, it needs its data dependencies (docker images, scripts, reference data, etc.). There are several options:

  1. Don't do any data transfer of dependencies. Workflows will fail if dependencies are not available. User must manually transfer collections using arv-copy.
    • pros: least work
    • cons: terrible user experience; workflow patterns that transfer data out to remote clusters don't work.
  2. Distribute dependencies as part of workflow registration (requires proactively distributing dependencies to every cluster that might ever need them).
    • pros: less burden on user compared to option (1)
    • cons: doesn't guarantee the dependencies are available where needed; the --create-workflow/--update-workflow options of arvados-cwl-runner have to orchestrate upload of data to every cluster in the federation; workflow patterns that transfer data out to remote clusters don't work.
  3. Workflow runner determines which dependencies are missing from the remote cluster and pushes them before scheduling the subworkflow.
    • pros: no user intervention required; only copies data to clusters that are expected to need it
    • cons: copies all dependencies regardless of whether they are actually used; requires that the primary runner have all the dependencies, or be able to facilitate transfer from some other cluster
  4. Workflow runner on remote cluster determines which dependencies are missing and pulls them from federated peers on demand.
    • pros: no user intervention required; only copies data that is actually needed
    • cons: requires that the primary runner have all the dependencies, or be able to facilitate transfer from some other cluster
  5. Federated access to collections: fetch data blocks on demand from another cluster.
    • pros: only fetch data blocks that are actually needed, no additional copies of collections (although, like user records, they may be cached on the remote cluster)
    • cons: requires building out federation support on the API server, requires provenance improvements (supply both UUID and PDH when submitting container requests), inefficient when substantial remote data is read many times (e.g., docker images) unless some sort of cross-cluster data caching is also added.
  6. Hybrid federation: copy a collection to the remote cluster but retain the UUID and permissions from the source.
    • pros: no user intervention, only fetch blocks we need, fetch data blocks from local keep if available, remote keep if necessary
    • cons: semantics/permission model for "cached" collection records are not yet defined.


Options 1 and 2 cannot support workflows that perform some local computation and then pass intermediate results to a remote cluster for further computation.

Options 2, 3 and 4 involve a similar level of effort, mostly in arvados-cwl-runner. Of these, option 4 seems to cover the most use cases. A general "transfer required collections" method will cover data transfer for dependencies, intermediate collections, and outputs.
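A minimal sketch of what such a "transfer required collections" method could look like is shown below. It is not the arvados-cwl-runner implementation: the function name, the exists_locally and pull callables, and the use of portable data hashes as identifiers are all assumptions made for illustration. The caller would supply callables backed by whatever lookup and copy mechanism is chosen.

```python
from typing import Callable, Iterable, List


def transfer_required_collections(
    required: Iterable[str],
    exists_locally: Callable[[str], bool],
    pull: Callable[[str], None],
) -> List[str]:
    """Pull every required collection that is missing on this cluster.

    `required` lists collection identifiers (assumed here to be portable
    data hashes). `exists_locally` and `pull` are hypothetical callables
    injected by the caller. Returns the identifiers that were pulled.
    """
    transferred = []
    # dict.fromkeys removes duplicates while preserving the input order.
    for pdh in dict.fromkeys(required):
        if not exists_locally(pdh):
            pull(pdh)
            transferred.append(pdh)
    return transferred
```

Because the method only takes a list of identifiers, the same call could serve dependencies before a subworkflow starts, intermediate collections between steps, and the output collection afterwards.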

Option 5 involves adding federation-awareness to the Python/Go SDKs, arv-mount, crunch-run, and API server. In order to work efficiently when clusters are distant and large remote collections are accessed more than once per workflow, it will need an infrastructure-level cache solution.

The level of effort for option 6 is probably somewhere between that of options 4 and 5.


Finally, after a subworkflow runs on a remote cluster, the primary cluster needs to access the output and possibly run additional steps. This requires accessing a single output collection, either by pulling it to the primary cluster (using the same features supporting option 4), or by federation (options 5, 6).