Statistics
| Branch: | Tag: | Revision:

arvados / crunch_scripts / GATK2-bqsr @ master

History | View | Annotate | Download (2.87 KB)

1
#!/usr/bin/env python
2
# Copyright (C) The Arvados Authors. All rights reserved.
3
#
4
# SPDX-License-Identifier: Apache-2.0
5

    
6
import os
7
import re
8
import arvados
9
import arvados_gatk2
10
import arvados_samtools
11
from arvados_ipc import *
12

    
13
class InvalidArgumentError(Exception):
14
    pass
15

    
16
arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
17

    
18
this_job = arvados.current_job()
19
this_task = arvados.current_task()
20
tmpdir = arvados.current_task().tmpdir
21
arvados.util.clear_tmpdir()
22

    
23
known_sites_files = arvados.getjobparam(
24
    'known_sites',
25
    ['dbsnp_137.b37.vcf',
26
     'Mills_and_1000G_gold_standard.indels.b37.vcf',
27
     ])
28
bundle_dir = arvados.util.collection_extract(
29
    collection = this_job['script_parameters']['gatk_bundle'],
30
    files = [
31
        'human_g1k_v37.dict',
32
        'human_g1k_v37.fasta',
33
        'human_g1k_v37.fasta.fai'
34
        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
35
    path = 'gatk_bundle')
36
ref_fasta_files = [os.path.join(bundle_dir, f)
37
                   for f in os.listdir(bundle_dir)
38
                   if re.search(r'\.fasta(\.gz)?$', f)]
39

    
40
input_collection = this_task['parameters']['input']
41
input_dir = arvados.util.collection_extract(
42
    collection = input_collection,
43
    path = os.path.join(this_task.tmpdir, 'input'))
44
input_bam_files = []
45
for f in arvados.util.listdir_recursive(input_dir):
46
    if re.search(r'\.bam$', f):
47
        input_stream_name, input_file_name = os.path.split(f)
48
        input_bam_files += [os.path.join(input_dir, f)]
49
if len(input_bam_files) != 1:
50
    raise InvalidArgumentError("Expected exactly one bam file per task.")
51

    
52
known_sites_args = []
53
for f in known_sites_files:
54
    known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
55

    
56
recal_file = os.path.join(tmpdir, 'recal.csv')
57

    
58
children = {}
59
pipes = {}
60

    
61
arvados_gatk2.run(
62
    args=[
63
        '-nct', arvados_gatk2.cpus_on_this_node(),
64
        '-T', 'BaseRecalibrator',
65
        '-R', ref_fasta_files[0],
66
        '-I', input_bam_files[0],
67
        '-o', recal_file,
68
        ] + known_sites_args)
69

    
70
pipe_setup(pipes, 'BQSR')
71
if 0 == named_fork(children, 'BQSR'):
72
    pipe_closeallbut(pipes, ('BQSR', 'w'))
73
    arvados_gatk2.run(
74
        args=[
75
        '-T', 'PrintReads',
76
        '-R', ref_fasta_files[0],
77
        '-I', input_bam_files[0],
78
        '-o', '/dev/fd/' + str(pipes['BQSR','w']),
79
        '-BQSR', recal_file,
80
        '--disable_bam_indexing',
81
        ],
82
        close_fds=False)
83
    os._exit(0)
84
os.close(pipes.pop(('BQSR','w'), None))
85

    
86
out = arvados.CollectionWriter()
87
out.start_new_stream(input_stream_name)
88

    
89
out.start_new_file(input_file_name + '.recal.csv')
90
out.write(open(recal_file, 'rb'))
91

    
92
out.start_new_file(input_file_name)
93
while True:
94
    buf = os.read(pipes['BQSR','r'], 2**20)
95
    if len(buf) == 0:
96
        break
97
    out.write(buf)
98
pipe_closeallbut(pipes)
99

    
100
if waitpid_and_check_children(children):
101
    this_task.set_output(out.finish())
102
else:
103
    sys.exit(1)