Python SDK

This is a design draft for a next-generation Python SDK that makes it easier to write Crunch scripts today and has room to grow with future workflow changes in Arvados.

Goals

As I understand them, in order of importance:

  • The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados. It should eliminate any need to manipulate jobs or tasks directly via API calls, or manually instantiate objects from parameter strings. There should be a bare minimum of boilerplate code in a user's Crunch script—just the tiny bit necessary to use the SDK.
  • The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development.
  • The SDK should have clear room to grow to accommodate future workflow improvements, whether that's new Crunch features (including possibly reusable tasks and/or elimination of the job/task distinction), hooks with the Common Workflow Language, or whatever.
    The first version of the SDK will not support features we expect these systems to have—that would conflict with the previous goal. The idea here is that the API should not rely on details of today's Crunch implementation, and should have clear spaces where users will be able to use the features of more powerful work dispatch systems in the future.

Components

Dispatcher

Dispatcher classes provide a consistent interface to talk to different scheduling systems: they know how to run the right code for this instantiation (e.g., the current task); how to dispatch new work to the scheduling system; how to de/serialize parameters; etc.

Early on, we'll have a traditional Crunch dispatcher, and a "running locally for debugging" dispatcher. If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that. While I'm not sketching out an API here (since it's not user-facing), we do need to be careful to make sure that Dispatcher methods hit that design target.

Users should be able to instantiate Dispatcher classes to customize their behavior. However, all Dispatchers should be able to run without any customization, and users shouldn't have to deal with them at all for the easy cases where they're fine with the scheduler's default behavior. The SDK provides an AutoDispatcher callable that automatically resolves to the best Dispatcher available for the environment, or raises an exception if none are suitable.
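
A minimal sketch of how AutoDispatcher could resolve a Dispatcher: try candidate classes in priority order and instantiate the first one whose environment check passes. Everything here is hypothetical. That includes the `available()` classmethod, the stub `CrunchDispatcher`/`LocalDispatcher` classes, and the environment checks: treating `TASK_UUID` as a sign of a Crunch task, and an invented `CRUNCH_LOCAL_DEBUG` switch for local runs.

```python
import os


class NoDispatcherAvailable(Exception):
    """Raised when no Dispatcher suits the current environment."""


class CrunchDispatcher(object):
    # Assumption: a Crunch task environment is detected by TASK_UUID.
    @classmethod
    def available(cls, environ):
        return 'TASK_UUID' in environ


class LocalDispatcher(object):
    # Hypothetical opt-in switch for "running locally for debugging".
    @classmethod
    def available(cls, environ):
        return environ.get('CRUNCH_LOCAL_DEBUG') == '1'


def AutoDispatcher(candidates=(CrunchDispatcher, LocalDispatcher),
                   environ=None, **kwargs):
    """Instantiate the first candidate Dispatcher usable here."""
    if environ is None:
        environ = os.environ
    for dispatcher_class in candidates:
        if dispatcher_class.available(environ):
            # Keyword arguments let users customize the chosen Dispatcher.
            return dispatcher_class(**kwargs)
    raise NoDispatcherAvailable("no Dispatcher is usable in this environment")
```

Passing `environ` explicitly keeps the selection logic testable. In the easy case, a script just calls `AutoDispatcher()` with no arguments.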

CrunchModule

The bulk of a user's Crunch script will define a subclass of the SDK's CrunchModule class. The user extends it with WorkMethods that represent work units, and a run() method that implements the highest-level work, returning the final work output (e.g., the job output as opposed to task output).

run() and WorkMethods follow similar rules for how they're run, with a few small differences. For now you can think of those differences as parallels to the differences between jobs and tasks, but the concepts are general enough that they should apply to other scheduling systems too.

  • run() is called with parameters that are deserialized from the Arvados job record. WorkMethods are called with parameters provided in the user's CrunchModule code directly. The Crunch dispatcher will stash those parameters in task parameters.
  • Both run() and WorkMethods can call other WorkMethods.
  • run() and WorkMethods can return a Collection object, a sequence of one or more FutureOutput objects, a simple (JSON-encodable) Python object, or None. This output is serialized appropriately and recorded in Arvados. run()'s output is the job output; WorkMethod output is task output.
    When the output is a sequence of FutureOutputs, it means "create my output by combining the outputs of these WorkMethods"—similarly to the way crunch-job creates job output by concatenating task output today. The SDK will record the references immediately, and then later resolve them when able to record "real" output to aid work reuse.

The user should pass their CrunchModule class to the run_work() method of an instantiated Dispatcher. run_work() instantiates the CrunchModule so that WorkMethod calls can be routed to the Dispatcher. It then invokes either run() or the user code of a WorkMethod, depending on what this instantiation needs to do. To illustrate, here's how the current Crunch dispatcher would take over from there.

The Crunch Dispatcher gets the current task. If that's task 0, it gets the current job, deserializes the job parameters, and calls crunch_module.run() with those arguments. For any other task, it looks up the task's corresponding WorkMethod, and runs its code with deserialized task parameters.
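
The task-0 branching above might look like the following sketch. To stay self-contained, it takes the task and job records as plain dicts instead of fetching them from the API server. It also assumes a hypothetical task parameter layout: the WorkMethod name plus JSON-encoded arguments. `sequence`, `parameters`, and `script_parameters` are real fields on Arvados task and job records, but treating sequence 0 as task 0 is an assumption. For brevity the WorkMethod here is an undecorated function; a real Dispatcher would need access to the user code behind the WorkMethod decorator.

```python
import json


def run_work(module_class, current_task, current_job):
    """Sketch of the Crunch Dispatcher's entry point.

    Assumptions (hypothetical, not the real Crunch task schema):
    - task 0 is the task whose 'sequence' is 0;
    - other tasks store the WorkMethod name and JSON-encoded
      arguments under 'parameters'.
    """
    module = module_class()
    if current_task['sequence'] == 0:
        # Task 0: run() receives the deserialized job parameters.
        return module.run(**current_job['script_parameters'])
    params = current_task['parameters']
    # Any other task: look up the WorkMethod's user code by name.
    user_code = getattr(module_class, params['method'])
    args = json.loads(params['args'])
    kwargs = json.loads(params['kwargs'])
    return user_code(module, *args, **kwargs)
```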

Either way, the method's return value is captured. If it's a Collection object, the Dispatcher makes sure it's saved, then records the portable data hash as the task's output. For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that as the task's output.
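
Here is a sketch of those output rules. `InMemoryCollection` is a hypothetical stand-in for the SDK's Collection class. Its `portable_data_hash()` is a placeholder digest, not Arvados's real algorithm, and the `output.json` filename for serialized values is also an assumption. Returning a sequence of FutureOutputs is not handled here.

```python
import hashlib
import json


class InMemoryCollection(object):
    """Stand-in for an Arvados Collection (hypothetical, for illustration)."""

    def __init__(self, files=None):
        self.files = dict(files or {})
        self.saved = False

    def save(self):
        self.saved = True

    def portable_data_hash(self):
        # Placeholder: a digest of the sorted contents, not the real PDH.
        blob = json.dumps(sorted(self.files.items())).encode('utf-8')
        return hashlib.md5(blob).hexdigest()


def record_output(value):
    """Return the task output to record for a method's return value."""
    if value is None:
        return None  # nothing to record
    if isinstance(value, InMemoryCollection):
        value.save()  # make sure the Collection is saved first
        return value.portable_data_hash()
    # Any other value: serialize to JSON and store it in a new Collection.
    coll = InMemoryCollection({'output.json': json.dumps(value)})
    coll.save()
    return coll.portable_data_hash()
```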

WorkMethod

WorkMethod is a class that should decorate instance methods defined on the CrunchModule subclass. It demarcates work that can be scheduled through a Dispatcher. Use looks like:

@WorkMethod()
def small_work(self, param1, param2):
    pass  # implementation goes here

When a WorkMethod is called—from run() or another WorkMethod's code—the user's code isn't run. Instead, we give the Dispatcher all the information about the call. The Dispatcher serializes the parameters and schedules the real work method to be run later. We wrap information about the work's scheduling in a FutureOutput object, which I'll get to next.
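
One way to get this behavior is a decorator class whose wrapper hands the call to a Dispatcher instead of running the body. This sketch makes two assumptions. First, run_work() stores the Dispatcher on the module instance as a hypothetical `_dispatcher` attribute. Second, the Dispatcher exposes a hypothetical `schedule()` method. `RecordingDispatcher` is likewise invented, and its tuple return value merely stands in for a FutureOutput. The constructor already accepts keyword options, so `WorkMethod()` can later take scheduling constraints without changing how it's called.

```python
import functools


class WorkMethod(object):
    """Decorator sketch: calls are handed to a Dispatcher instead of run."""

    def __init__(self, **options):
        # Placeholder for future scheduling constraints.
        self.options = options

    def __call__(self, func):
        @functools.wraps(func)
        def schedule_instead(module, *args, **kwargs):
            # Assumption: run_work() stored the Dispatcher on the module.
            return module._dispatcher.schedule(func, args, kwargs, self.options)
        # Keep the user's code reachable so the Dispatcher can run it later.
        schedule_instead.user_code = func
        return schedule_instead


class RecordingDispatcher(object):
    """Hypothetical Dispatcher that only records scheduled calls."""

    def __init__(self):
        self.scheduled = []

    def schedule(self, func, args, kwargs, options):
        self.scheduled.append((func.__name__, args, kwargs))
        # Stand-in for a FutureOutput describing the scheduled work.
        return ('future', len(self.scheduled) - 1)
```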

Because the Dispatcher has to be able to serialize arguments and output, those values can only have types that the SDK knows how to work with, and some type information may be lost. For the first SDK implementation, objects we can work with will be objects that can be encoded to JSON, plus Collection objects. Sequence types that aren't native to JSON, like tuples and iterators, will become lists at the method's boundaries. (TBD: Or maybe we should require exactly a known type, to avoid surprising coercions?)
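
A sketch of the coercion option (not the stricter alternative in the TBD). JSON scalars and Collections pass through, dict values are normalized recursively, and tuples, generators, and other iterators become lists. `Collection` is a hypothetical stand-in class, and `normalize()` is an illustrative name.

```python
from collections.abc import Iterator


class Collection(object):
    """Stand-in for an Arvados Collection object (hypothetical)."""


JSON_SCALARS = (str, int, float, bool, type(None))


def normalize(value):
    """Coerce a WorkMethod argument or output into a serializable form."""
    if isinstance(value, (Collection,) + JSON_SCALARS):
        return value
    if isinstance(value, dict):
        if not all(isinstance(key, str) for key in value):
            raise TypeError("JSON object keys must be strings")
        return {key: normalize(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, Iterator)):
        # Tuples, generators, and other iterators all become lists.
        return [normalize(item) for item in value]
    raise TypeError("can't serialize %r" % type(value).__name__)
```

Note that an iterator argument is consumed when it's normalized, which is one of the surprising coercions the TBD alludes to.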

Note that WorkMethod is called with an empty argument list. It accepts no arguments yet, but we expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints.

FutureOutput

FutureOutput objects represent the output of work that has been scheduled but not yet run. Whenever a WorkMethod is called, it returns a FutureOutput object. Users can do two things with these objects: pass them in as arguments to other WorkMethod calls, and return them from run() or WorkMethods.

When a WorkMethod is called with at least one FutureOutput object, the Dispatcher does two things. First, it can introspect those FutureOutput objects to figure out how the new work should be scheduled. Second, when it's time to run the called WorkMethod's code, it finds the real output generated by the work each FutureOutput represents, and passes that in place of the FutureOutput when running the user code.
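
The two steps can be sketched as a pair of recursive walks over the arguments. The first finds every FutureOutput (for scheduling). The second swaps each one for its real output when the work runs. `FutureOutput` here is a minimal hypothetical stand-in with a `work_id` attribute, and `real_outputs` is an assumed mapping from work IDs to finished outputs.

```python
class FutureOutput(object):
    """Minimal stand-in: remembers which scheduled work it came from."""

    def __init__(self, work_id):
        self.work_id = work_id


def find_futures(value):
    """Yield every FutureOutput nested anywhere inside value."""
    if isinstance(value, FutureOutput):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from find_futures(item)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from find_futures(item)


def resolve(value, real_outputs):
    """Replace each FutureOutput with the real output of its work."""
    if isinstance(value, FutureOutput):
        return real_outputs[value.work_id]
    if isinstance(value, dict):
        return {key: resolve(item, real_outputs) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        # Sequences come back as lists, matching the coercion rule above.
        return [resolve(item, real_outputs) for item in value]
    return value
```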

When run() or a WorkMethod returns a list of FutureOutput objects, those outputs will be combined to create the final corresponding output once they're available. Refer to the discussion in the CrunchModule section above.
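
If each resolved output is Keep manifest text, combining outputs in the spirit of crunch-job can be as simple as concatenating the manifests. This is a sketch under that assumption; `combine_outputs()` is a hypothetical name, and the result is not normalized.

```python
def combine_outputs(manifest_texts):
    """Concatenate task output manifests into one manifest text.

    Sketch only: outputs with no manifest (None or "") are skipped, and
    the result is not normalized (repeated stream names are kept).
    """
    parts = []
    for text in manifest_texts:
        if not text:
            continue
        # Each manifest stream line must end with a newline.
        parts.append(text if text.endswith('\n') else text + '\n')
    return ''.join(parts)
```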

To be clear: each Dispatcher will have its own paired FutureOutput implementation.

FutureOutput objects cannot be resolved or manipulated by the user directly. Users can't wait for the real output to become available (because in some schedulers, like current Crunch, you'll wait forever). The objects should have no user-facing methods that might suggest this is possible. We'll probably even redefine some very basic methods (e.g., __str__) to be less useful than object's default implementation, to really drive this point home.
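
A sketch of such an opaque class. The uninformative `__str__`/`__repr__` follow the suggestion above. Making `bool()` raise is an extra assumption of this sketch, since truth-testing could also suggest that the output can be checked for readiness. `_dispatcher_state` is a hypothetical slot holding whatever the paired Dispatcher needs.

```python
class FutureOutput(object):
    """Opaque handle for scheduled work (sketch).

    Only the Dispatcher that created it knows what's inside. There is
    deliberately no result(), wait(), or done() method.
    """

    __slots__ = ('_dispatcher_state',)

    def __init__(self, dispatcher_state):
        self._dispatcher_state = dispatcher_state

    def __repr__(self):
        # Deliberately uninformative: don't invite users to peek inside.
        return '<FutureOutput>'

    __str__ = __repr__

    def __bool__(self):
        # Truth-testing would hint that readiness can be checked.
        raise TypeError("FutureOutput has no truth value; pass it to a "
                        "WorkMethod or return it instead")
```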

Crunch dispatch example

Let me talk about a Crunch example to make things concrete. The Crunch dispatcher is expected to create tasks at set sequence levels, based on when they can run:

  • S1: Sequence for WorkMethod tasks that can be run now—all parameters can be deserialized into useful objects
  • S2: Sequence for WorkMethod tasks that have FutureOutput parameters that need to be resolved
  • S3: Sequence for internal tasks to set real output records for methods that returned FutureOutput objects
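
Choosing between S1 and S2 for a new WorkMethod task only requires checking whether any argument, however deeply nested, is a FutureOutput. A sketch follows. The numeric values of S1, S2, and S3 and the stub `FutureOutput` class are assumptions, and S3 is reserved for the Dispatcher's internal output tasks.

```python
S1, S2, S3 = 1, 2, 3  # hypothetical sequence numbers for the three levels


class FutureOutput(object):
    """Minimal stand-in for the Crunch Dispatcher's FutureOutput."""


def contains_future(value):
    """True if a FutureOutput appears anywhere inside value."""
    if isinstance(value, FutureOutput):
        return True
    if isinstance(value, dict):
        return any(contains_future(item) for item in value.values())
    if isinstance(value, (list, tuple)):
        return any(contains_future(item) for item in value)
    return False


def sequence_for_call(args, kwargs):
    """Pick the sequence level for a new WorkMethod task."""
    if contains_future(args) or contains_future(kwargs):
        return S2  # must wait for other tasks' output
    return S1  # every parameter is already a real value
```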

Roughly, the FutureOutput will wrap the task we create to represent the work. Imagine a simple linear Crunch job:

class CrunchExample(CrunchModule):
    # work_method1 and work_method2 are WorkMethods defined on this class.
    def run(self, arg1, arg2, arg3):
        output1 = self.work_method1(arg1, arg2)
        return [self.work_method2(output1, arg3)]

work_method1 is called, and immediately returns a FutureOutput with information about the task scheduled for it at sequence S1. When work_method2 is called, the Crunch Dispatcher introspects the arguments and finds the output1 FutureOutput among them. The work_method2 code can't be run until work_method1 runs and generates output, so the Crunch Dispatcher creates a task to run work_method2 at sequence S2. work_method2 returns a FutureOutput for this task, which run() returns in a list. Because run() has returned a list of FutureOutput objects, the Crunch Dispatcher creates an internal task to run at S3 that combines these outputs into a single collection and sets it as the final job output.

When Crunch actually starts the task for work_method2, the Crunch Dispatcher will see that an argument should have the value of work_method1's output. It will deserialize that value from work_method1's task output, following the output rules described earlier, then duplicate this task to run at sequence S1 with the real inputs. This dispatch strategy means the Crunch job will run as many tasks as possible before trying to resolve FutureOutputs (it will keep running tasks at S1 until none remain). Tasks will also be maximally reusable, because every task that actually runs user code has real inputs, so reusing it doesn't require the SDK to resolve anything.
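
The following toy model simulates that strategy in memory for the linear job above. It is not crunch-job. It assumes, as described here, that the scheduler always runs the lowest pending sequence, and its S3 "combine" step just lists the real outputs instead of building a collection. `ToyCrunch`, `Future`, and their methods are hypothetical.

```python
S1, S2, S3 = 1, 2, 3  # hypothetical sequence numbers


class Future(object):
    """Stand-in FutureOutput: names the task that will produce output."""

    def __init__(self, task_id):
        self.task_id = task_id


class ToyCrunch(object):
    """In-memory model: always run the lowest pending sequence first."""

    def __init__(self, methods):
        self.methods = methods  # WorkMethod name -> plain function
        self.pending = []       # (sequence, task_id, name, args)
        self.outputs = {}       # task_id -> real output, or a Future
        self.log = []           # (sequence, name) of each task run
        self.next_id = 0

    def _queue(self, seq, name, args):
        task_id = self.next_id
        self.next_id += 1
        self.pending.append((seq, task_id, name, tuple(args)))
        return Future(task_id)

    def call(self, name, *args):
        """Calling a WorkMethod: queue a task at S1 or S2, return a Future."""
        waits = any(isinstance(arg, Future) for arg in args)
        return self._queue(S2 if waits else S1, name, args)

    def finish(self, futures):
        """run() returned futures: queue the internal S3 task."""
        return self._queue(S3, 'combine', futures)

    def _real(self, value):
        # Follow Future references until reaching a real output.
        while isinstance(value, Future):
            value = self.outputs[value.task_id]
        return value

    def run(self):
        while self.pending:
            self.pending.sort(key=lambda task: task[:2])
            seq, task_id, name, args = self.pending.pop(0)
            self.log.append((seq, name))
            if seq == S2:
                # Resolve inputs, then duplicate the task at S1 with real values.
                real_args = [self._real(arg) for arg in args]
                self.outputs[task_id] = self._queue(S1, name, real_args)
            elif seq == S3:
                # Internal task: record the real outputs as the job output.
                self.outputs[task_id] = [self._real(arg) for arg in args]
            else:
                self.outputs[task_id] = self.methods[name](*args)


# The linear job: output1 = work_method1(2, 3); return [work_method2(output1, 10)]
job = ToyCrunch({'work_method1': lambda a, b: a + b,
                 'work_method2': lambda x, c: x * c})
output1 = job.call('work_method1', 2, 3)
job_output = job.finish([job.call('work_method2', output1, 10)])
job.run()
print(job.log)
# → [(1, 'work_method1'), (2, 'work_method2'), (1, 'work_method2'), (3, 'combine')]
```

The log shows work_method2 first visiting S2 only to resolve its input, then running for real at S1.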

Review

Here are the interfaces that script authors will use:

  • They will subclass CrunchModule to define their own run() method and WorkMethods.
  • They will decorate their work methods with WorkMethod(). This decorator will not currently accept any arguments, but you'll still call it as if it did, so it can grow to accept new keyword arguments in the future.
  • They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls. The user doesn't need to know any of the semantics of FutureOutput at all; only the Dispatcher does.
  • The main body of the script will call dispatcher.run_work(), passing in their CrunchModule subclass.

This is a very small interface. There's nothing specific to current Crunch in it. When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters; it controls all aspects of work scheduling. Users shouldn't have to encode any Crunch specifics in their scripts. I think that bodes well for future development of the SDK.

Hypothetical future Crunch scripts

These are completely untested, since they can't actually run. Please forgive cosmetic mistakes.

Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

#!/usr/bin/env python

import subprocess
import tempfile

from arvados import CollectionWriter
from arvados.crunch import CrunchModule, WorkMethod, AutoDispatcher

class TumorAnalysis(CrunchModule):
    # input is a Collection named in a job parameter.
    # The SDK instantiates the corresponding Collection object and passes it in.
    def run(self, input):
        # analyze_tumors is being called with a generator of
        # FutureOutput objects.  The Dispatcher will unroll this (it
        # inevitably needs to traverse recursive data structures),
        # schedule all the classify tasks at sequence N-1, then
        # schedule analyze_tumors at sequence N.  When the analyze_tumors
        # task runs, it will resolve all the futures and schedule a real
        # task to run at sequence N-1.
        # run() returns a list of FutureOutputs, so the job output is
        # analyze_tumors' output once it's available.
        return [self.analyze_tumors(self.classify(in_file)
                                    for in_file in input.all_files()
                                    if in_file.name.endswith('.fastj'))]

    @WorkMethod()
    def classify(self, in_file):
        out_file = tempfile.NamedTemporaryFile()
        proc = subprocess.Popen(['normal_or_tumor'],
                                stdin=subprocess.PIPE, stdout=out_file)
        for data in in_file.readall():
            proc.stdin.write(data)
        proc.stdin.close()
        proc.wait()
        # The child wrote through the shared file descriptor, so rewind
        # before reading its output back.
        out_file.seek(0)
        if b'tumor' in out_file.read(4096):
            out_file.seek(0)
            out_coll = CollectionWriter()
            with out_coll.open(in_file.name) as output:
                output.write(out_file.read())
            return out_coll
        return None

    @WorkMethod()
    def analyze_tumors(self, results):
        compiled = {}
        for result in results:
            if result is None:
                continue
            # Imagine a reduce-type step operating on each file in the
            # Collection; summarize() is a hypothetical placeholder.
            for tumor_file in result.all_files():
                compiled[tumor_file.name] = summarize(tumor_file)
        return compiled

if __name__ == '__main__':
    AutoDispatcher().run_work(TumorAnalysis)

Example from #3603

This is the script that Abram used to illustrate #3603.

#!/usr/bin/env python

from arvados import CollectionReader, CollectionWriter
from arvados.crunch import CrunchModule, WorkMethod, AutoDispatcher
from subprocess import check_call

class Example3603(CrunchModule):
    def run(self, human_collection_list, refpath):
        return [self.run_ruler(refpath, line) for line in human_collection_list]

    @WorkMethod()
    def run_ruler(self, refpath, input_line):
        fastj_id, human_id = input_line.strip().split(',')
        # CollectionReader and StreamFileReader objects should grow a
        # mount_path() method that gives the object's location under
        # $TASK_KEEPMOUNT.
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", CollectionReader(fastj_id).mount_path(),
                    "-lib-path", refpath])
        out_coll = CollectionWriter()
        # The argument is the path where tileruler writes output.
        out_coll.write_directory_tree('.')
        return out_coll

if __name__ == '__main__':
    AutoDispatcher().run_work(Example3603)

Notes/TODO

(Some of these may be out-of-date, but I'll leave that for the suggesters to decide.)

  • Important concurrency limits that job scripts must be able to express:
    • Task Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
    • Schedule at most N instances of task T on the same node. (The parallel_with=[] example above is stronger than we actually want, for a multithreaded task. It would be okay to run several of these tasks in parallel, as long as there's only one per node. This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference." But that feels less natural.)
      • This is an SDK for Crunch, and currently Crunch gives the user no control over task scheduling beyond the sequence number; for example, users can't say whether a compute node may be shared. I don't know how to implement these features when there's no way to express these constraints so that Crunch will recognize them (beyond the brute-force "don't parallelize this").
  • In general, the output name is not known until the task is nearly finished. Frequently it is clearer to specify it when the task is queued, though. We should provide a convenient way to do this without any boilerplate in the queued task.
    • This seems pretty easily doable by passing the desired filename into the task method as a parameter. Then the code can work with self.output.open(passed_filename).
  • A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.