Python SDK » History » Version 10

Brett Smith, 01/14/2015 10:57 PM
revamp with new ideas

h1. Python SDK

This is a design draft for a next-generation Python SDK that makes it easier to write Crunch scripts today, with room to grow for future workflow changes in Arvados.

{{toc}}

h2. Goals

As I understand them, in order of importance:

* The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados.  It should eliminate any need to manipulate jobs or tasks directly via API calls, or to manually instantiate objects from parameter strings.  A user's Crunch script should contain a bare minimum of boilerplate: just the tiny bit necessary to use the SDK.
* The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development.
* The SDK should have clear room to grow to accommodate future workflow improvements, whether those are new Crunch features (possibly including reusable tasks and/or elimination of the job/task distinction), hooks into the Common Workflow Language, or something else.
  The first version of the SDK will not support the features we expect these systems to have; that would conflict with the previous goal.  The idea is that the API should not rely on details of today's Crunch implementation, and should have clear places where users will be able to use the features of more powerful work dispatch systems in the future.

h2. Components

h3. Dispatcher

Dispatcher classes are internal to the SDK; users do not interact with them at all.  They provide a consistent interface to the scheduling system: they know how to run the right code for this instantiation (e.g., the current task), how to dispatch new work to the scheduling system, how to serialize and deserialize parameters, and so on.

Early on, we'll have a traditional Crunch dispatcher and a "running locally for debugging" dispatcher.  If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that.  While I'm not sketching out an API here (since it's not user-facing), we do need to be careful to make sure that Dispatcher methods hit that design target.
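
To make that design target a bit more concrete, here is one possible shape for the internal interface, as a Python 3 sketch.  All of the names (@Dispatcher@, @run@, @schedule@, @serialize_params@, @LocalDispatcher@) are hypothetical, not part of the current Arvados SDK, and JSON is only an assumed default encoding.  The local backend just queues calls in memory and runs them in order.

<pre><code class="python">
import abc
import json


class Dispatcher(abc.ABC):
    """Hypothetical internal interface between the SDK and a scheduling system."""

    def __init__(self, script):
        # The CrunchScript subclass instance whose code this dispatcher runs.
        self.script = script

    @abc.abstractmethod
    def run(self):
        """Run the code that belongs to this instantiation (e.g., the current task)."""

    @abc.abstractmethod
    def schedule(self, method_name, args, kwargs):
        """Hand new work to the scheduling system; return a handle for its output."""

    def serialize_params(self, args, kwargs):
        # Assumed default encoding; a backend could override both directions.
        return json.dumps({"args": list(args), "kwargs": kwargs}, sort_keys=True)

    def deserialize_params(self, text):
        params = json.loads(text)
        return params["args"], params["kwargs"]


class LocalDispatcher(Dispatcher):
    """Debugging backend: keeps scheduled work in an in-memory FIFO queue."""

    def __init__(self, script):
        super().__init__(script)
        self.queue = []
        self._count = 0

    def schedule(self, method_name, args, kwargs):
        # Store the call in serialized form, as a real backend would have to,
        # so serialization problems show up during local debugging too.
        self.queue.append((method_name, self.serialize_params(args, kwargs)))
        handle = self._count  # stand-in for a FutureOutput
        self._count += 1
        return handle

    def run(self):
        # Run queued calls in order; calls scheduled meanwhile join the end.
        results = []
        while self.queue:
            method_name, text = self.queue.pop(0)
            args, kwargs = self.deserialize_params(text)
            results.append(getattr(self.script, method_name)(*args, **kwargs))
        return results
</code></pre>

Round-tripping parameters through the same serialization a real backend would use is the main point of a local debugging dispatcher: scripts that work locally are less likely to break only once they reach Crunch.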

h3. CrunchScript

The bulk of a user's Crunch script will define a subclass of the SDK's CrunchScript class.  The user extends it with WorkMethods that represent units of work, and a start() method that implements the code to start the work.  That done, the user calls subclass.run() to get the appropriate code running.

The run() class method introspects the environment to find the appropriate Dispatcher class to use.  It instantiates that Dispatcher, passing along an instance of the CrunchScript subclass.  To better illustrate, I'll describe how the traditional Crunch dispatcher would take over from here.
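
A minimal sketch of that selection step.  It assumes that the presence of a @TASK_UUID@ environment variable is how we detect that crunch-job is running us; the @dispatcher_class@ hook and the two stub dispatchers (which only report which backend was picked) are hypothetical.

<pre><code class="python">
import os


class CrunchDispatcher:
    """Stub standing in for the real Crunch backend."""

    def __init__(self, script):
        self.script = script

    def run(self):
        return "crunch"


class LocalDispatcher:
    """Stub standing in for the local debugging backend."""

    def __init__(self, script):
        self.script = script

    def run(self):
        return "local"


class CrunchScript:
    @classmethod
    def dispatcher_class(cls, environ=None):
        # Assumption: crunch-job exports TASK_UUID to every task it runs.
        environ = os.environ if environ is None else environ
        if "TASK_UUID" in environ:
            return CrunchDispatcher
        return LocalDispatcher

    @classmethod
    def run(cls, environ=None):
        # Instantiate the user's subclass and hand it to the chosen dispatcher.
        dispatcher = cls.dispatcher_class(environ)(cls())
        return dispatcher.run()
</code></pre>

The optional @environ@ argument exists only so the choice can be checked without touching the real process environment.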

The Crunch Dispatcher gets the current task.  If that's task 0, it gets the current job, deserializes the job parameters, and calls subclass.start() with those arguments.  For any other task, it looks up the task's corresponding WorkMethod, and runs its code with deserialized parameters.
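
Roughly, that routing could look like the following sketch.  It treats job and task records as plain dicts with the @script_parameters@, @sequence@, and @parameters@ fields, and assumes the Dispatcher stored each scheduled call as JSON under a hypothetical @call@ key in the task parameters.  Deserializing parameter strings into objects (such as Collections) is left out, and the demo's methods are undecorated, standing in for the real code behind each WorkMethod.

<pre><code class="python">
import json


class CrunchRouter:
    """Hypothetical routing logic for the Crunch Dispatcher."""

    def __init__(self, script, job, task):
        self.script = script
        self.job = job
        self.task = task

    def run(self):
        if self.task["sequence"] == 0:
            # Task 0: call start() with the job parameters as keyword arguments.
            return self.script.start(**self.job["script_parameters"])
        # Any other task: find the method named in the task parameters and
        # run it with the recorded arguments.
        call = json.loads(self.task["parameters"]["call"])
        method = getattr(self.script, call["method"])
        return method(*call["args"], **call["kwargs"])
</code></pre>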

Either way, the method's return value is captured.  If it's a Collection object, the Dispatcher makes sure it's saved, then records the portable data hash as the task's output.  For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that as the task's output.
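
Here's a sketch of those output rules.  @FakeCollection@ stands in for an Arvados Collection so the example runs without an API server, its hash only mimics the shape of a portable data hash, and the @output.json@ file name is hypothetical.  The design doesn't say what a None return records, so this sketch records nothing.

<pre><code class="python">
import hashlib
import json


class FakeCollection:
    """Stand-in for an Arvados Collection; illustration only."""

    def __init__(self, files=None):
        self.files = dict(files or {})
        self.saved = False

    def save(self):
        self.saved = True

    def portable_data_hash(self):
        # Mimics the "md5+size" shape of a portable data hash, but hashes
        # this object's JSON rather than a real manifest.
        blob = json.dumps(self.files, sort_keys=True).encode("utf-8")
        return hashlib.md5(blob).hexdigest() + "+" + str(len(blob))


def record_task_output(value, collection_class=FakeCollection):
    """Return the locator to record as a task's output, or None for no output."""
    if value is None:
        # Unspecified by the design; this sketch records no output.
        return None
    if isinstance(value, collection_class):
        collection = value
    else:
        # Any other value is stored as JSON in a new Collection under a
        # hypothetical file name.
        collection = collection_class({"output.json": json.dumps(value)})
    collection.save()
    return collection.portable_data_hash()
</code></pre>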

h3. WorkMethod

WorkMethod is a class that decorates instance methods defined on the CrunchScript subclass.  It demarcates work that can be scheduled through a Dispatcher.  Usage looks like:

<pre><code class="python">@WorkMethod()
def task_code(self, param1, param2):
    # implementation
    ...
</code></pre>

When a WorkMethod is called (from start() or from another WorkMethod's code), the user's code isn't run.  Instead, we give the Dispatcher all the information about the call.  The Dispatcher serializes the parameters and schedules the real work method to be run later.  We wrap information about the work's scheduling in a FutureOutput object, described next.

Note that WorkMethod is called with parentheses, even though we're passing it no arguments.  We expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints.
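
A minimal sketch of how the decorator might work, assuming the script instance carries its Dispatcher in a hypothetical @dispatcher@ attribute and that Dispatchers expose a @schedule()@ method.  @RecordingDispatcher@ and the @real_code@ attribute are illustrative only; the point is that calling the decorated method returns a FutureOutput without running the body, and that keyword arguments to @WorkMethod()@ are passed along as constraints.

<pre><code class="python">
import functools


class FutureOutput:
    """Placeholder for the output of scheduled work (hypothetical)."""

    def __init__(self, method_name, args, kwargs):
        self.method_name = method_name
        self.args = args
        self.kwargs = kwargs


class RecordingDispatcher:
    """Test double: records schedule() calls instead of scheduling anything."""

    def __init__(self):
        self.scheduled = []

    def schedule(self, method_name, args, kwargs, constraints):
        future = FutureOutput(method_name, args, kwargs)
        self.scheduled.append((future, constraints))
        return future


class WorkMethod:
    def __init__(self, **constraints):
        # Future scheduling constraints would arrive as keyword arguments.
        self.constraints = constraints

    def __call__(self, func):
        @functools.wraps(func)
        def schedule_instead(script, *args, **kwargs):
            # Don't run func now; ask the script's dispatcher to schedule it.
            return script.dispatcher.schedule(
                func.__name__, args, kwargs, self.constraints)
        # Keep the real code reachable so a dispatcher can run it later.
        schedule_instead.real_code = func
        return schedule_instead


class Example:
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher
        self.ran = False

    @WorkMethod()
    def task_code(self, param1, param2):
        self.ran = True
        return param1 + param2
</code></pre>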

h3. FutureOutput

FutureOutput objects represent the output of work that has been scheduled but not yet run.  Whenever a WorkMethod is called, it returns a FutureOutput object, and the user can pass that back in as an argument to later WorkMethod calls.  When that happens, the Dispatcher does two things.  First, it introspects all the FutureOutputs passed in to figure out how the new work should be scheduled.  Second, when it's time to run the later work, it finds the real output generated by the earlier work, and passes that in when running the user code.  To be clear: each Dispatcher will have its own paired FutureOutput implementation.

Let me walk through a Crunch example to make things concrete.  Roughly, the Crunch FutureOutput will wrap the task we create to represent the work.  Imagine a simple linear Crunch job:

<pre><code class="python">step1out = task1(arg1, arg2)
task2(step1out, arg)</code></pre>

When task2 is called, the Crunch Dispatcher introspects the arguments and finds the step1out FutureOutput among them.  The task2 code can't run until task1 has finished, so the Crunch Dispatcher gets task1's sequence number from step1out and creates a task to run task2 with a greater sequence number.
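
The scheduling half of this could be sketched as follows.  @FutureOutput@ here just remembers a sequence number, and @materialize@, @find_futures@, and @next_sequence@ are hypothetical helpers: generators are turned into lists first (they can only be consumed once), then nested lists, tuples, and dicts are searched for FutureOutputs, and the new work gets a sequence number one greater than anything it depends on.

<pre><code class="python">
import types


class FutureOutput:
    """Hypothetical Crunch FutureOutput: remembers its task's sequence number."""

    def __init__(self, sequence):
        self.sequence = sequence


def materialize(value):
    """Turn generators into lists so they can be inspected (and serialized)."""
    if isinstance(value, (types.GeneratorType, list)):
        return [materialize(item) for item in value]
    if isinstance(value, tuple):
        return tuple(materialize(item) for item in value)
    if isinstance(value, dict):
        return {key: materialize(item) for key, item in value.items()}
    return value


def find_futures(value):
    """Yield every FutureOutput inside nested lists, tuples, and dicts."""
    if isinstance(value, FutureOutput):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from find_futures(item)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from find_futures(item)


def next_sequence(args, kwargs, current_sequence):
    """Choose a sequence number for new work.

    The work must run after the current task and after every task whose
    output it consumes, so take one more than the largest of those.
    """
    upstream = [future.sequence for future in find_futures([args, kwargs])]
    return max([current_sequence] + upstream) + 1
</code></pre>

From task 0, work with no FutureOutput arguments lands at sequence 1, and work that consumes those outputs lands at sequence 2: the N+1/N+2 pattern described in the TumorAnalysis example.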

When Crunch actually starts the compute work for task2, the Crunch Dispatcher will see that this argument should have the value of task1's output.  It will deserialize that value from task1's output, following the same rules described earlier for recording outputs, then pass the resulting object in when it calls the user's code.
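
The run-time half could look like this sketch.  It assumes a serialized FutureOutput is written into the task parameters as a one-key dict using a hypothetical @__future_task__@ marker, and it models stored task outputs as either JSON text or a Collection-like object, mirroring the output rules for task return values.  @encode_future@, @make_lookup@, and @resolve@ are illustrative names.

<pre><code class="python">
import json

# Hypothetical marker used when a FutureOutput is serialized into task parameters.
FUTURE_KEY = "__future_task__"


def encode_future(task_id):
    return {FUTURE_KEY: task_id}


def make_lookup(task_outputs):
    """Build a function mapping a task id to the value its successor should see.

    task_outputs maps a task id to ("json", text) for JSON-encoded return
    values, or ("collection", obj) for tasks that returned a Collection.
    """
    def lookup(task_id):
        kind, payload = task_outputs[task_id]
        if kind == "json":
            return json.loads(payload)
        return payload
    return lookup


def resolve(value, lookup):
    """Replace every encoded future in decoded parameters with the real output."""
    if isinstance(value, dict):
        if set(value) == {FUTURE_KEY}:
            return lookup(value[FUTURE_KEY])
        return {key: resolve(item, lookup) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve(item, lookup) for item in value]
    return value
</code></pre>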

h3. Review

Here are the interfaces that script authors will use:

* They will subclass CrunchScript to define their own start() method and WorkMethods.
* They will decorate their work methods with WorkMethod().  This decorator does not currently accept any arguments, but you'll still call it as if it did, so it can grow to accept new keyword arguments in the future.
* They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls.  The user doesn't need to know anything about FutureOutput semantics; only the Dispatcher does.
* The main body of the script will call the run() class method on their CrunchScript subclass.

This is a very small interface, and there's nothing Crunch-specific in it.  When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters, and it controls all aspects of work scheduling.  Users shouldn't have to encode any Crunch specifics in their scripts.  I think that bodes well for future development of the SDK.
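
To check that those four interfaces are really enough, here is a self-contained sketch in the spirit of the "running locally for debugging" dispatcher.  Everything in it is hypothetical: a tiny @LocalDispatcher@ that runs scheduled work in-process, simplified @WorkMethod@, @FutureOutput@, and @CrunchScript@ classes, and a toy @WordLengths@ script that uses nothing else.

<pre><code class="python">
import functools
import itertools
import types


class FutureOutput:
    def __init__(self, work_id):
        self.work_id = work_id


class LocalDispatcher:
    """Hypothetical debugging backend: runs scheduled work in-process, in order."""

    def __init__(self, script):
        self.script = script
        self.pending = []  # (work_id, func, args, kwargs) in scheduling order
        self.outputs = {}  # work_id -> real return value
        self._ids = itertools.count()

    def schedule(self, func, args, kwargs):
        # Materialize generator arguments first: iterating them may call other
        # WorkMethods, and that upstream work must get earlier ids.
        args = tuple(list(a) if isinstance(a, types.GeneratorType) else a
                     for a in args)
        work_id = next(self._ids)
        self.pending.append((work_id, func, args, kwargs))
        return FutureOutput(work_id)

    def _resolve(self, value):
        # Swap each FutureOutput for the output of the work it represents.
        if isinstance(value, FutureOutput):
            return self.outputs[value.work_id]
        if isinstance(value, (list, tuple)):
            return type(value)(self._resolve(item) for item in value)
        if isinstance(value, dict):
            return {key: self._resolve(item) for key, item in value.items()}
        return value

    def run(self, **params):
        self.script.start(**params)
        # FIFO order is enough here: a future can only be passed on after the
        # work it represents was scheduled, so that work runs first.
        while self.pending:
            work_id, func, args, kwargs = self.pending.pop(0)
            self.outputs[work_id] = func(self.script, *self._resolve(args),
                                         **self._resolve(kwargs))
        return self.outputs


class WorkMethod:
    def __init__(self, **constraints):
        self.constraints = constraints  # ignored by this local sketch

    def __call__(self, func):
        @functools.wraps(func)
        def schedule(script, *args, **kwargs):
            return script.dispatcher.schedule(func, args, kwargs)
        return schedule


class CrunchScript:
    @classmethod
    def run(cls, **params):
        script = cls()
        script.dispatcher = LocalDispatcher(script)
        return script.dispatcher.run(**params)


# A user's script that relies only on the interfaces in the review list.
class WordLengths(CrunchScript):
    def start(self, words):
        self.total(self.length(word) for word in words)

    @WorkMethod()
    def length(self, word):
        return len(word)

    @WorkMethod()
    def total(self, lengths):
        return sum(lengths)
</code></pre>

Calling @WordLengths.run(words=["crunch", "arvados"])@ schedules two @length@ calls and one @total@ call, and returns @{0: 6, 1: 7, 2: 13}@.  First-in, first-out execution is fine for a local debugger; a real backend would use the same dependency information to run independent work in parallel.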

h2. Hypothetical future Crunch scripts

These are completely untested, since they can't actually run.  Please forgive cosmetic mistakes.

h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

<pre><code class="python">
#!/usr/bin/env python

import subprocess
import tempfile

from arvados import CollectionWriter, CrunchScript, WorkMethod

class TumorAnalysis(CrunchScript):
    # input is a Collection named in a job parameter.
    # The SDK instantiates the corresponding Collection object and passes it in.
    def start(self, input):
        # analyze_tumors is being called with a generator of
        # FutureOutput objects.  The Dispatcher will unroll this (it
        # inevitably needs to traverse recursive data structures),
        # schedule all the classify tasks at sequence N+1, then
        # schedule analyze_tumors at sequence N+2.
        self.analyze_tumors(self.classify(in_file)
                            for in_file in input.all_files()
                            if in_file.name.endswith('.fastj'))

    @WorkMethod()
    def classify(self, in_file):
        out_file = tempfile.NamedTemporaryFile()
        proc = subprocess.Popen(['normal_or_tumor'],
                                stdin=subprocess.PIPE, stdout=out_file)
        for data in in_file.readall():
            proc.stdin.write(data)
        proc.stdin.close()
        proc.wait()
        # The child wrote through a shared file offset; rewind before reading.
        out_file.seek(0)
        if b'tumor' in out_file.read(4096):
            out_file.seek(0)
            out_coll = CollectionWriter()
            with out_coll.open(in_file.name) as output:
                output.write(out_file.read())
            return out_coll
        return None

    @WorkMethod()
    def analyze_tumors(self, results):
        compiled = {}
        for result in results:
            # classify returns None for files that aren't tumors.
            if result is None:
                continue
            # Imagine a reduce-type step operating on the file in the
            # Collection; `thing` and `...` are placeholders.
            compiled[thing] = ...
        return compiled


if __name__ == '__main__':
    TumorAnalysis.run()
</code></pre>

h3. Example from #3603 (FIXME: UPDATE THIS)

This is the script that Abram used to illustrate #3603.

<pre><code class="python">
#!/usr/bin/env python

from arvados import Collection, CrunchJob
from subprocess import check_call

class Example3603(CrunchJob):
    @CrunchJob.task()
    def parse_human_map(self):
        refpath = self.job_param('REFPATH').name
        for line in self.job_param('HUMAN_COLLECTION_LIST'):
            fastj_id, human_id = line.strip().split(',')
            self.run_ruler(refpath, fastj_id, human_id)

    @CrunchJob.task()
    def run_ruler(self, refpath, fastj_id, human_id):
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", Collection(fastj_id).mount_path(),
                    "-lib-path", refpath])
        self.output.add('.')  # Or the path where tileruler writes output.


if __name__ == '__main__':
    Example3603(task0='parse_human_map').run()
</code></pre>

h2. Notes/TODO

* Important concurrency limits that job scripts must be able to express:
** Task Z cannot start until all outputs/side effects of tasks W, X, and Y are known/complete (e.g., because Z uses WXY's outputs as its inputs).
*** I'm considering having task methods immediately return some kind of future object.  You could pass that in to other task methods, and the SDK would introspect that and schedule the second task to come after the first.  This would also make it look more like normal Python: a higher-level function can call one task method, grab its output, and pass it in to another.  Probably the only catch is that we wouldn't be using the original method's real return value, but instead the task's output collection.  When we run the second task, what was represented by a future can be replaced with a CollectionReader object for the task's output.
** Tasks Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
** Schedule at most N instances of task T on the same node.  (The @parallel_with=[]@ example above is stronger than we actually want, for a multithreaded task.  It would be okay to run several of these tasks in parallel, as long as there's only one per node.  This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference."  But that feels less natural.)
*** This is an SDK for Crunch, and currently Crunch gives the user no control over task scheduling beyond the sequence number; for example, users can't control whether a compute node is shared.  I don't know how to write these features when there's no way to express these constraints in a form Crunch will recognize (beyond the brute-force "don't parallelize this").
* In general, the output name is not known until the task is nearly finished.  Frequently it is clearer to specify it when the task is queued, though.  We should provide a convenient way to do this without any boilerplate in the queued task.
** This seems pretty easily doable by passing the desired filename into the task method as a parameter.  Then the code can work with @self.output.open(passed_filename)@.
* A second example that uses "case" and "control" inputs (e.g., "tumor" and "normal") might help reveal features.
* We should be clearer about how the output of the job (as opposed to the output of the last task) is set.  The obvious way (concatenate all task outputs) should be a one-liner, if not implicit.  Either way, it should run in a task rather than being left up to @crunch-job@.
** This also sounds like something that needs to be settled at the Crunch level.  But it seems like we could be pretty accommodating by having crunch-job just say, "If the job set an output, great; if not, collate the task outputs."  Then the SDK can provide predefined task methods for common output setups: collate the output of all tasks or a subset of them; use the output of a specific task; etc.
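
Assuming crunch-job adopted that rule, the SDK side could be as small as this sketch.  It models each task output as a hypothetical mapping of file names to contents rather than a real Collection, and @collate@ and @final_job_output@ are illustrative names, not existing SDK functions.  Treating two tasks that write different contents to the same file name as an error is also an assumption.

<pre><code class="python">
def collate(task_outputs):
    """Merge task outputs (file name -> contents) into one job output."""
    merged = {}
    for output in task_outputs:
        for name, contents in output.items():
            if name in merged and merged[name] != contents:
                raise ValueError("conflicting contents for " + name)
            merged[name] = contents
    return merged


def final_job_output(job_output, task_outputs, collator=collate):
    """Use the job's explicit output if it set one; otherwise collate tasks."""
    if job_output is not None:
        return job_output
    return collator([output for output in task_outputs if output is not None])
</code></pre>

Other predefined setups would just be different collators, e.g. @lambda outputs: outputs[-1]@ to use the output of the last task.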