Statistics
| Branch: | Tag: | Revision:

arvados / crunch_scripts / bwa-aln @ master

History | View | Annotate | Download (3.94 KB)

1
#!/usr/bin/env python
2
# Copyright (C) The Arvados Authors. All rights reserved.
3
#
4
# SPDX-License-Identifier: Apache-2.0
5

    
6
import arvados
7
import arvados_bwa
8
import arvados_samtools
9
import os
10
import re
11
import sys
12
import subprocess
13

    
14
arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
15

    
16
this_job = arvados.current_job()
17
this_task = arvados.current_task()
18
ref_dir = arvados.util.collection_extract(
19
    collection = this_job['script_parameters']['reference_index'],
20
    path = 'reference',
21
    decompress = False)
22

    
23
ref_basename = None
24
for f in os.listdir(ref_dir):
25
    basename = re.sub(r'\.bwt$', '', f)
26
    if basename != f:
27
        ref_basename = os.path.join(ref_dir, basename)
28
if ref_basename == None:
29
    raise Exception("Could not find *.bwt in reference collection.")
30

    
31
tmp_dir = arvados.current_task().tmpdir
32

    
33
class Aligner:
34
    def input_filename(self):
35
        for s in arvados.CollectionReader(self.collection).all_streams():
36
            for f in s.all_files():
37
                return f.decompressed_name()
38
    def generate_input(self):
39
        for s in arvados.CollectionReader(self.collection).all_streams():
40
            for f in s.all_files():
41
                for s in f.readall_decompressed():
42
                    yield s
43
    def aln(self, input_param):
44
        self.collection = this_task['parameters'][input_param]
45
        reads_filename = os.path.join(tmp_dir, self.input_filename())
46
        aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
47
        reads_pipe_r, reads_pipe_w = os.pipe()
48
        if os.fork() == 0:
49
            os.close(reads_pipe_r)
50
            reads_file = open(reads_filename, 'wb')
51
            for s in self.generate_input():
52
                if len(s) != os.write(reads_pipe_w, s):
53
                    raise Exception("short write")
54
                reads_file.write(s)
55
            reads_file.close()
56
            os.close(reads_pipe_w)
57
            sys.exit(0)
58
        os.close(reads_pipe_w)
59

    
60
        aln_file = open(aln_filename, 'wb')
61
        bwa_proc = subprocess.Popen(
62
            [arvados_bwa.bwa_binary(),
63
             'aln', '-t', '16',
64
             ref_basename,
65
             '-'],
66
            stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
67
            stdout=aln_file)
68
        aln_file.close()
69
        return reads_filename, aln_filename
70

    
71
reads_1, alignments_1 = Aligner().aln('input_1')
72
reads_2, alignments_2 = Aligner().aln('input_2')
73
pid1, exit1 = os.wait()
74
pid2, exit2 = os.wait()
75
if exit1 != 0 or exit2 != 0:
76
    raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
77

    
78
# output alignments in sam format to pipe
79
sam_pipe_r, sam_pipe_w = os.pipe()
80
sam_pid = os.fork()
81
if sam_pid != 0:
82
    # parent
83
    os.close(sam_pipe_w)
84
else:
85
    # child
86
    os.close(sam_pipe_r)
87
    arvados_bwa.run('sampe',
88
                    [ref_basename,
89
                     alignments_1, alignments_2,
90
                     reads_1, reads_2],
91
                    stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
92
    sys.exit(0)
93

    
94
# convert sam (sam_pipe_r) to bam (bam_pipe_w)
95
bam_pipe_r, bam_pipe_w = os.pipe()
96
bam_pid = os.fork()
97
if bam_pid != 0:
98
    # parent
99
    os.close(bam_pipe_w)
100
    os.close(sam_pipe_r)
101
else:
102
    # child
103
    os.close(bam_pipe_r)
104
    arvados_samtools.run('view',
105
                         ['-S', '-b',
106
                          '-'],
107
                         stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
108
                         stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
109
    sys.exit(0)
110

    
111
# copy bam (bam_pipe_r) to Keep
112
out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
113
out = arvados.CollectionWriter()
114
out.start_new_stream()
115
out.start_new_file(out_bam_filename)
116
out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
117

    
118
# make sure everyone exited nicely
119
pid3, exit3 = os.waitpid(sam_pid, 0)
120
if exit3 != 0:
121
    raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
122
pid4, exit4 = os.waitpid(bam_pid, 0)
123
if exit4 != 0:
124
    raise Exception("samtools view exited non-zero (0x%x)" % exit4)
125

    
126
# proclaim success
127
this_task.set_output(out.finish())