Statistics
| Branch: | Revision:

arvados / crunch_scripts / GATK2-realign @ master

History | View | Annotate | Download (5.19 KB)

1
#!/usr/bin/env python
2
# Copyright (C) The Arvados Authors. All rights reserved.
3
#
4
# SPDX-License-Identifier: Apache-2.0
5

    
6
import os
7
import re
8
import arvados
9
import arvados_gatk2
10
import arvados_picard
11
import arvados_samtools
12
from arvados_ipc import *
13

    
14
class InvalidArgumentError(Exception):
15
    pass
16

    
17
arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
18

    
19
this_job = arvados.current_job()
20
this_task = arvados.current_task()
21
tmpdir = arvados.current_task().tmpdir
22
arvados.util.clear_tmpdir()
23

    
24
known_sites_files = arvados.getjobparam(
25
    'known_sites',
26
    ['dbsnp_137.b37.vcf',
27
     'Mills_and_1000G_gold_standard.indels.b37.vcf',
28
     ])
29
bundle_dir = arvados.util.collection_extract(
30
    collection = this_job['script_parameters']['gatk_bundle'],
31
    files = [
32
        'human_g1k_v37.dict',
33
        'human_g1k_v37.fasta',
34
        'human_g1k_v37.fasta.fai'
35
        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
36
    path = 'gatk_bundle')
37
ref_fasta_files = [os.path.join(bundle_dir, f)
38
                   for f in os.listdir(bundle_dir)
39
                   if re.search(r'\.fasta(\.gz)?$', f)]
40
regions_args = []
41
if 'regions' in this_job['script_parameters']:
42
    regions_dir = arvados.util.collection_extract(
43
        collection = this_job['script_parameters']['regions'],
44
        path = 'regions')
45
    region_padding = int(this_job['script_parameters']['region_padding'])
46
    for f in os.listdir(regions_dir):
47
        if re.search(r'\.bed$', f):
48
            regions_args += [
49
                '--intervals', os.path.join(regions_dir, f),
50
                '--interval_padding', str(region_padding)
51
                ]
52

    
53
input_collection = this_task['parameters']['input']
54
input_dir = arvados.util.collection_extract(
55
    collection = input_collection,
56
    path = os.path.join(this_task.tmpdir, 'input'))
57
input_bam_files = []
58
for f in arvados.util.listdir_recursive(input_dir):
59
    if re.search(r'\.bam$', f):
60
        input_stream_name, input_file_name = os.path.split(f)
61
        input_bam_files += [os.path.join(input_dir, f)]
62
if len(input_bam_files) != 1:
63
    raise InvalidArgumentError("Expected exactly one bam file per task.")
64

    
65
known_sites_args = []
66
for f in known_sites_files:
67
    known_sites_args += ['-known', os.path.join(bundle_dir, f)]
68

    
69
children = {}
70
pipes = {}
71

    
72
arvados_gatk2.run(
73
    args=[
74
        '-nt', arvados_gatk2.cpus_per_task(),
75
        '-T', 'RealignerTargetCreator',
76
        '-R', ref_fasta_files[0],
77
        '-I', input_bam_files[0],
78
        '-o', os.path.join(tmpdir, 'intervals.list')
79
        ] + known_sites_args + regions_args)
80

    
81
pipe_setup(pipes, 'IndelRealigner')
82
if 0 == named_fork(children, 'IndelRealigner'):
83
    pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
84
    arvados_gatk2.run(
85
        args=[
86
        '-T', 'IndelRealigner',
87
        '-R', ref_fasta_files[0],
88
        '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
89
        '-I', input_bam_files[0],
90
        '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
91
        '--disable_bam_indexing',
92
        ] + known_sites_args + regions_args,
93
        close_fds=False)
94
    os._exit(0)
95
os.close(pipes.pop(('IndelRealigner','w'), None))
96

    
97
pipe_setup(pipes, 'bammanifest')
98
pipe_setup(pipes, 'bam')
99
if 0==named_fork(children, 'bammanifest'):
100
    pipe_closeallbut(pipes,
101
                     ('IndelRealigner', 'r'),
102
                     ('bammanifest', 'w'),
103
                     ('bam', 'w'))
104
    out = arvados.CollectionWriter()
105
    out.start_new_stream(input_stream_name)
106
    out.start_new_file(input_file_name)
107
    while True:
108
        buf = os.read(pipes['IndelRealigner','r'], 2**20)
109
        if len(buf) == 0:
110
            break
111
        os.write(pipes['bam','w'], buf)
112
        out.write(buf)
113
    os.write(pipes['bammanifest','w'], out.manifest_text())
114
    os.close(pipes['bammanifest','w'])
115
    os._exit(0)
116

    
117
pipe_setup(pipes, 'index')
118
if 0==named_fork(children, 'index'):
119
    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
120
    arvados_picard.run(
121
        'BuildBamIndex',
122
        params={
123
            'i': '/dev/fd/' + str(pipes['bam','r']),
124
            'o': '/dev/fd/' + str(pipes['index','w']),
125
            'quiet': 'true',
126
            'validation_stringency': 'LENIENT'
127
            },
128
        close_fds=False)
129
    os._exit(0)
130

    
131
pipe_setup(pipes, 'indexmanifest')
132
if 0==named_fork(children, 'indexmanifest'):
133
    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
134
    out = arvados.CollectionWriter()
135
    out.start_new_stream(input_stream_name)
136
    out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
137
    while True:
138
        buf = os.read(pipes['index','r'], 2**20)
139
        if len(buf) == 0:
140
            break
141
        out.write(buf)
142
    os.write(pipes['indexmanifest','w'], out.manifest_text())
143
    os.close(pipes['indexmanifest','w'])
144
    os._exit(0)
145

    
146
pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
147
outmanifest = ''
148
for which in ['bammanifest', 'indexmanifest']:
149
    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
150
        while True:
151
            buf = f.read()
152
            if buf == '':
153
                break
154
            outmanifest += buf
155

    
156
all_ok = True
157
for (childname, pid) in children.items():
158
    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
159

    
160
if all_ok:
161
    this_task.set_output(outmanifest)
162
else:
163
    sys.exit(1)