Project

General

Profile

Python SDK » History » Revision 8

Revision 7 (Brett Smith, 09/23/2014 02:02 PM) → Revision 8/17 (Brett Smith, 09/23/2014 02:21 PM)

h1. Python SDK 

 (design draft) 

 

 h1. Hypothetical future Crunch scripts 

 We're writing these out with the goal of designing a new SDK for Crunch script authors. 

 {{toc}} 

 h2. Example scripts 

 

 h3. Example: "Normalize" files matching a regexp 

 <pre><code class="python"> 
 #!/usr/bin/env python 

 from arvados import CrunchJob 

 import examplelib 
 import re 

 class NormalizeMatchingFiles(CrunchJob): 
     @CrunchJob.task() 
     def grep_files(self): 
         # CrunchJob instantiates input parameters based on the 
         # dataclass attribute.    When we ask for the input parameter, 
         # CrunchJob sees that it's a Collection, and returns a 
         # CollectionReader object. 
         input_collection = self.job_param('input') 
         for filename in input_collection.filenames(): 
             self.grep_file(self.job_param('pattern'), input_collection, filename) 

     @CrunchJob.task() 
     def grep_file(self, pattern, collection, filename): 
         regexp = re.compile(pattern) 
         with collection.open(filename) as in_file: 
             for line in in_file: 
                 if regexp.search(line): 
                     self.normalize(in_file) 
                     break 

     # examplelib is already multi-threaded and will peg the whole 
     # compute node.    These tasks should run sequentially. 
     # Tasks are normally parallel with themselves; here we override that 
     # default to say that the task is parallel with nothing, not even itself. 
     # When tasks are created, Arvados-specific objects like Collection file 
     # objects are serialized as task parameters.    CrunchJob instantiates 
     # these parameters as real objects when it runs the task. 
     @CrunchJob.task(parallel_with=[]) 
     def normalize(self, collection_file): 
         output = examplelib.frob(collection_file.mount_path()) 
         # self.output is a CollectionWriter.    When this task method finishes, 
         # CrunchJob checks if we wrote anything to it.    If so, it takes care 
         # of finishing the upload process, and sets this task's output to the 
         # Collection UUID. 
         with self.output.open(collection_file.name) as out_file: 
             out_file.write(output) 


 if __name__ == '__main__': 
     NormalizeMatchingFiles(task0='grep_files').run() 
 </code></pre> 


 


 h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor 

 This demonstrates scheduling tasks in order and explicit job output. 

 <pre><code class="python"> 
 #!/usr/bin/env python 

 import glob 
 import os 
 import pprint 

 from arvados import CrunchJob 
 from subprocess import check_call 

 class TumorAnalysis(CrunchJob): 
     OUT_EXT = '.analysis' 

     @CrunchJob.task() 
     def check_fastjs(self): 
         in_coll = self.job_param('input') 
         for name in in_coll.filenames(): 
             if name.endswith('.fastj'): 
                 self.classify(in_coll, name) 
         # analyze_tumors gets scheduled to run after all the classification, 
         # since they're not parallel with each other (and it was invoked later). 
         self.analyze_tumors() 

     @CrunchJob.task() 
     def classify(self, collection, filename): 
         # Methods that refer to directories, like mount_path and job_dir, 
         # work like os.path.join when you pass them arguments. 
         check_call(['normal_or_tumor', collection.mount_path(filename)]) 
         outpath = filename + self.OUT_EXT 
         with open(outpath) as result: 
              is_tumor = 'tumor' in result.read(4096) 
         if is_tumor: 
             os.rename(outpath, self.job_dir(outpath)) 

     @CrunchJob.task() 
     def analyze_tumors(self): 
         compiled = {} 
         results = glob.glob(self.job_dir('*' + self.OUT_EXT)) 
         for outpath in results: 
             with open(outpath) as outfile: 
                 compiled[thing] = ...    # Imagine this is a reduce-type step. 
         # job_output is a CollectionWriter.    Writing to it overrides the 
         # default behavior where job output is collated task output. 
         with self.job_output.open('compiled_numbers.log') as resultfile: 
             pprint.pprint(compiled, resultfile) 


 if __name__ == '__main__': 
     TumorAnalysis(task0='check_fastjs').run() 
 </code></pre> 

 h3. Example from #3603 

 This is the script that Abram used to illustrate #3603. 

 <pre><code class="python"> 
 #!/usr/bin/env python 

 from arvados import Collection, CrunchJob 
 from subprocess import check_call 

 class Example3603(CrunchJob): 
     @CrunchJob.task() 
     def parse_human_map(self): 
         refpath = self.job_param('REFPATH').name 
         for line in self.job_param('HUMAN_COLLECTION_LIST'): 
             fastj_id, human_id = line.strip().split(',') 
             self.run_ruler(refpath, fastj_id, human_id) 

     @CrunchJob.task() 
     def run_ruler(self, refpath, fastj_id, human_id): 
         check_call(["tileruler", "--crunch", "--noterm", "abv", 
                     "-human", human_id, 
                     "-fastj-path", Collection(fastj_id).mount_path(), 
                     "-lib-path", refpath]) 
         self.output.add('.')    # Or the path where tileruler writes output. 


 if __name__ == '__main__': 
     Example3603(task0='parse_human_map').run() 
 </code></pre> 

 h2. Notes/TODO 

 * Important concurrency limits that job scripts must be able to express: 
 ** Task Z cannot start until all outputs/side effects of tasks W, X, Y are known/complete (e.g., because Z uses WXY's outputs as its inputs). 
 ** Task Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements). 
 ** Schedule at most N instance of task T on the same node.    (The @parallel_with=[]@ example above is stronger than we actually want, for a multithreaded task.    It would be okay to run several of these tasks in parallel, as long as there's only one per node.    This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference."    But that feels less natural.) 
 * In general, the output name is not known until the task is nearly finished. Frequently it is clearer to specify it when the task is queued, though. We should provide a convenient way to do this without any boilerplate in the queued task. 
 * A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features. 
 * Should get more clear about how the output of the job (as opposed to the output of the last task) is to be set. The obvious way (concatenate all task outputs) should be a one-liner, if not implicit. Either way, it should run in a task rather than being left up to @crunch-job@.