Python SDK » History » Version 17

Peter Amstutz, 01/16/2020 02:26 PM

h1. Python SDK

OUTDATED: this design is based on the obsolete jobs API.

This is a design draft for a next-generation Python SDK, making it easier to write Crunch scripts today, with room to grow for future workflow changes in Arvados.

{{toc}}

h2. Goals

As I understand them, in order of importance:

* The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados.  It should eliminate any need to manipulate jobs or tasks directly via API calls, or manually instantiate objects from parameter strings.  There should be a bare minimum of boilerplate code in a user's Crunch script: just the tiny bit necessary to use the SDK.
* The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development.
* The SDK should have clear room to grow to accommodate future workflow improvements, whether that's new Crunch features (including possibly reusable tasks and/or elimination of the job/task distinction), hooks with the Common Workflow Language, or whatever.
  The first version of the SDK will not support features we expect these systems to have, since that would conflict with the previous goal.  The idea here is that the API should not rely on details of today's Crunch implementation, and should have clear spaces where users will be able to use the features of more powerful work dispatch systems in the future.

h2. Components

h3. Dispatcher

Dispatcher classes provide a consistent interface to talk to different scheduling systems: they know how to run the right code for this instantiation (e.g., the current task); how to dispatch new work to the scheduling system; how to serialize and deserialize parameters; and so on.

Early on, we'll have a traditional Crunch dispatcher, and a "running locally for debugging" dispatcher.  If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that.  While I'm not sketching out an API here (since it's not user-facing), we do need to be careful to make sure that Dispatcher methods hit that design target.

Users should be able to instantiate Dispatcher classes to customize their behavior.  However, all Dispatchers should be able to run without any customization, and users shouldn't have to deal with them at all for the easy cases where they're fine with the scheduler's default behavior.  The SDK provides an AutoDispatcher callable that automatically resolves to the best Dispatcher available for the environment, or raises an exception if none are suitable.
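
To make the AutoDispatcher idea concrete, here is a minimal sketch of how the resolution could work.  Everything in it is an assumption for illustration: the @is_usable()@ hook, the @NoSuitableDispatcherError@ name, the stand-in Dispatcher classes, and the environment variables used to detect each environment are not part of any agreed API.

```python
import os


class NoSuitableDispatcherError(Exception):
    """Raised when no Dispatcher can run in the current environment."""


class CrunchDispatcher(object):
    """Stand-in for the traditional Crunch dispatcher."""

    @classmethod
    def is_usable(cls, environ):
        # Assumption: a Crunch task can be recognized by this variable.
        return 'TASK_UUID' in environ


class LocalDispatcher(object):
    """Stand-in for the "running locally for debugging" dispatcher."""

    @classmethod
    def is_usable(cls, environ):
        # Assumption: the user opts in to local debugging explicitly.
        # CRUNCH_LOCAL_DEBUG is a made-up variable name.
        return environ.get('CRUNCH_LOCAL_DEBUG') == '1'


# Candidates in order of preference.
DISPATCHER_CLASSES = [CrunchDispatcher, LocalDispatcher]


def AutoDispatcher(environ=None):
    """Return an instance of the first Dispatcher usable here."""
    if environ is None:
        environ = os.environ
    for dispatcher_class in DISPATCHER_CLASSES:
        if dispatcher_class.is_usable(environ):
            return dispatcher_class()
    raise NoSuitableDispatcherError("no Dispatcher suits this environment")
```

Keeping the detection logic on each Dispatcher class means a new backend only has to add itself to the candidate list; user scripts that call @AutoDispatcher()@ would not change.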

h3. CrunchModule

The bulk of a user's Crunch script will define a subclass of the SDK's CrunchModule class.  The user extends it with WorkMethods that represent work units, and a run() method that implements the highest-level work, returning the final work output (e.g., the job output as opposed to task output).

run() and WorkMethods follow similar patterns of rules for how they're run.  There are some small differences between them.  Right now you can think of them as parallels to the differences between jobs and tasks, but the concepts are general enough that they should be applicable to other scheduling systems.

* run() is called with parameters that are deserialized from the Arvados job record.  WorkMethods are called with parameters provided in the user's CrunchModule code directly.  The Crunch dispatcher will stash those parameters in task parameters.
* Both run() and WorkMethods can call other WorkMethods.
* run() and WorkMethods can return a Collection object, a sequence of one or more FutureOutput objects, a simple (JSON-encodable) Python object, or None.  This output is serialized appropriately and recorded in Arvados.  run()'s output is the job output; WorkMethod output is task output.
  When the output is a sequence of FutureOutputs, it means "create my output by combining the outputs of these WorkMethods", similar to the way crunch-job creates job output by concatenating task output today.  The SDK will record the references immediately, and later, once they can be resolved, record the "real" output to aid work reuse.

The user should pass their CrunchModule class to the run_work() method of an instantiated dispatcher.  run_work() will instantiate the CrunchModule so that WorkMethods can be dispatched appropriately, then invoke either run() or the user code of a WorkMethod, whichever the current work unit calls for.  To better illustrate, I'll describe how the current Crunch dispatcher would take over from here.

The Crunch Dispatcher gets the current task.  If that's task 0, it gets the current job, deserializes the job parameters, and calls crunch_module.run() with those arguments.  For any other task, it looks up the task's corresponding WorkMethod, and runs its code with deserialized task parameters.

Either way, the method's return value is captured.  If it's a Collection object, the Dispatcher makes sure it's saved, then records the portable data hash as the task's output.  For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that as the task's output.
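
The output-capture rules in the last paragraph can be sketched roughly as follows.  This is only an illustration under assumptions: @FakeCollection@, @record_output()@ and the @save_json@ callable are invented names, recording no output for None is my reading of the text rather than a decision, and lists of FutureOutput objects are left out entirely.

```python
import json


class FakeCollection(object):
    """Hypothetical stand-in for an SDK Collection object."""

    def __init__(self, portable_data_hash):
        self._pdh = portable_data_hash
        self.saved = False

    def save(self):
        self.saved = True

    def portable_data_hash(self):
        return self._pdh


def record_output(return_value, save_json):
    """Turn a method's return value into a task output, or None.

    save_json is a callable that stores a JSON string in a new
    Collection and returns that Collection's portable data hash.
    """
    if return_value is None:
        # Assumption: None means there is no output to record.
        return None
    if isinstance(return_value, FakeCollection):
        # Make sure the Collection is saved, then use its hash.
        return_value.save()
        return return_value.portable_data_hash()
    # Anything else must be JSON-encodable.
    return save_json(json.dumps(return_value))
```

A value that is neither a Collection nor JSON-encodable makes @json.dumps()@ raise @TypeError@ here, which seems like the right place to surface that mistake to the script author.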

h3. WorkMethod

WorkMethod is a class that should decorate instance methods defined on the CrunchModule subclass.  It demarcates work that can be scheduled through a Dispatcher.  Use looks like:

<pre><code class="python">@WorkMethod()
def small_work(self, param1, param2):
    pass  # implementation goes here
</code></pre>

When a WorkMethod is called (from run() or another WorkMethod's code), the user's code isn't run.  Instead, we give the Dispatcher all the information about the call.  The Dispatcher serializes the parameters and schedules the real work method to be run later.  We wrap information about the work's scheduling in a FutureOutput object, which I'll get to next.

Because the Dispatcher has to be able to serialize arguments and output, those values can only have types that the SDK knows how to work with, and some type information may be lost.  For the first SDK implementation, objects we can work with will be objects that can be encoded to JSON, plus Collection objects.  Sequence types that aren't native to JSON, like tuples and iterators, will become lists at the method's boundaries.  (TBD: Or maybe we should require exactly a known type, to avoid surprising coercions?)
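
The coercions are easy to see with the standard @json@ module alone.  The helper name @cross_method_boundary()@ is made up for this illustration; it just round-trips a value through JSON the way a Dispatcher would have to.

```python
import json


def cross_method_boundary(value):
    """Simulate passing a value through JSON serialization."""
    return json.loads(json.dumps(value))


point = (1, 2)
restored = cross_method_boundary(point)
print(type(restored).__name__, restored)  # list [1, 2]

# Iterators are not JSON-encodable as-is, so a Dispatcher would have
# to unroll them first; here that is done explicitly with list().
unrolled = cross_method_boundary(list(iter(range(3))))
print(unrolled)  # [0, 1, 2]

# Other type information is lost too: JSON object keys are always strings.
keyed = cross_method_boundary({1: 'a'})
print(keyed)  # {'1': 'a'}
```

The last case is the kind of surprise the TBD above is worried about: the value comes back usable, but not equal to what went in.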

Note that WorkMethod accepts arguments, and we're passing it none.  We expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints.

h3. FutureOutput

FutureOutput objects represent the output of work that has been scheduled but not yet run.  Whenever a WorkMethod is called, it returns a FutureOutput object.  Users can do two things with these objects: pass them in as arguments to other WorkMethod calls, and return them from run() or WorkMethods.

When a WorkMethod is called with at least one FutureOutput object, the Dispatcher does two things.  First, it can introspect those FutureOutput objects to figure out how the new work should be scheduled.  Second, when it's time to run the work of the second WorkMethod, it finds the real output generated by running the first WorkMethod, and passes that in when running the user code.

When run() or a WorkMethod returns a list of FutureOutput objects, those outputs will be combined to create the final corresponding output once they're available.  Refer to the discussion in the CrunchModule section above.

To be clear: each Dispatcher will have its own paired FutureOutput implementation.

FutureOutput objects cannot be resolved or manipulated by the user directly.  Users can't wait for the real output to become available (because in some schedulers, like current Crunch, you'll wait forever).  The objects should have no user-facing methods that might suggest this is possible.  We'll probably even redefine some very basic methods (e.g., @__str__@) to be _less_ useful than object's default implementation, to really drive this point home.
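
Here is a rough sketch of how the WorkMethod decorator and FutureOutput could fit together.  It is not the planned implementation: @RecordingDispatcher@, its @schedule()@ method, the @dispatcher@ attribute on the module, and the @user_func@ attribute are all names I made up, and accepting arbitrary keyword arguments in the decorator is just one way to leave room for future options.

```python
import functools


class FutureOutput(object):
    """Opaque handle for output that does not exist yet."""

    def __init__(self, work_id):
        self._work_id = work_id

    def __str__(self):
        # Deliberately uninformative, as the text suggests.
        return '<FutureOutput>'

    __repr__ = __str__


class RecordingDispatcher(object):
    """Hypothetical Dispatcher that only records scheduled calls."""

    def __init__(self):
        self.scheduled = []

    def schedule(self, method_name, args, kwargs):
        self.scheduled.append((method_name, args, kwargs))
        return FutureOutput(len(self.scheduled) - 1)


class WorkMethod(object):
    """Decorator: calling the method schedules it instead of running it."""

    def __init__(self, **scheduling_options):
        # No options are defined yet; the parentheses leave room to grow.
        self.scheduling_options = scheduling_options

    def __call__(self, user_func):
        @functools.wraps(user_func)
        def wrapper(module, *args, **kwargs):
            return module.dispatcher.schedule(user_func.__name__, args, kwargs)
        wrapper.user_func = user_func  # kept so a Dispatcher can run it later
        return wrapper


class Example(object):
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher

    @WorkMethod()
    def small_work(self, param1, param2):
        return param1 + param2


module = Example(RecordingDispatcher())
future = module.small_work(1, 2)
print(future)                       # <FutureOutput>
print(module.dispatcher.scheduled)  # [('small_work', (1, 2), {})]
```

Note that calling @small_work@ never executes its body; the real function stays reachable only through @user_func@, which is meant for the Dispatcher rather than for script authors.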

h3. Crunch dispatch example

Let me talk about a Crunch example to make things concrete.  The Crunch dispatcher is expected to create tasks at set sequence levels, based on when they can run:

* S1: Sequence for WorkMethod tasks that can be run now: all parameters can be deserialized into useful objects
* S2: Sequence for WorkMethod tasks that have FutureOutput parameters that need to be resolved
* S3: Sequence for internal tasks to set real output records for methods that returned FutureOutput objects

Roughly, the FutureOutput will wrap the task we create to represent the work.  Imagine a simple linear Crunch job:

<pre><code class="python">class CrunchExample(CrunchModule):
    def run(self, arg1, arg2, arg3):
        # work_method1 and work_method2 are WorkMethods defined on this class.
        output1 = self.work_method1(arg1, arg2)
        return [self.work_method2(output1, arg3)]</code></pre>

work_method1 is called, and immediately returns a FutureOutput with information about the task scheduled for it at sequence S1.  When work_method2 is called, the Crunch Dispatcher introspects the arguments, and finds the output1 FutureOutput among them.  The work_method2 code can't be run until work_method1 runs and generates output.  The Crunch Dispatcher creates a task to run work_method2 at sequence S2.  work_method2 returns a FutureOutput for this task, which run() returns directly.  Because run() has returned a list of FutureOutput objects, the Crunch Dispatcher creates an internal task to run at S3 to combine these outputs into a single collection and set them as the final job output.

When Crunch actually starts the task for work_method2, the Crunch Dispatcher will see that an argument should have the value of work_method1's output.  It will deserialize that from the task output mirroring the rules described earlier, then duplicate this task to run at sequence S1 with real inputs.  This dispatch strategy means the Crunch job will run as many tasks as possible before trying to resolve FutureOutputs (it will keep running tasks at S1 until none remain), and tasks will be maximally reusable (because tasks that actually run have real inputs, without requiring an SDK to resolve them).
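
The sequence assignment for that linear job can be simulated in a few lines.  This is a toy model under assumptions: the class and method names are invented, the numeric values of S1 through S3 are placeholders, and it only looks at top-level arguments, whereas a real Dispatcher would need to traverse nested data structures.  It also stops before the resolution step where the S2 task is duplicated at S1.

```python
S1, S2, S3 = 1, 2, 3  # placeholder numbers; only their order matters


class TaskFuture(object):
    """Hypothetical FutureOutput wrapping a queued task's index."""

    def __init__(self, task_index):
        self.task_index = task_index


class SequenceSketch(object):
    def __init__(self):
        self.tasks = []

    def schedule(self, name, *args):
        # A call with any FutureOutput argument must wait at S2.
        waits = any(isinstance(arg, TaskFuture) for arg in args)
        self.tasks.append({'name': name, 'sequence': S2 if waits else S1})
        return TaskFuture(len(self.tasks) - 1)

    def finish_run(self, run_output):
        # A non-empty list of futures from run() needs an internal
        # combining task at S3.
        if (isinstance(run_output, list) and run_output and
                all(isinstance(item, TaskFuture) for item in run_output)):
            self.tasks.append({'name': '(combine output)', 'sequence': S3})


dispatcher = SequenceSketch()
output1 = dispatcher.schedule('work_method1', 'arg1', 'arg2')
output2 = dispatcher.schedule('work_method2', output1, 'arg3')
dispatcher.finish_run([output2])
for task in dispatcher.tasks:
    print(task['name'], task['sequence'])
```

Running it lists work_method1 at S1, work_method2 at S2, and the internal combining task at S3, matching the walkthrough above.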

h3. Review

Here are the interfaces that script authors will use:

* They will subclass CrunchModule to define their own run() method and WorkMethods.
* They will decorate their work methods with WorkMethod().  This decorator will not currently accept any arguments, but you'll still call it as if it did, so it can grow to accept new keyword arguments in the future.
* They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls.  The user doesn't need to know any of the semantics of FutureOutput at all; only the Dispatcher does.
* The main body of the script will call dispatcher.run_work(), passing in their CrunchModule subclass.

This is a very small interface.  There's nothing specific to current Crunch in it.  When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters; it controls all aspects of work scheduling.  Users shouldn't have to encode any Crunch specifics in their scripts.  I think that bodes well for future development of the SDK.

h2. Hypothetical future Crunch scripts

These are completely untested, since they can't actually run.  Please forgive cosmetic mistakes.

h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

<pre><code class="python">
#!/usr/bin/env python

import subprocess
import tempfile

from arvados import CollectionWriter
from arvados.crunch import CrunchModule, WorkMethod, AutoDispatcher

class TumorAnalysis(CrunchModule):
    # input is a Collection named in a job parameter.
    # The SDK instantiates the corresponding Collection object and passes it in.
    def run(self, input):
        # analyze_tumors is being called with a generator of
        # FutureOutput objects.  The Dispatcher will unroll this (it
        # inevitably needs to traverse recursive data structures),
        # schedule all the classify tasks at sequence N-1, then
        # schedule analyze_tumors at sequence N.  When the analyze_tumors
        # task runs, it will resolve all the futures and schedule a real
        # task to run at sequence N-1.
        return self.analyze_tumors(self.classify(in_file)
                                   for in_file in input.all_files()
                                   if in_file.name.endswith('.fastj'))

    @WorkMethod()
    def classify(self, in_file):
        out_file = tempfile.NamedTemporaryFile()
        proc = subprocess.Popen(['normal_or_tumor'],
                                stdin=subprocess.PIPE, stdout=out_file)
        for data in in_file.readall():
            proc.stdin.write(data)
        proc.stdin.close()
        proc.wait()
        # Rewind first: the child process wrote through the same file
        # descriptor, so the read position is at the end of its output.
        out_file.seek(0)
        if b'tumor' in out_file.read(4096):
            out_file.seek(0)
            out_coll = CollectionWriter()
            with out_coll.open(in_file.name) as output:
                output.write(out_file.read())
            return out_coll
        return None

    @WorkMethod()
    def analyze_tumors(self, results):
        compiled = {}
        for result in results:
            if result is None:
                continue
            # Imagine a reduce-type step operating on the file in the
            # Collection.
            compiled[thing] = ...
        return compiled


if __name__ == '__main__':
    AutoDispatcher().run_work(TumorAnalysis)
</code></pre>

h3. Example from #3603

This is the script that Abram used to illustrate #3603.

<pre><code class="python">
#!/usr/bin/env python

from arvados import CollectionReader, CollectionWriter
from arvados.crunch import CrunchModule, WorkMethod, AutoDispatcher
from subprocess import check_call

class Example3603(CrunchModule):
    def run(self, human_collection_list, refpath):
        return [self.run_ruler(refpath, line) for line in human_collection_list]

    @WorkMethod()
    def run_ruler(self, refpath, input_line):
        fastj_id, human_id = input_line.strip().split(',')
        # CollectionReader and StreamFileReader objects should grow a
        # mount_path() method that gives the object's location under
        # $TASK_KEEPMOUNT.
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", CollectionReader(fastj_id).mount_path(),
                    "-lib-path", refpath])
        out_coll = CollectionWriter()
        # The argument is the path where tileruler writes output.
        out_coll.write_directory_tree('.')
        return out_coll

if __name__ == '__main__':
    AutoDispatcher().run_work(Example3603)
</code></pre>

h2. Notes/TODO

(Some of these may be out-of-date, but I'll leave that for the suggesters to decide.)

* Important concurrency limits that job scripts must be able to express:
** Task Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
** Schedule at most N instances of task T on the same node.  (The @parallel_with=[]@ example above is stronger than we actually want, for a multithreaded task.  It would be okay to run several of these tasks in parallel, as long as there's only one per node.  This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference."  But that feels less natural.)
*** This is an SDK for Crunch, and currently Crunch gives the user no control of task scheduling beyond the sequence, like how one compute node can or can't be shared.  I don't know how to write these features given that there's no way to express these constraints in a way Crunch will recognize and expect (beyond the brute-force "don't parallelize this").
* In general, the output name is not known until the task is nearly finished. Frequently it is clearer to specify it when the task is queued, though. We should provide a convenient way to do this without any boilerplate in the queued task.
** This seems pretty easily doable by passing the desired filename into the task method as a parameter.  Then the code can work with @self.output.open(passed_filename)@.
* A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.