Statistics
| Branch: | Revision:

arvados / crunch_scripts / GATK2-merge-call @ master

History | View | Annotate | Download (7.3 KB)

1
#!/usr/bin/env python
2
# Copyright (C) The Arvados Authors. All rights reserved.
3
#
4
# SPDX-License-Identifier: Apache-2.0
5

    
6
import os
import re
import string
import sys
import threading

import arvados
import arvados_gatk2
import arvados_picard
from arvados_ipc import *
14

    
15
class InvalidArgumentError(Exception):
    """Error raised when a script parameter is invalid."""
17

    
18
this_job = arvados.current_job()
19
this_task = arvados.current_task()
20
tmpdir = arvados.current_task().tmpdir
21
arvados.util.clear_tmpdir()
22

    
23
# Fetch the GATK resource bundle (reference FASTA + dbSNP) into a local
# directory so the GATK/Picard command lines below can use plain file paths.
bundle_dir = arvados.util.collection_extract(
    collection = this_job['script_parameters']['gatk_bundle'],
    files = [
        'human_g1k_v37.dict',
        'human_g1k_v37.fasta',
        'human_g1k_v37.fasta.fai',
        'dbsnp_137.b37.vcf',
        'dbsnp_137.b37.vcf.idx',
        ],
    path = 'gatk_bundle')
# Reference FASTA file(s) found in the bundle; only ref_fasta_files[0] is
# used below, so the bundle is expected to contain exactly one.
ref_fasta_files = [os.path.join(bundle_dir, f)
                   for f in os.listdir(bundle_dir)
                   if re.search(r'\.fasta(\.gz)?$', f)]
# Optional target regions: every *.bed file in the 'regions' collection is
# passed to GATK via --intervals, padded by 'region_padding' bp.
# NOTE(review): 'region_padding' is required whenever 'regions' is given;
# if it is missing this raises KeyError -- confirm that is intended.
regions_args = []
if 'regions' in this_job['script_parameters']:
    regions_dir = arvados.util.collection_extract(
        collection = this_job['script_parameters']['regions'],
        path = 'regions')
    region_padding = int(this_job['script_parameters']['region_padding'])
    for f in os.listdir(regions_dir):
        if re.search(r'\.bed$', f):
            regions_args += [
                '--intervals', os.path.join(regions_dir, f),
                '--interval_padding', str(region_padding)
                ]
48

    
49

    
50
# Start a child process for each input BAM, feeding its bytes into a pipe;
# picard later reads each pipe via /dev/fd/N and merges them.

input_child_names = []   # child names in input order ('input-1', 'input-2', ...)
children = {}            # child name -> pid (maintained by named_fork)
pipes = {}               # (child name, 'r'|'w') -> fd (maintained by pipe_setup)

input_collection = this_job['script_parameters']['input']
input_index = 0
for s in arvados.CollectionReader(input_collection).all_streams():
    for f in s.all_files():
        if not re.search(r'\.bam$', f.name()):
            continue
        input_index += 1
        childname = 'input-' + str(input_index)
        input_child_names += [childname]
        pipe_setup(pipes, childname)
        childpid = named_fork(children, childname)
        if childpid == 0:
            # Child: keep only this pipe's write end, stream the BAM data
            # through it, then exit without running any parent-side code.
            # ('chunk' was named 's' before, shadowing the stream variable.)
            pipe_closeallbut(pipes, (childname, 'w'))
            for chunk in f.readall():
                os.write(pipes[childname, 'w'], chunk)
            os.close(pipes[childname, 'w'])
            os._exit(0)
        sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
                         (childpid,
                          s.name()+'/'+f.name(),
                          pipes[childname, 'w'],
                          pipes[childname, 'r']))
        # Parent: keep only the read ends of all children so far (the write
        # ends belong to the children). 'cn' avoids shadowing 'childname'.
        pipe_closeallbut(pipes, *[(cn, 'r')
                                  for cn in input_child_names])
80

    
81

    
82
# Merge-sort the input files to merge.bam. Each input child's pipe is
# exposed to picard as /dev/fd/N, which is why close_fds=False is required.

arvados_picard.run(
    'MergeSamFiles',
    args=[
        'I=/dev/fd/' + str(pipes[childname, 'r'])
        for childname in input_child_names
        ],
    params={
        'o': 'merge.bam',
        'quiet': 'true',
        'so': 'coordinate',     # sort order: coordinate-sorted output
        'use_threading': 'true',
        'create_index': 'true',
        'validation_stringency': 'LENIENT',
        },
    close_fds=False,
    )
# All input pipes have been consumed; close every remaining fd.
pipe_closeallbut(pipes)
101

    
102

    
103
# Run CoverageBySample on merge.bam in a forked child, sending its output
# and log through pipes that the parent drains with the threads below.

pipe_setup(pipes, 'stats_log')
pipe_setup(pipes, 'stats_out')
if 0 == named_fork(children, 'GATK'):
    # Child: keep only the write ends and point GATK at them via /dev/fd/N
    # (close_fds=False so the fds survive into the GATK process).
    pipe_closeallbut(pipes,
                     ('stats_log', 'w'),
                     ('stats_out', 'w'))
    arvados_gatk2.run(
        args=[
            '-T', 'CoverageBySample',
            '-R', ref_fasta_files[0],
            '-I', 'merge.bam',
            '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
            '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
            ]
        + regions_args,
        close_fds=False)
    pipe_closeallbut(pipes)
    os._exit(0)
