Statistics
| Branch: | Revision:

arvados / crunch_scripts / GATK2-merge-call @ master

History | View | Annotate | Download (7.21 KB)

1
#!/usr/bin/env python

import os
import re
import string
import sys
import threading

import arvados
import arvados_gatk2
import arvados_picard
from arvados_ipc import *
11

    
12
class InvalidArgumentError(Exception):
    """Raised when a job or task parameter fails validation."""
14

    
15
# Job/task context and a per-task scratch directory (cleared up front).
this_job = arvados.current_job()
this_task = arvados.current_task()
tmpdir = arvados.current_task().tmpdir
arvados.util.clear_tmpdir()

# Fetch the GATK reference bundle (reference FASTA + index/dict and
# dbSNP VCF) from the 'gatk_bundle' collection into a local directory.
bundle_dir = arvados.util.collection_extract(
    collection = this_job['script_parameters']['gatk_bundle'],
    files = [
        'human_g1k_v37.dict',
        'human_g1k_v37.fasta',
        'human_g1k_v37.fasta.fai',
        'dbsnp_137.b37.vcf',
        'dbsnp_137.b37.vcf.idx',
        ],
    path = 'gatk_bundle')
# Every extracted *.fasta/*.fasta.gz file; only ref_fasta_files[0] is
# used below as the -R argument.
ref_fasta_files = [os.path.join(bundle_dir, f)
                   for f in os.listdir(bundle_dir)
                   if re.search(r'\.fasta(\.gz)?$', f)]
# Optional interval restriction: every .bed file in the 'regions'
# collection becomes an --intervals argument, padded on each side by
# 'region_padding' bp.
regions_args = []
if 'regions' in this_job['script_parameters']:
    regions_dir = arvados.util.collection_extract(
        collection = this_job['script_parameters']['regions'],
        path = 'regions')
    region_padding = int(this_job['script_parameters']['region_padding'])
    for f in os.listdir(regions_dir):
        if re.search(r'\.bed$', f):
            regions_args += [
                '--intervals', os.path.join(regions_dir, f),
                '--interval_padding', str(region_padding)
                ]
45

    
46

    
47
# Start a child process for each input file, feeding data to picard.

input_child_names = []
children = {}   # childname -> pid (managed by named_fork, from arvados_ipc)
pipes = {}      # (childname, 'r'|'w') -> fd (managed by pipe_setup)

input_collection = this_job['script_parameters']['input']
input_index = 0
for s in arvados.CollectionReader(input_collection).all_streams():
    for f in s.all_files():
        # Only .bam files are merged; everything else is ignored.
        if not re.search(r'\.bam$', f.name()):
            continue
        input_index += 1
        childname = 'input-' + str(input_index)
        input_child_names += [childname]
        # One pipe per input: the child streams BAM bytes into the
        # write end; picard later reads the other end via /dev/fd.
        pipe_setup(pipes, childname)
        childpid = named_fork(children, childname)
        if childpid == 0:
            # Child: keep only our own write end, copy the file from
            # Keep into the pipe, then exit without normal cleanup.
            # NOTE: the inner "for s" deliberately runs only in the
            # child, so the parent's stream variable s is untouched.
            pipe_closeallbut(pipes, (childname, 'w'))
            for s in f.readall():
                os.write(pipes[childname, 'w'], s)
            os.close(pipes[childname, 'w'])
            os._exit(0)
        sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
                         (childpid,
                          s.name()+'/'+f.name(),
                          pipes[childname, 'w'],
                          pipes[childname, 'r']))
        # Parent: keep only the read ends of all input pipes created so
        # far; this closes the parent's copy of each child's write end,
        # so EOF propagates when the child finishes.
        pipe_closeallbut(pipes, *[(childname, 'r')
                                  for childname in input_child_names])
77

    
78

    
79
# Merge-sort the input files to merge.bam

arvados_picard.run(
    'MergeSamFiles',
    args=[
        # Each input child's pipe read end, addressed via /dev/fd.
        'I=/dev/fd/' + str(pipes[childname, 'r'])
        for childname in input_child_names
        ],
    params={
        'o': 'merge.bam',
        'quiet': 'true',
        'so': 'coordinate',
        'use_threading': 'true',
        'create_index': 'true',
        'validation_stringency': 'LENIENT',
        },
    # Picard must inherit the pipe fds named in args above.
    close_fds=False,
    )
# All inputs consumed; close every pipe fd the parent still holds.
pipe_closeallbut(pipes)
98

    
99

    
100
# Run CoverageBySample on merge.bam

# Two pipes back to the parent: GATK's log (scanned for the total bp
# count) and its per-locus coverage output (turned into a histogram).
pipe_setup(pipes, 'stats_log')
pipe_setup(pipes, 'stats_out')
if 0 == named_fork(children, 'GATK'):
    # Child: run GATK with both outputs directed at the pipe write
    # ends via /dev/fd, then exit without normal cleanup.
    pipe_closeallbut(pipes,
                     ('stats_log', 'w'),
                     ('stats_out', 'w'))
    arvados_gatk2.run(
        args=[
            '-T', 'CoverageBySample',
            '-R', ref_fasta_files[0],
            '-I', 'merge.bam',
            '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
            '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
            ]
        + regions_args,
        close_fds=False)
    pipe_closeallbut(pipes)
    os._exit(0)
