Project

General

Profile

Actions

Python SDK » History » Revision 1

Revision 1/17 | Next »
Tom Clegg, 08/16/2014 01:03 AM


Python SDK

(design draft)

#!/usr/bin/env python

from arvados import CrunchJob

import examplelib
import re

class NormalizeMatchingFiles(CrunchJob):
    """Run examplelib's normalizer on every input file matching a pattern.

    Job parameters read via ``self.job_param``:
      'input'   -- the input Collection; per this draft API, CrunchJob hands
                   back a CollectionReader-like object for it.
      'pattern' -- a regular expression; a file qualifies for normalization
                   if any of its lines matches.

    Each method below is declared as a CrunchJob task. In this design draft,
    calling one task method from another is presumably how follow-up tasks
    get scheduled (confirm against the final SDK).
    """

    @CrunchJob.task()
    def grep_files(self):
        """Entry task: call ``grep_file`` once for each file in 'input'.

        Per the draft API, CrunchJob instantiates each job parameter from its
        declared data class. 'input' is a Collection, so asking for it
        yields a CollectionReader object.
        """
        source = self.job_param('input')
        for name in source.filenames():
            self.grep_file(name)

    @CrunchJob.task()
    def grep_file(self, filename):
        """Call ``normalize`` on *filename* if any of its lines match 'pattern'."""
        matcher = re.compile(self.job_param('pattern'))
        with self.job_param('input').open(filename) as stream:
            # any() stops reading at the first matching line, exactly like an
            # explicit loop with break; normalize runs while the file is open.
            if any(matcher.search(text) for text in stream):
                self.normalize(filename)

    # examplelib is already multi-threaded and will peg the whole compute
    # node, so these tasks are declared parallel with nothing: they must run
    # one after another.
    @CrunchJob.task(parallel_with=[])
    def normalize(self, filename):
        """Run ``examplelib.frob`` on one input file and store its result."""
        local_path = self.job_param('input').mount_path(filename)
        frobbed = examplelib.frob(local_path)
        # self.output is a CollectionWriter. When this task method returns,
        # CrunchJob checks whether anything was written to it. If so, it
        # finishes the upload and records the resulting Collection UUID as
        # this task's output.
        with self.output.open(filename) as sink:
            sink.write(frobbed)

if __name__ == '__main__':
    # Launch the job with grep_files as its first (entry) task.
    job = NormalizeMatchingFiles(task0='grep_files')
    job.main()

Updated by Tom Clegg over 9 years ago · 1 revision