Project

General

Profile

Python SDK » History » Version 1

Tom Clegg, 08/16/2014 01:03 AM

1 1 Tom Clegg
h1. Python SDK
2
3
(design draft)
4
5
<pre><code class="python">
6
#!/usr/bin/env python
7
8
from arvados import CrunchJob
9
10
import examplelib
11
import re
12
13
# Design-draft example job: for every file in the 'input' job parameter that
# has at least one line matching the 'pattern' regular expression, pass that
# file to examplelib.frob() and write the result to the job output under the
# same filename.
class NormalizeMatchingFiles(CrunchJob):
14
    @CrunchJob.task()
15
    def grep_files(self):
16
        # CrunchJob instantiates input parameters based on the
17
        # dataclass attribute.  When we ask for the input parameter,
18
        # CrunchJob sees that it's a Collection, and returns a
19
        # CollectionReader object.
20
        # Fan out: one grep_file call per filename in the input.
        # NOTE(review): presumably calling another @CrunchJob.task() method
        # queues a new task rather than running it inline -- confirm against
        # the final SDK design.
        for filename in self.job_param('input').filenames():
21
            self.grep_file(filename)
22
23
    @CrunchJob.task()
24
    def grep_file(self, filename):
25
        # The 'pattern' job parameter is used as a regular expression.
        regexp = re.compile(self.job_param('pattern'))
26
        with self.job_param('input').open(filename) as in_file:
27
            for line in in_file:
28
                # One matching line is enough to select the file; the break
                # below keeps normalize() from being requested more than once
                # for the same filename.
                if regexp.search(line):
29
                    self.normalize(filename)
30
                    break
31
32
    # examplelib is already multi-threaded and will peg the whole
33
    # compute node.  These tasks should run sequentially.
34
    # NOTE(review): the empty parallel_with list presumably means "may run
    # concurrently with no other task" -- confirm.
    @CrunchJob.task(parallel_with=[])
35
    def normalize(self, filename):
36
        # frob() receives the result of mount_path(filename) rather than an
        # open file object -- presumably a local filesystem path; confirm.
        output = examplelib.frob(self.job_param('input').mount_path(filename))
37
        # self.output is a CollectionWriter.  When this task method finishes,
38
        # CrunchJob checks if we wrote anything to it.  If so, it takes care
39
        # of finishing the upload process, and sets this task's output to the
40
        # Collection UUID.
41
        # The output file reuses the input filename.
        with self.output.open(filename) as out_file:
42
            out_file.write(output)
43
44
45
# Script entry point: construct the job with 'grep_files' passed as task0 and
# call main().
# NOTE(review): task0 presumably names the first task method to run -- confirm.
if __name__ == '__main__':
46
    NormalizeMatchingFiles(task0='grep_files').main()
47
</code></pre>