Statistics
| Branch: | Revision:

arvados / crunch_scripts / picard-gatk2-prep @ master

History | View | Annotate | Download (6.7 KB)

1
#!/usr/bin/env python
2
# Copyright (C) The Arvados Authors. All rights reserved.
3
#
4
# SPDX-License-Identifier: Apache-2.0
5

    
6
import arvados
7
import os
8
import re
9
import sys
10
import subprocess
11
import arvados_picard
12
from arvados_ipc import *
13

    
14
arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
15

    
16
this_job = arvados.current_job()
17
this_task = arvados.current_task()
18
ref_dir = arvados.util.collection_extract(
19
    collection = this_job['script_parameters']['reference'],
20
    path = 'reference',
21
    decompress = True)
22
ref_fasta_files = [os.path.join(ref_dir, f)
23
                   for f in os.listdir(ref_dir)
24
                   if re.search(r'\.fasta(\.gz)?$', f)]
25
input_collection = this_task['parameters']['input']
26

    
27
for s in arvados.CollectionReader(input_collection).all_streams():
28
    for f in s.all_files():
29
        input_stream_name = s.name()
30
        input_file_name = f.name()
31
        break
32

    
33
# Unfortunately, picard FixMateInformation cannot read from a pipe. We
34
# must copy the input to a temporary file before running picard.
35
input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
36
with open(input_bam_path, 'wb') as bam:
37
    for s in arvados.CollectionReader(input_collection).all_streams():
38
        for f in s.all_files():
39
            for s in f.readall():
40
                bam.write(s)
41

    
42
children = {}
43
pipes = {}
44

    
45
pipe_setup(pipes, 'fixmate')
46
if 0==named_fork(children, 'fixmate'):
47
    pipe_closeallbut(pipes, ('fixmate', 'w'))
48
    arvados_picard.run(
49
        'FixMateInformation',
50
        params={
51
            'i': input_bam_path,
52
            'o': '/dev/stdout',
53
            'quiet': 'true',
54
            'so': 'coordinate',
55
            'validation_stringency': 'LENIENT',
56
            'compression_level': 0
57
            },
58
        stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
59
    os._exit(0)
60
os.close(pipes.pop(('fixmate','w'), None))
61

    
62
pipe_setup(pipes, 'sortsam')
63
if 0==named_fork(children, 'sortsam'):
64
    pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
65
    arvados_picard.run(
66
        'SortSam',
67
        params={
68
            'i': '/dev/stdin',
69
            'o': '/dev/stdout',
70
            'quiet': 'true',
71
            'so': 'coordinate',
72
            'validation_stringency': 'LENIENT',
73
            'compression_level': 0
74
            },
75
        stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
76
        stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
77
    os._exit(0)
78

    
79
pipe_setup(pipes, 'reordersam')
80
if 0==named_fork(children, 'reordersam'):
81
    pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
82
    arvados_picard.run(
83
        'ReorderSam',
84
        params={
85
            'i': '/dev/stdin',
86
            'o': '/dev/stdout',
87
            'reference': ref_fasta_files[0],
88
            'quiet': 'true',
89
            'validation_stringency': 'LENIENT',
90
            'compression_level': 0
91
            },
92
        stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
93
        stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
94
    os._exit(0)
95

    
96
pipe_setup(pipes, 'addrg')
97
if 0==named_fork(children, 'addrg'):
98
    pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
99
    arvados_picard.run(
100
        'AddOrReplaceReadGroups',
101
        params={
102
            'i': '/dev/stdin',
103
            'o': '/dev/stdout',
104
            'quiet': 'true',
105
            'rglb': this_job['script_parameters'].get('rglb', 0),
106
            'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
107
            'rgpu': this_job['script_parameters'].get('rgpu', 0),
108
            'rgsm': this_job['script_parameters'].get('rgsm', 0),
109
            'validation_stringency': 'LENIENT'
110
            },
111
        stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
112
        stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
113
    os._exit(0)
114

    
115
pipe_setup(pipes, 'bammanifest')
116
pipe_setup(pipes, 'bam')
117
pipe_setup(pipes, 'casm_in')
118
if 0==named_fork(children, 'bammanifest'):
119
    pipe_closeallbut(pipes,
120
                     ('addrg', 'r'),
121
                     ('bammanifest', 'w'),
122
                     ('bam', 'w'),
123
                     ('casm_in', 'w'))
124
    out = arvados.CollectionWriter()
125
    out.start_new_stream(input_stream_name)
126
    out.start_new_file(input_file_name)
127
    while True:
128
        buf = os.read(pipes['addrg','r'], 2**20)
129
        if len(buf) == 0:
130
            break
131
        os.write(pipes['bam','w'], buf)
132
        os.write(pipes['casm_in','w'], buf)
133
        out.write(buf)
134
    os.write(pipes['bammanifest','w'], out.manifest_text())
135
    os.close(pipes['bammanifest','w'])
136
    os._exit(0)
137

    
138
pipe_setup(pipes, 'casm')
139
if 0 == named_fork(children, 'casm'):
140
    pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
141
    arvados_picard.run(
142
        'CollectAlignmentSummaryMetrics',
143
        params={
144
            'input': '/dev/fd/' + str(pipes['casm_in','r']),
145
            'output': '/dev/fd/' + str(pipes['casm','w']),
146
            'reference_sequence': ref_fasta_files[0],
147
            'validation_stringency': 'LENIENT',
148
            },
149
        close_fds=False)
150
    os._exit(0)
151

    
152
pipe_setup(pipes, 'index')
153
if 0==named_fork(children, 'index'):
154
    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
155
    arvados_picard.run(
156
        'BuildBamIndex',
157
        params={
158
            'i': '/dev/stdin',
159
            'o': '/dev/stdout',
160
            'quiet': 'true',
161
            'validation_stringency': 'LENIENT'
162
            },
163
        stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
164
        stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
165
    os._exit(0)
166

    
167
pipe_setup(pipes, 'indexmanifest')
168
if 0==named_fork(children, 'indexmanifest'):
169
    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
170
    out = arvados.CollectionWriter()
171
    out.start_new_stream(input_stream_name)
172
    out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
173
    while True:
174
        buf = os.read(pipes['index','r'], 2**20)
175
        if len(buf) == 0:
176
            break
177
        out.write(buf)
178
    os.write(pipes['indexmanifest','w'], out.manifest_text())
179
    os.close(pipes['indexmanifest','w'])
180
    os._exit(0)
181

    
182
pipe_closeallbut(pipes,
183
                 ('bammanifest', 'r'),
184
                 ('indexmanifest', 'r'),
185
                 ('casm', 'r'))
186

    
187
outmanifest = ''
188

    
189
for which in ['bammanifest', 'indexmanifest']:
190
    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
191
        while True:
192
            buf = f.read()
193
            if buf == '':
194
                break
195
            outmanifest += buf
196

    
197
casm_out = arvados.CollectionWriter()
198
casm_out.start_new_stream(input_stream_name)
199
casm_out.start_new_file(input_file_name + '.casm.tsv')
200
casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
201

    
202
outmanifest += casm_out.manifest_text()
203

    
204
all_ok = True
205
for (childname, pid) in children.items():
206
    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
207

    
208
if all_ok:
209
    this_task.set_output(outmanifest)
210
else:
211
    sys.exit(1)