Statistics
| Branch: | Revision:

arvados / crunch_scripts / picard-gatk2-prep @ master

History | View | Annotate | Download (6.6 KB)

1
#!/usr/bin/env python
2

    
3
import arvados
4
import os
5
import re
6
import sys
7
import subprocess
8
import arvados_picard
9
from arvados_ipc import *
10

    
11
arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
12

    
13
this_job = arvados.current_job()
14
this_task = arvados.current_task()
15
ref_dir = arvados.util.collection_extract(
16
    collection = this_job['script_parameters']['reference'],
17
    path = 'reference',
18
    decompress = True)
19
ref_fasta_files = [os.path.join(ref_dir, f)
20
                   for f in os.listdir(ref_dir)
21
                   if re.search(r'\.fasta(\.gz)?$', f)]
22
input_collection = this_task['parameters']['input']
23

    
24
for s in arvados.CollectionReader(input_collection).all_streams():
25
    for f in s.all_files():
26
        input_stream_name = s.name()
27
        input_file_name = f.name()
28
        break
29

    
30
# Unfortunately, picard FixMateInformation cannot read from a pipe. We
31
# must copy the input to a temporary file before running picard.
32
input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
33
with open(input_bam_path, 'wb') as bam:
34
    for s in arvados.CollectionReader(input_collection).all_streams():
35
        for f in s.all_files():
36
            for s in f.readall():
37
                bam.write(s)
38

    
39
children = {}
40
pipes = {}
41

    
42
pipe_setup(pipes, 'fixmate')
43
if 0==named_fork(children, 'fixmate'):
44
    pipe_closeallbut(pipes, ('fixmate', 'w'))
45
    arvados_picard.run(
46
        'FixMateInformation',
47
        params={
48
            'i': input_bam_path,
49
            'o': '/dev/stdout',
50
            'quiet': 'true',
51
            'so': 'coordinate',
52
            'validation_stringency': 'LENIENT',
53
            'compression_level': 0
54
            },
55
        stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
56
    os._exit(0)
57
os.close(pipes.pop(('fixmate','w'), None))
58

    
59
pipe_setup(pipes, 'sortsam')
60
if 0==named_fork(children, 'sortsam'):
61
    pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
62
    arvados_picard.run(
63
        'SortSam',
64
        params={
65
            'i': '/dev/stdin',
66
            'o': '/dev/stdout',
67
            'quiet': 'true',
68
            'so': 'coordinate',
69
            'validation_stringency': 'LENIENT',
70
            'compression_level': 0
71
            },
72
        stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
73
        stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
74
    os._exit(0)
75

    
76
pipe_setup(pipes, 'reordersam')
77
if 0==named_fork(children, 'reordersam'):
78
    pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
79
    arvados_picard.run(
80
        'ReorderSam',
81
        params={
82
            'i': '/dev/stdin',
83
            'o': '/dev/stdout',
84
            'reference': ref_fasta_files[0],
85
            'quiet': 'true',
86
            'validation_stringency': 'LENIENT',
87
            'compression_level': 0
88
            },
89
        stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
90
        stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
91
    os._exit(0)
92

    
93
pipe_setup(pipes, 'addrg')
94
if 0==named_fork(children, 'addrg'):
95
    pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
96
    arvados_picard.run(
97
        'AddOrReplaceReadGroups',
98
        params={
99
            'i': '/dev/stdin',
100
            'o': '/dev/stdout',
101
            'quiet': 'true',
102
            'rglb': this_job['script_parameters'].get('rglb', 0),
103
            'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
104
            'rgpu': this_job['script_parameters'].get('rgpu', 0),
105
            'rgsm': this_job['script_parameters'].get('rgsm', 0),
106
            'validation_stringency': 'LENIENT'
107
            },
108
        stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
109
        stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
110
    os._exit(0)
111

    
112
pipe_setup(pipes, 'bammanifest')
113
pipe_setup(pipes, 'bam')
114
pipe_setup(pipes, 'casm_in')
115
if 0==named_fork(children, 'bammanifest'):
116
    pipe_closeallbut(pipes,
117
                     ('addrg', 'r'),
118
                     ('bammanifest', 'w'),
119
                     ('bam', 'w'),
120
                     ('casm_in', 'w'))
121
    out = arvados.CollectionWriter()
122
    out.start_new_stream(input_stream_name)
123
    out.start_new_file(input_file_name)
124
    while True:
125
        buf = os.read(pipes['addrg','r'], 2**20)
126
        if len(buf) == 0:
127
            break
128
        os.write(pipes['bam','w'], buf)
129
        os.write(pipes['casm_in','w'], buf)
130
        out.write(buf)
131
    os.write(pipes['bammanifest','w'], out.manifest_text())
132
    os.close(pipes['bammanifest','w'])
133
    os._exit(0)
134

    
135
pipe_setup(pipes, 'casm')
136
if 0 == named_fork(children, 'casm'):
137
    pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
138
    arvados_picard.run(
139
        'CollectAlignmentSummaryMetrics',
140
        params={
141
            'input': '/dev/fd/' + str(pipes['casm_in','r']),
142
            'output': '/dev/fd/' + str(pipes['casm','w']),
143
            'reference_sequence': ref_fasta_files[0],
144
            'validation_stringency': 'LENIENT',
145
            },
146
        close_fds=False)
147
    os._exit(0)
148

    
149
pipe_setup(pipes, 'index')
150
if 0==named_fork(children, 'index'):
151
    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
152
    arvados_picard.run(
153
        'BuildBamIndex',
154
        params={
155
            'i': '/dev/stdin',
156
            'o': '/dev/stdout',
157
            'quiet': 'true',
158
            'validation_stringency': 'LENIENT'
159
            },
160
        stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
161
        stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
162
    os._exit(0)
163

    
164
pipe_setup(pipes, 'indexmanifest')
165
if 0==named_fork(children, 'indexmanifest'):
166
    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
167
    out = arvados.CollectionWriter()
168
    out.start_new_stream(input_stream_name)
169
    out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
170
    while True:
171
        buf = os.read(pipes['index','r'], 2**20)
172
        if len(buf) == 0:
173
            break
174
        out.write(buf)
175
    os.write(pipes['indexmanifest','w'], out.manifest_text())
176
    os.close(pipes['indexmanifest','w'])
177
    os._exit(0)
178

    
179
pipe_closeallbut(pipes,
180
                 ('bammanifest', 'r'),
181
                 ('indexmanifest', 'r'),
182
                 ('casm', 'r'))
183

    
184
outmanifest = ''
185

    
186
for which in ['bammanifest', 'indexmanifest']:
187
    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
188
        while True:
189
            buf = f.read()
190
            if buf == '':
191
                break
192
            outmanifest += buf
193

    
194
casm_out = arvados.CollectionWriter()
195
casm_out.start_new_stream(input_stream_name)
196
casm_out.start_new_file(input_file_name + '.casm.tsv')
197
casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
198

    
199
outmanifest += casm_out.manifest_text()
200

    
201
all_ok = True
202
for (childname, pid) in children.items():
203
    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
204

    
205
if all_ok:
206
    this_task.set_output(outmanifest)
207
else:
208
    sys.exit(1)