h1. Python SDK

(design draft)

h1. Hypothetical future Crunch scripts

We're writing these out with the goal of designing a new SDK for Crunch script authors.

{{toc}}

h2. Example scripts

h3. Example: "Normalize" files matching a regexp

<pre><code class="python">
#!/usr/bin/env python

from arvados import CrunchJob

import examplelib
import re

class NormalizeMatchingFiles(CrunchJob):
    @CrunchJob.task()
    def grep_files(self):
        # CrunchJob instantiates input parameters based on the
        # dataclass attribute.  When we ask for the input parameter,
        # CrunchJob sees that it's a Collection, and returns a
        # CollectionReader object.
        input_collection = self.job_param('input')
        for filename in input_collection.filenames():
            self.grep_file(self.job_param('pattern'), input_collection, filename)

    @CrunchJob.task()
    def grep_file(self, pattern, collection, filename):
        regexp = re.compile(pattern)
        with collection.open(filename) as in_file:
            for line in in_file:
                if regexp.search(line):
                    self.normalize(in_file)
                    break

    # examplelib is already multi-threaded and will peg the whole
    # compute node.  These tasks should run sequentially.
    # Tasks are normally parallel with themselves; here we override that
    # default to say that the task is parallel with nothing, not even itself.
    # When tasks are created, Arvados-specific objects like Collection file
    # objects are serialized as task parameters.  CrunchJob instantiates
    # these parameters as real objects when it runs the task.
    @CrunchJob.task(parallel_with=[])
    def normalize(self, collection_file):
        output = examplelib.frob(collection_file.mount_path())
        # self.output is a CollectionWriter.  When this task method finishes,
        # CrunchJob checks if we wrote anything to it.  If so, it takes care
        # of finishing the upload process, and sets this task's output to the
        # Collection UUID.
        with self.output.open(collection_file.name) as out_file:
            out_file.write(output)


if __name__ == '__main__':
    NormalizeMatchingFiles(task0='grep_files').run()
</code></pre>
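
To make the task mechanics described in the comments concrete, here is a minimal in-process sketch of how a @CrunchJob.task@ decorator could queue method calls instead of running them immediately. This is purely illustrative: the real scheduler, parameter serialization, and cross-node dispatch are omitted, and every name here is a design-draft assumption rather than an existing API.

<pre><code class="python">
import functools

class CrunchJob(object):
    def __init__(self, task0):
        # The job starts with a single queued task, named by task0.
        self.task_queue = [(task0, (), {})]

    @classmethod
    def task(cls, parallel_with=None):
        def decorator(method):
            @functools.wraps(method)
            def enqueue(self, *args, **kwargs):
                # Calling a task method queues it; the scheduler decides
                # when it actually runs.
                self.task_queue.append((method.__name__, args, kwargs))
            enqueue._task_method = method        # kept for the scheduler
            enqueue._parallel_with = parallel_with
            return enqueue
        return decorator

    def run(self):
        # Toy scheduler: run queued tasks one at a time, in order.  Real
        # Crunch would dispatch to worker nodes, honor parallel_with, and
        # serialize/instantiate Arvados objects across task boundaries.
        while self.task_queue:
            name, args, kwargs = self.task_queue.pop(0)
            getattr(type(self), name)._task_method(self, *args, **kwargs)
</code></pre>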

h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

<pre><code class="python">
#!/usr/bin/env python

import glob
import os
import pprint

from arvados import CrunchJob
from subprocess import check_call

class TumorAnalysis(CrunchJob):
    OUT_EXT = '.analysis'

    @CrunchJob.task()
    def check_fastjs(self):
        in_coll = self.job_param('input')
        for name in in_coll.filenames():
            if name.endswith('.fastj'):
                self.classify(in_coll, name)
        # analyze_tumors gets scheduled to run after all the classify tasks,
        # since they're not parallel with each other (and it was invoked later).
        self.analyze_tumors()

    @CrunchJob.task()
    def classify(self, collection, filename):
        # Methods that refer to directories, like mount_path and job_dir,
        # work like os.path.join when you pass them arguments.
        check_call(['normal_or_tumor', collection.mount_path(filename)])
        outpath = filename + self.OUT_EXT
        with open(outpath) as result:
            is_tumor = 'tumor' in result.read(4096)
        if is_tumor:
            os.rename(outpath, self.job_dir(outpath))

    @CrunchJob.task()
    def analyze_tumors(self):
        compiled = {}
        results = glob.glob(self.job_dir('*' + self.OUT_EXT))
        for outpath in results:
            with open(outpath) as outfile:
                compiled[thing] = ...  # Imagine this is a reduce-type step.
        # job_output is a CollectionWriter.  Writing to it overrides the
        # default behavior where job output is collated task output.
        with self.job_output.open('compiled_numbers.log') as resultfile:
            pprint.pprint(compiled, resultfile)


if __name__ == '__main__':
    TumorAnalysis(task0='check_fastjs').run()
</code></pre>
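
The comment in @classify@ says path helpers like @mount_path@ and @job_dir@ work like @os.path.join@ when given arguments. Pinned down as a standalone sketch (the work directory location is invented for illustration):

<pre><code class="python">
import os.path

# Invented location for the task's scratch directory.
TASK_WORK_DIR = '/tmp/crunch-job-work'

def job_dir(*components):
    # With no arguments, return the directory itself; otherwise join
    # the arguments onto it, exactly like os.path.join.
    return os.path.join(TASK_WORK_DIR, *components)

assert job_dir() == '/tmp/crunch-job-work'
assert job_dir('foo.analysis') == '/tmp/crunch-job-work/foo.analysis'
</code></pre>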

h3. Example from #3603

This is the script that Abram used to illustrate #3603.

<pre><code class="python">
#!/usr/bin/env python

from arvados import Collection, CrunchJob
from subprocess import check_call

class Example3603(CrunchJob):
    @CrunchJob.task()
    def parse_human_map(self):
        refpath = self.job_param('REFPATH').name
        for line in self.job_param('HUMAN_COLLECTION_LIST'):
            fastj_id, human_id = line.strip().split(',')
            self.run_ruler(refpath, fastj_id, human_id)

    @CrunchJob.task()
    def run_ruler(self, refpath, fastj_id, human_id):
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", Collection(fastj_id).mount_path(),
                    "-lib-path", refpath])
        self.output.add('.')  # Or the path where tileruler writes output.


if __name__ == '__main__':
    Example3603(task0='parse_human_map').run()
</code></pre>

h2. Notes/TODO

* Important concurrency limits that job scripts must be able to express:
** Task Z cannot start until all outputs/side effects of tasks W, X, and Y are known/complete (e.g., because Z uses WXY's outputs as its inputs).
** Tasks Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
** Schedule at most N instances of task T on the same node.  (The @parallel_with=[]@ example above is stronger than we actually want for a multithreaded task.  It would be okay to run several of these tasks in parallel, as long as there's only one per node.  This could be expressed as a particular case of the previous bullet: "Task T cannot run on the same worker node as task T without interference."  But that feels less natural.  See the strawman sketch after this list.)
* In general, the output name is not known until the task is nearly finished.  Frequently it is clearer to specify it when the task is queued, though.  We should provide a convenient way to do this without any boilerplate in the queued task (also sketched below).
* A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.
* We should be clearer about how the output of the job (as opposed to the output of the last task) is to be set.  The obvious way (concatenate all task outputs) should be a one-liner, if not implicit.  Either way, it should run in a task rather than being left up to @crunch-job@.
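
As a strawman for the bullets above, these constraints could hang off the @task@ decorator and the job object. Every parameter and method here (@max_per_node@, @output_name@, @after@, @all_task_outputs@) is an invented name for discussion, not a proposal of record:

<pre><code class="python">
from arvados import CrunchJob  # same hypothetical SDK as the drafts above

class TumorAnalysis(CrunchJob):
    # "At most N per node": several normalize tasks may run concurrently,
    # but never two on one worker node.  Weaker than parallel_with=[],
    # which forbids any overlap at all.
    @CrunchJob.task(max_per_node=1)
    def normalize(self, collection_file):
        pass  # as in the first example

    @CrunchJob.task()
    def check_fastjs(self):
        in_coll = self.job_param('input')
        for name in in_coll.filenames():
            # Name the task's output when it is queued, so classify
            # itself needs no naming boilerplate.
            self.classify(in_coll, name, output_name=name + '.analysis')

    # Set job output in an ordinary task that runs after everything
    # else, instead of leaving it to crunch-job.  The obvious default
    # (concatenate all task outputs) becomes a one-liner.
    @CrunchJob.task(after='all')
    def set_job_output(self):
        self.job_output.extend(self.all_task_outputs())
</code></pre>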