# Parent: keep only the read ends; the child owns the write ends.
pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
124

    
125

    
126
# Start two threads to read from CoverageBySample pipes
127

    
128
class ExceptionPropagatingThread(threading.Thread):
    """Thread whose join() re-raises an exception recorded by run().

    A subclass's run() should wrap its work in try/except and hand any
    caught exception to self._raise(); the exception then surfaces in
    whichever thread later calls join().
    """

    def __init__(self, *args, **kwargs):
        super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
        self.__pending_error = None

    def join(self, *args, **kwargs):
        # Wait as usual, then surface any error recorded by run().
        result = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
        if self.__pending_error:
            raise self.__pending_error
        return result

    def _raise(self, exception):
        # Stash the exception for delivery at join() time.
        self.__pending_error = exception
143

    
144
class StatsLogReader(ExceptionPropagatingThread):
    """Scan the GATK log stream for the total number of bp processed.

    Reads lines from the file object passed as the 'infile' keyword
    argument, remembering the most recent "Processing N bp from
    intervals" figure; total_bp() joins the thread and returns it.
    """
    # Compiled once (the original recompiled per line) and written as a raw
    # string: '\d' in a plain string is an invalid escape on Python 3.12+.
    _TOTAL_BP_RE = re.compile(r'Processing (\d+) bp from intervals')

    def __init__(self, **kwargs):
        super(StatsLogReader, self).__init__()
        self.args = kwargs

    def run(self):
        try:
            for logline in self.args['infile']:
                x = self._TOTAL_BP_RE.search(logline)
                if x:
                    self._total_bp = int(x.group(1))
        except Exception as e:
            self._raise(e)

    def total_bp(self):
        """Wait for EOF on the log stream, then return the bp total.

        Raises AttributeError if the log never reported a total
        (same behavior as the original code).
        """
        self.join()
        return self._total_bp
159
stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
160
stats_log_thr.start()
161

    
162
class StatsOutReader(ExceptionPropagatingThread):
    """
    Read output of CoverageBySample and collect a histogram of
    coverage (last column) -> number of loci (number of rows).
    """
    def __init__(self, **kwargs):
        super(StatsOutReader, self).__init__()
        self.args = kwargs

    def run(self):
        try:
            hist = [0]
            histtot = 0
            for line in self.args['infile']:
                try:
                    # Coverage is the last whitespace-separated column.
                    # line.split() replaces string.split(line), which does
                    # not exist on Python 3; IndexError is now also caught
                    # so a blank line no longer kills the thread.
                    i = int(line.split()[-1])
                except (ValueError, IndexError):
                    continue   # header / non-numeric / empty line
                if i >= 1:
                    if len(hist) <= i:
                        hist.extend([0] * (1 + i - len(hist)))
                    hist[i] += 1
                    histtot += 1
            # hist[0] = loci with zero coverage: total bp (reported by the
            # sibling StatsLogReader thread) minus the loci counted above.
            hist[0] = stats_log_thr.total_bp() - histtot
            self._histogram = hist
        except Exception as e:
            self._raise(e)

    def histogram(self):
        """Wait for EOF on the stats stream, then return the histogram."""
        self.join()
        return self._histogram
191
# Second reader thread: consume CoverageBySample's main output.
stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
stats_out_thr.start()


# Run UnifiedGenotyper on merge.bam (in the foreground, while the two
# reader threads drain the CoverageBySample pipes in the background).

arvados_gatk2.run(
    args=[
        # NOTE(review): assumes cpus_on_this_node() returns a string (or that
        # arvados_gatk2.run stringifies args) -- confirm, other args are str.
        '-nt', arvados_gatk2.cpus_on_this_node(),
        '-T', 'UnifiedGenotyper',
        '-R', ref_fasta_files[0],
        '-I', 'merge.bam',
        '-o', os.path.join(tmpdir, 'out.vcf'),
        '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
        '-metrics', 'UniGenMetrics',
        '-A', 'DepthOfCoverage',
        '-A', 'AlleleBalance',
        '-A', 'QualByDepth',
        '-A', 'HaplotypeScore',
        '-A', 'MappingQualityRankSumTest',
        '-A', 'ReadPosRankSumTest',
        '-A', 'FisherStrand',
        '-glm', 'both',
        ]
    + regions_args
    # Extra caller-supplied UnifiedGenotyper arguments, if any.
    + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
217

    
218
# Copy the output VCF file to Keep

out = arvados.CollectionWriter()
out.start_new_stream()
out.start_new_file('out.vcf')
# NOTE(review): passes an open file object (not its contents) to
# CollectionWriter.write() -- verify that write() accepts file-likes.
out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))


# Write statistics to Keep

out.start_new_file('mincoverage_nlocus.csv')
sofar = 0
# histogram()/total_bp() join the reader threads, re-raising any error
# they recorded while GATK was running.
hist = stats_out_thr.histogram()
total_bp = stats_log_thr.total_bp()
# Row i: minimum coverage i, number of loci with coverage >= i, and that
# count as a percentage of all bp processed.
for i in range(len(hist)):
    out.write("%d,%d,%f\n" %
              (i,
               total_bp - sofar,
               100.0 * (total_bp - sofar) / total_bp))
    sofar += hist[i]
238

    
239
# Reap all forked children; publish the output collection only if every
# child exited successfully, otherwise fail the task.
if waitpid_and_check_children(children):
    this_task.set_output(out.finish())
else:
    sys.exit(1)