Statistics
| Branch: | Revision:

arvados / crunch_scripts / GATK2-bqsr @ master

History | View | Annotate | Download (2.77 KB)

1
#!/usr/bin/env python
2

    
3
import os
4
import re
5
import arvados
6
import arvados_gatk2
7
import arvados_samtools
8
from arvados_ipc import *
9

    
10
class InvalidArgumentError(Exception):
11
    pass
12

    
13
arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
14

    
15
this_job = arvados.current_job()
16
this_task = arvados.current_task()
17
tmpdir = arvados.current_task().tmpdir
18
arvados.util.clear_tmpdir()
19

    
20
known_sites_files = arvados.getjobparam(
21
    'known_sites',
22
    ['dbsnp_137.b37.vcf',
23
     'Mills_and_1000G_gold_standard.indels.b37.vcf',
24
     ])
25
bundle_dir = arvados.util.collection_extract(
26
    collection = this_job['script_parameters']['gatk_bundle'],
27
    files = [
28
        'human_g1k_v37.dict',
29
        'human_g1k_v37.fasta',
30
        'human_g1k_v37.fasta.fai'
31
        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
32
    path = 'gatk_bundle')
33
ref_fasta_files = [os.path.join(bundle_dir, f)
34
                   for f in os.listdir(bundle_dir)
35
                   if re.search(r'\.fasta(\.gz)?$', f)]
36

    
37
input_collection = this_task['parameters']['input']
38
input_dir = arvados.util.collection_extract(
39
    collection = input_collection,
40
    path = os.path.join(this_task.tmpdir, 'input'))
41
input_bam_files = []
42
for f in arvados.util.listdir_recursive(input_dir):
43
    if re.search(r'\.bam$', f):
44
        input_stream_name, input_file_name = os.path.split(f)
45
        input_bam_files += [os.path.join(input_dir, f)]
46
if len(input_bam_files) != 1:
47
    raise InvalidArgumentError("Expected exactly one bam file per task.")
48

    
49
known_sites_args = []
50
for f in known_sites_files:
51
    known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
52

    
53
recal_file = os.path.join(tmpdir, 'recal.csv')
54

    
55
children = {}
56
pipes = {}
57

    
58
arvados_gatk2.run(
59
    args=[
60
        '-nct', arvados_gatk2.cpus_on_this_node(),
61
        '-T', 'BaseRecalibrator',
62
        '-R', ref_fasta_files[0],
63
        '-I', input_bam_files[0],
64
        '-o', recal_file,
65
        ] + known_sites_args)
66

    
67
pipe_setup(pipes, 'BQSR')
68
if 0 == named_fork(children, 'BQSR'):
69
    pipe_closeallbut(pipes, ('BQSR', 'w'))
70
    arvados_gatk2.run(
71
        args=[
72
        '-T', 'PrintReads',
73
        '-R', ref_fasta_files[0],
74
        '-I', input_bam_files[0],
75
        '-o', '/dev/fd/' + str(pipes['BQSR','w']),
76
        '-BQSR', recal_file,
77
        '--disable_bam_indexing',
78
        ],
79
        close_fds=False)
80
    os._exit(0)
81
os.close(pipes.pop(('BQSR','w'), None))
82

    
83
out = arvados.CollectionWriter()
84
out.start_new_stream(input_stream_name)
85

    
86
out.start_new_file(input_file_name + '.recal.csv')
87
out.write(open(recal_file, 'rb'))
88

    
89
out.start_new_file(input_file_name)
90
while True:
91
    buf = os.read(pipes['BQSR','r'], 2**20)
92
    if len(buf) == 0:
93
        break
94
    out.write(buf)
95
pipe_closeallbut(pipes)
96

    
97
if waitpid_and_check_children(children):
98
    this_task.set_output(out.finish())
99
else:
100
    sys.exit(1)