Python SDK
(design draft)
#!/usr/bin/env python

from arvados import CrunchJob

import examplelib
import re


class NormalizeMatchingFiles(CrunchJob):
    @CrunchJob.task()
    def grep_files(self):
        # CrunchJob instantiates input parameters based on the
        # dataclass attribute. When we ask for the input parameter,
        # CrunchJob sees that it's a Collection, and returns a
        # CollectionReader object.
        for filename in self.job_param('input').filenames():
            self.grep_file(filename)

    @CrunchJob.task()
    def grep_file(self, filename):
        regexp = re.compile(self.job_param('pattern'))
        with self.job_param('input').open(filename) as in_file:
            for line in in_file:
                if regexp.search(line):
                    self.normalize(filename)
                    break

    # examplelib is already multi-threaded and will peg the whole
    # compute node. These tasks should run sequentially.
    @CrunchJob.task(parallel_with=[])
    def normalize(self, filename):
        output = examplelib.frob(self.job_param('input').mount_path(filename))
        # self.output is a CollectionWriter. When this task method finishes,
        # CrunchJob checks if we wrote anything to it. If so, it takes care
        # of finishing the upload process, and sets this task's output to the
        # Collection UUID.
        with self.output.open(filename) as out_file:
            out_file.write(output)


if __name__ == '__main__':
    NormalizeMatchingFiles(task0='grep_files').main()
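
Since CrunchJob exists only as this design draft, the following is a minimal in-process sketch of the task mechanics the example assumes: a task() decorator that turns a method call into a queued task, and a main() that starts from task0 and drains the queue sequentially. Everything in it (the SketchCrunchJob name, the params argument, the in-memory queue) is an assumption made purely for illustration; the proposed SDK would schedule tasks through Crunch rather than run them in a single process.

#!/usr/bin/env python
# Hypothetical stand-in for the proposed arvados.CrunchJob; the real class
# does not exist yet, so every detail here is an assumption.

import functools


class SketchCrunchJob(object):
    @staticmethod
    def task(parallel_with=None):
        # Turn a method into a task: calling it enqueues the work instead of
        # running it immediately. This sketch ignores parallel_with and always
        # runs tasks one at a time.
        def decorator(method):
            @functools.wraps(method)
            def wrapper(self, *args, **kwargs):
                self._queue.append((method, args, kwargs))
            return wrapper
        return decorator

    def __init__(self, task0, params=None):
        self._queue = []
        self._params = params or {}
        self._task0 = task0

    def job_param(self, name):
        # The draft SDK would return CollectionReader objects for collection
        # parameters; this sketch just returns the raw value.
        return self._params[name]

    def main(self):
        # Queue the entry-point task, then run queued tasks sequentially.
        getattr(self, self._task0)()
        while self._queue:
            method, args, kwargs = self._queue.pop(0)
            method(self, *args, **kwargs)


class Greeter(SketchCrunchJob):
    @SketchCrunchJob.task()
    def hello(self):
        for name in self.job_param('names'):
            self.greet(name)

    @SketchCrunchJob.task()
    def greet(self, name):
        print('hello, %s' % name)


if __name__ == '__main__':
    Greeter(task0='hello', params={'names': ['alice', 'bob']}).main()

In the actual SDK each queued task would presumably become its own Crunch task record and could run on a different compute node, which is why the methods in the draft above pass only serializable arguments such as filenames rather than open file handles.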