# Parent: keep only the read ends; closing our write-end copies lets
# the reader threads below see EOF when GATK finishes.
pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
121

    
122

    
123
# Start two threads to read from CoverageBySample pipes
124

    
125
class ExceptionPropagatingThread(threading.Thread):
    """Thread that re-raises, in join(), an exception saved via _raise().

    A subclass's run() wraps its work in try/except and calls
    self._raise(e) on failure; the thread calling join() then sees e.
    """
    def __init__(self, *args, **kwargs):
        super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
        self._pending_exception = None

    def join(self, *args, **kwargs):
        result = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
        pending = self._pending_exception
        if pending:
            raise pending
        return result

    def _raise(self, exception):
        # Record the failure; join() will surface it to the caller.
        self._pending_exception = exception
140

    
141
class StatsLogReader(ExceptionPropagatingThread):
    """Scan the GATK log stream for the total number of bases processed.

    Watches for a line matching "Processing N bp from intervals" and
    remembers N so total_bp() can report it after the log ends.
    """
    def __init__(self, **kwargs):
        super(StatsLogReader, self).__init__()
        self.args = kwargs
        # Initialized here so total_bp() never hits AttributeError when
        # the expected log line is absent; stays None in that case.
        self._total_bp = None
    def run(self):
        try:
            for logline in self.args['infile']:
                # Raw string so \d is a regex class, not a string escape.
                x = re.search(r'Processing (\d+) bp from intervals', logline)
                if x:
                    self._total_bp = int(x.group(1))
        except Exception as e:
            self._raise(e)
    def total_bp(self):
        """Block until the log is fully read, then return the parsed bp
        count (None if no matching line appeared)."""
        self.join()
        return self._total_bp
156
stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
157
stats_log_thr.start()
158

    
159
class StatsOutReader(ExceptionPropagatingThread):
    """
    Read output of CoverageBySample and collect a histogram of
    coverage (last column) -> number of loci (number of rows).
    """
    def __init__(self, **kwargs):
        super(StatsOutReader, self).__init__()
        self.args = kwargs
    def run(self):
        try:
            # hist[i] = number of loci whose coverage is exactly i.
            hist = [0]
            histtot = 0
            for line in self.args['infile']:
                try:
                    # str.split() replaces the deprecated string.split().
                    i = int(line.split()[-1])
                except (IndexError, ValueError):
                    # Skip blank lines (IndexError) and header/non-numeric
                    # rows (ValueError) instead of killing the thread.
                    continue
                if i >= 1:
                    if len(hist) <= i:
                        hist.extend([0] * (1 + i - len(hist)))
                    hist[i] += 1
                    histtot += 1
            # Loci GATK never reported have zero coverage; infer their
            # count from the total bp figure parsed out of the log.
            hist[0] = stats_log_thr.total_bp() - histtot
            self._histogram = hist
        except Exception as e:
            self._raise(e)
    def histogram(self):
        """Block until the output is fully read, then return the
        coverage histogram list (index = coverage, value = locus count)."""
        self.join()
        return self._histogram
188
stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
189
stats_out_thr.start()
190

    
191

    
192
# Run UnifiedGenotyper on merge.bam

arvados_gatk2.run(
    args=[
        # One GATK worker thread per core on this node.
        '-nt', arvados_gatk2.cpus_on_this_node(),
        '-T', 'UnifiedGenotyper',
        '-R', ref_fasta_files[0],
        '-I', 'merge.bam',
        '-o', os.path.join(tmpdir, 'out.vcf'),
        '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
        '-metrics', 'UniGenMetrics',
        '-A', 'DepthOfCoverage',
        '-A', 'AlleleBalance',
        '-A', 'QualByDepth',
        '-A', 'HaplotypeScore',
        '-A', 'MappingQualityRankSumTest',
        '-A', 'ReadPosRankSumTest',
        '-A', 'FisherStrand',
        '-glm', 'both',
        ]
    + regions_args
    # Extra caller-supplied GATK arguments, if the job provides any.
    + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
214

    
215
# Copy the output VCF file to Keep
216

    
217
out = arvados.CollectionWriter()
218
out.start_new_stream()
219
out.start_new_file('out.vcf')
220
out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
221

    
222

    
223
# Write statistics to Keep

# Each CSV row: minimum coverage i, number of loci covered at >= i
# (total_bp minus the loci counted at coverages 0..i-1 so far), and
# that count as a percentage of all loci.
# NOTE(review): divides by total_bp — assumes it is nonzero; confirm
# CoverageBySample always reports a positive bp count.
out.start_new_file('mincoverage_nlocus.csv')
sofar = 0
hist = stats_out_thr.histogram()
total_bp = stats_log_thr.total_bp()
for i in range(len(hist)):
    out.write("%d,%d,%f\n" %
              (i,
               total_bp - sofar,
               100.0 * (total_bp - sofar) / total_bp))
    sofar += hist[i]
235

    
236
# Reap all forked children; publish the output collection as this
# task's result only if every child exited successfully, otherwise
# fail the task.
if waitpid_and_check_children(children):
    this_task.set_output(out.finish())
else:
    sys.exit(1)