Python SDK » History » Version 16

Brett Smith, 02/17/2015 07:44 PM
updates based on feedback+discussion from Tom

h1. Python SDK

This is a design draft for a next-generation Python SDK, making it easier to write Crunch scripts today, with room to grow for future workflow changes in Arvados.

{{toc}}

h2. Goals

As I understand them, in order of importance:

* The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados.  It should eliminate any need to manipulate jobs or tasks directly via API calls, or manually instantiate objects from parameter strings.  There should be a bare minimum of boilerplate code in a user's Crunch script: just the tiny bit necessary to use the SDK.
* The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development.
* The SDK should have clear room to grow to accommodate future workflow improvements, whether that's new Crunch features (including possibly reusable tasks and/or elimination of the job/task distinction), hooks into the Common Workflow Language, or whatever.
  The first version of the SDK will not support features we expect these systems to have; that would conflict with the previous goal.  The idea here is that the API should not rely on details of today's Crunch implementation, and should have clear spaces where users will be able to use the features of more powerful work dispatch systems in the future.

h2. Components

h3. Dispatcher

Dispatcher classes provide a consistent interface to talk to different scheduling systems: they know how to run the right code for this instantiation (e.g., the current task); how to dispatch new work to the scheduling system; how to de/serialize parameters; etc.

Early on, we'll have a traditional Crunch dispatcher and a "running locally for debugging" dispatcher.  If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that.  While I'm not sketching out an API here (since it's not user-facing), we do need to make sure that Dispatcher methods hit that design target.

Users should be able to instantiate Dispatcher classes to customize their behavior.  However, all Dispatchers should be able to run without any customization, and users shouldn't have to deal with them at all in the easy cases where the scheduler's default behavior is fine.  The SDK provides an AutoDispatcher callable that automatically resolves to the best Dispatcher available for the environment, or raises an exception if none are suitable.
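
A minimal sketch of how the AutoDispatcher callable might choose a Dispatcher, assuming each Dispatcher class can report whether it is usable in the current environment.  The @usable()@ classmethod, the @NoSuitableDispatcher@ exception, and both environment checks (@TASK_UUID@ for Crunch, an opt-in @CRUNCH_LOCAL=1@ for local debugging) are hypothetical; they are not part of this design or of the current Arvados SDK.

<pre><code class="python">
import os


class NoSuitableDispatcher(Exception):
    """Raised when no Dispatcher can run in the current environment."""


class CrunchDispatcher:
    """Hypothetical stand-in for the Crunch Dispatcher."""

    @classmethod
    def usable(cls, environ):
        # Assumption: Crunch tasks run with TASK_UUID in their environment.
        return "TASK_UUID" in environ


class LocalDispatcher:
    """Hypothetical stand-in for the "run locally for debugging" Dispatcher."""

    @classmethod
    def usable(cls, environ):
        # Assumption: local debugging is requested explicitly via CRUNCH_LOCAL=1.
        return environ.get("CRUNCH_LOCAL") == "1"


def AutoDispatcher(environ=None, candidates=(CrunchDispatcher, LocalDispatcher)):
    """Return an instance of the first usable Dispatcher, in priority order."""
    if environ is None:
        environ = os.environ
    for dispatcher_class in candidates:
        if dispatcher_class.usable(environ):
            return dispatcher_class()
    raise NoSuitableDispatcher("no Dispatcher is usable in this environment")
</code></pre>

Because @AutoDispatcher@ here is a plain function that returns an instance, @AutoDispatcher().run_work(...)@ reads the same way as in the example scripts.  Trying candidates in a fixed priority order keeps the choice predictable when more than one Dispatcher could work.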

h3. CrunchModule

The bulk of a user's Crunch script will define a subclass of the SDK's CrunchModule class.  The user extends it with WorkMethods that represent units of work, and a run() method that implements the highest-level work, returning the final work output (e.g., the job output as opposed to task output).

run() and WorkMethods follow similar rules for how they're run, with a few small differences.  Right now you can think of those differences as parallels to the differences between jobs and tasks, but the concepts are general enough that they should apply to other scheduling systems.

* run() is called with parameters that are deserialized from the Arvados job record.  WorkMethods are called with parameters provided directly in the user's CrunchModule code.  The Crunch dispatcher will stash those parameters in task parameters.
* Both run() and WorkMethods can call other WorkMethods.
* run() and WorkMethods can return a Collection object, a sequence of one or more FutureOutput objects, a simple (JSON-encodable) Python object, or None.  This output is serialized appropriately and recorded in Arvados.  run()'s output is the job output; WorkMethod output is task output.
  When the output is a sequence of FutureOutputs, it means "create my output by combining the outputs of these WorkMethods," similar to the way crunch-job creates job output by concatenating task output today.  The SDK will record the references immediately, and later, once the real outputs are available, resolve them and record the "real" output to aid work reuse.
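
To make the return-value rules concrete, here is a sketch of how a Dispatcher might classify what run() or a WorkMethod returns.  @Collection@ and @FutureOutput@ are bare stand-ins rather than real SDK classes, and @describe_output()@ is a hypothetical helper that only reports what would be recorded; a real Dispatcher would save the Collection, or write the JSON into a new Collection, instead of returning a tuple.

<pre><code class="python">
import json


class Collection:
    """Stand-in for a saved Arvados Collection."""

    def __init__(self, portable_data_hash):
        self.portable_data_hash = portable_data_hash


class FutureOutput:
    """Stand-in for a Dispatcher's FutureOutput; wraps a task id."""

    def __init__(self, task_id):
        self.task_id = task_id


def describe_output(value):
    """Decide how a return value from run() or a WorkMethod gets recorded."""
    if value is None:
        return ("none", None)
    if isinstance(value, Collection):
        return ("collection", value.portable_data_hash)
    if (isinstance(value, (list, tuple)) and value
            and all(isinstance(item, FutureOutput) for item in value)):
        # Record the references now; combine the real outputs once they exist.
        return ("combine", [item.task_id for item in value])
    # Anything else must be JSON-encodable; json.dumps raises TypeError if not.
    return ("json", json.dumps(value, sort_keys=True))
</code></pre>

Checking the sequence-of-FutureOutputs case before falling back to JSON matters: a list of futures is not JSON-encodable, and it means "combine later," not "record this list."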

The user should pass their CrunchModule class to the run_work() method of an instantiated Dispatcher.  run_work() will instantiate the CrunchModule so that WorkMethods can be dispatched, then invoke either run() or the user code of a WorkMethod, depending on what work is being run.  To illustrate, here's how the current Crunch dispatcher would take over from there.

The Crunch Dispatcher gets the current task.  If that's task 0, it gets the current job, deserializes the job parameters, and calls crunch_module.run() with those arguments.  For any other task, it looks up the task's corresponding WorkMethod and runs its code with the deserialized task parameters.

Either way, the method's return value is captured.  If it's a Collection object, the Dispatcher makes sure it's saved, then records the portable data hash as the task's output.  For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that as the task's output.
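
The routing step can be sketched as below.  The @job@ and @task@ dicts stand in for Arvados job and job task records (@script_parameters@, @sequence@, and @parameters@ are fields on those records), while the @method@/@args@/@kwargs@ layout inside the task parameters is a hypothetical encoding of a WorkMethod call.  For simplicity the WorkMethods here are plain methods, so looking one up and calling it runs the user code; with a real WorkMethod decorator, the Dispatcher would need a handle on the undecorated function.

<pre><code class="python">
class CrunchModule:
    """Hypothetical base class for user modules."""


def run_current_task(module_class, job, task):
    """Route the current task to run() or to a WorkMethod's user code."""
    module = module_class()
    if task["sequence"] == 0:
        # Task 0: call run() with the job's deserialized parameters.
        return module.run(**job["script_parameters"])
    # Any other task: find the named WorkMethod and run its user code
    # with the deserialized task parameters.
    params = task["parameters"]
    user_code = getattr(module, params["method"])
    return user_code(*params["args"], **params["kwargs"])


class Demo(CrunchModule):
    def run(self, count):
        return count * 2

    def add(self, a, b):
        return a + b


job = {"script_parameters": {"count": 3}}
print(run_current_task(Demo, job, {"sequence": 0, "parameters": {}}))  # 6
print(run_current_task(Demo, job, {"sequence": 1, "parameters": {
    "method": "add", "args": [1, 2], "kwargs": {}}}))  # 3
</code></pre>

The return value of @run_current_task()@ is what the Dispatcher would then capture and record according to the rules above.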

h3. WorkMethod

WorkMethod is a class that decorates instance methods defined on the CrunchModule subclass.  It demarcates work that can be scheduled through a Dispatcher.  Usage looks like:

<pre><code class="python">@WorkMethod()
def small_work(self, param1, param2):
    ...  # implementation
</code></pre>

When a WorkMethod is called (from run() or another WorkMethod's code), the user's code isn't run.  Instead, we give the Dispatcher all the information about the call.  The Dispatcher serializes the parameters and schedules the real work method to be run later.  We wrap information about the work's scheduling in a FutureOutput object, described in the next section.

Because the Dispatcher has to be able to serialize arguments and output, those values can only have types that the SDK knows how to work with, and some type information may be lost.  For the first SDK implementation, the objects we can work with will be those that can be encoded to JSON, plus Collection objects.  Sequence types that aren't native to JSON, like tuples and iterators, will become lists at the method's boundaries.  (TBD: Or maybe we should require exactly a known type, to avoid surprising coercions?)
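
The coercions described above are easy to see by round-tripping values through JSON.  @to_wire()@ and @round_trip()@ are hypothetical helpers approximating what a JSON-based Dispatcher would do at a method boundary; the last line shows another kind of lost type information, since JSON object keys are always strings.

<pre><code class="python">
import json
from collections.abc import Iterator


def to_wire(value):
    """Recursively turn tuples and iterators into lists so JSON can encode them."""
    if isinstance(value, dict):
        return {key: to_wire(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, Iterator)):
        return [to_wire(item) for item in value]
    return value


def round_trip(value):
    """What a WorkMethod would see after its argument crosses a JSON boundary."""
    return json.loads(json.dumps(to_wire(value)))


print(round_trip((1, "two", (3, 4))))       # [1, 'two', [3, 4]]
print(round_trip(n * n for n in range(3)))  # [0, 1, 4]
print(round_trip({1: "one"}))               # {'1': 'one'}
</code></pre>

Iterators have to be drained into lists before encoding, because @json.dumps@ refuses them outright; that is also why a generator passed to a WorkMethod arrives as a list.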

Note that WorkMethod is called like a function, even though we're passing it no arguments.  We expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints.
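
Here is one way the decorator could work: a sketch assuming the Dispatcher exposes a @schedule()@ method and that run_work() gives each CrunchModule instance a @dispatcher@ attribute.  Both are assumptions, as are @RecordingDispatcher@ and the @user_code@ attribute.  @WorkMethod@ takes keyword arguments that it only stores, standing in for future scheduling constraints, and calling a decorated method returns a FutureOutput without running the user's code.

<pre><code class="python">
import functools


class FutureOutput:
    """Stand-in handle for scheduled work."""

    def __init__(self, call_index):
        self._call_index = call_index


class RecordingDispatcher:
    """Hypothetical Dispatcher that records calls instead of scheduling them."""

    def __init__(self):
        self.calls = []

    def schedule(self, func, args, kwargs, constraints):
        self.calls.append((func.__name__, args, kwargs, constraints))
        return FutureOutput(len(self.calls) - 1)


class WorkMethod:
    """Decorator factory; keyword arguments are reserved for scheduling constraints."""

    def __init__(self, **constraints):
        self.constraints = constraints

    def __call__(self, func):
        constraints = self.constraints

        @functools.wraps(func)
        def schedule_instead(module, *args, **kwargs):
            # Don't run the user's code now; hand the call to the Dispatcher.
            return module.dispatcher.schedule(func, args, kwargs, constraints)

        # Keep the undecorated function so a Dispatcher can run it later.
        schedule_instead.user_code = func
        return schedule_instead


class Example:
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher  # assumption: run_work() sets this

    @WorkMethod()
    def small_work(self, param1, param2):
        raise RuntimeError("not reached at call time")


dispatcher = RecordingDispatcher()
future = Example(dispatcher).small_work(1, param2=2)
</code></pre>

Making @WorkMethod@ a class that is always called, even with no arguments, is what lets it accept new keyword arguments later without breaking existing scripts.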

h3. FutureOutput

FutureOutput objects represent the output of work that has been scheduled but not yet run.  Whenever a WorkMethod is called, it returns a FutureOutput object.  Users can do two things with these objects: pass them as arguments to other WorkMethod calls, and return them from run() or WorkMethods.

When a WorkMethod is called with at least one FutureOutput object, the Dispatcher does two things.  First, it can introspect those FutureOutput objects to figure out how the new work should be scheduled.  Second, when it's time to run the new work, it finds the real output generated by the work each FutureOutput represents, and passes that in when running the user code.
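
A sketch of that Dispatcher-side introspection and resolution, assuming each FutureOutput records the id of the task that will produce its output.  @find_futures()@ and @resolve()@ are hypothetical helpers; both walk lists, tuples, and dict values recursively, since FutureOutputs can be nested inside other arguments.

<pre><code class="python">
class FutureOutput:
    """Stand-in for a Crunch FutureOutput: remembers which task produces output."""

    def __init__(self, task_id):
        self.task_id = task_id


def find_futures(value):
    """Yield every FutureOutput nested in lists, tuples, or dict values."""
    if isinstance(value, FutureOutput):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from find_futures(item)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from find_futures(item)


def resolve(value, outputs):
    """Replace each FutureOutput with the real output recorded for its task."""
    if isinstance(value, FutureOutput):
        return outputs[value.task_id]
    if isinstance(value, dict):
        return {key: resolve(item, outputs) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [resolve(item, outputs) for item in value]
    return value


first, second = FutureOutput("task-1"), FutureOutput("task-2")
args = [first, {"control": second, "threshold": 3}]
waiting_on = [f.task_id for f in find_futures(args)]  # ['task-1', 'task-2']
real_args = resolve(args, {"task-1": "out1", "task-2": "out2"})
</code></pre>

@find_futures()@ covers the scheduling decision (is there anything to wait on?), and @resolve()@ covers the later step of substituting real outputs before the user code runs.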

When run() or a WorkMethod returns a list of FutureOutput objects, those outputs will be combined to create the final corresponding output once they're available.  Refer to the discussion in the CrunchModule section above.

To be clear: each Dispatcher will have its own paired FutureOutput implementation.

FutureOutput objects cannot be resolved or manipulated by the user directly.  Users can't wait for the real output to become available (because in some schedulers, like current Crunch, you'd wait forever).  The objects should have no user-facing methods that might suggest this is possible.  We'll probably even redefine some very basic methods (e.g., @__str__@) to be _less_ useful than object's default implementation, to really drive this point home.
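
One possible shape for that deliberately unhelpful object, as a sketch: @__str__@ and @__repr__@ reveal nothing, @__slots__@ stops users from attaching attributes, and @__bool__@ raises so that code like @if future:@ fails loudly.  The @__slots__@ and @__bool__@ choices go beyond what this design specifies and are assumptions.

<pre><code class="python">
class FutureOutput:
    """Opaque handle for scheduled work; only its paired Dispatcher looks inside."""

    __slots__ = ("_dispatch_info",)

    def __init__(self, dispatch_info):
        # Private by convention: e.g., the Crunch task this future wraps.
        self._dispatch_info = dispatch_info

    def __repr__(self):
        # Deliberately less informative than object's default repr,
        # which would at least show a memory address.
        return "<FutureOutput>"

    __str__ = __repr__

    def __bool__(self):
        # Truth-testing would imply the output can be inspected now.
        raise TypeError("a FutureOutput has no value until its work runs")
</code></pre>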

h3. Crunch dispatch example

Here's a Crunch example to make things concrete.  The Crunch dispatcher is expected to create tasks at set sequence levels, based on when they can run:

* S1: Sequence for WorkMethod tasks that can be run now: all parameters can be deserialized into useful objects
* S2: Sequence for WorkMethod tasks that have FutureOutput parameters that need to be resolved
* S3: Sequence for internal tasks to set real output records for methods that returned FutureOutput objects

Roughly, the FutureOutput will wrap the task we create to represent the work.  Imagine a simple linear Crunch job:

<pre><code class="python">class CrunchExample(CrunchModule):
    # work_method1 and work_method2 are WorkMethods defined on this class.
    def run(self, arg1, arg2, arg3):
        output1 = self.work_method1(arg1, arg2)
        return [self.work_method2(output1, arg3)]</code></pre>

work_method1 is called, and immediately returns a FutureOutput with information about the task scheduled for it at sequence S1.  When work_method2 is called, the Crunch Dispatcher introspects the arguments and finds the output1 FutureOutput among them.  The work_method2 code can't be run until work_method1 runs and generates output, so the Crunch Dispatcher creates a task to run work_method2 at sequence S2.  work_method2 returns a FutureOutput for this task, which run() returns inside a list.  Because run() has returned a list of FutureOutput objects, the Crunch Dispatcher creates an internal task to run at S3 that combines these outputs into a single collection and sets it as the final job output.

When Crunch actually starts the task for work_method2, the Crunch Dispatcher will see that an argument should have the value of work_method1's output.  It will deserialize that from the task output, mirroring the rules described earlier, then duplicate this task to run at sequence S1 with real inputs.  This dispatch strategy means the Crunch job will run as many tasks as possible before trying to resolve FutureOutputs (it will keep running tasks at S1 until none remain), and tasks will be maximally reusable (because tasks that actually run have real inputs, without requiring an SDK to resolve them).
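
The whole strategy can be simulated in a few dozen lines.  This is a toy model, not Crunch or SDK code: @SimulatedCrunch@, @Future@, and @combine_outputs()@ are hypothetical, tasks run in-process, a heap always picks the lowest pending sequence (mirroring how tasks at S1 keep running until none remain), and the numeric values for S1, S2, and S3 are assumed.

<pre><code class="python">
import heapq
import itertools

S1, S2, S3 = 1, 2, 3  # assumption: the three levels map to these sequence numbers


class Future:
    """Stand-in for the Crunch Dispatcher's FutureOutput: wraps a task id."""

    def __init__(self, task_id):
        self.task_id = task_id


def combine_outputs(*outputs):
    # Real Crunch would concatenate collection manifests; a list stands in.
    return list(outputs)


class SimulatedCrunch:
    """Toy model of the S1/S2/S3 strategy; not real Crunch or SDK code."""

    def __init__(self):
        self.queue = []        # heap of (sequence, task_id); lowest runs first
        self.tasks = {}        # task_id -> (function, args)
        self.outputs = {}      # task_id -> real output
        self.replaced_by = {}  # S2 task_id -> id of the S1 copy that ran instead
        self.ids = itertools.count()
        self.log = []          # (sequence, function name) in run order

    def _queue(self, sequence, function, args):
        task_id = next(self.ids)
        self.tasks[task_id] = (function, args)
        heapq.heappush(self.queue, (sequence, task_id))
        return task_id

    def schedule(self, function, *args):
        """A WorkMethod call: S1 if all args are real, else S2."""
        waits = any(isinstance(arg, Future) for arg in args)
        return Future(self._queue(S2 if waits else S1, function, args))

    def schedule_job_output(self, futures):
        """run() returned FutureOutputs: queue an internal S3 combine task."""
        return self._queue(S3, combine_outputs, tuple(futures))

    def output_of(self, task_id):
        while task_id in self.replaced_by:
            task_id = self.replaced_by[task_id]
        return self.outputs[task_id]

    def run(self):
        while self.queue:
            sequence, task_id = heapq.heappop(self.queue)
            function, args = self.tasks[task_id]
            self.log.append((sequence, function.__name__))
            real_args = [self.output_of(arg.task_id) if isinstance(arg, Future)
                         else arg for arg in args]
            if sequence == S2:
                # Inputs now exist: duplicate the task at S1 with real inputs.
                self.replaced_by[task_id] = self.schedule(function, *real_args).task_id
            else:
                self.outputs[task_id] = function(*real_args)


def work_method1(arg1, arg2):
    return arg1 + arg2


def work_method2(output1, arg3):
    return output1 * arg3


crunch = SimulatedCrunch()
output1 = crunch.schedule(work_method1, 1, 2)
job_output = crunch.schedule_job_output([crunch.schedule(work_method2, output1, 10)])
crunch.run()
</code></pre>

After @crunch.run()@, @crunch.log@ is @[(1, 'work_method1'), (2, 'work_method2'), (1, 'work_method2'), (3, 'combine_outputs')]@: the work_method2 task first runs at S2 only to resolve its input and re-queue itself at S1, and the S3 task records @[30]@ as the job output.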

h3. Review

Here are the interfaces that script authors will use:

* They will subclass CrunchModule to define their own run() method and WorkMethods.
* They will decorate their work methods with WorkMethod().  This decorator will not currently accept any arguments, but you'll still call it as if it did, so it can grow to accept new keyword arguments in the future.
* They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls.  The user doesn't need to know any of the semantics of FutureOutput at all; only the Dispatcher does.
* The main body of the script will call dispatcher.run_work(), passing in their CrunchModule subclass.

This is a very small interface.  There's nothing specific to current Crunch in it.  When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters; it controls all aspects of work scheduling.  Users shouldn't have to encode any Crunch specifics in their scripts.  I think that bodes well for future development of the SDK.

h2. Hypothetical future Crunch scripts

These are completely untested, since they can't actually run.  Please forgive cosmetic mistakes.

h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

<pre><code class="python">
#!/usr/bin/env python

import subprocess
import tempfile

from arvados import CollectionWriter
from arvados.crunch import CrunchModule, WorkMethod, AutoDispatcher

class TumorAnalysis(CrunchModule):
    # input is a Collection named in a job parameter.
    # The SDK instantiates the corresponding Collection object and passes it in.
    def run(self, input):
        # analyze_tumors is being called with a generator of
        # FutureOutput objects.  The Dispatcher will unroll this (it
        # inevitably needs to traverse recursive data structures),
        # schedule all the classify tasks at sequence N-1, then
        # schedule analyze_tumors at sequence N.  When the analyze_tumors
        # task runs, it will resolve all the futures and schedule a real
        # task to run at sequence N-1.
        return self.analyze_tumors(self.classify(in_file)
                                   for in_file in input.all_files()
                                   if in_file.name.endswith('.fastj'))

    @WorkMethod()
    def classify(self, in_file):
        out_file = tempfile.NamedTemporaryFile()
        proc = subprocess.Popen(['normal_or_tumor'],
                                stdin=subprocess.PIPE, stdout=out_file)
        for data in in_file.readall():
            proc.stdin.write(data)
        proc.stdin.close()
        proc.wait()
        # The child process advanced the shared file offset; rewind first.
        out_file.seek(0)
        if b'tumor' in out_file.read(4096):
            out_file.seek(0)
            out_coll = CollectionWriter()
            with out_coll.open(in_file.name) as output:
                output.write(out_file.read())
            return out_coll
        return None

    @WorkMethod()
    def analyze_tumors(self, results):
        compiled = {}
        for result in results:
            if result is None:
                continue
            # Imagine a reduce-type step operating on the file in the
            # Collection.
            for result_file in result.all_files():
                compiled[result_file.name] = ...  # placeholder reduce step
        return compiled


if __name__ == '__main__':
    AutoDispatcher().run_work(TumorAnalysis)
</code></pre>

h3. Example from #3603

This is the script that Abram used to illustrate #3603.

<pre><code class="python">
#!/usr/bin/env python

from subprocess import check_call

from arvados import CollectionReader, CollectionWriter
from arvados.crunch import CrunchModule, WorkMethod, AutoDispatcher

class Example3603(CrunchModule):
    def run(self, human_collection_list, refpath):
        return [self.run_ruler(refpath, line) for line in human_collection_list]

    @WorkMethod()
    def run_ruler(self, refpath, input_line):
        fastj_id, human_id = input_line.strip().split(',')
        # CollectionReader and StreamFileReader objects should grow a
        # mount_path() method that gives the object's location under
        # $TASK_KEEPMOUNT.
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", CollectionReader(fastj_id).mount_path(),
                    "-lib-path", refpath])
        out_coll = CollectionWriter()
        # The argument is the path where tileruler writes output.
        out_coll.write_directory_tree('.')
        return out_coll


if __name__ == '__main__':
    AutoDispatcher().run_work(Example3603)
</code></pre>

h2. Notes/TODO

(Some of these may be out-of-date, but I'll leave that for the suggesters to decide.)

* Important concurrency limits that job scripts must be able to express:
** Tasks Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
** Schedule at most N instances of task T on the same node.  (The @parallel_with=[]@ example above is stronger than we actually want, for a multithreaded task.  It would be okay to run several of these tasks in parallel, as long as there's only one per node.  This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference."  But that feels less natural.)
*** This is an SDK for Crunch, and currently Crunch gives the user no control of task scheduling beyond the sequence, such as whether a compute node can be shared.  I don't know how to write these features when there's no way to express these constraints in a form Crunch will recognize (beyond the brute-force "don't parallelize this").
* In general, the output name is not known until the task is nearly finished.  Frequently it is clearer to specify it when the task is queued, though.  We should provide a convenient way to do this without any boilerplate in the queued task.
** This seems pretty easily doable by passing the desired filename into the task method as a parameter.  Then the code can work with @self.output.open(passed_filename)@.
* A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.