Statistics
| Branch: | Revision:

arvados / crunch_scripts / bwa-aln @ master

History | View | Annotate | Download (3.84 KB)

1
#!/usr/bin/env python
2

    
3
import arvados
4
import arvados_bwa
5
import arvados_samtools
6
import os
7
import re
8
import sys
9
import subprocess
10

    
11
arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
12

    
13
this_job = arvados.current_job()
14
this_task = arvados.current_task()
15
ref_dir = arvados.util.collection_extract(
16
    collection = this_job['script_parameters']['reference_index'],
17
    path = 'reference',
18
    decompress = False)
19

    
20
ref_basename = None
21
for f in os.listdir(ref_dir):
22
    basename = re.sub(r'\.bwt$', '', f)
23
    if basename != f:
24
        ref_basename = os.path.join(ref_dir, basename)
25
if ref_basename == None:
26
    raise Exception("Could not find *.bwt in reference collection.")
27

    
28
tmp_dir = arvados.current_task().tmpdir
29

    
30
class Aligner:
31
    def input_filename(self):
32
        for s in arvados.CollectionReader(self.collection).all_streams():
33
            for f in s.all_files():
34
                return f.decompressed_name()
35
    def generate_input(self):
36
        for s in arvados.CollectionReader(self.collection).all_streams():
37
            for f in s.all_files():
38
                for s in f.readall_decompressed():
39
                    yield s
40
    def aln(self, input_param):
41
        self.collection = this_task['parameters'][input_param]
42
        reads_filename = os.path.join(tmp_dir, self.input_filename())
43
        aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
44
        reads_pipe_r, reads_pipe_w = os.pipe()
45
        if os.fork() == 0:
46
            os.close(reads_pipe_r)
47
            reads_file = open(reads_filename, 'wb')
48
            for s in self.generate_input():
49
                if len(s) != os.write(reads_pipe_w, s):
50
                    raise Exception("short write")
51
                reads_file.write(s)
52
            reads_file.close()
53
            os.close(reads_pipe_w)
54
            sys.exit(0)
55
        os.close(reads_pipe_w)
56

    
57
        aln_file = open(aln_filename, 'wb')
58
        bwa_proc = subprocess.Popen(
59
            [arvados_bwa.bwa_binary(),
60
             'aln', '-t', '16',
61
             ref_basename,
62
             '-'],
63
            stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
64
            stdout=aln_file)
65
        aln_file.close()
66
        return reads_filename, aln_filename
67

    
68
reads_1, alignments_1 = Aligner().aln('input_1')
69
reads_2, alignments_2 = Aligner().aln('input_2')
70
pid1, exit1 = os.wait()
71
pid2, exit2 = os.wait()
72
if exit1 != 0 or exit2 != 0:
73
    raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
74

    
75
# output alignments in sam format to pipe
76
sam_pipe_r, sam_pipe_w = os.pipe()
77
sam_pid = os.fork()
78
if sam_pid != 0:
79
    # parent
80
    os.close(sam_pipe_w)
81
else:
82
    # child
83
    os.close(sam_pipe_r)
84
    arvados_bwa.run('sampe',
85
                    [ref_basename,
86
                     alignments_1, alignments_2,
87
                     reads_1, reads_2],
88
                    stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
89
    sys.exit(0)
90

    
91
# convert sam (sam_pipe_r) to bam (bam_pipe_w)
92
bam_pipe_r, bam_pipe_w = os.pipe()
93
bam_pid = os.fork()
94
if bam_pid != 0:
95
    # parent
96
    os.close(bam_pipe_w)
97
    os.close(sam_pipe_r)
98
else:
99
    # child
100
    os.close(bam_pipe_r)
101
    arvados_samtools.run('view',
102
                         ['-S', '-b',
103
                          '-'],
104
                         stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
105
                         stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
106
    sys.exit(0)
107

    
108
# copy bam (bam_pipe_r) to Keep
109
out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
110
out = arvados.CollectionWriter()
111
out.start_new_stream()
112
out.start_new_file(out_bam_filename)
113
out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
114

    
115
# make sure everyone exited nicely
116
pid3, exit3 = os.waitpid(sam_pid, 0)
117
if exit3 != 0:
118
    raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
119
pid4, exit4 = os.waitpid(bam_pid, 0)
120
if exit4 != 0:
121
    raise Exception("samtools view exited non-zero (0x%x)" % exit4)
122

    
123
# proclaim success
124
this_task.set_output(out.finish())