Statistics
| Branch: | Revision:

arvados / crunch_scripts / GATK2-realign @ master

History | View | Annotate | Download (5.09 KB)

1
#!/usr/bin/env python
2

    
3
import os
4
import re
5
import arvados
6
import arvados_gatk2
7
import arvados_picard
8
import arvados_samtools
9
from arvados_ipc import *
10

    
11
class InvalidArgumentError(Exception):
12
    pass
13

    
14
arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
15

    
16
this_job = arvados.current_job()
17
this_task = arvados.current_task()
18
tmpdir = arvados.current_task().tmpdir
19
arvados.util.clear_tmpdir()
20

    
21
known_sites_files = arvados.getjobparam(
22
    'known_sites',
23
    ['dbsnp_137.b37.vcf',
24
     'Mills_and_1000G_gold_standard.indels.b37.vcf',
25
     ])
26
bundle_dir = arvados.util.collection_extract(
27
    collection = this_job['script_parameters']['gatk_bundle'],
28
    files = [
29
        'human_g1k_v37.dict',
30
        'human_g1k_v37.fasta',
31
        'human_g1k_v37.fasta.fai'
32
        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
33
    path = 'gatk_bundle')
34
ref_fasta_files = [os.path.join(bundle_dir, f)
35
                   for f in os.listdir(bundle_dir)
36
                   if re.search(r'\.fasta(\.gz)?$', f)]
37
regions_args = []
38
if 'regions' in this_job['script_parameters']:
39
    regions_dir = arvados.util.collection_extract(
40
        collection = this_job['script_parameters']['regions'],
41
        path = 'regions')
42
    region_padding = int(this_job['script_parameters']['region_padding'])
43
    for f in os.listdir(regions_dir):
44
        if re.search(r'\.bed$', f):
45
            regions_args += [
46
                '--intervals', os.path.join(regions_dir, f),
47
                '--interval_padding', str(region_padding)
48
                ]
49

    
50
input_collection = this_task['parameters']['input']
51
input_dir = arvados.util.collection_extract(
52
    collection = input_collection,
53
    path = os.path.join(this_task.tmpdir, 'input'))
54
input_bam_files = []
55
for f in arvados.util.listdir_recursive(input_dir):
56
    if re.search(r'\.bam$', f):
57
        input_stream_name, input_file_name = os.path.split(f)
58
        input_bam_files += [os.path.join(input_dir, f)]
59
if len(input_bam_files) != 1:
60
    raise InvalidArgumentError("Expected exactly one bam file per task.")
61

    
62
known_sites_args = []
63
for f in known_sites_files:
64
    known_sites_args += ['-known', os.path.join(bundle_dir, f)]
65

    
66
children = {}
67
pipes = {}
68

    
69
arvados_gatk2.run(
70
    args=[
71
        '-nt', arvados_gatk2.cpus_per_task(),
72
        '-T', 'RealignerTargetCreator',
73
        '-R', ref_fasta_files[0],
74
        '-I', input_bam_files[0],
75
        '-o', os.path.join(tmpdir, 'intervals.list')
76
        ] + known_sites_args + regions_args)
77

    
78
pipe_setup(pipes, 'IndelRealigner')
79
if 0 == named_fork(children, 'IndelRealigner'):
80
    pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
81
    arvados_gatk2.run(
82
        args=[
83
        '-T', 'IndelRealigner',
84
        '-R', ref_fasta_files[0],
85
        '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
86
        '-I', input_bam_files[0],
87
        '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
88
        '--disable_bam_indexing',
89
        ] + known_sites_args + regions_args,
90
        close_fds=False)
91
    os._exit(0)
92
os.close(pipes.pop(('IndelRealigner','w'), None))
93

    
94
pipe_setup(pipes, 'bammanifest')
95
pipe_setup(pipes, 'bam')
96
if 0==named_fork(children, 'bammanifest'):
97
    pipe_closeallbut(pipes,
98
                     ('IndelRealigner', 'r'),
99
                     ('bammanifest', 'w'),
100
                     ('bam', 'w'))
101
    out = arvados.CollectionWriter()
102
    out.start_new_stream(input_stream_name)
103
    out.start_new_file(input_file_name)
104
    while True:
105
        buf = os.read(pipes['IndelRealigner','r'], 2**20)
106
        if len(buf) == 0:
107
            break
108
        os.write(pipes['bam','w'], buf)
109
        out.write(buf)
110
    os.write(pipes['bammanifest','w'], out.manifest_text())
111
    os.close(pipes['bammanifest','w'])
112
    os._exit(0)
113

    
114
pipe_setup(pipes, 'index')
115
if 0==named_fork(children, 'index'):
116
    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
117
    arvados_picard.run(
118
        'BuildBamIndex',
119
        params={
120
            'i': '/dev/fd/' + str(pipes['bam','r']),
121
            'o': '/dev/fd/' + str(pipes['index','w']),
122
            'quiet': 'true',
123
            'validation_stringency': 'LENIENT'
124
            },
125
        close_fds=False)
126
    os._exit(0)
127

    
128
pipe_setup(pipes, 'indexmanifest')
129
if 0==named_fork(children, 'indexmanifest'):
130
    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
131
    out = arvados.CollectionWriter()
132
    out.start_new_stream(input_stream_name)
133
    out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
134
    while True:
135
        buf = os.read(pipes['index','r'], 2**20)
136
        if len(buf) == 0:
137
            break
138
        out.write(buf)
139
    os.write(pipes['indexmanifest','w'], out.manifest_text())
140
    os.close(pipes['indexmanifest','w'])
141
    os._exit(0)
142

    
143
pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
144
outmanifest = ''
145
for which in ['bammanifest', 'indexmanifest']:
146
    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
147
        while True:
148
            buf = f.read()
149
            if buf == '':
150
                break
151
            outmanifest += buf
152

    
153
all_ok = True
154
for (childname, pid) in children.items():
155
    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
156

    
157
if all_ok:
158
    this_task.set_output(outmanifest)
159
else:
160
    sys.exit(1)