Revision 15 (Brett Smith, 01/22/2015 07:33 PM) → Revision 16/17 (Brett Smith, 02/17/2015 07:44 PM)

h1. Python SDK 

 This is a design draft for a next-generation Python SDK, making it easier to write Crunch scripts today, with room to grow for future workflow changes in Arvados. 

 {{toc}} 

 h2. Goals 

 As I understand them, in order of importance: 

 * The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados.    It should eliminate any need to manipulate jobs or tasks directly via API calls, or manually instantiate objects from parameter strings.    There should be a bare minimum of boilerplate code in a user's Crunch script—just the tiny bit necessary to use the SDK. 
 * The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development. 
 * The SDK should have clear room to grow to accommodate future workflow improvements, whether that's new Crunch features (including possibly reusable tasks and/or elimination of the job/task distinction), hooks with the Common Workflow Language, or whatever. 
   The first version of the SDK will not support features we expect these systems to have—that would conflict with the previous goal.    The idea here is that the API should not rely on details of today's Crunch implementation, and should have clear spaces where users will be able to use the features of more powerful work dispatch systems in the future. 

 h2. Components 

 h3. Dispatcher 

 Dispatcher classes provide a consistent interface to talk to different scheduling systems: they know how to run the right code for this instantiation (e.g., the current task); how to dispatch new work to the scheduling system; how to de/serialize parameters; etc. 

 Early on, we'll have a traditional Crunch dispatcher, and a "running locally for debugging" dispatcher.    If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that.    While I'm not sketching out an API here (since it's not user-facing), we do need to be careful to make sure that Dispatcher methods hit that design target. 
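
 Though the Dispatcher API is deliberately unspecified here, a minimal sketch of the shape it might take (all names and methods below are hypothetical, not a committed interface):

```python
import abc

class Dispatcher(abc.ABC):
    """Hypothetical base class: one subclass per scheduling backend."""

    @abc.abstractmethod
    def run_current_work(self, script_class):
        """Figure out what this instantiation should run, and run it."""

    @abc.abstractmethod
    def schedule(self, work_func, args):
        """Serialize args, queue the work, and return a future/result."""

class LocalDispatcher(Dispatcher):
    """'Running locally for debugging': no scheduler, so run immediately."""

    def run_current_work(self, script_class):
        # Locally there is only one "task": instantiate the user's class.
        return script_class(self)

    def schedule(self, work_func, args):
        # Nothing to defer without a real scheduler.
        return work_func(*args)
```

 The point of the sketch is the seam: everything backend-specific sits behind these two entry points, so swapping Crunch for another scheduler should not touch user code.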

 Users can instantiate Dispatcher classes to customize their behavior.    For example, one thing we know we want to support early on is telling Crunch how to set the job output from given tasks.    Since the job/task distinction is specific to Crunch v1, that customization belongs in the Crunch Dispatcher class.    We'll let users pass in task names to specify what output should become the job output:

 @dispatcher = CrunchDispatcher(job_output_from=['taskB', 'taskD'])@ 

 However, all Dispatchers should be able to run without any customization, and users shouldn't have to deal with them at all for the easy cases where they're fine with the scheduler's default behavior.    The SDK provides an AutoDispatcher callable that automatically resolves to the best Dispatcher available for the environment, or raises an exception if none are suitable. 
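
 A sketch of how that resolution might work (the environment probes below are invented for illustration, and the Dispatcher classes are reduced to stubs):

```python
import os

class CrunchDispatcher:
    """Stub standing in for the real Crunch dispatcher."""
    def __init__(self, **options):
        self.options = options

class LocalDispatcher:
    """Stub standing in for the local-debugging dispatcher."""
    def __init__(self, **options):
        self.options = options

def AutoDispatcher(**options):
    """Return the best Dispatcher for this environment, or raise."""
    # Hypothetical probe: does this look like a Crunch task environment?
    if 'TASK_UUID' in os.environ:
        return CrunchDispatcher(**options)
    # Hypothetical probe: did the user ask to run locally for debugging?
    if 'CRUNCH_DEBUG' in os.environ:
        return LocalDispatcher(**options)
    raise RuntimeError('no suitable Dispatcher for this environment')
```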

 h3. CrunchScript

 The bulk of a user's Crunch script will define a subclass of the SDK's CrunchScript class.    The user extends it with WorkMethods that represent work units, and a start() method that implements code to start work.    That done, the user calls subclass.run() to get the appropriate code running.

 start() and WorkMethods follow similar rules for how they're run.    There are some small differences between them.    Right now you can think of them as parallels to the differences between jobs and tasks, but the concepts are general enough that they should be applicable to other scheduling systems.

 * start() is called with parameters that are deserialized from the Arvados job record.    WorkMethods are called with parameters provided in the user's CrunchScript code directly.    The Crunch dispatcher will stash those parameters in task parameters.
 * Both start() and WorkMethods can call other WorkMethods.
 * start() and WorkMethods can return a Collection object, a sequence of one or more FutureOutput objects, a simple (JSON-encodable) Python object, or None.    This output is serialized appropriately and recorded in Arvados.    start()'s output is the job output; WorkMethod output is task output.
   When the output is a sequence of FutureOutputs, it means "create my output by combining the outputs of these WorkMethods"—similar to the way crunch-job creates job output by concatenating task output today.    The SDK will record the references immediately, and then later resolve them to record "real" output to aid work reuse.

 The run() class method accepts the Dispatcher to use in the @dispatcher@ keyword argument.    If none is provided, it introspects the environment to find the appropriate Dispatcher class to use.    run() instantiates the subclass so that WorkMethods can be dispatched appropriately, then invokes start() or a WorkMethod as appropriate.    To better illustrate, I'll describe how the current Crunch dispatcher would take over from here.

 The Crunch Dispatcher gets the current task.    If that's task 0, it gets the current job, deserializes the job parameters, and calls subclass.start() with those arguments.    For any other task, it looks up the task's corresponding WorkMethod, and runs its code with deserialized task parameters.

 Either way, the method's return value is captured.    If it's a Collection object, the Dispatcher makes sure it's saved, then records the portable data hash as the task's output.    For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that as the task's output. 
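
 That capture step could look roughly like this sketch, where Collection is a minimal stand-in for the SDK's real Collection class:

```python
import hashlib
import json

class Collection:
    """Minimal stand-in for the SDK's Collection class."""
    def __init__(self, data=''):
        self.data = data
        self.saved = False

    def save(self):
        self.saved = True

    def portable_data_hash(self):
        return hashlib.md5(self.data.encode()).hexdigest()

def record_task_output(value):
    """Turn a work method's return value into a task output reference."""
    if value is None:
        return None
    if isinstance(value, Collection):
        value.save()                       # make sure it's saved
        return value.portable_data_hash()  # hash becomes the task output
    # Anything else must be JSON-encodable; wrap the JSON in a Collection.
    coll = Collection(json.dumps(value))
    coll.save()
    return coll.portable_data_hash()
```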

 h3. WorkMethod 

 WorkMethod is a class that should decorate instance methods defined on the CrunchScript subclass.    It demarcates work that can be scheduled through a Dispatcher.    Use looks like:

 <pre><code class="python">@WorkMethod() 
 def task_code(self, param1, param2):
     # implementation 
 </code></pre> 

 When a WorkMethod is called—from start() or another WorkMethod's code—the user's code isn't run.    Instead, we give the Dispatcher all the information about the call.    The Dispatcher serializes the parameters and schedules the real work method to be run later.    We wrap information about the work's scheduling in a FutureOutput object, which I'll get to next.

 Because the Dispatcher has to be able to serialize arguments and output, those values can only have types that the SDK knows how to work with, and some type information may be lost.    For the first SDK implementation, objects we can work with will be objects that can be encoded to JSON, plus Collection objects.    Sequence types that aren't native to JSON, like tuples and iterators, will become lists at the method's boundaries.    (TBD: Or maybe we should require exactly a known type, to avoid surprising coercions?) 

 Note that WorkMethod accepts arguments, and we're passing it none.    We expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints. 
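
 One way the decorator could intercept calls and hand them to the Dispatcher (the descriptor plumbing, dispatcher interface, and class names here are assumptions for illustration, not the SDK's actual implementation):

```python
class FutureOutput:
    """Opaque placeholder for the output of scheduled-but-unrun work."""
    def __init__(self, method_name, args):
        self._method_name = method_name
        self._args = args

class WorkMethod:
    """Decorator marking a CrunchScript method as schedulable work."""
    def __init__(self, **constraints):
        # No constraints are supported yet; the slot is reserved.
        self.constraints = constraints

    def __call__(self, func):
        self.func = func
        return self

    def __get__(self, obj, objtype=None):
        # Calling the method does not run user code; it asks the
        # dispatcher to schedule it and returns a FutureOutput.
        def caller(*args):
            return obj.dispatcher.schedule(self.func.__name__, args)
        return caller

class RecordingDispatcher:
    """Toy dispatcher that just records what was scheduled."""
    def __init__(self):
        self.scheduled = []

    def schedule(self, name, args):
        self.scheduled.append(name)
        return FutureOutput(name, args)

class ExampleScript:
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher

    @WorkMethod()
    def classify(self, in_file):
        return 'real work happens later'
```

 Calling @script.classify('a.fastj')@ returns a FutureOutput immediately; the decorated function body only runs when the dispatcher later executes the scheduled task.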

 h3. FutureOutput 

 FutureOutput objects represent the output of work that has been scheduled but not yet run.    Whenever a WorkMethod is called, it returns a FutureOutput object.    Users can do two things with these objects: pass them in as arguments to other WorkMethod calls, and return them from start() or WorkMethods.

 When a WorkMethod is called with at least one FutureOutput object, the Dispatcher does two things.    First, it introspects all the FutureOutput objects passed in to figure out how the new work should be scheduled.    Second, when it's time to run the work of the second WorkMethod, it finds the real output generated by running the first WorkMethod, and passes that in when running the user code.

 When start() or a WorkMethod returns a list of FutureOutput objects, those outputs will be combined to create the corresponding final output once they're available.    Refer to the discussion in the CrunchScript section above.

 To be clear: each Dispatcher will have its own paired FutureOutput implementation.

 FutureOutput objects cannot be resolved or manipulated by the user directly.    Users can't wait for the real output to become available (because in some schedulers, like current Crunch, it never will be).    The objects should have no user-facing methods that might suggest this is possible.    We'll probably even redefine some very basic methods (e.g., @__str__@) to be _less_ useful than object's default implementation, to really drive this point home.
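
 Dulling those affordances might be as simple as this sketch:

```python
class FutureOutput:
    """Opaque handle: there is no way to wait on or read the output."""
    def __init__(self, task_record):
        # Private by convention: only the paired Dispatcher looks here.
        self._task = task_record

    def __str__(self):
        # Deliberately less useful than object's default repr, so
        # nobody is tempted to log or parse these.
        return '<unresolved work output>'

    __repr__ = __str__
```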

 h3. Crunch dispatch example 

 Let me talk about a Crunch example to make things concrete.    The Crunch dispatcher is expected to create tasks at set sequence levels, based on when they can run: 

 * S1: Sequence for WorkMethod tasks that can be run now—all parameters can be deserialized into useful objects 
 * S2: Sequence for WorkMethod tasks that have FutureOutput parameters that need to be resolved 
 * S3: Sequence for internal tasks to set real output records for methods that returned FutureOutput objects 

 Roughly, the FutureOutput will wrap the task we create to represent the work.    Imagine a simple linear Crunch job:

 <pre><code class="python">class CrunchExample(CrunchScript):
     def start(self, arg1, arg2, arg3):
         step1out = self.task1(arg1, arg2)
         self.task2(step1out, arg3)</code></pre>

 task1 is called, and immediately returns a FutureOutput with information about the task scheduled for it at sequence S1.    When task2 is called, the Crunch Dispatcher introspects the arguments, and finds the step1out FutureOutput among them.    The task2 code can't be run until task1 has finished.    The Crunch Dispatcher gets task1's sequence number from step1out, and creates a task to run task2 with a greater sequence number.

 When Crunch actually starts the compute work for task2, the Crunch Dispatcher will see that this argument should have the value of task1's output.    It will deserialize that from the task output, then pass that object in as the value when it calls the user's code.
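
 The sequencing and resolution logic above can be sketched like this (task records and the output lookup are simplified stand-ins for reading real task records from the API server):

```python
class FutureOutput:
    """Simplified: remembers which task will produce the output."""
    def __init__(self, task_id, sequence):
        self.task_id = task_id
        self.sequence = sequence

def next_sequence(args, current=0):
    """A new task must sort after every task whose output it consumes."""
    futures = [a for a in args if isinstance(a, FutureOutput)]
    return max([current] + [f.sequence for f in futures]) + 1

def resolve_args(args, finished_outputs):
    """Swap FutureOutput placeholders for the real, deserialized outputs."""
    return [finished_outputs[a.task_id] if isinstance(a, FutureOutput) else a
            for a in args]
```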

 h3. Review 

 Here are the interfaces that script authors will use: 

 * They will subclass CrunchScript to define their own start() method and WorkMethods.
 * They will decorate their work methods with WorkMethod().    This decorator will not currently accept any arguments, but you'll still call it as if it did, so it can grow to accept new keyword arguments in the future. 
 * They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls.    The user doesn't need to know any of the semantics of FutureOutput at all; only the Dispatcher does. 
 * The main body of the script will call the run() class method on their CrunchScript subclass, passing in the Dispatcher to use.

 This is a very small interface.    There's nothing Crunch-specific in it.    When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters; it controls all aspects of work scheduling.    Users shouldn't have to encode any Crunch specifics in their scripts.    I think that bodes well for future development of the SDK.

 h2. Hypothetical future Crunch scripts 

 These are completely untested, since they can't actually run.    Please forgive cosmetic mistakes. 

 h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor 

 This demonstrates scheduling tasks in order and explicit job output. 

 <pre><code class="python"> 
 #!/usr/bin/env python 

 import subprocess
 import tempfile

 from arvados import CollectionWriter
 from arvados.crunch import CrunchScript, WorkMethod, CrunchDispatcher

 class TumorAnalysis(CrunchScript):
     # input is a Collection named in a job parameter.
     # The SDK instantiates the corresponding Collection object and passes it in.
     def start(self, input):
         # analyze_tumors is being called with a generator of
         # FutureOutput objects.    The Dispatcher will unroll this (it
         # inevitably needs to traverse recursive data structures),
         # schedule all the classify tasks at sequence N+1, then
         # schedule analyze_tumors at sequence N+2.
         return self.analyze_tumors(self.classify(in_file)
                                    for in_file in input.all_files()
                                    if in_file.name.endswith('.fastj'))

     @WorkMethod()
     def classify(self, in_file):
         out_file = tempfile.NamedTemporaryFile()
         proc = subprocess.Popen(['normal_or_tumor'],
                                 stdin=subprocess.PIPE, stdout=out_file)
         for data in in_file.readall():
             proc.stdin.write(data)
         proc.stdin.close()
         proc.wait()
         out_file.seek(0)
         if 'tumor' in out_file.read(4096):
             out_file.seek(0)
             out_coll = CollectionWriter()
             with out_coll.open(in_file.name) as output:
                 output.write(out_file.read())
             return out_coll
         return None

     @WorkMethod()
     def analyze_tumors(self, results):
         compiled = {}
         for result in results:
             if result is None:
                 continue
             # Imagine a reduce-type step operating on the file in the
             # Collection.
             compiled[thing] = ...
         return compiled


 if __name__ == '__main__': 
     dispatcher = CrunchDispatcher(job_output_from=['analyze_tumors'])
     TumorAnalysis.run(dispatcher=dispatcher)
 </code></pre> 

 h3. Example from #3603 

 This is the script that Abram used to illustrate #3603. 

 <pre><code class="python"> 
 #!/usr/bin/env python 

 from arvados import CollectionReader, CollectionWriter
 from arvados.crunch import CrunchScript, WorkMethod, CrunchDispatcher
 from subprocess import check_call 

 class Example3603(CrunchScript):
     def start(self, human_collection_list, refpath):
         for line in human_collection_list:
             fastj_id, human_id = line.strip().split(',')
             self.run_ruler(refpath, fastj_id, human_id)

     @WorkMethod() 
     def run_ruler(self, refpath, fastj_id, human_id):
         # CollectionReader and StreamFileReader objects should grow a 
         # mount_path() method that gives the object's location under 
         # $TASK_KEEPMOUNT. 
         check_call(["tileruler", "--crunch", "--noterm", "abv", 
                     "-human", human_id, 
                     "-fastj-path", CollectionReader(fastj_id).mount_path(), 
                     "-lib-path", refpath]) 
         out_coll = CollectionWriter() 
         # The argument is the path where tileruler writes output. 
         out_coll.write_directory_tree('.') 
         return out_coll 


 if __name__ == '__main__': 
     # Note that there will be many run_ruler tasks.    Their output will be
     # concatenated.    The default behavior, as usual, is to concatenate
     # everything.
     dispatcher = CrunchDispatcher(job_output_from=['run_ruler'])
     Example3603.run(dispatcher=dispatcher)
 </code></pre> 

 h2. Notes/TODO 

 (Some of these may be out-of-date, but I'll leave that for the suggesters to decide.) 

 * Important concurrency limits that job scripts must be able to express: 
 ** Task Z cannot start until all outputs/side effects of tasks W, X, Y are known/complete (e.g., because Z uses WXY's outputs as its inputs). 
 *** I'm considering having task methods immediately return some kind of future object.    You could pass that in to other task methods, and the SDK would introspect that and schedule the second task to come after the first.    This would also make it look more like normal Python: a higher-level function can call one task method, grab its output, and pass it in to another.    Probably the only catch is that we wouldn't be using the original method's real return value, but instead the task's output collection.    When we run the second task, what was represented by a future can be replaced with a CollectionReader object for the task's output. 
 ** Task Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements). 
 ** Schedule at most N instances of task T on the same node.    (The @parallel_with=[]@ example above is stronger than we actually want, for a multithreaded task.    It would be okay to run several of these tasks in parallel, as long as there's only one per node.    This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference."    But that feels less natural.)
 *** This is an SDK for Crunch, and currently Crunch gives the user no control of task scheduling beyond the sequence, like how one compute node can or can't be shared.    I don't know how to write these features given that there's no way to express these constraints in a way Crunch will recognize and respect (beyond the brute-force "don't parallelize this").
 * In general, the output name is not known until the task is nearly finished. Frequently it is clearer to specify it when the task is queued, though. We should provide a convenient way to do this without any boilerplate in the queued task. 
 ** This seems pretty easily doable by passing the desired filename into the task method as a parameter.    Then the code can work with @self.output.open(passed_filename)@. 
 * A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features. 
 * Should get more clear about how the output of the job (as opposed to the output of the last task) is to be set. The obvious way (concatenate all task outputs) should be a one-liner, if not implicit. Either way, it should run in a task rather than being left up to @crunch-job@. 
 ** This also sounds like something that needs to be settled at the Crunch level.    But it seems like we could be pretty accommodating by having crunch-job just say, "If the job set an output, great; if not, collate the task outputs."    Then the SDK can provide pre-defined task methods for common output setups: collate the output of all tasks or a subset of them; use the output of a specific task; etc.